Skip to content

Commit

Permalink
feat: Add Twitch Viewer Cronjob
Browse files Browse the repository at this point in the history
  • Loading branch information
ToxicToast committed Nov 6, 2024
1 parent b83f444 commit 27fd1f5
Show file tree
Hide file tree
Showing 7 changed files with 132 additions and 2 deletions.
3 changes: 2 additions & 1 deletion apps/cronjob-service/src/app/cronjob/cronjob.module.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { Module } from '@nestjs/common';
import { WarcraftModule } from './warcraft/warcraft.module';
import { ViewerModule } from './viewer/viewer.module';

@Module({
imports: [WarcraftModule],
imports: [WarcraftModule, ViewerModule],
})
export class CronjobModule {}
23 changes: 23 additions & 0 deletions apps/cronjob-service/src/app/cronjob/viewer/viewer.cron.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { Injectable } from '@nestjs/common';
import { ViewerService } from './viewer.service';
import { InjectQueue } from '@nestjs/bullmq';
import { Queue } from 'bullmq';
import { Cron, CronExpression } from '@nestjs/schedule';

@Injectable()
export class ViewerCron {
constructor(
private readonly service: ViewerService,
@InjectQueue('twitch-viewer') private readonly queue: Queue,
) {}

@Cron(CronExpression.EVERY_DAY_AT_MIDNIGHT, {
name: 'Delete Inactive Twitch Viewers',
})
async checkForInactiveViewers(): Promise<void> {
const viewers = await this.service.getAllViewers();
for (const viewer of viewers) {
await this.queue.add('twitch-viewer', viewer);
}
}
}
32 changes: 32 additions & 0 deletions apps/cronjob-service/src/app/cronjob/viewer/viewer.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import { Module } from '@nestjs/common';
import { QueueModule } from '../../queue/queue.module';
import { ScheduleModule } from '@nestjs/schedule';
import { ClientsModule } from '@nestjs/microservices';
import {
clientProvider,
twitch_vhost,
twitch_viewers,
} from '@toxictoast/azkaban-broker-rabbitmq';
import { brokerDefaultSettings } from '../../broker-defaults';
import { ViewerService } from './viewer.service';
import { ViewerProcessor } from './viewer.processor';
import { ViewerCron } from './viewer.cron';

@Module({
imports: [
QueueModule,
ScheduleModule.forRoot(),
ClientsModule.register([
{
name: 'VIEWER_SERVICE',
...clientProvider({
queueName: twitch_viewers,
brokerVHost: twitch_vhost,
...brokerDefaultSettings,
}),
},
]),
],
providers: [ViewerService, ViewerProcessor, ViewerCron],
})
export class ViewerModule {}
20 changes: 20 additions & 0 deletions apps/cronjob-service/src/app/cronjob/viewer/viewer.processor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { Processor, WorkerHost } from '@nestjs/bullmq';
import { ViewerService } from './viewer.service';
import { Job } from 'bullmq';
import { ViewerDAO } from '@azkaban/twitch-infrastructure';

@Processor('twitch-viewer')
export class ViewerProcessor extends WorkerHost {
constructor(private readonly service: ViewerService) {
super();
}

async process(job: Job<ViewerDAO, void, string>): Promise<void> {
const viewer = job.data;
const olderThanTwoWeeks =
await this.service.isViewerLastSeen2Weeks(viewer);
if (olderThanTwoWeeks) {
await this.service.deleteViewer(viewer.id);
}
}
}
43 changes: 43 additions & 0 deletions apps/cronjob-service/src/app/cronjob/viewer/viewer.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import { Inject, Injectable, Logger } from '@nestjs/common';
import { ClientProxy } from '@nestjs/microservices';
import { ViewerDAO } from '@azkaban/twitch-infrastructure';
import {
RmqRecordBuilderHelper,
TwitchViewerTopics,
} from '@toxictoast/azkaban-broker-rabbitmq';

@Injectable()
export class ViewerService {
constructor(
@Inject('VIEWER_SERVICE') private readonly viewerClient: ClientProxy,
) {}

async getAllViewers(): Promise<Array<ViewerDAO>> {
try {
const payload = RmqRecordBuilderHelper({});
return await this.viewerClient
.send(TwitchViewerTopics.LIST, payload)
.toPromise();
} catch (error) {
Logger.error(error);
return [];
}
}

async isViewerLastSeen2Weeks(viewer: ViewerDAO): Promise<boolean> {
const twoWeeks = 12096e5;
try {
const lastSeen = new Date(viewer.lastseen_at);
return lastSeen < new Date(Date.now() - twoWeeks);
} catch (error) {
return true;
}
}

async deleteViewer(id: string): Promise<ViewerDAO> {
const payload = RmqRecordBuilderHelper({ id });
return await this.viewerClient
.send(TwitchViewerTopics.DELETE, payload)
.toPromise();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ export class WarcraftProcessor extends WorkerHost {
try {
if (data) {
await this.service.updateCharacter(id, data);
await this.service.restoreCharacter(id);
} else {
await this.service.deleteCharacter(id);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Inject } from '@nestjs/common';
import { Inject, Injectable } from '@nestjs/common';
import { ClientProxy } from '@nestjs/microservices';
import { ApiDAO, CharacterDAO } from '@azkaban/warcraft-infrastructure';
import {
Expand All @@ -8,6 +8,7 @@ import {
} from '@toxictoast/azkaban-broker-rabbitmq';
import { Nullable } from '@toxictoast/azkaban-base-types';

@Injectable()
export class WarcraftService {
constructor(
@Inject('CHARACTER_SERVICE')
Expand Down Expand Up @@ -77,4 +78,13 @@ export class WarcraftService {
.send(WarcraftCharacterTopics.DELETE, payload)
.toPromise();
}

async restoreCharacter(id: string): Promise<CharacterDAO> {
const payload = RmqRecordBuilderHelper({
id,
});
return await this.characterClient
.send(WarcraftCharacterTopics.RESTORE, payload)
.toPromise();
}
}

0 comments on commit 27fd1f5

Please sign in to comment.