From c1d0abbfa58a90a86911247057e57deb947f185a Mon Sep 17 00:00:00 2001 From: "Carlos R. L. Rodrigues" Date: Mon, 11 Nov 2024 14:40:07 -0300 Subject: [PATCH] emit onFinish when thrown --- .../src/helper/workflow-export.ts | 13 +++++++-- .../src/services/workflow-orchestrator.ts | 28 +++++++++++++++---- .../integration-tests/__tests__/index.spec.ts | 12 ++++++++ .../src/services/workflow-orchestrator.ts | 28 +++++++++++++++---- 4 files changed, 69 insertions(+), 12 deletions(-) diff --git a/packages/core/workflows-sdk/src/helper/workflow-export.ts b/packages/core/workflows-sdk/src/helper/workflow-export.ts index be5a18a0245f5..cac64fec83eb0 100644 --- a/packages/core/workflows-sdk/src/helper/workflow-export.ts +++ b/packages/core/workflows-sdk/src/helper/workflow-export.ts @@ -123,14 +123,20 @@ function createContextualWorkflowRunner< method === originalRegisterStepFailure && transaction.getState() === TransactionState.REVERTED + let thrownError = null + if ( - throwOnError && failedStatus.includes(transaction.getState()) && !isCancelled && !isRegisterStepFailure ) { const firstError = errors?.[0]?.error ?? new Error("Unknown error") - throw firstError + + thrownError = firstError + + if (throwOnError) { + throw firstError + } } let result @@ -138,6 +144,8 @@ function createContextualWorkflowRunner< result = resolveValue(resultFrom, transaction.getContext()) if (result instanceof Promise) { result = await result.catch((e) => { + thrownError = e + if (throwOnError) { throw e } @@ -154,6 +162,7 @@ function createContextualWorkflowRunner< errors, transaction, result, + thrownError, } } diff --git a/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts b/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts index 14b5a7027f4fd..421cb022a303b 100644 --- a/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts +++ b/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts @@ -142,6 +142,8 @@ export class WorkflowOrchestratorService { container, } = options ?? {} + throwOnError ??= true + const workflowId = isString(workflowIdOrWorkflow) ? workflowIdOrWorkflow : workflowIdOrWorkflow.getName() @@ -172,7 +174,7 @@ export class WorkflowOrchestratorService { const ret = await exportedWorkflow.run({ input, - throwOnError, + throwOnError: false, logOnError, resultFrom, context, @@ -210,6 +212,10 @@ export class WorkflowOrchestratorService { await this.triggerParentStep(ret.transaction, result) } + if (throwOnError && ret.thrownError) { + throw ret.thrownError + } + return { acknowledgement, ...ret } } @@ -260,7 +266,7 @@ export class WorkflowOrchestratorService { }, @MedusaContext() sharedContext: Context = {} ) { - const { + let { context, throwOnError, logOnError, @@ -269,6 +275,8 @@ export class WorkflowOrchestratorService { events: eventHandlers, } = options ?? {} + throwOnError ??= true + const [idempotencyKey_, { workflowId, transactionId }] = this.buildIdempotencyKeyAndParts(idempotencyKey) @@ -287,7 +295,7 @@ export class WorkflowOrchestratorService { idempotencyKey: idempotencyKey_, context, resultFrom, - throwOnError, + throwOnError: false, logOnError, events, response: stepResponse, @@ -308,6 +316,10 @@ export class WorkflowOrchestratorService { await this.triggerParentStep(ret.transaction, result) } + if (throwOnError && ret.thrownError) { + throw ret.thrownError + } + return ret } @@ -324,7 +336,7 @@ export class WorkflowOrchestratorService { }, @MedusaContext() sharedContext: Context = {} ) { - const { + let { context, throwOnError, logOnError, @@ -333,6 +345,8 @@ export class WorkflowOrchestratorService { events: eventHandlers, } = options ?? {} + throwOnError ??= true + const [idempotencyKey_, { workflowId, transactionId }] = this.buildIdempotencyKeyAndParts(idempotencyKey) @@ -351,7 +365,7 @@ export class WorkflowOrchestratorService { idempotencyKey: idempotencyKey_, context, resultFrom, - throwOnError, + throwOnError: false, logOnError, events, response: stepResponse, @@ -372,6 +386,10 @@ export class WorkflowOrchestratorService { await this.triggerParentStep(ret.transaction, result) } + if (throwOnError && ret.thrownError) { + throw ret.thrownError + } + return ret } diff --git a/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts b/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts index e3d6851985322..c48d4450c197f 100644 --- a/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts +++ b/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts @@ -213,6 +213,17 @@ moduleIntegrationTestRunner({ } ) + let done = false + void workflowOrcModule.subscribe({ + workflowId: "workflow_2_revert_fail", + transactionId: acknowledgement.transactionId, + subscriber: (event) => { + if (event.eventType === "onFinish") { + done = true + } + }, + }) + let executionsList = await query({ workflow_executions: { fields: ["id"], @@ -245,6 +256,7 @@ moduleIntegrationTestRunner({ expect(executionsList).toHaveLength(1) expect(executionsList[0].state).toEqual("failed") + expect(done).toBe(true) }) it("should revert the entire transaction when a step timeout expires", async () => { diff --git a/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts b/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts index 3a0269ba0c357..9b8c8a6ff10aa 100644 --- a/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts +++ b/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts @@ -186,6 +186,8 @@ export class WorkflowOrchestratorService { container, } = options ?? {} + throwOnError ??= true + const workflowId = isString(workflowIdOrWorkflow) ? workflowIdOrWorkflow : workflowIdOrWorkflow.getName() @@ -210,7 +212,7 @@ export class WorkflowOrchestratorService { const ret = await exportedWorkflow.run({ input, - throwOnError, + throwOnError: false, logOnError, resultFrom, context, @@ -248,6 +250,10 @@ export class WorkflowOrchestratorService { await this.triggerParentStep(ret.transaction, result) } + if (throwOnError && ret.thrownError) { + throw ret.thrownError + } + return { acknowledgement, ...ret } } @@ -297,7 +303,7 @@ export class WorkflowOrchestratorService { }, @MedusaContext() sharedContext: Context = {} ) { - const { + let { context, throwOnError, logOnError, @@ -306,6 +312,8 @@ export class WorkflowOrchestratorService { events: eventHandlers, } = options ?? {} + throwOnError ??= true + const [idempotencyKey_, { workflowId, transactionId }] = this.buildIdempotencyKeyAndParts(idempotencyKey) @@ -324,7 +332,7 @@ export class WorkflowOrchestratorService { idempotencyKey: idempotencyKey_, context, resultFrom, - throwOnError, + throwOnError: false, logOnError, events, response: stepResponse, @@ -345,6 +353,10 @@ export class WorkflowOrchestratorService { await this.triggerParentStep(ret.transaction, result) } + if (throwOnError && ret.thrownError) { + throw ret.thrownError + } + return ret } @@ -361,7 +373,7 @@ export class WorkflowOrchestratorService { }, @MedusaContext() sharedContext: Context = {} ) { - const { + let { context, throwOnError, logOnError, @@ -370,6 +382,8 @@ export class WorkflowOrchestratorService { events: eventHandlers, } = options ?? {} + throwOnError ??= true + const [idempotencyKey_, { workflowId, transactionId }] = this.buildIdempotencyKeyAndParts(idempotencyKey) @@ -388,7 +402,7 @@ export class WorkflowOrchestratorService { idempotencyKey: idempotencyKey_, context, resultFrom, - throwOnError, + throwOnError: false, logOnError, events, response: stepResponse, @@ -409,6 +423,10 @@ export class WorkflowOrchestratorService { await this.triggerParentStep(ret.transaction, result) } + if (throwOnError && ret.thrownError) { + throw ret.thrownError + } + return ret }