Skip to content

Commit

Permalink
nats: more work on eventually consistent key value store
Browse files Browse the repository at this point in the history
  • Loading branch information
williamstein committed Feb 7, 2025
1 parent f4579bc commit 8041513
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 36 deletions.
38 changes: 35 additions & 3 deletions src/packages/frontend/nats/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { join } from "path";
import * as jetstream from "@nats-io/jetstream";
import { createSyncTable, type SyncTable } from "@cocalc/nats/sync/synctable";
import { randomId } from "@cocalc/nats/names";
import { browserSubject, projectSubject } from "@cocalc/nats/names";
import { browserSubject, projectSubject, kvName } from "@cocalc/nats/names";
import { parse_query } from "@cocalc/sync/table/util";
import { sha1 } from "@cocalc/util/misc";
import { keys } from "lodash";
Expand All @@ -20,6 +20,7 @@ import { PubSub } from "@cocalc/nats/sync/pubsub";
import type { ChatOptions } from "@cocalc/util/types/llm";
import { SystemKv } from "@cocalc/nats/system";
import { KV } from "@cocalc/nats/sync/kv";
import { EventuallyConsistentKV } from "@cocalc/nats/sync/eventually-consistent-kv";
import { initApi } from "@cocalc/frontend/nats/api";
import { delay } from "awaiting";
import { Svcm } from "@nats-io/services";
Expand Down Expand Up @@ -394,14 +395,17 @@ export class NatsClient {
private kvCache: { [key: string]: KV } = {};
kv = reuseInFlight(
async ({
name,
account_id,
project_id,
filter,
options,
}: {
name: string;
account_id?: string;
project_id?: string;
filter?: string | string[];
options?;
}) => {
const name = kvName({ account_id, project_id });
const key = JSON.stringify([name, filter, options]);
if (this.kvCache[key] == null) {
const kv = new KV({
Expand All @@ -420,6 +424,34 @@ export class NatsClient {
},
);

eckv = async ({
account_id,
project_id,
filter,
options,
resolve,
}: {
account_id?: string;
project_id?: string;
filter?: string | string[];
options?;
resolve: (opts: { ancestor; local; remote }) => any;
}) => {
if (!account_id && !project_id) {
account_id = this.client.account_id;
}
const name = kvName({ account_id, project_id });
const eckv = new EventuallyConsistentKV({
name,
filter,
options,
resolve,
env: await this.getEnv(),
});
await eckv.init();
return eckv;
};

microservicesClient = async () => {
const nc = await this.getConnection();
// @ts-ignore
Expand Down
19 changes: 19 additions & 0 deletions src/packages/nats/names.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,25 @@ export function randomId() {
return generateVouchers({ count: 1, length: 10 })[0];
}

export function kvName({
project_id,
account_id,
}: {
project_id?: string;
account_id?: string;
}) {
if (project_id) {
if (account_id) {
throw Error("both account_id and project_id can't be set");
}
return `project-${project_id}`;
}
if (!account_id) {
throw Error("at least one of account_id and project_id must be set");
}
return `account-${account_id}`;
}

export function projectSubject({
project_id,
compute_server_id = 0,
Expand Down
90 changes: 77 additions & 13 deletions src/packages/nats/sync/eventually-consistent-kv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,31 @@ DEVELOPMENT:
Welcome to Node.js v18.17.1.
Type ".help" for more information.
> env = await require("@cocalc/backend/nats/env").getEnv(); a = require("@cocalc/nats/sync/eventually-consistent-kv"); s = new a.EventuallyConsistentKV({name:'test',env,filter:['foo.>'],resolve:({parent,local,remote})=>{return {...remote,...local}}}); await s.init();
In the browser console:
> s = await cc.client.nats_client.eckv({filter:['foo.>'],resolve:({parent,local,remote})=>{return {...remote,...local}}})
# NOTE that the name is account-{account_id} or project-{project_id},
# and if not given defaults to the account-{user's account id}
> s.kv.name
'account-6aae57c6-08f1-4bb5-848b-3ceb53e61ede'
> s.on('change',(key)=>console.log(key));0;
*/

import { EventEmitter } from "events";
import { KV } from "./kv";
import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
import { type NatsEnv } from "@cocalc/nats/types";
import { isEqual } from "lodash";
import { delay } from "awaiting";

const TOMBSTONE = Symbol("tombstone");

export class EventuallyConsistentKV extends EventEmitter {
private kv: KV;
private kv?: KV;
private local: { [key: string]: any } = {};
private resolve: (opts: { ancestor; local; remote }) => any;
private changed: Set<string> = new Set();
Expand All @@ -45,10 +58,24 @@ export class EventuallyConsistentKV extends EventEmitter {
}

init = reuseInFlight(async () => {
if (this.kv == null) {
throw Error("closed");
}
this.kv.on("change", this.handleRemoteChange);
await this.kv.init();
this.emit("connected");
});

close = () => {
if (this.kv == null) {
return;
}
this.kv.close();
this.emit("closed");
this.removeAllListeners();
delete this.kv;
};

private handleRemoteChange = (key, remote, ancestor) => {
const local = this.local[key];
if (local !== undefined) {
Expand All @@ -59,9 +86,21 @@ export class EventuallyConsistentKV extends EventEmitter {
this.local[key] = value ?? TOMBSTONE;
}
}
this.emit("change", key);
};

get = () => {
get = (key?) => {
if (this.kv == null) {
throw Error("closed");
}
if (key != null) {
this.assertValidKey(key);
const local = this.local[key];
if (local === TOMBSTONE) {
return undefined;
}
return local ?? this.kv.get(key);
}
const x = { ...this.kv.get(), ...this.local };
for (const key in this.local) {
if (this.local[key] === TOMBSTONE) {
Expand All @@ -71,37 +110,62 @@ export class EventuallyConsistentKV extends EventEmitter {
return x;
};

private assertValidKey = (key) => {
if (this.kv == null) {
throw Error("closed");
}
this.kv.assertValidKey(key);
};

delete = (key) => {
this.assertValidKey(key);
this.local[key] = TOMBSTONE;
this.changed.add(key);
this.save();
};

set = (...args) => {
if (args.length == 2) {
this.assertValidKey(args[0]);
this.local[args[0]] = args[1] ?? TOMBSTONE;
this.changed.add(args[0]);
} else {
const obj = args[0];
for (const key in obj) {
this.assertValidKey(key);
this.local[key] = obj[key] ?? TOMBSTONE;
this.changed.add(key);
}
}
this.tryToSave();
this.save();
};

private tryToSave = async () => {
try {
await this.save();
} catch (err) {
console.log("problem saving", err);
}
if (Object.keys(this.local).length > 0) {
setTimeout(this.tryToSave, 100);
}
};
hasUnsavedChanges = () =>
this.changed.size > 0 || Object.keys(this.local).length > 0;

private save = reuseInFlight(async () => {
let d = 100;
while (true) {
try {
await this.attemptToSave();
//console.log("successfully saved");
} catch {
//(err) {
// console.log("problem saving", err);
}
if (this.hasUnsavedChanges()) {
d = Math.min(10000, d * 1.3) + Math.random() * 100;
await delay(d);
} else {
return;
}
}
});

private attemptToSave = reuseInFlight(async () => {
if (this.kv == null) {
throw Error("closed");
}
this.changed.clear();
const obj = { ...this.local };
for (const key in obj) {
Expand Down
32 changes: 12 additions & 20 deletions src/packages/nats/sync/kv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,14 @@ export class KV extends EventEmitter {
}
};

assertValidKey = (key: string) => {
if (!this.isValidKey(key)) {
throw Error(
`delete: key (=${key}) must match the filter: ${JSON.stringify(this.filter)}`,
);
}
};

isValidKey = (key: string) => {
if (this.filter == null) {
return true;
Expand All @@ -212,11 +220,7 @@ export class KV extends EventEmitter {
};

delete = async (key, revision?) => {
if (!this.isValidKey(key)) {
throw Error(
`delete: key (=${key}) must match the filter: ${JSON.stringify(this.filter)}`,
);
}
this.assertValidKey(key);
if (this.all == null || this.revisions == null || this.times == null) {
throw Error("not ready");
}
Expand Down Expand Up @@ -306,20 +310,8 @@ export class KV extends EventEmitter {
}
const revision = this.revisions[key];
const val = this.env.jc.encode(value);
const cur = this.all[key];
try {
this.all[key] = value;
const newRevision = await this.kv.put(key, val, {
previousSeq: revision,
});
this.revisions[key] = newRevision;
} catch (err) {
if (cur === undefined) {
delete this.all[key];
} else {
this.all[key] = cur;
}
throw err;
}
await this.kv.put(key, val, {
previousSeq: revision,
});
};
}

0 comments on commit 8041513

Please sign in to comment.