Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Effects API experiments #33

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions .eslintrc.json

This file was deleted.

37 changes: 24 additions & 13 deletions lib/agent.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { expect, console } from '~/test-utils';
import { Agent } from './agent';
import { Task, NoAction } from './task';
import { Sensor, Subscriber } from './sensor';
import { Observable } from './observable';
import { IO, fromPipe, when } from './effects';

import { setTimeout } from 'timers/promises';

Expand All @@ -24,7 +24,7 @@ describe('Agent', () => {
});
agent.seek({ never: true });
await expect(agent.wait(1000)).to.be.rejected;
await agent.stop();
agent.stop();
});

it('it continues looking for plan unless max retries is set', async () => {
Expand Down Expand Up @@ -64,18 +64,29 @@ describe('Agent', () => {
expect(count).to.deep.equal([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
});

it('it allows to use observables as actions', async () => {
it('allows to use observables as actions', async () => {
const counter = Task.of({
condition: (state: number, { target }) => state < target,
effect: (_: number, { target }) => target,
action: (state: number, { target }) =>
Observable.of(async (s) => {
while (state < target) {
state = state + 1;
s.next(state);
await setTimeout(10);
}
}),
effect: (state: number, { target }) =>
fromPipe(
state,
when(
(s) => s < target,
(s) =>
// this IO call uses a generator function as the async
// side, to produce multiple values while the computation is performed
// The sync side just tells us that the effect of the computation is that
// the counter reaches the target
IO(
async function* () {
while (s < target) {
yield ++s;
await setTimeout(10);
}
},
() => target,
),
),
),
});
const agent = Agent.of({
initial: 0,
Expand Down
4 changes: 2 additions & 2 deletions lib/agent/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import assert from '../assert';
import { NullLogger } from '../logger';
import { Observable, Subject } from '../observable';
import { Subscribable, Subject } from '../observable';
import { Planner } from '../planner';
import { Sensor } from '../sensor';
import { Target } from '../target';
Expand All @@ -10,7 +10,7 @@ import { AgentOpts, NotStarted, Result } from './types';

export * from './types';

export interface Agent<TState = any> extends Observable<TState> {
export interface Agent<TState = any> extends Subscribable<TState> {
/**
* Tells the agent to seek a new target.
*
Expand Down
51 changes: 23 additions & 28 deletions lib/agent/runtime.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { setTimeout as delay } from 'timers/promises';
import { diff, patch, Operation as PatchOperation } from 'mahler-wasm';

import { Observer, Observable } from '../observable';
import { Observer } from '../observable';
import { Planner, Node, EmptyNode } from '../planner';
import { Sensor, Subscription } from '../sensor';
import { Target } from '../target';
Expand Down Expand Up @@ -100,34 +100,29 @@ export class Runtime<TState> {
// what we need to compare the updated state to
const before = this.state;
const res = action(before);
if (Observable.is<TState>(res)) {
const runtime = this;
// If the action result is an observable, then
// we need to subscribe to it and update the internal
// state as the observable emits new values
return new Promise((resolve, reject) => {
res.subscribe({
next(s) {
const changes = diff(before, s);
if (changes.length > 0) {
runtime.state = patch(runtime.state, changes);
runtime.observer.next(runtime.state);
}
},
complete() {
// There should be no more changes to perform
// here
resolve([]);
},
error(e) {
reject(e);
},
});
const runtime = this;
// If the action result is an observable, then
// we need to subscribe to it and update the internal
// state as the observable emits new values
return new Promise((resolve, reject) => {
res.subscribe({
next(s) {
const changes = diff(before, s);
if (changes.length > 0) {
runtime.state = patch(runtime.state, changes);
runtime.observer.next(runtime.state);
}
},
complete() {
// There should be no more changes to perform
// here
resolve([]);
},
error(e) {
reject(e);
},
});
} else {
const after = await res;
return diff(before, after);
}
});
} catch (e) {
throw new ActionRunFailed(action, e);
}
Expand Down
Empty file added lib/effects.spec.ts
Empty file.
Loading