diff --git a/src/main/java/org/opentripplanner/middleware/controllers/api/MonitoredTripController.java b/src/main/java/org/opentripplanner/middleware/controllers/api/MonitoredTripController.java index 6a2d14054..4f4c9815e 100644 --- a/src/main/java/org/opentripplanner/middleware/controllers/api/MonitoredTripController.java +++ b/src/main/java/org/opentripplanner/middleware/controllers/api/MonitoredTripController.java @@ -92,14 +92,14 @@ MonitoredTrip preCreateHook(MonitoredTrip monitoredTrip, Request req) { @Override MonitoredTrip postCreateHook(MonitoredTrip monitoredTrip, Request req) { try { - MonitoredTripLocks.lock(monitoredTrip); + MonitoredTripLocks.lock(monitoredTrip.id); return runCheckMonitoredTrip(monitoredTrip); } catch (Exception e) { // FIXME: an error happened while checking the trip, but the trip was saved to the DB, so return the raw // trip as it was saved in the db? return monitoredTrip; } finally { - MonitoredTripLocks.unlock(monitoredTrip); + MonitoredTripLocks.unlock(monitoredTrip.id); } } @@ -170,7 +170,7 @@ MonitoredTrip preUpdateHook(MonitoredTrip monitoredTrip, MonitoredTrip preExisti // the raw trip as it was saved in the db before the check monitored trip job ran? 
return monitoredTrip; } finally { - MonitoredTripLocks.unlock(monitoredTrip); + MonitoredTripLocks.unlock(monitoredTrip.id); } } diff --git a/src/main/java/org/opentripplanner/middleware/tripmonitor/jobs/MonitorAllTripsJob.java b/src/main/java/org/opentripplanner/middleware/tripmonitor/jobs/MonitorAllTripsJob.java index a2916f1a6..636c904b7 100644 --- a/src/main/java/org/opentripplanner/middleware/tripmonitor/jobs/MonitorAllTripsJob.java +++ b/src/main/java/org/opentripplanner/middleware/tripmonitor/jobs/MonitorAllTripsJob.java @@ -1,6 +1,6 @@ package org.opentripplanner.middleware.tripmonitor.jobs; -import org.opentripplanner.middleware.models.MonitoredTrip; +import com.mongodb.BasicDBObject; import org.opentripplanner.middleware.persistence.Persistence; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -13,24 +13,26 @@ import java.util.concurrent.atomic.AtomicBoolean; /** - * This job will analyze all monitored trips and create further individual tasks to analyze each individual trip. + * This job will analyze applicable monitored trips and create further individual tasks to analyze each individual trip. 
*/ public class MonitorAllTripsJob implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(MonitorAllTripsJob.class); + public static final int ONE_MINUTE_IN_MILLIS = 60000; private final int numCores = Runtime.getRuntime().availableProcessors(); private final int BLOCKING_QUEUE_SIZE = numCores; - private final int BLOCKING_QUEUE_DEPLETE_WAIT_TIME_MILLIS = 250; - private final int BLOCKING_QUEUE_INSERT_TIMEOUT_SECONDS = 30; + private static final int BLOCKING_QUEUE_DEPLETE_WAIT_TIME_MILLIS = 250; + private static final int BLOCKING_QUEUE_INSERT_TIMEOUT_SECONDS = 30; private final int N_TRIP_ANALYZERS = numCores; @Override public void run() { + long start = System.currentTimeMillis(); LOG.info("MonitorAllTripsJob started"); // analyze all trips - // create a blocking queue of monitored trips to process - BlockingQueue tripAnalysisQueue = new ArrayBlockingQueue<>(BLOCKING_QUEUE_SIZE); + // create a blocking queue of monitored trip IDs to process + BlockingQueue tripAnalysisQueue = new ArrayBlockingQueue<>(BLOCKING_QUEUE_SIZE); // create an Atomic Boolean for TripAnalyzer threads to check whether the queue is actually depleted AtomicBoolean queueDepleted = new AtomicBoolean(); @@ -46,22 +48,47 @@ public void run() { } try { - // request all monitored trips from the mongo collection - for (MonitoredTrip monitoredTrip : Persistence.monitoredTrips.getAll()) { - // attempt to add trip to tripAnalysisQueue until a spot opens up in the queue. If the timeout is + // Request at once all applicable monitored trip IDs from the Mongo collection, and loop through them. + // If we looped using a Mongo-provided iterator instead, and the Mongo connection is dropped for any reason + // while the iterator is open, this thread would become blocked and prevent subsequent runs of this job. + // Performance note: Don't retrieve the full data for each trip at this time. + // This saves bandwidth and memory, as only the ID field is used to set up this job. 
+ // The full data for each trip will be fetched at the time the actual analysis takes place. + List allTripIds = Persistence.monitoredTrips.getDistinctFieldValues( + "_id", + makeTripFilter(), + String.class + ).into(new ArrayList<>()); + for (String tripId : allTripIds) { + // attempt to add trip ID to tripAnalysisQueue until a spot opens up in the queue. If the timeout is // exceeded, an InterruptedException is throw. - tripAnalysisQueue.offer(monitoredTrip, BLOCKING_QUEUE_INSERT_TIMEOUT_SECONDS, TimeUnit.SECONDS); + tripAnalysisQueue.offer(tripId, BLOCKING_QUEUE_INSERT_TIMEOUT_SECONDS, TimeUnit.SECONDS); } // wait for queue to deplete - while (tripAnalysisQueue.size() > 0) { + int queueIterations = 0; + while (!tripAnalysisQueue.isEmpty()) { Thread.sleep(BLOCKING_QUEUE_DEPLETE_WAIT_TIME_MILLIS); + queueIterations++; + // Report queue status every minute (unless this job finishes before). + int runMillis = queueIterations * BLOCKING_QUEUE_DEPLETE_WAIT_TIME_MILLIS; + if ((runMillis % ONE_MINUTE_IN_MILLIS) == 0) { + LOG.info("There are {} queued. after {} sec.", tripAnalysisQueue.size(), runMillis / 1000); + } } queueDepleted.set(true); // wait for analyzers to complete + int idleIterations = 0; while (!allAnalyzersAreIdle(analyzerStatuses)) { Thread.sleep(BLOCKING_QUEUE_DEPLETE_WAIT_TIME_MILLIS); + idleIterations++; + // Report analyzers statuses every minute (unless this job finishes before). 
+ int runMillis = idleIterations * BLOCKING_QUEUE_DEPLETE_WAIT_TIME_MILLIS; + if ((runMillis % ONE_MINUTE_IN_MILLIS) == 0) { + long notIdleCount = analyzerStatuses.stream().filter(s -> !s.get()).count(); + LOG.info("There are {} analyzers not idle after {} sec.", notIdleCount, runMillis / 1000); + } } } catch (InterruptedException e) { LOG.error("error encountered while waiting during MonitorAllTripsJob."); @@ -74,7 +101,22 @@ public void run() { // TODO report successful run to error & notification system - LOG.info("MonitorAllTripsJob completed"); + LOG.info("MonitorAllTripsJob completed in {} sec", (System.currentTimeMillis() - start) / 1000); + } + + /** + * Create a BSON clause to filter out trips that would not be checked. + */ + private static BasicDBObject makeTripFilter() { + BasicDBObject tripFilter = new BasicDBObject(); + + // Trips must be active. + tripFilter.put("isActive", true); + + // Other conditions (e.g. in CheckMonitoredTrip) that would result in a trip not being checked + // should eventually be moved here. 
+ + return tripFilter; } /** diff --git a/src/main/java/org/opentripplanner/middleware/tripmonitor/jobs/MonitoredTripLocks.java b/src/main/java/org/opentripplanner/middleware/tripmonitor/jobs/MonitoredTripLocks.java index 2abb5354e..02d7a309b 100644 --- a/src/main/java/org/opentripplanner/middleware/tripmonitor/jobs/MonitoredTripLocks.java +++ b/src/main/java/org/opentripplanner/middleware/tripmonitor/jobs/MonitoredTripLocks.java @@ -17,27 +17,27 @@ public class MonitoredTripLocks { /** the amount of time in milliseconds to wait to check if a lock has been released */ private static final int LOCK_CHECK_WAIT_MILLIS = 500; - private static final ConcurrentHashMap locks = new ConcurrentHashMap(); + private static final ConcurrentHashMap locks = new ConcurrentHashMap<>(); /** * Locks the given MonitoredTrip */ - public static void lock(MonitoredTrip trip) { - locks.put(trip, true); + public static void lock(String tripId) { + locks.put(tripId, true); } /** * Removes a lock for a given MonitoredTrip */ - public static void unlock(MonitoredTrip trip) { - locks.remove(trip); + public static void unlock(String tripId) { + locks.remove(tripId); } /** * Returns true if a lock exists for the given MonitoredTrip */ - public static boolean isLocked(MonitoredTrip trip) { - return locks.containsKey(trip); + public static boolean isLocked(String tripId) { + return locks.containsKey(tripId); } /** @@ -48,7 +48,7 @@ public static boolean isLocked(MonitoredTrip trip) { public static void lockTripForUpdating(MonitoredTrip monitoredTrip, Request req) { // Wait for any existing CheckMonitoredTrip jobs to complete before proceeding String busyMessage = "A trip monitor check prevented the trip from being updated. 
Please try again in a moment."; - if (isLocked(monitoredTrip)) { + if (isLocked(monitoredTrip.id)) { int timeWaitedMillis = 0; do { try { @@ -59,17 +59,17 @@ public static void lockTripForUpdating(MonitoredTrip monitoredTrip, Request req) timeWaitedMillis += LOCK_CHECK_WAIT_MILLIS; // if the lock has been released, exit this wait loop - if (!isLocked(monitoredTrip)) break; + if (!isLocked(monitoredTrip.id)) break; } while (timeWaitedMillis <= MAX_UNLOCKING_WAIT_TIME_MILLIS); } // If a lock still exists, prevent the update - if (isLocked(monitoredTrip)) { + if (isLocked(monitoredTrip.id)) { logMessageAndHalt(req, HttpStatus.INTERNAL_SERVER_ERROR_500, busyMessage); return; } // lock the trip so that the a CheckMonitoredTrip job won't concurrently analyze/update the trip. - lock(monitoredTrip); + lock(monitoredTrip.id); } } diff --git a/src/main/java/org/opentripplanner/middleware/tripmonitor/jobs/TripAnalyzer.java b/src/main/java/org/opentripplanner/middleware/tripmonitor/jobs/TripAnalyzer.java index 96a5e37ec..3cc9dab35 100644 --- a/src/main/java/org/opentripplanner/middleware/tripmonitor/jobs/TripAnalyzer.java +++ b/src/main/java/org/opentripplanner/middleware/tripmonitor/jobs/TripAnalyzer.java @@ -15,11 +15,11 @@ public class TripAnalyzer implements Runnable { private final int BLOCKING_QUEUE_POLL_TIMEOUT_MILLIS = 250; private final AtomicBoolean analyzerIsIdle; - private final BlockingQueue tripAnalysisQueue; + private final BlockingQueue tripAnalysisQueue; private final AtomicBoolean queueDepleted; public TripAnalyzer( - BlockingQueue tripAnalysisQueue, + BlockingQueue tripAnalysisQueue, AtomicBoolean queueDepleted, AtomicBoolean analyzerIsIdle ) { @@ -34,10 +34,10 @@ public void run() { while (!queueDepleted.get()) { analyzerIsIdle.set(false); - // get the next monitored trip from the queue - MonitoredTrip trip; + // get the next monitored trip ID from the queue + String tripId; try { - trip = tripAnalysisQueue.poll(BLOCKING_QUEUE_POLL_TIMEOUT_MILLIS, 
TimeUnit.MILLISECONDS); + tripId = tripAnalysisQueue.poll(BLOCKING_QUEUE_POLL_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { LOG.warn("TripAnalyzer thread interrupted"); e.printStackTrace(); @@ -48,23 +48,22 @@ public void run() { // The implementation of the ArrayBlockingQueue can result in null items being returned if the wait is // exceeded on an empty queue. Therefore, check if the trip is null and if so, wait and then continue. - if (trip == null) { + if (tripId == null) { Thread.sleep(BLOCKING_QUEUE_POLL_TIMEOUT_MILLIS); analyzerIsIdle.set(true); continue; } // verify that a lock hasn't been placed on trip by another trip analyzer task - if (MonitoredTripLocks.isLocked(trip)) { - LOG.warn("Skipping trip analysis due to existing lock on trip: {}", trip); + if (MonitoredTripLocks.isLocked(tripId)) { + LOG.warn("Skipping trip analysis due to existing lock on trip: {}", tripId); analyzerIsIdle.set(true); continue; } // Refetch the trip from the database. This is to ensure the trip has any updates made to the trip // between when the trip was placed in the analysis queue and the current time. - String tripId = trip.id; - trip = Persistence.monitoredTrips.getById(tripId); + MonitoredTrip trip = Persistence.monitoredTrips.getById(tripId); if (trip == null) { // trip was deleted between the time when it was placed in the queue and the current time. Don't // analyze the trip. @@ -76,7 +75,7 @@ public void run() { LOG.info("Analyzing trip {}", tripId); // place lock on trip - MonitoredTripLocks.lock(trip); + MonitoredTripLocks.lock(tripId); /////// BEGIN TRIP ANALYSIS try { @@ -88,7 +87,7 @@ public void run() { LOG.info("Finished analyzing trip {}", tripId); // remove lock on trip - MonitoredTripLocks.unlock(trip); + MonitoredTripLocks.unlock(tripId); analyzerIsIdle.set(true); }