Skip to content

Commit

Permalink
Implement planning for parallel tasks
Browse files Browse the repository at this point in the history
WIP

Change-type: minor
  • Loading branch information
pipex committed Aug 16, 2023
1 parent ab14029 commit e563525
Show file tree
Hide file tree
Showing 13 changed files with 415 additions and 36 deletions.
3 changes: 3 additions & 0 deletions lib/agent/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ export class Runtime<TState> {
logger.info(`${action.description}: running ...`);
const state = await this.runAction(action);
if (!equals(this.state, state)) {
// TODO: this is wrong, when using parallel tasks, this means changes
// in parallel branches will get overwritten. We need to calculate patches
// and apply them
this.state = state;

// Notify observer of the new state only if there
Expand Down
45 changes: 45 additions & 0 deletions lib/json.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import { expect } from '~/test-utils';
import { merge } from './json';

describe('JSON', () => {
describe('merge', () => {
it('merges simple values', () => {
expect(merge(1, 2)).to.equal(2);
expect(merge('a', 'b')).to.equal('b');
});

it('merges simple arrays', () => {
expect(merge([], [])).to.eql([]);
expect(merge([1, 2, 3], [4, 5, 6])).to.eql([1, 2, 3, 4, 5, 6]);
expect(merge([1, 1], [1, 5, 6])).to.eql([1, 1, 1, 5, 6]);
});

it('merges objects', () => {
expect(merge({}, {})).to.eql({});
expect(merge({ a: 1 }, { b: 2 })).to.eql({ a: 1, b: 2 });
expect(merge({ a: 1 }, { a: 2 })).to.eql({ a: 2 });
expect(merge({ a: 1 }, { a: { b: 2 } })).to.eql({ a: { b: 2 } });
expect(merge({ a: { b: 1 } }, { a: { b: 2 } })).to.eql({ a: { b: 2 } });
expect(merge({ a: { b: 1 } }, { a: { c: 2 } })).to.eql({
a: { b: 1, c: 2 },
});
});

it('merges arrays and objects', () => {
expect(merge([], {})).to.eql({});
expect(merge({}, [])).to.eql({});
expect(merge([1, 2, 3], { a: 1 })).to.eql({ 0: 1, 1: 2, 2: 3, a: 1 });
expect(merge({ a: 1 }, [1, 2, 3])).to.eql({ a: 1, 0: 1, 1: 2, 2: 3 });
expect(merge([1, 2, 3], { a: 1, b: 2 })).to.eql({
0: 1,
1: 2,
2: 3,
a: 1,
b: 2,
});
expect(merge({ a: { b: 1 } }, { a: [1, 2] })).to.eql({
a: { b: 1, 0: 1, 1: 2 },
});
});
});
});
38 changes: 38 additions & 0 deletions lib/json.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,41 @@ export function equals<T>(value: T, other: T): boolean {

return value === other;
}

/**
* Performs a deep merge between javascript
* objects
*/
export function merge<T>(value: T, other: T): T {
if (Array.isArray(value) && Array.isArray(other)) {
return [...value, ...other] as T;
}

if (isObject(value) && isObject(other)) {
const [vProps, oProps] = [value, other].map(
(a) => Object.keys(a) as Array<keyof T>,
);

// Merge properties removing duplicates
const props = [...vProps, ...oProps].filter(
(p, i, a) => a.indexOf(p) === i,
);

return props.reduce(
(acc, key) => ({
...acc,
[key]: merge(value[key], other[key]),
}),
{} as T,
);
}

// Merge should be equivalent to spread [...]
// if the key is not present in the second object, the
// value of the first object persists
if (other === undefined) {
return value;
}

return other;
}
87 changes: 87 additions & 0 deletions lib/planner.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,93 @@ describe('Planner', () => {
);
});

it('solves parallel problems', () => {
type Counters = { [k: string]: number };

const byOne = Task.of({
path: '/:counter',
condition: (state: Counters, ctx) => ctx.get(state) < ctx.target,
effect: (state: Counters, ctx) => ctx.set(state, ctx.get(state) + 1),
description: ({ counter }) => `${counter} + 1`,
});

const multiIncrement = Task.of({
condition: (state: Counters, ctx) =>
Object.keys(state).filter((k) => ctx.target[k] - state[k] > 0)
.length > 1,
parallel: (state: Counters, ctx) =>
Object.keys(state)
.filter((k) => ctx.target[k] - state[k] > 0)
.map((k) => byOne({ counter: k, target: ctx.target[k] })),
description: `increment counters`,
});

const planner = Planner.of({
tasks: [multiIncrement, byOne],
config: { trace: console.trace },
});

const result = planner.findPlan({ a: 0, b: 0 }, { a: 3, b: 2 });
expect(simplified(result)).to.deep.equal(
plan()
.fork()
.branch('a + 1')
.branch('b + 1')
.join()
.fork()
.branch('a + 1')
.branch('b + 1')
.join()
.action('a + 1')
.end(),
);
});

it('solves parallel problems with methods', () => {
type Counters = { [k: string]: number };

const byOne = Task.of({
path: '/:counter',
condition: (state: Counters, ctx) => ctx.get(state) < ctx.target,
effect: (state: Counters, ctx) => ctx.set(state, ctx.get(state) + 1),
description: ({ counter }) => `${counter} + 1`,
});

const byTwo = Task.of({
path: '/:counter',
condition: (state: Counters, ctx) => ctx.target - ctx.get(state) > 1,
method: (_: Counters, ctx) => [byOne({ ...ctx }), byOne({ ...ctx })],
description: ({ counter }) => `increase '${counter}'`,
});

const multiIncrement = Task.of({
condition: (state: Counters, ctx) =>
Object.keys(state).some((k) => ctx.target[k] - state[k] > 1),
parallel: (state: Counters, ctx) =>
Object.keys(state)
.filter((k) => ctx.target[k] - state[k] > 1)
.map((k) => byTwo({ counter: k, target: ctx.target[k] })),
description: `increment counters`,
});

const planner = Planner.of({
tasks: [multiIncrement, byTwo, byOne],
config: { trace: console.trace },
});

const result = planner.findPlan({ a: 0, b: 0 }, { a: 3, b: 2 });

expect(simplified(result)).to.deep.equal(
plan()
.fork()
.branch('a + 1', 'a + 1')
.branch('b + 1', 'b + 1')
.join()
.action('a + 1')
.end(),
);
});

it.skip('simple travel problem', async () => {
// Alice needs to go to the park and may walk or take a taxi. Depending on the distance to the park and
// the available cash, some actions may be possible
Expand Down
121 changes: 112 additions & 9 deletions lib/planner/findPlan.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
import { diff as createPatch, patch as applyPatch } from 'mahler-wasm';

import { Context } from '../context';
import { Diff } from '../diff';
import { Operation } from '../operation';
import { Path } from '../path';
import { Pointer } from '../pointer';
import { Action, Instruction, Method, Task, Parallel } from '../task';
import { Plan } from './plan';
import { Node } from './node';
import { ActionNode, EmptyNode, Node } from './node';

Check failure on line 10 in lib/planner/findPlan.ts

View workflow job for this annotation

GitHub Actions / Flowzone / Test npm (18.x)

'ActionNode' is declared but its value is never read.

Check failure on line 10 in lib/planner/findPlan.ts

View workflow job for this annotation

GitHub Actions / Flowzone / Test npm (18.x)

'EmptyNode' is declared but its value is never read.
import {
PlannerConfig,
LoopDetected,
RecursionDetected,
MethodExpansionEmpty,
ConditionNotMet,
SearchFailed,
NotImplemented,
MergeFailed,
} from './types';
import { isTaskApplicable } from './utils';
import assert from '../assert';
Expand All @@ -25,7 +27,7 @@ interface PlanningState<TState = any> {
operation?: Operation<TState, any>;
trace: PlannerConfig<TState>['trace'];
initialPlan: Plan<TState>;
callStack?: Array<Method<TState>>;
callStack?: Array<Method<TState> | Parallel<TState>>;
}

function findLoop<T>(id: string, node: Node<T> | null): boolean {
Expand Down Expand Up @@ -63,10 +65,19 @@ function tryAction<TState = any>(
}
const state = action.effect(initialPlan.state);

// We calculate the changes only at the action level
const changes = createPatch(initialPlan.state, state);

// We create the plan reversed so we can backtrack easily
const start = { id, action, next: initialPlan.start };

return { success: true, state, start, stats: initialPlan.stats };
return {
success: true,
start,
stats: initialPlan.stats,
state,
pendingChanges: initialPlan.pendingChanges.concat(changes),
};
}

function tryMethod<TState = any>(
Expand All @@ -91,11 +102,12 @@ function tryMethod<TState = any>(

// We use spread here to avoid modifying the source object
const plan: Plan<TState> = { ...initialPlan };
const cStack = [...callStack, method];
for (const i of instructions) {
const res = tryInstruction(i, {
...pState,
initialPlan: plan,
callStack: [...callStack, method],
callStack: cStack,
});

if (!res.success) {
Expand All @@ -106,16 +118,96 @@ function tryMethod<TState = any>(
plan.start = res.start;
plan.stats = res.stats;
plan.state = res.state;
plan.pendingChanges = res.pendingChanges;
}

return plan;
}

function tryParallel<TState = any>(
_parallel: Parallel<TState>,
{ initialPlan }: PlanningState<TState>,
parallel: Parallel<TState>,
{ initialPlan, callStack = [], ...pState }: PlanningState<TState>,
): Plan<TState> {
return { success: false, stats: initialPlan.stats, error: NotImplemented };
assert(initialPlan.success);

// look task in the call stack
if (callStack.find((p) => Parallel.equals(p, parallel))) {
return {
success: false,
stats: initialPlan.stats,
error: RecursionDetected,
};
}

const instructions = parallel(initialPlan.state);

// Nothing to do here as other branches may still
// result in actions
if (instructions.length === 0) {
return initialPlan;
}

const empty = Node.empty(initialPlan.start);

const plan: Plan<TState> = {
...initialPlan,
start: empty,
};

let results: Array<Plan<TState> & { success: true }> = [];
const cStack = [...callStack, parallel];
for (const i of instructions) {
const res = tryInstruction(i, {
...pState,
initialPlan: plan,
callStack: cStack,
});

if (!res.success) {
return res;
}

results.push(res);
}

// There should not be any results pointing to null as we passed
// an empty node as the start node to each one
assert(results.every((r) => r.start != null));

// If all branches are empty (they still point to the start node we provided)
// we just return the initialPlan
results = results.filter((r) => r.start !== empty);
if (results.length === 0) {
return initialPlan;
}

// TODO: here is where we check for conflicts created by the parallel plan.
// If two branches change the same part of the state, that means that there is
// a conflict and the planning should fail.
// Intersection is an expensive operation so we probably will just do it during
// testing

// We add the fork node
const start = Node.fork(results.map((r) => r.start!));

// We don't update the state here as
// applyPatch performs changes in place, which means
// we need to make a structured copy of the state
const state = initialPlan.state;

// Since we already checked conflicts, we can just concat the changes
const pendingChanges = results.reduce(
(acc, r) => acc.concat(r.pendingChanges),
initialPlan.pendingChanges,
);

return {
success: true,
state,
pendingChanges,
start,
stats: initialPlan.stats,
};
}

function tryInstruction<TState = any>(
Expand Down Expand Up @@ -173,6 +265,7 @@ export function findPlan<TState = any>({
start: initialPlan.start,
state: initialPlan.state,
stats: { ...stats, maxDepth },
pendingChanges: [],
};
}

Expand Down Expand Up @@ -222,12 +315,22 @@ export function findPlan<TState = any>({
// expansion didn't add any tasks so it makes no sense to go to a
// deeper level
if (taskPlan.start !== initialPlan.start) {
let state: TState;
try {
// applyPatch makes a copy of the source object so we only want to
// perform this operation if the instruction suceeded
state = applyPatch(initialPlan.state, taskPlan.pendingChanges);
} catch (e: any) {
trace(MergeFailed(e));
continue;
}

const res = findPlan({
depth: depth + 1,
diff,
tasks,
trace,
initialPlan: taskPlan,
initialPlan: { ...taskPlan, state, pendingChanges: [] },
callStack,
});

Expand Down
Loading

0 comments on commit e563525

Please sign in to comment.