Skip to content

Commit

Permalink
problem: can't subscribe to events
Browse files Browse the repository at this point in the history
  • Loading branch information
gsovereignty committed Apr 13, 2024
1 parent a9d858f commit 7a830be
Show file tree
Hide file tree
Showing 3 changed files with 444 additions and 0 deletions.
232 changes: 232 additions & 0 deletions src/lib/workers/firehose.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
import type { NDKEvent, NDKFilter } from "@nostr-dev-kit/ndk";
import NDKSvelte, {
type ExtendedBaseType,
type NDKEventStore,
} from "@nostr-dev-kit/ndk-svelte";
import { derived, get, writable, type Writable } from "svelte/store";

import NDKCacheAdapterDexie from "@nostr-dev-kit/ndk-cache-dexie";
import { Command, EventTreeItem, ResponseData, type RecursiveEventMap } from "./firehose.types";

const _ndk = writable(
new NDKSvelte({
//cacheAdapter: new NDKCacheAdapterDexie({ dbName: "wiki" }),
explicitRelayUrls: [
"wss://purplepag.es",
"wss://relay.nostr.band",
// "wss://nos.lol",
// "wss://relay.wikifreedia.xyz",
"wss://relay.nostrocket.org",
"wss://search.nos.today",
"wss://relay.damus.io",
// "wss://relay.nostr.bg",
"wss://relay.snort.social",
// "wss://offchain.pub",
"wss://relay.primal.net",
// "wss://pyramid.fiatjaf.com",
],
enableOutboxModel: false,
})
);

const ndk = get(_ndk);
let sub: NDKEventStore<ExtendedBaseType<NDKEvent>> | undefined = undefined;

let responseData: ResponseData | undefined; // = new ResponseData();
const workerEventMap = new Map<string, NDKEvent>();
const mostRecentReplaceableEvents: Map<string, NDKEvent> = new Map();
const replaceableKinds = [0, 3];
const processedIdForKind: Record<number, string> = {};

let responseStore: Writable<ResponseData> | undefined; // =
// responseStore.subscribe((response) => {
// postMessage(response);
// });

function init(pubkey?: string) {
if (!responseData) {
responseData = new ResponseData(pubkey);
responseStore = writable(responseData);
responseStore.subscribe((response) => {
postMessage(response);
});

let rootEvents = derived(responseStore!, ($responseStore) => {
return $responseStore.rootEvents.size;
});

rootEvents.subscribe(() => {
updateEventMap();
});

// let _masterFollows = writable(responseData.followLists.get(responseData.masterPubkey))
// let masterFollows = derived(_masterFollows, ($mfs)=>{
// if ($mfs) {
// return $mfs.size
// } else {
// return 0
// }
// })

let masterFollows = derived(responseStore, ($responseStore) =>{
if ($responseStore.masterPubkey) {
return $responseStore.followLists.get($responseStore.masterPubkey)?.size
}
})

masterFollows.subscribe(() => {
console.log(78)
let $responseStore = get(responseStore!);
if ($responseStore.masterPubkey) {
let follows = $responseStore.followLists.get($responseStore.masterPubkey);
if (follows) {
subscribe($responseStore.masterPubkey, [...follows]);
}
}

});
}
}

let subscribe = (pubkey: string, pubkeys?: string[]) => {
if (pubkey.length != 64) {
throw new Error("invalid pubkey");
}
if (!responseData) {
init(pubkey);
}
if (!responseData) {throw new Error("this should not happen")}
if (connectionStatus != 2) {
responseStore!.update((current) => {
current.errors.push(new Error("not connected!"));
return current;
});
return;
}
if (pubkey) {responseData.masterPubkey = pubkey}
if (!sub) {
sub = ndk.storeSubscribe(
{ kinds: [0, 1, 3, 7], authors: [pubkey] },
{ subId: "master" }
);
sub.subscribe((events) => {
for (let event of events) {
if (!workerEventMap.has(event.id)) {
workerEventMap.set(event.id, event);
}
let shouldPush = true;
if (replaceableKinds.includes(event.kind!)) {
let existing = mostRecentReplaceableEvents.get(event.deduplicationKey());
if (existing && event.created_at! < existing.created_at!) {
shouldPush = false;
} else {
mostRecentReplaceableEvents.set(event.deduplicationKey(), event);
}
}
if (shouldPush) {
responseData!.PushEvent(event.rawEvent());
}
}
responseStore!.update((current) => {
current.rawCount = events.length;
if (sub?.subscription) {
for (let [s, relay] of sub.subscription.pool.relays) {
current.connections.set(s, relay.activeSubscriptions().size);
}
} else {
current.connections = new Map();
}
return current;
});
});
} else {
let newFilters: NDKFilter[] = [];
let authors = new Set<string>();
if (pubkeys) {
authors = new Set<string>(pubkeys);
}
authors.add(pubkey);
if (sub.filters) {
for (let fi of sub.filters) {
if (fi.authors) {
for (let author of fi.authors) {
authors.add(author);
}
}
}
}
newFilters = [];
newFilters.push({ kinds: [3], authors: [pubkey] });
for (let author of authors) {
newFilters.push({ kinds: [0, 1, 3, 7], authors: [author] });
}
sub.changeFilters(newFilters);
console.log(sub.filters)
sub.unsubscribe();
sub.startSubscription();
}
};

let connectionStatus = 0;

