Skip to content

Commit

Permalink
fix(orchestration): fix set step failure (#10031)
Browse files Browse the repository at this point in the history
What:
 - copy data before saving checkpoint
 - removed unused data format function
 - properly handle registerStepFailure to not throw
 - emit onFinish event even when execution failed
  • Loading branch information
carlos-r-l-rodrigues authored Nov 12, 2024
1 parent 7794faf commit 1eef324
Show file tree
Hide file tree
Showing 10 changed files with 217 additions and 212 deletions.
8 changes: 8 additions & 0 deletions .changeset/spotty-cups-accept.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"@medusajs/workflow-engine-inmemory": patch
"@medusajs/workflow-engine-redis": patch
"@medusajs/orchestration": patch
"@medusajs/workflows-sdk": patch
---

Fix set step failure for async steps
Original file line number Diff line number Diff line change
Expand Up @@ -215,9 +215,11 @@ class DistributedTransaction extends EventEmitter {
this.modelId,
this.transactionId
)
await DistributedTransaction.keyValueStore.save(key, data, ttl, options)

return data
const rawData = JSON.parse(JSON.stringify(data))
await DistributedTransaction.keyValueStore.save(key, rawData, ttl, options)

return rawData
}

public static async loadTransaction(
Expand Down

This file was deleted.

51 changes: 22 additions & 29 deletions packages/core/workflows-sdk/src/helper/workflow-export.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import {
DistributedTransactionEvents,
DistributedTransactionType,
LocalWorkflow,
TransactionHandlerType,
TransactionState,
} from "@medusajs/orchestration"
import {
Expand All @@ -18,6 +17,7 @@ import {
isPresent,
MedusaContextType,
Modules,
TransactionHandlerType,
} from "@medusajs/utils"
import { EOL } from "os"
import { ulid } from "ulid"
Expand All @@ -41,13 +41,11 @@ function createContextualWorkflowRunner<
>({
workflowId,
defaultResult,
dataPreparation,
options,
container,
}: {
workflowId: string
defaultResult?: string | Symbol
dataPreparation?: (data: TData) => Promise<unknown>
options?: {
wrappedInput?: boolean
sourcePath?: string
Expand Down Expand Up @@ -110,31 +108,44 @@ function createContextualWorkflowRunner<
events,
flowMetadata,
]
const transaction = await method.apply(method, args) as DistributedTransactionType
const transaction = (await method.apply(
method,
args
)) as DistributedTransactionType

let errors = transaction.getErrors(TransactionHandlerType.INVOKE)

const failedStatus = [TransactionState.FAILED, TransactionState.REVERTED]
const isCancelled =
isCancel && transaction.getState() === TransactionState.REVERTED

const isRegisterStepFailure =
method === originalRegisterStepFailure &&
transaction.getState() === TransactionState.REVERTED

let thrownError = null

if (
!isCancelled &&
failedStatus.includes(transaction.getState()) &&
throwOnError
!isCancelled &&
!isRegisterStepFailure
) {
/*const errorMessage = errors
?.map((err) => `${err.error?.message}${EOL}${err.error?.stack}`)
?.join(`${EOL}`)*/
const firstError = errors?.[0]?.error ?? new Error("Unknown error")
throw firstError

thrownError = firstError

if (throwOnError) {
throw firstError
}
}

let result
if (options?.wrappedInput) {
result = resolveValue(resultFrom, transaction.getContext())
if (result instanceof Promise) {
result = await result.catch((e) => {
thrownError = e

if (throwOnError) {
throw e
}
Expand All @@ -151,6 +162,7 @@ function createContextualWorkflowRunner<
errors,
transaction,
result,
thrownError,
}
}

Expand All @@ -175,22 +187,6 @@ function createContextualWorkflowRunner<
context.transactionId ??= ulid()
context.eventGroupId ??= ulid()

if (typeof dataPreparation === "function") {
try {
const copyInput = input ? JSON.parse(JSON.stringify(input)) : input
input = await dataPreparation(copyInput as TData)
} catch (err) {
if (throwOnError) {
throw new Error(
`Data preparation failed: ${err.message}${EOL}${err.stack}`
)
}
return {
errors: [err],
}
}
}

return await originalExecution(
originalRun,
{
Expand Down Expand Up @@ -339,7 +335,6 @@ function createContextualWorkflowRunner<
export const exportWorkflow = <TData = unknown, TResult = unknown>(
workflowId: string,
defaultResult?: string | Symbol,
dataPreparation?: (data: TData) => Promise<unknown>,
options?: {
wrappedInput?: boolean
sourcePath?: string
Expand All @@ -364,7 +359,6 @@ export const exportWorkflow = <TData = unknown, TResult = unknown>(
>({
workflowId,
defaultResult,
dataPreparation,
options,
container,
})
Expand All @@ -390,7 +384,6 @@ export const exportWorkflow = <TData = unknown, TResult = unknown>(
>({
workflowId,
defaultResult,
dataPreparation,
options,
container,
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,15 +152,10 @@ export function createWorkflow<TData, TResult, THooks extends any[]>(
WorkflowManager.register(name, context.flow, handlers, options)
}

const workflow = exportWorkflow<TData, TResult>(
name,
returnedStep,
undefined,
{
wrappedInput: true,
sourcePath: fileSourcePath,
}
)
const workflow = exportWorkflow<TData, TResult>(name, returnedStep, {
wrappedInput: true,
sourcePath: fileSourcePath,
})

const mainFlow = <TDataOverride = undefined, TResultOverride = undefined>(
container?: LoadedModule[] | MedusaContainer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@ import {
createWorkflow,
StepResponse,
} from "@medusajs/framework/workflows-sdk"
import { setTimeout } from "timers/promises"

const step_1 = createStep(
"step_1",
jest.fn((input) => {
jest.fn(async (input) => {
input.test = "test"
await setTimeout(200)

return new StepResponse(input, { compensate: 123 })
}),
jest.fn((compensateInput) => {
Expand All @@ -27,9 +30,7 @@ createWorkflow(
timeout: 0.1, // 0.1 second
},
function (input) {
const resp = step_1(input).config({
async: true,
})
const resp = step_1(input)

return resp
}
Expand Down
Loading

0 comments on commit 1eef324

Please sign in to comment.