From e4c96a4d59b3a46763b93fb9e5541bb514ef285c Mon Sep 17 00:00:00 2001 From: adrien2p Date: Fri, 6 Dec 2024 12:18:31 +0100 Subject: [PATCH] chore(workflow-engine): Migrate to DML --- .../src/modules-sdk/joiner-config-builder.ts | 7 +- .../integration-tests/__tests__/index.spec.ts | 115 +++++++----- .../.snapshot-medusa-workflows.json | 171 ++++++++++++++++++ .../src/migrations/Migration20241206101446.ts | 27 +++ .../src/models/index.ts | 2 +- .../src/models/workflow-execution.ts | 102 +++-------- .../src/services/workflows-module.ts | 7 +- .../utils/workflow-orchestrator-storage.ts | 2 + 8 files changed, 305 insertions(+), 128 deletions(-) create mode 100644 packages/modules/workflow-engine-inmemory/src/migrations/.snapshot-medusa-workflows.json create mode 100644 packages/modules/workflow-engine-inmemory/src/migrations/Migration20241206101446.ts diff --git a/packages/core/utils/src/modules-sdk/joiner-config-builder.ts b/packages/core/utils/src/modules-sdk/joiner-config-builder.ts index 8c5809b6fbbba..b31278a88b5ae 100644 --- a/packages/core/utils/src/modules-sdk/joiner-config-builder.ts +++ b/packages/core/utils/src/modules-sdk/joiner-config-builder.ts @@ -17,7 +17,7 @@ import { toCamelCase, upperCaseFirst, } from "../common" -import { DmlEntity } from "../dml" +import { DmlEntity, IdProperty } from "../dml" import { toGraphQLSchema } from "../dml/helpers/create-graphql" import { PrimaryKeyModifier } from "../dml/properties/primary-key" import { BaseRelationship } from "../dml/relations/base" @@ -396,7 +396,10 @@ export function buildLinkConfigFromModelObjects< } const parsedProperty = (value as PropertyType).parse(property) - if (PrimaryKeyModifier.isPrimaryKeyModifier(value)) { + if ( + PrimaryKeyModifier.isPrimaryKeyModifier(value) || + IdProperty.isIdProperty(value) + ) { const linkableKeyName = parsedProperty.dataType.options?.linkable ?? `${camelToSnakeCase(model.name).toLowerCase()}_${property}` diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts index e499e9381b37f..765d64facacbe 100644 --- a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts @@ -1,4 +1,7 @@ -import { WorkflowManager } from "@medusajs/framework/orchestration" +import { + DistributedTransactionType, + WorkflowManager, +} from "@medusajs/framework/orchestration" import { Context, IWorkflowEngineService, @@ -60,6 +63,20 @@ moduleIntegrationTestRunner({ serviceName: "workflows", field: "workflowExecution", }, + transaction_id: { + linkable: "workflow_execution_transaction_id", + entity: "WorkflowExecution", + primaryKey: "transaction_id", + serviceName: "workflows", + field: "workflowExecution", + }, + workflow_id: { + linkable: "workflow_execution_workflow_id", + entity: "WorkflowExecution", + primaryKey: "workflow_id", + serviceName: "workflows", + field: "workflowExecution", + }, }, }) }) @@ -87,12 +104,12 @@ moduleIntegrationTestRunner({ }) // Validate context event group id - expect(workflowEventGroupIdStep1Mock.mock.calls[0][1]).toEqual( - expect.objectContaining({ eventGroupId }) - ) - expect(workflowEventGroupIdStep2Mock.mock.calls[0][1]).toEqual( - expect.objectContaining({ eventGroupId }) - ) + expect( + (workflowEventGroupIdStep1Mock.mock.calls[0] as any[])[1] + ).toEqual(expect.objectContaining({ eventGroupId })) + expect( + (workflowEventGroupIdStep2Mock.mock.calls[0] as any[])[1] + ).toEqual(expect.objectContaining({ eventGroupId })) }) it("should execute an async workflow keeping track of the event group id that has been auto generated", async () => { @@ -114,14 +131,19 @@ moduleIntegrationTestRunner({ stepResponse: { hey: "oh" }, }) - const generatedEventGroupId = (workflowEventGroupIdStep1Mock.mock - .calls[0][1] as unknown as Context)!.eventGroupId + const generatedEventGroupId = (( + workflowEventGroupIdStep1Mock.mock.calls[0] as any[] + )[1] as unknown as Context)!.eventGroupId // Validate context event group id - expect(workflowEventGroupIdStep1Mock.mock.calls[0][1]).toEqual( + expect( + (workflowEventGroupIdStep1Mock.mock.calls[0] as any[])[1] + ).toEqual( expect.objectContaining({ eventGroupId: generatedEventGroupId }) ) - expect(workflowEventGroupIdStep2Mock.mock.calls[0][1]).toEqual( + expect( + (workflowEventGroupIdStep2Mock.mock.calls[0] as any[])[1] + ).toEqual( expect.objectContaining({ eventGroupId: generatedEventGroupId }) ) }) @@ -139,10 +161,9 @@ moduleIntegrationTestRunner({ throwOnError: true, }) - let executionsList = await query({ - workflow_executions: { - fields: ["workflow_id", "transaction_id", "state"], - }, + let { data: executionsList } = await query.graph({ + entity: "workflow_executions", + fields: ["workflow_id", "transaction_id", "state"], }) expect(executionsList).toHaveLength(1) @@ -157,11 +178,10 @@ moduleIntegrationTestRunner({ stepResponse: { uhuuuu: "yeaah!" }, }) - executionsList = await query({ - workflow_executions: { - fields: ["id"], - }, - }) + ;({ data: executionsList } = await query.graph({ + entity: "workflow_executions", + fields: ["id"], + })) expect(executionsList).toHaveLength(0) expect(result).toEqual({ @@ -180,10 +200,9 @@ moduleIntegrationTestRunner({ transactionId: "transaction_1", }) - let executionsList = await query({ - workflow_executions: { - fields: ["id"], - }, + let { data: executionsList } = await query.graph({ + entity: "workflow_executions", + fields: ["id"], }) expect(executionsList).toHaveLength(1) @@ -208,40 +227,38 @@ moduleIntegrationTestRunner({ expect(workflow2Step3Invoke.mock.calls[0][0]).toEqual({ uhuuuu: "yeaah!", }) - - executionsList = await query({ - workflow_executions: { - fields: ["id"], - }, - }) + ;({ data: executionsList } = await query.graph({ + entity: "workflow_executions", + fields: ["id"], + })) expect(executionsList).toHaveLength(1) }) it("should revert the entire transaction when a step timeout expires", async () => { - const { transaction } = await workflowOrcModule.run( + const { transaction } = (await workflowOrcModule.run( "workflow_step_timeout", { input: {}, throwOnError: false, } - ) + )) as Awaited<{ transaction: DistributedTransactionType }> - expect(transaction.flow.state).toEqual("reverted") + expect(transaction.getFlow().state).toEqual("reverted") }) it("should revert the entire transaction when the transaction timeout expires", async () => { - const { transaction } = await workflowOrcModule.run( + const { transaction } = (await workflowOrcModule.run( "workflow_transaction_timeout", { input: {}, throwOnError: false, } - ) + )) as Awaited<{ transaction: DistributedTransactionType }> await setTimeoutPromise(200) - expect(transaction.flow.state).toEqual("reverted") + expect(transaction.getFlow().state).toEqual("reverted") }) it.skip("should subscribe to a async workflow and receive the response when it finishes", (done) => { @@ -393,7 +410,7 @@ moduleIntegrationTestRunner({ }) it("should fetch an idempotent workflow after its completion", async () => { - const { transaction: firstRun } = await workflowOrcModule.run( + const { transaction: firstRun } = (await workflowOrcModule.run( "workflow_idempotent", { input: { @@ -402,15 +419,14 @@ moduleIntegrationTestRunner({ throwOnError: true, transactionId: "transaction_1", } - ) + )) as Awaited<{ transaction: DistributedTransactionType }> - let executionsList = await query({ - workflow_executions: { - fields: ["id"], - }, + let { data: executionsList } = await query.graph({ + entity: "workflow_executions", + fields: ["id"], }) - const { transaction: secondRun } = await workflowOrcModule.run( + const { transaction: secondRun } = (await workflowOrcModule.run( "workflow_idempotent", { input: { @@ -419,15 +435,16 @@ moduleIntegrationTestRunner({ throwOnError: true, transactionId: "transaction_1", } - ) + )) as Awaited<{ transaction: DistributedTransactionType }> - const executionsListAfter = await query({ - workflow_executions: { - fields: ["id"], - }, + const { data: executionsListAfter } = await query.graph({ + entity: "workflow_executions", + fields: ["id"], }) - expect(secondRun.flow.startedAt).toEqual(firstRun.flow.startedAt) + expect(secondRun.getFlow().startedAt).toEqual( + firstRun.getFlow().startedAt + ) expect(executionsList).toHaveLength(1) expect(executionsListAfter).toHaveLength(1) }) diff --git a/packages/modules/workflow-engine-inmemory/src/migrations/.snapshot-medusa-workflows.json b/packages/modules/workflow-engine-inmemory/src/migrations/.snapshot-medusa-workflows.json new file mode 100644 index 0000000000000..0c8c584c91589 --- /dev/null +++ b/packages/modules/workflow-engine-inmemory/src/migrations/.snapshot-medusa-workflows.json @@ -0,0 +1,171 @@ +{ + "namespaces": [ + "public" + ], + "name": "public", + "tables": [ + { + "columns": { + "workflow_id": { + "name": "workflow_id", + "type": "text", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": false, + "mappedType": "text" + }, + "transaction_id": { + "name": "transaction_id", + "type": "text", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": false, + "mappedType": "text" + }, + "id": { + "name": "id", + "type": "text", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": false, + "mappedType": "text" + }, + "execution": { + "name": "execution", + "type": "jsonb", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": true, + "mappedType": "json" + }, + "context": { + "name": "context", + "type": "jsonb", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": true, + "mappedType": "json" + }, + "state": { + "name": "state", + "type": "text", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": false, + "enumItems": [ + "not_started", + "invoking", + "waiting_to_compensate", + "compensating", + "done", + "reverted", + "failed" + ], + "mappedType": "enum" + }, + "created_at": { + "name": "created_at", + "type": "timestamptz", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": false, + "length": 6, + "default": "now()", + "mappedType": "datetime" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamptz", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": false, + "length": 6, + "default": "now()", + "mappedType": "datetime" + }, + "deleted_at": { + "name": "deleted_at", + "type": "timestamptz", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": true, + "length": 6, + "mappedType": "datetime" + } + }, + "name": "workflow_execution", + "schema": "public", + "indexes": [ + { + "keyName": "IDX_workflow_execution_deleted_at", + "columnNames": [], + "composite": false, + "primary": false, + "unique": false, + "expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_deleted_at\" ON \"workflow_execution\" (deleted_at) WHERE deleted_at IS NULL" + }, + { + "keyName": "IDX_workflow_execution_id", + "columnNames": [], + "composite": false, + "primary": false, + "unique": false, + "expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_id\" ON \"workflow_execution\" (id) WHERE deleted_at IS NULL" + }, + { + "keyName": "IDX_workflow_execution_workflow_id_transaction_id_unique", + "columnNames": [], + "composite": false, + "primary": false, + "unique": false, + "expression": "CREATE UNIQUE INDEX IF NOT EXISTS \"IDX_workflow_execution_workflow_id_transaction_id_unique\" ON \"workflow_execution\" (workflow_id, transaction_id) WHERE deleted_at IS NULL" + }, + { + "keyName": "IDX_workflow_execution_workflow_id", + "columnNames": [], + "composite": false, + "primary": false, + "unique": false, + "expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_workflow_id\" ON \"workflow_execution\" (workflow_id) WHERE deleted_at IS NULL" + }, + { + "keyName": "IDX_workflow_execution_transaction_id", + "columnNames": [], + "composite": false, + "primary": false, + "unique": false, + "expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_transaction_id\" ON \"workflow_execution\" (transaction_id) WHERE deleted_at IS NULL" + }, + { + "keyName": "IDX_workflow_execution_state", + "columnNames": [], + "composite": false, + "primary": false, + "unique": false, + "expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_state\" ON \"workflow_execution\" (state) WHERE deleted_at IS NULL" + }, + { + "keyName": "workflow_execution_pkey", + "columnNames": [ + "workflow_id", + "transaction_id" + ], + "composite": true, + "primary": true, + "unique": true + } + ], + "checks": [], + "foreignKeys": {} + } + ] +} diff --git a/packages/modules/workflow-engine-inmemory/src/migrations/Migration20241206101446.ts b/packages/modules/workflow-engine-inmemory/src/migrations/Migration20241206101446.ts new file mode 100644 index 0000000000000..be8b1cbac182a --- /dev/null +++ b/packages/modules/workflow-engine-inmemory/src/migrations/Migration20241206101446.ts @@ -0,0 +1,27 @@ +import { Migration } from "@mikro-orm/migrations" + +export class Migration20241206101446 extends Migration { + async up(): Promise { + this.addSql( + `DROP INDEX IF EXISTS "IDX_workflow_execution_id"; + DROP INDEX IF EXISTS "IDX_workflow_execution_workflow_id"; + DROP INDEX IF EXISTS "IDX_workflow_execution_transaction_id"; + DROP INDEX IF EXISTS "IDX_workflow_execution_state";` + ) + this.addSql( + 'CREATE INDEX IF NOT EXISTS "IDX_workflow_execution_deleted_at" ON "workflow_execution" (deleted_at) WHERE deleted_at IS NULL;' + ) + this.addSql( + 'CREATE INDEX IF NOT EXISTS "IDX_workflow_execution_id" ON "workflow_execution" (id) WHERE deleted_at IS NULL;' + ) + this.addSql( + 'CREATE INDEX IF NOT EXISTS "IDX_workflow_execution_workflow_id" ON "workflow_execution" (workflow_id) WHERE deleted_at IS NULL;' + ) + this.addSql( + 'CREATE INDEX IF NOT EXISTS "IDX_workflow_execution_transaction_id" ON "workflow_execution" (transaction_id) WHERE deleted_at IS NULL;' + ) + this.addSql( + 'CREATE INDEX IF NOT EXISTS "IDX_workflow_execution_state" ON "workflow_execution" (state) WHERE deleted_at IS NULL;' + ) + } +} diff --git a/packages/modules/workflow-engine-inmemory/src/models/index.ts b/packages/modules/workflow-engine-inmemory/src/models/index.ts index 78fcbfa9214f9..fa5b8a3dd019e 100644 --- a/packages/modules/workflow-engine-inmemory/src/models/index.ts +++ b/packages/modules/workflow-engine-inmemory/src/models/index.ts @@ -1 +1 @@ -export { default as WorkflowExecution } from "./workflow-execution" +export { WorkflowExecution } from "./workflow-execution" diff --git a/packages/modules/workflow-engine-inmemory/src/models/workflow-execution.ts b/packages/modules/workflow-engine-inmemory/src/models/workflow-execution.ts index 22e693d4283eb..c41bc8936ed1b 100644 --- a/packages/modules/workflow-engine-inmemory/src/models/workflow-execution.ts +++ b/packages/modules/workflow-engine-inmemory/src/models/workflow-execution.ts @@ -1,76 +1,30 @@ import { TransactionState } from "@medusajs/framework/orchestration" -import { DALUtils, generateEntityId } from "@medusajs/framework/utils" -import { - BeforeCreate, - Entity, - Enum, - Filter, - Index, - OnInit, - OptionalProps, - PrimaryKey, - Property, - Unique, -} from "@mikro-orm/core" - -type OptionalFields = "deleted_at" - -@Entity() -@Unique({ - name: "IDX_workflow_execution_workflow_id_transaction_id_unique", - properties: ["workflow_id", "transaction_id"], -}) -@Filter(DALUtils.mikroOrmSoftDeletableFilterOptions) -export default class WorkflowExecution { - [OptionalProps]?: OptionalFields - - @Property({ columnType: "text", nullable: false }) - @Index({ name: "IDX_workflow_execution_id" }) - id!: string - - @Index({ name: "IDX_workflow_execution_workflow_id" }) - @PrimaryKey({ columnType: "text" }) - workflow_id: string - - @Index({ name: "IDX_workflow_execution_transaction_id" }) - @PrimaryKey({ columnType: "text" }) - transaction_id: string - - @Property({ columnType: "jsonb", nullable: true }) - execution: Record | null = null - - @Property({ columnType: "jsonb", nullable: true }) - context: Record | null = null - - @Index({ name: "IDX_workflow_execution_state" }) - @Enum(() => TransactionState) - state: TransactionState - - @Property({ - onCreate: () => new Date(), - columnType: "timestamptz", - defaultRaw: "now()", - }) - created_at: Date - - @Property({ - onCreate: () => new Date(), - onUpdate: () => new Date(), - columnType: "timestamptz", - defaultRaw: "now()", +import { model } from "@medusajs/framework/utils" + +export const WorkflowExecution = model + .define("workflow_execution", { + id: model.id({ prefix: "wf_exec" }), + workflow_id: model.text().primaryKey(), + transaction_id: model.text().primaryKey(), + execution: model.json().nullable(), + context: model.json().nullable(), + state: model.enum(TransactionState), }) - updated_at: Date - - @Property({ columnType: "timestamptz", nullable: true }) - deleted_at: Date | null = null - - @BeforeCreate() - onCreate() { - this.id = generateEntityId(this.id, "wf_exec") - } - - @OnInit() - onInit() { - this.id = generateEntityId(this.id, "wf_exec") - } -} + .indexes([ + { + on: ["id"], + where: "deleted_at IS NULL", + }, + { + on: ["workflow_id"], + where: "deleted_at IS NULL", + }, + { + on: ["transaction_id"], + where: "deleted_at IS NULL", + }, + { + on: ["state"], + where: "deleted_at IS NULL", + }, + ]) diff --git a/packages/modules/workflow-engine-inmemory/src/services/workflows-module.ts b/packages/modules/workflow-engine-inmemory/src/services/workflows-module.ts index 9771f48784aac..3b67e36d2a5c8 100644 --- a/packages/modules/workflow-engine-inmemory/src/services/workflows-module.ts +++ b/packages/modules/workflow-engine-inmemory/src/services/workflows-module.ts @@ -1,6 +1,7 @@ import { Context, DAL, + InferEntityType, InternalModuleDeclaration, MedusaContainer, ModulesSdkTypes, @@ -25,9 +26,11 @@ type InjectedDependencies = { } export class WorkflowsModuleService< - TWorkflowExecution extends WorkflowExecution = WorkflowExecution + TWorkflowExecution extends InferEntityType< + typeof WorkflowExecution + > = InferEntityType > extends ModulesSdkUtils.MedusaService<{ - WorkflowExecution: { dto: WorkflowExecution } + WorkflowExecution: { dto: InferEntityType } }>({ WorkflowExecution }) { protected baseRepository_: DAL.RepositoryService protected workflowExecutionService_: ModulesSdkTypes.IMedusaInternalService diff --git a/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts b/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts index bd1a625dc26d1..f7e966edf1fe3 100644 --- a/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts +++ b/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts @@ -48,6 +48,8 @@ export class InMemoryDistributedTransactionStorage } private async saveToDb(data: TransactionCheckpoint) { + const foo = await this.workflowExecutionService_.list() + console.log(foo) await this.workflowExecutionService_.upsert([ { workflow_id: data.flow.modelId,