From 72e5ac0276c2421af94b1a93403b48e094d77e5e Mon Sep 17 00:00:00 2001 From: Ricardo Olsen Date: Thu, 13 Jun 2024 16:43:06 -0300 Subject: [PATCH] Fixed update and added bulk updates on cs_data_processor. --- src/cs_data_processor/cs_data_processor.js | 45 +++++++++++++--------- 1 file changed, 27 insertions(+), 18 deletions(-) diff --git a/src/cs_data_processor/cs_data_processor.js b/src/cs_data_processor/cs_data_processor.js index 12cacf84..a2d946ab 100644 --- a/src/cs_data_processor/cs_data_processor.js +++ b/src/cs_data_processor/cs_data_processor.js @@ -134,29 +134,38 @@ const pipeline = [ }, 17317) setInterval(async function () { - if (!collection || !MongoStatus.HintMongoIsConnected) return + if ( + !collection || + !MongoStatus.HintMongoIsConnected || + mongoRtDataQueue.isEmpty() + ) + return let cnt = 0 + let updArr = [] while (!mongoRtDataQueue.isEmpty()) { - let upd = mongoRtDataQueue.peek() - delete upd._id // remove _id for update - collection - .updateOne( - { _id: upd._id }, - { - $set: upd, - }, - { - writeConcern: { - w: 0, - }, - } - ) - .catch(function (err) { - Log.log('Error on Mongodb query!', err) - }) + const upd = mongoRtDataQueue.peek() mongoRtDataQueue.dequeue() + const _id = upd._id + delete upd._id // remove _id for update + updArr.push({ + updateOne: { + filter: { _id: _id }, + update: { $set: upd }, + }, + }) cnt++ } + const res = await collection + .bulkWrite(updArr, { + ordered: false, + writeConcern: { + w: 0, + }, + }) + .catch(function (err) { + Log.log('Error on Mongodb query!', err) + }) + // Log.log(JSON.stringify(res)) if (cnt) Log.log('Mongo Updates ' + cnt) }, 150)