onmessage = (m: MessageEvent<Command>) => {
if (!responseData) {
init()
}
if (m.data.command == "connect") {
if (connectionStatus == 0) {
responseData!.connected = 1
connectionStatus = 1;
ndk.connect(5000).then(() => {
responseData!.connected = 2
connectionStatus = 2;
if (m.data.pubkey) {
subscribe(m.data.pubkey);
}
});
}
}

if (m.data.command == "start") {
if (m.data.pubkey) {
subscribe(m.data.pubkey);
}
}
if (m.data.command == "stop") {
if (sub) {
sub.unsubscribe();
}
}
if (m.data.command == "print") {
updateEventMap();
}
};

function updateEventMap() {
//todo: crawl over all events and fetch parents up to the root if we don't already have them
//todo: find all replies to root events (use outbox mode too)
//these need to be live subs when user is viewing a thread, so we need to add/remove filters based on what they're looking at.
let m: RecursiveEventMap = new Map();
for (let id of responseData!.rootEvents) {
m.set(id, new EventTreeItem(workerEventMap.get(id)!.rawEvent()));
}
responseData!.recursiveEvents.children = iterate(m);
}

function iterate(m: Map<string, EventTreeItem>): Map<string, EventTreeItem> {
for (let [id, treeItem] of m) {
let children: Map<string, EventTreeItem> = new Map();
let _children = responseData!.etags.get(treeItem.event.id!);
if (_children) {
for (let eventID of _children) {
let _child_event = responseData!.events.get(eventID);
if (_child_event) {
children.set(eventID, new EventTreeItem(_child_event));
}
}
}
treeItem.children = iterate(children);
m.set(id, treeItem);
}
return m;
}
155 changes: 155 additions & 0 deletions src/lib/workers/firehose.types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
import { NDKEvent, type NDKSigner, type NDKTag, type NostrEvent } from "@nostr-dev-kit/ndk";

export class Command {
command: "start" | "stop" | "connect" | "print" | "nip07";
pubkey?: string;
signer?:NDKSigner;
constructor(
command: "start" | "stop" | "connect" | "print" | "nip07",
pubkey?: string
) {
this.signer = this.signer;
this.command = command;
this.pubkey = pubkey ? pubkey : "";
}
}

export class ResponseData {
connected: number = 0;
rawCount: number = 0;
count(): number {
return this.events.size;
}
connections: Map<string, number>;
errors: Error[];
events: Map<string, NostrEvent>;
etags: Map<string, Set<string>>;
kinds: Map<number, number>;
rootEvents: Set<string>;
recursiveEvents: EventTreeItem;
followLists: Map<string, Set<string>>;
masterPubkey: string | undefined;
constructor(pubkey?:string) {
this.masterPubkey = pubkey;
this.recursiveEvents = new EventTreeItem({id:"", created_at:0, content:"", tags:[], pubkey:""})
this.etags = new Map();
this.followLists = new Map();
this.connections = new Map();
this.errors = [];
this.events = new Map();
this.kinds = new Map();
this.rootEvents = new Set();
}

PushEvent(ev: NostrEvent): void {
if (!this.events.has(ev.id!)) {
this.events.set(ev.id!, ev);
{
let existing = this.kinds.get(ev.kind!);
if (!existing) {
existing = 0;
}
existing++;
this.kinds.set(ev.kind!, existing);
}
if (ev.kind == 3) {
this.followLists.set(ev.pubkey, new Set(ev.tags.filter((t: NDKTag) => t[0] == 'p').map((t: NDKTag) => t[1])))
}
if (ev.kind == 1 || ev.kind == 7) {
let e = new NDKEvent(undefined, ev);
let found = false;
for (let t of e.getMatchingTags("e")) {
if (t[1].length == 64 && !t.includes("root")) {
let existing = this.etags.get(t[1]);
if (!existing) {
existing = new Set<string>();
}
existing.add(e.id);
this.etags.set(t[1], existing);
}

if (!t.includes("mention")) {
found = true;
}
}
if (!found) {
this.rootEvents.add(ev.id!);
}
}
}
}
}



function getNestedEvents(events: Map<string, NostrEvent>,
etags: Map<string, Set<string>>) {

}

export type RecursiveEventMap = Map<string, EventTreeItem>;
function PopulateEventMap(
m: RecursiveEventMap,
nempool: Map<string, NDKEvent>
) {
m.forEach((treeItem) => {
if (treeItem.dirty) {
}
});
}

export class EventTreeItem {
//id: string; //if we have replies etc but don't have the event itself we need to fetch it
event: NostrEvent; //pseudo event if root
children: RecursiveEventMap;
allChildrenInMap():Set<string> {
return iterate(this.children)
}
findChild(id:string):EventTreeItem | undefined {
return find(this.children, id)
}
reacts(): Map<string, NostrEvent> {
let m: Map<string, NostrEvent> = new Map();
for (let [id, { event }] of this.children) {
if (event.kind! == 7) {
m.set(id, event);
}
}
return m;
}
push(ev: NDKEvent) {
for (let t of ev.getMatchingTags("e")) {
if (t.includes(this.event.id!)) {
if (!this.children.has(ev.id)) {
this.children.set(ev.id, new EventTreeItem(ev.rawEvent()));
}
}
}
}
dirty: boolean;
root: string | undefined;
constructor(e: NostrEvent) {
this.dirty = true;
this.event = e;
this.children = new Map();
}
}

function iterate(m:RecursiveEventMap):Set<string> {
let s = new Set<string>()
for (let [id, item] of m) {
iterate(item.children)
s.add(id);
}
return s
}

function find(m:RecursiveEventMap, id:string):EventTreeItem | undefined {
if (m.get(id)) {return m.get(id)!}
for (let [_, item] of m) {
let c = find(item.children, id)
if (c) {
return c
}
}
}
Loading

0 comments on commit 7a830be

Please sign in to comment.