Skip to content

Commit

Permalink
matching: don't publish unmatched AUS IstFahrts 💥
Browse files Browse the repository at this point in the history
  • Loading branch information
derhuerst committed Nov 25, 2024
1 parent 442c7ea commit 03afc92
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 23 deletions.
52 changes: 29 additions & 23 deletions lib/match.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ const runGtfsMatching = async (cfg, opt = {}) => {
natsConsumerName,
natsAckWait, // in milliseconds
matchConcurrency,
publishUnmatchedTripUpdates,
} = {
natsConsumerName: process.env.MATCHING_CONSUMER_NAME
? process.env.MATCHING_CONSUMER_NAME
Expand All @@ -82,6 +83,9 @@ const runGtfsMatching = async (cfg, opt = {}) => {
// SELECT num_cores FROM cpu_cores LIMIT 1
// same as with hafas-gtfs-rt-feed: https://github.com/derhuerst/hafas-gtfs-rt-feed/blob/8.2.6/lib/match.js#L54-L61
: osCpus().length + 1,
publishUnmatchedTripUpdates: process.env.MATCHING_PUBLISH_UNMATCHED_TRIPUPDATES
? process.env.MATCHING_PUBLISH_UNMATCHED_TRIPUPDATES === 'true'
: false,
...opt,
}
ok(Number.isInteger(natsAckWait), 'opt.natsAckWait must be an integer')
Expand Down Expand Up @@ -182,31 +186,33 @@ const runGtfsMatching = async (cfg, opt = {}) => {
matchingTime,
} = await matchVdvAusIstFahrtWithGtfs(vdvAusIstFahrt)

const topic = getNatsTopicFromGtfsRtTripUpdate(gtfsRtTripUpdate)
const tPublished = Date.now()
if (isMatched || publishUnmatchedTripUpdates) {
const topic = getNatsTopicFromGtfsRtTripUpdate(gtfsRtTripUpdate)
const tPublished = Date.now()

logger.trace({
topic,
isMatched,
isCached,
matchingTime,
gtfsRtTripUpdate,
// todo: log just a slice?
vdvAusIstFahrt,
natsMsgSeq: msg.seq,
}, 'publishing GTFS-RT TripUpdate')
natsClient.publish(topic, natsJson.encode(gtfsRtTripUpdate))
logger.trace({
topic,
isMatched,
isCached,
matchingTime,
gtfsRtTripUpdate,
// todo: log just a slice?
vdvAusIstFahrt,
natsMsgSeq: msg.seq,
}, 'publishing GTFS-RT TripUpdate')
natsClient.publish(topic, natsJson.encode(gtfsRtTripUpdate))

// update NATS metrics
{
// We slice() to keep the cardinality low in case of a bug.
const topic_root = (topic.split('.')[0] || '').slice(0, 7)
natsNrOfMessagesSentTotal.inc({
topic_root,
})
natsLatestMessageSentTimestampSeconds.set({
topic_root,
}, tPublished / 1000)
// update NATS metrics
{
// We slice() to keep the cardinality low in case of a bug.
const topic_root = (topic.split('.')[0] || '').slice(0, 7)
natsNrOfMessagesSentTotal.inc({
topic_root,
})
natsLatestMessageSentTimestampSeconds.set({
topic_root,
}, tPublished / 1000)
}
}

if (isMatched) {
Expand Down
1 change: 1 addition & 0 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ todo: `$METRICS_SERVER_PORT`
todo: `$MATCHING_CONCURRENCY`
todo: `$MATCH_GTFS_RT_TO_GTFS_CACHING`
todo: `$MATCHING_CONSUMER_NAME`
todo: `$MATCHING_PUBLISH_UNMATCHED_TRIPUPDATES`
todo: `$PG_POOL_SIZE`

### Alternative: Docker Compose setup
Expand Down

0 comments on commit 03afc92

Please sign in to comment.