Skip to content

Commit

Permalink
Merge pull request #11 from earthstar-project/wgps-transports
Browse files Browse the repository at this point in the history
Transport type, `TransportInMemory`, `ReadyTransport`
  • Loading branch information
sgwilym authored Feb 13, 2024
2 parents 048de50 + 0679456 commit fcc6974
Show file tree
Hide file tree
Showing 5 changed files with 275 additions and 0 deletions.
1 change: 1 addition & 0 deletions deps.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
export * from "https://deno.land/x/[email protected]/mod.ts";

export { FIFO } from "https://deno.land/x/[email protected]/mod.ts";
export { deferred } from "https://deno.land/[email protected]/async/deferred.ts";
export { concat } from "https://deno.land/[email protected]/bytes/concat.ts";
export { equals as equalsBytes } from "https://deno.land/[email protected]/bytes/equals.ts";
Expand Down
132 changes: 132 additions & 0 deletions src/wgps/ready_transport.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
import { assertEquals } from "https://deno.land/[email protected]/assert/mod.ts";
import { delay } from "https://deno.land/[email protected]/async/mod.ts";
import { ReadyTransport } from "./ready_transport.ts";
import { transportPairInMemory } from "./transports/in_memory.ts";
import { concat } from "../../deps.ts";

Deno.test("Ready transport receives max payload ", async () => {
// Happy path
{
const [alfie, betty] = transportPairInMemory();

const readyTransport = new ReadyTransport({
transport: alfie,
challengeLength: 4,
});

let received = new Uint8Array();

(async () => {
for await (const bytes of readyTransport) {
received = concat(received, bytes);
}
})();

await betty.send(new Uint8Array([8]));
await betty.send(new Uint8Array([1, 2, 3, 4]));
await betty.send(new Uint8Array([7, 7, 7, 7]));

const maxPayloadSize = await readyTransport.maximumPayloadSize;
const receivedCommitment = await readyTransport.receivedCommitment;

assertEquals(maxPayloadSize, BigInt(256));
assertEquals(receivedCommitment, new Uint8Array([1, 2, 3, 4]));

await delay(0);

assertEquals(received, new Uint8Array([7, 7, 7, 7]));
}

// All at once.
{
const [alfie, betty] = transportPairInMemory();

const readyTransport = new ReadyTransport({
transport: alfie,
challengeLength: 4,
});

let received = new Uint8Array();

(async () => {
for await (const bytes of readyTransport) {
received = concat(received, bytes);
}
})();

await betty.send(new Uint8Array([8, 1, 2, 3, 4, 7, 7, 7, 7]));

const maxPayloadSize = await readyTransport.maximumPayloadSize;
const receivedCommitment = await readyTransport.receivedCommitment;

assertEquals(maxPayloadSize, BigInt(256));
assertEquals(receivedCommitment, new Uint8Array([1, 2, 3, 4]));

await delay(0);

assertEquals(received, new Uint8Array([7, 7, 7, 7]));
}

// Partial commitment.
{
const [alfie, betty] = transportPairInMemory();

const readyTransport = new ReadyTransport({
transport: alfie,
challengeLength: 4,
});

let received = new Uint8Array();

(async () => {
for await (const bytes of readyTransport) {
received = concat(received, bytes);
}
})();

await betty.send(new Uint8Array([8, 1, 2]));
await betty.send(new Uint8Array([3, 4, 7, 7, 7, 7]));

const maxPayloadSize = await readyTransport.maximumPayloadSize;
const receivedCommitment = await readyTransport.receivedCommitment;

assertEquals(maxPayloadSize, BigInt(256));
assertEquals(receivedCommitment, new Uint8Array([1, 2, 3, 4]));

await delay(0);

assertEquals(received, new Uint8Array([7, 7, 7, 7]));
}

// Even more partial commitment.
{
const [alfie, betty] = transportPairInMemory();

const readyTransport = new ReadyTransport({
transport: alfie,
challengeLength: 4,
});

let received = new Uint8Array();

(async () => {
for await (const bytes of readyTransport) {
received = concat(received, bytes);
}
})();

await betty.send(new Uint8Array([8, 1, 2]));
await betty.send(new Uint8Array([3]));
await betty.send(new Uint8Array([4, 7, 7, 7, 7]));

const maxPayloadSize = await readyTransport.maximumPayloadSize;
const receivedCommitment = await readyTransport.receivedCommitment;

assertEquals(maxPayloadSize, BigInt(256));
assertEquals(receivedCommitment, new Uint8Array([1, 2, 3, 4]));

await delay(0);

assertEquals(received, new Uint8Array([7, 7, 7, 7]));
}
});
81 changes: 81 additions & 0 deletions src/wgps/ready_transport.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import { concat, deferred } from "../../deps.ts";
import { SyncRole, Transport } from "./types.ts";

