Skip to content

Commit

Permalink
Add basic chunk-by-chunk payload transformation
Browse files Browse the repository at this point in the history
  • Loading branch information
sgwilym committed May 1, 2024
1 parent f62e9e0 commit fc29124
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 4 deletions.
4 changes: 3 additions & 1 deletion .nova/Configuration.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
{
"co.gwil.deno.config.documentPreloadLimit" : 2000,
"co.gwil.deno.config.enabledPaths" : [
"\/Users\/gwil\/Projects\/willow-js\/src",
"\/Users\/gwil\/Projects\/willow-js\/scripts",
Expand All @@ -10,5 +9,8 @@
"deno.disablePaths" : [
"dist"
],
"deno.enablePaths" : [
"src"
],
"deno.unstable" : true
}
7 changes: 5 additions & 2 deletions src/wgps/data/data_sender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ export class DataSender<
SubspaceId,
PayloadDigest
>;
transformPayload: (chunk: Uint8Array) => Uint8Array;
},
) {
}
Expand Down Expand Up @@ -138,10 +139,12 @@ export class DataSender<
const payloadIterator = await pack.payload.stream(pack.offset);

for await (const chunk of payloadIterator) {
const transformed = this.opts.transformPayload(chunk);

yield {
kind: MsgKind.DataSendPayload,
amount: BigInt(chunk.byteLength),
bytes: chunk,
amount: BigInt(transformed.byteLength),
bytes: transformed,
};
}

Expand Down
8 changes: 7 additions & 1 deletion src/wgps/data/payload_ingester.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ export class PayloadIngester<
entry: Entry<NamespaceId, SubspaceId, PayloadDigest>;
} | typeof CANCELLATION
>();
private processReceivedPayload: (bytes: Uint8Array) => Uint8Array;

constructor(opts: {
getStore: GetStoreFn<
Expand All @@ -31,7 +32,10 @@ export class PayloadIngester<
SubspaceId,
PayloadDigest
>;
processReceivedPayload: (bytes: Uint8Array) => Uint8Array;
}) {
this.processReceivedPayload = opts.processReceivedPayload;

onAsyncIterate(this.events, async (event) => {
if (event === CANCELLATION) {
this.currentIngestion.push(CANCELLATION);
Expand Down Expand Up @@ -65,7 +69,9 @@ export class PayloadIngester<
}

push(bytes: Uint8Array, end: boolean) {
this.events.push(bytes);
const processed = this.processReceivedPayload(bytes);

this.events.push(processed);

if (end) {
this.events.push(CANCELLATION);
Expand Down
32 changes: 32 additions & 0 deletions src/wgps/wgps_messenger.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,8 @@ function testWgpsMessenger(scenario: WgpsScenario) {
maxSize: BigInt(0),
}],
]]),
transformPayload: (bytes) => bytes,
processReceivedPayload: (bytes) => bytes,
});

const storeMapBetty = new StoreMap(
Expand Down Expand Up @@ -319,6 +321,8 @@ function testWgpsMessenger(scenario: WgpsScenario) {
maxSize: BigInt(0),
}],
]]),
transformPayload: (bytes) => bytes,
processReceivedPayload: (bytes) => bytes,
});

await delay(20 * scenario.timeMultiplier);
Expand Down Expand Up @@ -543,6 +547,8 @@ function testWgpsMessenger(scenario: WgpsScenario) {
maxSize: BigInt(0),
}],
]]),
transformPayload: (bytes) => bytes,
processReceivedPayload: (bytes) => bytes,
});

const storeMapBetty = new StoreMap(
Expand Down Expand Up @@ -632,6 +638,8 @@ function testWgpsMessenger(scenario: WgpsScenario) {
maxSize: BigInt(0),
}],
]]),
transformPayload: (bytes) => bytes,
processReceivedPayload: (bytes) => bytes,
});

await delay(20 * scenario.timeMultiplier);
Expand Down Expand Up @@ -818,6 +826,8 @@ function testWgpsMessenger(scenario: WgpsScenario) {
maxSize: BigInt(0),
}],
]]),
transformPayload: (bytes) => bytes,
processReceivedPayload: (bytes) => bytes,
});

const storeMapBetty = new StoreMap(
Expand Down Expand Up @@ -909,6 +919,8 @@ function testWgpsMessenger(scenario: WgpsScenario) {
maxSize: BigInt(0),
}],
]]),
transformPayload: (bytes) => bytes,
processReceivedPayload: (bytes) => bytes,
});

