Skip to content

Commit

Permalink
quick transformation API update
Browse files Browse the repository at this point in the history
  • Loading branch information
sgwilym committed May 1, 2024
1 parent fc29124 commit d6bc781
Showing 1 changed file with 20 additions and 6 deletions.
26 changes: 20 additions & 6 deletions src/wgps/data/payload_ingester.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,18 @@ export class PayloadIngester<
AuthorisationOpts,
> {
private currentIngestion = new FIFO<Uint8Array | typeof CANCELLATION>();
private currentEntry:
| Entry<NamespaceId, SubspaceId, PayloadDigest>
| undefined;
private events = new FIFO<
Uint8Array | {
entry: Entry<NamespaceId, SubspaceId, PayloadDigest>;
} | typeof CANCELLATION
>();
private processReceivedPayload: (bytes: Uint8Array) => Uint8Array;
private processReceivedPayload: (
bytes: Uint8Array,
entryLength: bigint,
) => Uint8Array;

constructor(opts: {
getStore: GetStoreFn<
Expand All @@ -32,7 +38,10 @@ export class PayloadIngester<
SubspaceId,
PayloadDigest
>;
processReceivedPayload: (bytes: Uint8Array) => Uint8Array;
processReceivedPayload: (
bytes: Uint8Array,
entryLength: bigint,
) => Uint8Array;
}) {
this.processReceivedPayload = opts.processReceivedPayload;

Expand All @@ -45,6 +54,8 @@ export class PayloadIngester<

const store = await opts.getStore(event.entry.namespaceId);

this.currentEntry = event.entry;

store.ingestPayload({
path: event.entry.path,
subspace: event.entry.subspaceId,
Expand All @@ -59,7 +70,12 @@ export class PayloadIngester<
},
);
} else {
this.currentIngestion.push(event);
const transformed = this.processReceivedPayload(
event,
this.currentEntry!.payloadLength,
);

this.currentIngestion.push(transformed);
}
});
}
Expand All @@ -69,9 +85,7 @@ export class PayloadIngester<
}

push(bytes: Uint8Array, end: boolean) {
const processed = this.processReceivedPayload(bytes);

this.events.push(processed);
this.events.push(bytes);

if (end) {
this.events.push(CANCELLATION);
Expand Down

0 comments on commit d6bc781

Please sign in to comment.