Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SER-849] Ensure data consistency when user merging #4788

Merged
merged 14 commits into from
Dec 21, 2023
2 changes: 1 addition & 1 deletion api/api.js
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ plugins.connectToAllDatabases().then(function() {
jobs.job('api:clearTokens').replace().schedule('every 1 day');
jobs.job('api:clearAutoTasks').replace().schedule('every 1 day');
jobs.job('api:task').replace().schedule('every 5 minutes');
//jobs.job('api:userMerge').replace().schedule('every 1 hour on the 10th min');
jobs.job('api:userMerge').replace().schedule('every 10 minutes');
//jobs.job('api:appExpire').replace().schedule('every 1 day');
}, 10000);
}
Expand Down
226 changes: 161 additions & 65 deletions api/jobs/userMerge.js
Original file line number Diff line number Diff line change
@@ -1,84 +1,180 @@
'use strict';

const job = require('../parts/jobs/job.js'),
async = require('async'),
moment = require('moment'),
plugins = require('../../plugins/pluginManager.js'),
log = require('../utils/log.js')('job:userMerge');
var Promise = require("bluebird");
var usersApi = require('../parts/mgmt/app_users.js');

/** Class for the user mergind job **/
class UserMergeJob extends job.Job {
/**
* Run the job
* @param {Db} db connection
* @param {done} done callback
*/
run(db, done) {
log.d('Merging users ...');
var startTime = moment().subtract(1, 'hour').startOf('hour');
var endTime = moment(startTime).endOf("hour");
log.d("query from", startTime, "to", endTime);
/**
* read historical merges for the lest hour and process them
* @param {object} app - app db document
* @param {object} callback - when procssing finished
**/
function handleMerge(app, callback) {
db.collection('app_user_merges' + app._id).find({
cd: {
$gte: startTime.toDate(),
$lte: endTime.toDate()
}
}).toArray(function(err, res) {
if (!err && res && res.length) {
log.d('Found merges for ' + app._id + ' ', res);
var merged = [];
async.eachSeries(res, function(user, done2) {
var handleMerges = function(db, callback) {
log.d('looking for unfinished merges ...');

var date = Math.round(new Date().getTime() / 1000) - 60;
db.collection('app_user_merges').find({"lu": {"$lt": date}}).limit(100).toArray(function(err, mergedocs) {
if (err) {
callback(err);
}
if (mergedocs && mergedocs.length > 0) {
log.d('found ' + mergedocs.length + ' unfinished merges');
Promise.each(mergedocs, function(user) {
return new Promise((resolve)=>{
var dd = user._id.split("_");
if (dd.length !== 3) {
log.e("deleting unexpected document in merges with bad _id: " + user._id);
db.collection('app_user_merges').remove({"_id": user._id}, (err2)=>{
if (err2) {
log.e("error deleting document in merges with bad _id: " + user._id);
log.e(err2);
}
resolve();
});
}
else if (user.t > 100) {
log.e("deleting document in merges with too many retries: " + user._id);
db.collection('app_user_merges').remove({"_id": user._id}, (err2)=>{
if (err2) {
log.e("error deleting document in merges with _id: " + user._id);
log.e(err2);
}
resolve();
});
}
else {
var app_id = dd[0];
var olduid = dd[2];
//user docuument is not saved merged - try merginfg it at first
if (user.merged_to) {
merged.push(user._id);
log.d('Dispatching', {
app_id: app._id + "",
oldUser: {uid: user._id},
newUser: {uid: user.merged_to}
});
plugins.dispatch("/i/device_id", {
app_id: app._id + "",
oldUser: {uid: user._id},
newUser: {uid: user.merged_to}
}, function() {
done2();
});
}
else {
done2();
}
}, function() {
//delete merged users if they still exist
if (merged.length) {
db.collection("app_users" + app._id).remove({uid: {$in: merged}}, function() {
callback();
});
if (!user.u) { //user documents are not merged. Could be just failed state.
log.e("user doc not saved as merged. Processing it.");
db.collection('app_users' + app_id).find({"uid": {"$in": [olduid, user.merged_to]}}).toArray((err5, docs)=>{
if (err5) {
log.e("error fetching users for merge", err5);
resolve();
return;
}
var oldAppUser;
var newAppUser;
for (var z = 0; z < docs.length;z++) {
if (docs[z].uid === olduid) {
oldAppUser = docs[z];
}
if (docs[z].uid === user.merged_to) {
newAppUser = docs[z];
}
}
if (!oldAppUser && newAppUser) {
//old user was merged to new user, but state update failed - we can mark it as merged and process other plugins
usersApi.mergeOtherPlugins(db, app_id, {uid: user.merged_to}, {uid: olduid}, {"mc": true, "cc": true, "u": true}, resolve);
}
if (!newAppUser) {
//new user do not exists - we can delete merging record
db.collection('app_user_merges').remove({"_id": user._id}, (err4)=>{
if (err4) {
log.e("error deleting document in merges with bad _id: " + user._id);
log.e(err4);
}
resolve();
});
}
else if (oldAppUser && newAppUser) {
db.collection('app_user_merges').update({"_id": user._id}, {"$inc": {"t": 1}}, {upsert: false}, function(err0) {
if (err0) {
log.e(err0);
}
//Both documents exists. We can assume that documents were not merged
plugins.dispatch("/i/user_merge", {
app_id: app_id,
newAppUser: newAppUser,
oldAppUser: oldAppUser
}, function() {
//merge user data
usersApi.mergeUserProperties(newAppUser, oldAppUser);
//update new user

db.collection('app_users' + app_id).update({_id: newAppUser._id}, {'$set': newAppUser}, function(err6) {
if (callback && typeof callback === 'function') {
callback(null, newAppUser);//we do not return error as merge is already registred. Doc merging will be retried in job.
}
//Dispatch to other plugins only after callback.
if (!err6) {
//update metric changes document
db.collection("metric_changes" + app_id).update({uid: oldAppUser.uid}, {'$set': {uid: newAppUser.uid}}, {multi: true}, function(err7) {
if (err7) {
log.e("Failed metric changes update in app_users merge", err7);
}
});
//delete old app users document
db.collection('app_users' + app_id).remove({_id: oldAppUser._id}, function(errRemoving) {
if (errRemoving) {
log.e("Failed to remove merged user from database", errRemoving);
}
else {
usersApi.mergeOtherPlugins(db, app_id, newAppUser, oldAppUser, {"cc": true, "u": true}, resolve);
}
});
}
});
});
});
}
});
}
else if (!user.mc) { //documents are merged, but metric changes and other plugins are not yet
db.collection('app_user_merges').update({"_id": user._id}, {"$inc": {"t": 1}}, {upsert: false}, function(err0) {
if (err0) {
log.e(err0);
}
db.collection("metric_changes" + app_id).update({uid: olduid}, {'$set': {uid: usersApi.merged_to}}, {multi: true}, function(err7) {
if (err7) {
log.e("Failed metric changes update in app_users merge", err7);
}
else {
usersApi.mergeOtherPlugins(db, app_id, {uid: user.merged_to}, {uid: olduid}, {"cc": true, "mc": true}, resolve);
}
});
});
}
else {
usersApi.mergeOtherPlugins(db, app_id, {uid: user.merged_to}, {uid: olduid}, {"cc": true}, resolve);
}
}
else {
callback();
resolve();
}
});
}
});
}).then(()=>{
if (mergedocs.length === 100) {
setTimeout(()=>{
handleMerges(db, callback);
}, 0); //To do not grow stack.
}
else {
callback();
}
}).catch((errThrown)=>{
log.e("finished with errors");
log.e(errThrown);
callback(errThrown);
});
}
db.collection('apps').find({}, {_id: 1}).toArray(function(err, apps) {
if (!err && apps && apps.length) {
async.eachSeries(apps, handleMerge, function() {
log.d('Merging users finished ...');
done();
});
}
else {
done(err);
}
else {
log.d('all users merged');
callback();
}
});
};
/** Class for the user mergind job **/
class UserMergeJob extends job.Job {
/**
* Run the job
* @param {Db} db connection
* @param {done} done callback
*/
run(db, done) {
log.d('finishing up not finished merges merges...');
handleMerges(db, ()=>{
done();
});
}
}
Expand Down
Loading
Loading