diff --git a/packages/sequencer/src/protocol/production/flow/ReductionTaskFlow.ts b/packages/sequencer/src/protocol/production/flow/ReductionTaskFlow.ts index 63a5b720..95f28ba1 100644 --- a/packages/sequencer/src/protocol/production/flow/ReductionTaskFlow.ts +++ b/packages/sequencer/src/protocol/production/flow/ReductionTaskFlow.ts @@ -42,7 +42,7 @@ export class ReductionTaskFlow { reductionTask: Task, Output>; mergableFunction: (a: Output, b: Output) => boolean; }, - private readonly flowCreator: FlowCreator + flowCreator: FlowCreator ) { this.flow = flowCreator.createFlow>(options.name, { numMergesCompleted: 0, @@ -121,8 +121,10 @@ export class ReductionTaskFlow { const { availableReductions, touchedIndizes } = this.resolveReducibleTasks(flow.state.queue, options.mergableFunction); - // I don't know exactly what this rule wants from me, I suspect - // it complains bcs the function is called forEach + flow.state.queue = flow.state.queue.filter( + (ignored, index) => !touchedIndizes.includes(index) + ); + await flow.forEach(availableReductions, async (reduction) => { const taskParameters: PairTuple = [reduction.r1, reduction.r2]; await flow.pushTask( @@ -135,10 +137,6 @@ export class ReductionTaskFlow { } ); }); - - flow.state.queue = flow.state.queue.filter( - (ignored, index) => !touchedIndizes.includes(index) - ); } } diff --git a/packages/sequencer/src/worker/queue/LocalTaskQueue.ts b/packages/sequencer/src/worker/queue/LocalTaskQueue.ts index 57725b4f..d27bee5a 100644 --- a/packages/sequencer/src/worker/queue/LocalTaskQueue.ts +++ b/packages/sequencer/src/worker/queue/LocalTaskQueue.ts @@ -1,6 +1,9 @@ import { log, mapSequential, noop } from "@proto-kit/common"; -import { SequencerModule } from "../../sequencer/builder/SequencerModule"; +import { + sequencerModule, + SequencerModule, +} from "../../sequencer/builder/SequencerModule"; import { TaskPayload } from "../flow/Task"; import { Closeable, InstantiatedQueue, TaskQueue } from "./TaskQueue"; @@ -20,6 +23,7 @@ export interface LocalTaskQueueConfig { simulatedDuration?: number; } +@sequencerModule() export class LocalTaskQueue extends SequencerModule implements TaskQueue diff --git a/packages/sequencer/test-integration/workers/workers-proven.test.ts b/packages/sequencer/test-integration/workers/workers-proven.test.ts index e9368bc1..16f60586 100644 --- a/packages/sequencer/test-integration/workers/workers-proven.test.ts +++ b/packages/sequencer/test-integration/workers/workers-proven.test.ts @@ -26,6 +26,8 @@ import { ChildProcessWorker } from "./ChildProcessWorker"; const timeout = 300000; +const proofsEnabled = false; + describe("worker-proven", () => { describe("sequencer", () => { let test: BlockTestService; @@ -87,7 +89,7 @@ describe("worker-proven", () => { try { // Start AppChain const childContainer = container.createChildContainer(); - await app.start(false, childContainer); + await app.start(proofsEnabled, childContainer); test = app.sequencer.dependencyContainer.resolve(BlockTestService); @@ -124,10 +126,33 @@ describe("worker-proven", () => { console.log(batch.proof); - expect(batch.proof.proof.length).toBeGreaterThan(50); + expect(batch.proof.proof.length).toBeGreaterThan( + proofsEnabled ? 50 : 0 + ); expect(batch.blockHashes).toHaveLength(1); }, timeout ); + + it.each([5, 14, 20])( + "should produce a batch of a %s of blocks", + async (numBlocks) => { + for (let i = 0; i < numBlocks; i++) { + await test.produceBlock(); + } + + const batch = await test.produceBatch(); + + expectDefined(batch); + + console.log(batch.proof); + + expect(batch.proof.proof.length).toBeGreaterThan( + proofsEnabled ? 50 : 0 + ); + expect(batch.blockHashes).toHaveLength(numBlocks); + }, + timeout + ); }); }); diff --git a/packages/sequencer/test/protocol/production/flow/ReductionTaskFlow.test.ts b/packages/sequencer/test/protocol/production/flow/ReductionTaskFlow.test.ts new file mode 100644 index 00000000..28da2fc4 --- /dev/null +++ b/packages/sequencer/test/protocol/production/flow/ReductionTaskFlow.test.ts @@ -0,0 +1,140 @@ +import "reflect-metadata"; +import { container, DependencyContainer } from "tsyringe"; +import { noop, sleep } from "@proto-kit/common"; + +import { + FlowCreator, + FlowTaskWorker, + JSONTaskSerializer, + LocalTaskQueue, + PairTuple, + ReductionTaskFlow, + Task, + TaskSerializer, + TaskWorkerModule, +} from "../../../../src"; + +type IndexNumber = { + index: number; + value: number; +}; + +type RangeSum = { + from: number; + to: number; + value: number; +}; + +class PairedMulTask + extends TaskWorkerModule + implements Task, RangeSum> +{ + public name = "sum"; + + public inputSerializer(): TaskSerializer> { + return JSONTaskSerializer.fromType>(); + } + + public resultSerializer(): TaskSerializer { + return JSONTaskSerializer.fromType(); + } + + public async compute([a, b]: PairTuple): Promise { + return { + from: a.from, + to: b.to, + value: a.value + b.value, + }; + } + + public async prepare(): Promise { + noop(); + } +} + +class NumberIdentityTask + extends TaskWorkerModule + implements Task +{ + public name = "numberIdentity"; + + public inputSerializer(): TaskSerializer { + return JSONTaskSerializer.fromType(); + } + + public resultSerializer(): TaskSerializer { + return JSONTaskSerializer.fromType(); + } + + public async compute(input: IndexNumber): Promise { + return { + from: input.index, + to: input.index + 1, + value: input.value, + }; + } + + public async prepare(): Promise { + noop(); + } +} + +describe("ReductionTaskFlow", () => { + let di: DependencyContainer; + beforeAll(async () => { + di = container.createChildContainer(); + + const queue = new LocalTaskQueue(); + queue.config = {}; + + di.register("TaskQueue", { + useValue: queue, + }); + + const worker = new FlowTaskWorker(di.resolve("TaskQueue"), [ + di.resolve(NumberIdentityTask), + di.resolve(PairedMulTask), + ]); + await worker.start(); + }); + + it("regressions - should work for parallel result stream", async () => { + expect.assertions(1); + + const creator = di.resolve(FlowCreator); + const flow = new ReductionTaskFlow( + { + inputLength: 5, + mappingTask: di.resolve(NumberIdentityTask), + reductionTask: di.resolve(PairedMulTask), + name: "test", + mergableFunction: (a, b) => { + return a.to === b.from; + }, + }, + creator + ); + + // eslint-disable-next-line no-async-promise-executor + const result = await new Promise(async (res) => { + flow.onCompletion(async (output) => res(output)); + + await flow.pushInput({ index: 0, value: 1 }); + await flow.pushInput({ index: 1, value: 2 }); + await flow.pushInput({ index: 2, value: 3 }); + + await sleep(100); + + await flow.pushInput({ index: 3, value: 4 }); + await flow.pushInput({ index: 4, value: 0 }); + }); + + const expected: RangeSum = { + from: 0, + to: 5, + value: 1 + 2 + 3 + 4, + }; + + expect(result).toStrictEqual(expected); + }, 1000000); +});