From 5d27fb69d50a3640458ae9ff59c4a54838588724 Mon Sep 17 00:00:00 2001 From: BlowaterNostr Date: Mon, 8 Apr 2024 15:34:24 +0800 Subject: [PATCH] limit --- deno.json | 2 +- main.tsx | 7 +++-- makefile | 4 ++- resolvers/event.ts | 66 +++++++++++++++++++++++++++++++++++++++++----- test.ts | 8 +++++- ws.ts | 51 ++++++++++++++++++++++++++++------- 6 files changed, 114 insertions(+), 24 deletions(-) diff --git a/deno.json b/deno.json index caaac4c..1eaa94b 100644 --- a/deno.json +++ b/deno.json @@ -9,7 +9,7 @@ "fresh", "recommended" ], - "exclude": ["require-await", "require-yield", "no-unused-vars"] + "exclude": ["require-await", "require-yield", "no-unused-vars", "no-empty"] } }, "exclude": [ diff --git a/main.tsx b/main.tsx index 684f776..a33ec21 100644 --- a/main.tsx +++ b/main.tsx @@ -15,10 +15,7 @@ import { interface_GetEventsByAuthors } from "./resolvers/event.ts"; import Landing from "./routes/landing.tsx"; import Error404 from "./routes/_404.tsx"; import { RelayInformation, RelayInformationStore } from "./resolvers/nip11.ts"; -import { DB } from "https://deno.land/x/sqlite/mod.ts"; - -// Open a database -const db = new DB("test.db"); +import { func_GetEventsByFilter } from "./resolvers/event.ts"; const schema = gql.buildSchema(gql.print(typeDefs)); @@ -97,6 +94,7 @@ export async function run(args: { get_events_by_IDs: eventStore.get_events_by_IDs.bind(eventStore), get_events_by_kinds: eventStore.get_events_by_kinds.bind(eventStore), get_events_by_authors: eventStore.get_events_by_authors.bind(eventStore), + get_events_by_filter: eventStore.get_events_by_filter.bind(eventStore), policyStore, relayInformationStore, kv: args.kv, @@ -123,6 +121,7 @@ export type EventReadWriter = { write_event: func_WriteEvent; get_events_by_IDs: func_GetEventsByIDs; get_events_by_kinds: func_GetEventsByKinds; + get_events_by_filter: func_GetEventsByFilter; } & interface_GetEventsByAuthors; const root_handler = ( diff --git a/makefile b/makefile index 9d091b0..60baee5 100644 --- a/makefile +++ b/makefile @@ -5,4 +5,6 @@ fmt: deno fmt test: fmt - deno task test + deno test --allow-net --unstable --allow-read --allow-write \ + --filter main \ + --coverage test.ts diff --git a/resolvers/event.ts b/resolvers/event.ts index ea8440f..b2add76 100644 --- a/resolvers/event.ts +++ b/resolvers/event.ts @@ -1,4 +1,12 @@ -import { NostrEvent, NostrKind } from "../_libs.ts"; +import { + InMemoryAccountContext, + NostrEvent, + NostrFilter, + NostrKind, + prepareNormalNostrEvent, +} from "../_libs.ts"; +import { EventReadWriter } from "../main.tsx"; +import { assertEquals } from "https://deno.land/std@0.202.0/assert/assert_equals.ts"; export type Actor = { type: "admin"; @@ -27,17 +35,14 @@ export type interface_GetEventsByAuthors = { get_events_by_authors: func_GetEventsByAuthors; }; +export type func_GetEventsByFilter = (filter: NostrFilter) => AsyncIterable; + export type func_WriteEvent = (event: NostrEvent) => Promise; export type interface_WriteEvent = { write_event: func_WriteEvent; }; -export class EventStore - implements - interface_GetEventsByAuthors, - interface_WriteEvent, - interface_GetEventsByIDs, - interface_GetEventsByKinds { +export class EventStore implements EventReadWriter { private constructor( private events: Map, private kv: Deno.Kv, @@ -77,6 +82,18 @@ export class EventStore } } + async *get_events_by_filter(filter: NostrFilter) { + let i = 0; + for (const event of this.events.values()) { + if (isMatched(event, filter)) { + if (filter.limit && i < filter.limit) { + yield event; + } + i++; + } + } + } + async write_event(event: NostrEvent) { console.log("write_event", event); const result = await this.kv.atomic() @@ -92,3 +109,38 @@ export class EventStore return result.ok; } } + +function isMatched(event: NostrEvent, filter: NostrFilter) { + const kinds = filter.kinds || []; + const authors = filter.authors || []; + const ids = filter.ids || []; + const ps = filter["#p"] || []; + const es = filter["#e"] || []; + + const match_kind = kinds.length == 0 ? true : kinds.includes(event.kind); + const match_author = authors.length == 0 ? true : authors.includes(event.pubkey); + const match_id = ids.length == 0 ? true : ids.includes(event.id); + const match_p_tag = ps.length == 0 ? true : ps.includes(event.pubkey); + const match_e_tag = es.length == 0 ? true : es.includes(event.id); + return ( + match_kind && + match_author && + match_id && + match_p_tag && + match_e_tag + ) || + (kinds.length == 0 && authors.length == 0 && ids.length == 0 && + ps.length == 0 && es.length == 0); +} + +Deno.test("isMatched", async () => { + const ctx = InMemoryAccountContext.Generate(); + const event = await prepareNormalNostrEvent(ctx, { + content: "", + kind: 1, + }); + const is = isMatched(event, { + limit: 1, + }); + assertEquals(is, true); +}); diff --git a/test.ts b/test.ts index 3ed4272..b9e60c6 100644 --- a/test.ts +++ b/test.ts @@ -13,6 +13,7 @@ import { SingleRelayConnection, SubscriptionStream, } from "./_libs.ts"; +import { limit } from "https://raw.githubusercontent.com/BlowaterNostr/nostr.ts/main/relay-single-test.ts"; const test_kv = async () => { try { @@ -136,6 +137,11 @@ Deno.test("main", async (t) => { assertIsError(err, Error); }); + + await t.step("nip1", async () => { + await limit(relay.url)() + }); + await client.close(); await relay.shutdown(); }); @@ -210,7 +216,7 @@ async function randomEvent(ctx: InMemoryAccountContext, kind?: NostrKind, conten return event; } -async function queryGql(relay: Relay, query: string, variables?: Record) { +async function queryGql(relay: Relay, query: string, variables?: object) { const { hostname, port } = new URL(relay.url); const res = await fetch(`http://${hostname}:${port}/api`, { method: "POST", diff --git a/ws.ts b/ws.ts index 514a621..46d77da 100644 --- a/ws.ts +++ b/ws.ts @@ -3,6 +3,7 @@ import { func_ResolvePolicyByKind } from "./resolvers/policy.ts"; import { DefaultPolicy, EventReadWriter } from "./main.tsx"; import { func_GetEventsByAuthors, + func_GetEventsByFilter, func_GetEventsByIDs, func_GetEventsByKinds, func_WriteEvent, @@ -56,7 +57,23 @@ export const ws_handler = ( return response; }; -export type SubscriptionMap = Map; +export type SubscriptionMap = Map; +export type Subscription = { + filter: NostrFilter; + eventSent: number; +}[]; + +function send_event_to_subscription(ws: WebSocket, event: NostrEvent, sub_id: string, filter: { + filter: NostrFilter; + eventSent: number; +}) { + if ((filter.filter.limit && filter.eventSent < filter.filter.limit) || filter.filter.limit == undefined) { + ws.send(JSON.stringify(respond_event(sub_id, event))); + filter.eventSent++; + return true; + } + return false; +} function onMessage( deps: { @@ -153,10 +170,7 @@ async function handle_cmd_event(args: { if (policy.read === false) { return; } - send( - matched.socket, - JSON.stringify(respond_event(matched.sub_id, event)), - ); + send_event_to_subscription(matched.socket, event, matched.sub_id, matched.filter); } } @@ -172,7 +186,15 @@ async function handle_cmd_req( const sub_id = nostr_ws_msg[1]; const filters = nostr_ws_msg.slice(2) as NostrFilter[]; - args.connections.get(this_socket)?.set(sub_id, filters); + args.connections.get(this_socket)?.set( + sub_id, + filters.map((f) => { + return { + filter: f, + eventSent: 0, + }; + }), + ); // query this filter for (const filter of filters) { @@ -190,6 +212,7 @@ async function handle_filter(args: { get_events_by_IDs: func_GetEventsByIDs; get_events_by_kinds: func_GetEventsByKinds; get_events_by_authors: func_GetEventsByAuthors; + get_events_by_filter: func_GetEventsByFilter; resolvePolicyByKind: func_ResolvePolicyByKind; }) { const event_candidates = new Map(); @@ -233,6 +256,11 @@ async function handle_filter(args: { } } } + if (filter.limit) { + for await (const event of args.get_events_by_filter(filter)) { + event_candidates.set(event.id, event); + } + } return event_candidates; } @@ -267,19 +295,22 @@ function* matchEventWithSubscriptions( console.log(subscriptions); for (const [sub_id, filters] of subscriptions) { console.log(sub_id, filters); - if (isMatched(event, filters)) { + const matched_filter = isMatched(event, filters); + if (matched_filter) { yield { socket, sub_id, event, + filter: matched_filter, }; } } } } -export function isMatched(event: NostrEvent, filters: NostrFilter[]) { - for (const filter of filters) { +export function isMatched(event: NostrEvent, subscription: Subscription) { + for (const _filter of subscription) { + const filter = _filter.filter; const kinds = filter.kinds || []; const authors = filter.authors || []; const ids = filter.ids || []; @@ -304,7 +335,7 @@ export function isMatched(event: NostrEvent, filters: NostrFilter[]) { // filter.until if (res) { - return res; + return _filter; } } }