Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(event-bus): event bus error handling #10085

Merged
merged 1 commit into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changeset/orange-donkeys-hammer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"@medusajs/event-bus-local": patch
"@medusajs/event-bus-redis": patch
"@medusajs/orchestration": patch
"@medusajs/utils": patch
---

Improve event bus error handling
27 changes: 0 additions & 27 deletions packages/core/orchestration/src/transaction/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,30 +58,3 @@ export class TransactionTimeoutError extends Error {
this.name = "TransactionTimeoutError"
}
}

export function serializeError(error) {
const serialized = {
message: error.message,
name: error.name,
stack: error.stack,
}

Object.getOwnPropertyNames(error).forEach((key) => {
// eslint-disable-next-line no-prototype-builtins
if (!serialized.hasOwnProperty(key)) {
serialized[key] = error[key]
}
})

return serialized
}

export function isErrorLike(value) {
return (
!!value &&
typeof value === "object" &&
"name" in value &&
"message" in value &&
"stack" in value
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,16 @@ import {
TransactionStepStatus,
} from "./types"

import { MedusaError, promiseAll, TransactionStepState } from "@medusajs/utils"
import { EventEmitter } from "events"
import {
isErrorLike,
PermanentStepFailureError,
MedusaError,
promiseAll,
serializeError,
TransactionStepState,
} from "@medusajs/utils"
import { EventEmitter } from "events"
import {
PermanentStepFailureError,
SkipStepResponse,
TransactionStepTimeoutError,
TransactionTimeoutError,
Expand Down
4 changes: 3 additions & 1 deletion packages/core/utils/src/common/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ export * from "./dynamic-import"
export * from "./env-editor"
export * from "./errors"
export * from "./file-system"
export * from "./filter-operator-map"
export * from "./generate-entity-id"
export * from "./get-caller-file-path"
export * from "./get-config-file"
Expand All @@ -34,6 +35,7 @@ export * from "./is-boolean"
export * from "./is-date"
export * from "./is-defined"
export * from "./is-email"
export * from "./is-error-like"
export * from "./is-object"
export * from "./is-present"
export * from "./is-string"
Expand Down Expand Up @@ -62,6 +64,7 @@ export * from "./remove-undefined-properties"
export * from "./resolve-exports"
export * from "./rules"
export * from "./selector-constraints-to-string"
export * from "./serialize-error"
export * from "./set-metadata"
export * from "./simple-hash"
export * from "./string-to-select-relation-object"
Expand All @@ -74,4 +77,3 @@ export * from "./trim-zeros"
export * from "./upper-case-first"
export * from "./validate-handle"
export * from "./wrap-handler"
export * from "./filter-operator-map"
9 changes: 9 additions & 0 deletions packages/core/utils/src/common/is-error-like.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
export function isErrorLike(value) {
return (
!!value &&
typeof value === "object" &&
"name" in value &&
"message" in value &&
"stack" in value
)
}
16 changes: 16 additions & 0 deletions packages/core/utils/src/common/serialize-error.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
export function serializeError(error) {
const serialized = {
message: error.message,
name: error.name,
stack: error.stack,
}

Object.getOwnPropertyNames(error).forEach((key) => {
// eslint-disable-next-line no-prototype-builtins
if (!serialized.hasOwnProperty(key)) {
serialized[key] = error[key]
}
})

return serialized
}
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,11 @@ export default class LocalEventBusService extends AbstractEventBusModuleService
this.eventEmitter_.on(event, async (data: Event) => {
try {
await subscriber(data)
} catch (e) {
} catch (err) {
this.logger_?.error(
`An error occurred while processing ${event.toString()}: ${e}`
`An error occurred while processing ${event.toString()}:`
)
this.logger_?.error(err)
}
})
return this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ jest.mock("bullmq")
jest.mock("ioredis")

const loggerMock = {
info: jest.fn().mockReturnValue(console.log),
warn: jest.fn().mockReturnValue(console.log),
error: jest.fn().mockReturnValue(console.log),
info: jest.fn().mockImplementation(console.log),
warn: jest.fn().mockImplementation(console.warn),
error: jest.fn().mockImplementation(console.error),
} as unknown as Logger

const redisMock = {
Expand Down Expand Up @@ -376,7 +376,7 @@ describe("RedisEventBusService", () => {
})
eventBus.subscribe("eventName", () => {
test.push("fail1")
return Promise.reject("fail1")
throw new Error("fail1")
})
eventBus.subscribe("eventName", () => {
test.push("hi2")
Expand All @@ -399,15 +399,21 @@ describe("RedisEventBusService", () => {
"Processing eventName which has 4 subscribers"
)

expect(loggerMock.warn).toHaveBeenCalledTimes(3)
expect(loggerMock.warn).toHaveBeenCalledWith(
"An error occurred while processing eventName: fail1"
expect(loggerMock.warn).toHaveBeenCalledTimes(5)
expect(loggerMock.warn).toHaveBeenNthCalledWith(
1,
"An error occurred while processing eventName:"
)
expect(loggerMock.warn).toHaveBeenCalledWith(
"An error occurred while processing eventName: fail2"
expect(loggerMock.warn).toHaveBeenNthCalledWith(2, new Error("fail1"))

expect(loggerMock.warn).toHaveBeenNthCalledWith(
3,
"An error occurred while processing eventName:"
)
expect(loggerMock.warn).toHaveBeenNthCalledWith(4, "fail2")

expect(loggerMock.warn).toHaveBeenCalledWith(
expect(loggerMock.warn).toHaveBeenNthCalledWith(
5,
"One or more subscribers of eventName failed. Retrying is not configured. Use 'attempts' option when emitting events."
)

Expand Down Expand Up @@ -439,10 +445,11 @@ describe("RedisEventBusService", () => {
} as any)
.catch((error) => void 0)

expect(loggerMock.warn).toHaveBeenCalledTimes(1)
expect(loggerMock.warn).toHaveBeenCalledTimes(2)
expect(loggerMock.warn).toHaveBeenCalledWith(
"An error occurred while processing eventName: fail1"
"An error occurred while processing eventName:"
)
expect(loggerMock.warn).toHaveBeenCalledWith("fail1")

expect(loggerMock.info).toHaveBeenCalledTimes(2)
expect(loggerMock.info).toHaveBeenCalledWith(
Expand Down Expand Up @@ -478,10 +485,12 @@ describe("RedisEventBusService", () => {
} as any)
.catch((err) => void 0)

expect(loggerMock.warn).toHaveBeenCalledTimes(2)
expect(loggerMock.warn).toHaveBeenCalledTimes(3)
expect(loggerMock.warn).toHaveBeenCalledWith(
"An error occurred while processing eventName: fail1"
"An error occurred while processing eventName:"
)
expect(loggerMock.warn).toHaveBeenCalledWith("fail1")

expect(loggerMock.warn).toHaveBeenCalledWith(
"One or more subscribers of eventName failed. Retrying..."
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,18 +276,18 @@ export default class RedisEventBusService extends AbstractEventBusModuleService
metadata: data.metadata,
}

return await subscriber(event)
.then(async (data) => {
try {
return await subscriber(event).then((data) => {
// For every subscriber that completes successfully, add their id to the list of completed subscribers
completedSubscribersInCurrentAttempt.push(id)
return data
})
.catch((err) => {
this.logger_.warn(
`An error occurred while processing ${name}: ${err}`
)
return err
})
} catch (err) {
this.logger_?.warn(`An error occurred while processing ${name}:`)
this.logger_?.warn(err)

return err
}
})
)

Expand Down
Loading