From d45f19f4ee34d02af18fb2934a927460ad1e5d64 Mon Sep 17 00:00:00 2001 From: Sam Gwilym Date: Thu, 2 May 2024 14:37:13 +0100 Subject: [PATCH] Add ReconciliationSendPayload, ReconciliationTerminatePayload --- src/wgps/channels.ts | 3 + src/wgps/data/data_sender.ts | 3 - src/wgps/data/payload_ingester.ts | 38 ++++++++- src/wgps/decoding/decode_messages.ts | 39 +++++++-- src/wgps/decoding/reconciliation.ts | 34 ++++++++ src/wgps/encoding.test.ts | 17 ++++ src/wgps/encoding/message_encoder.ts | 12 +++ src/wgps/encoding/reconciliation.ts | 15 ++++ .../reconciliation/reconcile_msg_tracker.ts | 12 +++ src/wgps/types.ts | 72 +++++++++++------ src/wgps/wgps_messenger.ts | 80 ++++++++++++++----- 11 files changed, 268 insertions(+), 57 deletions(-) diff --git a/src/wgps/channels.ts b/src/wgps/channels.ts index 8f1c729..0ec8f64 100644 --- a/src/wgps/channels.ts +++ b/src/wgps/channels.ts @@ -9,6 +9,9 @@ export const msgLogicalChannels: Record = { [MsgKind.ReconciliationSendFingerprint]: LogicalChannel.ReconciliationChannel, [MsgKind.ReconciliationAnnounceEntries]: LogicalChannel.ReconciliationChannel, [MsgKind.ReconciliationSendEntry]: LogicalChannel.ReconciliationChannel, + [MsgKind.ReconciliationSendPayload]: LogicalChannel.ReconciliationChannel, + [MsgKind.ReconciliationTerminatePayload]: + LogicalChannel.ReconciliationChannel, [MsgKind.DataSendEntry]: LogicalChannel.DataChannel, [MsgKind.DataSendPayload]: LogicalChannel.DataChannel, [MsgKind.DataReplyPayload]: LogicalChannel.DataChannel, diff --git a/src/wgps/data/data_sender.ts b/src/wgps/data/data_sender.ts index fcb9a09..6c990e2 100644 --- a/src/wgps/data/data_sender.ts +++ b/src/wgps/data/data_sender.ts @@ -147,9 +147,6 @@ export class DataSender< bytes: transformed, }; } - - console.groupEnd(); - console.groupEnd(); } } } diff --git a/src/wgps/data/payload_ingester.ts b/src/wgps/data/payload_ingester.ts index 89adb16..66b7826 100644 --- a/src/wgps/data/payload_ingester.ts +++ b/src/wgps/data/payload_ingester.ts @@ -1,10 +1,11 @@ -import { Entry, FIFO } from "../../../deps.ts"; +import { concat, Entry, FIFO } from "../../../deps.ts"; import { WgpsMessageValidationError } from "../../errors.ts"; import { onAsyncIterate } from "../util.ts"; import { GetStoreFn } from "../wgps_messenger.ts"; const CANCELLATION = Symbol("cancellation"); +// This class can handle both the payload sending procedures for payloads sent via reconciliation AND data channels. It would probably be better to split them up. export class PayloadIngester< Prefingerprint, Fingerprint, @@ -18,6 +19,8 @@ export class PayloadIngester< private currentEntry: | Entry | undefined; + private currentlyReceivedLength = 0n; + private events = new FIFO< Uint8Array | { entry: Entry; @@ -27,6 +30,9 @@ export class PayloadIngester< bytes: Uint8Array, entryLength: bigint, ) => Uint8Array; + private entryToRequestPayloadFor: + | Entry + | null = null; constructor(opts: { getStore: GetStoreFn< @@ -55,6 +61,7 @@ export class PayloadIngester< const store = await opts.getStore(event.entry.namespaceId); this.currentEntry = event.entry; + this.currentlyReceivedLength = 0n; store.ingestPayload({ path: event.entry.path, @@ -62,7 +69,10 @@ export class PayloadIngester< timestamp: event.entry.timestamp, }, new CancellableIngestion(this.currentIngestion)).then( (ingestEvent) => { - if (ingestEvent.kind === "failure") { + if ( + ingestEvent.kind === "failure" && + this.currentlyReceivedLength === event.entry.payloadLength + ) { throw new WgpsMessageValidationError( "Ingestion failed: " + ingestEvent.reason, ); @@ -75,13 +85,22 @@ export class PayloadIngester< this.currentEntry!.payloadLength, ); + this.currentlyReceivedLength += BigInt(transformed.byteLength); + this.currentIngestion.push(transformed); } }); } - target(entry: Entry) { + target( + entry: Entry, + requestIfImmediatelyTerminated?: boolean, + ) { this.events.push({ entry }); + + if (requestIfImmediatelyTerminated) { + this.entryToRequestPayloadFor = entry; + } } push(bytes: Uint8Array, end: boolean) { @@ -90,6 +109,15 @@ export class PayloadIngester< if (end) { this.events.push(CANCELLATION); } + + this.entryToRequestPayloadFor = null; + } + + // Returns the entry to request a payload for or null + terminate(): Entry | null { + this.events.push(CANCELLATION); + + return this.entryToRequestPayloadFor; } } @@ -100,9 +128,13 @@ class CancellableIngestion { async *[Symbol.asyncIterator]() { for await (const event of this.iterable) { + let bytes = new Uint8Array(); + if (event === CANCELLATION) { break; } else { + bytes = concat(bytes, event); + yield event; } } diff --git a/src/wgps/decoding/decode_messages.ts b/src/wgps/decoding/decode_messages.ts index 74ce0fe..e6b46f5 100644 --- a/src/wgps/decoding/decode_messages.ts +++ b/src/wgps/decoding/decode_messages.ts @@ -1,5 +1,12 @@ import { Area, Entry, GrowingBytes } from "../../../deps.ts"; -import { ReadCapPrivy, SyncMessage, SyncSchemes, Transport } from "../types.ts"; +import { + MsgKind, + MsgReconciliationTerminatePayload, + ReadCapPrivy, + SyncMessage, + SyncSchemes, + Transport, +} from "../types.ts"; import { decodeCommitmentReveal } from "./commitment_reveal.ts"; import { decodeControlAbsolve, @@ -24,6 +31,7 @@ import { decodeReconciliationAnnounceEntries, decodeReconciliationSendEntry, decodeReconciliationSendFingerprint, + decodeReconciliationSendPayload, } from "./reconciliation.ts"; import { ReconcileMsgTracker, @@ -36,6 +44,7 @@ import { decodeDataSendPayload, decodeDataSetEagerness, } from "./data.ts"; +import { WgpsMessageValidationError } from "../../errors.ts"; export type DecodeMessagesOpts< ReadCapability, @@ -167,7 +176,7 @@ export async function* decodeMessages< // Find out the type of decoder to use by bitmasking the first byte of the message. const [firstByte] = bytes.array; - if (firstByte === 0) { + if (firstByte === 0x0) { yield await decodeCommitmentReveal( bytes, opts.challengeLength, @@ -224,10 +233,27 @@ export async function* decodeMessages< aoiHandlesToArea: opts.aoiHandlesToArea, }); } else if ((firstByte & 0x50) === 0x50) { - // Reconciliation Announce Entries - // OR a send entry. It all depends on what we are expecting... + // ReconciliationAnnounceEntries + // OR ReconciliationSendEntry + + // OR ReconciliationSendPayload + // OR ReconciliationTerminatePayload. + + // All depends on what we're expecting. + + if (reconcileMsgTracker.isExpectingPayloadOrTermination()) { + if ((firstByte & 0x58) === 0x58) { + reconcileMsgTracker.onTerminatePayload(); + // It's a terminate message. - if (reconcileMsgTracker.isExpectingReconciliationSendEntry()) { + bytes.prune(1); + yield { + kind: MsgKind.ReconciliationTerminatePayload, + }; + } else { + yield await decodeReconciliationSendPayload(bytes); + } + } else if (reconcileMsgTracker.isExpectingReconciliationSendEntry()) { const message = await decodeReconciliationSendEntry( bytes, { @@ -333,8 +359,7 @@ export async function* decodeMessages< ); } else { // Couldn't decode. - console.warn("Could not decode!"); - break; + throw new WgpsMessageValidationError("Could not decode!"); } } } diff --git a/src/wgps/decoding/reconciliation.ts b/src/wgps/decoding/reconciliation.ts index 31af578..9e95545 100644 --- a/src/wgps/decoding/reconciliation.ts +++ b/src/wgps/decoding/reconciliation.ts @@ -13,8 +13,10 @@ import { MsgReconciliationAnnounceEntries, MsgReconciliationSendEntry, MsgReconciliationSendFingerprint, + MsgReconciliationSendPayload, ReconciliationPrivy, } from "../types.ts"; +import { compactWidthFromEndOfByte } from "./util.ts"; export async function decodeReconciliationSendFingerprint< Fingerprint, @@ -376,3 +378,35 @@ export async function decodeReconciliationSendEntry< staticTokenHandle, }; } + +export async function decodeReconciliationSendPayload( + bytes: GrowingBytes, +): Promise { + await bytes.nextAbsolute(1); + + const amountCompactWidth = compactWidthFromEndOfByte(bytes.array[0]); + + await bytes.nextAbsolute(1 + amountCompactWidth); + + const amount = decodeCompactWidth( + bytes.array.subarray(1, 1 + amountCompactWidth), + ); + + bytes.prune(1 + amountCompactWidth); + + await bytes.nextAbsolute(Number(amount)); + + const messageBytes = bytes.array.slice(0, Number(amount)); + + bytes.prune(Number(amount)); + + return { + kind: MsgKind.ReconciliationSendPayload, + amount: BigInt(amount), + bytes: messageBytes, + }; +} + +// Don't need to decode ReconciliationTerminatePayload +// as there's nothing in the message and we decode the message type +// in decode_messages. diff --git a/src/wgps/encoding.test.ts b/src/wgps/encoding.test.ts index 91ce081..b777440 100644 --- a/src/wgps/encoding.test.ts +++ b/src/wgps/encoding.test.ts @@ -8,6 +8,7 @@ import { MsgKind, MsgReconciliationAnnounceEntries, MsgReconciliationSendEntry, + MsgReconciliationTerminatePayload, SyncMessage, SyncSchemes, } from "./types.ts"; @@ -489,6 +490,7 @@ const sendEntryVectors: ( TestSubspace, ArrayBuffer > + | MsgReconciliationTerminatePayload )[] = [ { kind: MsgKind.ReconciliationAnnounceEntries, @@ -528,6 +530,9 @@ const sendEntryVectors: ( }, }, }, + { + kind: MsgKind.ReconciliationTerminatePayload, + }, { kind: MsgKind.ReconciliationAnnounceEntries, receiverHandle: 0n, @@ -589,6 +594,9 @@ const sendEntryVectors: ( }, }, }, + { + kind: MsgKind.ReconciliationTerminatePayload, + }, { kind: MsgKind.ReconciliationSendEntry, dynamicToken: crypto.getRandomValues(new Uint8Array(32)), @@ -605,6 +613,9 @@ const sendEntryVectors: ( }, }, }, + { + kind: MsgKind.ReconciliationTerminatePayload, + }, { kind: MsgKind.ReconciliationSendEntry, dynamicToken: crypto.getRandomValues(new Uint8Array(32)), @@ -621,6 +632,9 @@ const sendEntryVectors: ( }, }, }, + { + kind: MsgKind.ReconciliationTerminatePayload, + }, { kind: MsgKind.ReconciliationSendEntry, dynamicToken: crypto.getRandomValues(new Uint8Array(32)), @@ -637,6 +651,9 @@ const sendEntryVectors: ( }, }, }, + { + kind: MsgKind.ReconciliationTerminatePayload, + }, ]; Deno.test("Encoding roundtrip test", async () => { diff --git a/src/wgps/encoding/message_encoder.ts b/src/wgps/encoding/message_encoder.ts index 120d07a..9f19d9a 100644 --- a/src/wgps/encoding/message_encoder.ts +++ b/src/wgps/encoding/message_encoder.ts @@ -31,6 +31,8 @@ import { encodeReconciliationAnnounceEntries, encodeReconciliationSendEntry, encodeReconciliationSendFingerprint, + encodeReconciliationSendPayload, + encodeReconciliationTerminatePayload, } from "./reconciliation.ts"; import { ReconcileMsgTracker, @@ -295,6 +297,16 @@ export class MessageEncoder< break; } + case MsgKind.ReconciliationSendPayload: { + bytes = encodeReconciliationSendPayload(message); + break; + } + + case MsgKind.ReconciliationTerminatePayload: { + bytes = encodeReconciliationTerminatePayload(); + break; + } + // Data case MsgKind.DataSendEntry: { diff --git a/src/wgps/encoding/reconciliation.ts b/src/wgps/encoding/reconciliation.ts index e1d6fcf..c099a6b 100644 --- a/src/wgps/encoding/reconciliation.ts +++ b/src/wgps/encoding/reconciliation.ts @@ -11,6 +11,8 @@ import { MsgReconciliationAnnounceEntries, MsgReconciliationSendEntry, MsgReconciliationSendFingerprint, + MsgReconciliationSendPayload, + MsgReconciliationTerminatePayload, ReconciliationPrivy, } from "../types.ts"; import { compactWidthOr } from "./util.ts"; @@ -291,3 +293,16 @@ export function encodeReconciliationSendEntry< encodedRelativeEntry, ); } + +export function encodeReconciliationSendPayload( + msg: MsgReconciliationSendPayload, +): Uint8Array { + const header = compactWidthOr(0x50, compactWidth(msg.amount)); + const amountEncoded = encodeCompactWidth(msg.amount); + + return concat(new Uint8Array([header]), amountEncoded, msg.bytes); +} + +export function encodeReconciliationTerminatePayload(): Uint8Array { + return new Uint8Array([0x58]); +} diff --git a/src/wgps/reconciliation/reconcile_msg_tracker.ts b/src/wgps/reconciliation/reconcile_msg_tracker.ts index af9bcee..4a13a0a 100644 --- a/src/wgps/reconciliation/reconcile_msg_tracker.ts +++ b/src/wgps/reconciliation/reconcile_msg_tracker.ts @@ -36,6 +36,8 @@ export class ReconcileMsgTracker< private handleToNamespaceId: (aoiHandle: bigint) => NamespaceId; + private isAwaitingTermination = false; + constructor( opts: ReconcileMsgTrackerOpts, ) { @@ -84,6 +86,16 @@ export class ReconcileMsgTracker< this.prevToken = msg.staticTokenHandle; this.announcedEntriesRemaining -= 1n; + + this.isAwaitingTermination = true; + } + + onTerminatePayload() { + this.isAwaitingTermination = false; + } + + isExpectingPayloadOrTermination() { + return this.isAwaitingTermination; } isExpectingReconciliationSendEntry() { diff --git a/src/wgps/types.ts b/src/wgps/types.ts index 8ba080e..fbd5372 100644 --- a/src/wgps/types.ts +++ b/src/wgps/types.ts @@ -100,6 +100,8 @@ export enum MsgKind { ReconciliationSendFingerprint, ReconciliationAnnounceEntries, ReconciliationSendEntry, + ReconciliationSendPayload, + ReconciliationTerminatePayload, DataSendEntry, DataSendPayload, DataSetMetadata, @@ -115,28 +117,30 @@ export enum MsgKind { // This is to help with logging and debugging. export const messageNames: Record = { - 0: "CommitmentReveal", - 1: "PaiBindFragment", - 2: "PaiReplyFragment", - 3: "PaiRequestSubspaceCapability", - 4: "PaiReplySubspaceCapability", - 5: "SetupBindReadCapability", - 6: "SetupBindAreaOfInterest", - 7: "SetupBindStaticToken", - 8: "ReconciliationSendFingerprint", - 9: "ReconciliationAnnounceEntries", - 10: "ReconciliationSendEntry", - 11: "DataSendEntry", - 12: "DataSendPayload", - 13: "DataSetMetadata", - 14: "DataBindPayloadRequest", - 15: "DataReplyPayload", - 16: "ControlIssueGuarantee", - 17: "ControlAbsolve", - 18: "ControlPlead", - 19: "ControlAnnounceDropping", - 20: "ControlApologise", - 21: "ControlFree", + [MsgKind.CommitmentReveal]: "CommitmentReveal", + [MsgKind.PaiBindFragment]: "PaiBindFragment", + [MsgKind.PaiReplyFragment]: "PaiReplyFragment", + [MsgKind.PaiRequestSubspaceCapability]: "PaiRequestSubspaceCapability", + [MsgKind.PaiReplySubspaceCapability]: "PaiReplySubspaceCapability", + [MsgKind.SetupBindReadCapability]: "SetupBindReadCapability", + [MsgKind.SetupBindAreaOfInterest]: "SetupBindAreaOfInterest", + [MsgKind.SetupBindStaticToken]: "SetupBindStaticToken", + [MsgKind.ReconciliationSendFingerprint]: "ReconciliationSendFingerprint", + [MsgKind.ReconciliationAnnounceEntries]: "ReconciliationAnnounceEntries", + [MsgKind.ReconciliationSendEntry]: "ReconciliationSendEntry", + [MsgKind.ReconciliationSendPayload]: "ReconciliationSendPayload", + [MsgKind.ReconciliationTerminatePayload]: "ReconciliationTerminatePayload", + [MsgKind.DataSendEntry]: "DataSendEntry", + [MsgKind.DataSendPayload]: "DataSendPayload", + [MsgKind.DataSetMetadata]: "DataSetMetadata", + [MsgKind.DataBindPayloadRequest]: "DataBindPayloadRequest", + [MsgKind.DataReplyPayload]: "DataReplyPayload", + [MsgKind.ControlIssueGuarantee]: "ControlIssueGuarantee", + [MsgKind.ControlAbsolve]: "ControlAbsolve", + [MsgKind.ControlPlead]: "ControlPlead", + [MsgKind.ControlAnnounceDropping]: "ControlAnnounceDropping", + [MsgKind.ControlApologise]: "ControlApologise", + [MsgKind.ControlFree]: "ControlFree", }; export type Msg< @@ -304,6 +308,7 @@ export type MsgReconciliationAnnounceEntries = Msg< } >; +/** Transmit a LengthyEntry as part of 3d range-based set reconciliation. */ export type MsgReconciliationSendEntry< DynamicToken, NamespaceId, @@ -318,6 +323,21 @@ export type MsgReconciliationSendEntry< dynamicToken: DynamicToken; }>; +export type MsgReconciliationSendPayload = Msg< + MsgKind.ReconciliationSendPayload, + { + /** The number of transmitted bytes. */ + amount: bigint; + /** amount many bytes, a substring of the bytes obtained by applying transform_payload to the Payload to be transmitted. */ + bytes: Uint8Array; + } +>; + +/** Indicate that no more bytes will be transmitted for the currently transmitted Payload as part of set reconciliation. */ +export type MsgReconciliationTerminatePayload = { + kind: MsgKind.ReconciliationTerminatePayload; +}; + /** Transmit an AuthorisedEntry to the other peer, and optionally prepare transmission of its Payload. */ export type MsgDataSendEntry< DynamicToken, @@ -341,7 +361,7 @@ export type MsgDataSendPayload = Msg< { /** The number of transmitted bytes. */ amount: bigint; - /** amount many bytes, to be added to the Payload of the receiver’s currently_received_entry at offset currently_received_offset. */ + /** amount many bytes, a substring of the bytes obtained by applying transform_payload to the Payload to be transmitted. */ bytes: Uint8Array; } >; @@ -408,6 +428,8 @@ export type SyncMessage< SubspaceId, PayloadDigest > + | MsgReconciliationSendPayload + | MsgReconciliationTerminatePayload | MsgDataSendEntry< DynamicToken, NamespaceId, @@ -435,7 +457,9 @@ export type ReconciliationChannelMsg< NamespaceId, SubspaceId, PayloadDigest - >; + > + | MsgReconciliationSendPayload + | MsgReconciliationTerminatePayload; export type DataChannelMsg< DynamicToken, diff --git a/src/wgps/wgps_messenger.ts b/src/wgps/wgps_messenger.ts index 95a0288..ca22e5b 100644 --- a/src/wgps/wgps_messenger.ts +++ b/src/wgps/wgps_messenger.ts @@ -30,6 +30,7 @@ import { IntersectionChannelMsg, IS_ALFIE, LogicalChannel, + messageNames, MsgKind, NoChannelMsg, PayloadRequestChannelMsg, @@ -147,7 +148,10 @@ export type WgpsMessengerOpts< /** A (not necessarily deterministic) algorithm that converts a chunk of a payload into another bytestring. */ transformPayload: (chunk: Uint8Array) => Uint8Array; /** Process transformed payload chunks. */ - processReceivedPayload: (chunk: Uint8Array) => Uint8Array; + processReceivedPayload: ( + chunk: Uint8Array, + entryLength: bigint, + ) => Uint8Array; }; /** Coordinates a complete WGPS synchronisation session. */ @@ -337,6 +341,16 @@ export class WgpsMessenger< remaining: bigint; } | undefined; + private reconciliationPayloadIngester: PayloadIngester< + Prefingerprint, + Fingerprint, + AuthorisationToken, + NamespaceId, + SubspaceId, + PayloadDigest, + AuthorisationOpts + >; + // Data private capFinder: CapFinder< @@ -373,7 +387,7 @@ export class WgpsMessenger< AuthorisationOpts >; - private payloadIngester: PayloadIngester< + private dataPayloadIngester: PayloadIngester< Prefingerprint, Fingerprint, AuthorisationToken, @@ -438,6 +452,8 @@ export class WgpsMessenger< } } + this.getStore = opts.getStore; + this.interests = opts.interests; this.schemes = opts.schemes; @@ -474,9 +490,12 @@ export class WgpsMessenger< staticTokenHandleStoreOurs: this.handlesStaticTokensOurs, }); - // Data + this.reconciliationPayloadIngester = new PayloadIngester({ + getStore: this.getStore, + processReceivedPayload: opts.processReceivedPayload, + }); - this.getStore = opts.getStore; + // Data this.currentlyReceivedEntry = defaultEntry( this.schemes.namespace.defaultNamespaceId, @@ -504,7 +523,7 @@ export class WgpsMessenger< transformPayload: opts.transformPayload, }); - this.payloadIngester = new PayloadIngester({ + this.dataPayloadIngester = new PayloadIngester({ getStore: this.getStore, processReceivedPayload: opts.processReceivedPayload, }); @@ -702,14 +721,12 @@ export class WgpsMessenger< // Begin handling decoded messages onAsyncIterate(decodedMessages, (msg) => { - /* console.log( `%c${this.transport.role === IS_ALFIE ? "Alfie" : "Betty"} got: ${ messageNames[msg.kind] }`, `color: ${this.transport.role === IS_ALFIE ? "red" : "blue"}`, ); - */ if (msg.kind === MsgKind.DataSendEntry) { this.currentlyReceivedEntry = msg.entry; @@ -743,6 +760,8 @@ export class WgpsMessenger< case MsgKind.ReconciliationSendFingerprint: case MsgKind.ReconciliationAnnounceEntries: case MsgKind.ReconciliationSendEntry: + case MsgKind.ReconciliationSendPayload: + case MsgKind.ReconciliationTerminatePayload: this.inChannelReconciliation.push(msg); break; case MsgKind.DataSendEntry: @@ -1006,6 +1025,12 @@ export class WgpsMessenger< dynamicToken: entry.dynamicToken, staticTokenHandle: entry.staticTokenHandle, }); + + // We should check if the entry's payload length is less than our partner's maximum_payload_size and send the payload, but at the time of writing time is short and that requires something of a refactor to the announcer. + + this.encoder.encode({ + kind: MsgKind.ReconciliationTerminatePayload, + }); } }); @@ -1047,6 +1072,7 @@ export class WgpsMessenger< // Whenever the reconciler emits a fingerprint... onAsyncIterate(reconciler.fingerprints(), ({ fingerprint, range }) => { // Send a ReconciliationSendFingerprint message + this.encoder.encode({ kind: MsgKind.ReconciliationSendFingerprint, fingerprint, @@ -1369,13 +1395,29 @@ export class WgpsMessenger< this.currentlyReceivingEntries.remaining -= 1n; - if ( - result.kind === "success" && - message.entry.available === message.entry.entry.payloadLength - ) { + this.reconciliationPayloadIngester.target( + message.entry.entry, + message.entry.available === message.entry.entry.payloadLength, + ); + + break; + } + case MsgKind.ReconciliationSendPayload: { + // Ingest for the currently targeted entry. + this.reconciliationPayloadIngester.push(message.bytes, false); + + break; + } + case MsgKind.ReconciliationTerminatePayload: { + const entryToRequestPayloadFor = this.reconciliationPayloadIngester + .terminate(); + + if (entryToRequestPayloadFor) { // Request the payload. - const capHandle = this.capFinder.findCapHandle(message.entry.entry); + const capHandle = this.capFinder.findCapHandle( + entryToRequestPayloadFor, + ); if (capHandle === undefined) { throw new WillowError( @@ -1384,19 +1426,17 @@ export class WgpsMessenger< } this.handlesPayloadRequestsOurs.bind({ - entry: message.entry.entry, + entry: entryToRequestPayloadFor, offset: 0n, }); this.encoder.encode({ kind: MsgKind.DataBindPayloadRequest, - entry: message.entry.entry, + entry: entryToRequestPayloadFor, offset: 0n, capability: capHandle, }); } - - break; } } } @@ -1428,7 +1468,7 @@ export class WgpsMessenger< throw new WgpsMessageValidationError(result.message); } - this.payloadIngester.target(message.entry); + this.dataPayloadIngester.target(message.entry); break; } @@ -1445,7 +1485,7 @@ export class WgpsMessenger< const endHere = this.currentlyReceivedOffset === this.currentlyReceivedEntry.payloadLength; - this.payloadIngester.push(message.bytes, endHere); + this.dataPayloadIngester.push(message.bytes, endHere); break; } @@ -1458,7 +1498,7 @@ export class WgpsMessenger< ); } - this.payloadIngester.target(result.entry); + this.dataPayloadIngester.target(result.entry); break; } @@ -1496,7 +1536,7 @@ export class WgpsMessenger< ); } - const newHandle = this.handlesCapsTheirs.bind(message.capability); + this.handlesCapsTheirs.bind(message.capability); this.paiFinder.receivedReadCapForIntersection(message.handle); }