From bb3d16cc23020b5988530e2d524cdb37965ad208 Mon Sep 17 00:00:00 2001 From: jorenn92 Date: Fri, 26 Jan 2024 17:42:10 +0100 Subject: [PATCH] fix(tasks): Improved task management by limiting the simultaneous execution of rule and collection handler tasks to one. Additionally, ensured that collection handling cannot occur concurrently with rule handling --- ...06275100801-Tasks_add_taskRunning_table.ts | 24 +++++++ .../collections/collection-worker.service.ts | 72 +++++++++---------- .../collections/collections.controller.ts | 6 +- .../modules/collections/collections.module.ts | 4 +- .../entities/collection.entities.ts | 2 +- .../entities/collection_log.entities.ts | 2 +- .../interfaces/collection-media.interface.ts | 2 +- .../tasks/collection-log-cleaner.service.ts | 47 +++++------- .../src/modules/rules/dtos/exclusion.dto.ts | 2 +- .../rules/entities/exclusion.entities.ts | 2 +- .../modules/rules/getter/getter.service.ts | 2 +- .../rules/getter/plex-getter.service.ts | 4 +- .../rules/helpers/rule.comparator.service.ts | 4 +- server/src/modules/rules/rules.controller.ts | 2 +- server/src/modules/rules/rules.service.ts | 4 +- .../rules/tasks/rule-executor.service.ts | 57 +++++++-------- .../rules/tasks/rule-maintenance.service.ts | 40 ++++------- .../tasks/entities/task_running.entities.ts | 20 ++++++ server/src/modules/tasks/task.base.ts | 56 +++++++++++++++ server/src/modules/tasks/tasks.module.ts | 4 +- server/src/modules/tasks/tasks.service.ts | 63 ++++++++++++++++ 21 files changed, 272 insertions(+), 147 deletions(-) create mode 100644 server/src/database/migrations/1706275100801-Tasks_add_taskRunning_table.ts create mode 100644 server/src/modules/tasks/entities/task_running.entities.ts create mode 100644 server/src/modules/tasks/task.base.ts diff --git a/server/src/database/migrations/1706275100801-Tasks_add_taskRunning_table.ts b/server/src/database/migrations/1706275100801-Tasks_add_taskRunning_table.ts new file mode 100644 index 00000000..45f35830 --- /dev/null +++ b/server/src/database/migrations/1706275100801-Tasks_add_taskRunning_table.ts @@ -0,0 +1,24 @@ +import { MigrationInterface, QueryRunner } from 'typeorm'; + +export class TasksAddTaskRunningTable1706275100801 + implements MigrationInterface +{ + name = 'TasksAddTaskRunningTable1706275100801'; + + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query(` + CREATE TABLE "task_running" ( + "id" integer PRIMARY KEY AUTOINCREMENT NOT NULL, + "name" varchar NOT NULL, + "runningSince" datetime DEFAULT NULL, + "running" boolean NOT NULL DEFAULT (0) + ) + `); + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(` + DROP TABLE "task_running" + `); + } +} diff --git a/server/src/modules/collections/collection-worker.service.ts b/server/src/modules/collections/collection-worker.service.ts index 9349e446..538fe86c 100644 --- a/server/src/modules/collections/collection-worker.service.ts +++ b/server/src/modules/collections/collection-worker.service.ts @@ -1,4 +1,4 @@ -import { Injectable, Logger, OnApplicationBootstrap } from '@nestjs/common'; +import { Injectable, Logger } from '@nestjs/common'; import { InjectRepository } from '@nestjs/typeorm'; import { Repository } from 'typeorm'; import { OverseerrApiService } from '../api/overseerr-api/overseerr-api.service'; @@ -14,12 +14,15 @@ import { ServarrAction } from './interfaces/collection.interface'; import { PlexMetadata } from '../api/plex-api/interfaces/media.interface'; import { EPlexDataType } from '../api/plex-api/enums/plex-data-type-enum'; import { TmdbIdService } from '../api/tmdb-api/tmdb-id.service'; -import cacheManager from '../api/lib/cache'; +import { TaskBase } from '../tasks/task.base'; @Injectable() -export class CollectionWorkerService implements OnApplicationBootstrap { - private readonly logger = new Logger(CollectionWorkerService.name); - private jobCreationAttempts = 0; +export class CollectionWorkerService extends TaskBase { + protected logger = new Logger(CollectionWorkerService.name); + + protected name = 'Collection Handler'; + protected cronSchedule = ''; // overriden in onBootstrapHook + constructor( @InjectRepository(Collection) private readonly collectionRepo: Repository, @@ -30,50 +33,38 @@ export class CollectionWorkerService implements OnApplicationBootstrap { private readonly overseerrApi: OverseerrApiService, private readonly servarrApi: ServarrService, private readonly tmdbApi: TmdbApiService, - private readonly taskService: TasksService, + protected readonly taskService: TasksService, private readonly settings: SettingsService, private readonly tmdbIdService: TmdbIdService, private readonly tmdbIdHelper: TmdbIdService, - ) {} - - onApplicationBootstrap() { - this.jobCreationAttempts++; - const state = this.taskService.createJob( - 'Collection Handler', - this.settings.collection_handler_job_cron, - this.handle.bind(this), - ); - if (state.code === 0) { - if (this.jobCreationAttempts <= 3) { - this.logger.log( - 'Creation of job Collection Handler failed. Retrying in 10s..', - ); - setTimeout(() => { - this.onApplicationBootstrap(); - }, 10000); - } else { - this.logger.error(`Creation of job Collection Handler failed.`); - } - } + ) { + super(taskService); } - public updateJob(cron: string) { - return this.taskService.updateJob( - 'Collection Handler', - cron, - this.handle.bind(this), - ); + protected onBootstrapHook(): void { + this.cronSchedule = this.settings.collection_handler_job_cron; } - public async handle() { - const appStatus = await this.settings.testConnections(); - - // reset API caches, make sure latest data is used - for (const [key, value] of Object.entries(cacheManager.getAllCaches())) { - value.flush(); + public async execute() { + // check if another instance of this task is already running + if (await this.isRunning()) { + this.logger.log( + `Another instance of the ${this.name} task is currently running. Skipping this execution`, + ); + return; } - this.logger.log('Start handling all collections.'); + await super.execute(); + + // wait 5 seconds to make sure we're not executing together with the rule handler + setTimeout(() => {}, 5000); + // if we are, then wait.. + await this.taskService.waitUntilTaskIsFinished('Rule Handler', this.name); + + // Start actual task + const appStatus = await this.settings.testConnections(); + + this.logger.log('Start handling all collections'); let handledCollections = 0; if (appStatus) { // loop over all active collections @@ -123,6 +114,7 @@ export class CollectionWorkerService implements OnApplicationBootstrap { 'Not all applications are reachable.. Skipping collection handling', ); } + this.finish(); } private async handleMedia(collection: Collection, media: CollectionMedia) { diff --git a/server/src/modules/collections/collections.controller.ts b/server/src/modules/collections/collections.controller.ts index 497e7adb..ba4c0e97 100644 --- a/server/src/modules/collections/collections.controller.ts +++ b/server/src/modules/collections/collections.controller.ts @@ -15,7 +15,7 @@ import { AddCollectionMedia, IAlterableMediaDto, } from './interfaces/collection-media.interface'; -import { ECollectionLogType } from 'src/modules/collections/entities/collection_log.entities'; +import { ECollectionLogType } from '../collections/entities/collection_log.entities'; @Controller('api/collections') export class CollectionsController { @@ -63,8 +63,8 @@ export class CollectionsController { } @Post('/handle') - handleCollection(@Body() request: any) { - return this.collectionWorkerService.handle(); + handleCollection() { + return this.collectionWorkerService.execute(); } @Put('/schedule/update') diff --git a/server/src/modules/collections/collections.module.ts b/server/src/modules/collections/collections.module.ts index 982bf6a2..e6093e86 100644 --- a/server/src/modules/collections/collections.module.ts +++ b/server/src/modules/collections/collections.module.ts @@ -12,8 +12,8 @@ import { ServarrApiModule } from '../api/servarr-api/servarr-api.module'; import { RuleGroup } from '../rules/entities/rule-group.entities'; import { TasksModule } from '../tasks/tasks.module'; import { Exclusion } from '../rules/entities/exclusion.entities'; -import { CollectionLog } from 'src/modules/collections/entities/collection_log.entities'; -import { CollectionLogCleanerService } from 'src/modules/collections/tasks/collection-log-cleaner.service'; +import { CollectionLog } from '../collections/entities/collection_log.entities'; +import { CollectionLogCleanerService } from '../collections/tasks/collection-log-cleaner.service'; @Module({ imports: [ diff --git a/server/src/modules/collections/entities/collection.entities.ts b/server/src/modules/collections/entities/collection.entities.ts index 970aae21..3bab8461 100644 --- a/server/src/modules/collections/entities/collection.entities.ts +++ b/server/src/modules/collections/entities/collection.entities.ts @@ -9,7 +9,7 @@ import { } from 'typeorm'; import { CollectionMedia } from './collection_media.entities'; import { EPlexDataType } from '../../api/plex-api/enums/plex-data-type-enum'; -import { CollectionLog } from 'src/modules/collections/entities/collection_log.entities'; +import { CollectionLog } from '../../collections/entities/collection_log.entities'; @Entity() export class Collection { diff --git a/server/src/modules/collections/entities/collection_log.entities.ts b/server/src/modules/collections/entities/collection_log.entities.ts index 25aa1b8c..daed305d 100644 --- a/server/src/modules/collections/entities/collection_log.entities.ts +++ b/server/src/modules/collections/entities/collection_log.entities.ts @@ -1,4 +1,4 @@ -import { Collection } from 'src/modules/collections/entities/collection.entities'; +import { Collection } from '../../collections/entities/collection.entities'; import { Column, Entity, diff --git a/server/src/modules/collections/interfaces/collection-media.interface.ts b/server/src/modules/collections/interfaces/collection-media.interface.ts index 3da9d200..390868f3 100644 --- a/server/src/modules/collections/interfaces/collection-media.interface.ts +++ b/server/src/modules/collections/interfaces/collection-media.interface.ts @@ -1,4 +1,4 @@ -import { EPlexDataType } from 'src/modules/api/plex-api/enums/plex-data-type-enum'; +import { EPlexDataType } from '../../api/plex-api/enums/plex-data-type-enum'; export interface ICollectionMedia { id: number; diff --git a/server/src/modules/collections/tasks/collection-log-cleaner.service.ts b/server/src/modules/collections/tasks/collection-log-cleaner.service.ts index eeb5dd3b..e8864b4a 100644 --- a/server/src/modules/collections/tasks/collection-log-cleaner.service.ts +++ b/server/src/modules/collections/tasks/collection-log-cleaner.service.ts @@ -1,40 +1,27 @@ -import { Injectable, Logger, OnApplicationBootstrap } from '@nestjs/common'; -import { CollectionsService } from 'src/modules/collections/collections.service'; -import { TasksService } from 'src/modules/tasks/tasks.service'; +import { Injectable, Logger } from '@nestjs/common'; +import { CollectionsService } from '../../collections/collections.service'; +import { TaskBase } from '../../tasks/task.base'; +import { TasksService } from '../..//tasks/tasks.service'; @Injectable() -export class CollectionLogCleanerService implements OnApplicationBootstrap { - private readonly logger = new Logger(CollectionLogCleanerService.name); - private jobCreationAttempts = 0; +export class CollectionLogCleanerService extends TaskBase { + protected logger = new Logger(CollectionLogCleanerService.name); + + protected name = 'Collection Log Cleaner'; + protected cronSchedule = '45 5 * * *'; constructor( private readonly collectionService: CollectionsService, - private readonly taskService: TasksService, - ) {} - - onApplicationBootstrap() { - this.jobCreationAttempts++; - const state = this.taskService.createJob( - 'Collection Log Cleaner', - '45 5 * * *', - this.execute.bind(this), - ); - if (state.code === 0) { - if (this.jobCreationAttempts <= 3) { - this.logger.log( - 'Creation of job Collection Log Cleaner failed. Retrying in 10s..', - ); - setTimeout(() => { - this.onApplicationBootstrap(); - }, 10000); - } else { - this.logger.error(`Creation of job Collection Log Cleaner failed.`); - } - } + protected readonly taskService: TasksService, + ) { + super(taskService); } public async execute() { try { + await super.execute(); + + // start execution // get all collections const collections = await this.collectionService.getAllCollections(); @@ -42,8 +29,12 @@ export class CollectionLogCleanerService implements OnApplicationBootstrap { for (const collection of collections) { this.collectionService.removeOldCollectionLogs(collection); } + + // clean up + this.finish(); } catch (e) { this.logger.debug(e); + this.finish(); } } } diff --git a/server/src/modules/rules/dtos/exclusion.dto.ts b/server/src/modules/rules/dtos/exclusion.dto.ts index 190ea13c..61c41bb6 100644 --- a/server/src/modules/rules/dtos/exclusion.dto.ts +++ b/server/src/modules/rules/dtos/exclusion.dto.ts @@ -1,4 +1,4 @@ -import { IAlterableMediaDto } from 'src/modules/collections/interfaces/collection-media.interface'; +import { IAlterableMediaDto } from '../../collections/interfaces/collection-media.interface'; export class ExclusionDto { plexId: number; diff --git a/server/src/modules/rules/entities/exclusion.entities.ts b/server/src/modules/rules/entities/exclusion.entities.ts index 644a35f5..47c92691 100644 --- a/server/src/modules/rules/entities/exclusion.entities.ts +++ b/server/src/modules/rules/entities/exclusion.entities.ts @@ -1,4 +1,4 @@ -import { PlexMetadata } from 'src/modules/api/plex-api/interfaces/media.interface'; +import { PlexMetadata } from '../../api/plex-api/interfaces/media.interface'; import { Entity, Column, PrimaryGeneratedColumn } from 'typeorm'; @Entity() diff --git a/server/src/modules/rules/getter/getter.service.ts b/server/src/modules/rules/getter/getter.service.ts index aa0e02d1..21cf27cb 100644 --- a/server/src/modules/rules/getter/getter.service.ts +++ b/server/src/modules/rules/getter/getter.service.ts @@ -6,7 +6,7 @@ import { PlexGetterService } from './plex-getter.service'; import { RadarrGetterService } from './radarr-getter.service'; import { SonarrGetterService } from './sonarr-getter.service'; import { RulesDto } from '../dtos/rules.dto'; -import { EPlexDataType } from 'src/modules/api/plex-api/enums/plex-data-type-enum'; +import { EPlexDataType } from '../../api/plex-api/enums/plex-data-type-enum'; @Injectable() export class ValueGetterService { diff --git a/server/src/modules/rules/getter/plex-getter.service.ts b/server/src/modules/rules/getter/plex-getter.service.ts index f05b9389..f7ec2311 100644 --- a/server/src/modules/rules/getter/plex-getter.service.ts +++ b/server/src/modules/rules/getter/plex-getter.service.ts @@ -11,8 +11,8 @@ import { RuleConstants, } from '../constants/rules.constants'; import { RulesDto } from '../dtos/rules.dto'; -import { PlexMetadata } from 'src/modules/api/plex-api/interfaces/media.interface'; -import { EPlexDataType } from 'src/modules/api/plex-api/enums/plex-data-type-enum'; +import { PlexMetadata } from '../../api/plex-api/interfaces/media.interface'; +import { EPlexDataType } from '../../api/plex-api/enums/plex-data-type-enum'; @Injectable() export class PlexGetterService { diff --git a/server/src/modules/rules/helpers/rule.comparator.service.ts b/server/src/modules/rules/helpers/rule.comparator.service.ts index 65143c30..0c93abbb 100644 --- a/server/src/modules/rules/helpers/rule.comparator.service.ts +++ b/server/src/modules/rules/helpers/rule.comparator.service.ts @@ -6,10 +6,10 @@ import { } from '../constants/rules.constants'; import { RuleDto } from '../dtos/rule.dto'; import _ from 'lodash'; -import { PlexLibraryItem } from 'src/modules/api/plex-api/interfaces/library.interfaces'; +import { PlexLibraryItem } from '../../api/plex-api/interfaces/library.interfaces'; import { RulesDto } from '../dtos/rules.dto'; import { ValueGetterService } from '../getter/getter.service'; -import { EPlexDataType } from 'src/modules/api/plex-api/enums/plex-data-type-enum'; +import { EPlexDataType } from '../../api/plex-api/enums/plex-data-type-enum'; import { RuleDbDto } from '../dtos/ruleDb.dto'; import { RuleConstanstService } from '../constants/constants.service'; diff --git a/server/src/modules/rules/rules.controller.ts b/server/src/modules/rules/rules.controller.ts index fe608e6b..1f07011b 100644 --- a/server/src/modules/rules/rules.controller.ts +++ b/server/src/modules/rules/rules.controller.ts @@ -75,7 +75,7 @@ export class RulesController { } @Post('/execute') executeRules() { - this.ruleExecutorService.executeAllRules(); + this.ruleExecutorService.execute(); } @Post() async setRules(@Body() body: RulesDto): Promise { diff --git a/server/src/modules/rules/rules.service.ts b/server/src/modules/rules/rules.service.ts index e94fe53b..3c5b9fa5 100644 --- a/server/src/modules/rules/rules.service.ts +++ b/server/src/modules/rules/rules.service.ts @@ -28,8 +28,8 @@ import { AddCollectionMedia } from '../collections/interfaces/collection-media.i import { RuleYamlService } from './helpers/yaml.service'; import { RuleComparatorService } from './helpers/rule.comparator.service'; import { PlexLibraryItem } from '../api/plex-api/interfaces/library.interfaces'; -import { ECollectionLogType } from 'src/modules/collections/entities/collection_log.entities'; -import cacheManager from 'src/modules/api/lib/cache'; +import { ECollectionLogType } from '../collections/entities/collection_log.entities'; +import cacheManager from '../api/lib/cache'; export interface ReturnStatus { code: 0 | 1; diff --git a/server/src/modules/rules/tasks/rule-executor.service.ts b/server/src/modules/rules/tasks/rule-executor.service.ts index 371ff281..331b089b 100644 --- a/server/src/modules/rules/tasks/rule-executor.service.ts +++ b/server/src/modules/rules/tasks/rule-executor.service.ts @@ -1,4 +1,4 @@ -import { Injectable, Logger, OnApplicationBootstrap } from '@nestjs/common'; +import { Injectable, Logger } from '@nestjs/common'; import _ from 'lodash'; import { PlexLibraryItem } from '../../api/plex-api/interfaces/library.interfaces'; import { PlexApiService } from '../../api/plex-api/plex-api.service'; @@ -14,7 +14,8 @@ import { RulesService } from '../rules.service'; import { EPlexDataType } from '../../api/plex-api/enums/plex-data-type-enum'; import cacheManager from '../../api/lib/cache'; import { RuleComparatorService } from '../helpers/rule.comparator.service'; -import { Collection } from 'src/modules/collections/entities/collection.entities'; +import { Collection } from '../../collections/entities/collection.entities'; +import { TaskBase } from '../../tasks/task.base'; interface PlexData { page: number; @@ -23,9 +24,11 @@ interface PlexData { } @Injectable() -export class RuleExecutorService implements OnApplicationBootstrap { - private readonly logger = new Logger(RuleExecutorService.name); - private jobCreationAttempts = 0; +export class RuleExecutorService extends TaskBase { + protected logger = new Logger(RuleExecutorService.name); + + protected name = 'Rule Handler'; + protected cronSchedule = ''; // overriden in onBootstrapHook ruleConstants: RuleConstants; userId: string; @@ -39,44 +42,30 @@ export class RuleExecutorService implements OnApplicationBootstrap { private readonly rulesService: RulesService, private readonly plexApi: PlexApiService, private readonly collectionService: CollectionsService, - private readonly taskService: TasksService, + protected readonly taskService: TasksService, private readonly settings: SettingsService, private readonly comparator: RuleComparatorService, ) { + super(taskService); this.ruleConstants = new RuleConstants(); this.plexData = { page: 1, finished: false, data: [] }; } - onApplicationBootstrap() { - this.jobCreationAttempts++; - const state = this.taskService.createJob( - 'Rule Handler', - this.settings.rules_handler_job_cron, - this.executeAllRules.bind(this), - ); - if (state.code === 0) { - if (this.jobCreationAttempts <= 3) { - this.logger.log( - 'Creation of job Rule Handler failed. Retrying in 10s..', - ); - setTimeout(() => { - this.onApplicationBootstrap(); - }, 10000); - } else { - this.logger.error(`Creation of job Rule Handler failed`); - } - } + protected onBootstrapHook(): void { + this.cronSchedule = this.settings.rules_handler_job_cron; } - public updateJob(cron: string) { - return this.taskService.updateJob( - 'Rule Handler', - cron, - this.executeAllRules.bind(this), - ); - } + public async execute() { + // check if another instance of this task is already running + if (await this.isRunning()) { + this.logger.log( + `Another instance of the ${this.name} task is currently running. Skipping this execution`, + ); + return; + } + + await super.execute(); - public async executeAllRules() { this.logger.log('Starting Execution of all active rules'); const appStatus = await this.settings.testConnections(); @@ -131,6 +120,8 @@ export class RuleExecutorService implements OnApplicationBootstrap { 'Not all applications are reachable.. Skipped rule execution.', ); } + // clean up + this.finish(); } private async syncManualPlexMediaToCollectionDB(rulegroup: RuleGroup) { diff --git a/server/src/modules/rules/tasks/rule-maintenance.service.ts b/server/src/modules/rules/tasks/rule-maintenance.service.ts index 42ed9aa3..3adb8089 100644 --- a/server/src/modules/rules/tasks/rule-maintenance.service.ts +++ b/server/src/modules/rules/tasks/rule-maintenance.service.ts @@ -1,4 +1,4 @@ -import { Injectable, Logger, OnApplicationBootstrap } from '@nestjs/common'; +import { Injectable, Logger } from '@nestjs/common'; import { TasksService } from '../../tasks/tasks.service'; import { SettingsService } from '../../settings/settings.service'; import { RulesService } from '../rules.service'; @@ -6,43 +6,28 @@ import { PlexApiService } from '../../api/plex-api/plex-api.service'; import { InjectRepository } from '@nestjs/typeorm'; import { Repository } from 'typeorm'; import { Collection } from '../../collections/entities/collection.entities'; +import { TaskBase } from '../../tasks/task.base'; @Injectable() -export class RuleMaintenanceService implements OnApplicationBootstrap { - private readonly logger = new Logger(RuleMaintenanceService.name); - private jobCreationAttempts = 0; +export class RuleMaintenanceService extends TaskBase { + protected logger = new Logger(RuleMaintenanceService.name); + + protected name = 'Rule Maintenance'; + protected cronSchedule = '20 4 * * *'; constructor( - private readonly taskService: TasksService, + protected readonly taskService: TasksService, private readonly settings: SettingsService, private readonly rulesService: RulesService, @InjectRepository(Collection) private readonly collectionRepo: Repository, private readonly plexApi: PlexApiService, - ) {} - - onApplicationBootstrap() { - this.jobCreationAttempts++; - const state = this.taskService.createJob( - 'Rule Maintenance', - '20 4 * * *', - this.execute.bind(this), - ); - if (state.code === 0) { - if (this.jobCreationAttempts <= 3) { - this.logger.log( - 'Creation of job Rule Maintenance failed. Retrying in 10s..', - ); - setTimeout(() => { - this.onApplicationBootstrap(); - }, 10000); - } else { - this.logger.error(`Creation of job Rule Maintenance failed.`); - } - } + ) { + super(taskService); } - private async execute() { + public async execute() { + await super.execute(); try { this.logger.log('Starting maintenance'); const appStatus = await this.settings.testConnections(); @@ -60,6 +45,7 @@ export class RuleMaintenanceService implements OnApplicationBootstrap { } catch (e) { this.logger.error(`Rule Maintenance failed : ${e.message}`); } + this.finish(); } private async removeLeftoverExclusions() { diff --git a/server/src/modules/tasks/entities/task_running.entities.ts b/server/src/modules/tasks/entities/task_running.entities.ts new file mode 100644 index 00000000..f1ff8341 --- /dev/null +++ b/server/src/modules/tasks/entities/task_running.entities.ts @@ -0,0 +1,20 @@ +import { Column, Entity, PrimaryGeneratedColumn } from 'typeorm'; + +@Entity() +export class TaskRunning { + @PrimaryGeneratedColumn() + id: number; + + @Column({ nullable: false }) + name: string; + + @Column({ + type: 'datetime', + nullable: true, + default: () => 'CURRENT_TIMESTAMP', + }) + runningSince: Date; + + @Column({ nullable: false, default: false }) + running: boolean; +} diff --git a/server/src/modules/tasks/task.base.ts b/server/src/modules/tasks/task.base.ts new file mode 100644 index 00000000..71eba651 --- /dev/null +++ b/server/src/modules/tasks/task.base.ts @@ -0,0 +1,56 @@ +import { Logger, OnApplicationBootstrap } from '@nestjs/common'; +import { TasksService } from './tasks.service'; + +export class TaskBase implements OnApplicationBootstrap { + protected logger = new Logger(TaskBase.name); + private jobCreationAttempts = 0; + protected name = ''; + protected cronSchedule = ''; + + constructor(protected readonly taskService: TasksService) {} + + onApplicationBootstrap() { + this.jobCreationAttempts++; + this.onBootstrapHook(); + const state = this.taskService.createJob( + this.name, + this.cronSchedule, + this.execute.bind(this), + ); + if (state.code === 0) { + if (this.jobCreationAttempts <= 3) { + this.logger.log( + `Creation of ${this.name} task failed. Retrying in 10s..`, + ); + setTimeout(() => { + this.onApplicationBootstrap(); + }, 10000); + } else { + this.logger.error(`Creation of ${this.name} task failed.`); + } + } + } + + // implement this on subclasses to do things in onApplicationBootstrap + protected onBootstrapHook() {} + + public async execute() { + await this.prepare(); + } + + protected prepare = async () => { + await this.taskService.setRunning(this.name); + }; + + protected finish = async () => { + await this.taskService.clearRunning(this.name); + }; + + public updateJob(cron: string) { + return this.taskService.updateJob(this.name, cron, this.execute.bind(this)); + } + + protected async isRunning() { + return await this.taskService.isRunning(this.name); + } +} diff --git a/server/src/modules/tasks/tasks.module.ts b/server/src/modules/tasks/tasks.module.ts index 68a03eee..61ae2124 100644 --- a/server/src/modules/tasks/tasks.module.ts +++ b/server/src/modules/tasks/tasks.module.ts @@ -2,9 +2,11 @@ import { Module } from '@nestjs/common'; import { ScheduleModule } from '@nestjs/schedule'; import { StatusService } from './status.service'; import { TasksService } from './tasks.service'; +import { TaskRunning } from '../tasks/entities/task_running.entities'; +import { TypeOrmModule } from '@nestjs/typeorm'; @Module({ - imports: [ScheduleModule.forRoot()], + imports: [ScheduleModule.forRoot(), TypeOrmModule.forFeature([TaskRunning])], providers: [TasksService, StatusService], exports: [TasksService], }) diff --git a/server/src/modules/tasks/tasks.service.ts b/server/src/modules/tasks/tasks.service.ts index 24fbcb26..13ac512c 100644 --- a/server/src/modules/tasks/tasks.service.ts +++ b/server/src/modules/tasks/tasks.service.ts @@ -4,6 +4,9 @@ import { CronJob } from 'cron'; import { Status } from './interfaces/status.interface'; import { TaskScheduler } from './interfaces/task-scheduler.interface'; import { StatusService } from './status.service'; +import { InjectRepository } from '@nestjs/typeorm'; +import { TaskRunning } from '../tasks/entities/task_running.entities'; +import { Repository } from 'typeorm'; @Injectable() export class TasksService implements TaskScheduler { @@ -12,6 +15,8 @@ export class TasksService implements TaskScheduler { constructor( private schedulerRegistry: SchedulerRegistry, private readonly status: StatusService, + @InjectRepository(TaskRunning) + private readonly taskRunningRepo: Repository, ) {} public createJob( @@ -27,6 +32,16 @@ export class TasksService implements TaskScheduler { this.schedulerRegistry.addCronJob(name, job); job.start(); + // create database running entry + this.taskRunningRepo.findOne({ where: { name: name } }).then((resp) => { + this.taskRunningRepo.save({ + id: resp ? resp.id : null, + name: name, + running: false, + runningSince: null, + }); + }); + this.logger.log(`Task ${name} created successfully`); return this.status.createStatus( true, @@ -91,4 +106,52 @@ export class TasksService implements TaskScheduler { ); } } + + public async setRunning(name: string) { + const resp = await this.taskRunningRepo.findOne({ where: { name: name } }); + if (resp) { + await this.taskRunningRepo.update( + { id: resp.id }, + { + running: true, + runningSince: new Date(), + }, + ); + } + } + + public async isRunning(name: string) { + const resp = await this.taskRunningRepo.findOne({ where: { name: name } }); + return resp.running; + } + + public async clearRunning(name: string) { + const resp = await this.taskRunningRepo.findOne({ where: { name: name } }); + if (resp) { + await this.taskRunningRepo.update( + { id: resp.id }, + { + running: false, + runningSince: null, + }, + ); + } + } + + public async waitUntilTaskIsFinished( + name: string, + myname: string = undefined, + ) { + let task = await this.taskRunningRepo.findOne({ where: { name: name } }); + + if (task && task.running) { + this.logger.log( + `${myname ? `Task ${myname} is waiting` : `Waiting`} for task ${name} to finish...`, + ); + while (task.running) { + await new Promise((resolve) => setTimeout(resolve, 10000)); + task = await this.taskRunningRepo.findOne({ where: { name: name } }); + } + } + } }