diff --git a/.gitignore b/.gitignore index a83b974..ad9f178 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,2 @@ .DS_Store -dist -debug/replica_test \ No newline at end of file +dist \ No newline at end of file diff --git a/README.md b/README.md index 44ba6f4..e7ecfdb 100644 --- a/README.md +++ b/README.md @@ -3,6 +3,10 @@ This is a reference implementation of the Willow protocol written in TypeScript. **It is a work in progress**! +Want to follow along with development, ask questions, or get involved yourself? +Come and join us on the +[Earthstar Project Discord](https://discord.gg/6NtYzQC2G4). + Here is what has been implemented: - `Store` @@ -41,5 +45,6 @@ And here is what remains: - QUIC transport driver - IndexedDB KV driver -Want to follow along with development, or get involved? Come and join us on the -[Earthstar Project Discord](https://discord.gg/6NtYzQC2G4). +## Dev + +Deno is used the development runtime. Run `deno task test` to run tests. diff --git a/debug/keyhop_tree.ts b/debug/keyhop_tree.ts deleted file mode 100644 index 7d0f1cf..0000000 --- a/debug/keyhop_tree.ts +++ /dev/null @@ -1,145 +0,0 @@ -import { KvDriverDeno } from "../src/replica/storage/kv/kv_driver_deno.ts"; -import { KeyHopTree } from "../src/replica/storage/prefix_iterators/key_hop_tree.ts"; -import { RadixishTree } from "../src/replica/storage/prefix_iterators/radixish_tree.ts"; -import { SimpleKeyIterator } from "../src/replica/storage/prefix_iterators/simple_key_iterator.ts"; - -const driver = new KvDriverDeno(await Deno.openKv("gtree")); -const driver2 = new KvDriverDeno(await Deno.openKv("straightforward")); - -await driver.clear(); -await driver2.clear(); - -const gtree = new KeyHopTree(driver); -const stree = new SimpleKeyIterator(driver2); -const mtree = new RadixishTree(); - -const encoder = new TextEncoder(); -const decoder = new TextDecoder(); - -const k = (str: string) => encoder.encode(str); -const d = (b: Uint8Array) => decoder.decode(b); - -const wordList = [ - 
"n", - "na", - "natty", - "nature", - "natured", -]; - -for (const word of wordList) { - await gtree.insert(k(word), `<${word}>`); - await stree.insert(k(word), `<${word}>`); - await mtree.insert(k(word), `<${word}>`); -} -await gtree.print(); - -await gtree.remove(k("natty")); -await stree.remove(k("natty")); -await mtree.remove(k("natty")); - -console.group("Tree contents"); - -await gtree.print(); - -console.groupEnd(); - -console.group("Prefixes of naturalistic"); - -for await (const [key, value] of gtree.prefixesOf(k("naturalistic"))) { - console.log(d(key), "-", value); -} - -console.log("---"); - -for await (const [key, value] of stree.prefixesOf(k("naturalistic"))) { - console.log(d(key), "-", value); -} - -console.log("---"); - -for await (const [key, value] of mtree.prefixesOf(k("naturalistic"))) { - console.log(d(key), "-", value); -} - -console.groupEnd(); - -console.group("Items prefixed by natural"); - -for await (const [key, value] of gtree.prefixedBy(k("natural"))) { - console.log(d(key), "-", value); -} - -console.log("---"); - -for await (const [key, value] of stree.prefixedBy(k("natural"))) { - console.log(d(key), "-", value); -} - -console.log("---"); - -for await (const [key, value] of mtree.prefixedBy(k("natural"))) { - console.log(d(key), "-", value); -} - -console.groupEnd(); - -Deno.bench( - "prefixes of (unnamed tree)", - { baseline: true, group: "prefixes" }, - async () => { - for await (const [key, value] of gtree.prefixesOf(k("naturalistic"))) { - // - } - }, -); - -Deno.bench( - "prefixes of (simple tree)", - { baseline: true, group: "prefixes" }, - async () => { - for await (const [key, value] of stree.prefixesOf(k("naturalistic"))) { - // - } - }, -); - -Deno.bench( - "prefixes of (memory tree)", - { baseline: true, group: "prefixes" }, - async () => { - for await (const [key, value] of mtree.prefixesOf(k("naturalistic"))) { - // - } - }, -); - -Deno.bench( - "prefixed by (unnamed tree)", - { baseline: true, group: "prefixedBy" }, - 
async () => { - for await (const [key, value] of gtree.prefixedBy(k("natural"))) { - // - } - }, -); - -Deno.bench( - "prefixed by (simple tree)", - { baseline: true, group: "prefixedBy" }, - async () => { - for await (const [key, value] of stree.prefixedBy(k("natural"))) { - // - } - }, -); - -Deno.bench( - "prefixed by (memory tree)", - { baseline: true, group: "prefixedBy" }, - async () => { - for await (const [key, value] of mtree.prefixedBy(k("natural"))) { - // - } - }, -); diff --git a/debug/radixish_tree.ts b/debug/radixish_tree.ts deleted file mode 100644 index 00b9565..0000000 --- a/debug/radixish_tree.ts +++ /dev/null @@ -1,53 +0,0 @@ -import { RadixishTree } from "../src/replica/storage/prefix_iterators/radixish_tree.ts"; - -const rtree = new RadixishTree(); - -/* - -const p3 = new Uint8Array([23, 9]); -const p2 = new Uint8Array([23, 163]); -const p1 = new Uint8Array([23]); -const p4 = new Uint8Array([23, 163, 9]); - -const paths = [p3, p2, p1, p4]; - -for (const path of paths) { - await rtree.insert(path, path); - - console.group("after insert", path); - rtree.print(); - console.groupEnd(); -} - -await rtree.remove(p2); - -rtree.print(); - -*/ - -await rtree.insert( - new Uint8Array([128, 186, 15, 86]), - new Uint8Array([128, 186, 15, 86]), -); - -await rtree.insert(new Uint8Array([128, 186]), new Uint8Array([128, 186])); - -await rtree.insert( - new Uint8Array([128, 186, 15, 86, 190]), - new Uint8Array([128, 186, 15, 86, 190]), -); - -await rtree.insert( - new Uint8Array([128, 186, 15, 86, 190, 15]), - new Uint8Array([128, 186, 15, 86, 190, 15]), -); - -await rtree.remove(new Uint8Array([128, 186, 15, 86, 190])); - -rtree.print(); - -for await ( - const entry of rtree.prefixedBy(new Uint8Array([128, 186, 15, 86, 190])) -) { - console.log(entry); -} diff --git a/debug/replica.ts b/debug/replica.ts deleted file mode 100644 index 22d8f97..0000000 --- a/debug/replica.ts +++ /dev/null @@ -1,322 +0,0 @@ -import { Replica } from "../src/replica/replica.ts"; 
-import { crypto } from "$std/crypto/mod.ts"; -import { equals as bytesEquals } from "$std/bytes/equals.ts"; -import { ProtocolParameters } from "../src/replica/types.ts"; -import { getPersistedDrivers } from "../src/replica/util.ts"; -import { compareBytes } from "../src/util/bytes.ts"; -import { encodeEntry } from "../src/entries/encode_decode.ts"; - -async function makeKeypair() { - const { publicKey, privateKey } = await crypto.subtle.generateKey( - { - name: "ECDSA", - namedCurve: "P-256", - }, - true, - ["sign", "verify"], - ); - - return { - subspace: new Uint8Array( - await window.crypto.subtle.exportKey("raw", publicKey), - ), - privateKey, - }; -} - -function importPublicKey(raw: ArrayBuffer) { - return crypto.subtle.importKey( - "raw", - raw, - { - name: "ECDSA", - namedCurve: "P-256", - }, - true, - ["verify"], - ); -} - -const textEncoder = new TextEncoder(); -const textDecoder = new TextDecoder(); - -const authorPair = await makeKeypair(); -const author2Pair = await makeKeypair(); - -const protocolParameters: ProtocolParameters< - Uint8Array, - Uint8Array, - ArrayBuffer, - CryptoKey, - ArrayBuffer -> = { - namespaceScheme: { - encode: (v) => v, - decode: (v) => v, - encodedLength: (v) => v.byteLength, - isEqual: bytesEquals, - }, - subspaceScheme: { - encode: (v) => v, - decode: (v) => v.subarray(0, 65), - encodedLength: () => 65, - isEqual: bytesEquals, - }, - pathLengthEncoding: { - encode(length) { - return new Uint8Array([length]); - }, - decode(bytes) { - return bytes[0]; - }, - encodedLength() { - return 1; - }, - }, - payloadScheme: { - encode(hash) { - return new Uint8Array(hash); - }, - decode(bytes) { - return bytes.subarray(0, 32); - }, - encodedLength() { - return 32; - }, - async fromBytes(bytes) { - return new Uint8Array(await crypto.subtle.digest("SHA-256", bytes)); - }, - order(a, b) { - return compareBytes(new Uint8Array(a), new Uint8Array(b)) as 1 | 0 | -1; - }, - }, - authorisationScheme: { - async authorise(entry, secretKey) { - 
const encodedEntry = encodeEntry(entry, { - namespacePublicKeyEncoding: { - encode: (v) => v, - decode: (v) => v, - encodedLength: (v) => v.byteLength, - }, - subspacePublicKeyEncoding: { - encode: (v) => v, - decode: (v) => v, - encodedLength: (v) => v.byteLength, - }, - pathLengthScheme: { - encode(path) { - const bytes = new Uint8Array(1 + path.byteLength); - bytes[0] = path.byteLength; - - bytes.set(path, 1); - return bytes; - }, - decode(bytes) { - const length = bytes[0]; - return bytes.subarray(1, 1 + length); - }, - encodedLength(path) { - return 1 + path.byteLength; - }, - }, - payloadScheme: { - encode(hash) { - return new Uint8Array(hash); - }, - decode(bytes) { - return bytes.buffer; - }, - encodedLength(hash) { - return hash.byteLength; - }, - }, - }); - - const res = await crypto.subtle.sign( - { - name: "ECDSA", - hash: { name: "SHA-256" }, - }, - secretKey, - encodedEntry, - ); - - return new Uint8Array(res); - }, - async isAuthorised(entry, token) { - const cryptoKey = await importPublicKey(entry.identifier.subspace); - - const encodedEntry = encodeEntry(entry, { - namespacePublicKeyEncoding: { - encode: (v) => v, - decode: (v) => v, - encodedLength: (v) => v.byteLength, - }, - subspacePublicKeyEncoding: { - encode: (v) => v, - decode: (v) => v, - encodedLength: (v) => v.byteLength, - }, - pathLengthScheme: { - encode(path) { - const bytes = new Uint8Array(1 + path.byteLength); - bytes[0] = path.byteLength; - - bytes.set(path, 1); - return bytes; - }, - decode(bytes) { - const length = bytes[0]; - return bytes.subarray(1, 1 + length); - }, - encodedLength(path) { - return 1 + path.byteLength; - }, - }, - payloadScheme: { - encode(hash) { - return new Uint8Array(hash); - }, - decode(bytes) { - return bytes.buffer; - }, - encodedLength(hash) { - return hash.byteLength; - }, - }, - }); - - return crypto.subtle.verify( - { - name: "ECDSA", - hash: { name: "SHA-256" }, - }, - cryptoKey, - token, - encodedEntry, - ); - }, - tokenEncoding: { - encode: 
(ab) => new Uint8Array(ab), - decode: (bytes) => bytes.buffer, - encodedLength: (ab) => ab.byteLength, - }, - }, -}; - -const drivers = await getPersistedDrivers( - "./debug/replica_test", - protocolParameters, -); - -const replica = new Replica< - Uint8Array, - Uint8Array, - ArrayBuffer, - CryptoKey, - ArrayBuffer ->({ - namespace: new Uint8Array(new Uint8Array([137])), - protocolParameters, - //...drivers, -}); - -// Won't be inserted -await replica.set({ - path: textEncoder.encode("unauthorised"), - payload: textEncoder.encode("I should really not be here!"), - subspace: authorPair.subspace, -}, author2Pair.privateKey); - -// Two entries at the same path by different authors -// Both will be inserted! -await replica.set({ - path: textEncoder.encode("pathA"), - payload: textEncoder.encode("I'm here!"), - subspace: authorPair.subspace, -}, authorPair.privateKey); - -await replica.set({ - path: textEncoder.encode("pathA"), - payload: textEncoder.encode("Me too!"), - subspace: author2Pair.subspace, -}, author2Pair.privateKey); - -// Two entries at another path, but by the same author. -// Only the second one will remain, it being later! -await replica.set({ - path: textEncoder.encode("pathB"), - payload: textEncoder.encode("I want to win... and shouldn't be here."), - subspace: authorPair.subspace, -}, authorPair.privateKey); - -await replica.set({ - path: textEncoder.encode("pathB"), - payload: textEncoder.encode("I win!"), - subspace: authorPair.subspace, -}, authorPair.privateKey); - -// The first and second will be removed! - -await replica.set({ - path: textEncoder.encode("prefixed"), - payload: textEncoder.encode("I am not newest... and shouldn't be here."), - subspace: authorPair.subspace, -}, authorPair.privateKey); - -await replica.set({ - path: textEncoder.encode("prefixed2"), - payload: textEncoder.encode( - "I am not newest either... 
and shouldn't be here.", - ), - subspace: authorPair.subspace, -}, authorPair.privateKey); - -await replica.set({ - path: textEncoder.encode("prefix"), - payload: textEncoder.encode("I'm the newest, and a prefix of the others!"), - subspace: authorPair.subspace, -}, authorPair.privateKey); - -// The second one won't be inserted! - -await replica.set({ - path: textEncoder.encode("willbe"), - payload: textEncoder.encode( - "I am still the newest prefix!", - ), - timestamp: BigInt((Date.now() + 10) * 1000), - subspace: authorPair.subspace, -}, authorPair.privateKey); - -await replica.set({ - path: textEncoder.encode("willbeprefixed"), - payload: textEncoder.encode("I shouldn't be here..."), - subspace: authorPair.subspace, -}, authorPair.privateKey); - -console.group("All entries"); - -for await ( - const [entry, payload, authToken] of replica.query({ - order: "path", - }) -) { - const pathName = textDecoder.decode(entry.identifier.path); - - console.group(`${pathName}`); - console.log( - `Subspace: ${entry.identifier.subspace.slice(0, 4)} (etc. etc.)`, - ); - console.log(`Timestamp: ${entry.record.timestamp}`); - - console.log(`Auth token: ${new Uint8Array(authToken).slice(0, 4)}... etc.`); - console.log( - `Payload: ${ - payload ? 
textDecoder.decode(await payload.bytes()) : "Not in possession" - }`, - ); - console.groupEnd(); -} - -console.groupEnd(); diff --git a/debug/skiplist_insert.ts b/debug/skiplist_insert.ts index 7e2282b..3d3e394 100644 --- a/debug/skiplist_insert.ts +++ b/debug/skiplist_insert.ts @@ -1,6 +1,15 @@ -import { KvDriverDeno } from "../src/replica/storage/kv/kv_driver_deno.ts"; -import { concatMonoid } from "../src/replica/storage/summarisable_storage/lifting_monoid.ts"; -import { Skiplist } from "../src/replica/storage/summarisable_storage/monoid_skiplist.ts"; +import { KvDriverDeno } from "../src/store/storage/kv/kv_driver_deno.ts"; +import { LiftingMonoid } from "../src/store/storage/summarisable_storage/lifting_monoid.ts"; +import { Skiplist } from "../src/store/storage/summarisable_storage/monoid_skiplist.ts"; + +const concatMonoid: LiftingMonoid = { + lift: (key: string, value: Uint8Array) => + Promise.resolve(key + new TextDecoder().decode(value)), + combine: (a: string, b: string) => { + return a + b; + }, + neutral: "", +}; const compare = (a: string, b: string) => { if (a > b) { diff --git a/debug/skiplist_remove.ts b/debug/skiplist_remove.ts index fb1cdfc..bd3d592 100644 --- a/debug/skiplist_remove.ts +++ b/debug/skiplist_remove.ts @@ -1,6 +1,15 @@ -import { KvDriverDeno } from "../src/replica/storage/kv/kv_driver_deno.ts"; -import { concatMonoid } from "../src/replica/storage/summarisable_storage/lifting_monoid.ts"; -import { Skiplist } from "../src/replica/storage/summarisable_storage/monoid_skiplist.ts"; +import { KvDriverDeno } from "../src/store/storage/kv/kv_driver_deno.ts"; +import { LiftingMonoid } from "../src/store/storage/summarisable_storage/lifting_monoid.ts"; +import { Skiplist } from "../src/store/storage/summarisable_storage/monoid_skiplist.ts"; + +const concatMonoid: LiftingMonoid = { + lift: (key: string, value: Uint8Array) => + Promise.resolve(key + new TextDecoder().decode(value)), + combine: (a: string, b: string) => { + return a + b; + }, 
+ neutral: "", +}; const compare = (a: string, b: string) => { if (a > b) { diff --git a/deps.ts b/deps.ts index 1ff6de0..e8c9551 100644 --- a/deps.ts +++ b/deps.ts @@ -1,4 +1,4 @@ -export * from "https://deno.land/x/willow_utils@0.2.0/mod.ts"; +export * from "https://deno.land/x/willow_utils@0.2.1/mod.ts"; export { deferred } from "https://deno.land/std@0.202.0/async/deferred.ts"; export { concat } from "https://deno.land/std@0.202.0/bytes/concat.ts"; diff --git a/extra_requirements.md b/extra_requirements.md deleted file mode 100644 index 70fefa5..0000000 --- a/extra_requirements.md +++ /dev/null @@ -1,55 +0,0 @@ -# Requirements - -Here are some non-trivial things _not_ mandated by the Willow spec which this -library should do. - -## Drivers - -Willow, like Earthstar, must be able to run in different runtimes with their own -capabilities. I think the best way to do this is with drivers. - -### Replica drivers - -Replicas should be able to use drivers which let them persist and retrieve data -using different storage technologies. - -- [ ] In-memory -- [ ] Something which can be used from Deno -- [ ] IndexedDB - -### Sync transport drivers - -A sync session should be able to use different transport drivers, e.g. HTTP, -TCP. - -- [ ] Some kind of in-process, local adapter. -- [ ] WebSocket -- [ ] TCP - -## Events - -> Earthstar used all kinds of homespun solutions for events, but maybe it's time -> to just use -> [EventTarget](https://developer.mozilla.org/en-US/docs/Web/API/EventTarget). - -### Replica events - -It must be possible to subscribe to some stream of events from a replica, -primarily for record updates. This makes it possible to write interfaces which -update as the underlying data does. - -### Sync events - -It must be possible to subscribe to some stream of events from a sync session, -with detailed information concerning the number of records requested and -received, as well as detailed information on transfers of record payloads. 
- -# Secret "not a requirement but would be nice" area - -## Streaming querying - -Earthstar had a `getQueryStream` function which would return the results of a -query as a readable stream. However, this was pretty superficial as all the -results were loaded into memory first. It would be great to have a truly -streaming version where results can be read out of the underlying DB one by one. -This is really handy for building indexes on very large sets of entries. diff --git a/mod.universal.ts b/mod.universal.ts index 6748856..f930c19 100644 --- a/mod.universal.ts +++ b/mod.universal.ts @@ -1,15 +1,17 @@ -export * from "./src/replica/types.ts"; -export * from "./src/replica/replica.ts"; +export * from "./src/store/types.ts"; +export * from "./src/store/store.ts"; -export * from "./src/replica/storage/entry_drivers/memory.ts"; +export * from "./src/store/storage/entry_drivers/memory.ts"; -export * from "./src/replica/storage/payload_drivers/memory.ts"; +export * from "./src/store/storage/payload_drivers/memory.ts"; -export * from "./src/replica/storage/prefix_iterators/types.ts"; -export * from "./src/replica/storage/prefix_iterators/radixish_tree.ts"; +export * from "./src/store/storage/prefix_iterators/types.ts"; +export * from "./src/store/storage/prefix_iterators/radix_tree.ts"; -export * from "./src/replica/storage/summarisable_storage/types.ts"; -export * from "./src/replica/storage/summarisable_storage/monoid_rbtree.ts"; -export * from "./src/replica/storage/summarisable_storage/lifting_monoid.ts"; +export * from "./src/store/storage/summarisable_storage/types.ts"; +export * from "./src/store/storage/summarisable_storage/monoid_rbtree.ts"; +export * from "./src/store/storage/summarisable_storage/lifting_monoid.ts"; + +export * from "./src/utils/encryption.ts"; export * from "./src/errors.ts"; diff --git a/src/replica/storage/storage_3d/triple_storage.ts b/src/replica/storage/storage_3d/triple_storage.ts deleted file mode 100644 index bdf89cd..0000000 --- 
a/src/replica/storage/storage_3d/triple_storage.ts +++ /dev/null @@ -1,581 +0,0 @@ -import { - ANY_SUBSPACE, - AreaOfInterest, - bigintToBytes, - concat, - Entry, - isIncludedRange, - isPathPrefixed, - OPEN_END, - orderTimestamp, - Path, - PathScheme, - successorPrefix, -} from "../../../../deps.ts"; -import { - FingerprintScheme, - PayloadScheme, - QueryOrder, - SubspaceScheme, -} from "../../types.ts"; -import { - decodeEntryKey, - decodeSummarisableStorageValue, - encodeEntryKeys, - encodeSummarisableStorageValue, -} from "../../util.ts"; -import { LiftingMonoid } from "../summarisable_storage/lifting_monoid.ts"; -import { SummarisableStorage } from "../summarisable_storage/types.ts"; -import { Storage3d } from "./types.ts"; - -export type TripleStorageOpts< - NamespaceKey, - SubspaceKey, - PayloadDigest, - Fingerprint, -> = { - namespace: NamespaceKey; - /** Creates a {@link SummarisableStorage} with a given ID, used for storing entries and their data. */ - createSummarisableStorage: ( - monoid: LiftingMonoid, - id: string, - ) => SummarisableStorage; - subspaceScheme: SubspaceScheme; - payloadScheme: PayloadScheme; - pathScheme: PathScheme; - fingerprintScheme: FingerprintScheme< - NamespaceKey, - SubspaceKey, - PayloadDigest, - Fingerprint - >; -}; - -export class TripleStorage< - NamespaceKey, - SubspaceKey, - PayloadDigest, - Fingerprint, -> implements - Storage3d< - NamespaceKey, - SubspaceKey, - PayloadDigest, - Fingerprint - > { - private namespace: NamespaceKey; - - private ptsStorage: SummarisableStorage; - private sptStorage: SummarisableStorage; - private tspStorage: SummarisableStorage; - private subspaceScheme: SubspaceScheme; - private payloadScheme: PayloadScheme; - private pathScheme: PathScheme; - private fingerprintScheme: FingerprintScheme< - NamespaceKey, - SubspaceKey, - PayloadDigest, - Fingerprint - >; - - constructor( - opts: TripleStorageOpts< - NamespaceKey, - SubspaceKey, - PayloadDigest, - Fingerprint - >, - ) { - this.namespace = 
opts.namespace; - - const lift = ( - key: Uint8Array, - value: Uint8Array, - order: "path" | "subspace" | "timestamp", - ) => { - const values = decodeSummarisableStorageValue( - value, - this.payloadScheme, - ); - - // Decode the key. - const { subspace, timestamp, path } = decodeEntryKey( - key, - order, - this.subspaceScheme, - values.encodedPathLength, - ); - - const entry: Entry = { - namespaceId: this.namespace, - subspaceId: subspace, - path, - timestamp, - payloadDigest: values.payloadHash, - payloadLength: values.payloadLength, - }; - - return opts.fingerprintScheme.fingerprintSingleton(entry); - }; - - this.ptsStorage = opts.createSummarisableStorage({ - lift: (key, value) => lift(key, value, "path"), - combine: opts.fingerprintScheme.fingerprintCombine, - neutral: opts.fingerprintScheme.neutral, - }, "pts"); - this.sptStorage = opts.createSummarisableStorage({ - lift: (key, value) => lift(key, value, "subspace"), - combine: opts.fingerprintScheme.fingerprintCombine, - neutral: opts.fingerprintScheme.neutral, - }, "spt"); - this.tspStorage = opts.createSummarisableStorage({ - lift: (key, value) => lift(key, value, "timestamp"), - combine: opts.fingerprintScheme.fingerprintCombine, - neutral: opts.fingerprintScheme.neutral, - }, "tsp"); - - this.subspaceScheme = opts.subspaceScheme; - this.payloadScheme = opts.payloadScheme; - this.pathScheme = opts.pathScheme; - - this.fingerprintScheme = opts.fingerprintScheme; - } - - async get( - subspace: SubspaceKey, - path: Path, - ): Promise< - { - entry: Entry; - authTokenHash: PayloadDigest; - } | undefined - > { - const firstResult = this.query({ - area: { - includedSubspaceId: subspace, - pathPrefix: path, - timeRange: { - start: BigInt(0), - end: OPEN_END, - }, - }, - maxCount: 1, - maxSize: BigInt(0), - }, "subspace"); - - for await (const result of firstResult) { - return result; - } - } - - async insert( - { path, subspace, payloadDigest, timestamp, length, authTokenDigest }: { - path: Path; - subspace: 
SubspaceKey; - payloadDigest: PayloadDigest; - timestamp: bigint; - length: bigint; - authTokenDigest: PayloadDigest; - }, - ): Promise { - const keys = encodeEntryKeys( - { - path, - timestamp, - subspace, - subspaceEncoding: this.subspaceScheme, - }, - ); - - // console.log(keys.spt); - - const toStore = encodeSummarisableStorageValue( - { - payloadDigest, - payloadLength: length, - authTokenDigest: authTokenDigest, - payloadScheme: this.payloadScheme, - encodedPathLength: keys.encodedPathLength, - }, - ); - - await Promise.all([ - this.ptsStorage.insert(keys.pts, toStore), - this.sptStorage.insert(keys.spt, toStore), - this.tspStorage.insert(keys.tsp, toStore), - ]); - } - - async remove( - entry: Entry, - ): Promise { - const keys = encodeEntryKeys( - { - path: entry.path, - timestamp: entry.timestamp, - subspace: entry.subspaceId, - subspaceEncoding: this.subspaceScheme, - }, - ); - - const results = await Promise.all([ - this.ptsStorage.remove(keys.pts), - this.tspStorage.remove(keys.tsp), - this.sptStorage.remove(keys.spt), - ]); - - return results[0]; - } - - async summarise( - areaOfInterest: AreaOfInterest, - ): Promise<{ fingerprint: Fingerprint; size: number }> { - let fingerprint = this.fingerprintScheme.neutral; - /** The size of the fingerprint. */ - let size = 0; - - let countUsed = 0; - let sizeUsed = BigInt(0); - - // Iterate through all the entries of each range. - const subspaceEntriesLowerBound = - areaOfInterest.area.includedSubspaceId === ANY_SUBSPACE - ? undefined - : areaOfInterest.area.includedSubspaceId; - const subspaceEntriesUpperBound = - areaOfInterest.area.includedSubspaceId === ANY_SUBSPACE - ? undefined - : this.subspaceScheme.successor(areaOfInterest.area.includedSubspaceId); - - const subspaceEntries = this.sptStorage.entries( - subspaceEntriesLowerBound - ? this.subspaceScheme.encode(subspaceEntriesLowerBound) - : undefined, - subspaceEntriesUpperBound - ? 
this.subspaceScheme.encode(subspaceEntriesUpperBound) - : undefined, - { - reverse: true, - }, - ); - - /** The least excluded item we've run into. - * This is going to be the upper bound of a summarise op we run when we detect a contiguous range of included entries. - */ - let leastExcluded = this.subspaceScheme.encode( - subspaceEntriesUpperBound - ? subspaceEntriesUpperBound - : this.subspaceScheme.minimalSubspaceKey, - ); - - /** The least included item we've run into. - * This is going to be the lower bound of a summarise op we run when we detect a contiguous range of included entries. - */ - let leastIncluded: Uint8Array | undefined; - - /** Run this when we detect a contiguous range of included entries. */ - const updateFingerprint = async (start: Uint8Array) => { - const { fingerprint: includedFp, size: includedSize } = await this - .sptStorage.summarise( - start, - leastExcluded, - ); - - fingerprint = this.fingerprintScheme.fingerprintCombine( - fingerprint, - includedFp, - ); - - size += includedSize; - - // Prevent this from running again until we run into another included entry. - leastIncluded = undefined; - }; - - for await (const subspaceEntry of subspaceEntries) { - // Decode the key. - const values = decodeSummarisableStorageValue( - subspaceEntry.value, - this.payloadScheme, - ); - - // Decode the key. - const { timestamp, path } = decodeEntryKey( - subspaceEntry.key, - "subspace", - this.subspaceScheme, - values.encodedPathLength, - ); - - // Check that decoded time and subspace are included by both other dimensions - let pathIncluded = false; - - if (isPathPrefixed(areaOfInterest.area.pathPrefix, path)) { - pathIncluded = true; - } - - // If it's not included, and we ran into an included item earlier, - // that indicates the end of a contiguous range. - // Recalculate the fingerprint! - if (!pathIncluded) { - if (leastIncluded) { - await updateFingerprint(leastIncluded); - } - - // This entry is now the least excluded entry we've run into. 
- leastExcluded = subspaceEntry.key; - continue; - } - - let timeIncluded = false; - - if ( - isIncludedRange( - orderTimestamp, - areaOfInterest.area.timeRange, - timestamp, - ) - ) { - timeIncluded = true; - } - - // If it's not included, and we ran into an included item earlier, - // that indicates the end of a contiguous range. - // Recalculate the fingerprint! - if (!timeIncluded) { - if (leastIncluded) { - await updateFingerprint(leastIncluded); - } - - // This entry is now the least excluded entry we've run into. - leastExcluded = subspaceEntry.key; - continue; - } - - // Now we know this entry is included. - - // Check all dimension count and size limits. - // If any limits have been exceeded, we have to stop here. - - // Boring. - - const nextCountUsed = countUsed + 1; - const nextSizeUsed = sizeUsed + values.payloadLength; - - if ( - (areaOfInterest.maxCount !== 0 && - nextCountUsed > areaOfInterest.maxCount) || - (areaOfInterest.maxSize !== BigInt(0) && - nextSizeUsed > areaOfInterest.maxSize) - ) { - break; - } - - countUsed = nextCountUsed; - sizeUsed = nextSizeUsed; - - // This entry is part of a contiguous range of included entries, - // and it's the least included key we've encountered so far. - leastIncluded = subspaceEntry.key; - } - - // Calculate a range that was left over, if any. - if (leastIncluded) { - await updateFingerprint(leastIncluded); - } - - return { - fingerprint, - size, - }; - } - - async *query( - areaOfInterest: AreaOfInterest, - order: QueryOrder, - reverse = false, - ): AsyncIterable<{ - entry: Entry; - authTokenHash: PayloadDigest; - }> { - const storage = order === "subspace" - ? this.sptStorage - : order === "path" - ? 
this.ptsStorage - : this.tspStorage; - - const includesAllTime = areaOfInterest.area.timeRange.start === BigInt(0) && - areaOfInterest.area.timeRange.end === OPEN_END; - const includesAllPaths = areaOfInterest.area.pathPrefix.length === 0; - const includesAllSubspaces = - areaOfInterest.area.includedSubspaceId === ANY_SUBSPACE; - - // Do the simplest thing if the area starts from the lowest value and is open ended in all dimensions. - if ( - includesAllTime && - includesAllPaths && - includesAllSubspaces && - areaOfInterest.maxSize === BigInt(0) && - areaOfInterest.maxCount === 0 - ) { - const allEntriesOnOrder = storage.entries(undefined, undefined, { - limit: areaOfInterest.maxCount, - reverse: reverse, - }); - - for await (const { key, value } of allEntriesOnOrder) { - const values = decodeSummarisableStorageValue( - value, - this.payloadScheme, - ); - - // Decode the key. - const { subspace, timestamp, path } = decodeEntryKey( - key, - order, - this.subspaceScheme, - values.encodedPathLength, - ); - - yield { - entry: { - namespaceId: this.namespace, - subspaceId: subspace, - path, - payloadDigest: values.payloadHash, - payloadLength: values.payloadLength, - timestamp, - }, - authTokenHash: values.authTokenHash, - }; - } - - return; - } - - let lowerBound: Uint8Array | undefined; - let upperBound: Uint8Array | undefined; - - if (order === "path") { - lowerBound = concat(...areaOfInterest.area.pathPrefix); - - const maybeSuccessorPrefix = successorPrefix( - areaOfInterest.area.pathPrefix, - ); - - if (maybeSuccessorPrefix) { - upperBound = concat(...maybeSuccessorPrefix); - } - } else if ( - order === "subspace" && - areaOfInterest.area.includedSubspaceId !== ANY_SUBSPACE - ) { - lowerBound = this.subspaceScheme.encode( - areaOfInterest.area.includedSubspaceId, - ); - - const maybeSuccessorSubspace = this.subspaceScheme.successor( - areaOfInterest.area.includedSubspaceId, - ); - - if (maybeSuccessorSubspace) { - upperBound = 
this.subspaceScheme.encode(maybeSuccessorSubspace); - } - } else if (order === "timestamp") { - if (areaOfInterest.area.timeRange.start > BigInt(0)) { - lowerBound = bigintToBytes(areaOfInterest.area.timeRange.start); - } - - if (areaOfInterest.area.timeRange.end !== OPEN_END) { - upperBound = bigintToBytes(areaOfInterest.area.timeRange.start); - } - } - - let entriesYielded = 0; - let payloadBytesYielded = BigInt(0); - - const iterator = storage.entries(lowerBound, upperBound, { - reverse, - }); - - for await (const { key, value } of iterator) { - const values = decodeSummarisableStorageValue( - value, - this.payloadScheme, - ); - - // Decode the key. - const { subspace, timestamp, path } = decodeEntryKey( - key, - order, - this.subspaceScheme, - values.encodedPathLength, - ); - - if ( - (order === "path" || order === "timestamp") && - areaOfInterest.area.includedSubspaceId !== ANY_SUBSPACE - ) { - const isSubspace = this.subspaceScheme.order( - subspace, - areaOfInterest.area.includedSubspaceId, - ); - - if (!isSubspace) { - continue; - } - } - - if ((order === "path" || order === "subspace") && !includesAllTime) { - const isIncluded = isIncludedRange( - orderTimestamp, - areaOfInterest.area.timeRange, - timestamp, - ); - - if (!isIncluded) { - continue; - } - } - - if ( - (order === "subspace" || order === "timestamp") && !includesAllPaths - ) { - const isIncluded = isPathPrefixed(areaOfInterest.area.pathPrefix, path); - - if (!isIncluded) { - continue; - } - } - - entriesYielded += 1; - payloadBytesYielded += values.payloadLength; - - if ( - areaOfInterest.maxSize !== BigInt(0) && - payloadBytesYielded >= areaOfInterest.maxSize - ) { - break; - } - - yield { - entry: { - namespaceId: this.namespace, - subspaceId: subspace, - path, - payloadDigest: values.payloadHash, - payloadLength: values.payloadLength, - timestamp, - }, - authTokenHash: values.authTokenHash, - }; - - if ( - areaOfInterest.maxCount !== 0 && - entriesYielded >= areaOfInterest.maxCount - ) { - 
break; - } - } - } -} diff --git a/src/replica/util.ts b/src/replica/util.ts deleted file mode 100644 index 5d257f7..0000000 --- a/src/replica/util.ts +++ /dev/null @@ -1,364 +0,0 @@ -import { join } from "https://deno.land/std@0.188.0/path/mod.ts"; -import { EntryDriverKvStore } from "./storage/entry_drivers/kv_store.ts"; -import { PayloadDriverFilesystem } from "./storage/payload_drivers/filesystem.ts"; -import { PayloadScheme, ProtocolParameters } from "./types.ts"; -import { ensureDir } from "https://deno.land/std@0.188.0/fs/ensure_dir.ts"; -import { bigintToBytes, concat, EncodingScheme, Path } from "../../deps.ts"; -import { KvDriverDeno } from "./storage/kv/kv_driver_deno.ts"; - -/** Create a pair of entry and payload drivers for use with a {@link Replica} which will store their data at a given filesystem path. */ -export async function getPersistedDrivers< - NamespacePublicKey, - SubspacePublicKey, - PayloadDigest, - AuthorisationOpts, - AuthorisationToken, - Fingerprint, ->( - /** The filesystem path to store entry and payload data within. */ - path: string, - protocolParameters: ProtocolParameters< - NamespacePublicKey, - SubspacePublicKey, - PayloadDigest, - AuthorisationOpts, - AuthorisationToken, - Fingerprint - >, -) { - const kvPath = join(path, "entries"); - const payloadPath = join(path, "payloads"); - - await ensureDir(path); - - // TODO: Use the platform appropriate KV driver. 
- const kv = await Deno.openKv(kvPath); - - return { - entryDriver: new EntryDriverKvStore({ - ...protocolParameters, - kvDriver: new KvDriverDeno(kv), - }), - payloadDriver: new PayloadDriverFilesystem( - payloadPath, - protocolParameters.payloadScheme, - ), - }; -} - -// Keys - -export function encodePathWithSeparators(path: Path): Uint8Array { - const encodedComponents: Uint8Array[] = []; - - for (const component of path) { - const bytes: number[] = []; - - for (const byte of component) { - if (byte !== 0) { - bytes.push(byte); - continue; - } - - bytes.push(0, 1); - } - - bytes.push(0, 0); - const encodedComponent = new Uint8Array(bytes); - encodedComponents.push(encodedComponent); - } - - return concat(...encodedComponents); -} - -export function decodePathWithSeparators( - encoded: Uint8Array, -): Path { - const path: Path = []; - - let currentComponentBytes = []; - let previousWasZero = false; - - for (const byte of encoded) { - if (previousWasZero && byte === 0) { - // Separator - previousWasZero = false; - - const component = new Uint8Array(currentComponentBytes); - - path.push(component); - - currentComponentBytes = []; - - continue; - } - - if (previousWasZero && byte === 1) { - // Encoded zero. 
- currentComponentBytes.push(0); - previousWasZero = false; - continue; - } - - if (byte === 0) { - previousWasZero = true; - continue; - } - - currentComponentBytes.push(byte); - previousWasZero = false; - } - - return path; -} - -export function encodeEntryKeys( - opts: { - path: Path; - timestamp: bigint; - subspace: SubspacePublicKey; - subspaceEncoding: EncodingScheme; - }, -): { - spt: Uint8Array; - pts: Uint8Array; - tsp: Uint8Array; - encodedPathLength: number; -} { - const encodedSubspace = opts.subspaceEncoding.encode(opts.subspace); - - const encodedPath = encodePathWithSeparators(opts.path); - - const keyLength = 8 + encodedPath.byteLength + - encodedSubspace.byteLength; - - const sptBytes = new Uint8Array(keyLength); - const ptsBytes = new Uint8Array(keyLength); - const tspBytes = new Uint8Array(keyLength); - - // Subspace, path, timestamp - sptBytes.set(encodedSubspace, 0); - sptBytes.set( - encodedPath, - encodedSubspace.byteLength, - ); - const sptDv = new DataView(sptBytes.buffer); - sptDv.setBigUint64( - encodedSubspace.byteLength + encodedPath.byteLength, - opts.timestamp, - ); - - // Path, timestamp, subspace - ptsBytes.set(encodedPath, 0); - const ptsDv = new DataView(ptsBytes.buffer); - ptsDv.setBigUint64( - encodedPath.byteLength, - opts.timestamp, - ); - ptsBytes.set(encodedSubspace, encodedPath.byteLength + 8); - - // Timestamp, subspace, path - const tapDv = new DataView(tspBytes.buffer); - tapDv.setBigUint64( - 0, - opts.timestamp, - ); - tspBytes.set(encodedSubspace, 8); - tspBytes.set(encodedPath, 8 + encodedSubspace.byteLength); - - return { - spt: sptBytes, - pts: ptsBytes, - tsp: tspBytes, - encodedPathLength: encodedPath.byteLength, - }; -} - -export function decodeEntryKey( - encoded: Uint8Array, - order: "subspace" | "path" | "timestamp", - subspaceEncoding: EncodingScheme, - encodedPathLength: number, -): { - subspace: SubspacePublicKey; - path: Path; - timestamp: bigint; -} { - let subspace: SubspacePublicKey; - let timestamp: 
bigint; - let path: Path; - - switch (order) { - case "subspace": { - subspace = subspaceEncoding.decode(encoded); - - const encodedSubspaceLength = subspaceEncoding.encodedLength(subspace); - - const pathComponentPos = encodedSubspaceLength; - - path = decodePathWithSeparators( - encoded.subarray( - pathComponentPos, - pathComponentPos + encodedPathLength, - ), - ); - - const dataView = new DataView(encoded.buffer); - timestamp = dataView.getBigUint64(encoded.byteLength - 8); - - break; - } - case "path": { - path = decodePathWithSeparators( - encoded.subarray( - 0, - encodedPathLength, - ), - ); - - const dataView = new DataView(encoded.buffer); - - timestamp = dataView.getBigUint64( - encodedPathLength, - ); - - subspace = subspaceEncoding.decode(encoded.subarray( - encodedPathLength + 8, - )); - - break; - } - case "timestamp": { - const dataView = new DataView(encoded.buffer); - timestamp = dataView.getBigUint64( - 0, - ); - - subspace = subspaceEncoding.decode( - encoded.subarray(8), - ); - - const encodedSubspaceLength = subspaceEncoding.encodedLength(subspace); - - path = decodePathWithSeparators( - encoded.subarray( - encodedSubspaceLength, - encodedSubspaceLength + encodedPathLength, - ), - ); - } - } - - return { - subspace, - path, - timestamp, - }; -} - -export function encodeSummarisableStorageValue( - { - authTokenDigest, - payloadDigest, - payloadLength, - payloadScheme, - encodedPathLength, - }: { - authTokenDigest: PayloadDigest; - payloadDigest: PayloadDigest; - payloadLength: bigint; - payloadScheme: PayloadScheme; - encodedPathLength: number; - }, -): Uint8Array { - const pathLengthBytes = new Uint8Array(4); - const view = new DataView(pathLengthBytes.buffer); - view.setUint32(0, encodedPathLength); - - return concat( - pathLengthBytes, - bigintToBytes(payloadLength), - payloadScheme.encode(payloadDigest), - payloadScheme.encode(authTokenDigest), - ); -} - -export function decodeSummarisableStorageValue( - encoded: Uint8Array, - 
payloadEncoding: EncodingScheme, -): { - encodedPathLength: number; - payloadLength: bigint; - payloadHash: PayloadDigest; - authTokenHash: PayloadDigest; -} { - const dataView = new DataView(encoded.buffer); - - const encodedPathLength = dataView.getUint32(0); - - const payloadLength = dataView.getBigUint64(4); - - const payloadHash = payloadEncoding.decode( - encoded.subarray(4 + 8), - ); - - const payloadHashLength = payloadEncoding.encodedLength(payloadHash); - - const authTokenHash = payloadEncoding.decode( - encoded.subarray(4 + 8 + payloadHashLength), - ); - - return { - encodedPathLength, - payloadLength, - payloadHash, - authTokenHash, - }; -} - -// The successor of a path depends on the maximum length a path can have. -// Once a path reaches the maximum length, the bytestring is incremented to the left, -// e.g. [0, 0, 0, 255] -> [0, 0, 1, 255]. -export function makeSuccessorPath( - maxLength: number, -): (bytes: Uint8Array) => Uint8Array { - return (bytes: Uint8Array) => { - if (bytes.byteLength < maxLength) { - const newBytes = new Uint8Array(bytes.byteLength + 1); - - newBytes.set(bytes, 0); - newBytes.set([0], bytes.byteLength); - - return newBytes; - } else { - return incrementBytesLeft(bytes); - } - }; -} - -function incrementBytesLeft(bytes: Uint8Array): Uint8Array { - const newBytes = new Uint8Array(bytes.byteLength); - - const last = bytes[bytes.byteLength - 1]; - - if (last === 255 && bytes.byteLength > 1) { - newBytes.set([last + 1], bytes.byteLength - 1); - - const left = incrementBytesLeft(bytes.slice(0, bytes.byteLength - 1)); - - if (last === 255 && left[left.byteLength - 1] === 255) { - return bytes; - } - - newBytes.set(left, 0); - - return newBytes; - } else if (last === 255) { - return bytes; - } else { - newBytes.set([last + 1], bytes.byteLength - 1); - newBytes.set(bytes.slice(0, bytes.byteLength - 1), 0); - - return newBytes; - } -} diff --git a/src/replica/events.ts b/src/store/events.ts similarity index 100% rename from 
src/replica/events.ts rename to src/store/events.ts diff --git a/src/replica/storage/entry_drivers/kv_store.ts b/src/store/storage/entry_drivers/kv_store.ts similarity index 88% rename from src/replica/storage/entry_drivers/kv_store.ts rename to src/store/storage/entry_drivers/kv_store.ts index f8b2dc8..9f4b3d1 100644 --- a/src/replica/storage/entry_drivers/kv_store.ts +++ b/src/store/storage/entry_drivers/kv_store.ts @@ -22,19 +22,19 @@ import { Skiplist } from "../summarisable_storage/monoid_skiplist.ts"; import { EntryDriver } from "../types.ts"; type EntryDriverKvOpts< - NamespaceKey, - SubspaceKey, + NamespaceId, + SubspaceId, PayloadDigest, Fingerprint, > = { kvDriver: KvDriver; - namespaceScheme: NamespaceScheme; - subspaceScheme: SubspaceScheme; + namespaceScheme: NamespaceScheme; + subspaceScheme: SubspaceScheme; payloadScheme: PayloadScheme; pathScheme: PathScheme; fingerprintScheme: FingerprintScheme< - NamespaceKey, - SubspaceKey, + NamespaceId, + SubspaceId, PayloadDigest, Fingerprint >; @@ -42,24 +42,24 @@ type EntryDriverKvOpts< /** Store and retrieve entries in a key-value store. 
*/ export class EntryDriverKvStore< - NamespaceKey, - SubspaceKey, + NamespaceId, + SubspaceId, PayloadDigest, Fingerprint, > implements EntryDriver< - NamespaceKey, - SubspaceKey, + NamespaceId, + SubspaceId, PayloadDigest, Fingerprint > { - private namespaceScheme: NamespaceScheme; - private subspaceScheme: SubspaceScheme; + private namespaceScheme: NamespaceScheme; + private subspaceScheme: SubspaceScheme; private payloadScheme: PayloadScheme; private pathScheme: PathScheme; private fingerprintScheme: FingerprintScheme< - NamespaceKey, - SubspaceKey, + NamespaceId, + SubspaceId, PayloadDigest, Fingerprint >; @@ -69,8 +69,8 @@ export class EntryDriverKvStore< constructor( opts: EntryDriverKvOpts< - NamespaceKey, - SubspaceKey, + NamespaceId, + SubspaceId, PayloadDigest, Fingerprint >, @@ -89,8 +89,8 @@ export class EntryDriverKvStore< } makeStorage( - namespace: NamespaceKey, - ): Storage3d { + namespace: NamespaceId, + ): Storage3d { const prefixedStorageDriver = new PrefixedDriver( ["entries"], this.kvDriver, @@ -173,7 +173,7 @@ export class EntryDriverKvStore< return entry; }, flagInsertion: async ( - entry: Entry, + entry: Entry, authTokenHash: PayloadDigest, ) => { const entryEncoded = encodeEntry({ @@ -196,7 +196,7 @@ export class EntryDriverKvStore< ); }, - flagRemoval: (entry: Entry) => { + flagRemoval: (entry: Entry) => { const entryEncoded = encodeEntry({ namespaceScheme: this.namespaceScheme, subspaceScheme: this.subspaceScheme, diff --git a/src/replica/storage/entry_drivers/memory.ts b/src/store/storage/entry_drivers/memory.ts similarity index 78% rename from src/replica/storage/entry_drivers/memory.ts rename to src/store/storage/entry_drivers/memory.ts index 5fa9adc..9d63ac9 100644 --- a/src/replica/storage/entry_drivers/memory.ts +++ b/src/store/storage/entry_drivers/memory.ts @@ -12,17 +12,17 @@ import { Entry, orderBytes, PathScheme } from "../../../../deps.ts"; import { RadixTree } from "../prefix_iterators/radix_tree.ts"; type 
EntryDriverMemoryOpts< - NamespaceKey, - SubspaceKey, + NamespaceId, + SubspaceId, PayloadDigest, Fingerprint, > = { - subspaceScheme: SubspaceScheme; + subspaceScheme: SubspaceScheme; payloadScheme: PayloadScheme; pathScheme: PathScheme; fingerprintScheme: FingerprintScheme< - NamespaceKey, - SubspaceKey, + NamespaceId, + SubspaceId, PayloadDigest, Fingerprint >; @@ -30,31 +30,30 @@ type EntryDriverMemoryOpts< /** Store and retrieve entries in memory. */ export class EntryDriverMemory< - NamespaceKey, - SubspaceKey, + NamespaceId, + SubspaceId, PayloadDigest, Fingerprint, -> implements - EntryDriver { +> implements EntryDriver { constructor( readonly opts: EntryDriverMemoryOpts< - NamespaceKey, - SubspaceKey, + NamespaceId, + SubspaceId, PayloadDigest, Fingerprint >, ) {} private wafInsert: - | [Entry, PayloadDigest] + | [Entry, PayloadDigest] | undefined; private wafRemove: - | Entry + | Entry | undefined; makeStorage( - namespace: NamespaceKey, - ): Storage3d { + namespace: NamespaceId, + ): Storage3d { return new TripleStorage({ namespace, createSummarisableStorage: ( @@ -85,14 +84,14 @@ export class EntryDriverMemory< return Promise.resolve(this.wafRemove); }, flagInsertion: ( - entry: Entry, + entry: Entry, authTokenHash: PayloadDigest, ) => { this.wafInsert = [entry, authTokenHash]; return Promise.resolve(); }, - flagRemoval: (entry: Entry) => { + flagRemoval: (entry: Entry) => { this.wafRemove = entry; return Promise.resolve(); diff --git a/src/replica/storage/kv/kv_driver_deno.ts b/src/store/storage/kv/kv_driver_deno.ts similarity index 100% rename from src/replica/storage/kv/kv_driver_deno.ts rename to src/store/storage/kv/kv_driver_deno.ts diff --git a/src/replica/storage/kv/prefixed_driver.ts b/src/store/storage/kv/prefixed_driver.ts similarity index 100% rename from src/replica/storage/kv/prefixed_driver.ts rename to src/store/storage/kv/prefixed_driver.ts diff --git a/src/replica/storage/kv/types.ts b/src/store/storage/kv/types.ts similarity index 
100% rename from src/replica/storage/kv/types.ts rename to src/store/storage/kv/types.ts diff --git a/src/replica/storage/payload_drivers/filesystem.ts b/src/store/storage/payload_drivers/filesystem.ts similarity index 100% rename from src/replica/storage/payload_drivers/filesystem.ts rename to src/store/storage/payload_drivers/filesystem.ts diff --git a/src/replica/storage/payload_drivers/memory.ts b/src/store/storage/payload_drivers/memory.ts similarity index 100% rename from src/replica/storage/payload_drivers/memory.ts rename to src/store/storage/payload_drivers/memory.ts diff --git a/src/replica/storage/prefix_iterators/prefix_iterator.test.ts b/src/store/storage/prefix_iterators/prefix_iterator.test.ts similarity index 100% rename from src/replica/storage/prefix_iterators/prefix_iterator.test.ts rename to src/store/storage/prefix_iterators/prefix_iterator.test.ts diff --git a/src/replica/storage/prefix_iterators/radix_tree.ts b/src/store/storage/prefix_iterators/radix_tree.ts similarity index 100% rename from src/replica/storage/prefix_iterators/radix_tree.ts rename to src/store/storage/prefix_iterators/radix_tree.ts diff --git a/src/replica/storage/prefix_iterators/simple_key_iterator.ts b/src/store/storage/prefix_iterators/simple_key_iterator.ts similarity index 100% rename from src/replica/storage/prefix_iterators/simple_key_iterator.ts rename to src/store/storage/prefix_iterators/simple_key_iterator.ts diff --git a/src/replica/storage/prefix_iterators/types.ts b/src/store/storage/prefix_iterators/types.ts similarity index 100% rename from src/replica/storage/prefix_iterators/types.ts rename to src/store/storage/prefix_iterators/types.ts diff --git a/src/replica/storage/storage_3d/storage_3d.test.ts b/src/store/storage/storage_3d/storage_3d.test.ts similarity index 80% rename from src/replica/storage/storage_3d/storage_3d.test.ts rename to src/store/storage/storage_3d/storage_3d.test.ts index 94fe823..b4ce161 100644 --- 
a/src/replica/storage/storage_3d/storage_3d.test.ts +++ b/src/store/storage/storage_3d/storage_3d.test.ts @@ -4,6 +4,8 @@ import { AreaOfInterest, bigintToBytes, concat, + encodeBase64, + encodeEntry, Entry, isIncludedRange, isPathPrefixed, @@ -26,17 +28,24 @@ import { testSchemePayload, testSchemeSubspace, } from "../../../test/test_schemes.ts"; -import { randomPath, randomTimestamp } from "../../../test/utils.ts"; -import { ProtocolParameters } from "../../types.ts"; +import { + getSubspaces, + randomPath, + randomTimestamp, +} from "../../../test/utils.ts"; +import { ProtocolParameters, QueryOrder } from "../../types.ts"; import { MonoidRbTree } from "../summarisable_storage/monoid_rbtree.ts"; -import { TripleStorage } from "./triple_storage.ts"; +import { encodePathWithSeparators, TripleStorage } from "./triple_storage.ts"; import { Storage3d } from "./types.ts"; import { assertEquals } from "https://deno.land/std@0.202.0/assert/assert_equals.ts"; -import { encodePathWithSeparators } from "../../util.ts"; +import { sample } from "https://deno.land/std@0.202.0/collections/mod.ts"; + +import { Store } from "../../store.ts"; +import { RadixTree } from "../prefix_iterators/radix_tree.ts"; export type Storage3dScenario< - NamespaceKey, - SubspaceKey, + NamespaceId, + SubspaceId, PayloadDigest, AuthorisationOpts, AuthorisationToken, @@ -44,10 +53,10 @@ export type Storage3dScenario< > = { name: string; makeScenario: ( - namespace: NamespaceKey, + namespace: NamespaceId, params: ProtocolParameters< - NamespaceKey, - SubspaceKey, + NamespaceId, + SubspaceId, PayloadDigest, AuthorisationOpts, AuthorisationToken, @@ -56,8 +65,8 @@ export type Storage3dScenario< ) => Promise< { storage: Storage3d< - NamespaceKey, - SubspaceKey, + NamespaceId, + SubspaceId, PayloadDigest, Fingerprint >; @@ -69,17 +78,17 @@ export type Storage3dScenario< const tripleStorageScenario = { name: "Triple storage", makeScenario: < - NamespaceKey, - SubspaceKey, + NamespaceId, + SubspaceId, 
PayloadDigest, AuthorisationOpts, AuthorisationToken, Fingerprint, >( - namespace: NamespaceKey, + namespace: NamespaceId, params: ProtocolParameters< - NamespaceKey, - SubspaceKey, + NamespaceId, + SubspaceId, PayloadDigest, AuthorisationOpts, AuthorisationToken, @@ -263,7 +272,7 @@ Deno.test("Storage3d.summarise", async () => { if (a > b) return 1; return 0; }, - minimalSubspaceKey: 0, + minimalSubspaceId: 0, successor(a: number) { return a + 1; }, @@ -537,7 +546,6 @@ Deno.test("Storage3d.summarise", async () => { } }); -/* Deno.test("Storage3d.query", async (test) => { for (const scenario of scenarios) { const namespaceKeypair = await makeNamespaceKeypair(); @@ -554,14 +562,13 @@ Deno.test("Storage3d.query", async (test) => { }, ); - - const replica = new Replica({ + const store = new Store({ namespace: namespaceKeypair.namespace, protocolParameters: { authorisationScheme: testSchemeAuthorisation, namespaceScheme: testSchemeNamespace, subspaceScheme: testSchemeSubspace, - pathLengthScheme: testSchemePathLength, + pathScheme: testSchemePath, payloadScheme: testSchemePayload, fingerprintScheme: testSchemeFingerprint, }, @@ -569,7 +576,7 @@ Deno.test("Storage3d.query", async (test) => { makeStorage: () => { return storage; }, - prefixIterator: new RadixishTree(), + prefixIterator: new RadixTree(), writeAheadFlag: { wasInserting: () => Promise.resolve(undefined), wasRemoving: () => Promise.resolve(undefined), @@ -581,11 +588,32 @@ Deno.test("Storage3d.query", async (test) => { }, }); - await test.step(scenario.name, async () => { // Generate the test queries - const areaParams: AreaOfInterest[] = []; + const queryParams: { + aoi: AreaOfInterest; + order: QueryOrder; + reverse: boolean; + }[] = []; + + const subspaces = await getSubspaces(10); + const bytes = []; + const paths = []; + + for (let i = 0; i < 50; i++) { + paths.push(randomPath()); + } + + for (let i = 0; i < 50; i++) { + bytes.push(crypto.getRandomValues(new Uint8Array(4))); + } + + const timestamps = 
[]; + + for (let i = 0; i < 25; i++) { + timestamps.push(randomTimestamp()); + } for (let i = 0; i < 100; i++) { const randomCount = () => { @@ -600,12 +628,6 @@ Deno.test("Storage3d.query", async (test) => { : BigInt(0); }; - const randomSubspaceId = () => { - return Math.random() > 0.5 - ? Math.floor(Math.random() * 255) - : ANY_SUBSPACE; - }; - const randomTimeRange = () => { const isOpen = Math.random() > 0.5; @@ -623,71 +645,91 @@ Deno.test("Storage3d.query", async (test) => { return { start, end }; }; - areaParams.push({ - area: { - includedSubspaceId: randomSubspaceId(), - pathPrefix: randomPath(), - timeRange: randomTimeRange(), + const orderRoll = Math.random(); + + queryParams.push({ + aoi: { + area: { + includedSubspaceId: sample(subspaces)!.subspace, + pathPrefix: randomPath(), + timeRange: randomTimeRange(), + }, + maxCount: randomCount(), + maxSize: randomSize(), }, - maxCount: randomCount(), - maxSize: randomSize(), + reverse: Math.random() < 0.25 ? true : false, + order: orderRoll < 0.33 + ? "subspace" + : orderRoll < 0.66 + ? 
"path" + : "timestamp", }); } // A function which returns all the areas a given spt is included by const isIncludedByAreas = ( - subspace: number, + subspace: Uint8Array, path: Path, time: bigint, - ): AreaOfInterest[] => { - const inclusiveAreas: AreaOfInterest[] = []; - - for (const aoi of areaParams) { + ): { + aoi: AreaOfInterest; + order: QueryOrder; + reverse: boolean; + }[] => { + const inclusiveParams: { + aoi: AreaOfInterest; + order: QueryOrder; + reverse: boolean; + }[] = []; + + for (const params of queryParams) { if ( - aoi.area.includedSubspaceId !== ANY_SUBSPACE && - aoi.area.includedSubspaceId !== subspace + params.aoi.area.includedSubspaceId !== ANY_SUBSPACE && + orderBytes(params.aoi.area.includedSubspaceId, subspace) !== 0 ) { continue; } if ( - isPathPrefixed(aoi.area.pathPrefix, path) === false + isPathPrefixed(params.aoi.area.pathPrefix, path) === false ) { continue; } if ( - isIncludedRange(orderTimestamp, aoi.area.timeRange, time) === false + isIncludedRange(orderTimestamp, params.aoi.area.timeRange, time) === + false ) { continue; } - inclusiveAreas.push(aoi); + inclusiveParams.push(params); } - return inclusiveAreas; + return inclusiveParams; }; - const actualResultMap = new Map< - AreaOfInterest, - Set - >(); + const actualResultMap = new Map<{ + aoi: AreaOfInterest; + order: QueryOrder; + reverse: boolean; + }, Set>(); - for (const areaOfInterest of areaParams) { - actualResultMap.set(areaOfInterest, new Set()); + for (const params of queryParams) { + actualResultMap.set(params, new Set()); } - replica.addEventListener("entryremove", (event) => { + store.addEventListener("entryremove", (event) => { const { detail: { removed } } = event as CustomEvent< { removed: Entry } >; - const encodedEntry = encodeEntry(removed, { + const encodedEntry = encodeEntry({ namespaceScheme: testSchemeNamespace, subspaceScheme: testSchemeSubspace, - pathLengthScheme: testSchemePathLength, + pathScheme: testSchemePath, payloadScheme: testSchemePayload, - }); 
+ }, removed); testSchemePayload.fromBytes(encodedEntry).then((hash) => { const b64 = encodeBase64(hash); @@ -702,13 +744,15 @@ Deno.test("Storage3d.query", async (test) => { // Generate the entries for (let i = 0; i < 50; i++) { - const pathAndPayload = sample(bytes)!; + const path = sample(paths)!; + const payload = sample(bytes)!; + const chosenSubspace = sample(subspaces)!; const timestamp = sample(timestamps)!; - const result = await replica.set({ - path: pathAndPayload, - payload: pathAndPayload, + const result = await store.set({ + path: path, + payload: payload, subspace: chosenSubspace.subspace, timestamp, }, chosenSubspace.privateKey); @@ -718,18 +762,18 @@ Deno.test("Storage3d.query", async (test) => { } // See if it belongs to any of the test queries. - const correspondingQueries = includedByQueries( + const correspondingQueries = isIncludedByAreas( chosenSubspace.subspace, - pathAndPayload, + path, timestamp, ); - const encodedEntry = encodeEntry(result.entry, { + const encodedEntry = encodeEntry({ namespaceScheme: testSchemeNamespace, subspaceScheme: testSchemeSubspace, - pathLengthScheme: testSchemePathLength, + pathScheme: testSchemePath, payloadScheme: testSchemePayload, - }); + }, result.entry); const entryHash = await testSchemePayload.fromBytes(encodedEntry); @@ -739,6 +783,7 @@ Deno.test("Storage3d.query", async (test) => { const authTokenHash = await testSchemePayload.fromBytes( new Uint8Array(result.authToken), ); + const b64AuthHash = encodeBase64(authTokenHash); entryAuthHashMap.set(b64EntryHash, b64AuthHash); @@ -750,10 +795,10 @@ Deno.test("Storage3d.query", async (test) => { } } - for (const query of queries) { + for (const params of queryParams) { let entriesRead = 0; - const awaiting = new Set(actualResultMap.get(query)); + const awaiting = new Set(actualResultMap.get(params)); const prevIsCorrectOrder = ( prev: Entry, @@ -762,48 +807,48 @@ Deno.test("Storage3d.query", async (test) => { ): boolean => { switch (ord) { case "path": { - 
const order = compareBytes( - prev.identifier.path, - curr.identifier.path, + const order = orderPath( + prev.path, + curr.path, ); if (order === 0) { return prevIsCorrectOrder(prev, curr, "timestamp"); } - if (query.reverse) { + if (params.reverse) { return order === 1; } return order === -1; } case "timestamp": { - const order = Products.orderTimestamps( - prev.record.timestamp, - curr.record.timestamp, + const order = orderTimestamp( + prev.timestamp, + curr.timestamp, ); if (order === 0) { return prevIsCorrectOrder(prev, curr, "subspace"); } - if (query.reverse) { + if (params.reverse) { return order === 1; } return order === -1; } case "subspace": { - const order = compareBytes( - prev.identifier.subspace, - curr.identifier.subspace, + const order = orderBytes( + prev.subspaceId, + curr.subspaceId, ); if (order === 0) { return prevIsCorrectOrder(prev, curr, "path"); } - if (query.reverse) { + if (params.reverse) { return order === 1; } @@ -815,14 +860,18 @@ Deno.test("Storage3d.query", async (test) => { let prevEntry: Entry | undefined; for await ( - const { entry, authTokenHash } of storage.entriesByQuery(query) + const { entry, authTokenHash } of storage.query( + params.aoi, + params.order, + params.reverse, + ) ) { - const encodedEntry = encodeEntry(entry, { + const encodedEntry = encodeEntry({ namespaceScheme: testSchemeNamespace, subspaceScheme: testSchemeSubspace, - pathLengthScheme: testSchemePathLength, + pathScheme: testSchemePath, payloadScheme: testSchemePayload, - }); + }, entry); const entryHash = await testSchemePayload.fromBytes(encodedEntry); @@ -835,27 +884,30 @@ Deno.test("Storage3d.query", async (test) => { // Test order if (prevEntry) { - assert(prevIsCorrectOrder(prevEntry, entry, query.order)); + assert(prevIsCorrectOrder(prevEntry, entry, params.order)); } - assert(actualResultMap.get(query)?.has(b64EntryHash)); + assert(actualResultMap.get(params)?.has(b64EntryHash)); entriesRead += 1; prevEntry = entry; awaiting.delete(b64EntryHash); - 
if (query.limit && entriesRead > query.limit) { + if (params.aoi.maxCount && entriesRead > params.aoi.maxCount) { assert(false, "Too many entries received for query"); } } - if (query.limit) { - assertEquals( - entriesRead, - Math.min(query.limit, actualResultMap.get(query)!.size), - ); - } else { - assertEquals(entriesRead, actualResultMap.get(query)!.size); + if (params.aoi.maxCount !== 0) { + assert(entriesRead <= params.aoi.maxCount); + } + + if (params.aoi.maxSize !== BigInt(0)) { + assert(entriesRead * 4 <= params.aoi.maxSize); + } + + if (params.aoi.maxCount === 0 && params.aoi.maxSize === BigInt(0)) { + assertEquals(entriesRead, actualResultMap.get(params)!.size); } } }); @@ -863,11 +915,3 @@ Deno.test("Storage3d.query", async (test) => { await dispose(); } }); - -*/ - -function orderNumbers(a: number, b: number) { - if (a < b) return -1; - if (a > b) return 1; - return 0; -} diff --git a/src/store/storage/storage_3d/triple_storage.ts b/src/store/storage/storage_3d/triple_storage.ts new file mode 100644 index 0000000..7a69601 --- /dev/null +++ b/src/store/storage/storage_3d/triple_storage.ts @@ -0,0 +1,995 @@ +import { + ANY_SUBSPACE, + Area, + AreaOfInterest, + bigintToBytes, + concat, + EncodingScheme, + Entry, + isIncludedRange, + isPathPrefixed, + OPEN_END, + orderTimestamp, + Path, + PathScheme, + successorPrefix, +} from "../../../../deps.ts"; +import { + FingerprintScheme, + PayloadScheme, + QueryOrder, + SubspaceScheme, +} from "../../types.ts"; +import { LiftingMonoid } from "../summarisable_storage/lifting_monoid.ts"; +import { SummarisableStorage } from "../summarisable_storage/types.ts"; +import { Storage3d } from "./types.ts"; + +export type TripleStorageOpts< + NamespaceId, + SubspaceId, + PayloadDigest, + Fingerprint, +> = { + namespace: NamespaceId; + /** Creates a {@link SummarisableStorage} with a given ID, used for storing entries and their data. 
*/ + createSummarisableStorage: ( + monoid: LiftingMonoid, + id: string, + ) => SummarisableStorage; + subspaceScheme: SubspaceScheme; + payloadScheme: PayloadScheme; + pathScheme: PathScheme; + fingerprintScheme: FingerprintScheme< + NamespaceId, + SubspaceId, + PayloadDigest, + Fingerprint + >; +}; + +export class TripleStorage< + NamespaceId, + SubspaceId, + PayloadDigest, + Fingerprint, +> implements + Storage3d< + NamespaceId, + SubspaceId, + PayloadDigest, + Fingerprint + > { + private namespace: NamespaceId; + + private ptsStorage: SummarisableStorage; + private sptStorage: SummarisableStorage; + private tspStorage: SummarisableStorage; + private subspaceScheme: SubspaceScheme; + private payloadScheme: PayloadScheme; + private fingerprintScheme: FingerprintScheme< + NamespaceId, + SubspaceId, + PayloadDigest, + Fingerprint + >; + + constructor( + opts: TripleStorageOpts< + NamespaceId, + SubspaceId, + PayloadDigest, + Fingerprint + >, + ) { + this.namespace = opts.namespace; + + const lift = ( + key: Uint8Array, + value: Uint8Array, + order: "path" | "subspace" | "timestamp", + ) => { + const values = decodeKvValue( + value, + this.payloadScheme, + ); + + // Decode the key. 
+ const { subspace, timestamp, path } = decodeEntryKey( + key, + order, + this.subspaceScheme, + ); + + const entry: Entry = { + namespaceId: this.namespace, + subspaceId: subspace, + path, + timestamp, + payloadDigest: values.payloadHash, + payloadLength: values.payloadLength, + }; + + return opts.fingerprintScheme.fingerprintSingleton(entry); + }; + + this.ptsStorage = opts.createSummarisableStorage({ + lift: (key, value) => lift(key, value, "path"), + combine: opts.fingerprintScheme.fingerprintCombine, + neutral: opts.fingerprintScheme.neutral, + }, "pts"); + this.sptStorage = opts.createSummarisableStorage({ + lift: (key, value) => lift(key, value, "subspace"), + combine: opts.fingerprintScheme.fingerprintCombine, + neutral: opts.fingerprintScheme.neutral, + }, "spt"); + this.tspStorage = opts.createSummarisableStorage({ + lift: (key, value) => lift(key, value, "timestamp"), + combine: opts.fingerprintScheme.fingerprintCombine, + neutral: opts.fingerprintScheme.neutral, + }, "tsp"); + + this.subspaceScheme = opts.subspaceScheme; + this.payloadScheme = opts.payloadScheme; + + this.fingerprintScheme = opts.fingerprintScheme; + } + + async get( + subspace: SubspaceId, + path: Path, + ): Promise< + { + entry: Entry; + authTokenHash: PayloadDigest; + } | undefined + > { + const firstResult = this.query({ + area: { + includedSubspaceId: subspace, + pathPrefix: path, + timeRange: { + start: BigInt(0), + end: OPEN_END, + }, + }, + maxCount: 1, + maxSize: BigInt(0), + }, "subspace"); + + for await (const result of firstResult) { + return result; + } + } + + async insert( + { path, subspace, payloadDigest, timestamp, length, authTokenDigest }: { + path: Path; + subspace: SubspaceId; + payloadDigest: PayloadDigest; + timestamp: bigint; + length: bigint; + authTokenDigest: PayloadDigest; + }, + ): Promise { + const keys = encodeEntryKeys( + { + path, + timestamp, + subspace, + subspaceEncoding: this.subspaceScheme, + }, + ); + + const toStore = encodeKvValue( + { + 
payloadDigest, + payloadLength: length, + authTokenDigest: authTokenDigest, + payloadScheme: this.payloadScheme, + }, + ); + + await Promise.all([ + this.ptsStorage.insert(keys.pts, toStore), + this.sptStorage.insert(keys.spt, toStore), + this.tspStorage.insert(keys.tsp, toStore), + ]); + } + + async remove( + entry: Entry, + ): Promise { + const keys = encodeEntryKeys( + { + path: entry.path, + timestamp: entry.timestamp, + subspace: entry.subspaceId, + subspaceEncoding: this.subspaceScheme, + }, + ); + + const results = await Promise.all([ + this.ptsStorage.remove(keys.pts), + this.tspStorage.remove(keys.tsp), + this.sptStorage.remove(keys.spt), + ]); + + return results[0]; + } + + async summarise( + areaOfInterest: AreaOfInterest, + ): Promise<{ fingerprint: Fingerprint; size: number }> { + let fingerprint = this.fingerprintScheme.neutral; + /** The size of the fingerprint. */ + let size = 0; + + let countUsed = 0; + let sizeUsed = BigInt(0); + + // Iterate through all the entries of each range. + const subspaceEntriesLowerBound = + areaOfInterest.area.includedSubspaceId === ANY_SUBSPACE + ? undefined + : areaOfInterest.area.includedSubspaceId; + const subspaceEntriesUpperBound = + areaOfInterest.area.includedSubspaceId === ANY_SUBSPACE + ? undefined + : this.subspaceScheme.successor(areaOfInterest.area.includedSubspaceId); + + const subspaceEntries = this.sptStorage.entries( + subspaceEntriesLowerBound + ? this.subspaceScheme.encode(subspaceEntriesLowerBound) + : undefined, + subspaceEntriesUpperBound + ? this.subspaceScheme.encode(subspaceEntriesUpperBound) + : undefined, + { + reverse: true, + }, + ); + + /** The least excluded item we've run into. + * This is going to be the upper bound of a summarise op we run when we detect a contiguous range of included entries. + */ + let leastExcluded = this.subspaceScheme.encode( + subspaceEntriesUpperBound + ? 
subspaceEntriesUpperBound + : this.subspaceScheme.minimalSubspaceId, + ); + + /** The least included item we've run into. + * This is going to be the lower bound of a summarise op we run when we detect a contiguous range of included entries. + */ + let leastIncluded: Uint8Array | undefined; + + /** Run this when we detect a contiguous range of included entries. */ + const updateFingerprint = async (start: Uint8Array) => { + const { fingerprint: includedFp, size: includedSize } = await this + .sptStorage.summarise( + start, + leastExcluded, + ); + + fingerprint = this.fingerprintScheme.fingerprintCombine( + fingerprint, + includedFp, + ); + + size += includedSize; + + // Prevent this from running again until we run into another included entry. + leastIncluded = undefined; + }; + + for await (const subspaceEntry of subspaceEntries) { + // Decode the key. + const values = decodeKvValue( + subspaceEntry.value, + this.payloadScheme, + ); + + // Decode the key. + const { timestamp, path } = decodeEntryKey( + subspaceEntry.key, + "subspace", + this.subspaceScheme, + ); + + // Check that decoded time and subspace are included by both other dimensions + let pathIncluded = false; + + if (isPathPrefixed(areaOfInterest.area.pathPrefix, path)) { + pathIncluded = true; + } + + // If it's not included, and we ran into an included item earlier, + // that indicates the end of a contiguous range. + // Recalculate the fingerprint! + if (!pathIncluded) { + if (leastIncluded) { + await updateFingerprint(leastIncluded); + } + + // This entry is now the least excluded entry we've run into. + leastExcluded = subspaceEntry.key; + continue; + } + + let timeIncluded = false; + + if ( + isIncludedRange( + orderTimestamp, + areaOfInterest.area.timeRange, + timestamp, + ) + ) { + timeIncluded = true; + } + + // If it's not included, and we ran into an included item earlier, + // that indicates the end of a contiguous range. + // Recalculate the fingerprint! 
+ if (!timeIncluded) { + if (leastIncluded) { + await updateFingerprint(leastIncluded); + } + + // This entry is now the least excluded entry we've run into. + leastExcluded = subspaceEntry.key; + continue; + } + + // Now we know this entry is included. + + // Check all dimension count and size limits. + // If any limits have been exceeded, we have to stop here. + + // Boring. + + const nextCountUsed = countUsed + 1; + const nextSizeUsed = sizeUsed + values.payloadLength; + + if ( + (areaOfInterest.maxCount !== 0 && + nextCountUsed > areaOfInterest.maxCount) || + (areaOfInterest.maxSize !== BigInt(0) && + nextSizeUsed > areaOfInterest.maxSize) + ) { + break; + } + + countUsed = nextCountUsed; + sizeUsed = nextSizeUsed; + + // This entry is part of a contiguous range of included entries, + // and it's the least included key we've encountered so far. + leastIncluded = subspaceEntry.key; + } + + // Calculate a range that was left over, if any. + if (leastIncluded) { + await updateFingerprint(leastIncluded); + } + + return { + fingerprint, + size, + }; + } + + async *query( + areaOfInterest: AreaOfInterest, + order: QueryOrder, + reverse = false, + ): AsyncIterable<{ + entry: Entry; + authTokenHash: PayloadDigest; + }> { + const storage = order === "subspace" + ? this.sptStorage + : order === "path" + ? this.ptsStorage + : this.tspStorage; + + const { lowerBound, upperBound } = order === "subspace" + ? this.createSptBounds(areaOfInterest.area) + : order === "path" + ? this.createPtsBounds(areaOfInterest.area) + : this.createTspBounds(areaOfInterest.area); + + let entriesYielded = 0; + let payloadBytesYielded = BigInt(0); + + const iterator = storage.entries(lowerBound, upperBound, { + reverse, + }); + + for await (const { key, value } of iterator) { + const values = decodeKvValue( + value, + this.payloadScheme, + ); + + // Decode the key. 
+ const { subspace, timestamp, path } = decodeEntryKey( + key, + order, + this.subspaceScheme, + ); + + if ( + areaOfInterest.area.includedSubspaceId !== ANY_SUBSPACE + ) { + const isSubspace = this.subspaceScheme.order( + subspace, + areaOfInterest.area.includedSubspaceId, + ) === 0; + + if (!isSubspace) { + continue; + } + } + + const isOpenTime = areaOfInterest.area.timeRange.start === BigInt(0) && + areaOfInterest.area.timeRange.end === OPEN_END; + + if ( + !isOpenTime + ) { + const isTimeIncluded = isIncludedRange( + orderTimestamp, + areaOfInterest.area.timeRange, + timestamp, + ); + + if (!isTimeIncluded) { + continue; + } + } + + if ( + areaOfInterest.area.pathPrefix.length !== 0 + ) { + const isPathIncluded = isPathPrefixed( + areaOfInterest.area.pathPrefix, + path, + ); + + if (!isPathIncluded) { + continue; + } + } + + entriesYielded += 1; + payloadBytesYielded += values.payloadLength; + + if ( + areaOfInterest.maxSize !== BigInt(0) && + payloadBytesYielded >= areaOfInterest.maxSize + ) { + break; + } + + yield { + entry: { + namespaceId: this.namespace, + subspaceId: subspace, + path, + payloadDigest: values.payloadHash, + payloadLength: values.payloadLength, + timestamp, + }, + authTokenHash: values.authTokenHash, + }; + + if ( + areaOfInterest.maxCount !== 0 && + entriesYielded >= areaOfInterest.maxCount + ) { + break; + } + } + } + + private encodeKey( + doNotEscapeIdx: number, + ...parts: Uint8Array[] + ): Uint8Array { + const toConcat = []; + + for (let i = 0; i < parts.length; i++) { + if (i === doNotEscapeIdx) { + toConcat.push(parts[i], new Uint8Array([0, 0])); + continue; + } + + const escapedBytes = escapeBytes(parts[i]); + + toConcat.push(escapedBytes, new Uint8Array([0, 0])); + } + + return concat(...toConcat); + } + + private createSptBounds( + area: Area, + ): { + lowerBound: Uint8Array | undefined; + upperBound: Uint8Array | undefined; + } { + if (area.includedSubspaceId === ANY_SUBSPACE) { + return { lowerBound: undefined, upperBound: 
undefined }; + } + + const encodedSubspace = this.subspaceScheme.encode(area.includedSubspaceId); + + const successorSubspace = this.subspaceScheme.successor( + area.includedSubspaceId, + ); + + if (!successorSubspace) { + return { + lowerBound: this.encodeKey(1, encodedSubspace), + upperBound: undefined, + }; + } + + const encodeddSubspaceSuccessor = this.subspaceScheme.encode( + successorSubspace, + ); + + if (area.pathPrefix.length === 0) { + return { + lowerBound: this.encodeKey(1, encodedSubspace), + upperBound: this.encodeKey(1, encodeddSubspaceSuccessor), + }; + } + + const encodedAreaPrefix = encodePathWithSeparators(area.pathPrefix); + + const areaPrefixSuccessor = successorPrefix(area.pathPrefix); + + if (!areaPrefixSuccessor) { + return { + lowerBound: this.encodeKey(1, encodedSubspace, encodedAreaPrefix), + upperBound: this.encodeKey(1, encodeddSubspaceSuccessor), + }; + } + + const encodedTime = bigintToBytes(area.timeRange.start); + + const encodedAreaPrefixSuccessor = encodePathWithSeparators( + areaPrefixSuccessor, + ); + + if (area.timeRange.end === OPEN_END) { + return { + lowerBound: this.encodeKey( + 1, + encodedSubspace, + encodedAreaPrefix, + encodedTime, + ), + upperBound: this.encodeKey( + 1, + encodedSubspace, + encodedAreaPrefixSuccessor, + ), + }; + } + + const encodedTimeEnd = bigintToBytes(area.timeRange.end); + + return { + lowerBound: this.encodeKey( + 1, + encodedSubspace, + encodedAreaPrefix, + encodedTime, + ), + upperBound: this.encodeKey( + 1, + encodeddSubspaceSuccessor, + encodedAreaPrefixSuccessor, + encodedTimeEnd, + ), + }; + } + + private createPtsBounds( + area: Area, + ): { + lowerBound: Uint8Array | undefined; + upperBound: Uint8Array | undefined; + } { + if (area.pathPrefix.length === 0) { + return { + lowerBound: undefined, + upperBound: undefined, + }; + } + + const encodedPrefix = encodePathWithSeparators(area.pathPrefix); + + const areaPrefixSuccessor = successorPrefix(area.pathPrefix); + const encodedTimeStart = 
bigintToBytes(area.timeRange.start); + + if (!areaPrefixSuccessor) { + return { + lowerBound: this.encodeKey(0, encodedPrefix, encodedTimeStart), + upperBound: undefined, + }; + } + + const encodedPrefixSuccessor = encodePathWithSeparators( + areaPrefixSuccessor, + ); + + if (area.timeRange.end === OPEN_END) { + return { + lowerBound: this.encodeKey(0, encodedPrefix, encodedTimeStart), + upperBound: encodedPrefixSuccessor, + }; + } + + if (area.includedSubspaceId === ANY_SUBSPACE) { + return { + lowerBound: this.encodeKey(0, encodedPrefix, encodedTimeStart), + upperBound: encodedPrefixSuccessor, + }; + } + + const encodedSubspace = this.subspaceScheme.encode(area.includedSubspaceId); + + return { + lowerBound: this.encodeKey( + 0, + encodedPrefix, + encodedTimeStart, + encodedSubspace, + ), + upperBound: this.encodeKey(0, encodedPrefixSuccessor), + }; + } + + private createTspBounds( + area: Area, + ): { + lowerBound: Uint8Array | undefined; + upperBound: Uint8Array | undefined; + } { + if (area.timeRange.start === BigInt(0)) { + return { + lowerBound: undefined, + upperBound: undefined, + }; + } + + const encodedTimeStart = bigintToBytes(area.timeRange.start); + + if (area.timeRange.end === OPEN_END) { + return { + lowerBound: this.encodeKey(2, encodedTimeStart), + upperBound: undefined, + }; + } + + const encodedTimeEnd = bigintToBytes(area.timeRange.end); + + if (area.includedSubspaceId === ANY_SUBSPACE) { + return { + lowerBound: this.encodeKey(2, encodedTimeStart), + upperBound: this.encodeKey(2, encodedTimeEnd), + }; + } + + const encodedSubspace = this.subspaceScheme.encode(area.includedSubspaceId); + + const encodedPathPrefix = encodePathWithSeparators(area.pathPrefix); + + return { + lowerBound: this.encodeKey( + 2, + encodedTimeStart, + encodedSubspace, + encodedPathPrefix, + ), + upperBound: this.encodeKey(2, encodedTimeEnd), + }; + } +} + +/** Escape all 0 bytes to 0x02, and encode separators as 0x01. 
+ * + * This is all so the keys triple storage uses (which are concatenations of subspace / path / time [in differing orders] are ordered in KV the same way as we would outside the KV. + */ +export function encodePathWithSeparators(path: Path): Uint8Array { + const encodedComponents: Uint8Array[] = []; + + for (const component of path) { + const bytes: number[] = []; + + for (const byte of component) { + if (byte !== 0) { + bytes.push(byte); + continue; + } + + bytes.push(0, 2); + } + + bytes.push(0, 1); + const encodedComponent = new Uint8Array(bytes); + encodedComponents.push(encodedComponent); + } + + return concat(...encodedComponents); +} + +/** Decodes an escaped encoded path. */ +export function decodePathWithSeparators( + encoded: Uint8Array, +): Path { + const path: Path = []; + + let currentComponentBytes = []; + let previousWasZero = false; + + for (const byte of encoded) { + if (previousWasZero && byte === 1) { + // Separator + previousWasZero = false; + + const component = new Uint8Array(currentComponentBytes); + + path.push(component); + + currentComponentBytes = []; + + continue; + } + + if (previousWasZero && byte === 2) { + // Encoded zero. + currentComponentBytes.push(0); + previousWasZero = false; + continue; + } + + if (byte === 0) { + previousWasZero = true; + continue; + } + + currentComponentBytes.push(byte); + previousWasZero = false; + } + + return path; +} + +/** Escape all 0 bytes as 0x01. */ +export function escapeBytes(bytes: Uint8Array): Uint8Array { + const newBytes: number[] = []; + + for (const byte of bytes) { + if (byte !== 0) { + newBytes.push(byte); + continue; + } + + newBytes.push(0, 1); + } + + return new Uint8Array(newBytes); +} + +/** Unescape all 0x01 back to 0x0. 
*/ +export function unescapeBytes(escaped: Uint8Array): Uint8Array { + let previousWasZero = false; + + const escapedBytes = []; + + for (const byte of escaped) { + if (previousWasZero && byte === 1) { + escapedBytes.push(0); + previousWasZero = false; + continue; + } + + if (byte === 0) { + previousWasZero = true; + continue; + } + + previousWasZero = false; + escapedBytes.push(byte); + } + + return new Uint8Array(escapedBytes); +} + +/** Join all the parts of a key (which are presumed to be escaped and contain no 0x0 bytes) with a 0x00 separator between them. */ +export function joinKey( + ...parts: Uint8Array[] +): Uint8Array { + const newParts = []; + + for (const part of parts) { + newParts.push(part, new Uint8Array([0, 0])); + } + + return concat(...newParts); +} + +/** Split all the semantic parts of a key (subspace, path, time) out of a key. Returns them in the same order as they are in the key. */ +export function splitKey( + key: Uint8Array, +): [Uint8Array, Uint8Array, Uint8Array] { + const parts = []; + + let previousWasZero = false; + + let currentPartBytes: number[] = []; + + for (const byte of key) { + if (previousWasZero && byte === 0) { + parts.push(new Uint8Array(currentPartBytes)); + currentPartBytes = []; + previousWasZero = false; + continue; + } + + if (!previousWasZero && byte === 0) { + previousWasZero = true; + continue; + } + + if (previousWasZero && byte !== 0) { + currentPartBytes.push(0); + } + + previousWasZero = false; + + currentPartBytes.push(byte); + } + + return [parts[0], parts[1], parts[2]]; +} + +/** Encodes the subspace, path, and time of an entry into three keys for three respective orderings. 
+ */ +export function encodeEntryKeys( + opts: { + path: Path; + timestamp: bigint; + subspace: SubspacePublicKey; + subspaceEncoding: EncodingScheme; + }, +): { + spt: Uint8Array; + pts: Uint8Array; + tsp: Uint8Array; +} { + const escapedSubspace = escapeBytes( + opts.subspaceEncoding.encode(opts.subspace), + ); + const escapedPath = encodePathWithSeparators(opts.path); + const escapedTime = escapeBytes(bigintToBytes(opts.timestamp)); + + const sptBytes = joinKey(escapedSubspace, escapedPath, escapedTime); + const ptsBytes = joinKey(escapedPath, escapedTime, escapedSubspace); + const tspBytes = joinKey(escapedTime, escapedSubspace, escapedPath); + + return { + spt: sptBytes, + pts: ptsBytes, + tsp: tspBytes, + }; +} + +/** Decodes a key back into subspace, path, and timestamp. */ +export function decodeEntryKey( + encoded: Uint8Array, + order: "subspace" | "path" | "timestamp", + subspaceEncoding: EncodingScheme, +): { + subspace: SubspacePublicKey; + path: Path; + timestamp: bigint; +} { + let subspace: SubspacePublicKey; + let timestamp: bigint; + let path: Path; + + const [fst, snd, thd] = splitKey(encoded); + + switch (order) { + case "subspace": { + subspace = subspaceEncoding.decode(unescapeBytes(fst)); + + path = decodePathWithSeparators(snd); + + const dataView = new DataView(unescapeBytes(thd).buffer); + timestamp = dataView.getBigUint64(0); + + break; + } + case "path": { + path = decodePathWithSeparators(fst); + + const dataView = new DataView(unescapeBytes(snd).buffer); + timestamp = dataView.getBigUint64(0); + + subspace = subspaceEncoding.decode(unescapeBytes(thd)); + + break; + } + case "timestamp": { + const dataView = new DataView(unescapeBytes(fst).buffer); + timestamp = dataView.getBigUint64(0); + + subspace = subspaceEncoding.decode(unescapeBytes(snd)); + + path = decodePathWithSeparators(thd); + } + } + + return { + subspace, + path, + timestamp, + }; +} + +/** Encodes some values associated with an entry to a single value to be stored in KV. 
*/ +export function encodeKvValue( + { + authTokenDigest, + payloadDigest, + payloadLength, + payloadScheme, + }: { + authTokenDigest: PayloadDigest; + payloadDigest: PayloadDigest; + payloadLength: bigint; + payloadScheme: PayloadScheme; + }, +): Uint8Array { + return concat( + bigintToBytes(payloadLength), + payloadScheme.encode(payloadDigest), + payloadScheme.encode(authTokenDigest), + ); +} + +export function decodeKvValue( + encoded: Uint8Array, + payloadEncoding: EncodingScheme, +): { + payloadLength: bigint; + payloadHash: PayloadDigest; + authTokenHash: PayloadDigest; +} { + const dataView = new DataView(encoded.buffer); + + const payloadLength = dataView.getBigUint64(0); + + const payloadHash = payloadEncoding.decode( + encoded.subarray(8), + ); + + const payloadHashLength = payloadEncoding.encodedLength(payloadHash); + + const authTokenHash = payloadEncoding.decode( + encoded.subarray(8 + payloadHashLength), + ); + + return { + payloadLength, + payloadHash, + authTokenHash, + }; +} diff --git a/src/replica/storage/storage_3d/types.ts b/src/store/storage/storage_3d/types.ts similarity index 69% rename from src/replica/storage/storage_3d/types.ts rename to src/store/storage/storage_3d/types.ts index 1de4541..db037b7 100644 --- a/src/replica/storage/storage_3d/types.ts +++ b/src/store/storage/storage_3d/types.ts @@ -2,25 +2,25 @@ import { AreaOfInterest, Entry, Path } from "../../../../deps.ts"; import { QueryOrder } from "../../types.ts"; export interface Storage3d< - NamespaceKey, - SubspaceKey, + NamespaceId, + SubspaceId, PayloadDigest, Fingerprint, > { /** Retrieve a value */ get( - subspace: SubspaceKey, + subspace: SubspaceId, path: Path, ): Promise< { - entry: Entry; + entry: Entry; authTokenHash: PayloadDigest; } | undefined >; insert(opts: { path: Path; - subspace: SubspaceKey; + subspace: SubspaceId; payloadDigest: PayloadDigest; timestamp: bigint; length: bigint; @@ -28,23 +28,23 @@ export interface Storage3d< }): Promise; remove( - entry: Entry, 
+ entry: Entry, ): Promise; // Used during sync summarise( - areaOfInterest: AreaOfInterest, + areaOfInterest: AreaOfInterest, ): Promise<{ fingerprint: Fingerprint; size: number }>; // Used to fetch entries for transfer during sync. // All three dimensions are defined query( - areaOfInterest: AreaOfInterest, + areaOfInterest: AreaOfInterest, order: QueryOrder, reverse?: boolean, ): AsyncIterable< { - entry: Entry; + entry: Entry; authTokenHash: PayloadDigest; } >; diff --git a/src/replica/storage/summarisable_storage/lifting_monoid.ts b/src/store/storage/summarisable_storage/lifting_monoid.ts similarity index 100% rename from src/replica/storage/summarisable_storage/lifting_monoid.ts rename to src/store/storage/summarisable_storage/lifting_monoid.ts diff --git a/src/replica/storage/summarisable_storage/monoid_rbtree.ts b/src/store/storage/summarisable_storage/monoid_rbtree.ts similarity index 100% rename from src/replica/storage/summarisable_storage/monoid_rbtree.ts rename to src/store/storage/summarisable_storage/monoid_rbtree.ts diff --git a/src/replica/storage/summarisable_storage/monoid_skiplist.ts b/src/store/storage/summarisable_storage/monoid_skiplist.ts similarity index 100% rename from src/replica/storage/summarisable_storage/monoid_skiplist.ts rename to src/store/storage/summarisable_storage/monoid_skiplist.ts diff --git a/src/replica/storage/summarisable_storage/simple_kv.ts b/src/store/storage/summarisable_storage/simple_kv.ts similarity index 100% rename from src/replica/storage/summarisable_storage/simple_kv.ts rename to src/store/storage/summarisable_storage/simple_kv.ts diff --git a/src/replica/storage/summarisable_storage/summarisable_storage.bench.ts b/src/store/storage/summarisable_storage/summarisable_storage.bench.ts similarity index 100% rename from src/replica/storage/summarisable_storage/summarisable_storage.bench.ts rename to src/store/storage/summarisable_storage/summarisable_storage.bench.ts diff --git 
a/src/replica/storage/summarisable_storage/summarisable_storage.test.ts b/src/store/storage/summarisable_storage/summarisable_storage.test.ts similarity index 100% rename from src/replica/storage/summarisable_storage/summarisable_storage.test.ts rename to src/store/storage/summarisable_storage/summarisable_storage.test.ts diff --git a/src/replica/storage/summarisable_storage/types.ts b/src/store/storage/summarisable_storage/types.ts similarity index 100% rename from src/replica/storage/summarisable_storage/types.ts rename to src/store/storage/summarisable_storage/types.ts diff --git a/src/replica/storage/types.ts b/src/store/storage/types.ts similarity index 77% rename from src/replica/storage/types.ts rename to src/store/storage/types.ts index 88711c0..dcdfd5a 100644 --- a/src/replica/storage/types.ts +++ b/src/store/storage/types.ts @@ -6,47 +6,47 @@ import { PrefixIterator } from "./prefix_iterators/types.ts"; import { Storage3d } from "./storage_3d/types.ts"; export interface WriteAheadFlag< - NamespaceKey, - SubspaceKey, + NamespaceId, + SubspaceId, PayloadDigest, > { wasInserting(): Promise< { - entry: Entry; + entry: Entry; authTokenHash: PayloadDigest; } | undefined >; wasRemoving(): Promise< - Entry | undefined + Entry | undefined >; flagInsertion( - entry: Entry, + entry: Entry, authTokenHash: PayloadDigest, ): Promise; unflagInsertion: () => Promise; flagRemoval( - entry: Entry, + entry: Entry, ): Promise; unflagRemoval(): Promise; } -/** Provides methods for storing and retrieving entries for a {@link Replica}. */ +/** Provides methods for storing and retrieving entries for a {@link Store}. 
*/ export interface EntryDriver< - NamespaceKey, - SubspaceKey, + NamespaceId, + SubspaceId, PayloadDigest, Fingerprint, > { - makeStorage: (namespace: NamespaceKey) => Storage3d< - NamespaceKey, - SubspaceKey, + makeStorage: (namespace: NamespaceId) => Storage3d< + NamespaceId, + SubspaceId, PayloadDigest, Fingerprint >; - /** Helps a Replica recover from unexpected shutdowns mid-write. */ + /** Helps a Store recover from unexpected shutdowns mid-write. */ writeAheadFlag: WriteAheadFlag< - NamespaceKey, - SubspaceKey, + NamespaceId, + SubspaceId, PayloadDigest >; /** Used to find paths that are prefixes of, or prefixed by, another path. */ diff --git a/src/replica/replica.test.ts b/src/store/store.test.ts similarity index 77% rename from src/replica/replica.test.ts rename to src/store/store.test.ts index 79f6109..4e87ab2 100644 --- a/src/replica/replica.test.ts +++ b/src/store/store.test.ts @@ -2,9 +2,8 @@ import { assert, assertEquals, } from "https://deno.land/std@0.177.0/testing/asserts.ts"; -import { Replica } from "./replica.ts"; +import { Store } from "./store.ts"; import { crypto } from "https://deno.land/std@0.188.0/crypto/crypto.ts"; -import { encodeEntryKeys, encodeSummarisableStorageValue } from "./util.ts"; import { testSchemeAuthorisation, testSchemeFingerprint, @@ -16,7 +15,7 @@ import { import { makeSubspaceKeypair } from "../test/crypto.ts"; import { fullArea, orderBytes, orderPath } from "../../deps.ts"; -class TestReplica extends Replica< +class TestStore extends Store< Uint8Array, Uint8Array, ArrayBuffer, @@ -54,15 +53,15 @@ class TestReplica extends Replica< // Namespace length must equal protocol parameter pub key length -Deno.test("Replica.set", async (test) => { +Deno.test("Store.set", async (test) => { const authorKeypair = await makeSubspaceKeypair(); const author2Keypair = await makeSubspaceKeypair(); await test.step("Fails with invalid ingestions", async () => { - const replica = new TestReplica(); + const store = new TestStore(); // 
Returns an error and does not ingest payload if the entry is invalid - const badKeypairRes = await replica.set( + const badKeypairRes = await store.set( { path: [new Uint8Array([1, 2, 3, 4])], payload: new Uint8Array([1, 1, 1, 1]), @@ -77,7 +76,7 @@ Deno.test("Replica.set", async (test) => { const entries = []; for await ( - const entry of replica.query({ + const entry of store.query({ area: fullArea(), maxCount: 0, maxSize: BigInt(0), @@ -90,9 +89,9 @@ Deno.test("Replica.set", async (test) => { }); await test.step("Succeeds with valid ingestions", async () => { - const replica = new TestReplica(); + const store = new TestStore(); - const goodKeypairRes = await replica.set( + const goodKeypairRes = await store.set( { path: [new Uint8Array([1, 2, 3, 4])], payload: new Uint8Array([1, 1, 1, 1]), @@ -106,7 +105,7 @@ Deno.test("Replica.set", async (test) => { const entries = []; for await ( - const entry of replica.query({ + const entry of store.query({ area: fullArea(), maxCount: 0, maxSize: BigInt(0), @@ -120,9 +119,9 @@ Deno.test("Replica.set", async (test) => { }); await test.step("If a timestamp is set, it is used", async () => { - const replica = new TestReplica(); + const store = new TestStore(); - const res = await replica.set( + const res = await store.set( { path: [new Uint8Array([1, 2, 3, 4])], payload: new Uint8Array([1, 1, 1, 1]), @@ -137,11 +136,11 @@ Deno.test("Replica.set", async (test) => { }); await test.step("If no timestamp is set, and there is nothing else at the same path, use the current time.", async () => { - const replica = new TestReplica(); + const store = new TestStore(); const timestampBefore = BigInt(Date.now() * 1000); - const res = await replica.set( + const res = await store.set( { path: [new Uint8Array([1, 2, 3, 4])], payload: new Uint8Array([1, 1, 1, 1]), @@ -159,16 +158,16 @@ Deno.test("Replica.set", async (test) => { // ================================== // ingestEntry -Deno.test("Replica.ingestEntry", async (test) => { 
+Deno.test("Store.ingestEntry", async (test) => { const authorKeypair = await makeSubspaceKeypair(); const author2Keypair = await makeSubspaceKeypair(); // rejects stuff from a different namespace await test.step("Rejects entries from a different namespace", async () => { - const otherReplica = new TestReplica(new Uint8Array([9, 9, 9, 9])); - const replica = new TestReplica(); + const otherStore = new TestStore(new Uint8Array([9, 9, 9, 9])); + const store = new TestStore(); - const otherReplicaRes = await otherReplica.set( + const otherStoreRes = await otherStore.set( { path: [new Uint8Array([0, 0, 0, 0])], payload: new Uint8Array(), @@ -177,11 +176,11 @@ Deno.test("Replica.ingestEntry", async (test) => { authorKeypair.privateKey, ); - assert(otherReplicaRes.kind === "success"); + assert(otherStoreRes.kind === "success"); - const ingestRes = await replica.ingestEntry( - otherReplicaRes.entry, - otherReplicaRes.authToken, + const ingestRes = await store.ingestEntry( + otherStoreRes.entry, + otherStoreRes.authToken, ); assert(ingestRes.kind === "failure"); @@ -189,10 +188,10 @@ Deno.test("Replica.ingestEntry", async (test) => { }); await test.step("Rejects entries with bad signatures", async () => { - const otherReplica = new TestReplica(); - const replica = new TestReplica(); + const otherStore = new TestStore(); + const store = new TestStore(); - const otherReplicaRes = await otherReplica.set( + const otherStoreRes = await otherStore.set( { path: [new Uint8Array([0, 0, 0, 0])], payload: new Uint8Array(), @@ -201,10 +200,10 @@ Deno.test("Replica.ingestEntry", async (test) => { authorKeypair.privateKey, ); - assert(otherReplicaRes.kind === "success"); + assert(otherStoreRes.kind === "success"); - const badAuthorSigRes = await replica.ingestEntry( - otherReplicaRes.entry, + const badAuthorSigRes = await store.ingestEntry( + otherStoreRes.entry, new Uint8Array([1, 2, 3]).buffer, ); @@ -214,9 +213,9 @@ Deno.test("Replica.ingestEntry", async (test) => { // no ops entries 
for which there are newer entries with paths that are prefixes of that entry await test.step("Does not ingest entries for which there are new entries with paths which are a prefix", async () => { - const replica = new TestReplica(); + const store = new TestStore(); - await replica.set( + await store.set( { path: [new Uint8Array([0, 0, 0, 0])], payload: new Uint8Array([0, 1, 2, 1]), @@ -226,7 +225,7 @@ Deno.test("Replica.ingestEntry", async (test) => { authorKeypair.privateKey, ); - const secondRes = await replica.set( + const secondRes = await store.set( { path: [new Uint8Array([0, 0, 0, 0]), new Uint8Array([1])], payload: new Uint8Array([0, 1, 2, 3]), @@ -241,9 +240,9 @@ Deno.test("Replica.ingestEntry", async (test) => { }); await test.step("Does not ingest entries for which there are newer entries with the same path and author", async () => { - const replica = new TestReplica(); + const store = new TestStore(); - await replica.set( + await store.set( { path: [new Uint8Array([0, 0, 0, 0])], payload: new Uint8Array([0, 1, 2, 1]), @@ -253,7 +252,7 @@ Deno.test("Replica.ingestEntry", async (test) => { authorKeypair.privateKey, ); - const secondRes = await replica.set( + const secondRes = await store.set( { path: [new Uint8Array([0, 0, 0, 0])], payload: new Uint8Array([0, 1, 2, 3]), @@ -268,9 +267,9 @@ Deno.test("Replica.ingestEntry", async (test) => { }); await test.step("Does not ingest entries for which there are newer entries with the same path and author and timestamp but smaller hash", async () => { - const replica = new TestReplica(); + const store = new TestStore(); - await replica.set( + await store.set( { path: [new Uint8Array([0, 0, 0, 0])], payload: new Uint8Array([0, 1, 2, 1]), @@ -280,7 +279,7 @@ Deno.test("Replica.ingestEntry", async (test) => { authorKeypair.privateKey, ); - const secondRes = await replica.set( + const secondRes = await store.set( { path: [new Uint8Array([0, 0, 0, 0])], payload: new Uint8Array([0, 1, 2, 3]), @@ -304,9 +303,9 @@ 
Deno.test("Replica.ingestEntry", async (test) => { }); await test.step("replaces older entries with same author and path", async () => { - const replica = new TestReplica(); + const store = new TestStore(); - await replica.set( + await store.set( { path: [new Uint8Array([0, 0, 0, 0])], payload: new Uint8Array([0, 1, 2, 1]), @@ -316,7 +315,7 @@ Deno.test("Replica.ingestEntry", async (test) => { authorKeypair.privateKey, ); - const secondRes = await replica.set( + const secondRes = await store.set( { path: [new Uint8Array([0, 0, 0, 0])], payload: new Uint8Array([0, 1, 2, 3]), @@ -331,7 +330,7 @@ Deno.test("Replica.ingestEntry", async (test) => { const entries = []; for await ( - const entry of replica.query({ + const entry of store.query({ area: fullArea(), maxCount: 0, maxSize: BigInt(0), @@ -347,9 +346,9 @@ Deno.test("Replica.ingestEntry", async (test) => { }); await test.step("replaces older entries with paths prefixed by the new one", async () => { - const replica = new TestReplica(); + const store = new TestStore(); - await replica.set( + await store.set( { path: [new Uint8Array([0]), new Uint8Array([1])], payload: new Uint8Array([0, 1, 2, 1]), @@ -359,7 +358,7 @@ Deno.test("Replica.ingestEntry", async (test) => { authorKeypair.privateKey, ); - await replica.set( + await store.set( { path: [new Uint8Array([0]), new Uint8Array([2])], payload: new Uint8Array([0, 1, 2, 1]), @@ -369,7 +368,7 @@ Deno.test("Replica.ingestEntry", async (test) => { authorKeypair.privateKey, ); - const prefixRes = await replica.set( + const prefixRes = await store.set( { path: [new Uint8Array([0])], payload: new Uint8Array([0, 1, 2, 3]), @@ -384,7 +383,7 @@ Deno.test("Replica.ingestEntry", async (test) => { const entries = []; for await ( - const entry of replica.query({ + const entry of store.query({ area: fullArea(), maxCount: 0, maxSize: BigInt(0), @@ -401,9 +400,9 @@ Deno.test("Replica.ingestEntry", async (test) => { }); await test.step("replaces older entries with paths prefixed by 
the new one, EVEN when that entry was edited", async () => { - const replica = new TestReplica(); + const store = new TestStore(); - await replica.set( + await store.set( { path: [new Uint8Array([0]), new Uint8Array([1])], payload: new Uint8Array([0, 1, 2, 1]), @@ -413,7 +412,7 @@ Deno.test("Replica.ingestEntry", async (test) => { authorKeypair.privateKey, ); - await replica.set( + await store.set( { path: [new Uint8Array([0]), new Uint8Array([1])], payload: new Uint8Array([0, 1, 2, 3]), @@ -423,7 +422,7 @@ Deno.test("Replica.ingestEntry", async (test) => { authorKeypair.privateKey, ); - const prefixRes = await replica.set( + const prefixRes = await store.set( { path: [new Uint8Array([0])], payload: new Uint8Array([0, 1, 2, 3]), @@ -438,7 +437,7 @@ Deno.test("Replica.ingestEntry", async (test) => { const entries = []; for await ( - const entry of replica.query({ + const entry of store.query({ area: fullArea(), maxCount: 0, maxSize: BigInt(0), @@ -458,13 +457,13 @@ Deno.test("Replica.ingestEntry", async (test) => { // ================================== // ingestPayload -Deno.test("Replica.ingestPayload", async (test) => { +Deno.test("Store.ingestPayload", async (test) => { const authorKeypair = await makeSubspaceKeypair(); await test.step("does not ingest payload if corresponding entry is missing", async () => { - const replica = new TestReplica(); + const store = new TestStore(); - const res = await replica.ingestPayload({ + const res = await store.ingestPayload({ path: [new Uint8Array([0])], subspace: new Uint8Array([0]), timestamp: BigInt(0), @@ -475,14 +474,14 @@ Deno.test("Replica.ingestPayload", async (test) => { }); await test.step("does not ingest if payload is already held", async () => { - const replica = new TestReplica(); - const otherReplica = new TestReplica(); + const store = new TestStore(); + const otherStore = new TestStore(); const payload = new Uint8Array(32); crypto.getRandomValues(payload); - const res = await otherReplica.set({ + const res = 
await otherStore.set({ path: [new Uint8Array([0, 2])], payload, subspace: authorKeypair.subspace, @@ -490,11 +489,11 @@ Deno.test("Replica.ingestPayload", async (test) => { assert(res.kind === "success"); - const res2 = await replica.ingestEntry(res.entry, res.authToken); + const res2 = await store.ingestEntry(res.entry, res.authToken); assert(res2.kind === "success"); - const res3 = await replica.ingestPayload({ + const res3 = await store.ingestPayload({ path: res.entry.path, subspace: res.entry.subspaceId, timestamp: res.entry.timestamp, @@ -502,7 +501,7 @@ Deno.test("Replica.ingestPayload", async (test) => { assert(res3.kind === "success"); - const res4 = await replica.ingestPayload({ + const res4 = await store.ingestPayload({ path: res.entry.path, subspace: new Uint8Array(res.entry.subspaceId), timestamp: res.entry.timestamp, @@ -512,14 +511,14 @@ Deno.test("Replica.ingestPayload", async (test) => { }); await test.step("does not ingest if the hash doesn't match the entry's", async () => { - const replica = new TestReplica(); - const otherReplica = new TestReplica(); + const store = new TestStore(); + const otherStore = new TestStore(); const payload = new Uint8Array(32); crypto.getRandomValues(payload); - const res = await otherReplica.set({ + const res = await otherStore.set({ path: [new Uint8Array([0, 2])], payload, subspace: authorKeypair.subspace, @@ -527,11 +526,11 @@ Deno.test("Replica.ingestPayload", async (test) => { assert(res.kind === "success"); - const res2 = await replica.ingestEntry(res.entry, res.authToken); + const res2 = await store.ingestEntry(res.entry, res.authToken); assert(res2.kind === "success"); - const res3 = await replica.ingestPayload({ + const res3 = await store.ingestPayload({ path: res.entry.path, subspace: res.entry.subspaceId, timestamp: res.entry.timestamp, @@ -542,14 +541,14 @@ Deno.test("Replica.ingestPayload", async (test) => { }); await test.step("ingest if everything is valid", async () => { - const replica = new 
TestReplica(); - const otherReplica = new TestReplica(); + const store = new TestStore(); + const otherStore = new TestStore(); const payload = new Uint8Array(32); crypto.getRandomValues(payload); - const res = await otherReplica.set({ + const res = await otherStore.set({ path: [new Uint8Array([0, 2])], payload, subspace: authorKeypair.subspace, @@ -557,11 +556,11 @@ Deno.test("Replica.ingestPayload", async (test) => { assert(res.kind === "success"); - const res2 = await replica.ingestEntry(res.entry, res.authToken); + const res2 = await store.ingestEntry(res.entry, res.authToken); assert(res2.kind === "success"); - const res3 = await replica.ingestPayload({ + const res3 = await store.ingestPayload({ path: res.entry.path, subspace: res.entry.subspaceId, timestamp: res.entry.timestamp, @@ -572,7 +571,7 @@ Deno.test("Replica.ingestPayload", async (test) => { let retrievedPayload; for await ( - const [_entry, payload] of replica.query({ + const [_entry, payload] of store.query({ area: fullArea(), maxCount: 0, maxSize: BigInt(0), @@ -597,10 +596,10 @@ Deno.test("Write-ahead flags", async (test) => { const authorKeypair = await makeSubspaceKeypair(); await test.step("Insertion flag inserts (and removes prefixes...)", async () => { - const replica = new TestReplica(); - const otherReplica = new TestReplica(); + const store = new TestStore(); + const otherStore = new TestStore(); - const res = await otherReplica.set( + const res = await otherStore.set( { path: [new Uint8Array([0, 0, 0, 0])], payload: new Uint8Array(32), @@ -612,35 +611,9 @@ Deno.test("Write-ahead flags", async (test) => { assert(res.kind === "success"); - // Create PTA flag. - const keys = encodeEntryKeys( - { - path: res.entry.path, - timestamp: res.entry.timestamp, - subspace: res.entry.subspaceId, - - subspaceEncoding: { - encode: (v) => v, - decode: (v) => v.subarray(0, 65), - encodedLength: () => 65, - }, - }, - ); - - // Create storage value. 
- const storageValue = encodeSummarisableStorageValue({ - payloadDigest: res.entry.payloadDigest, - payloadLength: res.entry.payloadLength, - authTokenDigest: new Uint8Array( - await crypto.subtle.digest("SHA-256", res.authToken), - ), - payloadScheme: testSchemePayload, - encodedPathLength: keys.encodedPathLength, - }); - // Insert - const result = await replica.set( + const result = await store.set( { path: [new Uint8Array([0, 0, 0, 0, 1])], payload: new Uint8Array(32), @@ -652,17 +625,17 @@ Deno.test("Write-ahead flags", async (test) => { assert(result.kind === "success"); - await replica.writeAheadFlag().flagInsertion( + await store.writeAheadFlag().flagInsertion( result.entry, result.authToken, ); - await replica.triggerWriteAheadFlag(); + await store.triggerWriteAheadFlag(); const entries = []; for await ( - const [entry] of replica.query({ + const [entry] of store.query({ area: fullArea(), maxCount: 0, maxSize: BigInt(0), @@ -683,9 +656,9 @@ Deno.test("Write-ahead flags", async (test) => { }); await test.step("Removal flag removes", async () => { - const replica = new TestReplica(); + const store = new TestStore(); - const res = await replica.set( + const res = await store.set( { path: [new Uint8Array([0, 0, 0, 0])], payload: new Uint8Array(32), @@ -697,14 +670,14 @@ Deno.test("Write-ahead flags", async (test) => { assert(res.kind === "success"); - await replica.writeAheadFlag().flagRemoval(res.entry); + await store.writeAheadFlag().flagRemoval(res.entry); - await replica.triggerWriteAheadFlag(); + await store.triggerWriteAheadFlag(); const entries = []; for await ( - const [entry] of replica.query({ + const [entry] of store.query({ area: fullArea(), maxCount: 0, maxSize: BigInt(0), diff --git a/src/replica/replica.ts b/src/store/store.ts similarity index 93% rename from src/replica/replica.ts rename to src/store/store.ts index 8b0abd1..53377aa 100644 --- a/src/replica/replica.ts +++ b/src/store/store.ts @@ -7,7 +7,7 @@ import { Payload, ProtocolParameters, 
QueryOrder, - ReplicaOpts, + StoreOpts, } from "./types.ts"; import { PayloadDriverMemory } from "./storage/payload_drivers/memory.ts"; import { @@ -27,13 +27,13 @@ import { } from "../../deps.ts"; import { Storage3d } from "./storage/storage_3d/types.ts"; -/** A local snapshot of a namespace to be written to, queried from, and synced with other replicas. - * - * Data is stored as many {@link SignedEntry} with a corresponding {@link Payload}, which the replica may or may not possess. +/** A local set of a particular namespace's entries to be written to, read from, and synced with other `Store`s. * * Keeps data in memory unless persisted entry / payload drivers are specified. + * + * https://willowprotocol.org/specs/data-model/index.html#store */ -export class Replica< +export class Store< NamespacePublicKey, SubspacePublicKey, PayloadDigest, @@ -70,7 +70,7 @@ export class Replica< private checkedWriteAheadFlag = deferred(); constructor( - opts: ReplicaOpts< + opts: StoreOpts< NamespacePublicKey, SubspacePublicKey, PayloadDigest, @@ -139,7 +139,7 @@ export class Replica< this.checkedWriteAheadFlag.resolve(); } - /** Create a new {@link SignedEntry} for some data and store both in the replica. */ + /** Create a new authorised entry for a payload, and store both in the store. */ async set( // input: EntryInput, @@ -181,11 +181,11 @@ export class Replica< return ingestResult; } - /** Attempt to store a {@link SignedEntry} in the replica. + /** Attempt to store an authorised entry in the `Store`. * - * An entry will not be ingested if it is found to have an invalid signature; if a newer entry with the same path and author are present; or if a newer entry with a path that is a prefix of the given entry exists. + * An entry will not be ingested if it is unauthorised; if a newer entry with the same path and subspace are present; or if a newer entry with a path that is a prefix of the given entry exists. 
* - * Additionally, if the entry's path is a prefix of already-held older entries, those entries will be removed from the replica. + * Additionally, if the entry's path is a prefix of already-held older entries, those entries will be removed from the `Store`. */ async ingestEntry( entry: Entry, @@ -214,7 +214,7 @@ export class Replica< return { kind: "failure", reason: "invalid_entry", - message: "Entry's namespace did not match replica's namespace.", + message: "Entry's namespace did not match store's namespace.", err: null, }; } @@ -457,9 +457,9 @@ export class Replica< await this.entryDriver.writeAheadFlag.unflagInsertion(); } - /** Attempt to store the corresponding payload for one of the replica's entries. + /** Attempt to store the corresponding payload for one of the store's entries. * - * A payload will not be ingested if the given entry is not stored in the replica; if the hash of the payload does not match the entry's; or if it is already held. + * A payload will not be ingested if the given entry is not stored in the store; if the hash of the payload does not match the entry's; or if it is already held. */ async ingestPayload( entryDetails: { @@ -519,7 +519,7 @@ export class Replica< }; } - /** Retrieve a list of entry-payload pairs from the replica for a given {@link Query}. */ + /** Retrieve an asynchronous iterator of entry-payload-authorisation triples from the store for a given `Area`. 
*/ async *query( areaOfInterest: AreaOfInterest, order: QueryOrder, diff --git a/src/replica/types.ts b/src/store/types.ts similarity index 82% rename from src/replica/types.ts rename to src/store/types.ts index 11baace..149ea64 100644 --- a/src/replica/types.ts +++ b/src/store/types.ts @@ -15,7 +15,7 @@ export type NamespaceScheme = EncodingScheme & { export type SubspaceScheme = EncodingScheme & { successor: SuccessorFn; order: TotalOrder; - minimalSubspaceKey: SubspaceId; + minimalSubspaceId: SubspaceId; }; export type PayloadScheme = EncodingScheme & { @@ -24,33 +24,33 @@ export type PayloadScheme = EncodingScheme & { }; export type AuthorisationScheme< - NamespaceKey, - SubspaceKey, + NamespaceId, + SubspaceId, PayloadDigest, AuthorisationOpts, AuthorisationToken, > = { /** Produce an authorisation token from an entry */ authorise( - entry: Entry, + entry: Entry, opts: AuthorisationOpts, ): Promise; /** Verify if an entry is authorised to be written */ isAuthorisedWrite: ( - entry: Entry, + entry: Entry, token: AuthorisationToken, ) => Promise; tokenEncoding: EncodingScheme; }; export type FingerprintScheme< - NamespaceKey, - SubspaceKey, + NamespaceId, + SubspaceId, PayloadDigest, Fingerprint, > = { fingerprintSingleton( - entry: Entry, + entry: Entry, ): Promise; fingerprintCombine( a: Fingerprint, @@ -61,8 +61,8 @@ export type FingerprintScheme< /** Concrete parameters peculiar to a specific usage of Willow. 
*/ export interface ProtocolParameters< - NamespaceKey, - SubspaceKey, + NamespaceId, + SubspaceId, PayloadDigest, AuthorisationOpts, AuthorisationToken, @@ -70,56 +70,56 @@ > { pathScheme: PathScheme; - namespaceScheme: NamespaceScheme; + namespaceScheme: NamespaceScheme; - subspaceScheme: SubspaceScheme; + subspaceScheme: SubspaceScheme; // Learn about payloads and producing them from bytes payloadScheme: PayloadScheme; authorisationScheme: AuthorisationScheme< - NamespaceKey, - SubspaceKey, + NamespaceId, + SubspaceId, PayloadDigest, AuthorisationOpts, AuthorisationToken >; fingerprintScheme: FingerprintScheme< - NamespaceKey, - SubspaceKey, + NamespaceId, + SubspaceId, PayloadDigest, Fingerprint >; } -export type ReplicaOpts< - NamespaceKey, - SubspaceKey, +export type StoreOpts< + NamespaceId, + SubspaceId, PayloadDigest, AuthorisationOpts, AuthorisationToken, Fingerprint, > = { - /** The public key of the namespace this replica is a snapshot of. */ - namespace: NamespaceKey; - /** The protocol parameters this replica should use. */ + /** The public key of the namespace this store holds entries for. */ + namespace: NamespaceId; + /** The protocol parameters this store should use. */ protocolParameters: ProtocolParameters< - NamespaceKey, - SubspaceKey, + NamespaceId, + SubspaceId, PayloadDigest, AuthorisationOpts, AuthorisationToken, Fingerprint >; - /** An optional driver used to store and retrieve a replica's entries. */ + /** An optional driver used to store and retrieve a store's entries. */ entryDriver?: EntryDriver< - NamespaceKey, - SubspaceKey, + NamespaceId, + SubspaceId, PayloadDigest, Fingerprint >; - /** An option driver used to store and retrieve a replica's payloads. */ + /** An optional driver used to store and retrieve a store's payloads. 
*/ payloadDriver?: PayloadDriver; }; diff --git a/src/store/util.ts b/src/store/util.ts new file mode 100644 index 0000000..6320632 --- /dev/null +++ b/src/store/util.ts @@ -0,0 +1,47 @@ +import { join } from "https://deno.land/std@0.188.0/path/mod.ts"; +import { EntryDriverKvStore } from "./storage/entry_drivers/kv_store.ts"; +import { PayloadDriverFilesystem } from "./storage/payload_drivers/filesystem.ts"; +import { PayloadScheme, ProtocolParameters } from "./types.ts"; +import { ensureDir } from "https://deno.land/std@0.188.0/fs/ensure_dir.ts"; +import { bigintToBytes, concat, EncodingScheme, Path } from "../../deps.ts"; +import { KvDriverDeno } from "./storage/kv/kv_driver_deno.ts"; + +/** Create a pair of entry and payload drivers for use with a {@link Store} which will store their data at a given filesystem path. */ +export async function getPersistedDrivers< + NamespacePublicKey, + SubspacePublicKey, + PayloadDigest, + AuthorisationOpts, + AuthorisationToken, + Fingerprint, +>( + /** The filesystem path to store entry and payload data within. */ + path: string, + protocolParameters: ProtocolParameters< + NamespacePublicKey, + SubspacePublicKey, + PayloadDigest, + AuthorisationOpts, + AuthorisationToken, + Fingerprint + >, +) { + const kvPath = join(path, "entries"); + const payloadPath = join(path, "payloads"); + + await ensureDir(path); + + // TODO: Use the platform appropriate KV driver. 
+ const kv = await Deno.openKv(kvPath); + + return { + entryDriver: new EntryDriverKvStore({ + ...protocolParameters, + kvDriver: new KvDriverDeno(kv), + }), + payloadDriver: new PayloadDriverFilesystem( + payloadPath, + protocolParameters.payloadScheme, + ), + }; +} diff --git a/src/test/test_schemes.ts b/src/test/test_schemes.ts index 47965ba..023b17b 100644 --- a/src/test/test_schemes.ts +++ b/src/test/test_schemes.ts @@ -12,7 +12,7 @@ import { NamespaceScheme, PayloadScheme, SubspaceScheme, -} from "../replica/types.ts"; +} from "../store/types.ts"; import { importPublicKey } from "./crypto.ts"; export const testSchemeNamespace: NamespaceScheme = { @@ -26,7 +26,7 @@ export const testSchemeSubspace: SubspaceScheme = { encode: (v) => v, decode: (v) => v.subarray(0, 65), encodedLength: () => 65, - minimalSubspaceKey: new Uint8Array(65), + minimalSubspaceId: new Uint8Array(65), order: orderBytes, successor: successorBytesFixedWidth, };