await delay(20 * scenario.timeMultiplier);
Expand Down Expand Up @@ -1091,6 +1103,8 @@ function testWgpsMessenger(scenario: WgpsScenario) {
maxSize: BigInt(0),
}],
]]),
transformPayload: (bytes) => bytes,
processReceivedPayload: (bytes) => bytes,
});

const storeMapBetty = new StoreMap(
Expand Down Expand Up @@ -1181,6 +1195,8 @@ function testWgpsMessenger(scenario: WgpsScenario) {
maxSize: BigInt(0),
}],
]]),
transformPayload: (bytes) => bytes,
processReceivedPayload: (bytes) => bytes,
});

await delay(20 * scenario.timeMultiplier);
Expand Down Expand Up @@ -1355,6 +1371,8 @@ function testWgpsMessenger(scenario: WgpsScenario) {
maxSize: BigInt(0),
}],
]]),
transformPayload: (bytes) => bytes,
processReceivedPayload: (bytes) => bytes,
});

const storeMapBetty = new StoreMap(
Expand Down Expand Up @@ -1454,6 +1472,8 @@ function testWgpsMessenger(scenario: WgpsScenario) {
maxSize: BigInt(0),
}],
]]),
transformPayload: (bytes) => bytes,
processReceivedPayload: (bytes) => bytes,
});

await delay(20 * scenario.timeMultiplier);
Expand Down Expand Up @@ -1641,6 +1661,8 @@ function testWgpsMessenger(scenario: WgpsScenario) {
maxSize: BigInt(0),
}],
]]),
transformPayload: (bytes) => bytes,
processReceivedPayload: (bytes) => bytes,
});

const storeMapBetty = new StoreMap(
Expand Down Expand Up @@ -1732,6 +1754,8 @@ function testWgpsMessenger(scenario: WgpsScenario) {
maxSize: BigInt(0),
}],
]]),
transformPayload: (bytes) => bytes,
processReceivedPayload: (bytes) => bytes,
});

await delay(20 * scenario.timeMultiplier);
Expand Down Expand Up @@ -1906,6 +1930,8 @@ function testWgpsMessenger(scenario: WgpsScenario) {
maxSize: BigInt(0),
}],
]]),
transformPayload: (bytes) => bytes,
processReceivedPayload: (bytes) => bytes,
});

const storeMapBetty = new StoreMap(
Expand Down Expand Up @@ -1997,6 +2023,8 @@ function testWgpsMessenger(scenario: WgpsScenario) {
maxSize: BigInt(0),
}],
]]),
transformPayload: (bytes) => bytes,
processReceivedPayload: (bytes) => bytes,
});

await delay(20 * scenario.timeMultiplier);
Expand Down Expand Up @@ -2211,6 +2239,8 @@ function testWgpsMessenger(scenario: WgpsScenario) {
maxSize: BigInt(0),
}],
]]),
transformPayload: (bytes) => bytes,
processReceivedPayload: (bytes) => bytes,
});

const storeMapBetty = new StoreMap(
Expand Down Expand Up @@ -2342,6 +2372,8 @@ function testWgpsMessenger(scenario: WgpsScenario) {
maxSize: BigInt(0),
}],
]]),
transformPayload: (bytes) => bytes,
processReceivedPayload: (bytes) => bytes,
});

await delay(20 * scenario.timeMultiplier);
Expand Down
7 changes: 7 additions & 0 deletions src/wgps/wgps_messenger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@ export type WgpsMessengerOpts<
SubspaceId,
PayloadDigest
>;

/** A (not necessarily deterministic) algorithm that converts a chunk of a payload into another bytestring. */
transformPayload: (chunk: Uint8Array) => Uint8Array;
/** Process transformed payload chunks. */
processReceivedPayload: (chunk: Uint8Array) => Uint8Array;
};

/** Coordinates a complete WGPS synchronisation session. */
Expand Down Expand Up @@ -496,10 +501,12 @@ export class WgpsMessenger<
this.dataSender = new DataSender({
handlesPayloadRequestsTheirs: this.handlesPayloadRequestsTheirs,
getStore: this.getStore,
transformPayload: opts.transformPayload,
});

this.payloadIngester = new PayloadIngester({
getStore: this.getStore,
processReceivedPayload: opts.processReceivedPayload,
});

// Send encoded messages
Expand Down

0 comments on commit fc29124

Please sign in to comment.