Skip to content

Commit

Permalink
Merge recon-termination-payload into main
Browse files Browse the repository at this point in the history
  • Loading branch information
sgwilym committed May 2, 2024
2 parents 69c1906 + d45f19f commit 89a1002
Show file tree
Hide file tree
Showing 11 changed files with 268 additions and 57 deletions.
3 changes: 3 additions & 0 deletions src/wgps/channels.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ export const msgLogicalChannels: Record<MsgKind, LogicalChannel | null> = {
[MsgKind.ReconciliationSendFingerprint]: LogicalChannel.ReconciliationChannel,
[MsgKind.ReconciliationAnnounceEntries]: LogicalChannel.ReconciliationChannel,
[MsgKind.ReconciliationSendEntry]: LogicalChannel.ReconciliationChannel,
[MsgKind.ReconciliationSendPayload]: LogicalChannel.ReconciliationChannel,
[MsgKind.ReconciliationTerminatePayload]:
LogicalChannel.ReconciliationChannel,
[MsgKind.DataSendEntry]: LogicalChannel.DataChannel,
[MsgKind.DataSendPayload]: LogicalChannel.DataChannel,
[MsgKind.DataReplyPayload]: LogicalChannel.DataChannel,
Expand Down
3 changes: 0 additions & 3 deletions src/wgps/data/data_sender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,6 @@ export class DataSender<
bytes: transformed,
};
}

console.groupEnd();
console.groupEnd();
}
}
}
38 changes: 35 additions & 3 deletions src/wgps/data/payload_ingester.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { Entry, FIFO } from "../../../deps.ts";
import { concat, Entry, FIFO } from "../../../deps.ts";
import { WgpsMessageValidationError } from "../../errors.ts";
import { onAsyncIterate } from "../util.ts";
import { GetStoreFn } from "../wgps_messenger.ts";

const CANCELLATION = Symbol("cancellation");

// This class can handle both the payload sending procedures for payloads sent via reconciliation AND data channels. It would probably be better to split them up.
export class PayloadIngester<
Prefingerprint,
Fingerprint,
Expand All @@ -18,6 +19,8 @@ export class PayloadIngester<
private currentEntry:
| Entry<NamespaceId, SubspaceId, PayloadDigest>
| undefined;
private currentlyReceivedLength = 0n;

private events = new FIFO<
Uint8Array | {
entry: Entry<NamespaceId, SubspaceId, PayloadDigest>;
Expand All @@ -27,6 +30,9 @@ export class PayloadIngester<
bytes: Uint8Array,
entryLength: bigint,
) => Uint8Array;
private entryToRequestPayloadFor:
| Entry<NamespaceId, SubspaceId, PayloadDigest>
| null = null;

