Skip to content

Commit

Permalink
Implement planning for parallel tasks
Browse files Browse the repository at this point in the history
WIP

Change-type: minor
  • Loading branch information
pipex committed Aug 17, 2023
1 parent ab14029 commit 4acfeef
Show file tree
Hide file tree
Showing 12 changed files with 530 additions and 56 deletions.
48 changes: 48 additions & 0 deletions lib/agent.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,54 @@ describe('Agent', () => {
// Intermediate states returned by the observable should be emitted by the agent
expect(count).to.deep.equal([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
});

it('runs parallel plans', async () => {
type Counters = { [k: string]: number };

const byOne = Task.of({
path: '/:counter',
condition: (state: Counters, ctx) => ctx.get(state) < ctx.target,
effect: (state: Counters, ctx) => ctx.set(state, ctx.get(state) + 1),
action: async (state: Counters, ctx) => {
await setTimeout(100 * Math.random());
return ctx.set(state, ctx.get(state) + 1);
},
description: ({ counter }) => `${counter} + 1`,
});

const byTwo = Task.of({
path: '/:counter',
condition: (state: Counters, ctx) => ctx.target - ctx.get(state) > 1,
method: (_: Counters, ctx) => [byOne({ ...ctx }), byOne({ ...ctx })],
description: ({ counter }) => `increase '${counter}'`,
});

const multiIncrement = Task.of({
condition: (state: Counters, ctx) =>
Object.keys(state).some((k) => ctx.target[k] - state[k] > 1),
parallel: (state: Counters, ctx) =>
Object.keys(state)
.filter((k) => ctx.target[k] - state[k] > 1)
.map((k) => byTwo({ counter: k, target: ctx.target[k] })),
description: `increment counters`,
});

const agent = Agent.of({
initial: { a: 0, b: 0 },
opts: { logger: console, minWaitMs: 1 * 1000 },
tasks: [multiIncrement, byTwo, byOne],
});

agent.seek({ a: 3, b: 2 });

// We wait at most for one cycle to complete, meaning the
// state is reached immediately and the agent terminates after the
// first pause
await expect(agent.wait(1500)).to.eventually.deep.equal({
success: true,
state: { a: 3, b: 2 },
});
});
});

