Skip to content

Commit

Permalink
recreate cursor when closed
Browse files Browse the repository at this point in the history
  • Loading branch information
ayasayadi1 committed Oct 26, 2023
1 parent f5bf219 commit 59ac762
Showing 1 changed file with 24 additions and 34 deletions.
58 changes: 24 additions & 34 deletions bin/scripts/fix-data/recheck_merges.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,39 +35,27 @@ Promise.all([pluginManager.dbConnection("countly"), pluginManager.dbConnection("
console.log("Processing app: ", app.name);
//get all drill collections for this app
var collections = await getDrillCollections(app._id);
//start session
const session = await countlyDb.client.startSession();
//create batch cursor
var batchRemaining = true;
var batchStart = 0;
var batchSize = 100;
const usersCollection = session.client.db("countly").collection('app_users' + app._id);
//while there are users left
while (batchRemaining) {
var usersCursor = usersCollection.find({merges: {$gt: 0}}, {_id: 1, uid: 1, merged_uid: 1}).skip(batchStart).limit(batchSize).addCursorFlag('noCursorTimeout', true);
var refreshTimestamp = new Date();
if (!await usersCursor.hasNext()) {
batchRemaining = false;
//cursor
const usersCollection = countlyDb.collection('app_users' + app._id);
var usersCursor = usersCollection.find({merges: {$gt: 0}}, {_id: 1, uid: 1, merged_uid: 1});
//processed users count
var processedUsersCount = 0;
//for each user
while (usersCursor && await usersCursor.hasNext()) {
//increment processed users count
processedUsersCount++;
//get next user
const user = await usersCursor.next();
//check if old uid still exists in drill collections
if (user && user.merged_uid) {
await processUser(user.merged_uid, user.uid, collections, app);
}
else {
//increment batch start
batchStart += batchSize;
//for each user
while (await usersCursor.hasNext()) {
if ((new Date() - refreshTimestamp) / 1000 > 0.001) {
console.log("Refreshing session");
await session.client.db("countly").admin().command({ refreshSessions: [session.id] });
refreshTimestamp = new Date();
}
const user = await usersCursor.next();
//check if old uid still exists in drill collections
if (user && user.merged_uid) {
await processUser(user.merged_uid, user.uid, collections, app);
}
}
//if cursor is closed, recreate it and skip processed users
if (!usersCursor) {
usersCursor = usersCollection.find({merges: {$gt: 0}}, {_id: 1, uid: 1, merged_uid: 1}).skip(processedUsersCount);
}
}
session.endSession();

}, function(err) {
return close(err);
});
Expand Down Expand Up @@ -145,11 +133,13 @@ Promise.all([pluginManager.dbConnection("countly"), pluginManager.dbConnection("
}

function close(err) {
if (err) {
console.log("Error: ", err);
}
countlyDb.close();
drillDb.close();
console.log("Done.");
if (err) {
console.log("Finished with errors: ", err);
}
else {
console.log("Finished successfully.");
}
}
});

0 comments on commit 59ac762

Please sign in to comment.