Skip to content

Commit

Permalink
chore: workflow internals improvementss (#9455)
Browse files Browse the repository at this point in the history
  • Loading branch information
adrien2p authored Oct 10, 2024
1 parent 1b9379b commit 34d5787
Show file tree
Hide file tree
Showing 26 changed files with 344 additions and 196 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1170,6 +1170,7 @@ medusaIntegrationTestRunner({

await deleteLineItemsWorkflow(appContainer).run({
input: {
cart_id: cart.id,
ids: items.map((i) => i.id),
},
throwOnError: false,
Expand Down Expand Up @@ -1211,6 +1212,7 @@ medusaIntegrationTestRunner({

const { errors } = await workflow.run({
input: {
cart_id: cart.id,
ids: items.map((i) => i.id),
},
throwOnError: false,
Expand Down
3 changes: 2 additions & 1 deletion integration-tests/modules/__tests__/cart/store/carts.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import {
ProductStatus,
PromotionRuleOperator,
PromotionType,
RuleOperator
RuleOperator,
} from "@medusajs/utils"
import { medusaIntegrationTestRunner } from "medusa-test-utils"
import {
Expand Down Expand Up @@ -2131,6 +2131,7 @@ medusaIntegrationTestRunner({
)

expect(response.status).toEqual(200)

expect(response.data.cart).toEqual(
expect.objectContaining({
id: cart.id,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { MedusaError } from "@medusajs/framework/utils"
import {
WorkflowData,
createWorkflow,
parallelize,
transform,
WorkflowData,
} from "@medusajs/framework/workflows-sdk"
import { useRemoteQueryStep } from "../../common/steps/use-remote-query"
import {
Expand Down Expand Up @@ -102,13 +103,14 @@ export const addShippingMethodToWorkflow = createWorkflow(
return cart.shipping_methods.map((sm) => sm.id)
})

removeShippingMethodFromCartStep({
shipping_method_ids: currentShippingMethods,
})

const shippingMethodsToAdd = addShippingMethodToCartStep({
shipping_methods: shippingMethodInput,
})
const [, shippingMethodsToAdd] = parallelize(
removeShippingMethodFromCartStep({
shipping_method_ids: currentShippingMethods,
}),
addShippingMethodToCartStep({
shipping_methods: shippingMethodInput,
})
)

updateTaxLinesWorkflow.runAsStep({
input: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ export const completeCartWorkflow = createWorkflow(
name: completeCartWorkflowId,
store: true,
idempotent: true,
// 3 days of retention time
retentionTime: THREE_DAYS,
},
(
Expand Down
5 changes: 4 additions & 1 deletion packages/core/core-flows/src/cart/workflows/create-carts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,10 @@ export const createCartWorkflow = createWorkflow(
}

// If there is only one country in the region, we prepare a shipping address with that country's code.
if (!data.input.shipping_address && data.region.countries.length === 1) {
if (
!data.input.shipping_address &&
data.region.countries.length === 1
) {
data_.shipping_address = {
country_code: data.region.countries[0].iso_2,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ import {
} from "@medusajs/framework/types"
import { Modules } from "@medusajs/framework/utils"
import {
WorkflowData,
createStep,
createWorkflow,
parallelize,
transform,
WorkflowData,
} from "@medusajs/framework/workflows-sdk"
import { createRemoteLinkStep } from "../../common/steps/create-remote-links"
import { useRemoteQueryStep } from "../../common/steps/use-remote-query"
Expand Down Expand Up @@ -52,9 +53,10 @@ export const createPaymentCollectionForCartWorkflow = createWorkflow(
list: false,
})

validateCartStep({ cart })

validateExistingPaymentCollectionStep({ cart })
parallelize(
validateCartStep({ cart }),
validateExistingPaymentCollectionStep({ cart })
)

const paymentData = transform({ cart }, ({ cart }) => {
return {
Expand Down
19 changes: 9 additions & 10 deletions packages/core/core-flows/src/cart/workflows/update-cart.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ import {
} from "@medusajs/framework/types"
import { MedusaError, PromotionActions } from "@medusajs/framework/utils"
import {
WorkflowData,
WorkflowResponse,
createHook,
createWorkflow,
parallelize,
transform,
when,
WorkflowData,
WorkflowResponse,
} from "@medusajs/framework/workflows-sdk"
import { useRemoteQueryStep } from "../../common"
import {
Expand Down Expand Up @@ -138,14 +138,13 @@ export const updateCartWorkflow = createWorkflow(
list: false,
}).config({ name: "refetch–cart" })

parallelize(
refreshCartShippingMethodsStep({ cart }),
updateTaxLinesWorkflow.runAsStep({
input: {
cart_id: carts[0].id,
},
})
)
refreshCartShippingMethodsStep({ cart })

updateTaxLinesWorkflow.runAsStep({
input: {
cart_id: carts[0].id,
},
})

updateCartPromotionsWorkflow.runAsStep({
input: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@ import {
Logger,
PaymentSessionDTO,
} from "@medusajs/framework/types"
import { ContainerRegistrationKeys, Modules } from "@medusajs/framework/utils"
import { StepResponse, createStep } from "@medusajs/framework/workflows-sdk"
import {
ContainerRegistrationKeys,
Modules,
promiseAll,
} from "@medusajs/framework/utils"
import { createStep, StepResponse } from "@medusajs/framework/workflows-sdk"

export interface DeletePaymentSessionStepInput {
ids: string[]
Expand All @@ -29,32 +33,42 @@ export const deletePaymentSessionsStep = createStep(
return new StepResponse([], null)
}

for (const id of ids) {
const select = [
"provider_id",
"currency_code",
"amount",
"data",
"context",
"payment_collection.id",
]
const select = [
"provider_id",
"currency_code",
"amount",
"data",
"context",
"payment_collection.id",
]

const sessions = await service.listPaymentSessions({ id: ids }, { select })
const sessionMap = new Map(sessions.map((s) => [s.id, s]))

const [session] = await service.listPaymentSessions({ id }, { select })
const promises: Promise<void>[] = []

for (const id of ids) {
const session = sessionMap.get(id)!

// As this requires an external method call, we will try to delete as many successful calls
// as possible and pass them over to the compensation step to be recreated if any of the
// payment sessions fails to delete.
try {
await service.deletePaymentSession(id)
const promise = service
.deletePaymentSession(id)
.then((res) => {
deleted.push(session)
})
.catch((e) => {
logger.error(
`Encountered an error when trying to delete payment session - ${id} - ${e}`
)
})

deleted.push(session)
} catch (e) {
logger.error(
`Encountered an error when trying to delete payment session - ${id} - ${e}`
)
}
promises.push(promise)
}

await promiseAll(promises)

return new StepResponse(
deleted.map((d) => d.id),
deleted
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,14 @@ class DistributedTransaction extends EventEmitter {
private readonly context: TransactionContext = new TransactionContext()
private static keyValueStore: IDistributedTransactionStorage

/**
* Store data during the life cycle of the current transaction execution.
* Store non persistent data such as transformers results, temporary data, etc.
*
* @private
*/
#temporaryStorage = new Map<string, unknown>()

public static setStorage(storage: IDistributedTransactionStorage) {
this.keyValueStore = storage
}
Expand Down Expand Up @@ -298,6 +306,18 @@ class DistributedTransaction extends EventEmitter {

await DistributedTransaction.keyValueStore.clearStepTimeout(this, step)
}

public setTemporaryData(key: string, value: unknown) {
this.#temporaryStorage.set(key, value)
}

public getTemporaryData(key: string) {
return this.#temporaryStorage.get(key)
}

public hasTemporaryData(key: string) {
return this.#temporaryStorage.has(key)
}
}

DistributedTransaction.setStorage(
Expand Down
49 changes: 33 additions & 16 deletions packages/core/orchestration/src/workflow/local-workflow.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import { Context, LoadedModule, MedusaContainer } from "@medusajs/types"
import {
createMedusaContainer,
isDefined,
isString,
MedusaContext,
MedusaContextType,
MedusaError,
MedusaModuleType,
createMedusaContainer,
isDefined,
isString,
} from "@medusajs/utils"
import { asValue } from "awilix"
import {
Expand Down Expand Up @@ -107,9 +107,17 @@ export class LocalWorkflow {
return resolved
}

const wrappableMethods = Object.getOwnPropertyNames(resolved).filter(
(key) => key !== "constructor"
)

return new Proxy(resolved, {
get: function (target, prop) {
if (typeof target[prop] !== "function") {
const shouldWrap =
wrappableMethods.includes(prop as string) &&
typeof target[prop] === "function"

if (!shouldWrap) {
return target[prop]
}

Expand All @@ -131,6 +139,7 @@ export class LocalWorkflow {
},
})
}

return container
}

Expand Down Expand Up @@ -369,9 +378,11 @@ export class LocalWorkflow {

await orchestrator.resume(transaction)

cleanUpEventListeners()

return transaction
try {
return transaction
} finally {
cleanUpEventListeners()
}
}

async getRunningTransaction(uniqueTransactionId: string, context?: Context) {
Expand Down Expand Up @@ -406,9 +417,11 @@ export class LocalWorkflow {

await orchestrator.cancelTransaction(transaction)

cleanUpEventListeners()

return transaction
try {
return transaction
} finally {
cleanUpEventListeners()
}
}

async registerStepSuccess(
Expand All @@ -433,9 +446,11 @@ export class LocalWorkflow {
response
)

cleanUpEventListeners()

return transaction
try {
return transaction
} finally {
cleanUpEventListeners()
}
}

async registerStepFailure(
Expand All @@ -459,9 +474,11 @@ export class LocalWorkflow {
handler(this.container_, context)
)

cleanUpEventListeners()

return transaction
try {
return transaction
} finally {
cleanUpEventListeners()
}
}

setOptions(options: Partial<TransactionModelOptions>) {
Expand Down
Loading

0 comments on commit 34d5787

Please sign in to comment.