From 9e195ce736a22a5c31eafcbb97975e74e77a50c9 Mon Sep 17 00:00:00 2001 From: "Carlos R. L. Rodrigues" Date: Mon, 11 Nov 2024 14:09:01 -0300 Subject: [PATCH 1/6] fix(orchestrato): copy data before saving --- .../transaction/distributed-transaction.ts | 5 +- .../helper/__tests__/workflow-export.spec.ts | 144 ------------------ .../src/helper/workflow-export.ts | 40 ++--- .../src/utils/composer/create-workflow.ts | 13 +- .../__fixtures__/workflow_2.ts | 25 +++ .../integration-tests/__tests__/index.spec.ts | 84 +++++++++- 6 files changed, 126 insertions(+), 185 deletions(-) delete mode 100644 packages/core/workflows-sdk/src/helper/__tests__/workflow-export.spec.ts diff --git a/packages/core/orchestration/src/transaction/distributed-transaction.ts b/packages/core/orchestration/src/transaction/distributed-transaction.ts index e75985ba1febe..58eb906a6b452 100644 --- a/packages/core/orchestration/src/transaction/distributed-transaction.ts +++ b/packages/core/orchestration/src/transaction/distributed-transaction.ts @@ -215,9 +215,10 @@ class DistributedTransaction extends EventEmitter { this.modelId, this.transactionId ) - await DistributedTransaction.keyValueStore.save(key, data, ttl, options) + const rawData = JSON.parse(JSON.stringify(data)) + await DistributedTransaction.keyValueStore.save(key, rawData, ttl, options) - return data + return rawData } public static async loadTransaction( diff --git a/packages/core/workflows-sdk/src/helper/__tests__/workflow-export.spec.ts b/packages/core/workflows-sdk/src/helper/__tests__/workflow-export.spec.ts deleted file mode 100644 index aa62f7a79c4cc..0000000000000 --- a/packages/core/workflows-sdk/src/helper/__tests__/workflow-export.spec.ts +++ /dev/null @@ -1,144 +0,0 @@ -import { createMedusaContainer } from "@medusajs/utils" -import { MedusaWorkflow } from "../../medusa-workflow" -import { exportWorkflow } from "../workflow-export" - -jest.mock("@medusajs/orchestration", () => { - return { - TransactionHandlerType: { - INVOKE: "invoke", - COMPENSATE: "compensate", - }, - TransactionState: { - FAILED: "failed", - REVERTED: "reverted", - }, - LocalWorkflow: jest.fn(() => { - return { - run: jest.fn(() => { - return { - getErrors: jest.fn(), - getState: jest.fn(() => "done"), - getContext: jest.fn(() => { - return { - invoke: { result_step: "invoke_test" }, - } - }), - } - }), - registerStepSuccess: jest.fn(() => { - return { - getErrors: jest.fn(), - getState: jest.fn(() => "done"), - getContext: jest.fn(() => { - return { - invoke: { result_step: "invoke_test" }, - } - }), - } - }), - registerStepFailure: jest.fn(() => { - return { - getErrors: jest.fn(), - getState: jest.fn(() => "done"), - getContext: jest.fn(() => { - return { - invoke: { result_step: "invoke_test" }, - } - }), - } - }), - cancel: jest.fn(() => { - return { - getErrors: jest.fn(), - getState: jest.fn(() => "reverted"), - getContext: jest.fn(() => { - return { - invoke: { result_step: "invoke_test" }, - } - }), - } - }), - } - }), - } -}) - -describe("Export Workflow", function () { - afterEach(() => { - MedusaWorkflow.workflows = {} - }) - - it("should prepare the input data before initializing the transaction", async function () { - let transformedInput - const prepare = jest.fn().mockImplementation(async (data) => { - data.__transformed = true - transformedInput = data - - return data - }) - - const work = exportWorkflow("id" as any, "result_step", prepare) - - const container = createMedusaContainer() - const wfHandler = work(container) - - const input = { - test: "payload", - } - - const { result } = await wfHandler.run({ - input, - }) - - expect(input).toEqual({ - test: "payload", - }) - - expect(transformedInput).toEqual({ - test: "payload", - __transformed: true, - }) - - expect(result).toEqual("invoke_test") - }) - - describe("Using the exported workflow run", function () { - afterEach(() => { - MedusaWorkflow.workflows = {} - }) - - it("should prepare the input data before initializing the transaction", async function () { - let transformedInput - const prepare = jest.fn().mockImplementation(async (data) => { - data.__transformed = true - transformedInput = data - - return data - }) - - const work = exportWorkflow("id" as any, "result_step", prepare) - - const input = { - test: "payload", - } - - const container = createMedusaContainer() - - const { result } = await work.run({ - input, - container, - }) - - expect(input).toEqual({ - test: "payload", - }) - - expect(transformedInput).toEqual({ - test: "payload", - __transformed: true, - }) - - expect(result).toEqual("invoke_test") - }) - }) -}) diff --git a/packages/core/workflows-sdk/src/helper/workflow-export.ts b/packages/core/workflows-sdk/src/helper/workflow-export.ts index 9434a6909d569..be5a18a0245f5 100644 --- a/packages/core/workflows-sdk/src/helper/workflow-export.ts +++ b/packages/core/workflows-sdk/src/helper/workflow-export.ts @@ -3,7 +3,6 @@ import { DistributedTransactionEvents, DistributedTransactionType, LocalWorkflow, - TransactionHandlerType, TransactionState, } from "@medusajs/orchestration" import { @@ -18,6 +17,7 @@ import { isPresent, MedusaContextType, Modules, + TransactionHandlerType, } from "@medusajs/utils" import { EOL } from "os" import { ulid } from "ulid" @@ -41,13 +41,11 @@ function createContextualWorkflowRunner< >({ workflowId, defaultResult, - dataPreparation, options, container, }: { workflowId: string defaultResult?: string | Symbol - dataPreparation?: (data: TData) => Promise options?: { wrappedInput?: boolean sourcePath?: string @@ -110,7 +108,10 @@ function createContextualWorkflowRunner< events, flowMetadata, ] - const transaction = await method.apply(method, args) as DistributedTransactionType + const transaction = (await method.apply( + method, + args + )) as DistributedTransactionType let errors = transaction.getErrors(TransactionHandlerType.INVOKE) @@ -118,14 +119,16 @@ function createContextualWorkflowRunner< const isCancelled = isCancel && transaction.getState() === TransactionState.REVERTED + const isRegisterStepFailure = + method === originalRegisterStepFailure && + transaction.getState() === TransactionState.REVERTED + if ( - !isCancelled && + throwOnError && failedStatus.includes(transaction.getState()) && - throwOnError + !isCancelled && + !isRegisterStepFailure ) { - /*const errorMessage = errors - ?.map((err) => `${err.error?.message}${EOL}${err.error?.stack}`) - ?.join(`${EOL}`)*/ const firstError = errors?.[0]?.error ?? new Error("Unknown error") throw firstError } @@ -175,22 +178,6 @@ function createContextualWorkflowRunner< context.transactionId ??= ulid() context.eventGroupId ??= ulid() - if (typeof dataPreparation === "function") { - try { - const copyInput = input ? JSON.parse(JSON.stringify(input)) : input - input = await dataPreparation(copyInput as TData) - } catch (err) { - if (throwOnError) { - throw new Error( - `Data preparation failed: ${err.message}${EOL}${err.stack}` - ) - } - return { - errors: [err], - } - } - } - return await originalExecution( originalRun, { @@ -339,7 +326,6 @@ function createContextualWorkflowRunner< export const exportWorkflow = ( workflowId: string, defaultResult?: string | Symbol, - dataPreparation?: (data: TData) => Promise, options?: { wrappedInput?: boolean sourcePath?: string @@ -364,7 +350,6 @@ export const exportWorkflow = ( >({ workflowId, defaultResult, - dataPreparation, options, container, }) @@ -390,7 +375,6 @@ export const exportWorkflow = ( >({ workflowId, defaultResult, - dataPreparation, options, container, }) diff --git a/packages/core/workflows-sdk/src/utils/composer/create-workflow.ts b/packages/core/workflows-sdk/src/utils/composer/create-workflow.ts index ca1f6091a7788..fd00c069383b3 100644 --- a/packages/core/workflows-sdk/src/utils/composer/create-workflow.ts +++ b/packages/core/workflows-sdk/src/utils/composer/create-workflow.ts @@ -152,15 +152,10 @@ export function createWorkflow( WorkflowManager.register(name, context.flow, handlers, options) } - const workflow = exportWorkflow( - name, - returnedStep, - undefined, - { - wrappedInput: true, - sourcePath: fileSourcePath, - } - ) + const workflow = exportWorkflow(name, returnedStep, { + wrappedInput: true, + sourcePath: fileSourcePath, + }) const mainFlow = ( container?: LoadedModule[] | MedusaContainer diff --git a/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_2.ts b/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_2.ts index fe6d52809b4f2..653567f999d8d 100644 --- a/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_2.ts +++ b/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_2.ts @@ -2,6 +2,7 @@ import { createStep, createWorkflow, StepResponse, + WorkflowResponse, } from "@medusajs/framework/workflows-sdk" const step_1 = createStep( @@ -49,6 +50,14 @@ const step_3 = createStep( }) ) +const broken_step_2 = createStep( + "broken_step_2", + jest.fn(() => {}), + jest.fn((_, context) => { + throw new Error("Broken compensation step") + }) +) + createWorkflow( { name: "workflow_2", @@ -67,3 +76,19 @@ createWorkflow( return step_3(ret2) } ) + +createWorkflow( + { + name: "workflow_2_revert_fail", + retentionTime: 1000, + }, + function (input) { + step_1(input) + + broken_step_2().config({ + async: true, + }) + + return new WorkflowResponse("done") + } +) diff --git a/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts b/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts index 323a03c20e2df..e3d6851985322 100644 --- a/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts +++ b/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts @@ -15,13 +15,13 @@ import { TransactionHandlerType, TransactionStepState, } from "@medusajs/framework/utils" -import { asValue } from "awilix" import { moduleIntegrationTestRunner } from "@medusajs/test-utils" +import { asValue } from "awilix" import { setTimeout } from "timers/promises" +import { WorkflowsModuleService } from "../../src/services" import "../__fixtures__" import { createScheduled } from "../__fixtures__/workflow_scheduled" import { TestDatabase } from "../utils" -import { WorkflowsModuleService } from "../../src/services" jest.setTimeout(999900000) @@ -167,6 +167,86 @@ moduleIntegrationTestRunner({ expect(executionsList).toHaveLength(1) }) + it("should return a list of failed workflow executions and keep it saved when there is a retentionTime set", async () => { + await workflowOrcModule.run("workflow_2", { + input: { + value: "123", + }, + transactionId: "transaction_1", + }) + + let executionsList = await query({ + workflow_executions: { + fields: ["id"], + }, + }) + + expect(executionsList).toHaveLength(1) + + await workflowOrcModule.setStepFailure({ + idempotencyKey: { + action: TransactionHandlerType.INVOKE, + stepId: "new_step_name", + workflowId: "workflow_2", + transactionId: "transaction_1", + }, + stepResponse: { uhuuuu: "yeaah!" }, + }) + + executionsList = await query({ + workflow_executions: { + fields: ["id", "state"], + }, + }) + + expect(executionsList).toHaveLength(1) + expect(executionsList[0].state).toEqual("reverted") + }) + + it("should throw if setStepFailure fails", async () => { + const { acknowledgement } = await workflowOrcModule.run( + "workflow_2_revert_fail", + { + input: { + value: "123", + }, + } + ) + + let executionsList = await query({ + workflow_executions: { + fields: ["id"], + }, + }) + + expect(executionsList).toHaveLength(1) + + const setStepError = await workflowOrcModule + .setStepFailure({ + idempotencyKey: { + action: TransactionHandlerType.INVOKE, + stepId: "broken_step_2", + workflowId: "workflow_2_revert_fail", + transactionId: acknowledgement.transactionId, + }, + stepResponse: { uhuuuu: "yeaah!" }, + }) + .catch((e) => { + return e + }) + + expect(setStepError).toEqual({ uhuuuu: "yeaah!" }) + + executionsList = await query({ + workflow_executions: { + fields: ["id", "state", "context"], + }, + }) + + expect(executionsList).toHaveLength(1) + expect(executionsList[0].state).toEqual("failed") + }) + it("should revert the entire transaction when a step timeout expires", async () => { const { transaction, result, errors } = await workflowOrcModule.run( "workflow_step_timeout", From c1d0abbfa58a90a86911247057e57deb947f185a Mon Sep 17 00:00:00 2001 From: "Carlos R. L. Rodrigues" Date: Mon, 11 Nov 2024 14:40:07 -0300 Subject: [PATCH 2/6] emit onFinish when thrown --- .../src/helper/workflow-export.ts | 13 +++++++-- .../src/services/workflow-orchestrator.ts | 28 +++++++++++++++---- .../integration-tests/__tests__/index.spec.ts | 12 ++++++++ .../src/services/workflow-orchestrator.ts | 28 +++++++++++++++---- 4 files changed, 69 insertions(+), 12 deletions(-) diff --git a/packages/core/workflows-sdk/src/helper/workflow-export.ts b/packages/core/workflows-sdk/src/helper/workflow-export.ts index be5a18a0245f5..cac64fec83eb0 100644 --- a/packages/core/workflows-sdk/src/helper/workflow-export.ts +++ b/packages/core/workflows-sdk/src/helper/workflow-export.ts @@ -123,14 +123,20 @@ function createContextualWorkflowRunner< method === originalRegisterStepFailure && transaction.getState() === TransactionState.REVERTED + let thrownError = null + if ( - throwOnError && failedStatus.includes(transaction.getState()) && !isCancelled && !isRegisterStepFailure ) { const firstError = errors?.[0]?.error ?? new Error("Unknown error") - throw firstError + + thrownError = firstError + + if (throwOnError) { + throw firstError + } } let result @@ -138,6 +144,8 @@ function createContextualWorkflowRunner< result = resolveValue(resultFrom, transaction.getContext()) if (result instanceof Promise) { result = await result.catch((e) => { + thrownError = e + if (throwOnError) { throw e } @@ -154,6 +162,7 @@ function createContextualWorkflowRunner< errors, transaction, result, + thrownError, } } diff --git a/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts b/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts index 14b5a7027f4fd..421cb022a303b 100644 --- a/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts +++ b/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts @@ -142,6 +142,8 @@ export class WorkflowOrchestratorService { container, } = options ?? {} + throwOnError ??= true + const workflowId = isString(workflowIdOrWorkflow) ? workflowIdOrWorkflow : workflowIdOrWorkflow.getName() @@ -172,7 +174,7 @@ export class WorkflowOrchestratorService { const ret = await exportedWorkflow.run({ input, - throwOnError, + throwOnError: false, logOnError, resultFrom, context, @@ -210,6 +212,10 @@ export class WorkflowOrchestratorService { await this.triggerParentStep(ret.transaction, result) } + if (throwOnError && ret.thrownError) { + throw ret.thrownError + } + return { acknowledgement, ...ret } } @@ -260,7 +266,7 @@ export class WorkflowOrchestratorService { }, @MedusaContext() sharedContext: Context = {} ) { - const { + let { context, throwOnError, logOnError, @@ -269,6 +275,8 @@ export class WorkflowOrchestratorService { events: eventHandlers, } = options ?? {} + throwOnError ??= true + const [idempotencyKey_, { workflowId, transactionId }] = this.buildIdempotencyKeyAndParts(idempotencyKey) @@ -287,7 +295,7 @@ export class WorkflowOrchestratorService { idempotencyKey: idempotencyKey_, context, resultFrom, - throwOnError, + throwOnError: false, logOnError, events, response: stepResponse, @@ -308,6 +316,10 @@ export class WorkflowOrchestratorService { await this.triggerParentStep(ret.transaction, result) } + if (throwOnError && ret.thrownError) { + throw ret.thrownError + } + return ret } @@ -324,7 +336,7 @@ export class WorkflowOrchestratorService { }, @MedusaContext() sharedContext: Context = {} ) { - const { + let { context, throwOnError, logOnError, @@ -333,6 +345,8 @@ export class WorkflowOrchestratorService { events: eventHandlers, } = options ?? {} + throwOnError ??= true + const [idempotencyKey_, { workflowId, transactionId }] = this.buildIdempotencyKeyAndParts(idempotencyKey) @@ -351,7 +365,7 @@ export class WorkflowOrchestratorService { idempotencyKey: idempotencyKey_, context, resultFrom, - throwOnError, + throwOnError: false, logOnError, events, response: stepResponse, @@ -372,6 +386,10 @@ export class WorkflowOrchestratorService { await this.triggerParentStep(ret.transaction, result) } + if (throwOnError && ret.thrownError) { + throw ret.thrownError + } + return ret } diff --git a/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts b/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts index e3d6851985322..c48d4450c197f 100644 --- a/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts +++ b/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts @@ -213,6 +213,17 @@ moduleIntegrationTestRunner({ } ) + let done = false + void workflowOrcModule.subscribe({ + workflowId: "workflow_2_revert_fail", + transactionId: acknowledgement.transactionId, + subscriber: (event) => { + if (event.eventType === "onFinish") { + done = true + } + }, + }) + let executionsList = await query({ workflow_executions: { fields: ["id"], @@ -245,6 +256,7 @@ moduleIntegrationTestRunner({ expect(executionsList).toHaveLength(1) expect(executionsList[0].state).toEqual("failed") + expect(done).toBe(true) }) it("should revert the entire transaction when a step timeout expires", async () => { diff --git a/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts b/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts index 3a0269ba0c357..9b8c8a6ff10aa 100644 --- a/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts +++ b/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts @@ -186,6 +186,8 @@ export class WorkflowOrchestratorService { container, } = options ?? {} + throwOnError ??= true + const workflowId = isString(workflowIdOrWorkflow) ? workflowIdOrWorkflow : workflowIdOrWorkflow.getName() @@ -210,7 +212,7 @@ export class WorkflowOrchestratorService { const ret = await exportedWorkflow.run({ input, - throwOnError, + throwOnError: false, logOnError, resultFrom, context, @@ -248,6 +250,10 @@ export class WorkflowOrchestratorService { await this.triggerParentStep(ret.transaction, result) } + if (throwOnError && ret.thrownError) { + throw ret.thrownError + } + return { acknowledgement, ...ret } } @@ -297,7 +303,7 @@ export class WorkflowOrchestratorService { }, @MedusaContext() sharedContext: Context = {} ) { - const { + let { context, throwOnError, logOnError, @@ -306,6 +312,8 @@ export class WorkflowOrchestratorService { events: eventHandlers, } = options ?? {} + throwOnError ??= true + const [idempotencyKey_, { workflowId, transactionId }] = this.buildIdempotencyKeyAndParts(idempotencyKey) @@ -324,7 +332,7 @@ export class WorkflowOrchestratorService { idempotencyKey: idempotencyKey_, context, resultFrom, - throwOnError, + throwOnError: false, logOnError, events, response: stepResponse, @@ -345,6 +353,10 @@ export class WorkflowOrchestratorService { await this.triggerParentStep(ret.transaction, result) } + if (throwOnError && ret.thrownError) { + throw ret.thrownError + } + return ret } @@ -361,7 +373,7 @@ export class WorkflowOrchestratorService { }, @MedusaContext() sharedContext: Context = {} ) { - const { + let { context, throwOnError, logOnError, @@ -370,6 +382,8 @@ export class WorkflowOrchestratorService { events: eventHandlers, } = options ?? {} + throwOnError ??= true + const [idempotencyKey_, { workflowId, transactionId }] = this.buildIdempotencyKeyAndParts(idempotencyKey) @@ -388,7 +402,7 @@ export class WorkflowOrchestratorService { idempotencyKey: idempotencyKey_, context, resultFrom, - throwOnError, + throwOnError: false, logOnError, events, response: stepResponse, @@ -409,6 +423,10 @@ export class WorkflowOrchestratorService { await this.triggerParentStep(ret.transaction, result) } + if (throwOnError && ret.thrownError) { + throw ret.thrownError + } + return ret } From d09f554f51d90fee61bd1c741e9ec3da5ae6af19 Mon Sep 17 00:00:00 2001 From: "Carlos R. L. Rodrigues" Date: Mon, 11 Nov 2024 14:42:21 -0300 Subject: [PATCH 3/6] changeset --- .changeset/spotty-cups-accept.md | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100644 .changeset/spotty-cups-accept.md diff --git a/.changeset/spotty-cups-accept.md b/.changeset/spotty-cups-accept.md new file mode 100644 index 0000000000000..315facfa278bf --- /dev/null +++ b/.changeset/spotty-cups-accept.md @@ -0,0 +1,8 @@ +--- +"@medusajs/workflow-engine-inmemory": patch +"@medusajs/workflow-engine-redis": patch +"@medusajs/orchestration": patch +"@medusajs/workflows-sdk": patch +--- + +Fix set step failure for async steps From a27aa99ac4c8e139224e9683a53be9dc8a3b9d0d Mon Sep 17 00:00:00 2001 From: "Carlos R. L. Rodrigues" Date: Mon, 11 Nov 2024 15:18:45 -0300 Subject: [PATCH 4/6] isObject --- .../orchestration/src/transaction/distributed-transaction.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/core/orchestration/src/transaction/distributed-transaction.ts b/packages/core/orchestration/src/transaction/distributed-transaction.ts index 58eb906a6b452..1336d96163908 100644 --- a/packages/core/orchestration/src/transaction/distributed-transaction.ts +++ b/packages/core/orchestration/src/transaction/distributed-transaction.ts @@ -1,4 +1,4 @@ -import { isDefined } from "@medusajs/utils" +import { isDefined, isObject } from "@medusajs/utils" import { EventEmitter } from "events" import { IDistributedTransactionStorage } from "./datastore/abstract-storage" import { BaseInMemoryDistributedTransactionStorage } from "./datastore/base-in-memory-storage" @@ -215,7 +215,7 @@ class DistributedTransaction extends EventEmitter { this.modelId, this.transactionId ) - const rawData = JSON.parse(JSON.stringify(data)) + const rawData = isObject(data) ? JSON.parse(JSON.stringify(data)) : data await DistributedTransaction.keyValueStore.save(key, rawData, ttl, options) return rawData From 546a7fecf9c0718019053380b62e5e4f86bde7d8 Mon Sep 17 00:00:00 2001 From: "Carlos R. L. Rodrigues" Date: Mon, 11 Nov 2024 15:25:11 -0300 Subject: [PATCH 5/6] fix test --- .../src/transaction/distributed-transaction.ts | 5 +++-- .../__fixtures__/workflow_transaction_timeout.ts | 9 +++++---- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/packages/core/orchestration/src/transaction/distributed-transaction.ts b/packages/core/orchestration/src/transaction/distributed-transaction.ts index 1336d96163908..0713ec94a7fcb 100644 --- a/packages/core/orchestration/src/transaction/distributed-transaction.ts +++ b/packages/core/orchestration/src/transaction/distributed-transaction.ts @@ -1,4 +1,4 @@ -import { isDefined, isObject } from "@medusajs/utils" +import { isDefined } from "@medusajs/utils" import { EventEmitter } from "events" import { IDistributedTransactionStorage } from "./datastore/abstract-storage" import { BaseInMemoryDistributedTransactionStorage } from "./datastore/base-in-memory-storage" @@ -215,7 +215,8 @@ class DistributedTransaction extends EventEmitter { this.modelId, this.transactionId ) - const rawData = isObject(data) ? JSON.parse(JSON.stringify(data)) : data + + const rawData = JSON.parse(JSON.stringify(data)) await DistributedTransaction.keyValueStore.save(key, rawData, ttl, options) return rawData diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_transaction_timeout.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_transaction_timeout.ts index b02bd97fd1907..66c8b8f0d206a 100644 --- a/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_transaction_timeout.ts +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_transaction_timeout.ts @@ -3,11 +3,14 @@ import { createWorkflow, StepResponse, } from "@medusajs/framework/workflows-sdk" +import { setTimeout } from "timers/promises" const step_1 = createStep( "step_1", - jest.fn((input) => { + jest.fn(async (input) => { input.test = "test" + await setTimeout(200) + return new StepResponse(input, { compensate: 123 }) }), jest.fn((compensateInput) => { @@ -27,9 +30,7 @@ createWorkflow( timeout: 0.1, // 0.1 second }, function (input) { - const resp = step_1(input).config({ - async: true, - }) + const resp = step_1(input) return resp } From e4e0fdbd736ed69ad73b7021141bc2dbe7e23aed Mon Sep 17 00:00:00 2001 From: "Carlos R. L. Rodrigues" Date: Tue, 12 Nov 2024 06:47:48 -0300 Subject: [PATCH 6/6] normalize const and let --- .../src/services/workflow-orchestrator.ts | 18 ++++++++---------- .../src/services/workflow-orchestrator.ts | 19 +++++++++---------- 2 files changed, 17 insertions(+), 20 deletions(-) diff --git a/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts b/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts index 421cb022a303b..1d25d222d4564 100644 --- a/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts +++ b/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts @@ -131,18 +131,19 @@ export class WorkflowOrchestratorService { options?: WorkflowOrchestratorRunOptions, @MedusaContext() sharedContext: Context = {} ) { - let { + const { input, - context, transactionId, resultFrom, - throwOnError, logOnError, events: eventHandlers, container, } = options ?? {} + let { throwOnError, context } = options ?? {} throwOnError ??= true + context ??= {} + context.transactionId ??= transactionId ?? ulid() const workflowId = isString(workflowIdOrWorkflow) ? workflowIdOrWorkflow @@ -155,9 +156,6 @@ export class WorkflowOrchestratorService { ) } - context ??= {} - context.transactionId ??= transactionId ?? ulid() - const events: FlowRunOptions["events"] = this.buildWorkflowEvents({ customEventHandlers: eventHandlers, workflowId, @@ -266,15 +264,15 @@ export class WorkflowOrchestratorService { }, @MedusaContext() sharedContext: Context = {} ) { - let { + const { context, - throwOnError, logOnError, resultFrom, container, events: eventHandlers, } = options ?? {} + let { throwOnError } = options ?? {} throwOnError ??= true const [idempotencyKey_, { workflowId, transactionId }] = @@ -336,15 +334,15 @@ export class WorkflowOrchestratorService { }, @MedusaContext() sharedContext: Context = {} ) { - let { + const { context, - throwOnError, logOnError, resultFrom, container, events: eventHandlers, } = options ?? {} + let { throwOnError } = options ?? {} throwOnError ??= true const [idempotencyKey_, { workflowId, transactionId }] = diff --git a/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts b/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts index 9b8c8a6ff10aa..a819e13c4da7c 100644 --- a/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts +++ b/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts @@ -175,18 +175,20 @@ export class WorkflowOrchestratorService { options?: WorkflowOrchestratorRunOptions, @MedusaContext() sharedContext: Context = {} ) { - let { + const { input, - context, transactionId, resultFrom, - throwOnError, logOnError, events: eventHandlers, container, } = options ?? {} + let { throwOnError, context } = options ?? {} + throwOnError ??= true + context ??= {} + context.transactionId ??= transactionId ?? ulid() const workflowId = isString(workflowIdOrWorkflow) ? workflowIdOrWorkflow @@ -196,9 +198,6 @@ export class WorkflowOrchestratorService { throw new Error("Workflow ID is required") } - context ??= {} - context.transactionId ??= transactionId ?? ulid() - const events: FlowRunOptions["events"] = this.buildWorkflowEvents({ customEventHandlers: eventHandlers, workflowId, @@ -303,15 +302,15 @@ export class WorkflowOrchestratorService { }, @MedusaContext() sharedContext: Context = {} ) { - let { + const { context, - throwOnError, logOnError, resultFrom, container, events: eventHandlers, } = options ?? {} + let { throwOnError } = options ?? {} throwOnError ??= true const [idempotencyKey_, { workflowId, transactionId }] = @@ -373,15 +372,15 @@ export class WorkflowOrchestratorService { }, @MedusaContext() sharedContext: Context = {} ) { - let { + const { context, - throwOnError, logOnError, resultFrom, container, events: eventHandlers, } = options ?? {} + let { throwOnError } = options ?? {} throwOnError ??= true const [idempotencyKey_, { workflowId, transactionId }] =