From 03afc92a19ce6700fdbcd10cd97e1316c6928715 Mon Sep 17 00:00:00 2001 From: Jannis R Date: Mon, 25 Nov 2024 16:33:50 +0100 Subject: [PATCH] =?UTF-8?q?matching:=20don't=20publish=20unmatched=20AUS?= =?UTF-8?q?=20IstFahrts=20=F0=9F=92=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lib/match.js | 52 +++++++++++++++++++++++++++++----------------------- readme.md | 1 + 2 files changed, 30 insertions(+), 23 deletions(-) diff --git a/lib/match.js b/lib/match.js index 1808a3a..d2d12e1 100644 --- a/lib/match.js +++ b/lib/match.js @@ -66,6 +66,7 @@ const runGtfsMatching = async (cfg, opt = {}) => { natsConsumerName, natsAckWait, // in milliseconds matchConcurrency, + publishUnmatchedTripUpdates, } = { natsConsumerName: process.env.MATCHING_CONSUMER_NAME ? process.env.MATCHING_CONSUMER_NAME @@ -82,6 +83,9 @@ const runGtfsMatching = async (cfg, opt = {}) => { // SELECT num_cores FROM cpu_cores LIMIT 1 // same as with hafas-gtfs-rt-feed: https://github.com/derhuerst/hafas-gtfs-rt-feed/blob/8.2.6/lib/match.js#L54-L61 : osCpus().length + 1, + publishUnmatchedTripUpdates: process.env.MATCHING_PUBLISH_UNMATCHED_TRIPUPDATES + ? process.env.MATCHING_PUBLISH_UNMATCHED_TRIPUPDATES === 'true' + : false, ...opt, } ok(Number.isInteger(natsAckWait), 'opt.natsAckWait must be an integer') @@ -182,31 +186,33 @@ const runGtfsMatching = async (cfg, opt = {}) => { matchingTime, } = await matchVdvAusIstFahrtWithGtfs(vdvAusIstFahrt) - const topic = getNatsTopicFromGtfsRtTripUpdate(gtfsRtTripUpdate) - const tPublished = Date.now() + if (isMatched || publishUnmatchedTripUpdates) { + const topic = getNatsTopicFromGtfsRtTripUpdate(gtfsRtTripUpdate) + const tPublished = Date.now() - logger.trace({ - topic, - isMatched, - isCached, - matchingTime, - gtfsRtTripUpdate, - // todo: log just a slice? - vdvAusIstFahrt, - natsMsgSeq: msg.seq, - }, 'publishing GTFS-RT TripUpdate') - natsClient.publish(topic, natsJson.encode(gtfsRtTripUpdate)) + logger.trace({ + topic, + isMatched, + isCached, + matchingTime, + gtfsRtTripUpdate, + // todo: log just a slice? + vdvAusIstFahrt, + natsMsgSeq: msg.seq, + }, 'publishing GTFS-RT TripUpdate') + natsClient.publish(topic, natsJson.encode(gtfsRtTripUpdate)) - // update NATS metrics - { - // We slice() to keep the cardinality low in case of a bug. - const topic_root = (topic.split('.')[0] || '').slice(0, 7) - natsNrOfMessagesSentTotal.inc({ - topic_root, - }) - natsLatestMessageSentTimestampSeconds.set({ - topic_root, - }, tPublished / 1000) + // update NATS metrics + { + // We slice() to keep the cardinality low in case of a bug. + const topic_root = (topic.split('.')[0] || '').slice(0, 7) + natsNrOfMessagesSentTotal.inc({ + topic_root, + }) + natsLatestMessageSentTimestampSeconds.set({ + topic_root, + }, tPublished / 1000) + } } if (isMatched) { diff --git a/readme.md b/readme.md index aaafa68..b4a26e3 100644 --- a/readme.md +++ b/readme.md @@ -281,6 +281,7 @@ todo: `$METRICS_SERVER_PORT` todo: `$MATCHING_CONCURRENCY` todo: `$MATCH_GTFS_RT_TO_GTFS_CACHING` todo: `$MATCHING_CONSUMER_NAME` +todo: `$MATCHING_PUBLISH_UNMATCHED_TRIPUPDATES` todo: `$PG_POOL_SIZE` ### Alternative: Docker Compose setup