From 1dbba9b1e84988b8b174f8b5e077bdc062659eda Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Wed, 1 Nov 2023 17:11:44 -0400 Subject: [PATCH] Refactor interceptors --- packages/client/src/interceptors.ts | 8 +- packages/client/src/workflow-client.ts | 167 +++++++++--------- .../test-update-interceptors.ts | 3 +- 3 files changed, 93 insertions(+), 85 deletions(-) diff --git a/packages/client/src/interceptors.ts b/packages/client/src/interceptors.ts index 767771111..cbe691649 100644 --- a/packages/client/src/interceptors.ts +++ b/packages/client/src/interceptors.ts @@ -35,6 +35,12 @@ export interface WorkflowUpdateInput { readonly options: WorkflowUpdateOptions; } +/** Output for WorkflowClientInterceptor.update */ +export interface WorkflowUpdateOutput { + readonly updateId: string; + readonly outcome?: temporal.api.update.v1.IOutcome | null; +} + /** Input for WorkflowClientInterceptor.signal */ export interface WorkflowSignalInput { readonly signalName: string; @@ -94,7 +100,7 @@ export interface WorkflowClientInterceptor { /** * Intercept a service call to updateWorkflowExecution */ - update?: (input: WorkflowUpdateInput, next: Next) => Promise; + update?: (input: WorkflowUpdateInput, next: Next) => Promise; /** * Intercept a service call to signalWorkflowExecution * diff --git a/packages/client/src/workflow-client.ts b/packages/client/src/workflow-client.ts index 74407d9b1..642b2fcf8 100644 --- a/packages/client/src/workflow-client.ts +++ b/packages/client/src/workflow-client.ts @@ -51,6 +51,7 @@ import { WorkflowStartInput, WorkflowTerminateInput, WorkflowUpdateInput, + WorkflowUpdateOutput, } from './interceptors'; import { DescribeWorkflowExecutionResponse, @@ -321,7 +322,7 @@ export interface WorkflowUpdateHandle { /** * The ID of this Update request. */ - id: string; + updateId: string; /** * The ID of the Workflow being targeted by this Update request. @@ -711,73 +712,58 @@ export class WorkflowClient extends BaseClient { return await decodeFromPayloadsAtIndex(this.dataConverter, 0, response.queryResult?.payloads); } - /** - * Uses given input to make an UpdateWorkflowExecution call to the service, returning a handle to the update. - * - * Used as the final function of the startUpdate interceptor chain - */ - protected async _startUpdateHandler(input: WorkflowUpdateInput): Promise> { - return this._startUpdate( - input, - temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED - ); - } + protected _makeStartUpdateHandler(waitForStage: temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage) { + /** + * Start Update and return a handle. + * + * Used as the final function of the interceptor chain during startUpdate and executeUpdate. + */ + return async (input: WorkflowUpdateInput): Promise => { + const updateId = input.options?.updateId ?? uuid4(); + const req: temporal.api.workflowservice.v1.IUpdateWorkflowExecutionRequest = { + namespace: this.options.namespace, + workflowExecution: input.workflowExecution, + firstExecutionRunId: input.firstExecutionRunId, + waitPolicy: { lifecycleStage: waitForStage }, + request: { + meta: { + updateId, + identity: this.options.identity, + }, + input: { + header: { fields: input.headers }, + name: input.updateName, + args: { payloads: await encodeToPayloads(this.dataConverter, ...input.args) }, + }, + }, + }; + let response: temporal.api.workflowservice.v1.UpdateWorkflowExecutionResponse; - /** - * Uses given input to make an UpdateWorkflowExecution call to the service, returning the update result. - * - * Used as the final function of the executeUpdate interceptor chain - */ - protected async _executeUpdateHandler(input: WorkflowUpdateInput): Promise { - const handle = await this._startUpdate( - input, - temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED - ); - return await handle.result(); + try { + response = await this.workflowService.updateWorkflowExecution(req); + } catch (err) { + this.rethrowGrpcError(err, 'Workflow Update failed', input.workflowExecution); + } + return { + updateId, + outcome: response.outcome, + }; + }; } - /** - * Start Update and return a handle. - */ - protected async _startUpdate( + protected createWorkflowUpdateHandle( input: WorkflowUpdateInput, - waitForStage: temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage - ): Promise> { - const updateId = input.options?.updateId ?? uuid4(); - const req: temporal.api.workflowservice.v1.IUpdateWorkflowExecutionRequest = { - namespace: this.options.namespace, - workflowExecution: input.workflowExecution, - firstExecutionRunId: input.firstExecutionRunId, - waitPolicy: { lifecycleStage: waitForStage }, - request: { - meta: { - updateId, - identity: this.options.identity, - }, - input: { - header: { fields: input.headers }, - name: input.updateName, - args: { payloads: await encodeToPayloads(this.dataConverter, ...input.args) }, - }, - }, - }; - let response: temporal.api.workflowservice.v1.UpdateWorkflowExecutionResponse; - - try { - response = await this.workflowService.updateWorkflowExecution(req); - } catch (err) { - this.rethrowGrpcError(err, 'Workflow Update failed', input.workflowExecution); - } - + output: WorkflowUpdateOutput + ): WorkflowUpdateHandle { return { - id: updateId, + updateId: output.updateId, workflowId: input.workflowExecution.workflowId, workflowRunId: input.workflowExecution.runId, result: async () => { // Note that the API guarantees that (response.outcome) <=> (update completed). // Therefore we will not poll during executeUpdate(), since in that case // waitForStage == Completed and so at this point the response has an outcome. - const outcome = response.outcome ?? (await this._pollUpdate(updateId, input.workflowExecution)); + const outcome = output.outcome ?? (await this._pollUpdate(output.updateId, input.workflowExecution)); if (outcome.failure) { throw new WorkflowUpdateFailedError( 'Workflow Update failed', @@ -1023,6 +1009,28 @@ export class WorkflowClient extends BaseClient { runIdForResult, ...resultOptions }: WorkflowHandleOptions): WorkflowHandle { + // TODO (dan): is there a better location? It must not be visible to users on the WorkflowHandle. + const _startUpdate = async ( + def: UpdateDefinition | string, + waitForStage: temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage, + options?: WithArgs + ): Promise> => { + const next = this._makeStartUpdateHandler(waitForStage).bind(this); + const fn = composeInterceptors(interceptors, 'update', next); + const { args, ...opts } = options ?? {}; + const input = { + workflowExecution: { workflowId, runId }, + firstExecutionRunId, + updateName: typeof def === 'string' ? def : def.name, + args: args ?? [], + waitForStage, + headers: {}, + options: opts, + }; + const output = await fn(input); + return this.createWorkflowUpdateHandle(input, output); + }; + return { client: this, workflowId, @@ -1074,38 +1082,31 @@ export class WorkflowClient extends BaseClient { } return temporal.api.history.v1.History.create({ events }); }, - async executeUpdate( + async startUpdate( def: UpdateDefinition | string, options?: WithArgs - ): Promise { - const next = this.client._executeUpdateHandler.bind(this.client); - const fn = composeInterceptors(interceptors, 'update', next); - const { args, ...opts } = options ?? {}; - return (await fn({ - workflowExecution: { workflowId, runId }, - firstExecutionRunId, - updateName: typeof def === 'string' ? def : def.name, - args: args ?? [], - headers: {}, - options: opts, - })) as Promise; + ): Promise> { + return await _startUpdate( + def, + temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage + .UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED, + options + ); }, - async startUpdate( + + async executeUpdate( def: UpdateDefinition | string, options?: WithArgs - ): Promise> { - const next = this.client._startUpdateHandler.bind(this.client); - const fn = composeInterceptors(interceptors, 'update', next); - const { args, ...opts } = options ?? {}; - return (await fn({ - workflowExecution: { workflowId, runId }, - firstExecutionRunId, - updateName: typeof def === 'string' ? def : def.name, - args: args ?? [], - headers: {}, - options: opts, - })) as Promise>; + ): Promise { + const handle = await _startUpdate( + def, + temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage + .UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED, + options + ); + return await handle.result(); }, + async signal(def: SignalDefinition | string, ...args: Args): Promise { const next = this.client._signalWorkflowHandler.bind(this.client); const fn = composeInterceptors(interceptors, 'signal', next); diff --git a/packages/test/src/integration-tests/test-update-interceptors.ts b/packages/test/src/integration-tests/test-update-interceptors.ts index ef9fabbc6..25557e8a1 100644 --- a/packages/test/src/integration-tests/test-update-interceptors.ts +++ b/packages/test/src/integration-tests/test-update-interceptors.ts @@ -1,4 +1,5 @@ import { Next, UpdateInput, WorkflowInboundCallsInterceptor, WorkflowInterceptors } from '@temporalio/workflow'; +import { WorkflowUpdateInput, WorkflowUpdateOutput } from '@temporalio/client'; import { helpers, makeTestFunction } from './helpers'; import { update, workflowWithUpdates, workflowsPath } from './workflows'; @@ -10,7 +11,7 @@ const test = makeTestFunction({ interceptors: { workflow: [ { - async update(input, next): Promise { + async update(input: WorkflowUpdateInput, next): Promise { return next({ ...input, args: [input.args[0] + '-clientIntercepted', ...input.args.slice(1)] }); }, },