Skip to content

Commit

Permalink
Merge pull request #199 from ibi-group/tripmon-job-status
Browse files Browse the repository at this point in the history
Trip Monitor analyzer improvement, status log
  • Loading branch information
binh-dam-ibigroup authored Dec 11, 2023
2 parents 28b332c + d1d9a19 commit fd0d143
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,14 @@ MonitoredTrip preCreateHook(MonitoredTrip monitoredTrip, Request req) {
@Override
MonitoredTrip postCreateHook(MonitoredTrip monitoredTrip, Request req) {
try {
MonitoredTripLocks.lock(monitoredTrip);
MonitoredTripLocks.lock(monitoredTrip.id);
return runCheckMonitoredTrip(monitoredTrip);
} catch (Exception e) {
// FIXME: an error happened while checking the trip, but the trip was saved to the DB, so return the raw
// trip as it was saved in the db?
return monitoredTrip;
} finally {
MonitoredTripLocks.unlock(monitoredTrip);
MonitoredTripLocks.unlock(monitoredTrip.id);
}
}

Expand Down Expand Up @@ -170,7 +170,7 @@ MonitoredTrip preUpdateHook(MonitoredTrip monitoredTrip, MonitoredTrip preExisti
// the raw trip as it was saved in the db before the check monitored trip job ran?
return monitoredTrip;
} finally {
MonitoredTripLocks.unlock(monitoredTrip);
MonitoredTripLocks.unlock(monitoredTrip.id);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package org.opentripplanner.middleware.tripmonitor.jobs;

import org.opentripplanner.middleware.models.MonitoredTrip;
import com.mongodb.BasicDBObject;
import org.opentripplanner.middleware.persistence.Persistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -13,24 +13,26 @@
import java.util.concurrent.atomic.AtomicBoolean;

/**
* This job will analyze all monitored trips and create further individual tasks to analyze each individual trip.
* This job will analyze applicable monitored trips and create further individual tasks to analyze each individual trip.
*/
public class MonitorAllTripsJob implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(MonitorAllTripsJob.class);
public static final int ONE_MINUTE_IN_MILLIS = 60000;

private final int numCores = Runtime.getRuntime().availableProcessors();
private final int BLOCKING_QUEUE_SIZE = numCores;
private final int BLOCKING_QUEUE_DEPLETE_WAIT_TIME_MILLIS = 250;
private final int BLOCKING_QUEUE_INSERT_TIMEOUT_SECONDS = 30;
private static final int BLOCKING_QUEUE_DEPLETE_WAIT_TIME_MILLIS = 250;
private static final int BLOCKING_QUEUE_INSERT_TIMEOUT_SECONDS = 30;
private final int N_TRIP_ANALYZERS = numCores;

@Override
public void run() {
long start = System.currentTimeMillis();
LOG.info("MonitorAllTripsJob started");
// analyze all trips

// create a blocking queue of monitored trips to process
BlockingQueue<MonitoredTrip> tripAnalysisQueue = new ArrayBlockingQueue<>(BLOCKING_QUEUE_SIZE);
// create a blocking queue of monitored trip IDs to process
BlockingQueue<String> tripAnalysisQueue = new ArrayBlockingQueue<>(BLOCKING_QUEUE_SIZE);

// create an Atomic Boolean for TripAnalyzer threads to check whether the queue is actually depleted
AtomicBoolean queueDepleted = new AtomicBoolean();
Expand All @@ -46,22 +48,47 @@ public void run() {
}

try {
// request all monitored trips from the mongo collection
for (MonitoredTrip monitoredTrip : Persistence.monitoredTrips.getAll()) {
// attempt to add trip to tripAnalysisQueue until a spot opens up in the queue. If the timeout is
// Request at once all applicable monitored trip IDs from the Mongo collection, and loop through them.
// If we looped using a Mongo-provided iterator instead, and the Mongo connection is dropped for any reason
// while the iterator is open, this thread would become blocked and prevent subsequent runs of this job.
// Performance note: Don't retrieve the full data for each trip at this time.
// This saves bandwidth and memory, as only the ID field is used to set up this job.
// The full data for each trip will be fetched at the time the actual analysis takes place.
List<String> allTripIds = Persistence.monitoredTrips.getDistinctFieldValues(
"_id",
makeTripFilter(),
String.class
).into(new ArrayList<>());
for (String tripId : allTripIds) {
// attempt to add trip ID to tripAnalysisQueue until a spot opens up in the queue. If the timeout is
// exceeded, offer() returns false (an InterruptedException is thrown only if this thread is interrupted while waiting).
tripAnalysisQueue.offer(monitoredTrip, BLOCKING_QUEUE_INSERT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
tripAnalysisQueue.offer(tripId, BLOCKING_QUEUE_INSERT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
}

// wait for queue to deplete
while (tripAnalysisQueue.size() > 0) {
int queueIterations = 0;
while (!tripAnalysisQueue.isEmpty()) {
Thread.sleep(BLOCKING_QUEUE_DEPLETE_WAIT_TIME_MILLIS);
queueIterations++;
// Report queue status every minute (unless this job finishes before).
int runMillis = queueIterations * BLOCKING_QUEUE_DEPLETE_WAIT_TIME_MILLIS;
if ((runMillis % ONE_MINUTE_IN_MILLIS) == 0) {
LOG.info("There are {} queued. after {} sec.", tripAnalysisQueue.size(), runMillis / 1000);
}
}
queueDepleted.set(true);

// wait for analyzers to complete
int idleIterations = 0;
while (!allAnalyzersAreIdle(analyzerStatuses)) {
Thread.sleep(BLOCKING_QUEUE_DEPLETE_WAIT_TIME_MILLIS);
idleIterations++;
// Report analyzers statuses every minute (unless this job finishes before).
int runMillis = idleIterations * BLOCKING_QUEUE_DEPLETE_WAIT_TIME_MILLIS;
if ((runMillis % ONE_MINUTE_IN_MILLIS) == 0) {
long notIdleCount = analyzerStatuses.stream().filter(s -> !s.get()).count();
LOG.info("There are {} analyzers not idle after {} sec.", notIdleCount, runMillis / 1000);
}
}
} catch (InterruptedException e) {
LOG.error("error encountered while waiting during MonitorAllTripsJob.");
Expand All @@ -74,7 +101,22 @@ public void run() {

// TODO report successful run to error & notification system

LOG.info("MonitorAllTripsJob completed");
LOG.info("MonitorAllTripsJob completed in {} sec", (System.currentTimeMillis() - start) / 1000);
}

/**
 * Builds the BSON filter used to select the monitored trips that should be checked.
 *
 * @return a filter object whose only clause requires the {@code isActive} field to be {@code true}.
 */
private static BasicDBObject makeTripFilter() {
    // Only active trips are selected.
    // Other conditions (e.g. in CheckMonitoredTrip) that would cause a trip not to be checked
    // should eventually be moved here.
    return new BasicDBObject("isActive", true);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,27 @@ public class MonitoredTripLocks {
/** the amount of time in milliseconds to wait to check if a lock has been released */
private static final int LOCK_CHECK_WAIT_MILLIS = 500;

private static final ConcurrentHashMap<MonitoredTrip, Boolean> locks = new ConcurrentHashMap();
private static final ConcurrentHashMap<String, Boolean> locks = new ConcurrentHashMap<>();

/**
* Locks the given MonitoredTrip
*/
public static void lock(MonitoredTrip trip) {
locks.put(trip, true);
public static void lock(String tripId) {
locks.put(tripId, true);
}

/**
* Removes a lock for a given MonitoredTrip
*/
public static void unlock(MonitoredTrip trip) {
locks.remove(trip);
public static void unlock(String tripId) {
locks.remove(tripId);
}

/**
* Returns true if a lock exists for the given MonitoredTrip
*/
public static boolean isLocked(MonitoredTrip trip) {
return locks.containsKey(trip);
public static boolean isLocked(String tripId) {
return locks.containsKey(tripId);
}

/**
Expand All @@ -48,7 +48,7 @@ public static boolean isLocked(MonitoredTrip trip) {
public static void lockTripForUpdating(MonitoredTrip monitoredTrip, Request req) {
// Wait for any existing CheckMonitoredTrip jobs to complete before proceeding
String busyMessage = "A trip monitor check prevented the trip from being updated. Please try again in a moment.";
if (isLocked(monitoredTrip)) {
if (isLocked(monitoredTrip.id)) {
int timeWaitedMillis = 0;
do {
try {
Expand All @@ -59,17 +59,17 @@ public static void lockTripForUpdating(MonitoredTrip monitoredTrip, Request req)
timeWaitedMillis += LOCK_CHECK_WAIT_MILLIS;

// if the lock has been released, exit this wait loop
if (!isLocked(monitoredTrip)) break;
if (!isLocked(monitoredTrip.id)) break;
} while (timeWaitedMillis <= MAX_UNLOCKING_WAIT_TIME_MILLIS);
}

// If a lock still exists, prevent the update
if (isLocked(monitoredTrip)) {
if (isLocked(monitoredTrip.id)) {
logMessageAndHalt(req, HttpStatus.INTERNAL_SERVER_ERROR_500, busyMessage);
return;
}

// lock the trip so that a CheckMonitoredTrip job won't concurrently analyze/update the trip.
lock(monitoredTrip);
lock(monitoredTrip.id);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ public class TripAnalyzer implements Runnable {
private final int BLOCKING_QUEUE_POLL_TIMEOUT_MILLIS = 250;

private final AtomicBoolean analyzerIsIdle;
private final BlockingQueue<MonitoredTrip> tripAnalysisQueue;
private final BlockingQueue<String> tripAnalysisQueue;
private final AtomicBoolean queueDepleted;

public TripAnalyzer(
BlockingQueue<MonitoredTrip> tripAnalysisQueue,
BlockingQueue<String> tripAnalysisQueue,
AtomicBoolean queueDepleted,
AtomicBoolean analyzerIsIdle
) {
Expand All @@ -34,10 +34,10 @@ public void run() {
while (!queueDepleted.get()) {
analyzerIsIdle.set(false);

// get the next monitored trip from the queue
MonitoredTrip trip;
// get the next monitored trip ID from the queue
String tripId;
try {
trip = tripAnalysisQueue.poll(BLOCKING_QUEUE_POLL_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
tripId = tripAnalysisQueue.poll(BLOCKING_QUEUE_POLL_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOG.warn("TripAnalyzer thread interrupted");
e.printStackTrace();
Expand All @@ -48,23 +48,22 @@ public void run() {

// The implementation of the ArrayBlockingQueue can result in null items being returned if the wait is
// exceeded on an empty queue. Therefore, check if the trip is null and if so, wait and then continue.
if (trip == null) {
if (tripId == null) {
Thread.sleep(BLOCKING_QUEUE_POLL_TIMEOUT_MILLIS);
analyzerIsIdle.set(true);
continue;
}

// verify that a lock hasn't been placed on trip by another trip analyzer task
if (MonitoredTripLocks.isLocked(trip)) {
LOG.warn("Skipping trip analysis due to existing lock on trip: {}", trip);
if (MonitoredTripLocks.isLocked(tripId)) {
LOG.warn("Skipping trip analysis due to existing lock on trip: {}", tripId);
analyzerIsIdle.set(true);
continue;
}

// Refetch the trip from the database. This is to ensure the trip has any updates made to the trip
// between when the trip was placed in the analysis queue and the current time.
String tripId = trip.id;
trip = Persistence.monitoredTrips.getById(tripId);
MonitoredTrip trip = Persistence.monitoredTrips.getById(tripId);
if (trip == null) {
// trip was deleted between the time when it was placed in the queue and the current time. Don't
// analyze the trip.
Expand All @@ -76,7 +75,7 @@ public void run() {
LOG.info("Analyzing trip {}", tripId);

// place lock on trip
MonitoredTripLocks.lock(trip);
MonitoredTripLocks.lock(tripId);

/////// BEGIN TRIP ANALYSIS
try {
Expand All @@ -88,7 +87,7 @@ public void run() {
LOG.info("Finished analyzing trip {}", tripId);

// remove lock on trip
MonitoredTripLocks.unlock(trip);
MonitoredTripLocks.unlock(tripId);

analyzerIsIdle.set(true);
}
Expand Down

0 comments on commit fd0d143

Please sign in to comment.