Skip to content

Commit

Permalink
Add timing instrumentation to has_trip_ended for performance monito…
Browse files Browse the repository at this point in the history
…ring

- Introduced timers to measure the duration of key operations
  • Loading branch information
TeachMeTW committed Dec 3, 2024
1 parent 3900d3c commit 199fe4e
Showing 1 changed file with 38 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ def segment_into_trips(self, timeseries, time_query):
)

with ect.Timer() as t_has_trip_ended:
if self.has_trip_ended(prevPoint, currPoint, timeseries, last10PointsDistances, last5MinsDistances, last5MinTimes):
if self.has_trip_ended(user_id, prevPoint, currPoint, timeseries, last10PointsDistances, last5MinsDistances, last5MinTimes):
(ended_before_this, last_trip_end_point) = self.get_last_trip_end_point(
filtered_points_df,
last10Points_df,
Expand Down Expand Up @@ -275,26 +275,41 @@ def continue_just_ended(self, idx, currPoint, filtered_points_df):
else:
return False

def has_trip_ended(self, prev_point, curr_point, timeseries, last10PointsDistances, last5MinsDistances, last5MinTimes):
def has_trip_ended(self, user_id, prev_point, curr_point, timeseries, last10PointsDistances, last5MinsDistances, last5MinTimes):
# Another mismatch between phone and server. Phone stops tracking too soon,
# so the distance is still greater than the threshold at the end of the trip.
# But then the next point is a long time away, so we can split again (similar to a distance filter)
if prev_point is None:
logging.debug("prev_point is None, continuing trip")
else:
timeDelta = curr_point.ts - prev_point.ts
distDelta = pf.calDistance(prev_point, curr_point)
if timeDelta > 0:
speedDelta = old_div(distDelta, timeDelta)
else:
speedDelta = np.nan
speedThreshold = old_div(float(self.distance_threshold), self.time_threshold)
with ect.Timer() as t_time_calculations:
timeDelta = curr_point.ts - prev_point.ts
distDelta = pf.calDistance(prev_point, curr_point)
if timeDelta > 0:
speedDelta = old_div(distDelta, timeDelta)
else:
speedDelta = np.nan
speedThreshold = old_div(float(self.distance_threshold), self.time_threshold)
esds.store_pipeline_time(
user_id,
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/has_trip_ended/time_calculations",
time.time(),
t_time_calculations.elapsed
)

if eaisr.is_tracking_restarted_in_range(prev_point.ts, curr_point.ts, timeseries):
logging.debug("tracking was restarted, ending trip")
return True

ongoing_motion_check = len(eaisr.get_ongoing_motion_in_range(prev_point.ts, curr_point.ts, timeseries)) > 0
with ect.Timer() as t_motion_check:
ongoing_motion_check = len(eaisr.get_ongoing_motion_in_range(prev_point.ts, curr_point.ts, timeseries)) > 0
esds.store_pipeline_time(
user_id,
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/has_trip_ended/motion_check",
time.time(),
t_motion_check.elapsed
)

if timeDelta > 2 * self.time_threshold and not ongoing_motion_check:
logging.debug("lastPoint.ts = %s, currPoint.ts = %s, threshold = %s, large gap = %s, ongoing_motion_in_range = %s, ending trip" %
(prev_point.ts, curr_point.ts,self.time_threshold, curr_point.ts - prev_point.ts, ongoing_motion_check))
Expand All @@ -310,8 +325,8 @@ def has_trip_ended(self, prev_point, curr_point, timeseries, last10PointsDistanc
(prev_point.ts, curr_point.ts, TWELVE_HOURS, curr_point.ts - prev_point.ts))
return True

if (timeDelta > 2 * self.time_threshold and # We have been here for a while
speedDelta < speedThreshold): # we haven't moved very much
if (timeDelta > 2 * self.time_threshold and # We have been here for a while
speedDelta < speedThreshold): # we haven't moved very much
logging.debug("prev_point.ts = %s, curr_point.ts = %s, threshold = %s, large gap = %s, ending trip" %
(prev_point.ts, curr_point.ts,self.time_threshold, curr_point.ts - prev_point.ts))
return True
Expand All @@ -333,11 +348,18 @@ def has_trip_ended(self, prev_point, curr_point, timeseries, last10PointsDistanc
return False

# Normal end-of-trip case
logging.debug("last5MinsDistances.max() = %s, last10PointsDistance.max() = %s" %
(last5MinsDistances.max(), last10PointsDistances.max()))
if (last5MinsDistances.max() < self.distance_threshold and
last10PointsDistances.max() < self.distance_threshold):
with ect.Timer() as t_distance_check:
logging.debug("last5MinsDistances.max() = %s, last10PointsDistance.max() = %s" %
(last5MinsDistances.max(), last10PointsDistances.max()))
if (last5MinsDistances.max() < self.distance_threshold and
last10PointsDistances.max() < self.distance_threshold):
return True
esds.store_pipeline_time(
user_id,
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/has_trip_ended/distance_check",
time.time(),
t_distance_check.elapsed
)


def get_last_trip_end_point(self, filtered_points_df, last10Points_df, last5MinsPoints_df):
Expand Down

0 comments on commit 199fe4e

Please sign in to comment.