Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(notice): record recently sent notice ids of given tag in redis for server #152

Merged
merged 5 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 20 additions & 14 deletions handlers/notify.ts
Original file line number Diff line number Diff line change
@@ -1,45 +1,51 @@
import Redis from 'ioredis'
import { SQSEvent } from 'aws-lambda'
import { getKnexClient } from '../lib/utils/db.js'
import { genMD5 } from '../lib/utils/hash.js'
import { NotificationService } from '../lib/notification/index.js'

const knexConnectionUrl = process.env.MATTERS_PG_CONNECTION_STRING || ''
const knexROConnectionUrl = process.env.MATTERS_PG_RO_CONNECTION_STRING || ''
const redisHost = process.env.MATTERS_REDIS_HOST || ''
const redisPort = parseInt(process.env.MATTERS_REDIS_PORT || '6379', 10)
const deleteNoticeCacheTTL = parseInt(
process.env.MATTERS_DELETE_NOTICE_CACHE_TTL || '180',
10
) // 3 minutes by default

const SKIP_NOTICE_FLAG_PREFIX = 'skip-notice'
const DELETE_NOTICE_KEY_PREFIX = 'delete-notice'

const knex = getKnexClient(knexConnectionUrl)
const knexRO = getKnexClient(knexROConnectionUrl)
const redis = new Redis(redisPort, redisHost)

const notificationService = new NotificationService({ knex, knexRO })

const DEDUPLICATION_CACHE_EXPIRE = 60 * 10 // 10 minutes

