Skip to content

Commit

Permalink
batch processing in cursor
Browse files Browse the repository at this point in the history
  • Loading branch information
ayasayadi1 committed Oct 24, 2023
1 parent 31e3790 commit f5bf219
Showing 1 changed file with 26 additions and 12 deletions.
38 changes: 26 additions & 12 deletions bin/scripts/fix-data/recheck_merges.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,34 @@ Promise.all([pluginManager.dbConnection("countly"), pluginManager.dbConnection("
var collections = await getDrillCollections(app._id);
//start session
const session = await countlyDb.client.startSession();
//create batch cursor
var batchRemaining = true;
var batchStart = 0;
var batchSize = 100;
const usersCollection = session.client.db("countly").collection('app_users' + app._id);
const usersCursor = usersCollection.find({merges: {$gt: 0}}, {_id: 1, uid: 1, merged_uid: 1}).addCursorFlag('noCursorTimeout', true);
var refreshTimestamp = new Date();
//for each user
while (await usersCursor.hasNext()) {
if ((new Date() - refreshTimestamp) / 1000 > 300) {
console.log("Refreshing session");
await session.client.db("countly").admin().command({ refreshSessions: [session.id] });
refreshTimestamp = new Date();
//while there are users left
while (batchRemaining) {
var usersCursor = usersCollection.find({merges: {$gt: 0}}, {_id: 1, uid: 1, merged_uid: 1}).skip(batchStart).limit(batchSize).addCursorFlag('noCursorTimeout', true);
var refreshTimestamp = new Date();
if (!await usersCursor.hasNext()) {
batchRemaining = false;
}
const user = await usersCursor.next();
//check if old uid still exists in drill collections
if (user && user.merged_uid) {
await processUser(user.merged_uid, user.uid, collections, app);
else {
//increment batch start
batchStart += batchSize;
//for each user
while (await usersCursor.hasNext()) {
if ((new Date() - refreshTimestamp) / 1000 > 0.001) {
console.log("Refreshing session");
await session.client.db("countly").admin().command({ refreshSessions: [session.id] });
refreshTimestamp = new Date();
}
const user = await usersCursor.next();
//check if old uid still exists in drill collections
if (user && user.merged_uid) {
await processUser(user.merged_uid, user.uid, collections, app);
}
}
}
}
session.endSession();
Expand Down

0 comments on commit f5bf219

Please sign in to comment.