From fa279319c5d27e3a9b976ec959f92d252e66b8d9 Mon Sep 17 00:00:00 2001 From: Rinse Date: Thu, 17 Aug 2023 20:31:45 +0000 Subject: [PATCH] perf(daemon): optimize seeding --- src/api/seeder.ts | 58 ++++++++++++++++++++++++----------------------- src/api/server.ts | 6 +++-- 2 files changed, 34 insertions(+), 30 deletions(-) diff --git a/src/api/seeder.ts b/src/api/seeder.ts index 818ca9b..267fb00 100644 --- a/src/api/seeder.ts +++ b/src/api/seeder.ts @@ -4,19 +4,26 @@ import { Plebbit } from "@plebbit/plebbit-js/dist/node/plebbit"; import { Subplebbit } from "@plebbit/plebbit-js/dist/node/subplebbit"; import { BasePages } from "@plebbit/plebbit-js/dist/node/pages"; import { Comment } from "@plebbit/plebbit-js/dist/node/comment"; -import pLimit from "p-limit"; import assert from "assert"; async function _loadAllPages(pageCid: string, pagesInstance: BasePages): Promise { - let sortedCommentsPage = await pagesInstance.getPage(pageCid); - let sortedComments: Comment[] = sortedCommentsPage.comments; - while (sortedCommentsPage.nextCid) { - sortedCommentsPage = await pagesInstance.getPage(sortedCommentsPage.nextCid); - sortedComments = sortedComments.concat(sortedCommentsPage.comments); + const log = Logger("plebbit-cli:server:seed:_loadAllPages"); + try { + let sortedCommentsPage = await pagesInstance.getPage(pageCid); + let sortedComments: Comment[] = sortedCommentsPage.comments; + while (sortedCommentsPage.nextCid) { + sortedCommentsPage = await pagesInstance.getPage(sortedCommentsPage.nextCid); + sortedComments = sortedComments.concat(sortedCommentsPage.comments); + } + + return sortedComments; + } catch (e) { + log.error(`Failed to load page (${pageCid}) of sub (${pagesInstance._subplebbitAddress}) due to error:`, e); + return []; } - return sortedComments; } +const seededIpns: Record = {}; async function _seedSub(sub: Subplebbit, pinnedCids: string[]) { const log = Logger("plebbit-cli:server:seed"); if (sub.statsCid) await sub.plebbit.fetchCid(sub.statsCid); // Seed stats @@ -36,31 +43,25 @@ async function _seedSub(sub: Subplebbit, pinnedCids: string[]) { // Fetch all comments CID allCidsToPin.push(...loadedPagesWithNames["new"].map((comment) => comment.cid)); - // Fetch all previousCommentCid of authors - allCidsToPin.push( - ...loadedPagesWithNames["new"] - .filter((comment) => typeof comment.author.previousCommentCid === "string") - .map((comment) => comment.author.previousCommentCid) - ); - - // Fetch all comments' CommentUpdate IPNS - const limit = pLimit(30); // Can only fetch 30 IPNS at a time - // TODO don't load all IPNS, instead only load the ones whose updatedAt did not change. We need to store all updatedAt somewhere - const ipnsRes = await Promise.allSettled( - loadedPagesWithNames["new"].map((comment) => - limit(() => sub.plebbit._clientsManager.fetchFromMultipleGateways({ ipns: comment.ipnsName }, "subplebbit")) - ) - ); - - log.trace( - `Loaded and seeded ${ipnsRes.filter((res) => res.status === "fulfilled").length} CommentUpdate and failed to seed ${ - ipnsRes.filter((res) => res.status === "rejected").length - } of sub (${sub.address})` - ); + // Seed IPNS + for (const comment of loadedPagesWithNames["new"]) { + assert(comment.ipnsName); + if (seededIpns[comment.ipnsName]?.lastSeededAt !== comment.updatedAt) { + try { + await comment._clientsManager.fetchCommentUpdate(comment.ipnsName); + seededIpns[comment.ipnsName] = { lastSeededAt: comment.updatedAt }; + log.trace(`Seeded comment (${comment.cid}) IPNS (${comment.ipnsName})`); + } catch (e) { + log.error(`Failed to seed comment (${comment.cid}) IPNS (${comment.ipnsName}) due to error`, e); + } + } + } } + // Pin cids that are not already pinned const newCidsToPin = lodash.difference(lodash.uniq(allCidsToPin), pinnedCids); if (newCidsToPin.length > 0) { + log.trace(`Attempting to pin ${newCidsToPin.length} comments' cids from sub (${sub.address}): `, newCidsToPin); const defaultIpfsClient = Object.values(sub.plebbit.clients.ipfsClients)[0]; assert(defaultIpfsClient); await defaultIpfsClient._client.pin.addAll(newCidsToPin); @@ -85,4 +86,5 @@ export async function seedSubplebbits(subAddresses: string[], plebbit: Plebbit) log.error(`Failed to load and seed sub (${subAddress}):`, String(e)); } } + log(`Finished this round of seeding. Will seed again later`); } diff --git a/src/api/server.ts b/src/api/server.ts index fbe4c82..d84087f 100644 --- a/src/api/server.ts +++ b/src/api/server.ts @@ -102,9 +102,11 @@ export async function startApi( console.log(`You can find Plebbit API documentation at: http://localhost:${plebbitApiPort}/api/v0/docs`); console.log(`Plebbit data path: ${path.resolve(sharedSingleton.plebbit.dataPath)}`); if (Array.isArray(seedSubs)) { + const seedSubsLoop = () => { + seedSubplebbits(seedSubs, sharedSingleton.plebbit).then(() => setTimeout(seedSubsLoop, 600000)); // Seed subs every 10 minutes + }; console.log(`Seeding subplebbits:`, seedSubs); - seedSubplebbits(seedSubs, sharedSingleton.plebbit); - setInterval(() => seedSubplebbits(seedSubs, sharedSingleton.plebbit), 600000); // Seed subs every 10 minutes + seedSubsLoop(); } }); }