Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(orchestration): fix set step failure #10031

Merged
merged 9 commits into from
Nov 12, 2024
8 changes: 8 additions & 0 deletions .changeset/spotty-cups-accept.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"@medusajs/workflow-engine-inmemory": patch
"@medusajs/workflow-engine-redis": patch
"@medusajs/orchestration": patch
"@medusajs/workflows-sdk": patch
---

Fix set step failure for async steps
Original file line number Diff line number Diff line change
Expand Up @@ -215,9 +215,11 @@ class DistributedTransaction extends EventEmitter {
this.modelId,
this.transactionId
)
await DistributedTransaction.keyValueStore.save(key, data, ttl, options)

return data
const rawData = JSON.parse(JSON.stringify(data))
await DistributedTransaction.keyValueStore.save(key, rawData, ttl, options)
carlos-r-l-rodrigues marked this conversation as resolved.
Show resolved Hide resolved

return rawData
}

public static async loadTransaction(
Expand Down

This file was deleted.

51 changes: 22 additions & 29 deletions packages/core/workflows-sdk/src/helper/workflow-export.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import {
DistributedTransactionEvents,
DistributedTransactionType,
LocalWorkflow,
TransactionHandlerType,
TransactionState,
} from "@medusajs/orchestration"
import {
Expand All @@ -18,6 +17,7 @@ import {
isPresent,
MedusaContextType,
Modules,
TransactionHandlerType,
} from "@medusajs/utils"
import { EOL } from "os"
import { ulid } from "ulid"
Expand All @@ -41,13 +41,11 @@ function createContextualWorkflowRunner<
>({
workflowId,
defaultResult,
dataPreparation,
options,
container,
}: {
workflowId: string
defaultResult?: string | Symbol
dataPreparation?: (data: TData) => Promise<unknown>
options?: {
wrappedInput?: boolean
sourcePath?: string
Expand Down Expand Up @@ -110,31 +108,44 @@ function createContextualWorkflowRunner<
events,
flowMetadata,
]
const transaction = await method.apply(method, args) as DistributedTransactionType
const transaction = (await method.apply(
method,
args
)) as DistributedTransactionType

let errors = transaction.getErrors(TransactionHandlerType.INVOKE)

const failedStatus = [TransactionState.FAILED, TransactionState.REVERTED]
const isCancelled =
isCancel && transaction.getState() === TransactionState.REVERTED

const isRegisterStepFailure =
method === originalRegisterStepFailure &&
transaction.getState() === TransactionState.REVERTED

let thrownError = null

if (
!isCancelled &&
failedStatus.includes(transaction.getState()) &&
throwOnError
!isCancelled &&
!isRegisterStepFailure
) {
/*const errorMessage = errors
?.map((err) => `${err.error?.message}${EOL}${err.error?.stack}`)
?.join(`${EOL}`)*/
const firstError = errors?.[0]?.error ?? new Error("Unknown error")
throw firstError

thrownError = firstError

if (throwOnError) {
throw firstError
}
}

let result
if (options?.wrappedInput) {
result = resolveValue(resultFrom, transaction.getContext())
if (result instanceof Promise) {
result = await result.catch((e) => {
thrownError = e

if (throwOnError) {
throw e
}
Expand All @@ -151,6 +162,7 @@ function createContextualWorkflowRunner<
errors,
transaction,
result,
thrownError,
}
}

Expand All @@ -175,22 +187,6 @@ function createContextualWorkflowRunner<
context.transactionId ??= ulid()
context.eventGroupId ??= ulid()

if (typeof dataPreparation === "function") {
try {
const copyInput = input ? JSON.parse(JSON.stringify(input)) : input
input = await dataPreparation(copyInput as TData)
} catch (err) {
if (throwOnError) {
throw new Error(
`Data preparation failed: ${err.message}${EOL}${err.stack}`
)
}
return {
errors: [err],
}
}
}

return await originalExecution(
originalRun,
{
Expand Down Expand Up @@ -339,7 +335,6 @@ function createContextualWorkflowRunner<
export const exportWorkflow = <TData = unknown, TResult = unknown>(
workflowId: string,
defaultResult?: string | Symbol,
dataPreparation?: (data: TData) => Promise<unknown>,
options?: {
wrappedInput?: boolean
sourcePath?: string
Expand All @@ -364,7 +359,6 @@ export const exportWorkflow = <TData = unknown, TResult = unknown>(
>({
workflowId,
defaultResult,
dataPreparation,
options,
container,
})
Expand All @@ -390,7 +384,6 @@ export const exportWorkflow = <TData = unknown, TResult = unknown>(
>({
workflowId,
defaultResult,
dataPreparation,
options,
container,
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,15 +152,10 @@ export function createWorkflow<TData, TResult, THooks extends any[]>(
WorkflowManager.register(name, context.flow, handlers, options)
}

const workflow = exportWorkflow<TData, TResult>(
name,
returnedStep,
undefined,
{
wrappedInput: true,
sourcePath: fileSourcePath,
}
)
const workflow = exportWorkflow<TData, TResult>(name, returnedStep, {
wrappedInput: true,
sourcePath: fileSourcePath,
})

const mainFlow = <TDataOverride = undefined, TResultOverride = undefined>(
container?: LoadedModule[] | MedusaContainer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@ import {
createWorkflow,
StepResponse,
} from "@medusajs/framework/workflows-sdk"
import { setTimeout } from "timers/promises"

const step_1 = createStep(
"step_1",
jest.fn((input) => {
jest.fn(async (input) => {
input.test = "test"
await setTimeout(200)
carlos-r-l-rodrigues marked this conversation as resolved.
Show resolved Hide resolved

return new StepResponse(input, { compensate: 123 })
}),
jest.fn((compensateInput) => {
Expand All @@ -27,9 +30,7 @@ createWorkflow(
timeout: 0.1, // 0.1 second
},
function (input) {
const resp = step_1(input).config({
async: true,
})
const resp = step_1(input)

return resp
}
Expand Down
Loading
Loading