Skip to content

Commit

Permalink
refresh sessions
Browse files Browse the repository at this point in the history
  • Loading branch information
ayasayadi1 committed Nov 2, 2023
1 parent 302b3fd commit 3218c19
Showing 1 changed file with 44 additions and 19 deletions.
63 changes: 44 additions & 19 deletions bin/scripts/fix-data/recheck_merges_new.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,29 +33,47 @@ Promise.all([pluginManager.dbConnection("countly"), pluginManager.dbConnection("
//for each app serially process users
asyncjs.eachSeries(apps, async function(app) {
console.log("Processing app: ", app.name);
var usersCursor;
//get all drill collections for this app
var collections = await getDrillCollections(app._id);
//cursor: get users with merges > 0 and merges_rechecked not set or set to false
var usersCursor = countlyDb.collection('app_users' + app._id).find(
{merges: {$gt: 0}, merges_rechecked: {$ne: true}},
{_id: 1, uid: 1, merged_uid: 1}
);
//for each user
while (usersCursor && await usersCursor.hasNext()) {
//get next user
const user = await usersCursor.next();
//check if old uid still exists in drill collections
if (user && user.merged_uid) {
await processUser(user.merged_uid, user.uid, collections, app);
//start session
const session = await countlyDb.client.startSession();
//get users collection
const usersCollection = session.client.db("countly").collection('app_users' + app._id);
try {
usersCursor = generateCursor(usersCollection);
try {
var refreshTimestamp = new Date();
//for each user
while (usersCursor && await usersCursor.hasNext()) {
//refresh session every 5 minutes
if ((new Date() - refreshTimestamp) / 1000 > 300) {
console.log("Refreshing session");
await session.client.db("countly").admin().command({ refreshSessions: [session.id] });
refreshTimestamp = new Date();
}
//get next user
const user = await usersCursor.next();
//check if old uid still exists in drill collections
if (user && user.merged_uid) {
await processUser(user.merged_uid, user.uid, collections, app);
}
//if cursor is closed, recreate it and skip processed users
if (usersCursor.isClosed()) {
usersCursor = generateCursor(usersCollection);
}
await addRecheckedFlag(app._id, user.uid);
}
session.endSession();
}
//if cursor is closed, recreate it and skip processed users
if (usersCursor.isClosed()) {
usersCursor = countlyDb.collection('app_users' + app._id).find(
{merges: {$gt: 0}, merges_rechecked: {$ne: true}},
{_id: 1, uid: 1, merged_uid: 1}
);
catch (err) {
console.log("Cursor error: ", err);
usersCursor = generateCursor(usersCollection);
}
await addRecheckedFlag(app._id, user.uid);
}
catch (err) {
console.log("Could not get cursor for app ", app.name, "error: ", err);
return close(err);
}

}, function(err) {
Expand Down Expand Up @@ -140,6 +158,13 @@ Promise.all([pluginManager.dbConnection("countly"), pluginManager.dbConnection("
}
}

function generateCursor(collection) {
return collection.find(
{merges: {$gt: 0}, merges_rechecked: {$ne: true}},
{_id: 1, uid: 1, merged_uid: 1}
);
}

function close(err) {
countlyDb.close();
drillDb.close();
Expand Down

0 comments on commit 3218c19

Please sign in to comment.