From 7b11c0d8536e2719a325120d2d3603e27ccc8289 Mon Sep 17 00:00:00 2001 From: TeachMeTW Date: Sat, 2 Nov 2024 20:45:30 -0700 Subject: [PATCH 1/5] added instrumentation to the _get_and_store_range --- emission/pipeline/intake_stage.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/emission/pipeline/intake_stage.py b/emission/pipeline/intake_stage.py index a93ba2996..b681f93d5 100644 --- a/emission/pipeline/intake_stage.py +++ b/emission/pipeline/intake_stage.py @@ -198,7 +198,13 @@ def run_intake_pipeline_for_user(uuid, skip_if_no_new_data): esds.store_pipeline_time(uuid, ecwp.PipelineStages.CREATE_COMPOSITE_OBJECTS.name, time.time(), crt.elapsed) - _get_and_store_range(uuid, "analysis/composite_trip") + with ect.Timer() as gsr: + logging.info("*" * 10 + "UUID %s: generating store and range " % uuid + "*" * 10) + print(str(arrow.now()) + "*" * 10 + "UUID %s: generating store and range " % uuid + "*" * 10) + _get_and_store_range(uuid, "analysis/composite_trip") + + esds.store_pipeline_time(uuid, 'GENERATE_STORE_AND_RANGE', + time.time(), gsr.elapsed) def _get_and_store_range(user_id, trip_key): ts = esta.TimeSeries.get_time_series(user_id) From 7b5ef4fa8efbda09d831d228f451f18813fc4478 Mon Sep 17 00:00:00 2001 From: TeachMeTW Date: Tue, 5 Nov 2024 10:50:54 -0800 Subject: [PATCH 2/5] Segment_Current_Trips --- .../intake/segmentation/trip_segmentation.py | 100 +++++++++++------- 1 file changed, 64 insertions(+), 36 deletions(-) diff --git a/emission/analysis/intake/segmentation/trip_segmentation.py b/emission/analysis/intake/segmentation/trip_segmentation.py index d6828af77..7b47bd49b 100644 --- a/emission/analysis/intake/segmentation/trip_segmentation.py +++ b/emission/analysis/intake/segmentation/trip_segmentation.py @@ -7,6 +7,7 @@ from builtins import * from builtins import object import logging +import time import emission.storage.timeseries.abstract_timeseries as esta import emission.storage.decorations.place_queries as esdp @@ -23,6 +24,9 @@ import emission.analysis.intake.segmentation.restart_checking as eaisr import emission.core.common as ecc +import emission.storage.decorations.stats_queries as esds +import emission.core.timer as ect +import emission.core.wrapper.pipelinestate as ecwp class TripSegmentationMethod(object): def segment_into_trips(self, timeseries, time_query): @@ -47,54 +51,76 @@ def segment_into_trips(self, timeseries, time_query): pass def segment_current_trips(user_id): - ts = esta.TimeSeries.get_time_series(user_id) - time_query = epq.get_time_range_for_segmentation(user_id) + with ect.Timer() as t_get_time_series: + ts = esta.TimeSeries.get_time_series(user_id) + esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/get_time_series", time.time(), t_get_time_series.elapsed) + + with ect.Timer() as t_get_time_range: + time_query = epq.get_time_range_for_segmentation(user_id) + esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/get_time_range_for_segmentation", time.time(), t_get_time_range.elapsed) import emission.analysis.intake.segmentation.trip_segmentation_methods.dwell_segmentation_time_filter as dstf import emission.analysis.intake.segmentation.trip_segmentation_methods.dwell_segmentation_dist_filter as dsdf - dstfsm = dstf.DwellSegmentationTimeFilter(time_threshold = 5 * 60, # 5 mins - point_threshold = 9, - distance_threshold = 100) # 100 m - dsdfsm = dsdf.DwellSegmentationDistFilter(time_threshold = 10 * 60, # 10 mins - point_threshold = 9, - distance_threshold = 50) # 50 m + with ect.Timer() as t_create_time_filter: + dstfsm = dstf.DwellSegmentationTimeFilter(time_threshold=5 * 60, # 5 mins + point_threshold=9, + distance_threshold=100) # 100 m + esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/create_time_filter", time.time(), t_create_time_filter.elapsed) + + with ect.Timer() as t_create_dist_filter: + dsdfsm = dsdf.DwellSegmentationDistFilter(time_threshold=10 * 60, # 10 mins + point_threshold=9, + distance_threshold=50) # 50 m + esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/create_dist_filter", time.time(), t_create_dist_filter.elapsed) filter_methods = {"time": dstfsm, "distance": dsdfsm} filter_method_names = {"time": "DwellSegmentationTimeFilter", "distance": "DwellSegmentationDistFilter"} + # We need to use the appropriate filter based on the incoming data # So let's read in the location points for the specified query - loc_df = ts.get_data_df("background/filtered_location", time_query) + with ect.Timer() as t_get_data_df: + loc_df = ts.get_data_df("background/filtered_location", time_query) + esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/get_data_df", time.time(), t_get_data_df.elapsed) + if len(loc_df) == 0: # no new segments, no need to keep looking at these again logging.debug("len(loc_df) == 0, early return") epq.mark_segmentation_done(user_id, None) return - out_of_order_points = loc_df[loc_df.ts.diff() < 0] - if len(out_of_order_points) > 0: - logging.info("Found out of order points!") - logging.info("%s" % out_of_order_points) - # drop from the table - loc_df = loc_df.drop(out_of_order_points.index.tolist()) - loc_df.reset_index(inplace=True) - # invalidate in the database. - out_of_order_id_list = out_of_order_points["_id"].tolist() - logging.debug("out_of_order_id_list = %s" % out_of_order_id_list) - for ooid in out_of_order_id_list: - ts.invalidate_raw_entry(ooid) - - filters_in_df = loc_df["filter"].dropna().unique() + with ect.Timer() as t_handle_out_of_order: + out_of_order_points = loc_df[loc_df.ts.diff() < 0] + if len(out_of_order_points) > 0: + logging.info("Found out of order points!") + logging.info("%s" % out_of_order_points) + # drop from the table + loc_df = loc_df.drop(out_of_order_points.index.tolist()) + loc_df.reset_index(inplace=True) + # invalidate in the database. + out_of_order_id_list = out_of_order_points["_id"].tolist() + logging.debug("out_of_order_id_list = %s" % out_of_order_id_list) + for ooid in out_of_order_id_list: + ts.invalidate_raw_entry(ooid) + esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/handle_out_of_order_points", time.time(), t_handle_out_of_order.elapsed) + + with ect.Timer() as t_get_filters: + filters_in_df = loc_df["filter"].dropna().unique() + esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/get_filters_in_df", time.time(), t_get_filters.elapsed) + logging.debug("Filters in the dataframe = %s" % filters_in_df) if len(filters_in_df) == 1: # Common case - let's make it easy - - segmentation_points = filter_methods[filters_in_df[0]].segment_into_trips(ts, - time_query) + with ect.Timer() as t_segment_trips: + segmentation_points = filter_methods[filters_in_df[0]].segment_into_trips(ts, time_query) + esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips", time.time(), t_segment_trips.elapsed) else: - segmentation_points = get_combined_segmentation_points(ts, loc_df, time_query, - filters_in_df, - filter_methods) + with ect.Timer() as t_get_combined_segmentation: + segmentation_points = get_combined_segmentation_points(ts, loc_df, time_query, + filters_in_df, + filter_methods) + esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/get_combined_segmentation_points", time.time(), t_get_combined_segmentation.elapsed) + # Create and store trips and places based on the segmentation points if segmentation_points is None: epq.mark_segmentation_failed(user_id) @@ -103,13 +129,15 @@ def segment_current_trips(user_id): logging.debug("len(segmentation_points) == 0, early return") epq.mark_segmentation_done(user_id, None) else: - try: - create_places_and_trips(user_id, segmentation_points, filter_method_names[filters_in_df[0]]) - epq.mark_segmentation_done(user_id, get_last_ts_processed(filter_methods)) - except: - logging.exception("Trip generation failed for user %s" % user_id) - epq.mark_segmentation_failed(user_id) - + with ect.Timer() as t_create_places_trips: + try: + create_places_and_trips(user_id, segmentation_points, filter_method_names[filters_in_df[0]]) + epq.mark_segmentation_done(user_id, get_last_ts_processed(filter_methods)) + except: + logging.exception("Trip generation failed for user %s" % user_id) + epq.mark_segmentation_failed(user_id) + esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/create_places_and_trips", time.time(), t_create_places_trips.elapsed) + def get_combined_segmentation_points(ts, loc_df, time_query, filters_in_df, filter_methods): """ We can have mixed filters in a particular time range for multiple reasons. From 2a327421d20b4dd5e3989cdd042df9e645bd1707 Mon Sep 17 00:00:00 2001 From: TeachMeTW Date: Tue, 5 Nov 2024 14:35:36 -0800 Subject: [PATCH 3/5] Discussed with Jack, approved with this change to clarify pipeline stage --- emission/pipeline/intake_stage.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/emission/pipeline/intake_stage.py b/emission/pipeline/intake_stage.py index b681f93d5..424d85e32 100644 --- a/emission/pipeline/intake_stage.py +++ b/emission/pipeline/intake_stage.py @@ -199,11 +199,11 @@ def run_intake_pipeline_for_user(uuid, skip_if_no_new_data): time.time(), crt.elapsed) with ect.Timer() as gsr: - logging.info("*" * 10 + "UUID %s: generating store and range " % uuid + "*" * 10) - print(str(arrow.now()) + "*" * 10 + "UUID %s: generating store and range " % uuid + "*" * 10) + logging.info("*" * 10 + "UUID %s: storing user stats " % uuid + "*" * 10) + print(str(arrow.now()) + "*" * 10 + "UUID %s: storing user stats " % uuid + "*" * 10) _get_and_store_range(uuid, "analysis/composite_trip") - esds.store_pipeline_time(uuid, 'GENERATE_STORE_AND_RANGE', + esds.store_pipeline_time(uuid, 'STORE_USER_STATS', time.time(), gsr.elapsed) def _get_and_store_range(user_id, trip_key): From 0382abf8fb0895cdefaa264aeb26006557ee755d Mon Sep 17 00:00:00 2001 From: TeachMeTW Date: Tue, 5 Nov 2024 21:27:23 -0800 Subject: [PATCH 4/5] Time Filter --- .../dwell_segmentation_time_filter.py | 254 ++++++++++++------ 1 file changed, 165 insertions(+), 89 deletions(-) diff --git a/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_time_filter.py b/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_time_filter.py index d75760cd9..4364cdeaf 100644 --- a/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_time_filter.py +++ b/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_time_filter.py @@ -13,6 +13,7 @@ import numpy as np import pandas as pd import datetime as pydt +import time # Our imports import emission.analysis.point_features as pf @@ -20,6 +21,9 @@ import emission.core.wrapper.location as ecwl import emission.analysis.intake.segmentation.restart_checking as eaisr +import emission.storage.decorations.stats_queries as esds +import emission.core.timer as ect +import emission.core.wrapper.pipelinestate as ecwp class DwellSegmentationTimeFilter(eaist.TripSegmentationMethod): def __init__(self, time_threshold, point_threshold, distance_threshold): @@ -61,14 +65,41 @@ def segment_into_trips(self, timeseries, time_query): data that they want from the sensor streams in order to determine the segmentation points. """ - filtered_points_pre_ts_diff_df = timeseries.get_data_df("background/filtered_location", time_query) - # Sometimes, we can get bogus points because data.ts and - # metadata.write_ts are off by a lot. If we don't do this, we end up - # appearing to travel back in time - # https://github.com/e-mission/e-mission-server/issues/457 - filtered_points_df = filtered_points_pre_ts_diff_df[(filtered_points_pre_ts_diff_df.metadata_write_ts - filtered_points_pre_ts_diff_df.ts) < 1000] - filtered_points_df.reset_index(inplace=True) - transition_df = timeseries.get_data_df("statemachine/transition", time_query) + with ect.Timer() as t_get_filtered_points_pre: + filtered_points_pre_ts_diff_df = timeseries.get_data_df("background/filtered_location", time_query) + user_id = filtered_points_pre_ts_diff_df["user_id"].iloc[0] + esds.store_pipeline_time( + user_id, + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips/get_filtered_points_pre_ts_diff_df", + time.time(), + t_get_filtered_points_pre.elapsed + ) + + with ect.Timer() as t_filter_bogus_points: + # Sometimes, we can get bogus points because data.ts and + # metadata.write_ts are off by a lot. If we don't do this, we end up + # appearing to travel back in time + # https://github.com/e-mission/e-mission-server/issues/457 + filtered_points_df = filtered_points_pre_ts_diff_df[ + (filtered_points_pre_ts_diff_df.metadata_write_ts - filtered_points_pre_ts_diff_df.ts) < 1000 + ] + filtered_points_df.reset_index(inplace=True) + esds.store_pipeline_time( + user_id, + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips/filter_bogus_points", + time.time(), + t_filter_bogus_points.elapsed + ) + + with ect.Timer() as t_get_transition_df: + transition_df = timeseries.get_data_df("statemachine/transition", time_query) + esds.store_pipeline_time( + user_id, + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips/get_transition_df", + time.time(), + t_get_transition_df.elapsed + ) + if len(transition_df) > 0: logging.debug("transition_df = %s" % transition_df[["fmt_time", "transition"]]) else: @@ -83,88 +114,133 @@ def segment_into_trips(self, timeseries, time_query): curr_trip_start_point = None just_ended = True prevPoint = None - for idx, row in filtered_points_df.iterrows(): - currPoint = ad.AttrDict(row) - currPoint.update({"idx": idx}) - logging.debug("-" * 30 + str(currPoint.fmt_time) + "-" * 30) - if curr_trip_start_point is None: - logging.debug("Appending currPoint because the current start point is None") - # segmentation_points.append(currPoint) - - if just_ended: - if self.continue_just_ended(idx, currPoint, filtered_points_df): - # We have "processed" the currPoint by deciding to glom it - self.last_ts_processed = currPoint.metadata_write_ts - continue - # else: - sel_point = currPoint - logging.debug("Setting new trip start point %s with idx %s" % (sel_point, sel_point.idx)) - curr_trip_start_point = sel_point - just_ended = False - - last5MinsPoints_df = filtered_points_df[np.logical_and( - np.logical_and( - filtered_points_df.ts > currPoint.ts - self.time_threshold, - filtered_points_df.ts < currPoint.ts), - filtered_points_df.ts >= curr_trip_start_point.ts)] - # Using .loc here causes problems if we have filtered out some points and so the index is non-consecutive. - # Using .iloc just ends up including points after this one. - # So we reset_index upstream and use it here. - # We are going to use the last 8 points for now. - # TODO: Change this back to last 10 points once we normalize phone and this - last10Points_df = filtered_points_df.iloc[max(idx-self.point_threshold, curr_trip_start_point.idx):idx+1] - distanceToLast = lambda row: pf.calDistance(ad.AttrDict(row), currPoint) - timeToLast = lambda row: currPoint.ts - ad.AttrDict(row).ts - last5MinsDistances = last5MinsPoints_df.apply(distanceToLast, axis=1) - logging.debug("last5MinsDistances = %s with length %d" % (last5MinsDistances.to_numpy(), len(last5MinsDistances))) - last10PointsDistances = last10Points_df.apply(distanceToLast, axis=1) - logging.debug("last10PointsDistances = %s with length %d, shape %s" % (last10PointsDistances.to_numpy(), - len(last10PointsDistances), - last10PointsDistances.shape)) - - # Fix for https://github.com/e-mission/e-mission-server/issues/348 - last5MinTimes = last5MinsPoints_df.apply(timeToLast, axis=1) - - logging.debug("len(last10PointsDistances) = %d, len(last5MinsDistances) = %d" % - (len(last10PointsDistances), len(last5MinsDistances))) - logging.debug("last5MinsTimes.max() = %s, time_threshold = %s" % - (last5MinTimes.max() if len(last5MinTimes) > 0 else np.NaN, self.time_threshold)) - - if self.has_trip_ended(prevPoint, currPoint, timeseries, last10PointsDistances, last5MinsDistances, last5MinTimes): - (ended_before_this, last_trip_end_point) = self.get_last_trip_end_point(filtered_points_df, - last10Points_df, last5MinsPoints_df) - segmentation_points.append((curr_trip_start_point, last_trip_end_point)) - logging.info("Found trip end at %s" % last_trip_end_point.fmt_time) - # We have processed everything up to the trip end by marking it as a completed trip - self.last_ts_processed = currPoint.metadata_write_ts - if ended_before_this: - # in this case, we end a trip at the previous point, and the next trip starts at this - # point, not the next one + + with ect.Timer() as t_loop: + for idx, row in filtered_points_df.iterrows(): + currPoint = ad.AttrDict(row) + currPoint.update({"idx": idx}) + logging.debug("-" * 30 + str(currPoint.fmt_time) + "-" * 30) + if curr_trip_start_point is None: + logging.debug("Appending currPoint because the current start point is None") + # segmentation_points.append(currPoint) + + if just_ended: + if self.continue_just_ended(idx, currPoint, filtered_points_df): + # We have "processed" the currPoint by deciding to glom it + self.last_ts_processed = currPoint.metadata_write_ts + continue + # else: + sel_point = currPoint + logging.debug("Setting new trip start point %s with idx %s" % (sel_point, sel_point.idx)) + curr_trip_start_point = sel_point just_ended = False - prevPoint = currPoint - curr_trip_start_point = currPoint - logging.debug("Setting new trip start point %s with idx %s" % - (currPoint, currPoint.idx)) - else: - # We end a trip at the current point, and the next trip starts at the next point - just_ended = True - prevPoint = None - else: - prevPoint = currPoint - - logging.debug("Iterated over all points, just_ended = %s, len(transition_df) = %s" % - (just_ended, len(transition_df))) - if not just_ended and len(transition_df) > 0: - stopped_moving_after_last = transition_df[(transition_df.ts > currPoint.ts) & (transition_df.transition == 2)] - logging.debug("looking after %s, found transitions %s" % - (currPoint.ts, stopped_moving_after_last)) - if len(stopped_moving_after_last) > 0: - (unused, last_trip_end_point) = self.get_last_trip_end_point(filtered_points_df, - last10Points_df, None) - segmentation_points.append((curr_trip_start_point, last_trip_end_point)) - logging.debug("Found trip end at %s" % last_trip_end_point.fmt_time) - # We have processed everything up to the trip end by marking it as a completed trip - self.last_ts_processed = currPoint.metadata_write_ts + + with ect.Timer() as t_calculations: + last5MinsPoints_df = filtered_points_df[np.logical_and( + np.logical_and( + filtered_points_df.ts > currPoint.ts - self.time_threshold, + filtered_points_df.ts < currPoint.ts + ), + filtered_points_df.ts >= curr_trip_start_point.ts + )] + # Using .loc here causes problems if we have filtered out some points and so the index is non-consecutive. + # Using .iloc just ends up including points after this one. + # So we reset_index upstream and use it here. + # We are going to use the last 8 points for now. + # TODO: Change this back to last 10 points once we normalize phone and this + last10Points_df = filtered_points_df.iloc[ + max(idx - self.point_threshold, curr_trip_start_point.idx):idx + 1 + ] + distanceToLast = lambda row: pf.calDistance(ad.AttrDict(row), currPoint) + timeToLast = lambda row: currPoint.ts - ad.AttrDict(row).ts + last5MinsDistances = last5MinsPoints_df.apply(distanceToLast, axis=1) + logging.debug("last5MinsDistances = %s with length %d" % (last5MinsDistances.to_numpy(), len(last5MinsDistances))) + last10PointsDistances = last10Points_df.apply(distanceToLast, axis=1) + logging.debug("last10PointsDistances = %s with length %d, shape %s" % ( + last10PointsDistances.to_numpy(), + len(last10PointsDistances), + last10PointsDistances.shape + )) + + # Fix for https://github.com/e-mission/e-mission-server/issues/348 + last5MinTimes = last5MinsPoints_df.apply(timeToLast, axis=1) + + logging.debug("len(last10PointsDistances) = %d, len(last5MinsDistances) = %d" % + (len(last10PointsDistances), len(last5MinsDistances))) + logging.debug("last5MinTimes.max() = %s, time_threshold = %s" % + (last5MinTimes.max() if len(last5MinTimes) > 0 else np.NaN, self.time_threshold)) + + esds.store_pipeline_time( + user_id, + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips/calculations_per_iteration", + time.time(), + t_calculations.elapsed + ) + + with ect.Timer() as t_has_trip_ended: + if self.has_trip_ended(prevPoint, currPoint, timeseries, last10PointsDistances, last5MinsDistances, last5MinTimes): + (ended_before_this, last_trip_end_point) = self.get_last_trip_end_point( + filtered_points_df, + last10Points_df, + last5MinsPoints_df + ) + segmentation_points.append((curr_trip_start_point, last_trip_end_point)) + logging.info("Found trip end at %s" % last_trip_end_point.fmt_time) + # We have processed everything up to the trip end by marking it as a completed trip + self.last_ts_processed = currPoint.metadata_write_ts + if ended_before_this: + # in this case, we end a trip at the previous point, and the next trip starts at this + # point, not the next one + just_ended = False + prevPoint = currPoint + curr_trip_start_point = currPoint + logging.debug("Setting new trip start point %s with idx %s" % + (currPoint, currPoint.idx)) + else: + # We end a trip at the current point, and the next trip starts at the next point + just_ended = True + prevPoint = None + else: + prevPoint = currPoint + esds.store_pipeline_time( + user_id, + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips/has_trip_ended", + time.time(), + t_has_trip_ended.elapsed + ) + + esds.store_pipeline_time( + user_id, + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips/loop", + time.time(), + t_loop.elapsed + ) + + with ect.Timer() as t_post_loop: + logging.debug("Iterated over all points, just_ended = %s, len(transition_df) = %s" % + (just_ended, len(transition_df))) + if not just_ended and len(transition_df) > 0: + stopped_moving_after_last = transition_df[ + (transition_df.ts > currPoint.ts) & (transition_df.transition == 2) + ] + logging.debug("looking after %s, found transitions %s" % + (currPoint.ts, stopped_moving_after_last)) + if len(stopped_moving_after_last) > 0: + (unused, last_trip_end_point) = self.get_last_trip_end_point( + filtered_points_df, + last10Points_df, + None + ) + segmentation_points.append((curr_trip_start_point, last_trip_end_point)) + logging.debug("Found trip end at %s" % last_trip_end_point.fmt_time) + # We have processed everything up to the trip end by marking it as a completed trip + self.last_ts_processed = currPoint.metadata_write_ts + esds.store_pipeline_time( + user_id, + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips/post_loop", + time.time(), + t_post_loop.elapsed + ) return segmentation_points From 1a5a0d9eb8926e9bc9679a0cb657516d5d9bb27d Mon Sep 17 00:00:00 2001 From: TeachMeTW Date: Tue, 5 Nov 2024 22:10:44 -0800 Subject: [PATCH 5/5] Dist Filter fixed dist name Added name to time --- .../dwell_segmentation_dist_filter.py | 260 ++++++++++++------ .../dwell_segmentation_time_filter.py | 14 +- 2 files changed, 185 insertions(+), 89 deletions(-) diff --git a/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_dist_filter.py b/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_dist_filter.py index ea53c9abb..2ffd6f058 100644 --- a/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_dist_filter.py +++ b/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_dist_filter.py @@ -12,6 +12,7 @@ import attrdict as ad import numpy as np import datetime as pydt +import time # Our imports import emission.analysis.point_features as pf @@ -20,6 +21,9 @@ import emission.analysis.intake.segmentation.restart_checking as eaisr import emission.analysis.intake.segmentation.trip_segmentation_methods.trip_end_detection_corner_cases as eaistc +import emission.storage.decorations.stats_queries as esds +import emission.core.timer as ect +import emission.core.wrapper.pipelinestate as ecwp class DwellSegmentationDistFilter(eaist.TripSegmentationMethod): def __init__(self, time_threshold, point_threshold, distance_threshold): @@ -46,9 +50,34 @@ def segment_into_trips(self, timeseries, time_query): data that they want from the sensor streams in order to determine the segmentation points. """ - self.filtered_points_df = timeseries.get_data_df("background/filtered_location", time_query) - self.filtered_points_df.loc[:,"valid"] = True - self.transition_df = timeseries.get_data_df("statemachine/transition", time_query) + with ect.Timer() as t_get_filtered_points: + self.filtered_points_df = timeseries.get_data_df("background/filtered_location", time_query) + user_id = self.filtered_points_df["user_id"].iloc[0] + esds.store_pipeline_time( + user_id, + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/get_filtered_points_df", + time.time(), + t_get_filtered_points.elapsed + ) + + with ect.Timer() as t_mark_valid: + self.filtered_points_df.loc[:, "valid"] = True + esds.store_pipeline_time( + user_id, + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/mark_valid", + time.time(), + t_mark_valid.elapsed + ) + + with ect.Timer() as t_get_transition_df: + self.transition_df = timeseries.get_data_df("statemachine/transition", time_query) + esds.store_pipeline_time( + user_id, + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/get_transition_df", + time.time(), + t_get_transition_df.elapsed + ) + if len(self.transition_df) > 0: logging.debug("self.transition_df = %s" % self.transition_df[["fmt_time", "transition"]]) else: @@ -62,86 +91,153 @@ def segment_into_trips(self, timeseries, time_query): last_trip_end_point = None curr_trip_start_point = None just_ended = True - for idx, row in self.filtered_points_df.iterrows(): - currPoint = ad.AttrDict(row) - currPoint.update({"idx": idx}) - logging.debug("-" * 30 + str(currPoint.fmt_time) + "-" * 30) - if curr_trip_start_point is None: - logging.debug("Appending currPoint because the current start point is None") - # segmentation_points.append(currPoint) - - if just_ended: - if self.continue_just_ended(idx, currPoint, self.filtered_points_df): - # We have "processed" the currPoint by deciding to glom it - self.last_ts_processed = currPoint.metadata_write_ts - continue - # else: - # Here's where we deal with the start trip. At this point, the - # distance is greater than the filter. - sel_point = currPoint - logging.debug("Setting new trip start point %s with idx %s" % (sel_point, sel_point.idx)) - curr_trip_start_point = sel_point - just_ended = False - else: - # Using .loc here causes problems if we have filtered out some points and so the index is non-consecutive. - # Using .iloc just ends up including points after this one. - # So we reset_index upstream and use it here. - last10Points_df = self.filtered_points_df.iloc[max(idx-self.point_threshold, curr_trip_start_point.idx):idx+1] - lastPoint = self.find_last_valid_point(idx) - if self.has_trip_ended(lastPoint, currPoint, timeseries): - last_trip_end_point = lastPoint - logging.debug("Appending last_trip_end_point %s with index %s " % - (last_trip_end_point, idx-1)) - segmentation_points.append((curr_trip_start_point, last_trip_end_point)) - logging.info("Found trip end at %s" % last_trip_end_point.fmt_time) - # We have processed everything up to the trip end by marking it as a completed trip - self.last_ts_processed = currPoint.metadata_write_ts - just_ended = True - # Now, we have finished processing the previous point as a trip - # end or not. But we still need to process this point by seeing - # whether it should represent a new trip start, or a glom to the - # previous trip - if not self.continue_just_ended(idx, currPoint, self.filtered_points_df): - sel_point = currPoint - logging.debug("Setting new trip start point %s with idx %s" % (sel_point, sel_point.idx)) + + with ect.Timer() as t_loop: + for idx, row in self.filtered_points_df.iterrows(): + currPoint = ad.AttrDict(row) + currPoint.update({"idx": idx}) + logging.debug("-" * 30 + str(currPoint.fmt_time) + "-" * 30) + + if curr_trip_start_point is None: + logging.debug("Appending currPoint because the current start point is None") + # segmentation_points.append(currPoint) + + if just_ended: + with ect.Timer() as t_continue_just_ended: + continue_flag = self.continue_just_ended(idx, currPoint, self.filtered_points_df) + esds.store_pipeline_time( + user_id, + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/continue_just_ended", + time.time(), + t_continue_just_ended.elapsed + ) + + if continue_flag: + # We have "processed" the currPoint by deciding to glom it + self.last_ts_processed = currPoint.metadata_write_ts + continue + # else: + sel_point = currPoint + logging.debug("Setting new trip start point %s with idx %s" % (sel_point, sel_point.idx)) + with ect.Timer() as t_set_start_point: curr_trip_start_point = sel_point - just_ended = False - - # Since we only end a trip when we start a new trip, this means that - # the last trip that was pushed is ignored. Consider the example of - # 2016-02-22 when I took kids to karate. We arrived shortly after 4pm, - # so during that remote push, a trip end was not detected. And we got - # back home shortly after 5pm, so the trip end was only detected on the - # phone at 6pm. At that time, the following points were pushed: - # ..., 2016-02-22T16:04:02, 2016-02-22T16:49:34, 2016-02-22T16:49:50, - # ..., 2016-02-22T16:57:04 - # Then, on the server, while iterating through the points, we detected - # a trip end at 16:04, and a new trip start at 16:49. But we did not - # detect the trip end at 16:57, because we didn't start a new point. - # This has two issues: - # - we won't see this trip until the next trip start, which may be on the next day - # - we won't see this trip at all, because when we run the pipeline the - # next time, we will only look at points from that time onwards. These - # points have been marked as "processed", so they won't even be considered. - - # There are multiple potential fixes: - # - we can mark only the completed trips as processed. This will solve (2) above, but not (1) - # - we can mark a trip end based on the fact that we only push data - # when a trip ends, so if we have data, it means that the trip has been - # detected as ended on the phone. - # This seems a bit fragile - what if we start pushing incomplete trip - # data for efficiency reasons? Therefore, we also check to see if there - # is a trip_end_detected in this timeframe after the last point. If so, - # then we end the trip at the last point that we have. - if not just_ended and len(self.transition_df) > 0: - stopped_moving_after_last = self.transition_df[(self.transition_df.ts > currPoint.ts) & (self.transition_df.transition == 2)] - logging.debug("stopped_moving_after_last = %s" % stopped_moving_after_last[["fmt_time", "transition"]]) - if len(stopped_moving_after_last) > 0: - logging.debug("Found %d transitions after last point, ending trip..." % len(stopped_moving_after_last)) - segmentation_points.append((curr_trip_start_point, currPoint)) - self.last_ts_processed = currPoint.metadata_write_ts - else: - logging.debug("Found %d transitions after last point, not ending trip..." % len(stopped_moving_after_last)) + esds.store_pipeline_time( + user_id, + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/set_new_trip_start_point", + time.time(), + t_set_start_point.elapsed + ) + just_ended = False + else: + with ect.Timer() as t_process_trip: + # Using .loc here causes problems if we have filtered out some points and so the index is non-consecutive. + # Using .iloc just ends up including points after this one. + # So we reset_index upstream and use it here. + last10Points_df = self.filtered_points_df.iloc[ + max(idx - self.point_threshold, curr_trip_start_point.idx):idx + 1 + ] + lastPoint = self.find_last_valid_point(idx) + with ect.Timer() as t_has_trip_ended: + trip_ended = self.has_trip_ended(lastPoint, currPoint, timeseries) + esds.store_pipeline_time( + user_id, + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/has_trip_ended", + time.time(), + t_has_trip_ended.elapsed + ) + + if trip_ended: + with ect.Timer() as t_get_last_trip_end_point: + last_trip_end_point = lastPoint + logging.debug("Appending last_trip_end_point %s with index %s " % + (last_trip_end_point, idx - 1)) + segmentation_points.append((curr_trip_start_point, last_trip_end_point)) + logging.info("Found trip end at %s" % last_trip_end_point.fmt_time) + # We have processed everything up to the trip end by marking it as a completed trip + self.last_ts_processed = currPoint.metadata_write_ts + esds.store_pipeline_time( + user_id, + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/get_last_trip_end_point", + time.time(), + t_get_last_trip_end_point.elapsed + ) + + with ect.Timer() as t_handle_trip_end: + just_ended = True + # Now, we have finished processing the previous point as a trip + # end or not. But we still need to process this point by seeing + # whether it should represent a new trip start, or a glom to the + # previous trip + if not self.continue_just_ended(idx, currPoint, self.filtered_points_df): + sel_point = currPoint + logging.debug("Setting new trip start point %s with idx %s" % (sel_point, sel_point.idx)) + curr_trip_start_point = sel_point + just_ended = False + esds.store_pipeline_time( + user_id, + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/handle_trip_end", + time.time(), + t_handle_trip_end.elapsed + ) + esds.store_pipeline_time( + user_id, + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/loop", + time.time(), + t_loop.elapsed + ) + + with ect.Timer() as t_post_loop: + # Since we only end a trip when we start a new trip, this means that + # the last trip that was pushed is ignored. Consider the example of + # 2016-02-22 when I took kids to karate. We arrived shortly after 4pm, + # so during that remote push, a trip end was not detected. And we got + # back home shortly after 5pm, so the trip end was only detected on the + # phone at 6pm. At that time, the following points were pushed: + # ..., 2016-02-22T16:04:02, 2016-02-22T16:49:34, 2016-02-22T16:49:50, + # ..., 2016-02-22T16:57:04 + # Then, on the server, while iterating through the points, we detected + # a trip end at 16:04, and a new trip start at 16:49. But we did not + # detect the trip end at 16:57, because we didn't start a new point. + # This has two issues: + # - we won't see this trip until the next trip start, which may be on the next day + # - we won't see this trip at all, because when we run the pipeline the + # next time, we will only look at points from that time onwards. These + # points have been marked as "processed", so they won't even be considered. + + # There are multiple potential fixes: + # - we can mark only the completed trips as processed. This will solve (2) above, but not (1) + # - we can mark a trip end based on the fact that we only push data + # when a trip ends, so if we have data, it means that the trip has been + # detected as ended on the phone. + # This seems a bit fragile - what if we start pushing incomplete trip + # data for efficiency reasons? Therefore, we also check to see if there + # is a trip_end_detected in this timeframe after the last point. If so, + # then we end the trip at the last point that we have. + if not just_ended and len(self.transition_df) > 0: + with ect.Timer() as t_check_transitions: + stopped_moving_after_last = self.transition_df[ + (self.transition_df.ts > currPoint.ts) & (self.transition_df.transition == 2) + ] + logging.debug("stopped_moving_after_last = %s" % stopped_moving_after_last[["fmt_time", "transition"]]) + if len(stopped_moving_after_last) > 0: + logging.debug("Found %d transitions after last point, ending trip..." % len(stopped_moving_after_last)) + segmentation_points.append((curr_trip_start_point, currPoint)) + self.last_ts_processed = currPoint.metadata_write_ts + else: + logging.debug("Found %d transitions after last point, not ending trip..." % len(stopped_moving_after_last)) + esds.store_pipeline_time( + user_id, + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/check_transitions_post_loop", + time.time(), + t_check_transitions.elapsed + ) + esds.store_pipeline_time( + user_id, + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/post_loop", + time.time(), + t_post_loop.elapsed + ) + return segmentation_points def has_trip_ended(self, lastPoint, currPoint, timeseries): diff --git a/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_time_filter.py b/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_time_filter.py index 4364cdeaf..3febdca20 100644 --- a/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_time_filter.py +++ b/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_time_filter.py @@ -70,7 +70,7 @@ def segment_into_trips(self, timeseries, time_query): user_id = filtered_points_pre_ts_diff_df["user_id"].iloc[0] esds.store_pipeline_time( user_id, - ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips/get_filtered_points_pre_ts_diff_df", + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/get_filtered_points_pre_ts_diff_df", time.time(), t_get_filtered_points_pre.elapsed ) @@ -86,7 +86,7 @@ def segment_into_trips(self, timeseries, time_query): filtered_points_df.reset_index(inplace=True) esds.store_pipeline_time( user_id, - ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips/filter_bogus_points", + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/filter_bogus_points", time.time(), t_filter_bogus_points.elapsed ) @@ -95,7 +95,7 @@ def segment_into_trips(self, timeseries, time_query): transition_df = timeseries.get_data_df("statemachine/transition", time_query) esds.store_pipeline_time( user_id, - ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips/get_transition_df", + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/get_transition_df", time.time(), t_get_transition_df.elapsed ) @@ -172,7 +172,7 @@ def segment_into_trips(self, timeseries, time_query): esds.store_pipeline_time( user_id, - ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips/calculations_per_iteration", + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/calculations_per_iteration", time.time(), t_calculations.elapsed ) @@ -204,14 +204,14 @@ def segment_into_trips(self, timeseries, time_query): prevPoint = currPoint esds.store_pipeline_time( user_id, - ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips/has_trip_ended", + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/has_trip_ended", time.time(), t_has_trip_ended.elapsed ) esds.store_pipeline_time( user_id, - ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips/loop", + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/loop", time.time(), t_loop.elapsed ) @@ -237,7 +237,7 @@ def segment_into_trips(self, timeseries, time_query): self.last_ts_processed = currPoint.metadata_write_ts esds.store_pipeline_time( user_id, - ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips/post_loop", + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/post_loop", time.time(), t_post_loop.elapsed )