Skip to content

Commit

Permalink
emit onFinish when thrown
Browse files Browse the repository at this point in the history
  • Loading branch information
carlos-r-l-rodrigues committed Nov 11, 2024
1 parent 9e195ce commit c1d0abb
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 12 deletions.
13 changes: 11 additions & 2 deletions packages/core/workflows-sdk/src/helper/workflow-export.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,21 +123,29 @@ function createContextualWorkflowRunner<
method === originalRegisterStepFailure &&
transaction.getState() === TransactionState.REVERTED

let thrownError = null

if (
throwOnError &&
failedStatus.includes(transaction.getState()) &&
!isCancelled &&
!isRegisterStepFailure
) {
const firstError = errors?.[0]?.error ?? new Error("Unknown error")
throw firstError

thrownError = firstError

if (throwOnError) {
throw firstError
}
}

let result
if (options?.wrappedInput) {
result = resolveValue(resultFrom, transaction.getContext())
if (result instanceof Promise) {
result = await result.catch((e) => {
thrownError = e

if (throwOnError) {
throw e
}
Expand All @@ -154,6 +162,7 @@ function createContextualWorkflowRunner<
errors,
transaction,
result,
thrownError,
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ export class WorkflowOrchestratorService {
container,
} = options ?? {}

throwOnError ??= true

const workflowId = isString(workflowIdOrWorkflow)
? workflowIdOrWorkflow
: workflowIdOrWorkflow.getName()
Expand Down Expand Up @@ -172,7 +174,7 @@ export class WorkflowOrchestratorService {

const ret = await exportedWorkflow.run({
input,
throwOnError,
throwOnError: false,
logOnError,
resultFrom,
context,
Expand Down Expand Up @@ -210,6 +212,10 @@ export class WorkflowOrchestratorService {
await this.triggerParentStep(ret.transaction, result)
}

if (throwOnError && ret.thrownError) {
throw ret.thrownError
}

return { acknowledgement, ...ret }
}

Expand Down Expand Up @@ -260,7 +266,7 @@ export class WorkflowOrchestratorService {
},
@MedusaContext() sharedContext: Context = {}
) {
const {
let {
context,
throwOnError,
logOnError,
Expand All @@ -269,6 +275,8 @@ export class WorkflowOrchestratorService {
events: eventHandlers,
} = options ?? {}

throwOnError ??= true

const [idempotencyKey_, { workflowId, transactionId }] =
this.buildIdempotencyKeyAndParts(idempotencyKey)

Expand All @@ -287,7 +295,7 @@ export class WorkflowOrchestratorService {
idempotencyKey: idempotencyKey_,
context,
resultFrom,
throwOnError,
throwOnError: false,
logOnError,
events,
response: stepResponse,
Expand All @@ -308,6 +316,10 @@ export class WorkflowOrchestratorService {
await this.triggerParentStep(ret.transaction, result)
}

if (throwOnError && ret.thrownError) {
throw ret.thrownError
}

return ret
}

Expand All @@ -324,7 +336,7 @@ export class WorkflowOrchestratorService {
},
@MedusaContext() sharedContext: Context = {}
) {
const {
let {
context,
throwOnError,
logOnError,
Expand All @@ -333,6 +345,8 @@ export class WorkflowOrchestratorService {
events: eventHandlers,
} = options ?? {}

throwOnError ??= true

const [idempotencyKey_, { workflowId, transactionId }] =
this.buildIdempotencyKeyAndParts(idempotencyKey)

Expand All @@ -351,7 +365,7 @@ export class WorkflowOrchestratorService {
idempotencyKey: idempotencyKey_,
context,
resultFrom,
throwOnError,
throwOnError: false,
logOnError,
events,
response: stepResponse,
Expand All @@ -372,6 +386,10 @@ export class WorkflowOrchestratorService {
await this.triggerParentStep(ret.transaction, result)
}

if (throwOnError && ret.thrownError) {
throw ret.thrownError
}

return ret
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,17 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
}
)

let done = false
void workflowOrcModule.subscribe({
workflowId: "workflow_2_revert_fail",
transactionId: acknowledgement.transactionId,
subscriber: (event) => {
if (event.eventType === "onFinish") {
done = true
}
},
})

let executionsList = await query({
workflow_executions: {
fields: ["id"],
Expand Down Expand Up @@ -245,6 +256,7 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({

expect(executionsList).toHaveLength(1)
expect(executionsList[0].state).toEqual("failed")
expect(done).toBe(true)
})

it("should revert the entire transaction when a step timeout expires", async () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@ export class WorkflowOrchestratorService {
container,
} = options ?? {}

throwOnError ??= true

const workflowId = isString(workflowIdOrWorkflow)
? workflowIdOrWorkflow
: workflowIdOrWorkflow.getName()
Expand All @@ -210,7 +212,7 @@ export class WorkflowOrchestratorService {

const ret = await exportedWorkflow.run({
input,
throwOnError,
throwOnError: false,
logOnError,
resultFrom,
context,
Expand Down Expand Up @@ -248,6 +250,10 @@ export class WorkflowOrchestratorService {
await this.triggerParentStep(ret.transaction, result)
}

if (throwOnError && ret.thrownError) {
throw ret.thrownError
}

return { acknowledgement, ...ret }
}

Expand Down Expand Up @@ -297,7 +303,7 @@ export class WorkflowOrchestratorService {
},
@MedusaContext() sharedContext: Context = {}
) {
const {
let {
context,
throwOnError,
logOnError,
Expand All @@ -306,6 +312,8 @@ export class WorkflowOrchestratorService {
events: eventHandlers,
} = options ?? {}

throwOnError ??= true

const [idempotencyKey_, { workflowId, transactionId }] =
this.buildIdempotencyKeyAndParts(idempotencyKey)

Expand All @@ -324,7 +332,7 @@ export class WorkflowOrchestratorService {
idempotencyKey: idempotencyKey_,
context,
resultFrom,
throwOnError,
throwOnError: false,
logOnError,
events,
response: stepResponse,
Expand All @@ -345,6 +353,10 @@ export class WorkflowOrchestratorService {
await this.triggerParentStep(ret.transaction, result)
}

if (throwOnError && ret.thrownError) {
throw ret.thrownError
}

return ret
}

Expand All @@ -361,7 +373,7 @@ export class WorkflowOrchestratorService {
},
@MedusaContext() sharedContext: Context = {}
) {
const {
let {
context,
throwOnError,
logOnError,
Expand All @@ -370,6 +382,8 @@ export class WorkflowOrchestratorService {
events: eventHandlers,
} = options ?? {}

throwOnError ??= true

const [idempotencyKey_, { workflowId, transactionId }] =
this.buildIdempotencyKeyAndParts(idempotencyKey)

Expand All @@ -388,7 +402,7 @@ export class WorkflowOrchestratorService {
idempotencyKey: idempotencyKey_,
context,
resultFrom,
throwOnError,
throwOnError: false,
logOnError,
events,
response: stepResponse,
Expand All @@ -409,6 +423,10 @@ export class WorkflowOrchestratorService {
await this.triggerParentStep(ret.transaction, result)
}

if (throwOnError && ret.thrownError) {
throw ret.thrownError
}

return ret
}

Expand Down

0 comments on commit c1d0abb

Please sign in to comment.