Skip to content

Commit

Permalink
chore(workflow-engine): Migrate to DML
Browse files Browse the repository at this point in the history
  • Loading branch information
adrien2p committed Dec 6, 2024
1 parent 597bffa commit e4c96a4
Show file tree
Hide file tree
Showing 8 changed files with 305 additions and 128 deletions.
7 changes: 5 additions & 2 deletions packages/core/utils/src/modules-sdk/joiner-config-builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import {
toCamelCase,
upperCaseFirst,
} from "../common"
import { DmlEntity } from "../dml"
import { DmlEntity, IdProperty } from "../dml"
import { toGraphQLSchema } from "../dml/helpers/create-graphql"
import { PrimaryKeyModifier } from "../dml/properties/primary-key"
import { BaseRelationship } from "../dml/relations/base"
Expand Down Expand Up @@ -396,7 +396,10 @@ export function buildLinkConfigFromModelObjects<
}

const parsedProperty = (value as PropertyType<any>).parse(property)
if (PrimaryKeyModifier.isPrimaryKeyModifier(value)) {
if (
PrimaryKeyModifier.isPrimaryKeyModifier(value) ||
IdProperty.isIdProperty(value)
) {
const linkableKeyName =
parsedProperty.dataType.options?.linkable ??
`${camelToSnakeCase(model.name).toLowerCase()}_${property}`
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import { WorkflowManager } from "@medusajs/framework/orchestration"
import {
DistributedTransactionType,
WorkflowManager,
} from "@medusajs/framework/orchestration"
import {
Context,
IWorkflowEngineService,
Expand Down Expand Up @@ -60,6 +63,20 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
serviceName: "workflows",
field: "workflowExecution",
},
transaction_id: {
linkable: "workflow_execution_transaction_id",
entity: "WorkflowExecution",
primaryKey: "transaction_id",
serviceName: "workflows",
field: "workflowExecution",
},
workflow_id: {
linkable: "workflow_execution_workflow_id",
entity: "WorkflowExecution",
primaryKey: "workflow_id",
serviceName: "workflows",
field: "workflowExecution",
},
},
})
})
Expand Down Expand Up @@ -87,12 +104,12 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
})

// Validate context event group id
expect(workflowEventGroupIdStep1Mock.mock.calls[0][1]).toEqual(
expect.objectContaining({ eventGroupId })
)
expect(workflowEventGroupIdStep2Mock.mock.calls[0][1]).toEqual(
expect.objectContaining({ eventGroupId })
)
expect(
(workflowEventGroupIdStep1Mock.mock.calls[0] as any[])[1]
).toEqual(expect.objectContaining({ eventGroupId }))
expect(
(workflowEventGroupIdStep2Mock.mock.calls[0] as any[])[1]
).toEqual(expect.objectContaining({ eventGroupId }))
})

it("should execute an async workflow keeping track of the event group id that has been auto generated", async () => {
Expand All @@ -114,14 +131,19 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
stepResponse: { hey: "oh" },
})

const generatedEventGroupId = (workflowEventGroupIdStep1Mock.mock
.calls[0][1] as unknown as Context)!.eventGroupId
const generatedEventGroupId = ((
workflowEventGroupIdStep1Mock.mock.calls[0] as any[]
)[1] as unknown as Context)!.eventGroupId

// Validate context event group id
expect(workflowEventGroupIdStep1Mock.mock.calls[0][1]).toEqual(
expect(
(workflowEventGroupIdStep1Mock.mock.calls[0] as any[])[1]
).toEqual(
expect.objectContaining({ eventGroupId: generatedEventGroupId })
)
expect(workflowEventGroupIdStep2Mock.mock.calls[0][1]).toEqual(
expect(
(workflowEventGroupIdStep2Mock.mock.calls[0] as any[])[1]
).toEqual(
expect.objectContaining({ eventGroupId: generatedEventGroupId })
)
})
Expand All @@ -139,10 +161,9 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
throwOnError: true,
})