constructor(opts: {
getStore: GetStoreFn<
Expand Down Expand Up @@ -55,14 +61,18 @@ export class PayloadIngester<
const store = await opts.getStore(event.entry.namespaceId);

this.currentEntry = event.entry;
this.currentlyReceivedLength = 0n;

store.ingestPayload({
path: event.entry.path,
subspace: event.entry.subspaceId,
timestamp: event.entry.timestamp,
}, new CancellableIngestion(this.currentIngestion)).then(
(ingestEvent) => {
if (ingestEvent.kind === "failure") {
if (
ingestEvent.kind === "failure" &&
this.currentlyReceivedLength === event.entry.payloadLength
) {
throw new WgpsMessageValidationError(
"Ingestion failed: " + ingestEvent.reason,
);
Expand All @@ -75,13 +85,22 @@ export class PayloadIngester<
this.currentEntry!.payloadLength,
);

this.currentlyReceivedLength += BigInt(transformed.byteLength);

this.currentIngestion.push(transformed);
}
});
}

target(entry: Entry<NamespaceId, SubspaceId, PayloadDigest>) {
target(
entry: Entry<NamespaceId, SubspaceId, PayloadDigest>,
requestIfImmediatelyTerminated?: boolean,
) {
this.events.push({ entry });

if (requestIfImmediatelyTerminated) {
this.entryToRequestPayloadFor = entry;
}
}

push(bytes: Uint8Array, end: boolean) {
Expand All @@ -90,6 +109,15 @@ export class PayloadIngester<
if (end) {
this.events.push(CANCELLATION);
}

this.entryToRequestPayloadFor = null;
}

// Returns the entry to request a payload for or null
terminate(): Entry<NamespaceId, SubspaceId, PayloadDigest> | null {
this.events.push(CANCELLATION);

return this.entryToRequestPayloadFor;
}
}

Expand All @@ -100,9 +128,13 @@ class CancellableIngestion {

async *[Symbol.asyncIterator]() {
for await (const event of this.iterable) {
let bytes = new Uint8Array();

if (event === CANCELLATION) {
break;
} else {
bytes = concat(bytes, event);

yield event;
}
}
Expand Down
39 changes: 32 additions & 7 deletions src/wgps/decoding/decode_messages.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
import { Area, Entry, GrowingBytes } from "../../../deps.ts";
import { ReadCapPrivy, SyncMessage, SyncSchemes, Transport } from "../types.ts";
import {
MsgKind,
MsgReconciliationTerminatePayload,
ReadCapPrivy,
SyncMessage,
SyncSchemes,
Transport,
} from "../types.ts";
import { decodeCommitmentReveal } from "./commitment_reveal.ts";
import {
decodeControlAbsolve,
Expand All @@ -24,6 +31,7 @@ import {
decodeReconciliationAnnounceEntries,
decodeReconciliationSendEntry,
decodeReconciliationSendFingerprint,
decodeReconciliationSendPayload,
} from "./reconciliation.ts";
import {
ReconcileMsgTracker,
Expand All @@ -36,6 +44,7 @@ import {
decodeDataSendPayload,
decodeDataSetEagerness,
} from "./data.ts";
import { WgpsMessageValidationError } from "../../errors.ts";

export type DecodeMessagesOpts<
ReadCapability,
Expand Down Expand Up @@ -167,7 +176,7 @@ export async function* decodeMessages<
// Find out the type of decoder to use by bitmasking the first byte of the message.
const [firstByte] = bytes.array;

if (firstByte === 0) {
if (firstByte === 0x0) {
yield await decodeCommitmentReveal(
bytes,
opts.challengeLength,
Expand Down Expand Up @@ -224,10 +233,27 @@ export async function* decodeMessages<
aoiHandlesToArea: opts.aoiHandlesToArea,
});
} else if ((firstByte & 0x50) === 0x50) {
// Reconciliation Announce Entries
// OR a send entry. It all depends on what we are expecting...
// ReconciliationAnnounceEntries
// OR ReconciliationSendEntry

// OR ReconciliationSendPayload
// OR ReconciliationTerminatePayload.

// All depends on what we're expecting.

if (reconcileMsgTracker.isExpectingPayloadOrTermination()) {
if ((firstByte & 0x58) === 0x58) {
reconcileMsgTracker.onTerminatePayload();
// It's a terminate message.

if (reconcileMsgTracker.isExpectingReconciliationSendEntry()) {
bytes.prune(1);
yield {
kind: MsgKind.ReconciliationTerminatePayload,
};
} else {
yield await decodeReconciliationSendPayload(bytes);
}
} else if (reconcileMsgTracker.isExpectingReconciliationSendEntry()) {
const message = await decodeReconciliationSendEntry(
bytes,
{
Expand Down Expand Up @@ -333,8 +359,7 @@ export async function* decodeMessages<
);
} else {
// Couldn't decode.
console.warn("Could not decode!");
break;
throw new WgpsMessageValidationError("Could not decode!");
}
}
}
34 changes: 34 additions & 0 deletions src/wgps/decoding/reconciliation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ import {
MsgReconciliationAnnounceEntries,
MsgReconciliationSendEntry,
MsgReconciliationSendFingerprint,
MsgReconciliationSendPayload,
ReconciliationPrivy,
} from "../types.ts";
import { compactWidthFromEndOfByte } from "./util.ts";

export async function decodeReconciliationSendFingerprint<
Fingerprint,
Expand Down Expand Up @@ -376,3 +378,35 @@ export async function decodeReconciliationSendEntry<
staticTokenHandle,
};
}

export async function decodeReconciliationSendPayload(
bytes: GrowingBytes,
): Promise<MsgReconciliationSendPayload> {
await bytes.nextAbsolute(1);

const amountCompactWidth = compactWidthFromEndOfByte(bytes.array[0]);

await bytes.nextAbsolute(1 + amountCompactWidth);

const amount = decodeCompactWidth(
bytes.array.subarray(1, 1 + amountCompactWidth),
);

bytes.prune(1 + amountCompactWidth);

await bytes.nextAbsolute(Number(amount));

const messageBytes = bytes.array.slice(0, Number(amount));

bytes.prune(Number(amount));

return {
kind: MsgKind.ReconciliationSendPayload,
amount: BigInt(amount),
bytes: messageBytes,
};
}

// Don't need to decode ReconciliationTerminatePayload
// as there's nothing in the message and we decode the message type
// in decode_messages.
17 changes: 17 additions & 0 deletions src/wgps/encoding.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
MsgKind,
MsgReconciliationAnnounceEntries,
MsgReconciliationSendEntry,
MsgReconciliationTerminatePayload,
SyncMessage,
SyncSchemes,
} from "./types.ts";
Expand Down Expand Up @@ -489,6 +490,7 @@ const sendEntryVectors: (
TestSubspace,
ArrayBuffer
>
| MsgReconciliationTerminatePayload
)[] = [
{
kind: MsgKind.ReconciliationAnnounceEntries,
Expand Down Expand Up @@ -528,6 +530,9 @@ const sendEntryVectors: (
},
},
},
{
kind: MsgKind.ReconciliationTerminatePayload,
},
{
kind: MsgKind.ReconciliationAnnounceEntries,
receiverHandle: 0n,
Expand Down Expand Up @@ -589,6 +594,9 @@ const sendEntryVectors: (
},
},
},
{
kind: MsgKind.ReconciliationTerminatePayload,
},
{
kind: MsgKind.ReconciliationSendEntry,
dynamicToken: crypto.getRandomValues(new Uint8Array(32)),
Expand All @@ -605,6 +613,9 @@ const sendEntryVectors: (
},
},
},
{
kind: MsgKind.ReconciliationTerminatePayload,
},
{
kind: MsgKind.ReconciliationSendEntry,
dynamicToken: crypto.getRandomValues(new Uint8Array(32)),
Expand All @@ -621,6 +632,9 @@ const sendEntryVectors: (
},
},
},
{
kind: MsgKind.ReconciliationTerminatePayload,
},
{
kind: MsgKind.ReconciliationSendEntry,
dynamicToken: crypto.getRandomValues(new Uint8Array(32)),
Expand All @@ -637,6 +651,9 @@ const sendEntryVectors: (
},
},
},
{
kind: MsgKind.ReconciliationTerminatePayload,
},
];

