Skip to content

Commit

Permalink
limit
Browse files Browse the repository at this point in the history
  • Loading branch information
BlowaterNostr committed Apr 8, 2024
1 parent 32aeca7 commit 5d27fb6
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 24 deletions.
2 changes: 1 addition & 1 deletion deno.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"fresh",
"recommended"
],
"exclude": ["require-await", "require-yield", "no-unused-vars"]
"exclude": ["require-await", "require-yield", "no-unused-vars", "no-empty"]
}
},
"exclude": [
Expand Down
7 changes: 3 additions & 4 deletions main.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,7 @@ import { interface_GetEventsByAuthors } from "./resolvers/event.ts";
import Landing from "./routes/landing.tsx";
import Error404 from "./routes/_404.tsx";
import { RelayInformation, RelayInformationStore } from "./resolvers/nip11.ts";
import { DB } from "https://deno.land/x/sqlite/mod.ts";

// Open a database
const db = new DB("test.db");
import { func_GetEventsByFilter } from "./resolvers/event.ts";

const schema = gql.buildSchema(gql.print(typeDefs));

Expand Down Expand Up @@ -97,6 +94,7 @@ export async function run(args: {
get_events_by_IDs: eventStore.get_events_by_IDs.bind(eventStore),
get_events_by_kinds: eventStore.get_events_by_kinds.bind(eventStore),
get_events_by_authors: eventStore.get_events_by_authors.bind(eventStore),
get_events_by_filter: eventStore.get_events_by_filter.bind(eventStore),
policyStore,
relayInformationStore,
kv: args.kv,
Expand All @@ -123,6 +121,7 @@ export type EventReadWriter = {
write_event: func_WriteEvent;
get_events_by_IDs: func_GetEventsByIDs;
get_events_by_kinds: func_GetEventsByKinds;
get_events_by_filter: func_GetEventsByFilter;
} & interface_GetEventsByAuthors;

const root_handler = (
Expand Down
4 changes: 3 additions & 1 deletion makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,6 @@ fmt:
deno fmt

test: fmt
deno task test
deno test --allow-net --unstable --allow-read --allow-write \
--filter main \
--coverage test.ts
66 changes: 59 additions & 7 deletions resolvers/event.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,12 @@
import { NostrEvent, NostrKind } from "../_libs.ts";
import {
InMemoryAccountContext,
NostrEvent,
NostrFilter,
NostrKind,
prepareNormalNostrEvent,
} from "../_libs.ts";
import { EventReadWriter } from "../main.tsx";
import { assertEquals } from "https://deno.land/[email protected]/assert/assert_equals.ts";

export type Actor = {
type: "admin";
Expand Down Expand Up @@ -27,17 +35,14 @@ export type interface_GetEventsByAuthors = {
get_events_by_authors: func_GetEventsByAuthors;
};

export type func_GetEventsByFilter = (filter: NostrFilter) => AsyncIterable<NostrEvent>;

export type func_WriteEvent = (event: NostrEvent) => Promise<boolean>;
export type interface_WriteEvent = {
write_event: func_WriteEvent;
};

export class EventStore
implements
interface_GetEventsByAuthors,
interface_WriteEvent,
interface_GetEventsByIDs,
interface_GetEventsByKinds {
export class EventStore implements EventReadWriter {
private constructor(
private events: Map<string, NostrEvent>,
private kv: Deno.Kv,
Expand Down Expand Up @@ -77,6 +82,18 @@ export class EventStore
}
}

async *get_events_by_filter(filter: NostrFilter) {
let i = 0;
for (const event of this.events.values()) {
if (isMatched(event, filter)) {
if (filter.limit && i < filter.limit) {
yield event;
}
i++;
}
}
}

async write_event(event: NostrEvent) {
console.log("write_event", event);
const result = await this.kv.atomic()
Expand All @@ -92,3 +109,38 @@ export class EventStore
return result.ok;
}
}

function isMatched(event: NostrEvent, filter: NostrFilter) {
const kinds = filter.kinds || [];
const authors = filter.authors || [];
const ids = filter.ids || [];
const ps = filter["#p"] || [];
const es = filter["#e"] || [];

const match_kind = kinds.length == 0 ? true : kinds.includes(event.kind);
const match_author = authors.length == 0 ? true : authors.includes(event.pubkey);
const match_id = ids.length == 0 ? true : ids.includes(event.id);
const match_p_tag = ps.length == 0 ? true : ps.includes(event.pubkey);
const match_e_tag = es.length == 0 ? true : es.includes(event.id);
return (
match_kind &&
match_author &&
match_id &&
match_p_tag &&
match_e_tag
) ||
(kinds.length == 0 && authors.length == 0 && ids.length == 0 &&
ps.length == 0 && es.length == 0);
}

Deno.test("isMatched", async () => {
const ctx = InMemoryAccountContext.Generate();
const event = await prepareNormalNostrEvent(ctx, {
content: "",
kind: 1,
});
const is = isMatched(event, {
limit: 1,
});
assertEquals(is, true);
});
8 changes: 7 additions & 1 deletion test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
SingleRelayConnection,
SubscriptionStream,
} from "./_libs.ts";
import { limit } from "https://raw.githubusercontent.com/BlowaterNostr/nostr.ts/main/relay-single-test.ts";

const test_kv = async () => {
try {
Expand Down Expand Up @@ -136,6 +137,11 @@ Deno.test("main", async (t) => {
assertIsError(err, Error);
});


await t.step("nip1", async () => {
await limit(relay.url)()
});

await client.close();
await relay.shutdown();
});
Expand Down Expand Up @@ -210,7 +216,7 @@ async function randomEvent(ctx: InMemoryAccountContext, kind?: NostrKind, conten
return event;
}

async function queryGql(relay: Relay, query: string, variables?: Record<string, any>) {
async function queryGql(relay: Relay, query: string, variables?: object) {
const { hostname, port } = new URL(relay.url);
const res = await fetch(`http://${hostname}:${port}/api`, {
method: "POST",
Expand Down
51 changes: 41 additions & 10 deletions ws.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { func_ResolvePolicyByKind } from "./resolvers/policy.ts";
import { DefaultPolicy, EventReadWriter } from "./main.tsx";
import {
func_GetEventsByAuthors,
func_GetEventsByFilter,
func_GetEventsByIDs,
func_GetEventsByKinds,
func_WriteEvent,
Expand Down Expand Up @@ -56,7 +57,23 @@ export const ws_handler = (
return response;
};

export type SubscriptionMap = Map<string, NostrFilter[]>;
export type SubscriptionMap = Map<string, Subscription>;
export type Subscription = {
filter: NostrFilter;
eventSent: number;
}[];

function send_event_to_subscription(ws: WebSocket, event: NostrEvent, sub_id: string, filter: {
filter: NostrFilter;
eventSent: number;
}) {
if ((filter.filter.limit && filter.eventSent < filter.filter.limit) || filter.filter.limit == undefined) {
ws.send(JSON.stringify(respond_event(sub_id, event)));
filter.eventSent++;
return true;
}
return false;
}

function onMessage(
deps: {
Expand Down Expand Up @@ -153,10 +170,7 @@ async function handle_cmd_event(args: {
if (policy.read === false) {
return;
}
send(
matched.socket,
JSON.stringify(respond_event(matched.sub_id, event)),
);
send_event_to_subscription(matched.socket, event, matched.sub_id, matched.filter);
}
}

Expand All @@ -172,7 +186,15 @@ async function handle_cmd_req(
const sub_id = nostr_ws_msg[1];
const filters = nostr_ws_msg.slice(2) as NostrFilter[];

args.connections.get(this_socket)?.set(sub_id, filters);
args.connections.get(this_socket)?.set(
sub_id,
filters.map((f) => {
return {
filter: f,
eventSent: 0,
};
}),
);

// query this filter
for (const filter of filters) {
Expand All @@ -190,6 +212,7 @@ async function handle_filter(args: {
get_events_by_IDs: func_GetEventsByIDs;
get_events_by_kinds: func_GetEventsByKinds;
get_events_by_authors: func_GetEventsByAuthors;
get_events_by_filter: func_GetEventsByFilter;
resolvePolicyByKind: func_ResolvePolicyByKind;
}) {
const event_candidates = new Map<string, NostrEvent>();
Expand Down Expand Up @@ -233,6 +256,11 @@ async function handle_filter(args: {
}
}
}
if (filter.limit) {
for await (const event of args.get_events_by_filter(filter)) {
event_candidates.set(event.id, event);
}
}
return event_candidates;
}

Expand Down Expand Up @@ -267,19 +295,22 @@ function* matchEventWithSubscriptions(
console.log(subscriptions);
for (const [sub_id, filters] of subscriptions) {
console.log(sub_id, filters);
if (isMatched(event, filters)) {
const matched_filter = isMatched(event, filters);
if (matched_filter) {
yield {
socket,
sub_id,
event,
filter: matched_filter,
};
}
}
}
}

export function isMatched(event: NostrEvent, filters: NostrFilter[]) {
for (const filter of filters) {
export function isMatched(event: NostrEvent, subscription: Subscription) {
for (const _filter of subscription) {
const filter = _filter.filter;
const kinds = filter.kinds || [];
const authors = filter.authors || [];
const ids = filter.ids || [];
Expand All @@ -304,7 +335,7 @@ export function isMatched(event: NostrEvent, filters: NostrFilter[]) {
// filter.until

if (res) {
return res;
return _filter;
}
}
}
Expand Down

0 comments on commit 5d27fb6

Please sign in to comment.