From 3d874b6f4cd33bd3df47891ddcd0a15e8308faea Mon Sep 17 00:00:00 2001 From: binh-dam-ibigroup <56846598+binh-dam-ibigroup@users.noreply.github.com> Date: Thu, 30 Nov 2023 13:46:35 -0500 Subject: [PATCH 1/7] refactor(MonitorAllTripsJob): Add logging of trip queue size and analyzer status. --- .../tripmonitor/jobs/MonitorAllTripsJob.java | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/opentripplanner/middleware/tripmonitor/jobs/MonitorAllTripsJob.java b/src/main/java/org/opentripplanner/middleware/tripmonitor/jobs/MonitorAllTripsJob.java index a2916f1a6..c464e01a1 100644 --- a/src/main/java/org/opentripplanner/middleware/tripmonitor/jobs/MonitorAllTripsJob.java +++ b/src/main/java/org/opentripplanner/middleware/tripmonitor/jobs/MonitorAllTripsJob.java @@ -26,6 +26,7 @@ public class MonitorAllTripsJob implements Runnable { @Override public void run() { + long start = System.currentTimeMillis(); LOG.info("MonitorAllTripsJob started"); // analyze all trips @@ -54,14 +55,28 @@ public void run() { } // wait for queue to deplete + int queueIterations = 0; while (tripAnalysisQueue.size() > 0) { Thread.sleep(BLOCKING_QUEUE_DEPLETE_WAIT_TIME_MILLIS); + queueIterations++; + // Report queue status every minute (unless this job finishes before). + int runMillis = queueIterations * BLOCKING_QUEUE_DEPLETE_WAIT_TIME_MILLIS; + if ((runMillis & 60000) == 0) { + LOG.info("There are {} queued. after {} sec.", tripAnalysisQueue.size(), runMillis / 1000); + } } queueDepleted.set(true); // wait for analyzers to complete + int idleIterations = 0; while (!allAnalyzersAreIdle(analyzerStatuses)) { Thread.sleep(BLOCKING_QUEUE_DEPLETE_WAIT_TIME_MILLIS); + // Report analyzers statuses every minute (unless this job finishes before). + int runMillis = idleIterations * BLOCKING_QUEUE_DEPLETE_WAIT_TIME_MILLIS; + if ((runMillis & 60000) == 0) { + long notIdleCount = analyzerStatuses.stream().filter(s -> !s.get()).count(); + LOG.info("There are {} analyzers not idle after {} sec.", notIdleCount, runMillis / 1000); + } } } catch (InterruptedException e) { LOG.error("error encountered while waiting during MonitorAllTripsJob."); @@ -74,7 +89,7 @@ public void run() { // TODO report successful run to error & notification system - LOG.info("MonitorAllTripsJob completed"); + LOG.info("MonitorAllTripsJob completed in {} sec", (System.currentTimeMillis() - start) / 1000); } /** From a1e9352406a25728dde7f5456c0ee2380f1a9a89 Mon Sep 17 00:00:00 2001 From: binh-dam-ibigroup <56846598+binh-dam-ibigroup@users.noreply.github.com> Date: Thu, 30 Nov 2023 14:36:33 -0500 Subject: [PATCH 2/7] refactor(MonitorAllTripsJob): Use modulo and iteration counts correctly. --- .../middleware/tripmonitor/jobs/MonitorAllTripsJob.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/opentripplanner/middleware/tripmonitor/jobs/MonitorAllTripsJob.java b/src/main/java/org/opentripplanner/middleware/tripmonitor/jobs/MonitorAllTripsJob.java index c464e01a1..e12f920e8 100644 --- a/src/main/java/org/opentripplanner/middleware/tripmonitor/jobs/MonitorAllTripsJob.java +++ b/src/main/java/org/opentripplanner/middleware/tripmonitor/jobs/MonitorAllTripsJob.java @@ -61,7 +61,7 @@ public void run() { queueIterations++; // Report queue status every minute (unless this job finishes before). int runMillis = queueIterations * BLOCKING_QUEUE_DEPLETE_WAIT_TIME_MILLIS; - if ((runMillis & 60000) == 0) { + if ((runMillis % 60000) == 0) { LOG.info("There are {} queued. after {} sec.", tripAnalysisQueue.size(), runMillis / 1000); } } @@ -71,9 +71,10 @@ public void run() { int idleIterations = 0; while (!allAnalyzersAreIdle(analyzerStatuses)) { Thread.sleep(BLOCKING_QUEUE_DEPLETE_WAIT_TIME_MILLIS); + idleIterations++; // Report analyzers statuses every minute (unless this job finishes before). int runMillis = idleIterations * BLOCKING_QUEUE_DEPLETE_WAIT_TIME_MILLIS; - if ((runMillis & 60000) == 0) { + if ((runMillis % 60000) == 0) { long notIdleCount = analyzerStatuses.stream().filter(s -> !s.get()).count(); LOG.info("There are {} analyzers not idle after {} sec.", notIdleCount, runMillis / 1000); } From cf944be6809359f64872f0942fd74fd2693ea278 Mon Sep 17 00:00:00 2001 From: binh-dam-ibigroup <56846598+binh-dam-ibigroup@users.noreply.github.com> Date: Thu, 30 Nov 2023 14:40:06 -0500 Subject: [PATCH 3/7] perf(MonitorAllTripJob): Fetch only all trip ids instead of all trips. --- .../api/MonitoredTripController.java | 6 ++--- .../tripmonitor/jobs/MonitorAllTripsJob.java | 16 +++++++------ .../tripmonitor/jobs/MonitoredTripLocks.java | 22 +++++++++--------- .../tripmonitor/jobs/TripAnalyzer.java | 23 +++++++++---------- 4 files changed, 34 insertions(+), 33 deletions(-) diff --git a/src/main/java/org/opentripplanner/middleware/controllers/api/MonitoredTripController.java b/src/main/java/org/opentripplanner/middleware/controllers/api/MonitoredTripController.java index 6a2d14054..4f4c9815e 100644 --- a/src/main/java/org/opentripplanner/middleware/controllers/api/MonitoredTripController.java +++ b/src/main/java/org/opentripplanner/middleware/controllers/api/MonitoredTripController.java @@ -92,14 +92,14 @@ MonitoredTrip preCreateHook(MonitoredTrip monitoredTrip, Request req) { @Override MonitoredTrip postCreateHook(MonitoredTrip monitoredTrip, Request req) { try { - MonitoredTripLocks.lock(monitoredTrip); + MonitoredTripLocks.lock(monitoredTrip.id); return runCheckMonitoredTrip(monitoredTrip); } catch (Exception e) { // FIXME: an error happened while checking the trip, but the trip was saved to the DB, so return the raw // trip as it was saved in the db? return monitoredTrip; } finally { - MonitoredTripLocks.unlock(monitoredTrip); + MonitoredTripLocks.unlock(monitoredTrip.id); } } @@ -170,7 +170,7 @@ MonitoredTrip preUpdateHook(MonitoredTrip monitoredTrip, MonitoredTrip preExisti // the raw trip as it was saved in the db before the check monitored trip job ran? return monitoredTrip; } finally { - MonitoredTripLocks.unlock(monitoredTrip); + MonitoredTripLocks.unlock(monitoredTrip.id); } } diff --git a/src/main/java/org/opentripplanner/middleware/tripmonitor/jobs/MonitorAllTripsJob.java b/src/main/java/org/opentripplanner/middleware/tripmonitor/jobs/MonitorAllTripsJob.java index e12f920e8..a2e511e4c 100644 --- a/src/main/java/org/opentripplanner/middleware/tripmonitor/jobs/MonitorAllTripsJob.java +++ b/src/main/java/org/opentripplanner/middleware/tripmonitor/jobs/MonitorAllTripsJob.java @@ -1,6 +1,6 @@ package org.opentripplanner.middleware.tripmonitor.jobs; -import org.opentripplanner.middleware.models.MonitoredTrip; +import com.mongodb.BasicDBObject; import org.opentripplanner.middleware.persistence.Persistence; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,8 +30,8 @@ public void run() { LOG.info("MonitorAllTripsJob started"); // analyze all trips - // create a blocking queue of monitored trips to process - BlockingQueue tripAnalysisQueue = new ArrayBlockingQueue<>(BLOCKING_QUEUE_SIZE); + // create a blocking queue of monitored trip IDs to process + BlockingQueue tripAnalysisQueue = new ArrayBlockingQueue<>(BLOCKING_QUEUE_SIZE); // create an Atomic Boolean for TripAnalyzer threads to check whether the queue is actually depleted AtomicBoolean queueDepleted = new AtomicBoolean(); @@ -47,11 +47,13 @@ public void run() { } try { - // request all monitored trips from the mongo collection - for (MonitoredTrip monitoredTrip : Persistence.monitoredTrips.getAll()) { - // attempt to add trip to tripAnalysisQueue until a spot opens up in the queue. If the timeout is + // request all monitored trips from the Mongo collection + // TODO: Filter out trips that would be skipped by the CheckMonitoredTrip. + BasicDBObject tripFilter = new BasicDBObject(); + for (String tripId : Persistence.monitoredTrips.getDistinctFieldValues("_id", tripFilter, String.class)) { + // attempt to add trip ID to tripAnalysisQueue until a spot opens up in the queue. If the timeout is // exceeded, an InterruptedException is throw. - tripAnalysisQueue.offer(monitoredTrip, BLOCKING_QUEUE_INSERT_TIMEOUT_SECONDS, TimeUnit.SECONDS); + tripAnalysisQueue.offer(tripId, BLOCKING_QUEUE_INSERT_TIMEOUT_SECONDS, TimeUnit.SECONDS); } // wait for queue to deplete diff --git a/src/main/java/org/opentripplanner/middleware/tripmonitor/jobs/MonitoredTripLocks.java b/src/main/java/org/opentripplanner/middleware/tripmonitor/jobs/MonitoredTripLocks.java index 2abb5354e..02fe09f84 100644 --- a/src/main/java/org/opentripplanner/middleware/tripmonitor/jobs/MonitoredTripLocks.java +++ b/src/main/java/org/opentripplanner/middleware/tripmonitor/jobs/MonitoredTripLocks.java @@ -17,27 +17,27 @@ public class MonitoredTripLocks { /** the amount of time in milliseconds to wait to check if a lock has been released */ private static final int LOCK_CHECK_WAIT_MILLIS = 500; - private static final ConcurrentHashMap locks = new ConcurrentHashMap(); + private static final ConcurrentHashMap locks = new ConcurrentHashMap(); /** * Locks the given MonitoredTrip */ - public static void lock(MonitoredTrip trip) { - locks.put(trip, true); + public static void lock(String tripId) { + locks.put(tripId, true); } /** * Removes a lock for a given MonitoredTrip */ - public static void unlock(MonitoredTrip trip) { - locks.remove(trip); + public static void unlock(String tripId) { + locks.remove(tripId); } /** * Returns true if a lock exists for the given MonitoredTrip */ - public static boolean isLocked(MonitoredTrip trip) { - return locks.containsKey(trip); + public static boolean isLocked(String tripId) { + return locks.containsKey(tripId); } /** @@ -48,7 +48,7 @@ public static boolean isLocked(MonitoredTrip trip) { public static void lockTripForUpdating(MonitoredTrip monitoredTrip, Request req) { // Wait for any existing CheckMonitoredTrip jobs to complete before proceeding String busyMessage = "A trip monitor check prevented the trip from being updated. Please try again in a moment."; - if (isLocked(monitoredTrip)) { + if (isLocked(monitoredTrip.id)) { int timeWaitedMillis = 0; do { try { @@ -59,17 +59,17 @@ public static void lockTripForUpdating(MonitoredTrip monitoredTrip, Request req) timeWaitedMillis += LOCK_CHECK_WAIT_MILLIS; // if the lock has been released, exit this wait loop - if (!isLocked(monitoredTrip)) break; + if (!isLocked(monitoredTrip.id)) break; } while (timeWaitedMillis <= MAX_UNLOCKING_WAIT_TIME_MILLIS); } // If a lock still exists, prevent the update - if (isLocked(monitoredTrip)) { + if (isLocked(monitoredTrip.id)) { logMessageAndHalt(req, HttpStatus.INTERNAL_SERVER_ERROR_500, busyMessage); return; } // lock the trip so that the a CheckMonitoredTrip job won't concurrently analyze/update the trip. - lock(monitoredTrip); + lock(monitoredTrip.id); } } diff --git a/src/main/java/org/opentripplanner/middleware/tripmonitor/jobs/TripAnalyzer.java b/src/main/java/org/opentripplanner/middleware/tripmonitor/jobs/TripAnalyzer.java index 96a5e37ec..3cc9dab35 100644 --- a/src/main/java/org/opentripplanner/middleware/tripmonitor/jobs/TripAnalyzer.java +++ b/src/main/java/org/opentripplanner/middleware/tripmonitor/jobs/TripAnalyzer.java @@ -15,11 +15,11 @@ public class TripAnalyzer implements Runnable { private final int BLOCKING_QUEUE_POLL_TIMEOUT_MILLIS = 250; private final AtomicBoolean analyzerIsIdle; - private final BlockingQueue tripAnalysisQueue; + private final BlockingQueue tripAnalysisQueue; private final AtomicBoolean queueDepleted; public TripAnalyzer( - BlockingQueue tripAnalysisQueue, + BlockingQueue tripAnalysisQueue, AtomicBoolean queueDepleted, AtomicBoolean analyzerIsIdle ) { @@ -34,10 +34,10 @@ public void run() { while (!queueDepleted.get()) { analyzerIsIdle.set(false); - // get the next monitored trip from the queue - MonitoredTrip trip; + // get the next monitored trip ID from the queue + String tripId; try { - trip = tripAnalysisQueue.poll(BLOCKING_QUEUE_POLL_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); + tripId = tripAnalysisQueue.poll(BLOCKING_QUEUE_POLL_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { LOG.warn("TripAnalyzer thread interrupted"); e.printStackTrace(); @@ -48,23 +48,22 @@ public void run() { // The implementation of the ArrayBlockingQueue can result in null items being returned if the wait is // exceeded on an empty queue. Therefore, check if the trip is null and if so, wait and then continue. - if (trip == null) { + if (tripId == null) { Thread.sleep(BLOCKING_QUEUE_POLL_TIMEOUT_MILLIS); analyzerIsIdle.set(true); continue; } // verify that a lock hasn't been placed on trip by another trip analyzer task - if (MonitoredTripLocks.isLocked(trip)) { - LOG.warn("Skipping trip analysis due to existing lock on trip: {}", trip); + if (MonitoredTripLocks.isLocked(tripId)) { + LOG.warn("Skipping trip analysis due to existing lock on trip: {}", tripId); analyzerIsIdle.set(true); continue; } // Refetch the trip from the database. This is to ensure the trip has any updates made to the trip // between when the trip was placed in the analysis queue and the current time. - String tripId = trip.id; - trip = Persistence.monitoredTrips.getById(tripId); + MonitoredTrip trip = Persistence.monitoredTrips.getById(tripId); if (trip == null) { // trip was deleted between the time when it was placed in the queue and the current time. Don't // analyze the trip. @@ -76,7 +75,7 @@ public void run() { LOG.info("Analyzing trip {}", tripId); // place lock on trip - MonitoredTripLocks.lock(trip); + MonitoredTripLocks.lock(tripId); /////// BEGIN TRIP ANALYSIS try { @@ -88,7 +87,7 @@ public void run() { LOG.info("Finished analyzing trip {}", tripId); // remove lock on trip - MonitoredTripLocks.unlock(trip); + MonitoredTripLocks.unlock(tripId); analyzerIsIdle.set(true); } From 4b9d2d4fecd2073f82f24fe5441a803e9b6166f6 Mon Sep 17 00:00:00 2001 From: binh-dam-ibigroup <56846598+binh-dam-ibigroup@users.noreply.github.com> Date: Thu, 30 Nov 2023 15:34:19 -0500 Subject: [PATCH 4/7] refactor(MonitorAllTripsJob): Tweak comment about performance. --- .../middleware/tripmonitor/jobs/MonitorAllTripsJob.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/opentripplanner/middleware/tripmonitor/jobs/MonitorAllTripsJob.java b/src/main/java/org/opentripplanner/middleware/tripmonitor/jobs/MonitorAllTripsJob.java index a2e511e4c..e34e14b53 100644 --- a/src/main/java/org/opentripplanner/middleware/tripmonitor/jobs/MonitorAllTripsJob.java +++ b/src/main/java/org/opentripplanner/middleware/tripmonitor/jobs/MonitorAllTripsJob.java @@ -47,7 +47,10 @@ public void run() { } try { - // request all monitored trips from the Mongo collection + // Request all monitored trip IDs from the Mongo collection. + // Performance note: Don't retrieve the full data for each trip at this time. + // This saves bandwidth and memory, as we don't use the trip data immediately besides the ID. + // The full data for each trip will be fetched at the time the actual analysis takes place. // TODO: Filter out trips that would be skipped by the CheckMonitoredTrip. BasicDBObject tripFilter = new BasicDBObject(); for (String tripId : Persistence.monitoredTrips.getDistinctFieldValues("_id", tripFilter, String.class)) { From 2117a0c1788ba467a565c7f3ccae072ca96d9145 Mon Sep 17 00:00:00 2001 From: binh-dam-ibigroup <56846598+binh-dam-ibigroup@users.noreply.github.com> Date: Thu, 30 Nov 2023 16:01:26 -0500 Subject: [PATCH 5/7] fix(MonitorAllTripsJob): Collect monitoted trip ids all at once before dispatching to threads. --- .../tripmonitor/jobs/MonitorAllTripsJob.java | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/opentripplanner/middleware/tripmonitor/jobs/MonitorAllTripsJob.java b/src/main/java/org/opentripplanner/middleware/tripmonitor/jobs/MonitorAllTripsJob.java index e34e14b53..e59b5b5d7 100644 --- a/src/main/java/org/opentripplanner/middleware/tripmonitor/jobs/MonitorAllTripsJob.java +++ b/src/main/java/org/opentripplanner/middleware/tripmonitor/jobs/MonitorAllTripsJob.java @@ -47,13 +47,21 @@ public void run() { } try { - // Request all monitored trip IDs from the Mongo collection. + // Request all monitored trip IDs from the Mongo collection, all at once, and loop through a list of strings. + // If we looped using a Mongo-provided iterator instead, and the Mongo connection is dropped for any reason + // while the iterator is open, this thread would become blocked and + // prevent subsequent trip monitoring jobs to start. // Performance note: Don't retrieve the full data for each trip at this time. // This saves bandwidth and memory, as we don't use the trip data immediately besides the ID. // The full data for each trip will be fetched at the time the actual analysis takes place. // TODO: Filter out trips that would be skipped by the CheckMonitoredTrip. BasicDBObject tripFilter = new BasicDBObject(); - for (String tripId : Persistence.monitoredTrips.getDistinctFieldValues("_id", tripFilter, String.class)) { + List allTripIds = Persistence.monitoredTrips.getDistinctFieldValues( + "_id", + tripFilter, + String.class + ).into(new ArrayList<>()); + for (String tripId : allTripIds) { // attempt to add trip ID to tripAnalysisQueue until a spot opens up in the queue. If the timeout is // exceeded, an InterruptedException is throw. tripAnalysisQueue.offer(tripId, BLOCKING_QUEUE_INSERT_TIMEOUT_SECONDS, TimeUnit.SECONDS); From ca89702fd4441423f3af8a721054834fe7bc7d64 Mon Sep 17 00:00:00 2001 From: binh-dam-ibigroup <56846598+binh-dam-ibigroup@users.noreply.github.com> Date: Fri, 1 Dec 2023 09:44:37 -0500 Subject: [PATCH 6/7] refactor: Aply review feedback and other light refactorings --- .../tripmonitor/jobs/CheckMonitoredTrip.java | 6 --- .../tripmonitor/jobs/MonitorAllTripsJob.java | 38 ++++++++++++------- .../tripmonitor/jobs/MonitoredTripLocks.java | 2 +- 3 files changed, 26 insertions(+), 20 deletions(-) diff --git a/src/main/java/org/opentripplanner/middleware/tripmonitor/jobs/CheckMonitoredTrip.java b/src/main/java/org/opentripplanner/middleware/tripmonitor/jobs/CheckMonitoredTrip.java index 1287be92f..893274103 100644 --- a/src/main/java/org/opentripplanner/middleware/tripmonitor/jobs/CheckMonitoredTrip.java +++ b/src/main/java/org/opentripplanner/middleware/tripmonitor/jobs/CheckMonitoredTrip.java @@ -520,12 +520,6 @@ private long getMinutesUntilTrip() { * calculated and updated in the monitored trip's journey state. */ public boolean shouldSkipMonitoredTripCheck() throws Exception { - // before anything else, return true if the trip is inactive - if (trip.isInactive()) { - LOG.info("Skipping: Trip is inactive."); - return true; - } - // get the configured timezone that OTP is using to parse dates and times ZoneId targetZoneId = DateTimeUtils.getOtpZoneId(); diff --git a/src/main/java/org/opentripplanner/middleware/tripmonitor/jobs/MonitorAllTripsJob.java b/src/main/java/org/opentripplanner/middleware/tripmonitor/jobs/MonitorAllTripsJob.java index e59b5b5d7..ef6a6584c 100644 --- a/src/main/java/org/opentripplanner/middleware/tripmonitor/jobs/MonitorAllTripsJob.java +++ b/src/main/java/org/opentripplanner/middleware/tripmonitor/jobs/MonitorAllTripsJob.java @@ -13,15 +13,16 @@ import java.util.concurrent.atomic.AtomicBoolean; /** - * This job will analyze all monitored trips and create further individual tasks to analyze each individual trip. + * This job will analyze applicable monitored trips and create further individual tasks to analyze each individual trip. */ public class MonitorAllTripsJob implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(MonitorAllTripsJob.class); + public static final int ONE_MINUTE_IN_MILLIS = 60000; private final int numCores = Runtime.getRuntime().availableProcessors(); private final int BLOCKING_QUEUE_SIZE = numCores; - private final int BLOCKING_QUEUE_DEPLETE_WAIT_TIME_MILLIS = 250; - private final int BLOCKING_QUEUE_INSERT_TIMEOUT_SECONDS = 30; + private static final int BLOCKING_QUEUE_DEPLETE_WAIT_TIME_MILLIS = 250; + private static final int BLOCKING_QUEUE_INSERT_TIMEOUT_SECONDS = 30; private final int N_TRIP_ANALYZERS = numCores; @Override @@ -47,18 +48,15 @@ public void run() { } try { - // Request all monitored trip IDs from the Mongo collection, all at once, and loop through a list of strings. + // Request at once all applicable monitored trip IDs from the Mongo collection, and loop through them. // If we looped using a Mongo-provided iterator instead, and the Mongo connection is dropped for any reason - // while the iterator is open, this thread would become blocked and - // prevent subsequent trip monitoring jobs to start. + // while the iterator is open, this thread would become blocked and prevent subsequent runs of this job. // Performance note: Don't retrieve the full data for each trip at this time. - // This saves bandwidth and memory, as we don't use the trip data immediately besides the ID. + // This saves bandwidth and memory, as only the ID field is used to set up this job. // The full data for each trip will be fetched at the time the actual analysis takes place. - // TODO: Filter out trips that would be skipped by the CheckMonitoredTrip. - BasicDBObject tripFilter = new BasicDBObject(); List allTripIds = Persistence.monitoredTrips.getDistinctFieldValues( "_id", - tripFilter, + makeTripFilter(), String.class ).into(new ArrayList<>()); for (String tripId : allTripIds) { @@ -69,12 +67,12 @@ public void run() { // wait for queue to deplete int queueIterations = 0; - while (tripAnalysisQueue.size() > 0) { + while (!tripAnalysisQueue.isEmpty()) { Thread.sleep(BLOCKING_QUEUE_DEPLETE_WAIT_TIME_MILLIS); queueIterations++; // Report queue status every minute (unless this job finishes before). int runMillis = queueIterations * BLOCKING_QUEUE_DEPLETE_WAIT_TIME_MILLIS; - if ((runMillis % 60000) == 0) { + if ((runMillis % ONE_MINUTE_IN_MILLIS) == 0) { LOG.info("There are {} queued. after {} sec.", tripAnalysisQueue.size(), runMillis / 1000); } } @@ -87,7 +85,7 @@ public void run() { idleIterations++; // Report analyzers statuses every minute (unless this job finishes before). int runMillis = idleIterations * BLOCKING_QUEUE_DEPLETE_WAIT_TIME_MILLIS; - if ((runMillis % 60000) == 0) { + if ((runMillis % ONE_MINUTE_IN_MILLIS) == 0) { long notIdleCount = analyzerStatuses.stream().filter(s -> !s.get()).count(); LOG.info("There are {} analyzers not idle after {} sec.", notIdleCount, runMillis / 1000); } @@ -106,6 +104,20 @@ public void run() { LOG.info("MonitorAllTripsJob completed in {} sec", (System.currentTimeMillis() - start) / 1000); } + /** + * Create a BSON clause to filter out trips that would not be checked. + */ + private static BasicDBObject makeTripFilter() { + BasicDBObject tripFilter = new BasicDBObject(); + //.Trips must be active. + tripFilter.put("isInactive", false); + + // Other conditions (e.g. in CheckMonitoredTrip) that would result in a trip to be not checked + // should eventually be moved here. + + return tripFilter; + } + /** * Checks each analyzer idle status and returns false if any are not idle. */ diff --git a/src/main/java/org/opentripplanner/middleware/tripmonitor/jobs/MonitoredTripLocks.java b/src/main/java/org/opentripplanner/middleware/tripmonitor/jobs/MonitoredTripLocks.java index 02fe09f84..02d7a309b 100644 --- a/src/main/java/org/opentripplanner/middleware/tripmonitor/jobs/MonitoredTripLocks.java +++ b/src/main/java/org/opentripplanner/middleware/tripmonitor/jobs/MonitoredTripLocks.java @@ -17,7 +17,7 @@ public class MonitoredTripLocks { /** the amount of time in milliseconds to wait to check if a lock has been released */ private static final int LOCK_CHECK_WAIT_MILLIS = 500; - private static final ConcurrentHashMap locks = new ConcurrentHashMap(); + private static final ConcurrentHashMap locks = new ConcurrentHashMap<>(); /** * Locks the given MonitoredTrip From d1d9a190a9361bb31feebfb6c00ef2c3900e1157 Mon Sep 17 00:00:00 2001 From: binh-dam-ibigroup <56846598+binh-dam-ibigroup@users.noreply.github.com> Date: Fri, 1 Dec 2023 10:00:52 -0500 Subject: [PATCH 7/7] refactor(MonitorAllTripsJob): Fix active trip filter and checking. --- .../middleware/tripmonitor/jobs/CheckMonitoredTrip.java | 6 ++++++ .../middleware/tripmonitor/jobs/MonitorAllTripsJob.java | 3 ++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/opentripplanner/middleware/tripmonitor/jobs/CheckMonitoredTrip.java b/src/main/java/org/opentripplanner/middleware/tripmonitor/jobs/CheckMonitoredTrip.java index 893274103..1287be92f 100644 --- a/src/main/java/org/opentripplanner/middleware/tripmonitor/jobs/CheckMonitoredTrip.java +++ b/src/main/java/org/opentripplanner/middleware/tripmonitor/jobs/CheckMonitoredTrip.java @@ -520,6 +520,12 @@ private long getMinutesUntilTrip() { * calculated and updated in the monitored trip's journey state. */ public boolean shouldSkipMonitoredTripCheck() throws Exception { + // before anything else, return true if the trip is inactive + if (trip.isInactive()) { + LOG.info("Skipping: Trip is inactive."); + return true; + } + // get the configured timezone that OTP is using to parse dates and times ZoneId targetZoneId = DateTimeUtils.getOtpZoneId(); diff --git a/src/main/java/org/opentripplanner/middleware/tripmonitor/jobs/MonitorAllTripsJob.java b/src/main/java/org/opentripplanner/middleware/tripmonitor/jobs/MonitorAllTripsJob.java index ef6a6584c..636c904b7 100644 --- a/src/main/java/org/opentripplanner/middleware/tripmonitor/jobs/MonitorAllTripsJob.java +++ b/src/main/java/org/opentripplanner/middleware/tripmonitor/jobs/MonitorAllTripsJob.java @@ -109,8 +109,9 @@ public void run() { */ private static BasicDBObject makeTripFilter() { BasicDBObject tripFilter = new BasicDBObject(); + //.Trips must be active. - tripFilter.put("isInactive", false); + tripFilter.put("isActive", true); // Other conditions (e.g. in CheckMonitoredTrip) that would result in a trip to be not checked // should eventually be moved here.