Skip to content

Commit

Permalink
Convert Plan from a Linked List to a DAG
Browse files Browse the repository at this point in the history
Plan nodes can now be of three types, a regular action node (same as
before), a fork node, creating branching on the plan, and an empty node,
to indicate joining of previously created branches. This commit also
updates all existing functions to deal with this new plan format, even
though the planner still cannot generate these types of plans

Change-type: minor
  • Loading branch information
pipex committed Aug 16, 2023
1 parent 4361c94 commit ab14029
Show file tree
Hide file tree
Showing 9 changed files with 309 additions and 61 deletions.
35 changes: 27 additions & 8 deletions lib/agent/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,14 @@ export class Runtime<TState> {
}
}

private async runPlan(node: Node<TState> | null) {
private async runPlan(node: Node<TState> | null): Promise<void> {
const { logger } = this.opts;
while (node != null) {

if (node == null) {
return;
}

if (Node.isAction(node)) {
const { action } = node;

if (this.stopped) {
Expand All @@ -146,9 +151,15 @@ export class Runtime<TState> {
}
logger.info(`${action.description}: success`);

// Advance the list
node = node.next;
return await this.runPlan(node.next);
}

if (Node.isFork(node)) {
// Run children in parallel
await Promise.all(node.next.map(this.runPlan));
}

// Nothing to do if the node is empty
}

start() {
Expand All @@ -158,12 +169,20 @@ export class Runtime<TState> {

const { logger } = this.opts;

const toArray = <T>(n: Node<T> | null): string[] => {
if (n == null) {
const flatten = <T>(node: Node<T> | null): string[] => {
if (node == null) {
return [];
}

return [n.action.description, ...toArray(n.next)];
if (Node.isAction(node)) {
return [node.action.description, ...flatten(node.next)];
}

if (Node.isFork(node)) {
node.next.flatMap((n) => flatten(n));
}

return [];
};

this.promise = new Promise<Result<TState>>(async (resolve) => {
Expand All @@ -190,7 +209,7 @@ export class Runtime<TState> {

logger.debug(
'plan found, will execute the following actions',
toArray(start),
flatten(start),
);

// If we got here, we have found a suitable plan
Expand Down
21 changes: 14 additions & 7 deletions lib/planner/findPlan.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,20 @@ interface PlanningState<TState = any> {
callStack?: Array<Method<TState>>;
}

function planHasId<T>(id: string, node: Node<T> | null): boolean {
while (node != null) {
if (node.id === id) {
return true;
}
node = node.next;
function findLoop<T>(id: string, node: Node<T> | null): boolean {
if (node == null) {
return false;
}

if (Node.isAction(node)) {
return node.id === id;
}

if (Node.isFork(node)) {
return node.next.some((n) => findLoop(id, n));
}

// If the node is empty, ignore it
return false;
}

Expand All @@ -51,7 +58,7 @@ function tryAction<TState = any>(
const id = node.id;

// Detect loops in the plan
if (planHasId(id, initialPlan.start)) {
if (findLoop(id, initialPlan.start)) {
return { success: false, stats: initialPlan.stats, error: LoopDetected };
}
const state = action.effect(initialPlan.state);
Expand Down
30 changes: 19 additions & 11 deletions lib/planner/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,26 @@ export interface Planner<TState = any> {
findPlan(current: TState, target: Target<TState>): Plan<TState>;
}

function reverse<T>(head: Node<T> | null): Node<T> | null {
let curr = head;
let prev: Node<T> | null = null;

while (curr != null) {
const next = curr.next;
curr.next = prev;
prev = curr;
curr = next;
function reversePlan<T>(
curr: Node<T> | null,
prev: Node<T> | null = null,
): Node<T> | null {
if (curr == null) {
return prev;
}
if (Node.isFork(curr)) {
// When reversing a fork node, we are turning the node
// into an empty node. For this reason, we create the empty node
// first that we pass as the `prev` argument to the recursive call to
// reversePlan for each of the children
const empty = { next: prev };
curr.next.map((n) => reversePlan(n, empty));
return empty;
}

return prev;
const next = curr.next;
curr.next = prev;
return reversePlan(next, curr);
}

function of<TState = any>({
Expand Down Expand Up @@ -74,7 +82,7 @@ function of<TState = any>({
});
res.stats = { ...res.stats, time: performance.now() - time };
if (res.success) {
res.start = reverse(res.start);
res.start = reversePlan(res.start);
trace({ event: 'success', start: res.start });
} else {
trace({ event: 'failed' });
Expand Down
45 changes: 41 additions & 4 deletions lib/planner/node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import { Action } from '../task';
import { Pointer } from '../pointer';

/**
* A node defines a specific step in a plan.
* An action node defines an executable step of a plan
*/
export interface Node<TState> {
export interface ActionNode<TState> {
/**
* Unique id for the node. This is calculated from the
* action metadata and the current runtime state expected
Expand All @@ -20,13 +20,48 @@ export interface Node<TState> {
readonly action: Action<TState, any, any>;

/**
* The next step in the plan
* The next step in the plan.
*/
next: Node<TState> | null;
}

/**
* A fork node defines a branching in the plan created
* by the existence of a parallel task. A fork node can
* have zero or more next nodes.
*/
export interface ForkNode<TState> {
next: Array<Node<TState>>;
}

/**
* An empty node is a node that doesn't specify a specific action but can be
* use to indicate an empty step at the start of the plan, or a joining of the branches
* created by the split node.
*/
export interface EmptyNode<TState> {
next: Node<TState> | null;
}

export type Node<TState> =
| ActionNode<TState>
| ForkNode<TState>
| EmptyNode<TState>;

function isActionNode<TState>(n: Node<TState>): n is ActionNode<TState> {
return (n as ActionNode<TState>).action !== undefined;
}

function isForkNode<TState>(n: Node<TState>): n is ForkNode<TState> {
return (
(n as any).action === undefined &&
(n as any).next !== undefined &&
Array.isArray((n as any).next)
);
}

export const Node = {
of: <TState>(s: TState, a: Action<TState, any, any>): Node<TState> => {
of: <TState>(s: TState, a: Action<TState, any, any>): ActionNode<TState> => {
// We don't use the full state to calculate the
// id as there may be changes in the state that have nothing
// to do with the action. We just use the part of the state
Expand All @@ -52,4 +87,6 @@ export const Node = {
next: null,
};
},
isAction: isActionNode,
isFork: isForkNode,
};
47 changes: 47 additions & 0 deletions lib/testing/builder.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,51 @@ describe('testing/builder', () => {
'c',
]);
});

it('builds a plan with parallel branches', () => {
expect(
plan()
.action('a')
.fork()
.branch('b', 'c')
.branch('d')
.join()
.action('f')
.end(),
).to.deep.equal(['a', [['b', 'c'], ['d']], 'f']);

expect(
plan()
.fork()
.action('a')
// A fork within a fork
.fork()
.branch('c')
.branch('d')
.join()
.branch('b')
.join()
.action('f')
.end(),
).to.deep.equal([[['a', [['c'], ['d']]], ['b']], 'f']);
});

it('collapses empty branches', () => {
// Add an empty fork to the plan
expect(plan().action('a').fork().join().action('f').end()).to.deep.equal([
'a',
'f',
]);
expect(
plan()
.action('a')
.fork()
.branch('b', 'c')
// This branch is empty
.branch()
.join()
.action('f')
.end(),
).to.deep.equal(['a', [['b', 'c']], 'f']);
});
});
83 changes: 80 additions & 3 deletions lib/testing/builder.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,86 @@
import { SimplePlan } from './types';

export interface Builder {
interface PlanBuilder {
/**
* Adds a next node to the plan
*/
action(description: string): Builder;
action(description: string): PlanBuilder;

/**
* Adds a sub-plan to the simple plan
* to represent a fork in the current plan
*/
fork(): ForkBuilder;

/**
* Builds the test plan
*/
end(): SimplePlan;
}

interface ForkBuilder<P extends PlanBuilder | ForkBuilder<any> = PlanBuilder> {
/**
* Adds a branch to the fork
*/
branch(...actions: string[]): ForkBuilder<P>;

/**
* Adds an action to the current branch
*/
action(description: string): ForkBuilder<P>;

/**
* Creates a fork within a fork
*/
fork(): ForkBuilder<ForkBuilder<P>>;

/**
* Joins the forked branches and returns
* the original builder
*/
join(): P;
}

function createFork<P extends PlanBuilder | ForkBuilder<any> = PlanBuilder>(
parent: P,
p: SimplePlan = [],
): ForkBuilder<P> {
const repr: SimplePlan = [];
let br: SimplePlan = [];
const f: ForkBuilder<P> = {
branch(...actions: string[]) {
if (br.length > 0) {
repr.push(br);
br = [];
}
br.push(...actions);
return f;
},
action(description: string) {
br.push(description);
return f;
},
fork() {
return createFork(f, br);
},
join() {
if (br.length > 0) {
repr.push(br);
}
if (repr.length > 0) {
p.push(repr);
}
return parent;
},
};

return f;
}

/**
* Start building a plan
*/
export function plan(): Builder {
export function plan(): PlanBuilder {
const repr: SimplePlan = [];

const builder = {
Expand All @@ -25,10 +90,22 @@ export function plan(): Builder {
return builder;
},

fork() {
return createFork(builder, repr);
},

end() {
return repr;
},
};

return builder;
}

export function branch(...values: string[]) {
let b = plan();
for (const a of values) {
b = b.action(a);
}
return b.end();
}
Loading

0 comments on commit ab14029

Please sign in to comment.