Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trip Monitor analyzer improvement, status log #199

Merged
merged 8 commits into from
Dec 11, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,14 @@ MonitoredTrip preCreateHook(MonitoredTrip monitoredTrip, Request req) {
@Override
MonitoredTrip postCreateHook(MonitoredTrip monitoredTrip, Request req) {
try {
MonitoredTripLocks.lock(monitoredTrip);
MonitoredTripLocks.lock(monitoredTrip.id);
return runCheckMonitoredTrip(monitoredTrip);
} catch (Exception e) {
// FIXME: an error happened while checking the trip, but the trip was saved to the DB, so return the raw
// trip as it was saved in the db?
return monitoredTrip;
} finally {
MonitoredTripLocks.unlock(monitoredTrip);
MonitoredTripLocks.unlock(monitoredTrip.id);
}
}

Expand Down Expand Up @@ -170,7 +170,7 @@ MonitoredTrip preUpdateHook(MonitoredTrip monitoredTrip, MonitoredTrip preExisti
// the raw trip as it was saved in the db before the check monitored trip job ran?
return monitoredTrip;
} finally {
MonitoredTripLocks.unlock(monitoredTrip);
MonitoredTripLocks.unlock(monitoredTrip.id);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package org.opentripplanner.middleware.tripmonitor.jobs;

import org.opentripplanner.middleware.models.MonitoredTrip;
import com.mongodb.BasicDBObject;
import org.opentripplanner.middleware.persistence.Persistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -26,11 +26,12 @@ public class MonitorAllTripsJob implements Runnable {

@Override
public void run() {
long start = System.currentTimeMillis();
LOG.info("MonitorAllTripsJob started");
// analyze all trips

// create a blocking queue of monitored trips to process
BlockingQueue<MonitoredTrip> tripAnalysisQueue = new ArrayBlockingQueue<>(BLOCKING_QUEUE_SIZE);
// create a blocking queue of monitored trip IDs to process
BlockingQueue<String> tripAnalysisQueue = new ArrayBlockingQueue<>(BLOCKING_QUEUE_SIZE);

// create an Atomic Boolean for TripAnalyzer threads to check whether the queue is actually depleted
AtomicBoolean queueDepleted = new AtomicBoolean();
Expand All @@ -46,22 +47,50 @@ public void run() {
}

try {
// request all monitored trips from the mongo collection
for (MonitoredTrip monitoredTrip : Persistence.monitoredTrips.getAll()) {
// attempt to add trip to tripAnalysisQueue until a spot opens up in the queue. If the timeout is
// Request all monitored trip IDs from the Mongo collection, all at once, and loop through a list of strings.
// If we looped using a Mongo-provided iterator instead, and the Mongo connection is dropped for any reason
// while the iterator is open, this thread would become blocked and
// prevent subsequent trip monitoring jobs from starting.
// Performance note: Don't retrieve the full data for each trip at this time.
// This saves bandwidth and memory, as we don't use the trip data immediately besides the ID.
// The full data for each trip will be fetched at the time the actual analysis takes place.
// TODO: Filter out trips that would be skipped by the CheckMonitoredTrip.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this TODO be done now? If not, perhaps explain why. TODOs seem to have a habit of not getting done.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, thank you, I added one criterion in ca89702 and reworded the TODO.

BasicDBObject tripFilter = new BasicDBObject();
List<String> allTripIds = Persistence.monitoredTrips.getDistinctFieldValues(
"_id",
tripFilter,
String.class
).into(new ArrayList<>());
for (String tripId : allTripIds) {
// attempt to add trip ID to tripAnalysisQueue until a spot opens up in the queue. If the timeout is
// exceeded, offer returns false; an InterruptedException is thrown only if the thread is interrupted while waiting.
tripAnalysisQueue.offer(monitoredTrip, BLOCKING_QUEUE_INSERT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
tripAnalysisQueue.offer(tripId, BLOCKING_QUEUE_INSERT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
}

// wait for queue to deplete
int queueIterations = 0;
while (tripAnalysisQueue.size() > 0) {
Thread.sleep(BLOCKING_QUEUE_DEPLETE_WAIT_TIME_MILLIS);
queueIterations++;
// Report queue status every minute (unless this job finishes before).
int runMillis = queueIterations * BLOCKING_QUEUE_DEPLETE_WAIT_TIME_MILLIS;
if ((runMillis % 60000) == 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could create a constant for 60000 and use here and further down.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactored in ca89702.

LOG.info("There are {} queued. after {} sec.", tripAnalysisQueue.size(), runMillis / 1000);
}
}
queueDepleted.set(true);

// wait for analyzers to complete
int idleIterations = 0;
while (!allAnalyzersAreIdle(analyzerStatuses)) {
Thread.sleep(BLOCKING_QUEUE_DEPLETE_WAIT_TIME_MILLIS);
idleIterations++;
// Report analyzers statuses every minute (unless this job finishes before).
int runMillis = idleIterations * BLOCKING_QUEUE_DEPLETE_WAIT_TIME_MILLIS;
if ((runMillis % 60000) == 0) {
long notIdleCount = analyzerStatuses.stream().filter(s -> !s.get()).count();
LOG.info("There are {} analyzers not idle after {} sec.", notIdleCount, runMillis / 1000);
}
}
} catch (InterruptedException e) {
LOG.error("error encountered while waiting during MonitorAllTripsJob.");
Expand All @@ -74,7 +103,7 @@ public void run() {

// TODO report successful run to error & notification system

LOG.info("MonitorAllTripsJob completed");
LOG.info("MonitorAllTripsJob completed in {} sec", (System.currentTimeMillis() - start) / 1000);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,27 @@ public class MonitoredTripLocks {
/** the amount of time in milliseconds to wait to check if a lock has been released */
private static final int LOCK_CHECK_WAIT_MILLIS = 500;

private static final ConcurrentHashMap<MonitoredTrip, Boolean> locks = new ConcurrentHashMap();
private static final ConcurrentHashMap<String, Boolean> locks = new ConcurrentHashMap();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SonarLint: Update to new ConcurrentHashMap<>()

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactored in ca89702.


/**
* Locks the given MonitoredTrip
*/
public static void lock(MonitoredTrip trip) {
locks.put(trip, true);
public static void lock(String tripId) {
locks.put(tripId, true);
}

/**
* Removes a lock for a given MonitoredTrip
*/
public static void unlock(MonitoredTrip trip) {
locks.remove(trip);
public static void unlock(String tripId) {
locks.remove(tripId);
}

/**
* Returns true if a lock exists for the given MonitoredTrip
*/
public static boolean isLocked(MonitoredTrip trip) {
return locks.containsKey(trip);
public static boolean isLocked(String tripId) {
return locks.containsKey(tripId);
}

/**
Expand All @@ -48,7 +48,7 @@ public static boolean isLocked(MonitoredTrip trip) {
public static void lockTripForUpdating(MonitoredTrip monitoredTrip, Request req) {
// Wait for any existing CheckMonitoredTrip jobs to complete before proceeding
String busyMessage = "A trip monitor check prevented the trip from being updated. Please try again in a moment.";
if (isLocked(monitoredTrip)) {
if (isLocked(monitoredTrip.id)) {
int timeWaitedMillis = 0;
do {
try {
Expand All @@ -59,17 +59,17 @@ public static void lockTripForUpdating(MonitoredTrip monitoredTrip, Request req)
timeWaitedMillis += LOCK_CHECK_WAIT_MILLIS;

// if the lock has been released, exit this wait loop
if (!isLocked(monitoredTrip)) break;
if (!isLocked(monitoredTrip.id)) break;
} while (timeWaitedMillis <= MAX_UNLOCKING_WAIT_TIME_MILLIS);
}

// If a lock still exists, prevent the update
if (isLocked(monitoredTrip)) {
if (isLocked(monitoredTrip.id)) {
logMessageAndHalt(req, HttpStatus.INTERNAL_SERVER_ERROR_500, busyMessage);
return;
}

// lock the trip so that a CheckMonitoredTrip job won't concurrently analyze/update the trip.
lock(monitoredTrip);
lock(monitoredTrip.id);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ public class TripAnalyzer implements Runnable {
private final int BLOCKING_QUEUE_POLL_TIMEOUT_MILLIS = 250;

private final AtomicBoolean analyzerIsIdle;
private final BlockingQueue<MonitoredTrip> tripAnalysisQueue;
private final BlockingQueue<String> tripAnalysisQueue;
private final AtomicBoolean queueDepleted;

public TripAnalyzer(
BlockingQueue<MonitoredTrip> tripAnalysisQueue,
BlockingQueue<String> tripAnalysisQueue,
AtomicBoolean queueDepleted,
AtomicBoolean analyzerIsIdle
) {
Expand All @@ -34,10 +34,10 @@ public void run() {
while (!queueDepleted.get()) {
analyzerIsIdle.set(false);

// get the next monitored trip from the queue
MonitoredTrip trip;
// get the next monitored trip ID from the queue
String tripId;
try {
trip = tripAnalysisQueue.poll(BLOCKING_QUEUE_POLL_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
tripId = tripAnalysisQueue.poll(BLOCKING_QUEUE_POLL_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOG.warn("TripAnalyzer thread interrupted");
e.printStackTrace();
Expand All @@ -48,23 +48,22 @@ public void run() {

// The implementation of the ArrayBlockingQueue can result in null items being returned if the wait is
// exceeded on an empty queue. Therefore, check if the trip is null and if so, wait and then continue.
if (trip == null) {
if (tripId == null) {
Thread.sleep(BLOCKING_QUEUE_POLL_TIMEOUT_MILLIS);
analyzerIsIdle.set(true);
continue;
}

// verify that a lock hasn't been placed on trip by another trip analyzer task
if (MonitoredTripLocks.isLocked(trip)) {
LOG.warn("Skipping trip analysis due to existing lock on trip: {}", trip);
if (MonitoredTripLocks.isLocked(tripId)) {
LOG.warn("Skipping trip analysis due to existing lock on trip: {}", tripId);
analyzerIsIdle.set(true);
continue;
}

// Refetch the trip from the database. This is to ensure the trip has any updates made to the trip
// between when the trip was placed in the analysis queue and the current time.
String tripId = trip.id;
trip = Persistence.monitoredTrips.getById(tripId);
MonitoredTrip trip = Persistence.monitoredTrips.getById(tripId);
if (trip == null) {
// trip was deleted between the time when it was placed in the queue and the current time. Don't
// analyze the trip.
Expand All @@ -76,7 +75,7 @@ public void run() {
LOG.info("Analyzing trip {}", tripId);

// place lock on trip
MonitoredTripLocks.lock(trip);
MonitoredTripLocks.lock(tripId);

/////// BEGIN TRIP ANALYSIS
try {
Expand All @@ -88,7 +87,7 @@ public void run() {
LOG.info("Finished analyzing trip {}", tripId);

// remove lock on trip
MonitoredTripLocks.unlock(trip);
MonitoredTripLocks.unlock(tripId);

analyzerIsIdle.set(true);
}
Expand Down
Loading