Skip to content

Commit

Permalink
improve execute to eliminate race conditions (#99)
Browse files Browse the repository at this point in the history
* improve execute to eliminate race conditions

* bump version to 0.1.51
  • Loading branch information
geoffhendrey authored Jan 21, 2025
1 parent 36f6d4c commit 58f2c5e
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 62 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "stated-js",
"version": "0.1.50",
"version": "0.1.51",
"license": "Apache-2.0",
"description": "JSONata embedded in JSON",
"main": "./dist/src/index.js",
Expand Down
13 changes: 11 additions & 2 deletions src/ParallelExecutionPlanDefault.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ export class ParallelExecutionPlanDefault implements ParallelExecutionPlan {
jsonPtr: JsonPointerString = "/";
didUpdate: boolean = false;
restore?: boolean = false;
circular?:boolean;

constructor(tp: TemplateProcessor, parallelSteps: ParallelExecutionPlan[] = [], vals?: Partial<ParallelExecutionPlan> | null) {
this.output = tp.output;
this.parallel = parallelSteps;
Expand All @@ -38,6 +40,9 @@ export class ParallelExecutionPlanDefault implements ParallelExecutionPlan {
if (p.data) {
(json as any).data = p.data;
}
if(p.circular){
(json as any).circular = p.circular
}
return json;
}

Expand All @@ -46,7 +51,7 @@ export class ParallelExecutionPlanDefault implements ParallelExecutionPlan {
}

cleanCopy(tp: TemplateProcessor, source: ParallelExecutionPlanDefault = this): ParallelExecutionPlanDefault {
return new ParallelExecutionPlanDefault(tp, [], {
const fields:any = {
op: source.op,
parallel: source.parallel.map(p => source.cleanCopy(tp, p as any)),
completed: false,
Expand All @@ -55,7 +60,11 @@ export class ParallelExecutionPlanDefault implements ParallelExecutionPlan {
forkId: "ROOT",
didUpdate: false,
data: source.data
});
};
if(source.circular){
fields.circular = source.circular;
}
return new ParallelExecutionPlanDefault(tp, [], fields);
}

getNodeList(all: boolean = false): JsonPointerString[] {
Expand Down
123 changes: 65 additions & 58 deletions src/ParallelPlanner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ export class ParallelPlanner implements Planner{
return leaves;
}



async execute(plan: ExecutionPlan): Promise<void>{
//console.log(`executing ${stringifyTemplateJSON(plan)}`);
Expand All @@ -279,76 +279,83 @@ export class ParallelPlanner implements Planner{
* @param step
*/
const _execute = async (step: ParallelExecutionPlan): Promise<ParallelExecutionPlan> => {
const { jsonPtr, op } = step;
const { jsonPtr, op, circular=false } = step;

if(circular){ //if step is marked as circular, evaluate it's expression and bail without following its reference
step.didUpdate = await this.tp.evaluateNode(step);
this.tp.logger.debug(`execute: circular, abort ${step.jsonPtr}`);
return step;
}

// Check if a Promise already exists for this jsonPtr (mutation is put in the map first since it is the root of the plan, so will be found by the leaves that depend on it)
if (promises.has(jsonPtr)) {
this.tp.logger.debug(`execute: waiting ${step.jsonPtr}`);
const promise = promises.get(jsonPtr)!; //don't freak out ... '!' is TS non-null assertion
//return a 'pointer' to the cached plan, or else we create loops with lead nodes in a mutation plan pointing back to the root of the
//plan that holds the mutation
return promise.then(plan=>{
return (plan as ParallelExecutionPlanDefault).getPointer(this.tp);
});
});
}

// Create a placeholder Promise immediately and store it in the map
const placeholderPromise: Promise<ParallelExecutionPlan> = new Promise<ParallelExecutionPlan>((resolve, reject) => {
promises.set(
jsonPtr,
(async () => {
try {
step.output = plan.output;
step.forkId = plan.forkId;
step.forkStack = plan.forkStack;
//await all dependencies ...and replace the parallel array with the executed array, since the
//`promises` Map has caused already executed subtrees to be replaces with their cache-normalized
//equivalent
step.parallel = await Promise.all(
step.parallel.map((d) => {
const executed = _execute(d);
return executed
})
);
//if we are initializing the node, or of it had dependencies that changed, then we
//need to run the step
if( plan.op === "initialize" || step.parallel.some((step) => step.didUpdate || step.completed)){
if(!step.completed){ //fast forward past completed steps
try {
step.didUpdate = await this.tp.evaluateNode(step);
}catch(error){
throw error;
}
}
}else { //if we are here then we are not initializing a node, but reacting to some mutation.
//and it is possible that the node itself is originally a non-materialized node, that
//has now become materialized because it is inside/within a subtree set by a mutation
const _plan = plan as ParallelExecutionPlanDefault;
const insideMutation = this.stepIsDescendantOfMutationTarget(step, _plan );
if(insideMutation){
const theMutation = promises.get(_plan.jsonPtr);
if(!theMutation){
throw new Error(`failed to retrieve mutation from cache for ${_plan.jsonPtr}`);
}
const mutationCausedAChange= (await theMutation).didUpdate;
if(mutationCausedAChange){
const _didUpdate= await this.tp.evaluateNode(step);
step.didUpdate = _didUpdate;
}
}
let resolve:any, reject:any;
const placeHolderPromise:Promise<ParallelExecutionPlan> = new Promise((res, rej) => {
resolve = res;
reject = rej;
});

const executor = async () => {
try {
step.output = plan.output;
step.forkId = plan.forkId;
step.forkStack = plan.forkStack;
//await all dependencies ...and replace the parallel array with the executed array, since the
//`promises` Map has caused already executed subtrees to be replaces with their cache-normalized
//equivalent
step.parallel = await Promise.all(
step.parallel.map((d) => {
const executed = _execute(d);
return executed
})
);
//if we are initializing the node, or of it had dependencies that changed, then we
//need to run the step
if( plan.op === "initialize" || step.parallel.some((step) => step.didUpdate || step.completed)){
if(!step.completed){ //fast forward past completed steps
this.tp.logger.debug(`execute: evaluate ${step.jsonPtr}`);
step.didUpdate = await this.tp.evaluateNode(step);
}
}else { //if we are here then we are not initializing a node, but reacting to some mutation.
//and it is possible that the node itself is originally a non-materialized node, that
//has now become materialized because it is inside/within a subtree set by a mutation
const _plan = plan as ParallelExecutionPlanDefault;
const insideMutation = this.stepIsDescendantOfMutationTarget(step, _plan );
if(insideMutation){
const theMutation = promises.get(_plan.jsonPtr);
if(!theMutation){
throw new Error(`failed to retrieve mutation from cache for ${_plan.jsonPtr}`);
}
const mutationCausedAChange= (await theMutation).didUpdate;
if(mutationCausedAChange){
this.tp.logger.debug(`execute: evaluate ${step.jsonPtr}`);
const _didUpdate= await this.tp.evaluateNode(step);
step.didUpdate = _didUpdate;
}
step.completed = true;
resolve(step);
} catch (error) {
promises.delete(jsonPtr); // Clean up failed promise
reject(error);
}
})().then(() => {//IIFE returns void, so we can't take it as parameter to then
return step as ParallelExecutionPlan; //return the step from the closure
}) // Explicitly return a ParallelExecutionPlan becuase IIFE does not return a known type
);
});
}
step.completed = true;
resolve(step);
} catch (error) {
promises.delete(jsonPtr); // Clean up failed promise
reject(error);
}
};

promises.set(jsonPtr, placeHolderPromise)
executor(); //it is critical that the placeholder be placed in the promises map before we begin executing


return placeholderPromise;
return placeHolderPromise;
}; //end _execute

try { //mutation plan
Expand Down
3 changes: 2 additions & 1 deletion src/TemplateProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ export type PlanStep = {
output:object,
forkStack:Fork[],
forkId:string,
didUpdate:boolean
didUpdate:boolean,
circular?:boolean
}
export type Mutation = {jsonPtr:JsonPointerString, op:Op, data:any};

Expand Down
1 change: 1 addition & 0 deletions src/TraversalState.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ export class TraversalState {
const e = '🔃 Circular dependency ' + this.stack.map(n => n.jsonPtr).join(' → ') + " → " + jsonPtr;
this.tp.warnings.push(e);
this.tp.logger.log('warn', e);
node.circular = true; //flag this step as circular so that during plan execution we can drop circular steps
return false;
}
if (this.stack[this.stack.length - 1]?.op === "noop") {
Expand Down
3 changes: 3 additions & 0 deletions src/test/TemplateProcessor.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -5491,6 +5491,7 @@ test("total cost example", async () => {
"op": "initialize",
"parallel": [
{
"circular": true,
"completed": false,
"didUpdate": false,
"forkId": "ROOT",
Expand All @@ -5510,6 +5511,7 @@ test("total cost example", async () => {
"op": "initialize",
"parallel": [
{
"circular": true,
"completed": false,
"didUpdate": false,
"forkId": "ROOT",
Expand All @@ -5529,6 +5531,7 @@ test("total cost example", async () => {
"op": "initialize",
"parallel": [
{
"circular": true,
"completed": false,
"didUpdate": false,
"forkId": "ROOT",
Expand Down

0 comments on commit 58f2c5e

Please sign in to comment.