Skip to content

Commit

Permalink
Merge pull request #803 from jorenn92/fix-stop-tasks-running-simultan…
Browse files Browse the repository at this point in the history
…eous

fix(tasks): Improved task management by limiting the simultaneous exe…
  • Loading branch information
jorenn92 authored Jan 26, 2024
2 parents c1b6f8a + bb3d16c commit b721d20
Show file tree
Hide file tree
Showing 21 changed files with 272 additions and 147 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { MigrationInterface, QueryRunner } from 'typeorm';

export class TasksAddTaskRunningTable1706275100801
implements MigrationInterface
{
name = 'TasksAddTaskRunningTable1706275100801';

public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`
CREATE TABLE "task_running" (
"id" integer PRIMARY KEY AUTOINCREMENT NOT NULL,
"name" varchar NOT NULL,
"runningSince" datetime DEFAULT NULL,
"running" boolean NOT NULL DEFAULT (0)
)
`);
}

public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`
DROP TABLE "task_running"
`);
}
}
72 changes: 32 additions & 40 deletions server/src/modules/collections/collection-worker.service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Injectable, Logger, OnApplicationBootstrap } from '@nestjs/common';
import { Injectable, Logger } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { OverseerrApiService } from '../api/overseerr-api/overseerr-api.service';
Expand All @@ -14,12 +14,15 @@ import { ServarrAction } from './interfaces/collection.interface';
import { PlexMetadata } from '../api/plex-api/interfaces/media.interface';
import { EPlexDataType } from '../api/plex-api/enums/plex-data-type-enum';
import { TmdbIdService } from '../api/tmdb-api/tmdb-id.service';
import cacheManager from '../api/lib/cache';
import { TaskBase } from '../tasks/task.base';