/** A transport which only emits encoded messages, following the initial max payload size and commitment.
*
* _Doesn't_ send our own max payload size and commitment.
*/
export class ReadyTransport implements Transport {
private transport: Transport;
private challengeLength: number;

role: SyncRole;

/** The maximum payload size derived from the first byte sent over the transport. */
maximumPayloadSize = deferred<bigint>();
/** The received commitment sent after the first byte over the transport */
receivedCommitment = deferred<Uint8Array>();

constructor(opts: {
transport: Transport;
challengeLength: 1 | 2 | 4 | 8;
}) {
this.role = opts.transport.role;
this.transport = opts.transport;
this.challengeLength = opts.challengeLength;
}

send(bytes: Uint8Array): Promise<void> {
return this.transport.send(bytes);
}

private commitmentAcc: Uint8Array = new Uint8Array();

async *[Symbol.asyncIterator]() {
for await (const bytes of this.transport) {
if (
this.maximumPayloadSize.state === "fulfilled" &&
this.receivedCommitment.state === "fulfilled"
) {
yield bytes;
}

if (this.maximumPayloadSize.state === "pending") {
const view = new DataView(bytes.buffer);

const power = view.getUint8(0);

this.maximumPayloadSize.resolve(BigInt(2) ** BigInt(power));

const rest = bytes.slice(1);

if (rest.byteLength < this.challengeLength) {
this.commitmentAcc = rest;
} else if (rest.byteLength === this.challengeLength) {
this.receivedCommitment.resolve(rest);
} else {
this.receivedCommitment.resolve(rest.slice(0, this.challengeLength));
yield rest.slice(this.challengeLength);
}

continue;
}

if (this.receivedCommitment.state === "pending") {
const combined = concat(this.commitmentAcc, bytes);

if (combined.byteLength === this.challengeLength) {
this.receivedCommitment.resolve(combined);
} else if (combined.byteLength < this.challengeLength) {
this.commitmentAcc = combined;
} else {
this.receivedCommitment.resolve(
combined.slice(0, this.challengeLength),
);

yield combined.slice(this.challengeLength);
}
}
}
}
}
44 changes: 44 additions & 0 deletions src/wgps/transports/in_memory.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import { FIFO } from "../../../deps.ts";
import { IS_ALFIE, IS_BETTY, SyncRole, Transport } from "../types.ts";

export class TransportInMemory implements Transport {
private incoming: FIFO<Uint8Array>;
private outgoing: FIFO<Uint8Array>;

role: SyncRole;

constructor(
syncRole: SyncRole,
incoming: FIFO<Uint8Array>,
outgoing: FIFO<Uint8Array>,
) {
this.role = syncRole;
this.incoming = incoming;
this.outgoing = outgoing;
}

send(bytes: Uint8Array): Promise<void> {
this.outgoing.push(bytes);

return Promise.resolve();
}

async *[Symbol.asyncIterator]() {
for await (const bytes of this.incoming) {
yield bytes;
}
}
}

export function transportPairInMemory(): [
TransportInMemory,
TransportInMemory,
] {
const alfie = new FIFO<Uint8Array>();
const betty = new FIFO<Uint8Array>();

const alfieTransport = new TransportInMemory(IS_ALFIE, alfie, betty);
const bettyTransport = new TransportInMemory(IS_BETTY, betty, alfie);

return [alfieTransport, bettyTransport];
}
17 changes: 17 additions & 0 deletions src/wgps/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/** The peer which initiated the synchronisation session. */
export const IS_ALFIE = Symbol("alfie");
/** The peer which did not initiate the synchronisation session. */
export const IS_BETTY = Symbol("betty");

/** we refer to the peer that initiated the synchronisation session as Alfie, and the other peer as Betty. */
export type SyncRole = typeof IS_ALFIE | typeof IS_BETTY;

/** A transport for receiving and sending data to with another peer */
export interface Transport {
/** Whether this transport comes from the initiating party (Alfie), or not (Betty). */
role: SyncRole;
/** Send bytes to the other peer using this transport. */
send(bytes: Uint8Array): Promise<void>;
/** An async iterator of bytes received from the other peer via this transport. */
[Symbol.asyncIterator](): AsyncIterator<Uint8Array>;
}

0 comments on commit fcc6974

Please sign in to comment.