let executionsList = await query({
workflow_executions: {
fields: ["workflow_id", "transaction_id", "state"],
},
let { data: executionsList } = await query.graph({
entity: "workflow_executions",
fields: ["workflow_id", "transaction_id", "state"],
})

expect(executionsList).toHaveLength(1)
Expand All @@ -157,11 +178,10 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
stepResponse: { uhuuuu: "yeaah!" },
})

executionsList = await query({
workflow_executions: {
fields: ["id"],
},
})
;({ data: executionsList } = await query.graph({
entity: "workflow_executions",
fields: ["id"],
}))

expect(executionsList).toHaveLength(0)
expect(result).toEqual({
Expand All @@ -180,10 +200,9 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
transactionId: "transaction_1",
})

let executionsList = await query({
workflow_executions: {
fields: ["id"],
},
let { data: executionsList } = await query.graph({
entity: "workflow_executions",
fields: ["id"],
})

expect(executionsList).toHaveLength(1)
Expand All @@ -208,40 +227,38 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
expect(workflow2Step3Invoke.mock.calls[0][0]).toEqual({
uhuuuu: "yeaah!",
})

executionsList = await query({
workflow_executions: {
fields: ["id"],
},
})
;({ data: executionsList } = await query.graph({
entity: "workflow_executions",
fields: ["id"],
}))

expect(executionsList).toHaveLength(1)
})

it("should revert the entire transaction when a step timeout expires", async () => {
const { transaction } = await workflowOrcModule.run(
const { transaction } = (await workflowOrcModule.run(
"workflow_step_timeout",
{
input: {},
throwOnError: false,
}
)
)) as Awaited<{ transaction: DistributedTransactionType }>

expect(transaction.flow.state).toEqual("reverted")
expect(transaction.getFlow().state).toEqual("reverted")
})

it("should revert the entire transaction when the transaction timeout expires", async () => {
const { transaction } = await workflowOrcModule.run(
const { transaction } = (await workflowOrcModule.run(
"workflow_transaction_timeout",
{
input: {},
throwOnError: false,
}
)
)) as Awaited<{ transaction: DistributedTransactionType }>

await setTimeoutPromise(200)

expect(transaction.flow.state).toEqual("reverted")
expect(transaction.getFlow().state).toEqual("reverted")
})

it.skip("should subscribe to a async workflow and receive the response when it finishes", (done) => {
Expand Down Expand Up @@ -393,7 +410,7 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
})

it("should fetch an idempotent workflow after its completion", async () => {
const { transaction: firstRun } = await workflowOrcModule.run(
const { transaction: firstRun } = (await workflowOrcModule.run(
"workflow_idempotent",
{
input: {
Expand All @@ -402,15 +419,14 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
throwOnError: true,
transactionId: "transaction_1",
}
)
)) as Awaited<{ transaction: DistributedTransactionType }>

let executionsList = await query({
workflow_executions: {
fields: ["id"],
},
let { data: executionsList } = await query.graph({
entity: "workflow_executions",
fields: ["id"],
})

const { transaction: secondRun } = await workflowOrcModule.run(
const { transaction: secondRun } = (await workflowOrcModule.run(
"workflow_idempotent",
{
input: {
Expand All @@ -419,15 +435,16 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
throwOnError: true,
transactionId: "transaction_1",
}
)
)) as Awaited<{ transaction: DistributedTransactionType }>

const executionsListAfter = await query({
workflow_executions: {
fields: ["id"],
},
const { data: executionsListAfter } = await query.graph({
entity: "workflow_executions",
fields: ["id"],
})

expect(secondRun.flow.startedAt).toEqual(firstRun.flow.startedAt)
expect(secondRun.getFlow().startedAt).toEqual(
firstRun.getFlow().startedAt
)
expect(executionsList).toHaveLength(1)
expect(executionsListAfter).toHaveLength(1)
})
Expand Down
Loading

0 comments on commit e4c96a4

Please sign in to comment.