Skip to content

Commit

Permalink
restart cursor process upon failure
Browse files Browse the repository at this point in the history
  • Loading branch information
ayasayadi1 committed Nov 2, 2023
1 parent 3218c19 commit 2953076
Showing 1 changed file with 41 additions and 42 deletions.
83 changes: 41 additions & 42 deletions bin/scripts/fix-data/recheck_merges_new.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,48 +33,7 @@ Promise.all([pluginManager.dbConnection("countly"), pluginManager.dbConnection("
//for each app serially process users
asyncjs.eachSeries(apps, async function(app) {
console.log("Processing app: ", app.name);
var usersCursor;
//get all drill collections for this app
var collections = await getDrillCollections(app._id);
//start session
const session = await countlyDb.client.startSession();
//get users collection
const usersCollection = session.client.db("countly").collection('app_users' + app._id);
try {
usersCursor = generateCursor(usersCollection);
try {
var refreshTimestamp = new Date();
//for each user
while (usersCursor && await usersCursor.hasNext()) {
//refresh session every 5 minutes
if ((new Date() - refreshTimestamp) / 1000 > 300) {
console.log("Refreshing session");
await session.client.db("countly").admin().command({ refreshSessions: [session.id] });
refreshTimestamp = new Date();
}
//get next user
const user = await usersCursor.next();
//check if old uid still exists in drill collections
if (user && user.merged_uid) {
await processUser(user.merged_uid, user.uid, collections, app);
}
//if cursor is closed, recreate it and skip processed users
if (usersCursor.isClosed()) {
usersCursor = generateCursor(usersCollection);
}
await addRecheckedFlag(app._id, user.uid);
}
session.endSession();
}
catch (err) {
console.log("Cursor error: ", err);
usersCursor = generateCursor(usersCollection);
}
}
catch (err) {
console.log("Could not get cursor for app ", app.name, "error: ", err);
return close(err);
}
await processCursor(app);

}, function(err) {
return close(err);
Expand Down Expand Up @@ -165,6 +124,46 @@ Promise.all([pluginManager.dbConnection("countly"), pluginManager.dbConnection("
);
}

async function processCursor(app) {
//get all drill collections for this app
var drillCollections = await getDrillCollections(app._id);
//start session
const session = await countlyDb.client.startSession();
//get users collection
const usersCollection = session.client.db("countly").collection('app_users' + app._id);
//create cursor
var usersCursor = generateCursor(usersCollection);
try {
var refreshTimestamp = new Date();
//for each user
while (usersCursor && await usersCursor.hasNext()) {
//refresh session every 5 minutes
if ((new Date() - refreshTimestamp) / 1000 > 300) {
console.log("Refreshing session");
await session.client.db("countly").admin().command({ refreshSessions: [session.id] });
refreshTimestamp = new Date();
}
//get next user
const user = await usersCursor.next();
//check if old uid still exists in drill collections
if (user && user.merged_uid) {
await processUser(user.merged_uid, user.uid, drillCollections, app);
}
//if cursor is closed, recreate it and skip processed users
if (usersCursor.isClosed()) {
usersCursor = generateCursor(usersCollection);
}
await addRecheckedFlag(app._id, user.uid);
}
session.endSession();
}
catch (err) {
console.log("Cursor error: ", err);
console.log("Restarting cursor process...");
processCursor(app);
}
}

function close(err) {
countlyDb.close();
drillDb.close();
Expand Down

0 comments on commit 2953076

Please sign in to comment.