From bc7ea0138700d6be36fa4eb911e7092da53d57ff Mon Sep 17 00:00:00 2001 From: Jack Greenlee Date: Fri, 7 Mar 2025 01:31:02 -0500 Subject: [PATCH 1/2] read all entries upfront during section segmentation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Since DB calls are a bottleneck, we can reduce the number of DB calls by making upfront queries for all the required types of entries we'll need across the entire time range we are sectioning. Then we can filter those down accordingly. These include: segmentation/raw_place, background/bluetooth_ble, background/motion_activity, background/location, and background/filtered_location. (For some edge cases, statemachine/transition is used – but seeing as it will only be used in those specific cases, it doesn't make sense to load it upfront.) The motion activity and locations are used as dataframes later on, so I used get_data_df for those. Trips, places, and BLE entries are queried with find_entries. Then these are plumbed through to segment_trip_into_sections, filtered to the time range of the current trip, and ultimately passed into each SectionSegmentationMethod's segment_into_sections. With the locations already pre-loaded as dataframes, I was able to remove get_location_streams_for_trip altogether. For _get_distance_from_start_place_to_end, we preload all places in the segmentation range so we can usually find the start place in memory by its _id. 
However, unit tests revealed edge cases where the first start place is before the current segmentation time range, so we must fall back to querying the DB via esda.get_object. --- .../segmentation/section_segmentation.py | 110 ++++++++++++------ .../flip_flop_detection.py | 12 +- .../smoothed_high_confidence_motion.py | 54 +++++---- ..._high_confidence_with_visit_transitions.py | 60 ++++++---- emission/storage/pipeline_queries.py | 2 +- .../intakeTests/TestSectionSegmentation.py | 31 ++++- 6 files changed, 171 insertions(+), 98 deletions(-) diff --git a/emission/analysis/intake/segmentation/section_segmentation.py b/emission/analysis/intake/segmentation/section_segmentation.py index ffcd2dadb..e15f6da65 100644 --- a/emission/analysis/intake/segmentation/section_segmentation.py +++ b/emission/analysis/intake/segmentation/section_segmentation.py @@ -15,9 +15,11 @@ import emission.storage.decorations.analysis_timeseries_queries as esda import emission.storage.timeseries.abstract_timeseries as esta +import emission.storage.timeseries.timequery as estt import emission.core.wrapper.motionactivity as ecwm import emission.core.wrapper.location as ecwl +import emission.core.wrapper.localdate as ecwld import emission.core.wrapper.section as ecwc import emission.core.wrapper.stop as ecws import emission.core.wrapper.entry as ecwe @@ -26,7 +28,7 @@ import emcommon.bluetooth.ble_matching as emcble class SectionSegmentationMethod(object): - def segment_into_sections(self, timeseries, distance_from_place, time_query): + def segment_into_sections(self, timeseries, time_query, distance_from_place, ble_list, motion_df, unfiltered_loc_df, filtered_loc_df): """ Examines the timeseries database for a specific range and returns the points at which the trip needs to be segmented. 
Again, this allows @@ -46,12 +48,32 @@ def segment_into_sections(self, timeseries, distance_from_place, time_query): pass def segment_current_sections(user_id): - time_query = epq.get_time_range_for_sectioning(user_id) try: - trips_to_process = esda.get_entries(esda.RAW_TRIP_KEY, user_id, time_query) + ts = esta.TimeSeries.get_time_series(user_id) + time_query = epq.get_time_range_for_sectioning(user_id) + trips_to_process = ts.find_entries([esda.RAW_TRIP_KEY], time_query) + + time_query.timeType = 'data.exit_ts' + places_in_range = ts.find_entries([esda.RAW_PLACE_KEY], time_query) + + time_query.timeType = 'data.ts' + ble_list = ts.find_entries(['background/bluetooth_ble'], time_query) + motion_df = ts.get_data_df('background/motion_activity', time_query) + unfiltered_loc_df = ts.get_data_df('background/location', time_query) + filtered_loc_df = ts.get_data_df('background/filtered_location', time_query) + for trip_entry in trips_to_process: - logging.info("+" * 20 + ("Processing trip %s for user %s" % (trip_entry.get_id(), user_id)) + "+" * 20) - segment_trip_into_sections(user_id, trip_entry, trip_entry.data.source) + dist_from_place = _get_distance_from_start_place_to_end(trip_entry, places_in_range) + segment_trip_into_sections( + ts, + trip_entry, + dist_from_place, + trip_entry['data']['source'], + ble_list, + motion_df, + unfiltered_loc_df, + filtered_loc_df, + ) if len(trips_to_process) == 0: # Didn't process anything new so start at the same point next time last_trip_processed = None @@ -62,11 +84,17 @@ def segment_current_sections(user_id): logging.exception("Sectioning failed for user %s" % user_id) epq.mark_sectioning_failed(user_id) -def segment_trip_into_sections(user_id, trip_entry, trip_source): - ts = esta.TimeSeries.get_time_series(user_id) - time_query = esda.get_time_query_for_trip_like(esda.RAW_TRIP_KEY, trip_entry.get_id()) - distance_from_place = _get_distance_from_start_place_to_end(trip_entry) - ble_entries_during_trip = 
ts.find_entries(["background/bluetooth_ble"], time_query) + +def segment_trip_into_sections(ts, trip_entry, distance_from_place, trip_source, ble_list, motion_df, unfiltered_loc_df, filtered_loc_df): + trip_tq = estt.TimeQuery("data.ts", trip_entry['data']['start_ts'], trip_entry['data']['end_ts']) + + trip_ble_list = [e for e in ble_list + if e["data"]["ts"] >= trip_tq.startTs + and e["data"]["ts"] <= trip_tq.endTs] + ts_in_tq = "@trip_tq.startTs <= ts <= @trip_tq.endTs" + trip_motion_df = motion_df.query(ts_in_tq) if len(motion_df) else motion_df + trip_unfiltered_loc_df = unfiltered_loc_df.query(ts_in_tq) if len(unfiltered_loc_df) else unfiltered_loc_df + trip_filtered_loc_df = filtered_loc_df.query(ts_in_tq) if len(filtered_loc_df) else filtered_loc_df if (trip_source == "DwellSegmentationTimeFilter"): import emission.analysis.intake.segmentation.section_segmentation_methods.smoothed_high_confidence_motion as shcm @@ -84,8 +112,15 @@ def segment_trip_into_sections(user_id, trip_entry, trip_source): ecwm.MotionTypes.STILL, ecwm.MotionTypes.NONE, # iOS only ecwm.MotionTypes.STOPPED_WHILE_IN_VEHICLE]) # iOS only - - segmentation_points = shcmsm.segment_into_sections(ts, distance_from_place, time_query) + segmentation_points = shcmsm.segment_into_sections( + ts, + trip_tq, + distance_from_place, + trip_ble_list, + trip_motion_df, + trip_unfiltered_loc_df, + trip_filtered_loc_df + ) # Since we are segmenting an existing trip into sections, we do not need to worry about linking with # a prior place, since it will be linked through the trip object. @@ -97,22 +132,19 @@ def segment_trip_into_sections(user_id, trip_entry, trip_source): # TODO: Should we link the locations to the trips this way, or by using a foreign key? # If we want to use a foreign key, then we need to include the object id in the data df as well so that we can # set it properly. 
- ts = esta.TimeSeries.get_time_series(user_id) - get_loc_for_ts = lambda time: ecwl.Location(ts.get_entry_at_ts("background/filtered_location", "data.ts", time)["data"]) - trip_start_loc = get_loc_for_ts(trip_entry.data.start_ts) - trip_end_loc = get_loc_for_ts(trip_entry.data.end_ts) + trip_start_loc = ecwl.Location(trip_filtered_loc_df.iloc[0]) + trip_end_loc = ecwl.Location(trip_filtered_loc_df.iloc[-1]) logging.debug("trip_start_loc = %s, trip_end_loc = %s" % (trip_start_loc, trip_end_loc)) for (i, (start_loc_doc, end_loc_doc, sensed_mode)) in enumerate(segmentation_points): logging.debug("start_loc_doc = %s, end_loc_doc = %s" % (start_loc_doc, end_loc_doc)) - get_loc_for_row = lambda row: ts.df_row_to_entry("background/filtered_location", row).data - start_loc = get_loc_for_row(start_loc_doc) - end_loc = get_loc_for_row(end_loc_doc) + start_loc = ecwl.Location(start_loc_doc) + end_loc = ecwl.Location(end_loc_doc) logging.debug("start_loc = %s, end_loc = %s" % (start_loc, end_loc)) section = ecwc.Section() - section.trip_id = trip_entry.get_id() + section.trip_id = trip_entry['_id'] if prev_section_entry is None: # This is the first point, so we want to start from the start of the trip, not the start of this segment start_loc = trip_start_loc @@ -127,23 +159,23 @@ def segment_trip_into_sections(user_id, trip_entry, trip_source): # Later, we may want to actually use BLE sensor data as part of the basis for segmentation dynamic_config = eadc.get_dynamic_config() ble_sensed_mode = emcble.get_ble_sensed_vehicle_for_section( - ble_entries_during_trip, start_loc.ts, end_loc.ts, dynamic_config + trip_ble_list, start_loc.ts, end_loc.ts, dynamic_config ) fill_section(section, start_loc, end_loc, sensed_mode, ble_sensed_mode) # We create the entry after filling in the section so that we know # that the data is included properly - section_entry = ecwe.Entry.create_entry(user_id, esda.RAW_SECTION_KEY, + section_entry = ecwe.Entry.create_entry(ts.user_id, 
esda.RAW_SECTION_KEY, section, create_id=True) if prev_section_entry is not None: # If this is not the first section, create a stop to link the two sections together # The expectation is prev_section -> stop -> curr_section stop = ecws.Stop() - stop.trip_id = trip_entry.get_id() - stop_entry = ecwe.Entry.create_entry(user_id, - esda.RAW_STOP_KEY, - stop, create_id=True) + stop.trip_id = trip_entry['_id'] + stop_entry = ecwe.Entry.create_entry(ts.user_id, + esda.RAW_STOP_KEY, + stop, create_id=True) logging.debug("stop = %s, stop_entry = %s" % (stop, stop_entry)) stitch_together(prev_section_entry, stop_entry, section_entry) ts.insert(stop_entry) @@ -157,12 +189,12 @@ def segment_trip_into_sections(user_id, trip_entry, trip_source): def fill_section(section, start_loc, end_loc, sensed_mode, ble_sensed_mode=None): section.start_ts = start_loc.ts - section.start_local_dt = start_loc.local_dt + section.start_local_dt = ecwld.LocalDate.get_local_date(start_loc.ts, start_loc['local_dt_timezone']) section.start_fmt_time = start_loc.fmt_time section.end_ts = end_loc.ts try: - section.end_local_dt = end_loc.local_dt + section.end_local_dt = ecwld.LocalDate.get_local_date(end_loc.ts, end_loc['local_dt_timezone']) except AttributeError as e: print(end_loc) section.end_fmt_time = end_loc.fmt_time @@ -206,14 +238,20 @@ def stitch_together(ending_section_entry, stop_entry, starting_section_entry): stop_entry["data"] = stop starting_section_entry["data"] = starting_section -def _get_distance_from_start_place_to_end(raw_trip_entry): - import emission.core.common as ecc - start_place_id = raw_trip_entry.data.start_place - start_place = esda.get_object(esda.RAW_PLACE_KEY, start_place_id) - dist = ecc.calDistance(start_place.location.coordinates, - raw_trip_entry.data.end_loc.coordinates) +def _get_distance_from_start_place_to_end(raw_trip_entry, raw_places): + start_place_id = raw_trip_entry['data']['start_place'] + start_place_coords = None + for place in raw_places: + if 
place['_id'] == start_place_id: + logging.debug('start place found in list') + start_place_coords = place['data']['location']['coordinates'] + if not start_place_coords: + logging.debug('start place not found in list, getting by id') + start_place = esda.get_object(esda.RAW_PLACE_KEY, start_place_id) + start_place_coords = start_place.location.coordinates + dist = ecc.calDistance(start_place_coords, + raw_trip_entry['data']['end_loc']['coordinates']) logging.debug("Distance from raw_place %s to the end of raw_trip_entry %s = %s" % - (start_place_id, raw_trip_entry.get_id(), dist)) + (start_place_id, raw_trip_entry['_id'], dist)) return dist - diff --git a/emission/analysis/intake/segmentation/section_segmentation_methods/flip_flop_detection.py b/emission/analysis/intake/segmentation/section_segmentation_methods/flip_flop_detection.py index 78609deed..4e4e4edc6 100644 --- a/emission/analysis/intake/segmentation/section_segmentation_methods/flip_flop_detection.py +++ b/emission/analysis/intake/segmentation/section_segmentation_methods/flip_flop_detection.py @@ -88,7 +88,7 @@ def is_flip_flop(self, start_motion, end_motion): return True else: streak_locs = self.seg_method.filter_points_for_range( - self.seg_method.location_points, start_motion, end_motion) + self.seg_method.filtered_loc_df, start_motion, end_motion) logging.debug("in is_flip_flop: len(streak_locs) = %d" % len(streak_locs)) if len(streak_locs) == 0: return True @@ -227,7 +227,7 @@ def check_no_location_walk(self, streak_start, streak_end): ssm, sem = self.motion_changes[streak_start] streak_locs = self.seg_method.filter_points_for_range( - self.seg_method.location_points, ssm, sem) + self.seg_method.filtered_loc_df, ssm, sem) streak_unfiltered_locs = self.seg_method.filter_points_for_range( self.seg_method.unfiltered_loc_df, ssm, sem) @@ -297,17 +297,17 @@ def get_merge_direction(self, streak_start, streak_end): return MergeResult(Direction.FORWARD, FinalMode.UNMERGED) loc_points = 
self.seg_method.filter_points_for_range( - self.seg_method.location_points, ssm, eem) + self.seg_method.filtered_loc_df, ssm, eem) loc_points.reset_index(inplace=True) with_speed_loc_points = eaicl.add_dist_heading_speed(loc_points) points_before = self.seg_method.filter_points_for_range( - self.seg_method.location_points, bsm, bem) + self.seg_method.filtered_loc_df, bsm, bem) points_before.reset_index(inplace=True) with_speed_points_before = eaicl.add_dist_heading_speed(points_before) points_after = self.seg_method.filter_points_for_range( - self.seg_method.location_points, asm, aem) + self.seg_method.filtered_loc_df, asm, aem) points_after.reset_index(inplace=True) with_speed_points_after = eaicl.add_dist_heading_speed(points_after) @@ -407,7 +407,7 @@ def validate_motorized(self, mcs, mce): def get_median_speed(self, mcs, mce): loc_df = self.seg_method.filter_points_for_range( - self.seg_method.location_points, mcs, mce) + self.seg_method.filtered_loc_df, mcs, mce) if len(loc_df) > 0: loc_df.reset_index(inplace=True) with_speed_df = eaicl.add_dist_heading_speed(loc_df) diff --git a/emission/analysis/intake/segmentation/section_segmentation_methods/smoothed_high_confidence_motion.py b/emission/analysis/intake/segmentation/section_segmentation_methods/smoothed_high_confidence_motion.py index d793dcfe2..8f914b6ae 100644 --- a/emission/analysis/intake/segmentation/section_segmentation_methods/smoothed_high_confidence_motion.py +++ b/emission/analysis/intake/segmentation/section_segmentation_methods/smoothed_high_confidence_motion.py @@ -19,7 +19,6 @@ import emission.core.common as ecc import emission.analysis.config as eac -import emission.analysis.intake.location_utils as eail import emission.analysis.intake.domain_assumptions as eaid class SmoothedHighConfidenceMotion(eaiss.SectionSegmentationMethod): @@ -46,31 +45,30 @@ def is_filtered(self, curr_activity_doc): logging.debug("%s, returning False" % activity_dump) return False - def 
segment_into_motion_changes(self, timeseries, time_query): + def segment_into_motion_changes(self, motion_df): """ Use the motion changes detected on the phone to detect sections (consecutive chains of points) that have a consistent motion. - :param timeseries: the time series for this user - :param time_query: the range to consider for segmentation + + :param motion_df: dataframe of background/motion_activity during this trip :return: a list of tuples [(start_motion, end_motion)] that represent the ranges with a consistent motion. The gap between end_motion[n] and start_motion[n+1] represents the transition between the activities. We don't actually know the motion/activity in that range with any level of confidence. We need a policy on how to deal with them (combine with first, combine with second, split in the middle). This policy can be enforced when we map the activity changes to locations. """ - motion_df = timeseries.get_data_df("background/motion_activity", time_query) filter_mask = motion_df.apply(self.is_filtered, axis=1) # Calling np.nonzero on the filter_mask even if it was related trips with zero sections # has not been a problem before this - the subsequent check on the # length of the filtered dataframe was sufficient. But now both Tom and # I have hit it (on 18th and 21st of Sept) so let's handle it proactively here. 
- if filter_mask.shape == (0,0): - logging.info("Found filter_mask with shape (0,0), returning blank") + if len(filter_mask) == 0: + logging.info("Found filter_mask with len 0, returning blank") return [] # Replace RUNNING with WALKING so that we don't get extra, incorrect sections # https://github.com/e-mission/e-mission-server/issues/577#issuecomment-379496118 - motion_df["type"].replace([8], 7, inplace=True) + motion_df.loc[motion_df["type"] == 8, "type"] = 7 logging.debug("filtered points %s" % np.nonzero(filter_mask.to_numpy())) logging.debug("motion_df = %s" % motion_df.head()) @@ -123,23 +121,33 @@ def segment_into_motion_changes(self, timeseries, time_query): # Overridden in smoothed_high_confidence_with_visit_transitions.py. # Consider porting any changes there as well if applicable. - def segment_into_sections(self, timeseries, distance_from_place, time_query): + def segment_into_sections(self, timeseries, time_query, distance_from_place, ble_list, motion_df, unfiltered_loc_df, filtered_loc_df): """ Determine locations within the specified time that represent segmentation points for a trip. + :param timeseries: the time series for this user :param time_query: the range to consider for segmentation + :param distance_from_place: distance in m from the start place of the current trip to its end loc + :param ble_list: list of background/ble documents during this trip + :param motion_df: dataframe of background/motion_activity during this trip + :param unfiltered_loc_df: dataframe of background/location points during this trip + :param filtered_loc_df: dataframe of background/filtered_location points during this trip :return: a list of tuples [(start1, end1), (start2, end2), ...] that represent the start and end of sections - in this time range. end[n] and start[n+1] are typically assumed to be adjacent. + in this time range. end[n] and start[n+1] are typically assumed to be adjacent. 
""" - self.get_location_streams_for_trip(timeseries, time_query) - motion_changes = self.segment_into_motion_changes(timeseries, time_query) + self.ble_list = ble_list + self.motion_df = motion_df + self.unfiltered_loc_df = unfiltered_loc_df + self.filtered_loc_df = filtered_loc_df + + motion_changes = self.segment_into_motion_changes(motion_df) - if len(self.location_points) == 0: - logging.debug("No location points found for query %s, returning []" % time_query) + if len(filtered_loc_df) == 0: + logging.debug("There are no points in the trip. How the heck did we segment it?") return [] - fp = self.location_points.iloc[0] - lp = self.location_points.iloc[-1] + fp = filtered_loc_df.iloc[0] + lp = filtered_loc_df.iloc[-1] # Create sections for each motion. At this point, we need to decide a policy on how to deal with the gaps. # Let's pick a reasonable default for now. @@ -149,8 +157,8 @@ def segment_into_sections(self, timeseries, distance_from_place, time_query): logging.debug("Considering %s from %s -> %s" % (start_motion.type, start_motion.fmt_time, end_motion.fmt_time)) # Find points that correspond to this section - raw_section_df = self.location_points[(self.location_points.ts >= start_motion.ts) & - (self.location_points.ts <= end_motion.ts)] + raw_section_df = filtered_loc_df[(filtered_loc_df.ts >= start_motion.ts) & + (filtered_loc_df.ts <= end_motion.ts)] if len(raw_section_df) == 0: logging.info("Found no location points between %s and %s" % (start_motion, end_motion)) else: @@ -181,16 +189,6 @@ def segment_into_sections(self, timeseries, distance_from_place, time_query): return section_list - def get_location_streams_for_trip(self, timeseries, time_query): - # Let's also read the unfiltered locations so that we can combine them with - # the sampled locations - self.unfiltered_loc_df = timeseries.get_data_df("background/location", time_query) - self.location_points = timeseries.get_data_df("background/filtered_location", time_query) - # Location points 
can have big gaps. Let's extrapolate them so that we - # can use them better. - # https://github.com/e-mission/e-mission-server/issues/577#issuecomment-377323407 - self.resampled_loc_df = eail.resample(self.location_points, interval = 10) - def filter_points_for_range(self, df, start_motion, end_motion): """ Gets the points from the dataframe that are in the range (sm.ts, em.ts) diff --git a/emission/analysis/intake/segmentation/section_segmentation_methods/smoothed_high_confidence_with_visit_transitions.py b/emission/analysis/intake/segmentation/section_segmentation_methods/smoothed_high_confidence_with_visit_transitions.py index a9e63fe2f..cac813586 100644 --- a/emission/analysis/intake/segmentation/section_segmentation_methods/smoothed_high_confidence_with_visit_transitions.py +++ b/emission/analysis/intake/segmentation/section_segmentation_methods/smoothed_high_confidence_with_visit_transitions.py @@ -12,6 +12,7 @@ import emission.analysis.intake.segmentation.section_segmentation_methods.smoothed_high_confidence_motion as eaisms import emission.core.wrapper.motionactivity as ecwm import emission.core.wrapper.location as ecwl +import emission.core.wrapper.localdate as ecwld class SmoothedHighConfidenceMotionWithVisitTransitions(eaisms.SmoothedHighConfidenceMotion): def create_unknown_section(self, location_points_df): @@ -36,7 +37,7 @@ def get_section_if_applicable(self, timeseries, distance_from_start, time_query, if distance_from_start > self.distance_threshold: logging.debug("found distance %s > threshold %s, returning dummy section" % (distance_from_start, self.distance_threshold)) - return self.create_unknown_section(self.location_points) + return self.create_unknown_section(self.filtered_loc_df) visit_ended_transition_df = transition_df[transition_df.transition == 14] if len(visit_ended_transition_df) == 0: @@ -46,39 +47,46 @@ def get_section_if_applicable(self, timeseries, distance_from_start, time_query, # We have a visit transition, so we have a pretty 
good idea that # this is a real section. So let's create a dummy section for it and return logging.debug("found visit transition %s, returning dummy section" % visit_ended_transition_df[["transition", "fmt_time"]]) - return self.create_unknown_section(self.location_points) + return self.create_unknown_section(self.filtered_loc_df) def extend_activity_to_location(self, motion_change, location_point): new_mc = ecwm.Motionactivity({ 'type': motion_change.type, 'confidence': motion_change.confidence, - 'ts': location_point.data.ts, - 'local_dt': location_point.data.local_dt, - 'fmt_time': location_point.data.fmt_time + 'ts': location_point.ts, + 'local_dt': ecwld.LocalDate.get_local_date(location_point.ts, location_point['local_dt_timezone']), + 'fmt_time': location_point.fmt_time }) return new_mc - def segment_into_sections(self, timeseries, distance_from_place, time_query): + def segment_into_sections(self, timeseries, time_query, distance_from_place, ble_list, motion_df, unfiltered_loc_df, filtered_loc_df): """ Determine locations within the specified time that represent segmentation points for a trip. + :param timeseries: the time series for this user :param time_query: the range to consider for segmentation + :param distance_from_place: distance in m from the start place of the current trip to its end loc + :param ble_list: list of background/ble documents during this trip + :param motion_df: dataframe of background/motion_activity during this trip + :param unfiltered_loc_df: dataframe of background/location points during this trip + :param filtered_loc_df: dataframe of background/filtered_location points during this trip :return: a list of tuples [(start1, end1), (start2, end2), ...] that represent the start and end of sections - in this time range. end[n] and start[n+1] are typically assumed to be adjacent. + in this time range. end[n] and start[n+1] are typically assumed to be adjacent. 
""" + self.ble_list = ble_list + self.motion_df = motion_df + self.unfiltered_loc_df = unfiltered_loc_df + self.filtered_loc_df = filtered_loc_df - # Since we are going to use a hybrid model, let's just read all kinds - # of locations upfront - self.get_location_streams_for_trip(timeseries, time_query) - motion_changes = self.segment_into_motion_changes(timeseries, time_query) + motion_changes = self.segment_into_motion_changes(motion_df) - if len(self.location_points) == 0: + if len(filtered_loc_df) == 0: logging.debug("There are no points in the trip. How the heck did we segment it?") return [] if len(motion_changes) == 0: dummy_sec = self.get_section_if_applicable(timeseries, distance_from_place, - time_query, self.location_points) + time_query, filtered_loc_df) if dummy_sec is not None: return [dummy_sec] else: @@ -100,21 +108,27 @@ def segment_into_sections(self, timeseries, distance_from_place, time_query): # we cannot specify a sampling frequency for the motion activity # So let us extend the first motion change to the beginning of the # trip, and the last motion change to the end of the trip - motion_changes[0] = (self.extend_activity_to_location(motion_changes[0][0], - timeseries.df_row_to_entry("background/filtered_location", - self.location_points.iloc[0])), - motion_changes[0][1]) - motion_changes[-1] = (motion_changes[-1][0], - self.extend_activity_to_location(motion_changes[-1][1], - timeseries.df_row_to_entry("background/filtered_location", - self.location_points.iloc[-1]))) + motion_changes[0] = ( + self.extend_activity_to_location( + motion_changes[0][0], + ecwm.Motionactivity(filtered_loc_df.iloc[0]) + ), + motion_changes[0][1], + ) + motion_changes[-1] = ( + motion_changes[-1][0], + self.extend_activity_to_location( + motion_changes[-1][1], + ecwm.Motionactivity(filtered_loc_df.iloc[-1]) + ), + ) for (start_motion, end_motion) in motion_changes: logging.debug("Considering %s from %s -> %s" % (start_motion.type, start_motion.fmt_time, 
end_motion.fmt_time)) # Find points that correspond to this section - raw_section_df = self.filter_points_for_range(self.location_points, - start_motion, end_motion) + raw_section_df = self.filter_points_for_range(filtered_loc_df, + start_motion, end_motion) if len(raw_section_df) <=1: logging.info("Found %d filtered points %s and %s for type %s, skipping..." % diff --git a/emission/storage/pipeline_queries.py b/emission/storage/pipeline_queries.py index 63bb8acb0..7cc5051c2 100644 --- a/emission/storage/pipeline_queries.py +++ b/emission/storage/pipeline_queries.py @@ -77,7 +77,7 @@ def mark_sectioning_done(user_id, last_trip_done): mark_stage_done(user_id, ps.PipelineStages.SECTION_SEGMENTATION, None) else: mark_stage_done(user_id, ps.PipelineStages.SECTION_SEGMENTATION, - last_trip_done.data.end_ts + END_FUZZ_AVOID_LTE) + last_trip_done['data']['end_ts'] + END_FUZZ_AVOID_LTE) def mark_sectioning_failed(user_id): mark_stage_failed(user_id, ps.PipelineStages.SECTION_SEGMENTATION) diff --git a/emission/tests/analysisTests/intakeTests/TestSectionSegmentation.py b/emission/tests/analysisTests/intakeTests/TestSectionSegmentation.py index 4162962fd..a2b409b9d 100644 --- a/emission/tests/analysisTests/intakeTests/TestSectionSegmentation.py +++ b/emission/tests/analysisTests/intakeTests/TestSectionSegmentation.py @@ -73,7 +73,13 @@ def testSegmentationPointsSmoothedHighConfidenceMotion(self): shcmsm = shcm.SmoothedHighConfidenceMotion(60, 100, [ecwm.MotionTypes.TILTING, ecwm.MotionTypes.UNKNOWN, ecwm.MotionTypes.STILL]) - segmentation_points = shcmsm.segment_into_sections(ts, 0, tq) + + ble_list = ts.find_entries(['background/bluetooth_ble'], tq) + motion_df = ts.get_data_df('background/motion_activity', tq) + unfiltered_loc_df = ts.get_data_df('background/location', tq) + filtered_loc_df = ts.get_data_df('background/filtered_location', tq) + + segmentation_points = shcmsm.segment_into_sections(ts, tq, 0, ble_list, motion_df, unfiltered_loc_df, filtered_loc_df) for 
(start, end, motion) in segmentation_points: logging.info("section is from %s (%f) -> %s (%f) using mode %s" % @@ -140,9 +146,26 @@ def testSegmentationWrapperWithManualTrip(self): test_trip_entry = ts.get_entry_from_id(esda.RAW_TRIP_KEY, test_trip_id) test_place.starting_trip = test_trip_id ts.insert(test_place_entry) - - eaiss.segment_trip_into_sections(self.androidUUID, test_trip_entry, "DwellSegmentationTimeFilter") - + time_query = esda.get_time_query_for_trip_like(esda.RAW_TRIP_KEY, test_trip_entry.get_id()) + + ble_list = ts.find_entries(['background/bluetooth_ble'], time_query) + motion_df = ts.get_data_df('background/motion_activity', time_query) + unfiltered_loc_df = ts.get_data_df('background/location', time_query) + filtered_loc_df = ts.get_data_df('background/filtered_location', time_query) + + dist_from_place = eaiss._get_distance_from_start_place_to_end(test_trip_entry, + [test_place_entry]) + eaiss.segment_trip_into_sections( + ts, + test_trip_entry, + dist_from_place, + "DwellSegmentationTimeFilter", + ble_list, + motion_df, + unfiltered_loc_df, + filtered_loc_df, + ) + created_stops_entries = esdt.get_raw_stops_for_trip(self.androidUUID, test_trip_id) created_sections_entries = esdt.get_raw_sections_for_trip(self.androidUUID, test_trip_id) created_stops = [entry.data for entry in created_stops_entries] From 322acf78f7e845fce3d456b6249648b728ef3efa Mon Sep 17 00:00:00 2001 From: Jack Greenlee Date: Fri, 7 Mar 2025 01:41:28 -0500 Subject: [PATCH 2/2] insert section segmentation entries in bulk We create segmentation/raw_section and segmentation/raw_stop entries in segment_trip_into_sections. 
We have been inserting them as we create them (which sometimes required updating the previous entry if there is a stop), resulting in numerous insert and update DB calls Instead we can keep a list of entries to be created and then only bulk_insert them at the end of the stage, once we are done updating them --- .../segmentation/section_segmentation.py | 26 +++++++++---------- .../intakeTests/TestSectionSegmentation.py | 3 ++- 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/emission/analysis/intake/segmentation/section_segmentation.py b/emission/analysis/intake/segmentation/section_segmentation.py index e15f6da65..684460afa 100644 --- a/emission/analysis/intake/segmentation/section_segmentation.py +++ b/emission/analysis/intake/segmentation/section_segmentation.py @@ -62,9 +62,10 @@ def segment_current_sections(user_id): unfiltered_loc_df = ts.get_data_df('background/location', time_query) filtered_loc_df = ts.get_data_df('background/filtered_location', time_query) + entries_to_insert = [] for trip_entry in trips_to_process: dist_from_place = _get_distance_from_start_place_to_end(trip_entry, places_in_range) - segment_trip_into_sections( + entries_to_insert += segment_trip_into_sections( ts, trip_entry, dist_from_place, @@ -74,6 +75,8 @@ def segment_current_sections(user_id): unfiltered_loc_df, filtered_loc_df, ) + if entries_to_insert: + ts.bulk_insert(entries_to_insert, esta.EntryType.ANALYSIS_TYPE) if len(trips_to_process) == 0: # Didn't process anything new so start at the same point next time last_trip_processed = None @@ -127,8 +130,6 @@ def segment_trip_into_sections(ts, trip_entry, distance_from_place, trip_source, # So this is much simpler than the trip case. # Again, since this is segmenting a trip, we can just start with a section - prev_section_entry = None - # TODO: Should we link the locations to the trips this way, or by using a foreign key? 
# If we want to use a foreign key, then we need to include the object id in the data df as well so that we can # set it properly. @@ -137,6 +138,8 @@ def segment_trip_into_sections(ts, trip_entry, distance_from_place, trip_source, trip_end_loc = ecwl.Location(trip_filtered_loc_df.iloc[-1]) logging.debug("trip_start_loc = %s, trip_end_loc = %s" % (trip_start_loc, trip_end_loc)) + section_entries = [] + stops_entries = [] for (i, (start_loc_doc, end_loc_doc, sensed_mode)) in enumerate(segmentation_points): logging.debug("start_loc_doc = %s, end_loc_doc = %s" % (start_loc_doc, end_loc_doc)) start_loc = ecwl.Location(start_loc_doc) @@ -145,7 +148,7 @@ def segment_trip_into_sections(ts, trip_entry, distance_from_place, trip_source, section = ecwc.Section() section.trip_id = trip_entry['_id'] - if prev_section_entry is None: + if len(section_entries) == 0: # This is the first point, so we want to start from the start of the trip, not the start of this segment start_loc = trip_start_loc if i == len(segmentation_points) - 1: @@ -168,7 +171,7 @@ def segment_trip_into_sections(ts, trip_entry, distance_from_place, trip_source, section_entry = ecwe.Entry.create_entry(ts.user_id, esda.RAW_SECTION_KEY, section, create_id=True) - if prev_section_entry is not None: + if len(section_entries) > 0: # If this is not the first section, create a stop to link the two sections together # The expectation is prev_section -> stop -> curr_section stop = ecws.Stop() @@ -177,14 +180,11 @@ def segment_trip_into_sections(ts, trip_entry, distance_from_place, trip_source, esda.RAW_STOP_KEY, stop, create_id=True) logging.debug("stop = %s, stop_entry = %s" % (stop, stop_entry)) - stitch_together(prev_section_entry, stop_entry, section_entry) - ts.insert(stop_entry) - ts.update(prev_section_entry) - - # After we go through the loop, we will be left with the last section, - # which does not have an ending stop. We insert that too. 
- ts.insert(section_entry) - prev_section_entry = section_entry + stitch_together(section_entries[-1], stop_entry, section_entry) + stops_entries.append(stop_entry) + section_entries.append(section_entry) + + return section_entries + stops_entries def fill_section(section, start_loc, end_loc, sensed_mode, ble_sensed_mode=None): diff --git a/emission/tests/analysisTests/intakeTests/TestSectionSegmentation.py b/emission/tests/analysisTests/intakeTests/TestSectionSegmentation.py index a2b409b9d..06adead05 100644 --- a/emission/tests/analysisTests/intakeTests/TestSectionSegmentation.py +++ b/emission/tests/analysisTests/intakeTests/TestSectionSegmentation.py @@ -155,7 +155,7 @@ def testSegmentationWrapperWithManualTrip(self): dist_from_place = eaiss._get_distance_from_start_place_to_end(test_trip_entry, [test_place_entry]) - eaiss.segment_trip_into_sections( + entries = eaiss.segment_trip_into_sections( ts, test_trip_entry, dist_from_place, @@ -165,6 +165,7 @@ def testSegmentationWrapperWithManualTrip(self): unfiltered_loc_df, filtered_loc_df, ) + ts.bulk_insert(entries, esta.EntryType.ANALYSIS_TYPE) created_stops_entries = esdt.get_raw_stops_for_trip(self.androidUUID, test_trip_id) created_sections_entries = esdt.get_raw_sections_for_trip(self.androidUUID, test_trip_id)