Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Reduction Flow halt #235

Open
wants to merge 2 commits into
base: feature/proven-settlemnet
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ export class ReductionTaskFlow<Input, Output> {
reductionTask: Task<PairTuple<Output>, Output>;
mergableFunction: (a: Output, b: Output) => boolean;
},
private readonly flowCreator: FlowCreator
flowCreator: FlowCreator
) {
this.flow = flowCreator.createFlow<ReductionState<Output>>(options.name, {
numMergesCompleted: 0,
Expand Down Expand Up @@ -121,8 +121,10 @@ export class ReductionTaskFlow<Input, Output> {
const { availableReductions, touchedIndizes } =
this.resolveReducibleTasks(flow.state.queue, options.mergableFunction);

// I don't know exactly what this rule wants from me, I suspect
// it complains bcs the function is called forEach
flow.state.queue = flow.state.queue.filter(
(ignored, index) => !touchedIndizes.includes(index)
);

await flow.forEach(availableReductions, async (reduction) => {
const taskParameters: PairTuple<Output> = [reduction.r1, reduction.r2];
await flow.pushTask(
Expand All @@ -135,10 +137,6 @@ export class ReductionTaskFlow<Input, Output> {
}
);
});

flow.state.queue = flow.state.queue.filter(
(ignored, index) => !touchedIndizes.includes(index)
);
}
}

Expand Down
6 changes: 5 additions & 1 deletion packages/sequencer/src/worker/queue/LocalTaskQueue.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import { log, mapSequential, noop } from "@proto-kit/common";

import { SequencerModule } from "../../sequencer/builder/SequencerModule";
import {
sequencerModule,
SequencerModule,
} from "../../sequencer/builder/SequencerModule";
import { TaskPayload } from "../flow/Task";

import { Closeable, InstantiatedQueue, TaskQueue } from "./TaskQueue";
Expand All @@ -20,6 +23,7 @@ export interface LocalTaskQueueConfig {
simulatedDuration?: number;
}

@sequencerModule()
export class LocalTaskQueue
extends SequencerModule<LocalTaskQueueConfig>
implements TaskQueue
Expand Down
29 changes: 27 additions & 2 deletions packages/sequencer/test-integration/workers/workers-proven.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import { ChildProcessWorker } from "./ChildProcessWorker";

const timeout = 300000;

const proofsEnabled = false;

describe("worker-proven", () => {
describe("sequencer", () => {
let test: BlockTestService;
Expand Down Expand Up @@ -87,7 +89,7 @@ describe("worker-proven", () => {
try {
// Start AppChain
const childContainer = container.createChildContainer();
await app.start(false, childContainer);
await app.start(proofsEnabled, childContainer);

test = app.sequencer.dependencyContainer.resolve(BlockTestService);

Expand Down Expand Up @@ -124,10 +126,33 @@ describe("worker-proven", () => {

console.log(batch.proof);

expect(batch.proof.proof.length).toBeGreaterThan(50);
expect(batch.proof.proof.length).toBeGreaterThan(
proofsEnabled ? 50 : 0
);
expect(batch.blockHashes).toHaveLength(1);
},
timeout
);

it.each([5, 14, 20])(
"should produce a batch of a %s of blocks",
async (numBlocks) => {
for (let i = 0; i < numBlocks; i++) {
await test.produceBlock();
}

const batch = await test.produceBatch();

expectDefined(batch);

console.log(batch.proof);

expect(batch.proof.proof.length).toBeGreaterThan(
proofsEnabled ? 50 : 0
);
expect(batch.blockHashes).toHaveLength(numBlocks);
},
timeout
);
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
import "reflect-metadata";
import { container, DependencyContainer } from "tsyringe";
import { noop, sleep } from "@proto-kit/common";

import {
FlowCreator,
FlowTaskWorker,
JSONTaskSerializer,
LocalTaskQueue,
PairTuple,
ReductionTaskFlow,
Task,
TaskSerializer,
TaskWorkerModule,
} from "../../../../src";

type IndexNumber = {
index: number;
value: number;
};

type RangeSum = {
from: number;
to: number;
value: number;
};

class PairedMulTask
extends TaskWorkerModule
implements Task<PairTuple<RangeSum>, RangeSum>
{
public name = "sum";

public inputSerializer(): TaskSerializer<PairTuple<RangeSum>> {
return JSONTaskSerializer.fromType<PairTuple<RangeSum>>();
}

public resultSerializer(): TaskSerializer<RangeSum> {
return JSONTaskSerializer.fromType<RangeSum>();
}

public async compute([a, b]: PairTuple<RangeSum>): Promise<RangeSum> {
return {
from: a.from,
to: b.to,
value: a.value + b.value,
};
}

public async prepare(): Promise<void> {
noop();
}
}

class NumberIdentityTask
extends TaskWorkerModule
implements Task<IndexNumber, RangeSum>
{
public name = "numberIdentity";

public inputSerializer(): TaskSerializer<IndexNumber> {
return JSONTaskSerializer.fromType<IndexNumber>();
}

public resultSerializer(): TaskSerializer<RangeSum> {
return JSONTaskSerializer.fromType<RangeSum>();
}

public async compute(input: IndexNumber): Promise<RangeSum> {
return {
from: input.index,
to: input.index + 1,
value: input.value,
};
}

public async prepare(): Promise<void> {
noop();
}
}

describe("ReductionTaskFlow", () => {
let di: DependencyContainer;
beforeAll(async () => {
di = container.createChildContainer();

const queue = new LocalTaskQueue();
queue.config = {};

di.register("TaskQueue", {
useValue: queue,
});

const worker = new FlowTaskWorker(di.resolve("TaskQueue"), [
di.resolve(NumberIdentityTask),
di.resolve(PairedMulTask),
]);
await worker.start();
});

it("regressions - should work for parallel result stream", async () => {
expect.assertions(1);

const creator = di.resolve(FlowCreator);
const flow = new ReductionTaskFlow<IndexNumber, RangeSum>(
{
inputLength: 5,
mappingTask: di.resolve(NumberIdentityTask),
reductionTask: di.resolve(PairedMulTask),
name: "test",
mergableFunction: (a, b) => {
return a.to === b.from;
},
},
creator
);

// eslint-disable-next-line no-async-promise-executor
const result = await new Promise(async (res) => {
flow.onCompletion(async (output) => res(output));

await flow.pushInput({ index: 0, value: 1 });
await flow.pushInput({ index: 1, value: 2 });
await flow.pushInput({ index: 2, value: 3 });

await sleep(100);

await flow.pushInput({ index: 3, value: 4 });
await flow.pushInput({ index: 4, value: 0 });
});

const expected: RangeSum = {
from: 0,
to: 5,
value: 1 + 2 + 3 + 4,
};

expect(result).toStrictEqual(expected);
}, 1000000);
});