export const handler = async (event: SQSEvent) => {
const results = await Promise.allSettled(
event.Records.map(async ({ body }: { body: string }) => {
// skip canceled
console.log(body)
const params = JSON.parse(body)
if ('tag' in params) {
if (await redis.exists(params.tag)) {
console.info(`Tag ${params.tag} exists, skipped`)
const skipFlag = `${SKIP_NOTICE_FLAG_PREFIX}:${params.tag}`
if (await redis.exists(skipFlag)) {
console.info(`Tag ${skipFlag} exists, skipped`)
return
}
}
// deduplication: skip if notice exists
const noticeHashKey = 'notice:' + genMD5(body)
if (await redis.exists(noticeHashKey)) {
console.info(`Notice duplicated, skipped`)
return
}

await notificationService.trigger(params)
const notices = await notificationService.trigger(params)

// deduplication: set notice hash
await redis.set(noticeHashKey, 1, 'EX', DEDUPLICATION_CACHE_EXPIRE)
if (notices.length > 0 && 'tag' in params) {
const deleteKey = `${DELETE_NOTICE_KEY_PREFIX}:${params.tag}`
Promise.all(
notices.map(async (notice) => {
redis.sadd(deleteKey, notice.id)
redis.expire(deleteKey, deleteNoticeCacheTTL)
})
)
}
})
)
// print failed reason
Expand Down
13 changes: 9 additions & 4 deletions lib/__test__/notificationService.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ describe('find users', () => {
describe('trigger notifications', () => {
test('trigger `write_challenge_applied` notice', async () => {
// no error
await notificationService.trigger({
const [notice] = await notificationService.trigger({
event: OFFICIAL_NOTICE_EXTEND_TYPE.write_challenge_applied,
recipientId: '1',
entities: [
Expand All @@ -206,17 +206,19 @@ describe('trigger notifications', () => {
],
data: { link: 'https://example.com' },
})
expect(notice.id).toBeDefined()
})
test('trigger `badge_grand_slam_awarded` notice', async () => {
// no errors
await notificationService.trigger({
const [notice] = await notificationService.trigger({
event: OFFICIAL_NOTICE_EXTEND_TYPE.badge_grand_slam_awarded,
recipientId: '1',
})
expect(notice.id).toBeDefined()
})
test('trigger `collection_liked` notice', async () => {
// no errors
await notificationService.trigger({
const [notice] = await notificationService.trigger({
event: NOTICE_TYPE.collection_liked,
actorId: '1',
recipientId: '1',
Expand All @@ -228,6 +230,8 @@ describe('trigger notifications', () => {
},
],
})
// actorId is same as recipientId, notice is not created
expect(notice).toBeUndefined()
})
test('trigger `write_challenge_announcement` notice', async () => {
const [{ id: campaignId }] = await knex('campaign')
Expand All @@ -246,7 +250,7 @@ describe('trigger notifications', () => {
state: 'succeeded',
})
// no errors
await notificationService.trigger({
const [notice] = await notificationService.trigger({
event: OFFICIAL_NOTICE_EXTEND_TYPE.write_challenge_announcement,
data: {
link: 'https://example.com',
Expand All @@ -258,5 +262,6 @@ describe('trigger notifications', () => {
},
},
})
expect(notice.id).toBeDefined()
})
})
133 changes: 74 additions & 59 deletions lib/notification/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,10 @@ export class NotificationService {
const noticeParams = await this.getNoticeParams(params)

if (!noticeParams) {
return
return []
}

const notices = []
for (const [index, recipientId] of noticeParams.recipientIds.entries()) {
// skip if actor === recipient
if ('actorId' in params && params.actorId === recipientId) {
Expand Down Expand Up @@ -91,7 +92,7 @@ export class NotificationService {
}

// Put Notice to DB
const { created, bundled } = await this.process({
const { created, bundled, notice } = await this.process({
...noticeParams,
recipientId,
message: noticeParams.messages ? noticeParams.messages[index] : null,
Expand All @@ -101,7 +102,10 @@ export class NotificationService {
console.info(`Notice ${params.event} to ${recipientId} skipped`)
continue
}

notices.push(notice)
}
return notices
}

public async findActors(
Expand Down Expand Up @@ -168,10 +172,14 @@ export class NotificationService {
*/
private process = async (
params: PutNoticeParams
): Promise<{ created: boolean; bundled: boolean }> => {
): Promise<{
created: boolean
bundled: boolean
notice: { id: string }
}> => {
if (params.bundle?.disabled === true) {
await this.create(params)
return { created: true, bundled: false }
const notice = await this.create(params)
return { created: true, bundled: false, notice }
} else {
const bundleables = await this.findBundleables(params)

Expand All @@ -189,12 +197,16 @@ export class NotificationService {
})
}

return { created: false, bundled: true }
return {
created: false,
bundled: true,
notice: { id: bundleables[0].id },
}
}

// create new notice
await this.create(params)
return { created: true, bundled: false }
const notice = await this.create(params)
return { created: true, bundled: false, notice }
}
}

Expand All @@ -208,67 +220,70 @@ export class NotificationService {
entities,
message,
data,
}: PutNoticeParams): Promise<void> {
await this.knex.transaction(async (trx) => {
// create notice detail
const [{ id: noticeDetailId }] = await trx
.insert({
noticeType: type,
message,
data,
})
.into('notice_detail')
.returning('*')
}: PutNoticeParams): Promise<{ id: string }> {
const trx = await this.knex.transaction()
// create notice detail
const [{ id: noticeDetailId }] = await trx
.insert({
noticeType: type,
message,
data,
})
.into('notice_detail')
.returning('*')

// create notice
const [{ id: noticeId }] = await trx
// create notice
const noticeId = (
await trx
.insert({
uuid: v4(),
noticeDetailId,
recipientId,
})
.into('notice')
.returning('*')
.returning('id')
)[0].id

// create notice actorId
if (actorId) {
await trx
.insert({
noticeId,
actorId,
})
.into('notice_actor')
.returning('*')
}
// create notice actorId
if (actorId) {
await trx
.insert({
noticeId,
actorId,
})
.into('notice_actor')
.returning('*')
}

// create notice entities
if (entities) {
await Promise.all(
entities.map(
async ({
type: entityType,
entityTable,
entity,
}: NotificationEntity) => {
const { id: entityTypeId } = await trx
.select('id')
.from('entity_type')
.where({ table: entityTable })
.first()
await trx
.insert({
type: entityType,
entityTypeId,
entityId: entity.id,
noticeId,
})
.into('notice_entity')
.returning('*')
}
)
// create notice entities
if (entities) {
await Promise.all(
entities.map(
async ({
type: entityType,
entityTable,
entity,
}: NotificationEntity) => {
const { id: entityTypeId } = await trx
.select('id')
.from('entity_type')
.where({ table: entityTable })
.first()
await trx
.insert({
type: entityType,
entityTypeId,
entityId: entity.id,
noticeId,
})
.into('notice_entity')
.returning('*')
}
)
}
})
)
}
await trx.commit()
return { id: noticeId }
}

/**
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "lambda-handlers-image",
"version": "0.10.4",
"version": "0.10.5",
"private": true,
"type": "module",
"scripts": {
Expand Down
Loading