From 27fd1f51f2d81a45a696a65215c86194b487d515 Mon Sep 17 00:00:00 2001 From: Thomas Kortyka Date: Wed, 6 Nov 2024 09:59:27 +0100 Subject: [PATCH] feat: Add Twitch Viewer Cronjob --- .../src/app/cronjob/cronjob.module.ts | 3 +- .../src/app/cronjob/viewer/viewer.cron.ts | 23 ++++++++++ .../src/app/cronjob/viewer/viewer.module.ts | 32 ++++++++++++++ .../app/cronjob/viewer/viewer.processor.ts | 20 +++++++++ .../src/app/cronjob/viewer/viewer.service.ts | 43 +++++++++++++++++++ .../cronjob/warcraft/warcraft.processor.ts | 1 + .../app/cronjob/warcraft/warcraft.service.ts | 12 +++++- 7 files changed, 132 insertions(+), 2 deletions(-) create mode 100644 apps/cronjob-service/src/app/cronjob/viewer/viewer.cron.ts create mode 100644 apps/cronjob-service/src/app/cronjob/viewer/viewer.module.ts create mode 100644 apps/cronjob-service/src/app/cronjob/viewer/viewer.processor.ts create mode 100644 apps/cronjob-service/src/app/cronjob/viewer/viewer.service.ts diff --git a/apps/cronjob-service/src/app/cronjob/cronjob.module.ts b/apps/cronjob-service/src/app/cronjob/cronjob.module.ts index bed72bbd..6537501e 100644 --- a/apps/cronjob-service/src/app/cronjob/cronjob.module.ts +++ b/apps/cronjob-service/src/app/cronjob/cronjob.module.ts @@ -1,7 +1,8 @@ import { Module } from '@nestjs/common'; import { WarcraftModule } from './warcraft/warcraft.module'; +import { ViewerModule } from './viewer/viewer.module'; @Module({ - imports: [WarcraftModule], + imports: [WarcraftModule, ViewerModule], }) export class CronjobModule {} diff --git a/apps/cronjob-service/src/app/cronjob/viewer/viewer.cron.ts b/apps/cronjob-service/src/app/cronjob/viewer/viewer.cron.ts new file mode 100644 index 00000000..00ecce3c --- /dev/null +++ b/apps/cronjob-service/src/app/cronjob/viewer/viewer.cron.ts @@ -0,0 +1,23 @@ +import { Injectable } from '@nestjs/common'; +import { ViewerService } from './viewer.service'; +import { InjectQueue } from '@nestjs/bullmq'; +import { Queue } from 'bullmq'; +import { Cron, CronExpression } from '@nestjs/schedule'; + +@Injectable() +export class ViewerCron { + constructor( + private readonly service: ViewerService, + @InjectQueue('twitch-viewer') private readonly queue: Queue, + ) {} + + @Cron(CronExpression.EVERY_DAY_AT_MIDNIGHT, { + name: 'Delete Inactive Twitch Viewers', + }) + async checkForInactiveViewers(): Promise { + const viewers = await this.service.getAllViewers(); + for (const viewer of viewers) { + await this.queue.add('twitch-viewer', viewer); + } + } +} diff --git a/apps/cronjob-service/src/app/cronjob/viewer/viewer.module.ts b/apps/cronjob-service/src/app/cronjob/viewer/viewer.module.ts new file mode 100644 index 00000000..e03c3e17 --- /dev/null +++ b/apps/cronjob-service/src/app/cronjob/viewer/viewer.module.ts @@ -0,0 +1,32 @@ +import { Module } from '@nestjs/common'; +import { QueueModule } from '../../queue/queue.module'; +import { ScheduleModule } from '@nestjs/schedule'; +import { ClientsModule } from '@nestjs/microservices'; +import { + clientProvider, + twitch_vhost, + twitch_viewers, +} from '@toxictoast/azkaban-broker-rabbitmq'; +import { brokerDefaultSettings } from '../../broker-defaults'; +import { ViewerService } from './viewer.service'; +import { ViewerProcessor } from './viewer.processor'; +import { ViewerCron } from './viewer.cron'; + +@Module({ + imports: [ + QueueModule, + ScheduleModule.forRoot(), + ClientsModule.register([ + { + name: 'VIEWER_SERVICE', + ...clientProvider({ + queueName: twitch_viewers, + brokerVHost: twitch_vhost, + ...brokerDefaultSettings, + }), + }, + ]), + ], + providers: [ViewerService, ViewerProcessor, ViewerCron], +}) +export class ViewerModule {} diff --git a/apps/cronjob-service/src/app/cronjob/viewer/viewer.processor.ts b/apps/cronjob-service/src/app/cronjob/viewer/viewer.processor.ts new file mode 100644 index 00000000..6e2dcc39 --- /dev/null +++ b/apps/cronjob-service/src/app/cronjob/viewer/viewer.processor.ts @@ -0,0 +1,20 @@ +import { Processor, WorkerHost } from '@nestjs/bullmq'; +import { ViewerService } from './viewer.service'; +import { Job } from 'bullmq'; +import { ViewerDAO } from '@azkaban/twitch-infrastructure'; + +@Processor('twitch-viewer') +export class ViewerProcessor extends WorkerHost { + constructor(private readonly service: ViewerService) { + super(); + } + + async process(job: Job): Promise { + const viewer = job.data; + const olderThanTwoWeeks = + await this.service.isViewerLastSeen2Weeks(viewer); + if (olderThanTwoWeeks) { + await this.service.deleteViewer(viewer.id); + } + } +} diff --git a/apps/cronjob-service/src/app/cronjob/viewer/viewer.service.ts b/apps/cronjob-service/src/app/cronjob/viewer/viewer.service.ts new file mode 100644 index 00000000..86ecb525 --- /dev/null +++ b/apps/cronjob-service/src/app/cronjob/viewer/viewer.service.ts @@ -0,0 +1,43 @@ +import { Inject, Injectable, Logger } from '@nestjs/common'; +import { ClientProxy } from '@nestjs/microservices'; +import { ViewerDAO } from '@azkaban/twitch-infrastructure'; +import { + RmqRecordBuilderHelper, + TwitchViewerTopics, +} from '@toxictoast/azkaban-broker-rabbitmq'; + +@Injectable() +export class ViewerService { + constructor( + @Inject('VIEWER_SERVICE') private readonly viewerClient: ClientProxy, + ) {} + + async getAllViewers(): Promise> { + try { + const payload = RmqRecordBuilderHelper({}); + return await this.viewerClient + .send(TwitchViewerTopics.LIST, payload) + .toPromise(); + } catch (error) { + Logger.error(error); + return []; + } + } + + async isViewerLastSeen2Weeks(viewer: ViewerDAO): Promise { + const twoWeeks = 12096e5; + try { + const lastSeen = new Date(viewer.lastseen_at); + return lastSeen < new Date(Date.now() - twoWeeks); + } catch (error) { + return true; + } + } + + async deleteViewer(id: string): Promise { + const payload = RmqRecordBuilderHelper({ id }); + return await this.viewerClient + .send(TwitchViewerTopics.DELETE, payload) + .toPromise(); + } +} diff --git a/apps/cronjob-service/src/app/cronjob/warcraft/warcraft.processor.ts b/apps/cronjob-service/src/app/cronjob/warcraft/warcraft.processor.ts index 5aa74563..d1aeb8de 100644 --- a/apps/cronjob-service/src/app/cronjob/warcraft/warcraft.processor.ts +++ b/apps/cronjob-service/src/app/cronjob/warcraft/warcraft.processor.ts @@ -33,6 +33,7 @@ export class WarcraftProcessor extends WorkerHost { try { if (data) { await this.service.updateCharacter(id, data); + await this.service.restoreCharacter(id); } else { await this.service.deleteCharacter(id); } diff --git a/apps/cronjob-service/src/app/cronjob/warcraft/warcraft.service.ts b/apps/cronjob-service/src/app/cronjob/warcraft/warcraft.service.ts index ce10acc2..a2efee5c 100644 --- a/apps/cronjob-service/src/app/cronjob/warcraft/warcraft.service.ts +++ b/apps/cronjob-service/src/app/cronjob/warcraft/warcraft.service.ts @@ -1,4 +1,4 @@ -import { Inject } from '@nestjs/common'; +import { Inject, Injectable } from '@nestjs/common'; import { ClientProxy } from '@nestjs/microservices'; import { ApiDAO, CharacterDAO } from '@azkaban/warcraft-infrastructure'; import { @@ -8,6 +8,7 @@ import { } from '@toxictoast/azkaban-broker-rabbitmq'; import { Nullable } from '@toxictoast/azkaban-base-types'; +@Injectable() export class WarcraftService { constructor( @Inject('CHARACTER_SERVICE') @@ -77,4 +78,13 @@ export class WarcraftService { .send(WarcraftCharacterTopics.DELETE, payload) .toPromise(); } + + async restoreCharacter(id: string): Promise { + const payload = RmqRecordBuilderHelper({ + id, + }); + return await this.characterClient + .send(WarcraftCharacterTopics.RESTORE, payload) + .toPromise(); + } }