Skip to content

Commit

Permalink
Mongofw: wait first db sync before starting backfilling.
Browse files Browse the repository at this point in the history
  • Loading branch information
riclolsen committed Jun 3, 2024
1 parent 1e3e04f commit dd42953
Showing 1 changed file with 16 additions and 9 deletions.
25 changes: 16 additions & 9 deletions src/mongofw/customized_module.js
Original file line number Diff line number Diff line change
Expand Up @@ -93,26 +93,29 @@ module.exports.CustomProcessor = async function (
Log.log('Collection backfillData already exists or error creating.')
})

// enqueue point database sync data
;(function callEnqueueDbSync() {
// enqueue point database sync data for UDP sending
async function callEnqueueDbSync() {
if (clientMongo === null) return
enqueueDbSync(db, Redundancy, MongoStatus)
await enqueueDbSync(db, Redundancy, MongoStatus)
setTimeout(callEnqueueDbSync, 1000 * AppDefs.INTERVAL_INTEGRITY)
})()
}
await callEnqueueDbSync()

// consume queue of historical backfill changes, writing to mongodb backfillData collection
;(function callBackfillDequeue() {
function callBackfillDequeue() {
if (clientMongo === null) return
backfillDequeue(db, Redundancy, MongoStatus)
setTimeout(callBackfillDequeue, 1013)
})()
}
callBackfillDequeue()

// replay backfill data, enqueue to changes queue
;(function callReplayBackfill() {
// replay backfill data (from mongodb backfillData collection), enqueue to changes queue for UDP sending
function callReplayBackfill() {
if (clientMongo === null) return
replayBackfill(db, Redundancy, MongoStatus)
setTimeout(callReplayBackfill, 1000 * AppDefs.INTERVAL_INTEGRITY)
})()
}
callReplayBackfill()

// set up change streams monitoring for updates
const changeStreamUserActions = db
Expand Down Expand Up @@ -179,6 +182,8 @@ async function enqueueDbSync(db, Redundancy, MongoStatus) {
if (!Redundancy.ProcessStateIsActive() || !MongoStatus.HintMongoIsConnected)
return // do nothing if process is inactive

Log.log('Begin enqueue of db sync data...')
let cnt = 0
const findResult = db.collection(RealtimeDataCollectionName).find({})
for await (const doc of findResult) {
if (doc?.sourceDataUpdate) delete doc.sourceDataUpdate
Expand All @@ -200,7 +205,9 @@ async function enqueueDbSync(db, Redundancy, MongoStatus) {
documentKey: { _id: doc._id },
updateDescription: { updatedFields: { ...doc } },
})
cnt++
}
Log.log('End enqueueing of db sync data - ' + cnt + ' entries')
}

// insert documents queued in backfillQueue collection
Expand Down

0 comments on commit dd42953

Please sign in to comment.