Skip to content

Commit

Permalink
Implement planning for parallel tasks
Browse files Browse the repository at this point in the history
WIP

Change-type: minor
  • Loading branch information
pipex committed Aug 16, 2023
1 parent ab14029 commit 1ca4d2e
Show file tree
Hide file tree
Showing 13 changed files with 382 additions and 195 deletions.
14 changes: 9 additions & 5 deletions lib/agent/runtime.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { setTimeout as delay } from 'timers/promises';
import { diff, patch } from 'mahler-wasm';

import { Observer, Observable } from '../observable';
import { Planner, Node } from '../planner';
import { Sensor, Subscription } from '../sensor';
import { Target } from '../target';
import { Action } from '../task';
import { equals } from '../json';

import {
AgentOpts,
Expand Down Expand Up @@ -101,8 +101,11 @@ export class Runtime<TState> {
return new Promise((resolve, reject) => {
res.subscribe({
next(s) {
runtime.state = s;
runtime.observer.next(s);
const changes = diff(runtime.state, s);
if (changes.length > 0) {
runtime.state = patch(runtime.state, changes);
runtime.observer.next(runtime.state);
}
},
complete() {
resolve(runtime.state);
Expand Down Expand Up @@ -142,8 +145,9 @@ export class Runtime<TState> {
// coming from sensors?
logger.info(`${action.description}: running ...`);
const state = await this.runAction(action);
if (!equals(this.state, state)) {
this.state = state;
const changes = diff(this.state, state);
if (changes.length > 0) {
this.state = patch(this.state, changes);

// Notify observer of the new state only if there
// are changes
Expand Down
155 changes: 0 additions & 155 deletions lib/diff.spec.ts

This file was deleted.

45 changes: 45 additions & 0 deletions lib/json.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import { expect } from '~/test-utils';
import { merge } from './json';

describe('JSON', () => {
describe('merge', () => {
it('merges simple values', () => {
expect(merge(1, 2)).to.equal(2);
expect(merge('a', 'b')).to.equal('b');
});

it('merges simple arrays', () => {
expect(merge([], [])).to.eql([]);
expect(merge([1, 2, 3], [4, 5, 6])).to.eql([1, 2, 3, 4, 5, 6]);
expect(merge([1, 1], [1, 5, 6])).to.eql([1, 1, 1, 5, 6]);
});

it('merges objects', () => {
expect(merge({}, {})).to.eql({});
expect(merge({ a: 1 }, { b: 2 })).to.eql({ a: 1, b: 2 });
expect(merge({ a: 1 }, { a: 2 })).to.eql({ a: 2 });
expect(merge({ a: 1 }, { a: { b: 2 } })).to.eql({ a: { b: 2 } });
expect(merge({ a: { b: 1 } }, { a: { b: 2 } })).to.eql({ a: { b: 2 } });
expect(merge({ a: { b: 1 } }, { a: { c: 2 } })).to.eql({
a: { b: 1, c: 2 },
});
});

it('merges arrays and objects', () => {
expect(merge([], {})).to.eql({});
expect(merge({}, [])).to.eql({});
expect(merge([1, 2, 3], { a: 1 })).to.eql({ 0: 1, 1: 2, 2: 3, a: 1 });
expect(merge({ a: 1 }, [1, 2, 3])).to.eql({ a: 1, 0: 1, 1: 2, 2: 3 });
expect(merge([1, 2, 3], { a: 1, b: 2 })).to.eql({
0: 1,
1: 2,
2: 3,
a: 1,
b: 2,
});
expect(merge({ a: { b: 1 } }, { a: [1, 2] })).to.eql({
a: { b: 1, 0: 1, 1: 2 },
});
});
});
});
87 changes: 87 additions & 0 deletions lib/planner.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,93 @@ describe('Planner', () => {
);
});

it('solves parallel problems', () => {
type Counters = { [k: string]: number };

const byOne = Task.of({
path: '/:counter',
condition: (state: Counters, ctx) => ctx.get(state) < ctx.target,
effect: (state: Counters, ctx) => ctx.set(state, ctx.get(state) + 1),
description: ({ counter }) => `${counter} + 1`,
});

const multiIncrement = Task.of({
condition: (state: Counters, ctx) =>
Object.keys(state).filter((k) => ctx.target[k] - state[k] > 0)
.length > 1,
parallel: (state: Counters, ctx) =>
Object.keys(state)
.filter((k) => ctx.target[k] - state[k] > 0)
.map((k) => byOne({ counter: k, target: ctx.target[k] })),
description: `increment counters`,
});

const planner = Planner.of({
tasks: [multiIncrement, byOne],
config: { trace: console.trace },
});

const result = planner.findPlan({ a: 0, b: 0 }, { a: 3, b: 2 });
expect(simplified(result)).to.deep.equal(
plan()
.fork()
.branch('a + 1')
.branch('b + 1')
.join()
.fork()
.branch('a + 1')
.branch('b + 1')
.join()
.action('a + 1')
.end(),
);
});

it('solves parallel problems with methods', () => {
type Counters = { [k: string]: number };

const byOne = Task.of({
path: '/:counter',
condition: (state: Counters, ctx) => ctx.get(state) < ctx.target,
effect: (state: Counters, ctx) => ctx.set(state, ctx.get(state) + 1),
description: ({ counter }) => `${counter} + 1`,
});

const byTwo = Task.of({
path: '/:counter',
condition: (state: Counters, ctx) => ctx.target - ctx.get(state) > 1,
method: (_: Counters, ctx) => [byOne({ ...ctx }), byOne({ ...ctx })],
description: ({ counter }) => `increase '${counter}'`,
});

const multiIncrement = Task.of({
condition: (state: Counters, ctx) =>
Object.keys(state).some((k) => ctx.target[k] - state[k] > 1),
parallel: (state: Counters, ctx) =>
Object.keys(state)
.filter((k) => ctx.target[k] - state[k] > 1)
.map((k) => byTwo({ counter: k, target: ctx.target[k] })),
description: `increment counters`,
});

const planner = Planner.of({
tasks: [multiIncrement, byTwo, byOne],
config: { trace: console.trace },
});

const result = planner.findPlan({ a: 0, b: 0 }, { a: 3, b: 2 });

expect(simplified(result)).to.deep.equal(
plan()
.fork()
.branch('a + 1', 'a + 1')
.branch('b + 1', 'b + 1')
.join()
.action('a + 1')
.end(),
);
});

it.skip('simple travel problem', async () => {
// Alice needs to go to the park and may walk or take a taxi. Depending on the distance to the park and
// the available cash, some actions may be possible
Expand Down
Loading

0 comments on commit 1ca4d2e

Please sign in to comment.