Skip to content

Commit

Permalink
feat(workflows-sdk): add response to permanent failure (#10177)
Browse files Browse the repository at this point in the history
  • Loading branch information
carlos-r-l-rodrigues authored Nov 20, 2024
1 parent 42c08fa commit aeb5b43
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 23 deletions.
6 changes: 6 additions & 0 deletions .changeset/chilled-dingos-learn.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@medusajs/orchestration": patch
"@medusajs/workflows-sdk": patch
---

Add response to permanent failure
42 changes: 26 additions & 16 deletions packages/core/orchestration/src/transaction/errors.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,18 @@
export class PermanentStepFailureError extends Error {
class BaseStepErrror extends Error {
#stepResponse: unknown

constructor(name, message?: string, stepResponse?: unknown) {
super(message)
this.name = name
this.#stepResponse = stepResponse
}

getStepResponse(): unknown {
return this.#stepResponse
}
}

export class PermanentStepFailureError extends BaseStepErrror {
static isPermanentStepFailureError(
error: Error
): error is PermanentStepFailureError {
Expand All @@ -8,26 +22,24 @@ export class PermanentStepFailureError extends Error {
)
}

constructor(message?: string) {
super(message)
this.name = "PermanentStepFailure"
constructor(message?: string, stepResponse?: unknown) {
super("PermanentStepFailure", message, stepResponse)
}
}

export class SkipStepResponse extends Error {
export class SkipStepResponse extends BaseStepErrror {
static isSkipStepResponse(error: Error): error is SkipStepResponse {
return (
error instanceof SkipStepResponse || error?.name === "SkipStepResponse"
)
}

constructor(message?: string) {
super(message)
this.name = "SkipStepResponse"
constructor(message?: string, stepResponse?: unknown) {
super("SkipStepResponse", message, stepResponse)
}
}

export class TransactionStepTimeoutError extends Error {
export class TransactionStepTimeoutError extends BaseStepErrror {
static isTransactionStepTimeoutError(
error: Error
): error is TransactionStepTimeoutError {
Expand All @@ -37,13 +49,12 @@ export class TransactionStepTimeoutError extends Error {
)
}

constructor(message?: string) {
super(message)
this.name = "TransactionStepTimeoutError"
constructor(message?: string, stepResponse?: unknown) {
super("TransactionStepTimeoutError", message, stepResponse)
}
}

export class TransactionTimeoutError extends Error {
export class TransactionTimeoutError extends BaseStepErrror {
static isTransactionTimeoutError(
error: Error
): error is TransactionTimeoutError {
Expand All @@ -53,8 +64,7 @@ export class TransactionTimeoutError extends Error {
)
}

constructor(message?: string) {
super(message)
this.name = "TransactionTimeoutError"
constructor(message?: string, stepResponse?: unknown) {
super("TransactionTimeoutError", message, stepResponse)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
} from "./types"

import {
isDefined,
isErrorLike,
MedusaError,
promiseAll,
Expand Down Expand Up @@ -764,8 +765,24 @@ export class TransactionOrchestrator extends EventEmitter {

const setStepFailure = async (
error: Error | any,
{ endRetry }: { endRetry?: boolean } = {}
{
endRetry,
response,
}: {
endRetry?: boolean
response?: unknown
} = {}
) => {
if (isDefined(response) && step.saveResponse) {
transaction.addResponse(
step.definition.action!,
step.isCompensating()
? TransactionHandlerType.COMPENSATE
: TransactionHandlerType.INVOKE,
response
)
}

await TransactionOrchestrator.setStepFailure(
transaction,
step,
Expand Down Expand Up @@ -841,6 +858,8 @@ export class TransactionOrchestrator extends EventEmitter {
)
})
.catch(async (error) => {
const response = error?.getStepResponse?.()

if (this.hasExpired({ transaction, step }, Date.now())) {
await this.checkStepTimeout(transaction, step)
await this.checkTransactionTimeout(
Expand All @@ -852,11 +871,14 @@ export class TransactionOrchestrator extends EventEmitter {
if (
PermanentStepFailureError.isPermanentStepFailureError(error)
) {
await setStepFailure(error, { endRetry: true })
await setStepFailure(error, {
endRetry: true,
response,
})
return
}

await setStepFailure(error)
await setStepFailure(error, { response })
})
)
} else {
Expand Down Expand Up @@ -917,14 +939,19 @@ export class TransactionOrchestrator extends EventEmitter {
)
})
.catch(async (error) => {
const response = error?.getStepResponse?.()

if (
PermanentStepFailureError.isPermanentStepFailureError(error)
) {
await setStepFailure(error, { endRetry: true })
await setStepFailure(error, {
endRetry: true,
response,
})
return
}

await setStepFailure(error)
await setStepFailure(error, { response })
})
})
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2522,4 +2522,78 @@ describe("Workflow composer", function () {
"event-group-id"
)
})

it("should fail step and return response to compensate partial data", async () => {
const maxRetries = 3

const mockStep1Fn = jest.fn().mockImplementation(async (input, context) => {
const ok: number[] = []
const errors: number[] = []
const toInsert = [1, 2, 3, 4, 5, 6, 7, 8]

await promiseAll(
toInsert.map(async (i) => {
// fail on odd numbers
if (i % 2 === 0) {
ok.push(i)
return i
}

errors.push(i)
throw new Error("failed")
})
).catch((e) => {})

if (errors.length > 0) {
return StepResponse.permanentFailure(
"Error inserting " + errors.join(", "),
ok
)
}

return new StepResponse(ok)
})

const mockStep1CompensateFn = jest
.fn()
.mockImplementation((input, context) => {
return input
})

const step1 = createStep(
{ name: "step1", maxRetries },
mockStep1Fn,
mockStep1CompensateFn
)

const step2 = createStep("step2", () => {
throw new Error("failed")
})

const workflow = createWorkflow("workflow1", function (input) {
step1(input)
step2()
})

const workflowInput = { test: "payload1" }
const { errors } = await workflow().run({
input: workflowInput,
throwOnError: false,
})

expect(mockStep1Fn).toHaveBeenCalledTimes(1)
expect(mockStep1Fn.mock.calls[0]).toHaveLength(2)
expect(mockStep1Fn.mock.calls[0][0]).toEqual(workflowInput)

expect(mockStep1CompensateFn.mock.calls[0][0]).toEqual([2, 4, 6, 8])

expect(errors).toHaveLength(1)
expect(errors[0]).toEqual({
action: "step1",
handlerType: "invoke",
error: expect.objectContaining({
message: "Error inserting 1, 3, 5, 7",
}),
})
})
})
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,14 @@ export class StepResponse<TOutput, TCompensateInput = TOutput> {
* console.log(result)
* })
*/
static permanentFailure(message = "Permanent failure"): never {
throw new PermanentStepFailureError(message)
static permanentFailure(
message = "Permanent failure",
compensateInput?: unknown
): never {
const response = isDefined(compensateInput)
? new StepResponse(compensateInput)
: undefined
throw new PermanentStepFailureError(message, response)
}

static skip(): SkipStepResponse {
Expand Down

0 comments on commit aeb5b43

Please sign in to comment.