Skip to content

Commit

Permalink
fix(orchestrato): copy data before saving
Browse files Browse the repository at this point in the history
  • Loading branch information
carlos-r-l-rodrigues committed Nov 11, 2024
1 parent 1a0882c commit 9e195ce
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 185 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -215,9 +215,10 @@ class DistributedTransaction extends EventEmitter {
this.modelId,
this.transactionId
)
await DistributedTransaction.keyValueStore.save(key, data, ttl, options)
const rawData = JSON.parse(JSON.stringify(data))
await DistributedTransaction.keyValueStore.save(key, rawData, ttl, options)

return data
return rawData
}

public static async loadTransaction(
Expand Down

This file was deleted.

40 changes: 12 additions & 28 deletions packages/core/workflows-sdk/src/helper/workflow-export.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import {
DistributedTransactionEvents,
DistributedTransactionType,
LocalWorkflow,
TransactionHandlerType,
TransactionState,
} from "@medusajs/orchestration"
import {
Expand All @@ -18,6 +17,7 @@ import {
isPresent,
MedusaContextType,
Modules,
TransactionHandlerType,
} from "@medusajs/utils"
import { EOL } from "os"
import { ulid } from "ulid"
Expand All @@ -41,13 +41,11 @@ function createContextualWorkflowRunner<
>({
workflowId,
defaultResult,
dataPreparation,
options,
container,
}: {
workflowId: string
defaultResult?: string | Symbol
dataPreparation?: (data: TData) => Promise<unknown>
options?: {
wrappedInput?: boolean
sourcePath?: string
Expand Down Expand Up @@ -110,22 +108,27 @@ function createContextualWorkflowRunner<
events,
flowMetadata,
]
const transaction = await method.apply(method, args) as DistributedTransactionType
const transaction = (await method.apply(
method,
args
)) as DistributedTransactionType

let errors = transaction.getErrors(TransactionHandlerType.INVOKE)

const failedStatus = [TransactionState.FAILED, TransactionState.REVERTED]
const isCancelled =
isCancel && transaction.getState() === TransactionState.REVERTED

const isRegisterStepFailure =
method === originalRegisterStepFailure &&
transaction.getState() === TransactionState.REVERTED

if (
!isCancelled &&
throwOnError &&
failedStatus.includes(transaction.getState()) &&
throwOnError
!isCancelled &&
!isRegisterStepFailure
) {
/*const errorMessage = errors
?.map((err) => `${err.error?.message}${EOL}${err.error?.stack}`)
?.join(`${EOL}`)*/
const firstError = errors?.[0]?.error ?? new Error("Unknown error")
throw firstError
}
Expand Down Expand Up @@ -175,22 +178,6 @@ function createContextualWorkflowRunner<
context.transactionId ??= ulid()
context.eventGroupId ??= ulid()

if (typeof dataPreparation === "function") {
try {
const copyInput = input ? JSON.parse(JSON.stringify(input)) : input
input = await dataPreparation(copyInput as TData)
} catch (err) {
if (throwOnError) {
throw new Error(
`Data preparation failed: ${err.message}${EOL}${err.stack}`
)
}
return {
errors: [err],
}
}
}

return await originalExecution(
originalRun,
{
Expand Down Expand Up @@ -339,7 +326,6 @@ function createContextualWorkflowRunner<
export const exportWorkflow = <TData = unknown, TResult = unknown>(
workflowId: string,
defaultResult?: string | Symbol,
dataPreparation?: (data: TData) => Promise<unknown>,
options?: {
wrappedInput?: boolean
sourcePath?: string
Expand All @@ -364,7 +350,6 @@ export const exportWorkflow = <TData = unknown, TResult = unknown>(
>({
workflowId,
defaultResult,
dataPreparation,
options,
container,
})
Expand All @@ -390,7 +375,6 @@ export const exportWorkflow = <TData = unknown, TResult = unknown>(
>({
workflowId,
defaultResult,
dataPreparation,
options,
container,
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,15 +152,10 @@ export function createWorkflow<TData, TResult, THooks extends any[]>(
WorkflowManager.register(name, context.flow, handlers, options)
}

const workflow = exportWorkflow<TData, TResult>(
name,
returnedStep,
undefined,
{
wrappedInput: true,
sourcePath: fileSourcePath,
}
)
const workflow = exportWorkflow<TData, TResult>(name, returnedStep, {
wrappedInput: true,
sourcePath: fileSourcePath,
})

const mainFlow = <TDataOverride = undefined, TResultOverride = undefined>(
container?: LoadedModule[] | MedusaContainer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import {
createStep,
createWorkflow,
StepResponse,
WorkflowResponse,
} from "@medusajs/framework/workflows-sdk"

const step_1 = createStep(
Expand Down Expand Up @@ -49,6 +50,14 @@ const step_3 = createStep(
})
)

const broken_step_2 = createStep(
"broken_step_2",
jest.fn(() => {}),
jest.fn((_, context) => {
throw new Error("Broken compensation step")
})
)

createWorkflow(
{
name: "workflow_2",
Expand All @@ -67,3 +76,19 @@ createWorkflow(
return step_3(ret2)
}
)

createWorkflow(
{
name: "workflow_2_revert_fail",
retentionTime: 1000,
},
function (input) {
step_1(input)

broken_step_2().config({
async: true,
})

return new WorkflowResponse("done")
}
)
Loading

0 comments on commit 9e195ce

Please sign in to comment.