Skip to content

Commit

Permalink
Refactor interceptors
Browse files Browse the repository at this point in the history
  • Loading branch information
dandavison committed Nov 1, 2023
1 parent e3eb57d commit 1dbba9b
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 85 deletions.
8 changes: 7 additions & 1 deletion packages/client/src/interceptors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ export interface WorkflowUpdateInput {
readonly options: WorkflowUpdateOptions;
}

/** Output for WorkflowClientInterceptor.update */
export interface WorkflowUpdateOutput {
readonly updateId: string;
readonly outcome?: temporal.api.update.v1.IOutcome | null;
}

/** Input for WorkflowClientInterceptor.signal */
export interface WorkflowSignalInput {
readonly signalName: string;
Expand Down Expand Up @@ -94,7 +100,7 @@ export interface WorkflowClientInterceptor {
/**
* Intercept a service call to updateWorkflowExecution
*/
update?: (input: WorkflowUpdateInput, next: Next<this, 'update'>) => Promise<unknown>;
update?: (input: WorkflowUpdateInput, next: Next<this, 'update'>) => Promise<WorkflowUpdateOutput>;
/**
* Intercept a service call to signalWorkflowExecution
*
Expand Down
167 changes: 84 additions & 83 deletions packages/client/src/workflow-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import {
WorkflowStartInput,
WorkflowTerminateInput,
WorkflowUpdateInput,
WorkflowUpdateOutput,
} from './interceptors';
import {
DescribeWorkflowExecutionResponse,
Expand Down Expand Up @@ -321,7 +322,7 @@ export interface WorkflowUpdateHandle<Ret> {
/**
* The ID of this Update request.
*/
id: string;
updateId: string;

/**
* The ID of the Workflow being targeted by this Update request.
Expand Down Expand Up @@ -711,73 +712,58 @@ export class WorkflowClient extends BaseClient {
return await decodeFromPayloadsAtIndex(this.dataConverter, 0, response.queryResult?.payloads);
}

/**
* Uses given input to make an UpdateWorkflowExecution call to the service, returning a handle to the update.
*
* Used as the final function of the startUpdate interceptor chain
*/
protected async _startUpdateHandler<Ret>(input: WorkflowUpdateInput): Promise<WorkflowUpdateHandle<Ret>> {
return this._startUpdate(
input,
temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED
);
}
protected _makeStartUpdateHandler(waitForStage: temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage) {
/**
* Start Update and return a handle.
*
* Used as the final function of the interceptor chain during startUpdate and executeUpdate.
*/
return async (input: WorkflowUpdateInput): Promise<WorkflowUpdateOutput> => {
const updateId = input.options?.updateId ?? uuid4();
const req: temporal.api.workflowservice.v1.IUpdateWorkflowExecutionRequest = {
namespace: this.options.namespace,
workflowExecution: input.workflowExecution,
firstExecutionRunId: input.firstExecutionRunId,
waitPolicy: { lifecycleStage: waitForStage },
request: {
meta: {
updateId,
identity: this.options.identity,
},
input: {
header: { fields: input.headers },
name: input.updateName,
args: { payloads: await encodeToPayloads(this.dataConverter, ...input.args) },
},
},
};
let response: temporal.api.workflowservice.v1.UpdateWorkflowExecutionResponse;

/**
* Uses given input to make an UpdateWorkflowExecution call to the service, returning the update result.
*
* Used as the final function of the executeUpdate interceptor chain
*/
protected async _executeUpdateHandler<Ret>(input: WorkflowUpdateInput): Promise<Ret> {
const handle = await this._startUpdate<Ret>(
input,
temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED
);
return await handle.result();
try {
response = await this.workflowService.updateWorkflowExecution(req);
} catch (err) {
this.rethrowGrpcError(err, 'Workflow Update failed', input.workflowExecution);
}
return {
updateId,
outcome: response.outcome,
};
};
}

/**
* Start Update and return a handle.
*/
protected async _startUpdate<Ret>(
protected createWorkflowUpdateHandle<Ret>(
input: WorkflowUpdateInput,
waitForStage: temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage
): Promise<WorkflowUpdateHandle<Ret>> {
const updateId = input.options?.updateId ?? uuid4();
const req: temporal.api.workflowservice.v1.IUpdateWorkflowExecutionRequest = {
namespace: this.options.namespace,
workflowExecution: input.workflowExecution,
firstExecutionRunId: input.firstExecutionRunId,
waitPolicy: { lifecycleStage: waitForStage },
request: {
meta: {
updateId,
identity: this.options.identity,
},
input: {
header: { fields: input.headers },
name: input.updateName,
args: { payloads: await encodeToPayloads(this.dataConverter, ...input.args) },
},
},
};
let response: temporal.api.workflowservice.v1.UpdateWorkflowExecutionResponse;

try {
response = await this.workflowService.updateWorkflowExecution(req);
} catch (err) {
this.rethrowGrpcError(err, 'Workflow Update failed', input.workflowExecution);
}

output: WorkflowUpdateOutput
): WorkflowUpdateHandle<Ret> {
return {
id: updateId,
updateId: output.updateId,
workflowId: input.workflowExecution.workflowId,
workflowRunId: input.workflowExecution.runId,
result: async () => {
// Note that the API guarantees that (response.outcome) <=> (update completed).
// Therefore we will not poll during executeUpdate(), since in that case
// waitForStage == Completed and so at this point the response has an outcome.
const outcome = response.outcome ?? (await this._pollUpdate(updateId, input.workflowExecution));
const outcome = output.outcome ?? (await this._pollUpdate(output.updateId, input.workflowExecution));
if (outcome.failure) {
throw new WorkflowUpdateFailedError(
'Workflow Update failed',
Expand Down Expand Up @@ -1023,6 +1009,28 @@ export class WorkflowClient extends BaseClient {
runIdForResult,
...resultOptions
}: WorkflowHandleOptions): WorkflowHandle<T> {
// TODO (dan): is there a better location? It must not be visible to users on the WorkflowHandle.
const _startUpdate = async <Ret, Args extends unknown[]>(
def: UpdateDefinition<Ret, Args> | string,
waitForStage: temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage,
options?: WithArgs<Args, WorkflowUpdateOptions>
): Promise<WorkflowUpdateHandle<Ret>> => {
const next = this._makeStartUpdateHandler(waitForStage).bind(this);
const fn = composeInterceptors(interceptors, 'update', next);
const { args, ...opts } = options ?? {};
const input = {
workflowExecution: { workflowId, runId },
firstExecutionRunId,
updateName: typeof def === 'string' ? def : def.name,
args: args ?? [],
waitForStage,
headers: {},
options: opts,
};
const output = await fn(input);
return this.createWorkflowUpdateHandle<Ret>(input, output);
};

return {
client: this,
workflowId,
Expand Down Expand Up @@ -1074,38 +1082,31 @@ export class WorkflowClient extends BaseClient {
}
return temporal.api.history.v1.History.create({ events });
},
async executeUpdate<Ret, Args extends unknown[]>(
async startUpdate<Ret, Args extends unknown[]>(
def: UpdateDefinition<Ret, Args> | string,
options?: WithArgs<Args, WorkflowUpdateOptions>
): Promise<Ret> {
const next = this.client._executeUpdateHandler.bind(this.client);
const fn = composeInterceptors(interceptors, 'update', next);
const { args, ...opts } = options ?? {};
return (await fn({
workflowExecution: { workflowId, runId },
firstExecutionRunId,
updateName: typeof def === 'string' ? def : def.name,
args: args ?? [],
headers: {},
options: opts,
})) as Promise<Ret>;
): Promise<WorkflowUpdateHandle<Ret>> {
return await _startUpdate(
def,
temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage
.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED,
options
);
},
async startUpdate<Ret, Args extends unknown[]>(

async executeUpdate<Ret, Args extends unknown[]>(
def: UpdateDefinition<Ret, Args> | string,
options?: WithArgs<Args, WorkflowUpdateOptions>
): Promise<WorkflowUpdateHandle<Ret>> {
const next = this.client._startUpdateHandler.bind(this.client);
const fn = composeInterceptors(interceptors, 'update', next);
const { args, ...opts } = options ?? {};
return (await fn({
workflowExecution: { workflowId, runId },
firstExecutionRunId,
updateName: typeof def === 'string' ? def : def.name,
args: args ?? [],
headers: {},
options: opts,
})) as Promise<WorkflowUpdateHandle<Ret>>;
): Promise<Ret> {
const handle = await _startUpdate(
def,
temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage
.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED,
options
);
return await handle.result();
},

async signal<Args extends unknown[]>(def: SignalDefinition<Args> | string, ...args: Args): Promise<void> {
const next = this.client._signalWorkflowHandler.bind(this.client);
const fn = composeInterceptors(interceptors, 'signal', next);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Next, UpdateInput, WorkflowInboundCallsInterceptor, WorkflowInterceptors } from '@temporalio/workflow';
import { WorkflowUpdateInput, WorkflowUpdateOutput } from '@temporalio/client';
import { helpers, makeTestFunction } from './helpers';
import { update, workflowWithUpdates, workflowsPath } from './workflows';

Expand All @@ -10,7 +11,7 @@ const test = makeTestFunction({
interceptors: {
workflow: [
{
async update(input, next): Promise<unknown> {
async update(input: WorkflowUpdateInput, next): Promise<WorkflowUpdateOutput> {
return next({ ...input, args: [input.args[0] + '-clientIntercepted', ...input.args.slice(1)] });
},
},
Expand Down

0 comments on commit 1dbba9b

Please sign in to comment.