From 58e774610071dd40a82e178f44b9742173e2b08c Mon Sep 17 00:00:00 2001 From: jlenon7 Date: Sun, 12 May 2024 19:09:00 +0100 Subject: [PATCH] feat(queue): improve jobs --- package.json | 5 +---- src/helpers/queue.ts | 11 +++++++--- src/jobs/base.job.ts | 19 ++++++++++++++++ src/jobs/mail.job.ts | 31 +++++++++++++++++++++++++++ src/jobs/userconfirm.job.ts | 20 ----------------- src/jobs/useremail.job.ts | 22 ------------------- src/jobs/useremailpassword.job.ts | 23 -------------------- src/jobs/userpassword.job.ts | 22 ------------------- src/providers/queueworker.provider.ts | 3 +-- src/services/auth.service.ts | 8 +++++-- src/services/user.service.ts | 25 +++++++++++++++------ tests/e2e/auth.controller.test.ts | 2 +- tests/e2e/user.controller.test.ts | 6 +++--- 13 files changed, 89 insertions(+), 108 deletions(-) create mode 100644 src/jobs/base.job.ts create mode 100644 src/jobs/mail.job.ts delete mode 100644 src/jobs/userconfirm.job.ts delete mode 100644 src/jobs/useremail.job.ts delete mode 100644 src/jobs/useremailpassword.job.ts delete mode 100644 src/jobs/userpassword.job.ts diff --git a/package.json b/package.json index d45b490..b85aed5 100644 --- a/package.json +++ b/package.json @@ -289,10 +289,7 @@ "#src/validators/update.validator" ], "jobs": [ - "#src/jobs/userconfirm.job", - "#src/jobs/useremail.job", - "#src/jobs/useremailpassword.job", - "#src/jobs/userpassword.job" + "#src/jobs/mail.job" ] } } diff --git a/src/helpers/queue.ts b/src/helpers/queue.ts index 97d5134..3b5a37d 100644 --- a/src/helpers/queue.ts +++ b/src/helpers/queue.ts @@ -3,12 +3,17 @@ import { Service } from '@athenna/ioc' import { Database, type DatabaseImpl } from '@athenna/database' class VanillaQueue { + public connection: string private queueName = 'default' private queues: Record = { default: [], deadletter: [] } + public constructor(connection: string) { + this.connection = connection + } + public async truncate() { Object.keys(this.queues).forEach(key => (this.queues[key] = [])) } @@ -179,11 +184,11 @@ class DatabaseQueue { @Service({ alias: 'App/Helpers/Queue', type: 'singleton' }) export class QueueImpl { - public driver: any = new VanillaQueue() + public driver: any = new VanillaQueue('vanilla') public connection(name: string) { - if (name === 'vanilla') { - this.driver = new VanillaQueue() + if (name === 'vanilla' || name === 'default') { + this.driver = new VanillaQueue('vanilla') } if (name === 'database') { diff --git a/src/jobs/base.job.ts b/src/jobs/base.job.ts new file mode 100644 index 0000000..6809939 --- /dev/null +++ b/src/jobs/base.job.ts @@ -0,0 +1,19 @@ +import { Queue } from '#src/providers/facades/queue' + +export class BaseJob { + public static connection() { + return 'default' + } + + public static queue() { + const connection = this.connection() + + return Config.get(`queue.connections.${connection}.queue`) + } + + public queue() { + const Job = this.constructor as typeof BaseJob + + return Queue.connection(Job.connection()).queue(Job.queue()) + } +} diff --git a/src/jobs/mail.job.ts b/src/jobs/mail.job.ts new file mode 100644 index 0000000..94666c6 --- /dev/null +++ b/src/jobs/mail.job.ts @@ -0,0 +1,31 @@ +import { Mail } from '@athenna/mail' +import type { User } from '#src/models/user' +import { BaseJob } from '#src/jobs/base.job' + +type Item = { + view: string + subject: string + user: User + email: string + password: string + token: string +} + +export class MailJob extends BaseJob { + public static queue() { + return 'mail' + } + + public async handle(item: Item) { + await Mail.from('noreply@athenna.io') + .to(item.user.email) + .subject(item.subject) + .view(item.view, { + user: item.user, + email: item.email, + password: item.password, + token: item.token + }) + .send() + } +} diff --git a/src/jobs/userconfirm.job.ts b/src/jobs/userconfirm.job.ts deleted file mode 100644 index 6205d1b..0000000 --- a/src/jobs/userconfirm.job.ts +++ /dev/null @@ -1,20 +0,0 @@ -import { Mail } from '@athenna/mail' -import type { User } from '#src/models/user' - -type Item = { - user: User -} - -export class UserConfirmJob { - public static queue() { - return 'user:confirm' - } - - public async handle({ user }: Item) { - await Mail.from('noreply@athenna.io') - .to(user.email) - .subject('Athenna Account Confirmation') - .view('mail/confirm', { user }) - .send() - } -} diff --git a/src/jobs/useremail.job.ts b/src/jobs/useremail.job.ts deleted file mode 100644 index f2888e8..0000000 --- a/src/jobs/useremail.job.ts +++ /dev/null @@ -1,22 +0,0 @@ -import { Mail } from '@athenna/mail' -import type { User } from '#src/models/user' - -type Item = { - user: User - email: string - token: string -} - -export class UserEmailJob { - public static queue() { - return 'user:email' - } - - public async handle({ user, email, token }: Item) { - await Mail.from('noreply@athenna.io') - .to(user.email) - .subject('Athenna Email Change') - .view('mail/change-email', { user, email, token }) - .send() - } -} diff --git a/src/jobs/useremailpassword.job.ts b/src/jobs/useremailpassword.job.ts deleted file mode 100644 index 4738395..0000000 --- a/src/jobs/useremailpassword.job.ts +++ /dev/null @@ -1,23 +0,0 @@ -import { Mail } from '@athenna/mail' -import type { User } from '#src/models/user' - -type Item = { - user: User - email: string - password: string - token: string -} - -export class UserEmailPasswordJob { - public static queue() { - return 'user:email:password' - } - - public async handle({ user, email, password, token }: Item) { - await Mail.from('noreply@athenna.io') - .to(user.email) - .subject('Athenna Email & Password Change') - .view('mail/change-email-password', { user, email, password, token }) - .send() - } -} diff --git a/src/jobs/userpassword.job.ts b/src/jobs/userpassword.job.ts deleted file mode 100644 index b6d8d24..0000000 --- a/src/jobs/userpassword.job.ts +++ /dev/null @@ -1,22 +0,0 @@ -import { Mail } from '@athenna/mail' -import type { User } from '#src/models/user' - -type Item = { - user: User - password: string - token: string -} - -export class UserPasswordJob { - public static queue() { - return 'user:password' - } - - public async handle({ user, password, token }: Item) { - await Mail.from('noreply@athenna.io') - .to(user.email) - .subject('Athenna Password Change') - .view('mail/change-password', { user, password, token }) - .send() - } -} diff --git a/src/providers/queueworker.provider.ts b/src/providers/queueworker.provider.ts index 3e049ab..59edba2 100644 --- a/src/providers/queueworker.provider.ts +++ b/src/providers/queueworker.provider.ts @@ -1,7 +1,6 @@ import { Log } from '@athenna/logger' import { ServiceProvider } from '@athenna/ioc' import { Exec, Module } from '@athenna/common' -import { Queue } from '#src/providers/facades/queue' export default class QueueWorkerProvider extends ServiceProvider { public intervals = [] @@ -17,7 +16,7 @@ export default class QueueWorkerProvider extends ServiceProvider { const job = this.container.transient(Job, alias).use(alias) const interval = setInterval(async () => { - const queue = Queue.queue(queueName) + const queue = job.queue() if (queue.isEmpty()) { return diff --git a/src/services/auth.service.ts b/src/services/auth.service.ts index d801f4b..02e6e16 100644 --- a/src/services/auth.service.ts +++ b/src/services/auth.service.ts @@ -5,9 +5,9 @@ import { Uuid } from '@athenna/common' import { Service } from '@athenna/ioc' import { Config } from '@athenna/config' import { User } from '#src/models/user' -import { Queue } from '#src/providers/facades/queue' import { UnauthorizedException } from '@athenna/http' import type { UserService } from '#src/services/user.service' +import { Queue } from '#src/providers/facades/queue' @Service() export class AuthService { @@ -47,7 +47,11 @@ export class AuthService { const user = await this.userService.create(data) - await Queue.queue('user:confirm').add(user) + await Queue.connection('vanilla').queue('mail').add({ + user, + view: 'mail/confirm', + subject: 'Athenna Account Confirmation' + }) return user } diff --git a/src/services/user.service.ts b/src/services/user.service.ts index 66d257d..88760f7 100644 --- a/src/services/user.service.ts +++ b/src/services/user.service.ts @@ -5,8 +5,8 @@ import { Role } from '#src/models/role' import { RoleUser } from '#src/models/roleuser' import { RoleEnum } from '#src/enums/role.enum' import { NotFoundException } from '@athenna/http' -import { Queue } from '#src/providers/facades/queue' import { Json, type PaginationOptions } from '@athenna/common' +import { Queue } from '#src/providers/facades/queue' @Service() export class UserService { @@ -62,25 +62,38 @@ export class UserService { switch (`${isEmailEqual}:${isPasswordEqual}`) { case 'false:true': - await Queue.queue('user:email').add({ user, token, email: data.email }) + await Queue.connection('vanilla').queue('mail').add({ + user, + token, + email: data.email, + view: 'mail/change-email', + subject: 'Athenna Email Change' + }) + break case 'true:false': + // TODO create a password_resets table to save the password data.password = await bcrypt.hash(data.password, 10) - await Queue.queue('user:password').add({ + await Queue.connection('vanilla').queue('mail').add({ user, token, - password: data.password + password: data.password, + view: 'mail/change-password', + subject: 'Athenna Email Change' }) break case 'false:false': + // TODO create a password_resets table to save the password data.password = await bcrypt.hash(data.password, 10) - await Queue.queue('user:email:password').add({ + await Queue.connection('vanilla').queue('mail').add({ user, token, email: data.email, - password: data.password + password: data.password, + view: 'mail/change-email-password', + subject: 'Athenna Email & Password Change' }) } diff --git a/tests/e2e/auth.controller.test.ts b/tests/e2e/auth.controller.test.ts index 9f039fe..b33ebac 100644 --- a/tests/e2e/auth.controller.test.ts +++ b/tests/e2e/auth.controller.test.ts @@ -127,7 +127,7 @@ export default class AuthControllerTest extends BaseE2ETest { } }) - const queue = Queue.queue('user:confirm') + const queue = Queue.queue('mail') assert.deepEqual(await queue.length(), 1) assert.isTrue(await User.exists({ email: 'test@athenna.io' })) diff --git a/tests/e2e/user.controller.test.ts b/tests/e2e/user.controller.test.ts index fbfabdc..32c7580 100644 --- a/tests/e2e/user.controller.test.ts +++ b/tests/e2e/user.controller.test.ts @@ -155,7 +155,7 @@ export default class UserControllerTest extends BaseE2ETest { await user.refresh() - const queue = Queue.queue('user:email') + const queue = Queue.queue('mail') assert.deepEqual(await queue.length(), 1) assert.deepEqual(user.name, 'Customer Updated') @@ -177,7 +177,7 @@ export default class UserControllerTest extends BaseE2ETest { await user.refresh() - const queue = Queue.queue('user:password') + const queue = Queue.queue('mail') assert.deepEqual(await queue.length(), 1) assert.deepEqual(user.name, 'Customer Updated') @@ -204,7 +204,7 @@ export default class UserControllerTest extends BaseE2ETest { await user.refresh() - const queue = Queue.queue('user:email:password') + const queue = Queue.queue('mail') assert.deepEqual(await queue.length(), 1) assert.deepEqual(user.name, 'Customer Updated')