describe('heater', () => {
Expand Down
61 changes: 43 additions & 18 deletions lib/agent/runtime.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import { setTimeout as delay } from 'timers/promises';
import { diff, patch, Operation as PatchOperation } from 'mahler-wasm';

import { Observer, Observable } from '../observable';
import { Planner, Node } from '../planner';
import { Planner, Node, EmptyNode } from '../planner';
import { Sensor, Subscription } from '../sensor';
import { Target } from '../target';
import { Action } from '../task';
import { equals } from '../json';
import { assert } from '../assert';

import {
AgentOpts,
Expand All @@ -16,6 +17,7 @@ import {
Timeout,
UnknownError,
} from './types';
import { simplified } from '../testing';

/**
* Internal error
Expand Down Expand Up @@ -90,9 +92,12 @@ export class Runtime<TState> {
return result;
}

private async runAction(action: Action) {
private async runAction(action: Action): Promise<PatchOperation[]> {
try {
const res = action(this.state);
// We keep a reference to the previous state, which is
// what we need to compare the updated state to
const before = this.state;
const res = action(before);
if (Observable.is<TState>(res)) {
const runtime = this;
// If the action result is an observable, then
Expand All @@ -101,26 +106,34 @@ export class Runtime<TState> {
return new Promise((resolve, reject) => {
res.subscribe({
next(s) {
runtime.state = s;
runtime.observer.next(s);
const changes = diff(before, s);
if (changes.length > 0) {
runtime.state = patch(runtime.state, changes);
runtime.observer.next(runtime.state);
}
},
complete() {
resolve(runtime.state);
// There should be no more changes to perform
// here
resolve([]);
},
error(e) {
reject(e);
},
});
});
} else {
return await res;
const after = await res;
return diff(before, after);
}
} catch (e) {
throw new ActionRunFailed(action, e);
}
}

private async runPlan(node: Node<TState> | null): Promise<void> {
private async runPlan(
node: Node<TState> | null,
): Promise<undefined | EmptyNode<TState>> {
const { logger } = this.opts;

if (node == null) {
Expand All @@ -138,12 +151,16 @@ export class Runtime<TState> {
throw new ActionConditionFailed(action);
}

// QUESTION: do we need to handle concurrency to deal with state changes
// coming from sensors?
logger.info(`${action.description}: running ...`);
const state = await this.runAction(action);
if (!equals(this.state, state)) {
this.state = state;
const changes = await this.runAction(action);
if (changes.length > 0) {
// NOTE: there is a small chance that the state changes while the
// patch is being applied. This means there is potential to lose changes
// by a race (even though patch should be very fast).
// There are two potential solutions here, either we wrap this call in a
// mutex, so only one patch can be applied at a time, or we find a way to update
// the state object in place, so only the relevant parts of the state are updated
this.state = patch(this.state, changes);

// Notify observer of the new state only if there
// are changes
Expand All @@ -155,11 +172,19 @@ export class Runtime<TState> {
}

if (Node.isFork(node)) {
// Run children in parallel
await Promise.all(node.next.map(this.runPlan));
// Run children in parallel. Continue following the plan when reaching the
// empty node only for one of the branches
const [empty] = await Promise.all(node.next.map((n) => this.runPlan(n)));

// There should always be at least one branch in the fork because
// of the way the planner is implemented
assert(empty !== undefined);

return await this.runPlan(empty.next);
}

// Nothing to do if the node is empty
// We return the node
return node;
}

start() {
Expand Down Expand Up @@ -209,7 +234,7 @@ export class Runtime<TState> {

logger.debug(
'plan found, will execute the following actions',
flatten(start),
simplified(result),
);

// If we got here, we have found a suitable plan
Expand Down
133 changes: 133 additions & 0 deletions lib/planner.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,139 @@ describe('Planner', () => {
);
});

it('solves parallel problems', () => {
type Counters = { [k: string]: number };

const byOne = Task.of({
path: '/:counter',
condition: (state: Counters, ctx) => ctx.get(state) < ctx.target,
effect: (state: Counters, ctx) => ctx.set(state, ctx.get(state) + 1),
description: ({ counter }) => `${counter} + 1`,
});

const multiIncrement = Task.of({
condition: (state: Counters, ctx) =>
Object.keys(state).filter((k) => ctx.target[k] - state[k] > 0)
.length > 1,
parallel: (state: Counters, ctx) =>
Object.keys(state)
.filter((k) => ctx.target[k] - state[k] > 0)
.map((k) => byOne({ counter: k, target: ctx.target[k] })),
description: `increment counters`,
});

const planner = Planner.of({
tasks: [multiIncrement, byOne],
config: { trace: console.trace },
});

const result = planner.findPlan({ a: 0, b: 0 }, { a: 3, b: 2 });
expect(simplified(result)).to.deep.equal(
plan()
.fork()
.branch('a + 1')
.branch('b + 1')
.join()
.fork()
.branch('a + 1')
.branch('b + 1')
.join()
.action('a + 1')
.end(),
);
});

it('solves parallel problems with methods', () => {
type Counters = { [k: string]: number };

const byOne = Task.of({
path: '/:counter',
condition: (state: Counters, ctx) => ctx.get(state) < ctx.target,
effect: (state: Counters, ctx) => ctx.set(state, ctx.get(state) + 1),
description: ({ counter }) => `${counter} + 1`,
});

const byTwo = Task.of({
path: '/:counter',
condition: (state: Counters, ctx) => ctx.target - ctx.get(state) > 1,
method: (_: Counters, ctx) => [byOne({ ...ctx }), byOne({ ...ctx })],
description: ({ counter }) => `increase '${counter}'`,
});

const multiIncrement = Task.of({
condition: (state: Counters, ctx) =>
Object.keys(state).some((k) => ctx.target[k] - state[k] > 1),
parallel: (state: Counters, ctx) =>
Object.keys(state)
.filter((k) => ctx.target[k] - state[k] > 1)
.map((k) => byTwo({ counter: k, target: ctx.target[k] })),
description: `increment counters`,
});

const planner = Planner.of({
tasks: [multiIncrement, byTwo, byOne],
config: { trace: console.trace },
});

const result = planner.findPlan({ a: 0, b: 0 }, { a: 3, b: 2 });

expect(simplified(result)).to.deep.equal(
plan()
.fork()
.branch('a + 1', 'a + 1')
.branch('b + 1', 'b + 1')
.join()
.action('a + 1')
.end(),
);
});

it('detects planning conflicts', () => {
type Counters = { [k: string]: number };

const byOne = Task.of({
path: '/:counter',
condition: (state: Counters, ctx) => ctx.get(state) < ctx.target,
effect: (state: Counters, ctx) => ctx.set(state, ctx.get(state) + 1),
description: ({ counter }) => `${counter} + 1`,
});

const conflictingIncrement = Task.of({
condition: (state: Counters, ctx) =>
Object.keys(state).filter((k) => ctx.target[k] - state[k] > 1)
.length > 1,
parallel: (state: Counters, ctx) =>
Object.keys(state)
.filter((k) => ctx.target[k] - state[k] > 1)
.flatMap((k) => [
// We create parallel steps to increase the same element of the state
// concurrently
byOne({ counter: k, target: ctx.target[k] }),
byOne({ counter: k, target: ctx.target[k] }),
]),
description: `increment counters`,
});

const planner = Planner.of({
tasks: [conflictingIncrement, byOne],
config: { trace: console.trace },
});

const result = planner.findPlan({ a: 0, b: 0 }, { a: 3, b: 2 });

// The resulting plan is just the linear version because the parallel version
// will result in a conflict being detected
expect(simplified(result)).to.deep.equal(
plan()
.action('a + 1')
.action('a + 1')
.action('a + 1')
.action('b + 1')
.action('b + 1')
.end(),
);
});

it.skip('simple travel problem', async () => {
// Alice needs to go to the park and may walk or take a taxi. Depending on the distance to the park and
// the available cash, some actions may be possible
Expand Down
Loading

0 comments on commit 4acfeef

Please sign in to comment.