Skip to content

Commit

Permalink
Mongofw: reduced printing of logs.
Browse files Browse the repository at this point in the history
  • Loading branch information
riclolsen committed Jun 3, 2024
1 parent dd42953 commit 0d80432
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 31 deletions.
2 changes: 1 addition & 1 deletion src/mongofw/app-defs.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ module.exports = {
INTERVAL_INTEGRITY: 60*60, // interval in seconds for point database integrity
BACKFILL_EXPIRATION: 7, // time to preserve backfill data in days
BACKFILL_REPLAY_INTERVAL: 3, // time to auto replay backfill data in days
BACKFILL_DOCS_PER_SEC: 1000, // max number of documents per second to replay backfill data
BACKFILL_DOCS_PER_SEC: 500, // max number of documents per second to replay backfill data
}
63 changes: 33 additions & 30 deletions src/mongofw/customized_module.js
Original file line number Diff line number Diff line change
@@ -1,25 +1,25 @@
/*
* One way replication mechanism (mongofw/mongowr)
* Watch for changes via changestream of realtimeData/sourceDataUpdate, send changes via UDP.
* Query realtimeData for full point list sync.
* Store changes on backfillData (timeseries collection) to keep replaying previous changes from a defined period.
* Replayed changes are marked by the isHistorical=true flag.
*
* {json:scada} - Copyright (c) 2020-2024 - Ricardo L. Olsen
* This file is part of the JSON-SCADA distribution (https://github.com/riclolsen/json-scada).
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
/*
* One way replication mechanism (mongofw/mongowr)
* Watch for changes via changestream of realtimeData/sourceDataUpdate, send changes via UDP.
* Query realtimeData for full point list sync.
* Store changes on backfillData (timeseries collection) to keep replaying previous changes from a defined period.
* Replayed changes are marked by the isHistorical=true flag.
*
* {json:scada} - Copyright (c) 2020-2024 - Ricardo L. Olsen
* This file is part of the JSON-SCADA distribution (https://github.com/riclolsen/json-scada).
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, version 3.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

'use strict'
const Log = require('./simple-logger')
Expand Down Expand Up @@ -109,7 +109,7 @@ module.exports.CustomProcessor = async function (
}
callBackfillDequeue()

// replay backfill data (from mongodb backfillData collection), enqueue to changes queue for UDP sending
// replay backfill data (from mongodb backfillData collection), enqueue to changes queue for UDP sending
function callReplayBackfill() {
if (clientMongo === null) return
replayBackfill(db, Redundancy, MongoStatus)
Expand Down Expand Up @@ -301,8 +301,8 @@ let pktCnt = 0
// const message = deflate.process(opData)
const message = zlib.deflateSync(opData)

Log.log(opData.length + ' ' + message.length)
Log.log('Objects: ' + fwArr.length)
// Log.log(opData.length + ' ' + message.length)
// Log.log('Objects: ' + fwArr.length)

const buff = Buffer.from(message)
await udpSocket.send(
Expand All @@ -322,14 +322,6 @@ let pktCnt = 0
if (AppDefs.PACKETS_INTERVAL > 0) {
await sleep(AppDefs.PACKETS_INTERVAL)
}
// Log.log('Data sent via UDP' + opData);
Log.log('Backfill Queue Size: ' + backfillQueue.size())
Log.log('Changes Queue Size: ' + chgQueue.size())
Log.log('UDP Msg Size: ' + buff.length)
Log.log('MaxMsg Size: ' + maxSz)
Log.log('Seq count ' + cntSeq++)
Log.log(' Chg count ' + cntChg)
Log.log(' UDP count ' + pktCnt)
if (
cntSeq > AppDefs.MAX_SEQUENCE_OF_UDPMSGS ||
buff.length > AppDefs.PACKET_SIZE_BREAK_SEQ
Expand All @@ -341,6 +333,14 @@ let pktCnt = 0
return
}
}

// Log.log('Data sent via UDP' + opData);
Log.log('Backfill Queue Size: ' + backfillQueue.size())
Log.log('Changes Queue Size: ' + chgQueue.size())
Log.log('MaxMsg Size: ' + maxSz)
Log.log('Seq count ' + cntSeq++)
Log.log(' Chg count ' + cntChg)
Log.log(' UDP count ' + pktCnt)

setTimeout(dequeueChangesSend, AppDefs.INTERVAL_AFTER_EMPTY_QUEUE)
})()
Expand All @@ -357,13 +357,16 @@ async function replayBackfill(db, Redundancy, MongoStatus) {
// retrieve and enqueue data from moving window of 'limit' entries
let skip = 0
for (;;) {

// query from backfillData up to 'limit' entries, when change queue empty double enqueue rate
const findResult = db
.collection(BackfillDataCollectionName)
.find(
{ timestamp: { $gt: replayBegin, $lte: replayEnd } },
{ sort: { timestamp: 1 }, limit: limit + (chgQueue.isEmpty()?limit:0), skip: skip }
{
sort: { timestamp: 1 },
limit: limit + (chgQueue.isEmpty() ? limit : 0),
skip: skip,
}
)

let cntDocs = 0
Expand All @@ -373,7 +376,7 @@ async function replayBackfill(db, Redundancy, MongoStatus) {
cntDocs++
if (!doc?.data?.updateDescription?.updatedFields?.sourceDataUpdate)
continue
timestamp = doc?.timestamp
timestamp = doc?.timestamp
// console.log(">>>>>> " + skip + " - " + timestamp)
doc.data.updateDescription.updatedFields.sourceDataUpdate.isHistorical = true
chgQueue.enqueue(doc.data)
Expand Down

0 comments on commit 0d80432

Please sign in to comment.