From 0d804324c4c5622917edbfdacb2edeebd1231cbf Mon Sep 17 00:00:00 2001
From: Ricardo Olsen
Date: Sun, 2 Jun 2024 22:56:41 -0300
Subject: [PATCH] Mongofw: reduced printing of logs.

---
 src/mongofw/app-defs.js          |  2 +-
 src/mongofw/customized_module.js | 63 +++++++++++++++++---------------
 2 files changed, 34 insertions(+), 31 deletions(-)

diff --git a/src/mongofw/app-defs.js b/src/mongofw/app-defs.js
index 1eb6a3b4..cccc42c9 100644
--- a/src/mongofw/app-defs.js
+++ b/src/mongofw/app-defs.js
@@ -34,5 +34,5 @@ module.exports = {
   INTERVAL_INTEGRITY: 60*60, // interval in seconds for point database integrity
   BACKFILL_EXPIRATION: 7, // time to preserve backfill data in days
   BACKFILL_REPLAY_INTERVAL: 3, // time to auto replay backfill data in days
-  BACKFILL_DOCS_PER_SEC: 1000, // max number of documents per second to replay backfill data
+  BACKFILL_DOCS_PER_SEC: 500, // max number of documents per second to replay backfill data
 }
diff --git a/src/mongofw/customized_module.js b/src/mongofw/customized_module.js
index 0f2b35bb..56d3a04b 100644
--- a/src/mongofw/customized_module.js
+++ b/src/mongofw/customized_module.js
@@ -1,25 +1,25 @@
-/*
-* One way replication mechanism (mongofw/mongowr)
-* Watch for changes via changestream of realtimeData/sourceDataUpdate, send changes via UDP.
-* Query realtimeData for full point list sync.
-* Store changes on backfillData (timeseries collection) to keep replaying previous changes from a defined period.
-* Replayed changes are marked by the isHistorical=true flag.
-*
-* {json:scada} - Copyright (c) 2020-2024 - Ricardo L. Olsen
-* This file is part of the JSON-SCADA distribution (https://github.com/riclolsen/json-scada).
-*
-* This program is free software: you can redistribute it and/or modify
-* it under the terms of the GNU General Public License as published by
+/*
+ * One way replication mechanism (mongofw/mongowr)
+ * Watch for changes via changestream of realtimeData/sourceDataUpdate, send changes via UDP.
+ * Query realtimeData for full point list sync.
+ * Store changes on backfillData (timeseries collection) to keep replaying previous changes from a defined period.
+ * Replayed changes are marked by the isHistorical=true flag.
+ *
+ * {json:scada} - Copyright (c) 2020-2024 - Ricardo L. Olsen
+ * This file is part of the JSON-SCADA distribution (https://github.com/riclolsen/json-scada).
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
  * the Free Software Foundation, version 3.
  *
  * This program is distributed in the hope that it will be useful, but
  * WITHOUT ANY WARRANTY; without even the implied warranty of
  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  * General Public License for more details.
-*
-* You should have received a copy of the GNU General Public License
-* along with this program. If not, see .
-*/
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
 'use strict'
 
 const Log = require('./simple-logger')
@@ -109,7 +109,7 @@ module.exports.CustomProcessor = async function (
   }
   callBackfillDequeue()
 
-  // replay backfill data (from mongodb backfillData collection), enqueue to changes queue for UDP sending
+  // replay backfill data (from mongodb backfillData collection), enqueue to changes queue for UDP sending
   function callReplayBackfill() {
     if (clientMongo === null) return
     replayBackfill(db, Redundancy, MongoStatus)
@@ -301,8 +301,8 @@ let pktCnt = 0
 
       // const message = deflate.process(opData)
      const message = zlib.deflateSync(opData)
-      Log.log(opData.length + ' ' + message.length)
-      Log.log('Objects: ' + fwArr.length)
+      // Log.log(opData.length + ' ' + message.length)
+      // Log.log('Objects: ' + fwArr.length)
 
       const buff = Buffer.from(message)
       await udpSocket.send(
@@ -322,14 +322,6 @@ let pktCnt = 0
       if (AppDefs.PACKETS_INTERVAL > 0) {
         await sleep(AppDefs.PACKETS_INTERVAL)
       }
-      // Log.log('Data sent via UDP' + opData);
-      Log.log('Backfill Queue Size: ' + backfillQueue.size())
-      Log.log('Changes Queue Size: ' + chgQueue.size())
-      Log.log('UDP Msg Size: ' + buff.length)
-      Log.log('MaxMsg Size: ' + maxSz)
-      Log.log('Seq count ' + cntSeq++)
-      Log.log(' Chg count ' + cntChg)
-      Log.log(' UDP count ' + pktCnt)
       if (
         cntSeq > AppDefs.MAX_SEQUENCE_OF_UDPMSGS ||
         buff.length > AppDefs.PACKET_SIZE_BREAK_SEQ
@@ -341,6 +333,14 @@ let pktCnt = 0
         return
       }
     }
+
+    // Log.log('Data sent via UDP' + opData);
+    Log.log('Backfill Queue Size: ' + backfillQueue.size())
+    Log.log('Changes Queue Size: ' + chgQueue.size())
+    Log.log('MaxMsg Size: ' + maxSz)
+    Log.log('Seq count ' + cntSeq++)
+    Log.log(' Chg count ' + cntChg)
+    Log.log(' UDP count ' + pktCnt)
 
     setTimeout(dequeueChangesSend, AppDefs.INTERVAL_AFTER_EMPTY_QUEUE)
   })()
@@ -357,13 +357,16 @@ async function replayBackfill(db, Redundancy, MongoStatus) {
   // retrieve and enqueue data from moving window of 'limit' entries
   let skip = 0
   for (;;) {
-    // query from backfillData up to 'limit' entries, when change queue empty double enqueue rate
     const findResult = db
      .collection(BackfillDataCollectionName)
      .find(
        { timestamp: { $gt: replayBegin, $lte: replayEnd } },
-        { sort: { timestamp: 1 }, limit: limit + (chgQueue.isEmpty()?limit:0), skip: skip }
+        {
+          sort: { timestamp: 1 },
+          limit: limit + (chgQueue.isEmpty() ? limit : 0),
+          skip: skip,
+        }
      )

     let cntDocs = 0
@@ -373,7 +376,7 @@ async function replayBackfill(db, Redundancy, MongoStatus) {
 
       cntDocs++
       if (!doc?.data?.updateDescription?.updatedFields?.sourceDataUpdate) continue
-      timestamp = doc?.timestamp
+      timestamp = doc?.timestamp
       // console.log(">>>>>> " + skip + " - " + timestamp)
       doc.data.updateDescription.updatedFields.sourceDataUpdate.isHistorical = true
       chgQueue.enqueue(doc.data)
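
For context on the replay code reformatted in the last two hunks: replayBackfill pages through a time window of backfill documents with sort/limit/skip, doubles the page size while the changes queue is empty, and paces re-enqueueing around BACKFILL_DOCS_PER_SEC. Below is a minimal, self-contained sketch of that pattern only; the connection URI, database name, 'backfillData' collection name, array queue stand-in, and one-second pause are illustrative assumptions, not the module's actual wiring.

// Hypothetical, self-contained sketch of the backfill replay paging pattern
// (not the module's actual code; names and values are illustrative).
const { MongoClient } = require('mongodb')

const DOCS_PER_SEC = 500 // mirrors AppDefs.BACKFILL_DOCS_PER_SEC after this patch
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms))

// Simple stand-in for the module's changes queue.
const chgQueue = []

async function replayWindow(db, replayBegin, replayEnd) {
  const limit = DOCS_PER_SEC
  let skip = 0
  for (;;) {
    // Double the page size while the outgoing queue is empty so the replay
    // catches up faster; otherwise keep the nominal page size.
    const pageSize = limit + (chgQueue.length === 0 ? limit : 0)
    const cursor = db
      .collection('backfillData') // illustrative collection name
      .find(
        { timestamp: { $gt: replayBegin, $lte: replayEnd } },
        { sort: { timestamp: 1 }, limit: pageSize, skip: skip }
      )

    let cntDocs = 0
    for await (const doc of cursor) {
      cntDocs++
      if (!doc?.data?.updateDescription?.updatedFields?.sourceDataUpdate) continue
      // Mark replayed data so the receiving side treats it as historical.
      doc.data.updateDescription.updatedFields.sourceDataUpdate.isHistorical = true
      chgQueue.push(doc.data)
    }

    if (cntDocs === 0) break // time window exhausted
    skip += cntDocs
    // Pause one second per page so enqueueing stays near DOCS_PER_SEC
    // (or roughly twice that while the queue is empty).
    await sleep(1000)
  }
}

// Illustrative usage: replay roughly the last 3 days (cf. BACKFILL_REPLAY_INTERVAL).
async function main() {
  const client = await MongoClient.connect('mongodb://127.0.0.1:27017') // assumed local instance
  const db = client.db('json_scada') // assumed database name
  const end = new Date()
  const begin = new Date(end.getTime() - 3 * 24 * 60 * 60 * 1000)
  await replayWindow(db, begin, end)
  await client.close()
}

main().catch(console.error)

In the real module the queue is drained separately and sent over UDP (deflated with zlib, as the earlier hunks show); the sketch only illustrates the windowed query and the pacing that BACKFILL_DOCS_PER_SEC controls.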