@Injectable()
export class CollectionWorkerService implements OnApplicationBootstrap {
private readonly logger = new Logger(CollectionWorkerService.name);
private jobCreationAttempts = 0;
export class CollectionWorkerService extends TaskBase {
protected logger = new Logger(CollectionWorkerService.name);

protected name = 'Collection Handler';
protected cronSchedule = ''; // overriden in onBootstrapHook

constructor(
@InjectRepository(Collection)
private readonly collectionRepo: Repository<Collection>,
Expand All @@ -30,50 +33,38 @@ export class CollectionWorkerService implements OnApplicationBootstrap {
private readonly overseerrApi: OverseerrApiService,
private readonly servarrApi: ServarrService,
private readonly tmdbApi: TmdbApiService,
private readonly taskService: TasksService,
protected readonly taskService: TasksService,
private readonly settings: SettingsService,
private readonly tmdbIdService: TmdbIdService,
private readonly tmdbIdHelper: TmdbIdService,
) {}

onApplicationBootstrap() {
this.jobCreationAttempts++;
const state = this.taskService.createJob(
'Collection Handler',
this.settings.collection_handler_job_cron,
this.handle.bind(this),
);
if (state.code === 0) {
if (this.jobCreationAttempts <= 3) {
this.logger.log(
'Creation of job Collection Handler failed. Retrying in 10s..',
);
setTimeout(() => {
this.onApplicationBootstrap();
}, 10000);
} else {
this.logger.error(`Creation of job Collection Handler failed.`);
}
}
) {
super(taskService);
}

public updateJob(cron: string) {
return this.taskService.updateJob(
'Collection Handler',
cron,
this.handle.bind(this),
);
protected onBootstrapHook(): void {
this.cronSchedule = this.settings.collection_handler_job_cron;
}

public async handle() {
const appStatus = await this.settings.testConnections();

// reset API caches, make sure latest data is used
for (const [key, value] of Object.entries(cacheManager.getAllCaches())) {
value.flush();
public async execute() {
// check if another instance of this task is already running
if (await this.isRunning()) {
this.logger.log(
`Another instance of the ${this.name} task is currently running. Skipping this execution`,
);
return;
}

this.logger.log('Start handling all collections.');
await super.execute();

// wait 5 seconds to make sure we're not executing together with the rule handler
setTimeout(() => {}, 5000);
// if we are, then wait..
await this.taskService.waitUntilTaskIsFinished('Rule Handler', this.name);

// Start actual task
const appStatus = await this.settings.testConnections();

this.logger.log('Start handling all collections');
let handledCollections = 0;
if (appStatus) {
// loop over all active collections
Expand Down Expand Up @@ -123,6 +114,7 @@ export class CollectionWorkerService implements OnApplicationBootstrap {
'Not all applications are reachable.. Skipping collection handling',
);
}
this.finish();
}

private async handleMedia(collection: Collection, media: CollectionMedia) {
Expand Down
6 changes: 3 additions & 3 deletions server/src/modules/collections/collections.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import {
AddCollectionMedia,
IAlterableMediaDto,
} from './interfaces/collection-media.interface';
import { ECollectionLogType } from 'src/modules/collections/entities/collection_log.entities';
import { ECollectionLogType } from '../collections/entities/collection_log.entities';

@Controller('api/collections')
export class CollectionsController {
Expand Down Expand Up @@ -63,8 +63,8 @@ export class CollectionsController {
}

@Post('/handle')
handleCollection(@Body() request: any) {
return this.collectionWorkerService.handle();
handleCollection() {
return this.collectionWorkerService.execute();
}

@Put('/schedule/update')
Expand Down
4 changes: 2 additions & 2 deletions server/src/modules/collections/collections.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import { ServarrApiModule } from '../api/servarr-api/servarr-api.module';
import { RuleGroup } from '../rules/entities/rule-group.entities';
import { TasksModule } from '../tasks/tasks.module';
import { Exclusion } from '../rules/entities/exclusion.entities';
import { CollectionLog } from 'src/modules/collections/entities/collection_log.entities';
import { CollectionLogCleanerService } from 'src/modules/collections/tasks/collection-log-cleaner.service';
import { CollectionLog } from '../collections/entities/collection_log.entities';
import { CollectionLogCleanerService } from '../collections/tasks/collection-log-cleaner.service';

@Module({
imports: [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {
} from 'typeorm';
import { CollectionMedia } from './collection_media.entities';
import { EPlexDataType } from '../../api/plex-api/enums/plex-data-type-enum';
import { CollectionLog } from 'src/modules/collections/entities/collection_log.entities';
import { CollectionLog } from '../../collections/entities/collection_log.entities';

@Entity()
export class Collection {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Collection } from 'src/modules/collections/entities/collection.entities';
import { Collection } from '../../collections/entities/collection.entities';
import {
Column,
Entity,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { EPlexDataType } from 'src/modules/api/plex-api/enums/plex-data-type-enum';
import { EPlexDataType } from '../../api/plex-api/enums/plex-data-type-enum';

export interface ICollectionMedia {
id: number;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,49 +1,40 @@
import { Injectable, Logger, OnApplicationBootstrap } from '@nestjs/common';
import { CollectionsService } from 'src/modules/collections/collections.service';
import { TasksService } from 'src/modules/tasks/tasks.service';
import { Injectable, Logger } from '@nestjs/common';
import { CollectionsService } from '../../collections/collections.service';
import { TaskBase } from '../../tasks/task.base';
import { TasksService } from '../..//tasks/tasks.service';

@Injectable()
export class CollectionLogCleanerService implements OnApplicationBootstrap {
private readonly logger = new Logger(CollectionLogCleanerService.name);
private jobCreationAttempts = 0;
export class CollectionLogCleanerService extends TaskBase {
protected logger = new Logger(CollectionLogCleanerService.name);

protected name = 'Collection Log Cleaner';
protected cronSchedule = '45 5 * * *';

constructor(
private readonly collectionService: CollectionsService,
private readonly taskService: TasksService,
) {}

onApplicationBootstrap() {
this.jobCreationAttempts++;
const state = this.taskService.createJob(
'Collection Log Cleaner',
'45 5 * * *',
this.execute.bind(this),
);
if (state.code === 0) {
if (this.jobCreationAttempts <= 3) {
this.logger.log(
'Creation of job Collection Log Cleaner failed. Retrying in 10s..',
);
setTimeout(() => {
this.onApplicationBootstrap();
}, 10000);
} else {
this.logger.error(`Creation of job Collection Log Cleaner failed.`);
}
}
protected readonly taskService: TasksService,
) {
super(taskService);
}

public async execute() {
try {
await super.execute();

// start execution
// get all collections
const collections = await this.collectionService.getAllCollections();

// for each collection
for (const collection of collections) {
this.collectionService.removeOldCollectionLogs(collection);
}

// clean up
this.finish();
} catch (e) {
this.logger.debug(e);
this.finish();
}
}
}
2 changes: 1 addition & 1 deletion server/src/modules/rules/dtos/exclusion.dto.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { IAlterableMediaDto } from 'src/modules/collections/interfaces/collection-media.interface';
import { IAlterableMediaDto } from '../../collections/interfaces/collection-media.interface';

export class ExclusionDto {
plexId: number;
Expand Down
2 changes: 1 addition & 1 deletion server/src/modules/rules/entities/exclusion.entities.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { PlexMetadata } from 'src/modules/api/plex-api/interfaces/media.interface';
import { PlexMetadata } from '../../api/plex-api/interfaces/media.interface';
import { Entity, Column, PrimaryGeneratedColumn } from 'typeorm';

@Entity()
Expand Down
2 changes: 1 addition & 1 deletion server/src/modules/rules/getter/getter.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { PlexGetterService } from './plex-getter.service';
import { RadarrGetterService } from './radarr-getter.service';
import { SonarrGetterService } from './sonarr-getter.service';
import { RulesDto } from '../dtos/rules.dto';
import { EPlexDataType } from 'src/modules/api/plex-api/enums/plex-data-type-enum';
import { EPlexDataType } from '../../api/plex-api/enums/plex-data-type-enum';

@Injectable()
export class ValueGetterService {
Expand Down
4 changes: 2 additions & 2 deletions server/src/modules/rules/getter/plex-getter.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import {
RuleConstants,
} from '../constants/rules.constants';
import { RulesDto } from '../dtos/rules.dto';
import { PlexMetadata } from 'src/modules/api/plex-api/interfaces/media.interface';
import { EPlexDataType } from 'src/modules/api/plex-api/enums/plex-data-type-enum';
import { PlexMetadata } from '../../api/plex-api/interfaces/media.interface';
import { EPlexDataType } from '../../api/plex-api/enums/plex-data-type-enum';

@Injectable()
export class PlexGetterService {
Expand Down
4 changes: 2 additions & 2 deletions server/src/modules/rules/helpers/rule.comparator.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ import {
} from '../constants/rules.constants';
import { RuleDto } from '../dtos/rule.dto';
import _ from 'lodash';
import { PlexLibraryItem } from 'src/modules/api/plex-api/interfaces/library.interfaces';
import { PlexLibraryItem } from '../../api/plex-api/interfaces/library.interfaces';
import { RulesDto } from '../dtos/rules.dto';
import { ValueGetterService } from '../getter/getter.service';
import { EPlexDataType } from 'src/modules/api/plex-api/enums/plex-data-type-enum';
import { EPlexDataType } from '../../api/plex-api/enums/plex-data-type-enum';
import { RuleDbDto } from '../dtos/ruleDb.dto';
import { RuleConstanstService } from '../constants/constants.service';

Expand Down
2 changes: 1 addition & 1 deletion server/src/modules/rules/rules.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ export class RulesController {
}
@Post('/execute')
executeRules() {
this.ruleExecutorService.executeAllRules();
this.ruleExecutorService.execute();
}
@Post()
async setRules(@Body() body: RulesDto): Promise<ReturnStatus> {
Expand Down
4 changes: 2 additions & 2 deletions server/src/modules/rules/rules.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import { AddCollectionMedia } from '../collections/interfaces/collection-media.i
import { RuleYamlService } from './helpers/yaml.service';
import { RuleComparatorService } from './helpers/rule.comparator.service';
import { PlexLibraryItem } from '../api/plex-api/interfaces/library.interfaces';
import { ECollectionLogType } from 'src/modules/collections/entities/collection_log.entities';
import cacheManager from 'src/modules/api/lib/cache';
import { ECollectionLogType } from '../collections/entities/collection_log.entities';
import cacheManager from '../api/lib/cache';

export interface ReturnStatus {
code: 0 | 1;
Expand Down
Loading

0 comments on commit b721d20

Please sign in to comment.