diff --git a/.changeset/chilled-dingos-learn.md b/.changeset/chilled-dingos-learn.md new file mode 100644 index 0000000000000..6a226da9f6096 --- /dev/null +++ b/.changeset/chilled-dingos-learn.md @@ -0,0 +1,6 @@ +--- +"@medusajs/orchestration": patch +"@medusajs/workflows-sdk": patch +--- + +Add response to permanent failure diff --git a/packages/core/orchestration/src/transaction/errors.ts b/packages/core/orchestration/src/transaction/errors.ts index e41f160150260..9fe72059bf580 100644 --- a/packages/core/orchestration/src/transaction/errors.ts +++ b/packages/core/orchestration/src/transaction/errors.ts @@ -1,4 +1,18 @@ -export class PermanentStepFailureError extends Error { +class BaseStepErrror extends Error { + #stepResponse: unknown + + constructor(name, message?: string, stepResponse?: unknown) { + super(message) + this.name = name + this.#stepResponse = stepResponse + } + + getStepResponse(): unknown { + return this.#stepResponse + } +} + +export class PermanentStepFailureError extends BaseStepErrror { static isPermanentStepFailureError( error: Error ): error is PermanentStepFailureError { @@ -8,26 +22,24 @@ export class PermanentStepFailureError extends Error { ) } - constructor(message?: string) { - super(message) - this.name = "PermanentStepFailure" + constructor(message?: string, stepResponse?: unknown) { + super("PermanentStepFailure", message, stepResponse) } } -export class SkipStepResponse extends Error { +export class SkipStepResponse extends BaseStepErrror { static isSkipStepResponse(error: Error): error is SkipStepResponse { return ( error instanceof SkipStepResponse || error?.name === "SkipStepResponse" ) } - constructor(message?: string) { - super(message) - this.name = "SkipStepResponse" + constructor(message?: string, stepResponse?: unknown) { + super("SkipStepResponse", message, stepResponse) } } -export class TransactionStepTimeoutError extends Error { +export class TransactionStepTimeoutError extends BaseStepErrror { static isTransactionStepTimeoutError( error: Error ): error is TransactionStepTimeoutError { @@ -37,13 +49,12 @@ export class TransactionStepTimeoutError extends Error { ) } - constructor(message?: string) { - super(message) - this.name = "TransactionStepTimeoutError" + constructor(message?: string, stepResponse?: unknown) { + super("TransactionStepTimeoutError", message, stepResponse) } } -export class TransactionTimeoutError extends Error { +export class TransactionTimeoutError extends BaseStepErrror { static isTransactionTimeoutError( error: Error ): error is TransactionTimeoutError { @@ -53,8 +64,7 @@ export class TransactionTimeoutError extends Error { ) } - constructor(message?: string) { - super(message) - this.name = "TransactionTimeoutError" + constructor(message?: string, stepResponse?: unknown) { + super("TransactionTimeoutError", message, stepResponse) } } diff --git a/packages/core/orchestration/src/transaction/transaction-orchestrator.ts b/packages/core/orchestration/src/transaction/transaction-orchestrator.ts index 912a8e16f2202..7bb4ebd241cfb 100644 --- a/packages/core/orchestration/src/transaction/transaction-orchestrator.ts +++ b/packages/core/orchestration/src/transaction/transaction-orchestrator.ts @@ -18,6 +18,7 @@ import { } from "./types" import { + isDefined, isErrorLike, MedusaError, promiseAll, @@ -764,8 +765,24 @@ export class TransactionOrchestrator extends EventEmitter { const setStepFailure = async ( error: Error | any, - { endRetry }: { endRetry?: boolean } = {} + { + endRetry, + response, + }: { + endRetry?: boolean + response?: unknown + } = {} ) => { + if (isDefined(response) && step.saveResponse) { + transaction.addResponse( + step.definition.action!, + step.isCompensating() + ? TransactionHandlerType.COMPENSATE + : TransactionHandlerType.INVOKE, + response + ) + } + await TransactionOrchestrator.setStepFailure( transaction, step, @@ -841,6 +858,8 @@ export class TransactionOrchestrator extends EventEmitter { ) }) .catch(async (error) => { + const response = error?.getStepResponse?.() + if (this.hasExpired({ transaction, step }, Date.now())) { await this.checkStepTimeout(transaction, step) await this.checkTransactionTimeout( @@ -852,11 +871,14 @@ export class TransactionOrchestrator extends EventEmitter { if ( PermanentStepFailureError.isPermanentStepFailureError(error) ) { - await setStepFailure(error, { endRetry: true }) + await setStepFailure(error, { + endRetry: true, + response, + }) return } - await setStepFailure(error) + await setStepFailure(error, { response }) }) ) } else { @@ -917,14 +939,19 @@ export class TransactionOrchestrator extends EventEmitter { ) }) .catch(async (error) => { + const response = error?.getStepResponse?.() + if ( PermanentStepFailureError.isPermanentStepFailureError(error) ) { - await setStepFailure(error, { endRetry: true }) + await setStepFailure(error, { + endRetry: true, + response, + }) return } - await setStepFailure(error) + await setStepFailure(error, { response }) }) }) ) diff --git a/packages/core/workflows-sdk/src/utils/composer/__tests__/compose.ts b/packages/core/workflows-sdk/src/utils/composer/__tests__/compose.ts index f3ffe2cec1bfd..db58a02064232 100644 --- a/packages/core/workflows-sdk/src/utils/composer/__tests__/compose.ts +++ b/packages/core/workflows-sdk/src/utils/composer/__tests__/compose.ts @@ -2522,4 +2522,78 @@ describe("Workflow composer", function () { "event-group-id" ) }) + + it("should fail step and return response to compensate partial data", async () => { + const maxRetries = 3 + + const mockStep1Fn = jest.fn().mockImplementation(async (input, context) => { + const ok: number[] = [] + const errors: number[] = [] + const toInsert = [1, 2, 3, 4, 5, 6, 7, 8] + + await promiseAll( + toInsert.map(async (i) => { + // fail on odd numbers + if (i % 2 === 0) { + ok.push(i) + return i + } + + errors.push(i) + throw new Error("failed") + }) + ).catch((e) => {}) + + if (errors.length > 0) { + return StepResponse.permanentFailure( + "Error inserting " + errors.join(", "), + ok + ) + } + + return new StepResponse(ok) + }) + + const mockStep1CompensateFn = jest + .fn() + .mockImplementation((input, context) => { + return input + }) + + const step1 = createStep( + { name: "step1", maxRetries }, + mockStep1Fn, + mockStep1CompensateFn + ) + + const step2 = createStep("step2", () => { + throw new Error("failed") + }) + + const workflow = createWorkflow("workflow1", function (input) { + step1(input) + step2() + }) + + const workflowInput = { test: "payload1" } + const { errors } = await workflow().run({ + input: workflowInput, + throwOnError: false, + }) + + expect(mockStep1Fn).toHaveBeenCalledTimes(1) + expect(mockStep1Fn.mock.calls[0]).toHaveLength(2) + expect(mockStep1Fn.mock.calls[0][0]).toEqual(workflowInput) + + expect(mockStep1CompensateFn.mock.calls[0][0]).toEqual([2, 4, 6, 8]) + + expect(errors).toHaveLength(1) + expect(errors[0]).toEqual({ + action: "step1", + handlerType: "invoke", + error: expect.objectContaining({ + message: "Error inserting 1, 3, 5, 7", + }), + }) + }) }) diff --git a/packages/core/workflows-sdk/src/utils/composer/helpers/step-response.ts b/packages/core/workflows-sdk/src/utils/composer/helpers/step-response.ts index 88f2becc0e55b..76a14e3325550 100644 --- a/packages/core/workflows-sdk/src/utils/composer/helpers/step-response.ts +++ b/packages/core/workflows-sdk/src/utils/composer/helpers/step-response.ts @@ -115,8 +115,14 @@ export class StepResponse { * console.log(result) * }) */ - static permanentFailure(message = "Permanent failure"): never { - throw new PermanentStepFailureError(message) + static permanentFailure( + message = "Permanent failure", + compensateInput?: unknown + ): never { + const response = isDefined(compensateInput) + ? new StepResponse(compensateInput) + : undefined + throw new PermanentStepFailureError(message, response) } static skip(): SkipStepResponse {