Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trip Monitor analyzer improvement, status log #199

Merged
merged 8 commits into from
Dec 11, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,14 @@ MonitoredTrip preCreateHook(MonitoredTrip monitoredTrip, Request req) {
@Override
MonitoredTrip postCreateHook(MonitoredTrip monitoredTrip, Request req) {
try {
MonitoredTripLocks.lock(monitoredTrip);
MonitoredTripLocks.lock(monitoredTrip.id);
return runCheckMonitoredTrip(monitoredTrip);
} catch (Exception e) {
// FIXME: an error happened while checking the trip, but the trip was saved to the DB, so return the raw
// trip as it was saved in the db?
return monitoredTrip;
} finally {
MonitoredTripLocks.unlock(monitoredTrip);
MonitoredTripLocks.unlock(monitoredTrip.id);
}
}

Expand Down Expand Up @@ -170,7 +170,7 @@ MonitoredTrip preUpdateHook(MonitoredTrip monitoredTrip, MonitoredTrip preExisti
// the raw trip as it was saved in the db before the check monitored trip job ran?
return monitoredTrip;
} finally {
MonitoredTripLocks.unlock(monitoredTrip);
MonitoredTripLocks.unlock(monitoredTrip.id);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package org.opentripplanner.middleware.tripmonitor.jobs;

import org.opentripplanner.middleware.models.MonitoredTrip;
import com.mongodb.BasicDBObject;
import org.opentripplanner.middleware.persistence.Persistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -13,24 +13,26 @@
import java.util.concurrent.atomic.AtomicBoolean;

/**
* This job will analyze all monitored trips and create further individual tasks to analyze each individual trip.
* This job will analyze applicable monitored trips and create further individual tasks to analyze each individual trip.
*/
public class MonitorAllTripsJob implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(MonitorAllTripsJob.class);
public static final int ONE_MINUTE_IN_MILLIS = 60000;

private final int numCores = Runtime.getRuntime().availableProcessors();
private final int BLOCKING_QUEUE_SIZE = numCores;
private final int BLOCKING_QUEUE_DEPLETE_WAIT_TIME_MILLIS = 250;
private final int BLOCKING_QUEUE_INSERT_TIMEOUT_SECONDS = 30;
private static final int BLOCKING_QUEUE_DEPLETE_WAIT_TIME_MILLIS = 250;
private static final int BLOCKING_QUEUE_INSERT_TIMEOUT_SECONDS = 30;
private final int N_TRIP_ANALYZERS = numCores;

@Override
public void run() {
long start = System.currentTimeMillis();
LOG.info("MonitorAllTripsJob started");
// analyze all trips

// create a blocking queue of monitored trips to process
BlockingQueue<MonitoredTrip> tripAnalysisQueue = new ArrayBlockingQueue<>(BLOCKING_QUEUE_SIZE);
// create a blocking queue of monitored trip IDs to process
BlockingQueue<String> tripAnalysisQueue = new ArrayBlockingQueue<>(BLOCKING_QUEUE_SIZE);

// create an Atomic Boolean for TripAnalyzer threads to check whether the queue is actually depleted
AtomicBoolean queueDepleted = new AtomicBoolean();
Expand All @@ -46,22 +48,47 @@ public void run() {
}

try {
// request all monitored trips from the mongo collection
for (MonitoredTrip monitoredTrip : Persistence.monitoredTrips.getAll()) {
// attempt to add trip to tripAnalysisQueue until a spot opens up in the queue. If the timeout is
// Request at once all applicable monitored trip IDs from the Mongo collection, and loop through them.
// If we looped using a Mongo-provided iterator instead, and the Mongo connection is dropped for any reason
// while the iterator is open, this thread would become blocked and prevent subsequent runs of this job.
// Performance note: Don't retrieve the full data for each trip at this time.
// This saves bandwidth and memory, as only the ID field is used to set up this job.
// The full data for each trip will be fetched at the time the actual analysis takes place.
List<String> allTripIds = Persistence.monitoredTrips.getDistinctFieldValues(
"_id",
makeTripFilter(),
String.class
).into(new ArrayList<>());
for (String tripId : allTripIds) {
// attempt to add trip ID to tripAnalysisQueue until a spot opens up in the queue. If the timeout is
// exceeded, offer() returns false and the item is not queued; InterruptedException is thrown only on interruption.
tripAnalysisQueue.offer(monitoredTrip, BLOCKING_QUEUE_INSERT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
tripAnalysisQueue.offer(tripId, BLOCKING_QUEUE_INSERT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
}

// wait for queue to deplete
while (tripAnalysisQueue.size() > 0) {
int queueIterations = 0;
while (!tripAnalysisQueue.isEmpty()) {
Thread.sleep(BLOCKING_QUEUE_DEPLETE_WAIT_TIME_MILLIS);
queueIterations++;
// Report queue status every minute (unless this job finishes before).
int runMillis = queueIterations * BLOCKING_QUEUE_DEPLETE_WAIT_TIME_MILLIS;
if ((runMillis % ONE_MINUTE_IN_MILLIS) == 0) {
LOG.info("There are {} queued. after {} sec.", tripAnalysisQueue.size(), runMillis / 1000);
}
}
queueDepleted.set(true);

// wait for analyzers to complete
int idleIterations = 0;
while (!allAnalyzersAreIdle(analyzerStatuses)) {
Thread.sleep(BLOCKING_QUEUE_DEPLETE_WAIT_TIME_MILLIS);
idleIterations++;
// Report analyzers statuses every minute (unless this job finishes before).
int runMillis = idleIterations * BLOCKING_QUEUE_DEPLETE_WAIT_TIME_MILLIS;
if ((runMillis % ONE_MINUTE_IN_MILLIS) == 0) {
long notIdleCount = analyzerStatuses.stream().filter(s -> !s.get()).count();
LOG.info("There are {} analyzers not idle after {} sec.", notIdleCount, runMillis / 1000);
}
}
} catch (InterruptedException e) {
LOG.error("error encountered while waiting during MonitorAllTripsJob.");
Expand All @@ -74,7 +101,22 @@ public void run() {

// TODO report successful run to error & notification system

LOG.info("MonitorAllTripsJob completed");
LOG.info("MonitorAllTripsJob completed in {} sec", (System.currentTimeMillis() - start) / 1000);
}

/**
 * Builds the BSON filter used to leave out trips that would not be checked.
 *
 * @return a filter document matching only trips whose {@code isActive} field is {@code true}.
 */
private static BasicDBObject makeTripFilter() {
    // Trips must be active.
    // Other conditions (e.g. in CheckMonitoredTrip) that would cause a trip not to be checked
    // should eventually be moved here.
    return new BasicDBObject("isActive", true);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,27 @@ public class MonitoredTripLocks {
/** the amount of time in milliseconds to wait to check if a lock has been released */
private static final int LOCK_CHECK_WAIT_MILLIS = 500;

private static final ConcurrentHashMap<MonitoredTrip, Boolean> locks = new ConcurrentHashMap();
private static final ConcurrentHashMap<String, Boolean> locks = new ConcurrentHashMap<>();

/**
* Locks the given MonitoredTrip
*/
public static void lock(MonitoredTrip trip) {
locks.put(trip, true);
public static void lock(String tripId) {
locks.put(tripId, true);
}

/**
* Removes a lock for a given MonitoredTrip
*/
public static void unlock(MonitoredTrip trip) {
locks.remove(trip);
public static void unlock(String tripId) {
locks.remove(tripId);
}

/**
* Returns true if a lock exists for the given MonitoredTrip
*/
public static boolean isLocked(MonitoredTrip trip) {
return locks.containsKey(trip);
public static boolean isLocked(String tripId) {
return locks.containsKey(tripId);
}

/**
Expand All @@ -48,7 +48,7 @@ public static boolean isLocked(MonitoredTrip trip) {
public static void lockTripForUpdating(MonitoredTrip monitoredTrip, Request req) {
// Wait for any existing CheckMonitoredTrip jobs to complete before proceeding
String busyMessage = "A trip monitor check prevented the trip from being updated. Please try again in a moment.";
if (isLocked(monitoredTrip)) {
if (isLocked(monitoredTrip.id)) {
int timeWaitedMillis = 0;
do {
try {
Expand All @@ -59,17 +59,17 @@ public static void lockTripForUpdating(MonitoredTrip monitoredTrip, Request req)
timeWaitedMillis += LOCK_CHECK_WAIT_MILLIS;

// if the lock has been released, exit this wait loop
if (!isLocked(monitoredTrip)) break;
if (!isLocked(monitoredTrip.id)) break;
} while (timeWaitedMillis <= MAX_UNLOCKING_WAIT_TIME_MILLIS);
}

// If a lock still exists, prevent the update
if (isLocked(monitoredTrip)) {
if (isLocked(monitoredTrip.id)) {
logMessageAndHalt(req, HttpStatus.INTERNAL_SERVER_ERROR_500, busyMessage);
return;
}

// lock the trip so that a CheckMonitoredTrip job won't concurrently analyze/update the trip.
lock(monitoredTrip);
lock(monitoredTrip.id);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ public class TripAnalyzer implements Runnable {
private final int BLOCKING_QUEUE_POLL_TIMEOUT_MILLIS = 250;

private final AtomicBoolean analyzerIsIdle;
private final BlockingQueue<MonitoredTrip> tripAnalysisQueue;
private final BlockingQueue<String> tripAnalysisQueue;
private final AtomicBoolean queueDepleted;

public TripAnalyzer(
BlockingQueue<MonitoredTrip> tripAnalysisQueue,
BlockingQueue<String> tripAnalysisQueue,
AtomicBoolean queueDepleted,
AtomicBoolean analyzerIsIdle
) {
Expand All @@ -34,10 +34,10 @@ public void run() {
while (!queueDepleted.get()) {
analyzerIsIdle.set(false);

// get the next monitored trip from the queue
MonitoredTrip trip;
// get the next monitored trip ID from the queue
String tripId;
try {
trip = tripAnalysisQueue.poll(BLOCKING_QUEUE_POLL_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
tripId = tripAnalysisQueue.poll(BLOCKING_QUEUE_POLL_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOG.warn("TripAnalyzer thread interrupted");
e.printStackTrace();
Expand All @@ -48,23 +48,22 @@ public void run() {

// The implementation of the ArrayBlockingQueue can result in null items being returned if the wait is
// exceeded on an empty queue. Therefore, check if the trip is null and if so, wait and then continue.
if (trip == null) {
if (tripId == null) {
Thread.sleep(BLOCKING_QUEUE_POLL_TIMEOUT_MILLIS);
analyzerIsIdle.set(true);
continue;
}

// verify that a lock hasn't been placed on trip by another trip analyzer task
if (MonitoredTripLocks.isLocked(trip)) {
LOG.warn("Skipping trip analysis due to existing lock on trip: {}", trip);
if (MonitoredTripLocks.isLocked(tripId)) {
LOG.warn("Skipping trip analysis due to existing lock on trip: {}", tripId);
analyzerIsIdle.set(true);
continue;
}

// Refetch the trip from the database. This is to ensure the trip has any updates made to the trip
// between when the trip was placed in the analysis queue and the current time.
String tripId = trip.id;
trip = Persistence.monitoredTrips.getById(tripId);
MonitoredTrip trip = Persistence.monitoredTrips.getById(tripId);
if (trip == null) {
// trip was deleted between the time when it was placed in the queue and the current time. Don't
// analyze the trip.
Expand All @@ -76,7 +75,7 @@ public void run() {
LOG.info("Analyzing trip {}", tripId);

// place lock on trip
MonitoredTripLocks.lock(trip);
MonitoredTripLocks.lock(tripId);

/////// BEGIN TRIP ANALYSIS
try {
Expand All @@ -88,7 +87,7 @@ public void run() {
LOG.info("Finished analyzing trip {}", tripId);

// remove lock on trip
MonitoredTripLocks.unlock(trip);
MonitoredTripLocks.unlock(tripId);

analyzerIsIdle.set(true);
}
Expand Down
Loading