diff --git a/deps.ts b/deps.ts index e8c9551..774464a 100644 --- a/deps.ts +++ b/deps.ts @@ -1,5 +1,6 @@ export * from "https://deno.land/x/willow_utils@0.2.1/mod.ts"; +export { FIFO } from "https://deno.land/x/fifo@v0.2.2/mod.ts"; export { deferred } from "https://deno.land/std@0.202.0/async/deferred.ts"; export { concat } from "https://deno.land/std@0.202.0/bytes/concat.ts"; export { equals as equalsBytes } from "https://deno.land/std@0.202.0/bytes/equals.ts"; diff --git a/src/wgps/ready_transport.test.ts b/src/wgps/ready_transport.test.ts new file mode 100644 index 0000000..9bea678 --- /dev/null +++ b/src/wgps/ready_transport.test.ts @@ -0,0 +1,132 @@ +import { assertEquals } from "https://deno.land/std@0.202.0/assert/mod.ts"; +import { delay } from "https://deno.land/std@0.202.0/async/mod.ts"; +import { ReadyTransport } from "./ready_transport.ts"; +import { transportPairInMemory } from "./transports/in_memory.ts"; +import { concat } from "../../deps.ts"; + +Deno.test("Ready transport receives max payload ", async () => { + // Happy path + { + const [alfie, betty] = transportPairInMemory(); + + const readyTransport = new ReadyTransport({ + transport: alfie, + challengeLength: 4, + }); + + let received = new Uint8Array(); + + (async () => { + for await (const bytes of readyTransport) { + received = concat(received, bytes); + } + })(); + + await betty.send(new Uint8Array([8])); + await betty.send(new Uint8Array([1, 2, 3, 4])); + await betty.send(new Uint8Array([7, 7, 7, 7])); + + const maxPayloadSize = await readyTransport.maximumPayloadSize; + const receivedCommitment = await readyTransport.receivedCommitment; + + assertEquals(maxPayloadSize, BigInt(256)); + assertEquals(receivedCommitment, new Uint8Array([1, 2, 3, 4])); + + await delay(0); + + assertEquals(received, new Uint8Array([7, 7, 7, 7])); + } + + // All at once. + { + const [alfie, betty] = transportPairInMemory(); + + const readyTransport = new ReadyTransport({ + transport: alfie, + challengeLength: 4, + }); + + let received = new Uint8Array(); + + (async () => { + for await (const bytes of readyTransport) { + received = concat(received, bytes); + } + })(); + + await betty.send(new Uint8Array([8, 1, 2, 3, 4, 7, 7, 7, 7])); + + const maxPayloadSize = await readyTransport.maximumPayloadSize; + const receivedCommitment = await readyTransport.receivedCommitment; + + assertEquals(maxPayloadSize, BigInt(256)); + assertEquals(receivedCommitment, new Uint8Array([1, 2, 3, 4])); + + await delay(0); + + assertEquals(received, new Uint8Array([7, 7, 7, 7])); + } + + // Partial commitment. + { + const [alfie, betty] = transportPairInMemory(); + + const readyTransport = new ReadyTransport({ + transport: alfie, + challengeLength: 4, + }); + + let received = new Uint8Array(); + + (async () => { + for await (const bytes of readyTransport) { + received = concat(received, bytes); + } + })(); + + await betty.send(new Uint8Array([8, 1, 2])); + await betty.send(new Uint8Array([3, 4, 7, 7, 7, 7])); + + const maxPayloadSize = await readyTransport.maximumPayloadSize; + const receivedCommitment = await readyTransport.receivedCommitment; + + assertEquals(maxPayloadSize, BigInt(256)); + assertEquals(receivedCommitment, new Uint8Array([1, 2, 3, 4])); + + await delay(0); + + assertEquals(received, new Uint8Array([7, 7, 7, 7])); + } + + // Even more partial commitment. + { + const [alfie, betty] = transportPairInMemory(); + + const readyTransport = new ReadyTransport({ + transport: alfie, + challengeLength: 4, + }); + + let received = new Uint8Array(); + + (async () => { + for await (const bytes of readyTransport) { + received = concat(received, bytes); + } + })(); + + await betty.send(new Uint8Array([8, 1, 2])); + await betty.send(new Uint8Array([3])); + await betty.send(new Uint8Array([4, 7, 7, 7, 7])); + + const maxPayloadSize = await readyTransport.maximumPayloadSize; + const receivedCommitment = await readyTransport.receivedCommitment; + + assertEquals(maxPayloadSize, BigInt(256)); + assertEquals(receivedCommitment, new Uint8Array([1, 2, 3, 4])); + + await delay(0); + + assertEquals(received, new Uint8Array([7, 7, 7, 7])); + } +}); diff --git a/src/wgps/ready_transport.ts b/src/wgps/ready_transport.ts new file mode 100644 index 0000000..971c4a7 --- /dev/null +++ b/src/wgps/ready_transport.ts @@ -0,0 +1,81 @@ +import { concat, deferred } from "../../deps.ts"; +import { SyncRole, Transport } from "./types.ts"; + +/** A transport which only emits encoded messages, following the initial max payload size and commitment. + * + * _Doesn't_ send our own max payload size and commitment. + */ +export class ReadyTransport implements Transport { + private transport: Transport; + private challengeLength: number; + + role: SyncRole; + + /** The maximum payload size derived from the first byte sent over the transport. */ + maximumPayloadSize = deferred(); + /** The received commitment sent after the first byte over the transport */ + receivedCommitment = deferred(); + + constructor(opts: { + transport: Transport; + challengeLength: 1 | 2 | 4 | 8; + }) { + this.role = opts.transport.role; + this.transport = opts.transport; + this.challengeLength = opts.challengeLength; + } + + send(bytes: Uint8Array): Promise { + return this.transport.send(bytes); + } + + private commitmentAcc: Uint8Array = new Uint8Array(); + + async *[Symbol.asyncIterator]() { + for await (const bytes of this.transport) { + if ( + this.maximumPayloadSize.state === "fulfilled" && + this.receivedCommitment.state === "fulfilled" + ) { + yield bytes; + } + + if (this.maximumPayloadSize.state === "pending") { + const view = new DataView(bytes.buffer); + + const power = view.getUint8(0); + + this.maximumPayloadSize.resolve(BigInt(2) ** BigInt(power)); + + const rest = bytes.slice(1); + + if (rest.byteLength < this.challengeLength) { + this.commitmentAcc = rest; + } else if (rest.byteLength === this.challengeLength) { + this.receivedCommitment.resolve(rest); + } else { + this.receivedCommitment.resolve(rest.slice(0, this.challengeLength)); + yield rest.slice(this.challengeLength); + } + + continue; + } + + if (this.receivedCommitment.state === "pending") { + const combined = concat(this.commitmentAcc, bytes); + + if (combined.byteLength === this.challengeLength) { + this.receivedCommitment.resolve(combined); + } else if (combined.byteLength < this.challengeLength) { + this.commitmentAcc = combined; + } else { + this.receivedCommitment.resolve( + combined.slice(0, this.challengeLength), + ); + + yield combined.slice(this.challengeLength); + } + } + } + } +} diff --git a/src/wgps/transports/in_memory.ts b/src/wgps/transports/in_memory.ts new file mode 100644 index 0000000..c693954 --- /dev/null +++ b/src/wgps/transports/in_memory.ts @@ -0,0 +1,44 @@ +import { FIFO } from "../../../deps.ts"; +import { IS_ALFIE, IS_BETTY, SyncRole, Transport } from "../types.ts"; + +export class TransportInMemory implements Transport { + private incoming: FIFO; + private outgoing: FIFO; + + role: SyncRole; + + constructor( + syncRole: SyncRole, + incoming: FIFO, + outgoing: FIFO, + ) { + this.role = syncRole; + this.incoming = incoming; + this.outgoing = outgoing; + } + + send(bytes: Uint8Array): Promise { + this.outgoing.push(bytes); + + return Promise.resolve(); + } + + async *[Symbol.asyncIterator]() { + for await (const bytes of this.incoming) { + yield bytes; + } + } +} + +export function transportPairInMemory(): [ + TransportInMemory, + TransportInMemory, +] { + const alfie = new FIFO(); + const betty = new FIFO(); + + const alfieTransport = new TransportInMemory(IS_ALFIE, alfie, betty); + const bettyTransport = new TransportInMemory(IS_BETTY, betty, alfie); + + return [alfieTransport, bettyTransport]; +} diff --git a/src/wgps/types.ts b/src/wgps/types.ts new file mode 100644 index 0000000..d6c7552 --- /dev/null +++ b/src/wgps/types.ts @@ -0,0 +1,17 @@ +/** The peer which initiated the synchronisation session. */ +export const IS_ALFIE = Symbol("alfie"); +/** The peer which did not initiate the synchronisation session. */ +export const IS_BETTY = Symbol("betty"); + +/** we refer to the peer that initiated the synchronisation session as Alfie, and the other peer as Betty. */ +export type SyncRole = typeof IS_ALFIE | typeof IS_BETTY; + +/** A transport for receiving and sending data to with another peer */ +export interface Transport { + /** Whether this transport comes from the initiating party (Alfie), or not (Betty). */ + role: SyncRole; + /** Send bytes to the other peer using this transport. */ + send(bytes: Uint8Array): Promise; + /** An async iterator of bytes received from the other peer via this transport. */ + [Symbol.asyncIterator](): AsyncIterator; +}