Skip to content

Commit

Permalink
perf(daemon): optimize seeding
Browse files Browse the repository at this point in the history
  • Loading branch information
Rinse12 committed Aug 17, 2023
1 parent d8828ae commit fa27931
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 30 deletions.
58 changes: 30 additions & 28 deletions src/api/seeder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,26 @@ import { Plebbit } from "@plebbit/plebbit-js/dist/node/plebbit";
import { Subplebbit } from "@plebbit/plebbit-js/dist/node/subplebbit";
import { BasePages } from "@plebbit/plebbit-js/dist/node/pages";
import { Comment } from "@plebbit/plebbit-js/dist/node/comment";
import pLimit from "p-limit";
import assert from "assert";

async function _loadAllPages(pageCid: string, pagesInstance: BasePages): Promise<Comment[]> {
let sortedCommentsPage = await pagesInstance.getPage(pageCid);
let sortedComments: Comment[] = sortedCommentsPage.comments;
while (sortedCommentsPage.nextCid) {
sortedCommentsPage = await pagesInstance.getPage(sortedCommentsPage.nextCid);
sortedComments = sortedComments.concat(sortedCommentsPage.comments);
const log = Logger("plebbit-cli:server:seed:_loadAllPages");
try {
let sortedCommentsPage = await pagesInstance.getPage(pageCid);
let sortedComments: Comment[] = sortedCommentsPage.comments;
while (sortedCommentsPage.nextCid) {
sortedCommentsPage = await pagesInstance.getPage(sortedCommentsPage.nextCid);
sortedComments = sortedComments.concat(sortedCommentsPage.comments);
}

return sortedComments;
} catch (e) {
log.error(`Failed to load page (${pageCid}) of sub (${pagesInstance._subplebbitAddress}) due to error:`, e);
return [];
}
return sortedComments;
}

const seededIpns: Record<string, { lastSeededAt?: number }> = {};
async function _seedSub(sub: Subplebbit, pinnedCids: string[]) {
const log = Logger("plebbit-cli:server:seed");
if (sub.statsCid) await sub.plebbit.fetchCid(sub.statsCid); // Seed stats
Expand All @@ -36,31 +43,25 @@ async function _seedSub(sub: Subplebbit, pinnedCids: string[]) {
// Fetch all comments CID
allCidsToPin.push(...loadedPagesWithNames["new"].map((comment) => <string>comment.cid));

// Fetch all previousCommentCid of authors
allCidsToPin.push(
...loadedPagesWithNames["new"]
.filter((comment) => typeof comment.author.previousCommentCid === "string")
.map((comment) => <string>comment.author.previousCommentCid)
);

// Fetch all comments' CommentUpdate IPNS
const limit = pLimit(30); // Can only fetch 30 IPNS at a time
// TODO don't load all IPNS, instead only load the ones whose updatedAt did not change. We need to store all updatedAt somewhere
const ipnsRes = await Promise.allSettled(
loadedPagesWithNames["new"].map((comment) =>
limit(() => sub.plebbit._clientsManager.fetchFromMultipleGateways({ ipns: <string>comment.ipnsName }, "subplebbit"))
)
);

log.trace(
`Loaded and seeded ${ipnsRes.filter((res) => res.status === "fulfilled").length} CommentUpdate and failed to seed ${
ipnsRes.filter((res) => res.status === "rejected").length
} of sub (${sub.address})`
);
// Seed IPNS
for (const comment of loadedPagesWithNames["new"]) {
assert(comment.ipnsName);
if (seededIpns[comment.ipnsName]?.lastSeededAt !== comment.updatedAt) {
try {
await comment._clientsManager.fetchCommentUpdate(comment.ipnsName);
seededIpns[comment.ipnsName] = { lastSeededAt: comment.updatedAt };
log.trace(`Seeded comment (${comment.cid}) IPNS (${comment.ipnsName})`);
} catch (e) {
log.error(`Failed to seed comment (${comment.cid}) IPNS (${comment.ipnsName}) due to error`, e);
}
}
}
}

// Pin cids that are not already pinned
const newCidsToPin = lodash.difference(lodash.uniq(allCidsToPin), pinnedCids);
if (newCidsToPin.length > 0) {
log.trace(`Attempting to pin ${newCidsToPin.length} comments' cids from sub (${sub.address}): `, newCidsToPin);
const defaultIpfsClient = Object.values(sub.plebbit.clients.ipfsClients)[0];
assert(defaultIpfsClient);
await defaultIpfsClient._client.pin.addAll(newCidsToPin);
Expand All @@ -85,4 +86,5 @@ export async function seedSubplebbits(subAddresses: string[], plebbit: Plebbit)
log.error(`Failed to load and seed sub (${subAddress}):`, String(e));
}
}
log(`Finished this round of seeding. Will seed again later`);
}
6 changes: 4 additions & 2 deletions src/api/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,11 @@ export async function startApi(
console.log(`You can find Plebbit API documentation at: http://localhost:${plebbitApiPort}/api/v0/docs`);
console.log(`Plebbit data path: ${path.resolve(<string>sharedSingleton.plebbit.dataPath)}`);
if (Array.isArray(seedSubs)) {
const seedSubsLoop = () => {
seedSubplebbits(seedSubs, sharedSingleton.plebbit).then(() => setTimeout(seedSubsLoop, 600000)); // Seed subs every 10 minutes
};
console.log(`Seeding subplebbits:`, seedSubs);
seedSubplebbits(seedSubs, sharedSingleton.plebbit);
setInterval(() => seedSubplebbits(seedSubs, sharedSingleton.plebbit), 600000); // Seed subs every 10 minutes
seedSubsLoop();
}
});
}

0 comments on commit fa27931

Please sign in to comment.