From 1eef324af33cfb70b414ac564788f28dfd6d2c18 Mon Sep 17 00:00:00 2001 From: "Carlos R. L. Rodrigues" <37986729+carlos-r-l-rodrigues@users.noreply.github.com> Date: Tue, 12 Nov 2024 07:06:36 -0300 Subject: [PATCH] fix(orchestration): fix set step failure (#10031) What: - copy data before saving checkpoint - removed unused data format function - properly handle registerStepFailure to not throw - emit onFinish event even when execution failed --- .changeset/spotty-cups-accept.md | 8 + .../transaction/distributed-transaction.ts | 6 +- .../helper/__tests__/workflow-export.spec.ts | 144 ------------------ .../src/helper/workflow-export.ts | 51 +++---- .../src/utils/composer/create-workflow.ts | 13 +- .../workflow_transaction_timeout.ts | 9 +- .../src/services/workflow-orchestrator.ts | 38 +++-- .../__fixtures__/workflow_2.ts | 25 +++ .../integration-tests/__tests__/index.spec.ts | 96 +++++++++++- .../src/services/workflow-orchestrator.ts | 39 +++-- 10 files changed, 217 insertions(+), 212 deletions(-) create mode 100644 .changeset/spotty-cups-accept.md delete mode 100644 packages/core/workflows-sdk/src/helper/__tests__/workflow-export.spec.ts diff --git a/.changeset/spotty-cups-accept.md b/.changeset/spotty-cups-accept.md new file mode 100644 index 0000000000000..315facfa278bf --- /dev/null +++ b/.changeset/spotty-cups-accept.md @@ -0,0 +1,8 @@ +--- +"@medusajs/workflow-engine-inmemory": patch +"@medusajs/workflow-engine-redis": patch +"@medusajs/orchestration": patch +"@medusajs/workflows-sdk": patch +--- + +Fix set step failure for async steps diff --git a/packages/core/orchestration/src/transaction/distributed-transaction.ts b/packages/core/orchestration/src/transaction/distributed-transaction.ts index e75985ba1febe..0713ec94a7fcb 100644 --- a/packages/core/orchestration/src/transaction/distributed-transaction.ts +++ b/packages/core/orchestration/src/transaction/distributed-transaction.ts @@ -215,9 +215,11 @@ class DistributedTransaction extends EventEmitter { this.modelId, this.transactionId ) - await DistributedTransaction.keyValueStore.save(key, data, ttl, options) - return data + const rawData = JSON.parse(JSON.stringify(data)) + await DistributedTransaction.keyValueStore.save(key, rawData, ttl, options) + + return rawData } public static async loadTransaction( diff --git a/packages/core/workflows-sdk/src/helper/__tests__/workflow-export.spec.ts b/packages/core/workflows-sdk/src/helper/__tests__/workflow-export.spec.ts deleted file mode 100644 index aa62f7a79c4cc..0000000000000 --- a/packages/core/workflows-sdk/src/helper/__tests__/workflow-export.spec.ts +++ /dev/null @@ -1,144 +0,0 @@ -import { createMedusaContainer } from "@medusajs/utils" -import { MedusaWorkflow } from "../../medusa-workflow" -import { exportWorkflow } from "../workflow-export" - -jest.mock("@medusajs/orchestration", () => { - return { - TransactionHandlerType: { - INVOKE: "invoke", - COMPENSATE: "compensate", - }, - TransactionState: { - FAILED: "failed", - REVERTED: "reverted", - }, - LocalWorkflow: jest.fn(() => { - return { - run: jest.fn(() => { - return { - getErrors: jest.fn(), - getState: jest.fn(() => "done"), - getContext: jest.fn(() => { - return { - invoke: { result_step: "invoke_test" }, - } - }), - } - }), - registerStepSuccess: jest.fn(() => { - return { - getErrors: jest.fn(), - getState: jest.fn(() => "done"), - getContext: jest.fn(() => { - return { - invoke: { result_step: "invoke_test" }, - } - }), - } - }), - registerStepFailure: jest.fn(() => { - return { - getErrors: jest.fn(), - getState: jest.fn(() => "done"), - getContext: jest.fn(() => { - return { - invoke: { result_step: "invoke_test" }, - } - }), - } - }), - cancel: jest.fn(() => { - return { - getErrors: jest.fn(), - getState: jest.fn(() => "reverted"), - getContext: jest.fn(() => { - return { - invoke: { result_step: "invoke_test" }, - } - }), - } - }), - } - }), - } -}) - -describe("Export Workflow", function () { - afterEach(() => { - MedusaWorkflow.workflows = {} - }) - - it("should prepare the input data before initializing the transaction", async function () { - let transformedInput - const prepare = jest.fn().mockImplementation(async (data) => { - data.__transformed = true - transformedInput = data - - return data - }) - - const work = exportWorkflow("id" as any, "result_step", prepare) - - const container = createMedusaContainer() - const wfHandler = work(container) - - const input = { - test: "payload", - } - - const { result } = await wfHandler.run({ - input, - }) - - expect(input).toEqual({ - test: "payload", - }) - - expect(transformedInput).toEqual({ - test: "payload", - __transformed: true, - }) - - expect(result).toEqual("invoke_test") - }) - - describe("Using the exported workflow run", function () { - afterEach(() => { - MedusaWorkflow.workflows = {} - }) - - it("should prepare the input data before initializing the transaction", async function () { - let transformedInput - const prepare = jest.fn().mockImplementation(async (data) => { - data.__transformed = true - transformedInput = data - - return data - }) - - const work = exportWorkflow("id" as any, "result_step", prepare) - - const input = { - test: "payload", - } - - const container = createMedusaContainer() - - const { result } = await work.run({ - input, - container, - }) - - expect(input).toEqual({ - test: "payload", - }) - - expect(transformedInput).toEqual({ - test: "payload", - __transformed: true, - }) - - expect(result).toEqual("invoke_test") - }) - }) -}) diff --git a/packages/core/workflows-sdk/src/helper/workflow-export.ts b/packages/core/workflows-sdk/src/helper/workflow-export.ts index 9434a6909d569..cac64fec83eb0 100644 --- a/packages/core/workflows-sdk/src/helper/workflow-export.ts +++ b/packages/core/workflows-sdk/src/helper/workflow-export.ts @@ -3,7 +3,6 @@ import { DistributedTransactionEvents, DistributedTransactionType, LocalWorkflow, - TransactionHandlerType, TransactionState, } from "@medusajs/orchestration" import { @@ -18,6 +17,7 @@ import { isPresent, MedusaContextType, Modules, + TransactionHandlerType, } from "@medusajs/utils" import { EOL } from "os" import { ulid } from "ulid" @@ -41,13 +41,11 @@ function createContextualWorkflowRunner< >({ workflowId, defaultResult, - dataPreparation, options, container, }: { workflowId: string defaultResult?: string | Symbol - dataPreparation?: (data: TData) => Promise options?: { wrappedInput?: boolean sourcePath?: string @@ -110,7 +108,10 @@ function createContextualWorkflowRunner< events, flowMetadata, ] - const transaction = await method.apply(method, args) as DistributedTransactionType + const transaction = (await method.apply( + method, + args + )) as DistributedTransactionType let errors = transaction.getErrors(TransactionHandlerType.INVOKE) @@ -118,16 +119,24 @@ function createContextualWorkflowRunner< const isCancelled = isCancel && transaction.getState() === TransactionState.REVERTED + const isRegisterStepFailure = + method === originalRegisterStepFailure && + transaction.getState() === TransactionState.REVERTED + + let thrownError = null + if ( - !isCancelled && failedStatus.includes(transaction.getState()) && - throwOnError + !isCancelled && + !isRegisterStepFailure ) { - /*const errorMessage = errors - ?.map((err) => `${err.error?.message}${EOL}${err.error?.stack}`) - ?.join(`${EOL}`)*/ const firstError = errors?.[0]?.error ?? new Error("Unknown error") - throw firstError + + thrownError = firstError + + if (throwOnError) { + throw firstError + } } let result @@ -135,6 +144,8 @@ function createContextualWorkflowRunner< result = resolveValue(resultFrom, transaction.getContext()) if (result instanceof Promise) { result = await result.catch((e) => { + thrownError = e + if (throwOnError) { throw e } @@ -151,6 +162,7 @@ function createContextualWorkflowRunner< errors, transaction, result, + thrownError, } } @@ -175,22 +187,6 @@ function createContextualWorkflowRunner< context.transactionId ??= ulid() context.eventGroupId ??= ulid() - if (typeof dataPreparation === "function") { - try { - const copyInput = input ? JSON.parse(JSON.stringify(input)) : input - input = await dataPreparation(copyInput as TData) - } catch (err) { - if (throwOnError) { - throw new Error( - `Data preparation failed: ${err.message}${EOL}${err.stack}` - ) - } - return { - errors: [err], - } - } - } - return await originalExecution( originalRun, { @@ -339,7 +335,6 @@ function createContextualWorkflowRunner< export const exportWorkflow = ( workflowId: string, defaultResult?: string | Symbol, - dataPreparation?: (data: TData) => Promise, options?: { wrappedInput?: boolean sourcePath?: string @@ -364,7 +359,6 @@ export const exportWorkflow = ( >({ workflowId, defaultResult, - dataPreparation, options, container, }) @@ -390,7 +384,6 @@ export const exportWorkflow = ( >({ workflowId, defaultResult, - dataPreparation, options, container, }) diff --git a/packages/core/workflows-sdk/src/utils/composer/create-workflow.ts b/packages/core/workflows-sdk/src/utils/composer/create-workflow.ts index ca1f6091a7788..fd00c069383b3 100644 --- a/packages/core/workflows-sdk/src/utils/composer/create-workflow.ts +++ b/packages/core/workflows-sdk/src/utils/composer/create-workflow.ts @@ -152,15 +152,10 @@ export function createWorkflow( WorkflowManager.register(name, context.flow, handlers, options) } - const workflow = exportWorkflow( - name, - returnedStep, - undefined, - { - wrappedInput: true, - sourcePath: fileSourcePath, - } - ) + const workflow = exportWorkflow(name, returnedStep, { + wrappedInput: true, + sourcePath: fileSourcePath, + }) const mainFlow = ( container?: LoadedModule[] | MedusaContainer diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_transaction_timeout.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_transaction_timeout.ts index b02bd97fd1907..66c8b8f0d206a 100644 --- a/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_transaction_timeout.ts +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_transaction_timeout.ts @@ -3,11 +3,14 @@ import { createWorkflow, StepResponse, } from "@medusajs/framework/workflows-sdk" +import { setTimeout } from "timers/promises" const step_1 = createStep( "step_1", - jest.fn((input) => { + jest.fn(async (input) => { input.test = "test" + await setTimeout(200) + return new StepResponse(input, { compensate: 123 }) }), jest.fn((compensateInput) => { @@ -27,9 +30,7 @@ createWorkflow( timeout: 0.1, // 0.1 second }, function (input) { - const resp = step_1(input).config({ - async: true, - }) + const resp = step_1(input) return resp } diff --git a/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts b/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts index 14b5a7027f4fd..1d25d222d4564 100644 --- a/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts +++ b/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts @@ -131,17 +131,20 @@ export class WorkflowOrchestratorService { options?: WorkflowOrchestratorRunOptions, @MedusaContext() sharedContext: Context = {} ) { - let { + const { input, - context, transactionId, resultFrom, - throwOnError, logOnError, events: eventHandlers, container, } = options ?? {} + let { throwOnError, context } = options ?? {} + throwOnError ??= true + context ??= {} + context.transactionId ??= transactionId ?? ulid() + const workflowId = isString(workflowIdOrWorkflow) ? workflowIdOrWorkflow : workflowIdOrWorkflow.getName() @@ -153,9 +156,6 @@ export class WorkflowOrchestratorService { ) } - context ??= {} - context.transactionId ??= transactionId ?? ulid() - const events: FlowRunOptions["events"] = this.buildWorkflowEvents({ customEventHandlers: eventHandlers, workflowId, @@ -172,7 +172,7 @@ export class WorkflowOrchestratorService { const ret = await exportedWorkflow.run({ input, - throwOnError, + throwOnError: false, logOnError, resultFrom, context, @@ -210,6 +210,10 @@ export class WorkflowOrchestratorService { await this.triggerParentStep(ret.transaction, result) } + if (throwOnError && ret.thrownError) { + throw ret.thrownError + } + return { acknowledgement, ...ret } } @@ -262,13 +266,15 @@ export class WorkflowOrchestratorService { ) { const { context, - throwOnError, logOnError, resultFrom, container, events: eventHandlers, } = options ?? {} + let { throwOnError } = options ?? {} + throwOnError ??= true + const [idempotencyKey_, { workflowId, transactionId }] = this.buildIdempotencyKeyAndParts(idempotencyKey) @@ -287,7 +293,7 @@ export class WorkflowOrchestratorService { idempotencyKey: idempotencyKey_, context, resultFrom, - throwOnError, + throwOnError: false, logOnError, events, response: stepResponse, @@ -308,6 +314,10 @@ export class WorkflowOrchestratorService { await this.triggerParentStep(ret.transaction, result) } + if (throwOnError && ret.thrownError) { + throw ret.thrownError + } + return ret } @@ -326,13 +336,15 @@ export class WorkflowOrchestratorService { ) { const { context, - throwOnError, logOnError, resultFrom, container, events: eventHandlers, } = options ?? {} + let { throwOnError } = options ?? {} + throwOnError ??= true + const [idempotencyKey_, { workflowId, transactionId }] = this.buildIdempotencyKeyAndParts(idempotencyKey) @@ -351,7 +363,7 @@ export class WorkflowOrchestratorService { idempotencyKey: idempotencyKey_, context, resultFrom, - throwOnError, + throwOnError: false, logOnError, events, response: stepResponse, @@ -372,6 +384,10 @@ export class WorkflowOrchestratorService { await this.triggerParentStep(ret.transaction, result) } + if (throwOnError && ret.thrownError) { + throw ret.thrownError + } + return ret } diff --git a/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_2.ts b/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_2.ts index fe6d52809b4f2..653567f999d8d 100644 --- a/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_2.ts +++ b/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_2.ts @@ -2,6 +2,7 @@ import { createStep, createWorkflow, StepResponse, + WorkflowResponse, } from "@medusajs/framework/workflows-sdk" const step_1 = createStep( @@ -49,6 +50,14 @@ const step_3 = createStep( }) ) +const broken_step_2 = createStep( + "broken_step_2", + jest.fn(() => {}), + jest.fn((_, context) => { + throw new Error("Broken compensation step") + }) +) + createWorkflow( { name: "workflow_2", @@ -67,3 +76,19 @@ createWorkflow( return step_3(ret2) } ) + +createWorkflow( + { + name: "workflow_2_revert_fail", + retentionTime: 1000, + }, + function (input) { + step_1(input) + + broken_step_2().config({ + async: true, + }) + + return new WorkflowResponse("done") + } +) diff --git a/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts b/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts index 323a03c20e2df..c48d4450c197f 100644 --- a/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts +++ b/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts @@ -15,13 +15,13 @@ import { TransactionHandlerType, TransactionStepState, } from "@medusajs/framework/utils" -import { asValue } from "awilix" import { moduleIntegrationTestRunner } from "@medusajs/test-utils" +import { asValue } from "awilix" import { setTimeout } from "timers/promises" +import { WorkflowsModuleService } from "../../src/services" import "../__fixtures__" import { createScheduled } from "../__fixtures__/workflow_scheduled" import { TestDatabase } from "../utils" -import { WorkflowsModuleService } from "../../src/services" jest.setTimeout(999900000) @@ -167,6 +167,98 @@ moduleIntegrationTestRunner({ expect(executionsList).toHaveLength(1) }) + it("should return a list of failed workflow executions and keep it saved when there is a retentionTime set", async () => { + await workflowOrcModule.run("workflow_2", { + input: { + value: "123", + }, + transactionId: "transaction_1", + }) + + let executionsList = await query({ + workflow_executions: { + fields: ["id"], + }, + }) + + expect(executionsList).toHaveLength(1) + + await workflowOrcModule.setStepFailure({ + idempotencyKey: { + action: TransactionHandlerType.INVOKE, + stepId: "new_step_name", + workflowId: "workflow_2", + transactionId: "transaction_1", + }, + stepResponse: { uhuuuu: "yeaah!" }, + }) + + executionsList = await query({ + workflow_executions: { + fields: ["id", "state"], + }, + }) + + expect(executionsList).toHaveLength(1) + expect(executionsList[0].state).toEqual("reverted") + }) + + it("should throw if setStepFailure fails", async () => { + const { acknowledgement } = await workflowOrcModule.run( + "workflow_2_revert_fail", + { + input: { + value: "123", + }, + } + ) + + let done = false + void workflowOrcModule.subscribe({ + workflowId: "workflow_2_revert_fail", + transactionId: acknowledgement.transactionId, + subscriber: (event) => { + if (event.eventType === "onFinish") { + done = true + } + }, + }) + + let executionsList = await query({ + workflow_executions: { + fields: ["id"], + }, + }) + + expect(executionsList).toHaveLength(1) + + const setStepError = await workflowOrcModule + .setStepFailure({ + idempotencyKey: { + action: TransactionHandlerType.INVOKE, + stepId: "broken_step_2", + workflowId: "workflow_2_revert_fail", + transactionId: acknowledgement.transactionId, + }, + stepResponse: { uhuuuu: "yeaah!" }, + }) + .catch((e) => { + return e + }) + + expect(setStepError).toEqual({ uhuuuu: "yeaah!" }) + + executionsList = await query({ + workflow_executions: { + fields: ["id", "state", "context"], + }, + }) + + expect(executionsList).toHaveLength(1) + expect(executionsList[0].state).toEqual("failed") + expect(done).toBe(true) + }) + it("should revert the entire transaction when a step timeout expires", async () => { const { transaction, result, errors } = await workflowOrcModule.run( "workflow_step_timeout", diff --git a/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts b/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts index 3a0269ba0c357..a819e13c4da7c 100644 --- a/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts +++ b/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts @@ -175,17 +175,21 @@ export class WorkflowOrchestratorService { options?: WorkflowOrchestratorRunOptions, @MedusaContext() sharedContext: Context = {} ) { - let { + const { input, - context, transactionId, resultFrom, - throwOnError, logOnError, events: eventHandlers, container, } = options ?? {} + let { throwOnError, context } = options ?? {} + + throwOnError ??= true + context ??= {} + context.transactionId ??= transactionId ?? ulid() + const workflowId = isString(workflowIdOrWorkflow) ? workflowIdOrWorkflow : workflowIdOrWorkflow.getName() @@ -194,9 +198,6 @@ export class WorkflowOrchestratorService { throw new Error("Workflow ID is required") } - context ??= {} - context.transactionId ??= transactionId ?? ulid() - const events: FlowRunOptions["events"] = this.buildWorkflowEvents({ customEventHandlers: eventHandlers, workflowId, @@ -210,7 +211,7 @@ export class WorkflowOrchestratorService { const ret = await exportedWorkflow.run({ input, - throwOnError, + throwOnError: false, logOnError, resultFrom, context, @@ -248,6 +249,10 @@ export class WorkflowOrchestratorService { await this.triggerParentStep(ret.transaction, result) } + if (throwOnError && ret.thrownError) { + throw ret.thrownError + } + return { acknowledgement, ...ret } } @@ -299,13 +304,15 @@ export class WorkflowOrchestratorService { ) { const { context, - throwOnError, logOnError, resultFrom, container, events: eventHandlers, } = options ?? {} + let { throwOnError } = options ?? {} + throwOnError ??= true + const [idempotencyKey_, { workflowId, transactionId }] = this.buildIdempotencyKeyAndParts(idempotencyKey) @@ -324,7 +331,7 @@ export class WorkflowOrchestratorService { idempotencyKey: idempotencyKey_, context, resultFrom, - throwOnError, + throwOnError: false, logOnError, events, response: stepResponse, @@ -345,6 +352,10 @@ export class WorkflowOrchestratorService { await this.triggerParentStep(ret.transaction, result) } + if (throwOnError && ret.thrownError) { + throw ret.thrownError + } + return ret } @@ -363,13 +374,15 @@ export class WorkflowOrchestratorService { ) { const { context, - throwOnError, logOnError, resultFrom, container, events: eventHandlers, } = options ?? {} + let { throwOnError } = options ?? {} + throwOnError ??= true + const [idempotencyKey_, { workflowId, transactionId }] = this.buildIdempotencyKeyAndParts(idempotencyKey) @@ -388,7 +401,7 @@ export class WorkflowOrchestratorService { idempotencyKey: idempotencyKey_, context, resultFrom, - throwOnError, + throwOnError: false, logOnError, events, response: stepResponse, @@ -409,6 +422,10 @@ export class WorkflowOrchestratorService { await this.triggerParentStep(ret.transaction, result) } + if (throwOnError && ret.thrownError) { + throw ret.thrownError + } + return ret }