Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add some supporting stuff for future plum trees #33

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions src/store/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,19 @@ export class EntryIngestEvent<
{
entry: Entry<NamespacePublicKey, SubspacePublicKey, PayloadDigest>;
authToken: AuthorisationToken;
externalSourceId?: string;
}
> {
constructor(
entry: Entry<NamespacePublicKey, SubspacePublicKey, PayloadDigest>,
authToken: AuthorisationToken,
externalSourceId?: string,
) {
super("entryingest", {
detail: {
entry,
authToken,
externalSourceId,
},
});
}
Expand All @@ -61,17 +64,43 @@ export class PayloadIngestEvent<
entry: Entry<NamespacePublicKey, SubspacePublicKey, PayloadDigest>;
authToken: AuthorisationToken;
payload: Payload;
externalSourceId?: string;
}> {
constructor(
entry: Entry<NamespacePublicKey, SubspacePublicKey, PayloadDigest>,
authToken: AuthorisationToken,
payload: Payload,
externalSourceId?: string,
) {
super("payloadingest", {
detail: {
entry,
authToken,
payload,
externalSourceId,
},
});
}
}

/** Emitted after a {@linkcode Store} attempts to ingest a payload, but already had it. */
export class PayloadNoOpEvent<
NamespacePublicKey,
SubspacePublicKey,
PayloadDigest,
AuthorisationToken,
> extends CustomEvent<{
entry: Entry<NamespacePublicKey, SubspacePublicKey, PayloadDigest>;
externalSourceId?: string;
}> {
constructor(
entry: Entry<NamespacePublicKey, SubspacePublicKey, PayloadDigest>,
externalSourceId?: string,
) {
super("payloadnoop", {
detail: {
entry,
externalSourceId,
},
});
}
Expand Down
7 changes: 6 additions & 1 deletion src/store/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import {
EntryPayloadSetEvent,
EntryRemoveEvent,
PayloadIngestEvent,
PayloadNoOpEvent,
PayloadRemoveEvent,
} from "./events.ts";
import type { Storage3d } from "./storage/storage_3d/types.ts";
Expand Down Expand Up @@ -618,6 +619,7 @@ export class Store<
payload: AsyncIterable<Uint8Array>,
allowPartial = false,
offset = 0,
externalSourceId? : string,
): Promise<IngestPayloadEvent> {
const getResult = await this.storage.get(
entryDetails.subspace,
Expand All @@ -636,6 +638,9 @@ export class Store<
const existingPayload = await this.payloadDriver.get(entry.payloadDigest);

if (existingPayload) {
this.dispatchEvent(
new PayloadNoOpEvent(entry, externalSourceId),
);
return {
kind: "no_op",
reason: "already_have_it",
Expand Down Expand Up @@ -692,7 +697,7 @@ export class Store<
}

this.dispatchEvent(
new PayloadIngestEvent(entry, authToken, complete),
new PayloadIngestEvent(entry, authToken, complete, externalSourceId),
);
}

Expand Down
6 changes: 6 additions & 0 deletions src/store/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,8 @@ export type IngestEventFailure = {
export type IngestEventNoOp = {
kind: "no_op";
reason: "obsolete_from_same_subspace" | "newer_prefix_found";
/** An ID representing the source of this ingested entry. */
externalSourceId?: string;
};

/** Emitted after a successuful entry ingestion. */
Expand Down Expand Up @@ -261,11 +263,15 @@ export type IngestPayloadEventFailure = {
export type IngestPayloadEventNoOp = {
kind: "no_op";
reason: "already_have_it";
/** An ID representing the source of this ingested entry. */
externalSourceId?: string;
};

/** Emitted after the succesful ingestion of a payload. */
export type IngestPayloadEventSuccess = {
kind: "success";
/** An ID representing the source of this ingested entry. */
externalSourceId?: string;
};

/** Emitted after payload entry. */
Expand Down
7 changes: 6 additions & 1 deletion src/wgps/data/payload_ingester.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ export class PayloadIngester<
| Entry<NamespaceId, SubspaceId, PayloadDigest>
| null = null;

private id: string;

constructor(opts: {
getStore: GetStoreFn<
Prefingerprint,
Expand All @@ -58,9 +60,12 @@ export class PayloadIngester<
bytes: Uint8Array,
entryLength: bigint,
) => Uint8Array;
id: string;
}) {
this.processReceivedPayload = opts.processReceivedPayload;

this.id = opts.id;

onAsyncIterate(this.events, async (event) => {
if (event === CANCELLATION) {
if (this.currentIngestion.kind === "active") {
Expand Down Expand Up @@ -105,7 +110,7 @@ export class PayloadIngester<
path: entry.path,
subspace: entry.subspaceId,
timestamp: entry.timestamp,
}, new CancellableIngestion(fifo));
}, new CancellableIngestion(fifo), false, 0, this.id);

this.currentIngestion = {
kind: "active",
Expand Down
52 changes: 6 additions & 46 deletions src/wgps/reconciliation/announcer.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import type { Range3d } from "@earthstar/willow-utils";
import { encodeBase64 } from "@std/encoding/base64";
import { FIFO } from "@korkje/fifo";
import { WillowError } from "../../errors.ts";
import type { Store } from "../../store/store.ts";
import type { LengthyEntry, Payload, PayloadScheme } from "../../store/types.ts";
import type { HandleStore } from "../handle_store.ts";
import type { AuthorisationTokenScheme, COVERS_NONE } from "../types.ts";
import type { StaticTokenStore } from "../static_token_store.ts";

export type AnnouncerOpts<
AuthorisationToken,
Expand All @@ -19,7 +18,7 @@ export type AnnouncerOpts<
DynamicToken
>;
payloadScheme: PayloadScheme<PayloadDigest>;
staticTokenHandleStoreOurs: HandleStore<StaticToken>;
staticTokenStore: StaticTokenStore<StaticToken>,
};

type AnnouncementPack<
Expand All @@ -29,9 +28,7 @@ type AnnouncementPack<
SubspaceId,
PayloadDigest,
> = {
// Send these first in SetupBindStaticToken messages
staticTokenBinds: StaticToken[];
// Then send a ReconciliationAnnounceEntries
// Send ReconciliationAnnounceEntries
announcement: {
range: Range3d<SubspaceId>;
count: number;
Expand Down Expand Up @@ -66,9 +63,7 @@ export class Announcer<
DynamicToken
>;
private payloadScheme: PayloadScheme<PayloadDigest>;
private staticTokenHandleStoreOurs: HandleStore<StaticToken>;

private staticTokenHandleMap = new Map<string, bigint>();
private staticTokenStore: StaticTokenStore<StaticToken>;

private announcementPackQueue = new FIFO<
AnnouncementPack<
Expand All @@ -92,33 +87,7 @@ export class Announcer<
) {
this.authorisationTokenScheme = opts.authorisationTokenScheme;
this.payloadScheme = opts.payloadScheme;
this.staticTokenHandleStoreOurs = opts.staticTokenHandleStoreOurs;
}

private getStaticTokenHandle(
staticToken: StaticToken,
): { handle: bigint; alreadyExisted: boolean } {
const encoded = this.authorisationTokenScheme.encodings.staticToken.encode(
staticToken,
);
const base64 = encodeBase64(encoded);

const existingHandle = this.staticTokenHandleMap.get(base64);

if (existingHandle !== undefined) {
const canUse = this.staticTokenHandleStoreOurs.canUse(existingHandle);

if (!canUse) {
throw new WillowError("Could not use a static token handle");
}

return { handle: existingHandle, alreadyExisted: true };
}

const newHandle = this.staticTokenHandleStoreOurs.bind(staticToken);
this.staticTokenHandleMap.set(base64, newHandle);

return { handle: newHandle, alreadyExisted: false };
this.staticTokenStore = opts.staticTokenStore;
}

async queueAnnounce(announcement: {
Expand All @@ -140,7 +109,6 @@ export class Announcer<
}) {
// Queue announcement message.

const staticTokenBinds: StaticToken[] = [];
const entries: {
lengthyEntry: LengthyEntry<NamespaceId, SubspaceId, PayloadDigest>;
staticTokenHandle: bigint;
Expand All @@ -158,14 +126,7 @@ export class Announcer<
const [staticToken, dynamicToken] = this.authorisationTokenScheme
.decomposeAuthToken(authToken);

const {
handle: staticTokenHandle,
alreadyExisted: staticTokenHandleAlreadyExisted,
} = this.getStaticTokenHandle(staticToken);

if (!staticTokenHandleAlreadyExisted) {
staticTokenBinds.push(staticToken);
}
const staticTokenHandle = this.staticTokenStore.getByValue(staticToken);

let available = payload ? await payload.length() : 0n;

Expand All @@ -187,7 +148,6 @@ export class Announcer<
}

this.announcementPackQueue.push({
staticTokenBinds,
announcement: {
count: entries.length,
range: announcement.range,
Expand Down
49 changes: 49 additions & 0 deletions src/wgps/static_token_store.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import { encodeBase64 } from "@std/encoding/base64";
import { HandleStore } from "./handle_store.ts";
import { WillowError } from "../errors.ts";
import FIFO from "@korkje/fifo";

/** Maps static tokens to handles and vice versa. `boundStaticTokens` needs to be sent to the peer. */
export class StaticTokenStore<StaticToken> {
private byHandle = new HandleStore<StaticToken>();
private byValue = new Map<string, bigint>();

private valueEncoder: (value: StaticToken) => Uint8Array;
private boundTokensQueue = new FIFO<StaticToken>();

constructor(valueEncoder: (value: StaticToken) => Uint8Array) {
this.valueEncoder = valueEncoder;
}

getByValue(
value: StaticToken,
): bigint {
const encoded = this.valueEncoder(value);
const base64 = encodeBase64(encoded);

const existingHandle = this.byValue.get(base64);

if (existingHandle !== undefined) {
const canUse = this.byHandle.canUse(existingHandle);

if (!canUse) {
throw new WillowError("Could not use a static token handle");
}

return existingHandle;
}

const newHandle = this.byHandle.bind(value);
this.byValue.set(base64, newHandle);

this.boundTokensQueue.push(value);

return newHandle;
}

*boundStaticTokens() {
for (const boundToken of this.boundTokensQueue) {
yield boundToken;
}
}
}
Loading
Loading