diff --git a/emission/analysis/intake/segmentation/restart_checking.py b/emission/analysis/intake/segmentation/restart_checking.py index 0d53932fe..31f738c8e 100644 --- a/emission/analysis/intake/segmentation/restart_checking.py +++ b/emission/analysis/intake/segmentation/restart_checking.py @@ -61,7 +61,7 @@ def tracking_restarted_in_loc_df(loc_df, transition_df): return tracking_restarted -def is_tracking_restarted_in_range(start_ts, end_ts, timeseries, transition_df=None): +def is_tracking_restarted_in_range(start_ts, end_ts, transition_df=None): """ Check to see if tracing was restarted between the times specified :param start_ts: the start of the time range to check @@ -70,15 +70,9 @@ def is_tracking_restarted_in_range(start_ts, end_ts, timeseries, transition_df=N :param transition_df: dataframe of transitions to use (if None, will be fetched from timeseries) :return: """ - if transition_df is not None: - transition_df = transition_df[ - (transition_df['ts'] >= start_ts) & (transition_df['ts'] <= end_ts) - ] - else: - import emission.storage.timeseries.timequery as estt - tq = estt.TimeQuery(timeType="data.ts", startTs=start_ts, - endTs=end_ts) - transition_df = timeseries.get_data_df("statemachine/transition", tq) + transition_df = transition_df[ + (transition_df['ts'] >= start_ts) & (transition_df['ts'] <= end_ts) + ] if len(transition_df) == 0: logging.debug("In range %s -> %s found no transitions" % diff --git a/emission/analysis/intake/segmentation/trip_segmentation.py b/emission/analysis/intake/segmentation/trip_segmentation.py index 2952a576d..8a6f61761 100644 --- a/emission/analysis/intake/segmentation/trip_segmentation.py +++ b/emission/analysis/intake/segmentation/trip_segmentation.py @@ -81,6 +81,10 @@ def segment_current_trips(user_id): epq.mark_segmentation_done(user_id, None) return + # Reading in the other sensor data + transition_df = ts.get_data_df("statemachine/transition", time_query) + motion_df = ts.get_data_df("background/motion_activity", time_query) + out_of_order_points = loc_df[loc_df.ts.diff() < 0] if len(out_of_order_points) > 0: logging.info("Found out of order points!") @@ -100,13 +104,11 @@ def segment_current_trips(user_id): if len(filters_in_df) == 1: # Common case - let's make it easy with ect.Timer() as t_segment_trips: - transition_df = ts.get_data_df("statemachine/transition", time_query) - motion_df = ts.get_data_df("background/motion_activity", time_query) segmentation_points = filter_methods[filters_in_df[0]].segment_into_trips(loc_df, transition_df, motion_df) esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips", time.time(), t_segment_trips.elapsed) else: with ect.Timer() as t_get_combined_segmentation: - segmentation_points = get_combined_segmentation_points(ts, loc_df, time_query, + segmentation_points = get_combined_segmentation_points(loc_df, transition_df, motion_df, time_query, filters_in_df, filter_methods) esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/get_combined_segmentation_points", time.time(), t_get_combined_segmentation.elapsed) @@ -121,14 +123,14 @@ def segment_current_trips(user_id): else: with ect.Timer() as t_create_places_trips: try: - create_places_and_trips(user_id, segmentation_points, filter_method_names[filters_in_df[0]]) + create_places_and_trips(user_id, segmentation_points, transition_df, filter_method_names[filters_in_df[0]]) epq.mark_segmentation_done(user_id, get_last_ts_processed(filter_methods)) except: logging.exception("Trip generation failed for user %s" % user_id) epq.mark_segmentation_failed(user_id) esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/create_places_and_trips", time.time(), t_create_places_trips.elapsed) -def get_combined_segmentation_points(ts, loc_df, time_query, filters_in_df, filter_methods): +def get_combined_segmentation_points(loc_df, transition_df, motion_df, time_query, filters_in_df, filter_methods): """ We can have mixed filters in a particular time range for multiple reasons. a) user switches phones from one platform to another @@ -169,8 +171,10 @@ def get_combined_segmentation_points(ts, loc_df, time_query, filters_in_df, filt (curr_filter, time_query.startTs, time_query.endTs)) curr_filter_loc_df = loc_df.loc[startIndex:endIndex] curr_filter_loc_df.reset_index(drop=True, inplace=True) - curr_filter_transition_df = ts.get_data_df("statemachine/transition", time_query) - curr_filter_motion_df = ts.get_data_df("background/motion_activity", time_query) + curr_filter_transition_df = transition_df.query("@time_query.startTs <= metadata_write_ts <= @time_query.endTs") + curr_filter_transition_df.reset_index(drop=True, inplace=True) + curr_filter_motion_df = motion_df.query("@time_query.startTs <= metadata_write_ts <= @time_query.endTs") + curr_filter_motion_df.reset_index(drop=True, inplace=True) segmentation_map[time_query.startTs] = filter_methods[curr_filter].segment_into_trips(curr_filter_loc_df, curr_filter_transition_df, curr_filter_motion_df) logging.debug("After filtering, segmentation_map has keys %s" % list(segmentation_map.keys())) sortedStartTsList = sorted(segmentation_map.keys()) @@ -193,7 +197,7 @@ def get_last_ts_processed(filter_methods): logging.info("Returning last_ts_processed = %s" % last_ts_processed) return last_ts_processed -def create_places_and_trips(user_id, segmentation_points, segmentation_method_name): +def create_places_and_trips(user_id, segmentation_points, transition_df, segmentation_method_name): # new segments, need to deal with them # First, retrieve the last place so that we can stitch it to the newly created trip. # Again, there are easy and hard. In the easy case, the trip was @@ -236,7 +240,7 @@ def create_places_and_trips(user_id, segmentation_points, segmentation_method_na new_place_entry = ecwe.Entry.create_entry(user_id, "segmentation/raw_place", new_place, create_id = True) - if found_untracked_period(ts, last_place_entry.data, start_loc, segmentation_method_name): + if found_untracked_period(transition_df, last_place_entry.data, start_loc, segmentation_method_name): # Fill in the gap in the chain with an untracked period curr_untracked = ecwut.Untrackedtime() curr_untracked.source = segmentation_method_name @@ -276,7 +280,7 @@ def _link_and_save(ts, last_place_entry, curr_trip_entry, new_place_entry, start # it will be lost ts.update(last_place_entry) -def found_untracked_period(timeseries, last_place, start_loc, segmentation_method_name): +def found_untracked_period(transition_df, last_place, start_loc, segmentation_method_name): """ Check to see whether the two places are the same. This is a fix for https://github.com/e-mission/e-mission-server/issues/378 @@ -292,7 +296,7 @@ def found_untracked_period(timeseries, last_place, start_loc, segmentation_metho logging.debug("start of a chain, unable to check for restart from previous trip end, assuming not restarted") return False - if _is_tracking_restarted(last_place, start_loc, timeseries): + if _is_tracking_restarted(last_place, start_loc, transition_df): logging.debug("tracking has been restarted, returning True") return True @@ -400,6 +404,6 @@ def stitch_together_end(new_place_entry, curr_trip_entry, end_loc): new_place_entry["data"] = new_place curr_trip_entry["data"] = curr_trip -def _is_tracking_restarted(last_place, start_loc, timeseries): - return eaisr.is_tracking_restarted_in_range(last_place.enter_ts, start_loc.ts, timeseries) +def _is_tracking_restarted(last_place, start_loc, transition_df): + return eaisr.is_tracking_restarted_in_range(last_place.enter_ts, start_loc.ts, transition_df)