Deno.test("Encoding roundtrip test", async () => {
Expand Down
12 changes: 12 additions & 0 deletions src/wgps/encoding/message_encoder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import {
encodeReconciliationAnnounceEntries,
encodeReconciliationSendEntry,
encodeReconciliationSendFingerprint,
encodeReconciliationSendPayload,
encodeReconciliationTerminatePayload,
} from "./reconciliation.ts";
import {
ReconcileMsgTracker,
Expand Down Expand Up @@ -295,6 +297,16 @@ export class MessageEncoder<
break;
}

case MsgKind.ReconciliationSendPayload: {
bytes = encodeReconciliationSendPayload(message);
break;
}

case MsgKind.ReconciliationTerminatePayload: {
bytes = encodeReconciliationTerminatePayload();
break;
}

// Data

case MsgKind.DataSendEntry: {
Expand Down
15 changes: 15 additions & 0 deletions src/wgps/encoding/reconciliation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import {
MsgReconciliationAnnounceEntries,
MsgReconciliationSendEntry,
MsgReconciliationSendFingerprint,
MsgReconciliationSendPayload,
MsgReconciliationTerminatePayload,
ReconciliationPrivy,
} from "../types.ts";
import { compactWidthOr } from "./util.ts";
Expand Down Expand Up @@ -291,3 +293,16 @@ export function encodeReconciliationSendEntry<
encodedRelativeEntry,
);
}

export function encodeReconciliationSendPayload(
msg: MsgReconciliationSendPayload,
): Uint8Array {
const header = compactWidthOr(0x50, compactWidth(msg.amount));
const amountEncoded = encodeCompactWidth(msg.amount);

return concat(new Uint8Array([header]), amountEncoded, msg.bytes);
}

export function encodeReconciliationTerminatePayload(): Uint8Array {
return new Uint8Array([0x58]);
}
12 changes: 12 additions & 0 deletions src/wgps/reconciliation/reconcile_msg_tracker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ export class ReconcileMsgTracker<

private handleToNamespaceId: (aoiHandle: bigint) => NamespaceId;

private isAwaitingTermination = false;

constructor(
opts: ReconcileMsgTrackerOpts<NamespaceId, SubspaceId, PayloadDigest>,
) {
Expand Down Expand Up @@ -84,6 +86,16 @@ export class ReconcileMsgTracker<
this.prevToken = msg.staticTokenHandle;

this.announcedEntriesRemaining -= 1n;

this.isAwaitingTermination = true;
}

onTerminatePayload() {
this.isAwaitingTermination = false;
}

isExpectingPayloadOrTermination() {
return this.isAwaitingTermination;
}

isExpectingReconciliationSendEntry() {
Expand Down
Loading

0 comments on commit 89a1002

Please sign in to comment.