Skip to content

Commit

Permalink
Merge pull request #1036 from JGreenlee/section_segmentation_db_optimize
Browse files Browse the repository at this point in the history
🚀 Section Segmentation Optimization
  • Loading branch information
shankari authored Mar 8, 2025
2 parents 88610f4 + 322acf7 commit 87aa882
Show file tree
Hide file tree
Showing 6 changed files with 184 additions and 110 deletions.
134 changes: 86 additions & 48 deletions emission/analysis/intake/segmentation/section_segmentation.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
import emission.storage.decorations.analysis_timeseries_queries as esda

import emission.storage.timeseries.abstract_timeseries as esta
import emission.storage.timeseries.timequery as estt

import emission.core.wrapper.motionactivity as ecwm
import emission.core.wrapper.location as ecwl
import emission.core.wrapper.localdate as ecwld
import emission.core.wrapper.section as ecwc
import emission.core.wrapper.stop as ecws
import emission.core.wrapper.entry as ecwe
Expand All @@ -26,7 +28,7 @@
import emcommon.bluetooth.ble_matching as emcble

class SectionSegmentationMethod(object):
def segment_into_sections(self, timeseries, distance_from_place, time_query):
def segment_into_sections(self, timeseries, time_query, distance_from_place, ble_list, motion_df, unfiltered_loc_df, filtered_loc_df):
"""
Examines the timeseries database for a specific range and returns the
points at which the trip needs to be segmented. Again, this allows
Expand All @@ -46,12 +48,35 @@ def segment_into_sections(self, timeseries, distance_from_place, time_query):
pass

def segment_current_sections(user_id):
time_query = epq.get_time_range_for_sectioning(user_id)
try:
trips_to_process = esda.get_entries(esda.RAW_TRIP_KEY, user_id, time_query)
ts = esta.TimeSeries.get_time_series(user_id)
time_query = epq.get_time_range_for_sectioning(user_id)
trips_to_process = ts.find_entries([esda.RAW_TRIP_KEY], time_query)

time_query.timeType = 'data.exit_ts'
places_in_range = ts.find_entries([esda.RAW_PLACE_KEY], time_query)

time_query.timeType = 'data.ts'
ble_list = ts.find_entries(['background/bluetooth_ble'], time_query)
motion_df = ts.get_data_df('background/motion_activity', time_query)
unfiltered_loc_df = ts.get_data_df('background/location', time_query)
filtered_loc_df = ts.get_data_df('background/filtered_location', time_query)

entries_to_insert = []
for trip_entry in trips_to_process:
logging.info("+" * 20 + ("Processing trip %s for user %s" % (trip_entry.get_id(), user_id)) + "+" * 20)
segment_trip_into_sections(user_id, trip_entry, trip_entry.data.source)
dist_from_place = _get_distance_from_start_place_to_end(trip_entry, places_in_range)
entries_to_insert += segment_trip_into_sections(
ts,
trip_entry,
dist_from_place,
trip_entry['data']['source'],
ble_list,
motion_df,
unfiltered_loc_df,
filtered_loc_df,
)
if entries_to_insert:
ts.bulk_insert(entries_to_insert, esta.EntryType.ANALYSIS_TYPE)
if len(trips_to_process) == 0:
# Didn't process anything new so start at the same point next time
last_trip_processed = None
Expand All @@ -62,11 +87,17 @@ def segment_current_sections(user_id):
logging.exception("Sectioning failed for user %s" % user_id)
epq.mark_sectioning_failed(user_id)

def segment_trip_into_sections(user_id, trip_entry, trip_source):
ts = esta.TimeSeries.get_time_series(user_id)
time_query = esda.get_time_query_for_trip_like(esda.RAW_TRIP_KEY, trip_entry.get_id())
distance_from_place = _get_distance_from_start_place_to_end(trip_entry)
ble_entries_during_trip = ts.find_entries(["background/bluetooth_ble"], time_query)

def segment_trip_into_sections(ts, trip_entry, distance_from_place, trip_source, ble_list, motion_df, unfiltered_loc_df, filtered_loc_df):
trip_tq = estt.TimeQuery("data.ts", trip_entry['data']['start_ts'], trip_entry['data']['end_ts'])

trip_ble_list = [e for e in ble_list
if e["data"]["ts"] >= trip_tq.startTs
and e["data"]["ts"] <= trip_tq.endTs]
ts_in_tq = "@trip_tq.startTs <= ts <= @trip_tq.endTs"
trip_motion_df = motion_df.query(ts_in_tq) if len(motion_df) else motion_df
trip_unfiltered_loc_df = unfiltered_loc_df.query(ts_in_tq) if len(unfiltered_loc_df) else unfiltered_loc_df
trip_filtered_loc_df = filtered_loc_df.query(ts_in_tq) if len(filtered_loc_df) else filtered_loc_df

if (trip_source == "DwellSegmentationTimeFilter"):
import emission.analysis.intake.segmentation.section_segmentation_methods.smoothed_high_confidence_motion as shcm
Expand All @@ -84,36 +115,40 @@ def segment_trip_into_sections(user_id, trip_entry, trip_source):
ecwm.MotionTypes.STILL,
ecwm.MotionTypes.NONE, # iOS only
ecwm.MotionTypes.STOPPED_WHILE_IN_VEHICLE]) # iOS only

segmentation_points = shcmsm.segment_into_sections(ts, distance_from_place, time_query)
segmentation_points = shcmsm.segment_into_sections(
ts,
trip_tq,
distance_from_place,
trip_ble_list,
trip_motion_df,
trip_unfiltered_loc_df,
trip_filtered_loc_df
)

# Since we are segmenting an existing trip into sections, we do not need to worry about linking with
# a prior place, since it will be linked through the trip object.
# So this is much simpler than the trip case.
# Again, since this is segmenting a trip, we can just start with a section

prev_section_entry = None

# TODO: Should we link the locations to the trips this way, or by using a foreign key?
# If we want to use a foreign key, then we need to include the object id in the data df as well so that we can
# set it properly.
ts = esta.TimeSeries.get_time_series(user_id)

get_loc_for_ts = lambda time: ecwl.Location(ts.get_entry_at_ts("background/filtered_location", "data.ts", time)["data"])
trip_start_loc = get_loc_for_ts(trip_entry.data.start_ts)
trip_end_loc = get_loc_for_ts(trip_entry.data.end_ts)
trip_start_loc = ecwl.Location(trip_filtered_loc_df.iloc[0])
trip_end_loc = ecwl.Location(trip_filtered_loc_df.iloc[-1])
logging.debug("trip_start_loc = %s, trip_end_loc = %s" % (trip_start_loc, trip_end_loc))

section_entries = []
stops_entries = []
for (i, (start_loc_doc, end_loc_doc, sensed_mode)) in enumerate(segmentation_points):
logging.debug("start_loc_doc = %s, end_loc_doc = %s" % (start_loc_doc, end_loc_doc))
get_loc_for_row = lambda row: ts.df_row_to_entry("background/filtered_location", row).data
start_loc = get_loc_for_row(start_loc_doc)
end_loc = get_loc_for_row(end_loc_doc)
start_loc = ecwl.Location(start_loc_doc)
end_loc = ecwl.Location(end_loc_doc)
logging.debug("start_loc = %s, end_loc = %s" % (start_loc, end_loc))

section = ecwc.Section()
section.trip_id = trip_entry.get_id()
if prev_section_entry is None:
section.trip_id = trip_entry['_id']
if len(section_entries) == 0:
# This is the first point, so we want to start from the start of the trip, not the start of this segment
start_loc = trip_start_loc
if i == len(segmentation_points) - 1:
Expand All @@ -127,42 +162,39 @@ def segment_trip_into_sections(user_id, trip_entry, trip_source):
# Later, we may want to actually use BLE sensor data as part of the basis for segmentation
dynamic_config = eadc.get_dynamic_config()
ble_sensed_mode = emcble.get_ble_sensed_vehicle_for_section(
ble_entries_during_trip, start_loc.ts, end_loc.ts, dynamic_config
trip_ble_list, start_loc.ts, end_loc.ts, dynamic_config
)

fill_section(section, start_loc, end_loc, sensed_mode, ble_sensed_mode)
# We create the entry after filling in the section so that we know
# that the data is included properly
section_entry = ecwe.Entry.create_entry(user_id, esda.RAW_SECTION_KEY,
section_entry = ecwe.Entry.create_entry(ts.user_id, esda.RAW_SECTION_KEY,
section, create_id=True)

if prev_section_entry is not None:
if len(section_entries) > 0:
# If this is not the first section, create a stop to link the two sections together
# The expectation is prev_section -> stop -> curr_section
stop = ecws.Stop()
stop.trip_id = trip_entry.get_id()
stop_entry = ecwe.Entry.create_entry(user_id,
esda.RAW_STOP_KEY,
stop, create_id=True)
stop.trip_id = trip_entry['_id']
stop_entry = ecwe.Entry.create_entry(ts.user_id,
esda.RAW_STOP_KEY,
stop, create_id=True)
logging.debug("stop = %s, stop_entry = %s" % (stop, stop_entry))
stitch_together(prev_section_entry, stop_entry, section_entry)
ts.insert(stop_entry)
ts.update(prev_section_entry)

# After we go through the loop, we will be left with the last section,
# which does not have an ending stop. We insert that too.
ts.insert(section_entry)
prev_section_entry = section_entry
stitch_together(section_entries[-1], stop_entry, section_entry)
stops_entries.append(stop_entry)
section_entries.append(section_entry)

return section_entries + stops_entries


def fill_section(section, start_loc, end_loc, sensed_mode, ble_sensed_mode=None):
section.start_ts = start_loc.ts
section.start_local_dt = start_loc.local_dt
section.start_local_dt = ecwld.LocalDate.get_local_date(start_loc.ts, start_loc['local_dt_timezone'])
section.start_fmt_time = start_loc.fmt_time

section.end_ts = end_loc.ts
try:
section.end_local_dt = end_loc.local_dt
section.end_local_dt = ecwld.LocalDate.get_local_date(end_loc.ts, end_loc['local_dt_timezone'])
except AttributeError as e:
print(end_loc)
section.end_fmt_time = end_loc.fmt_time
Expand Down Expand Up @@ -206,14 +238,20 @@ def stitch_together(ending_section_entry, stop_entry, starting_section_entry):
stop_entry["data"] = stop
starting_section_entry["data"] = starting_section

def _get_distance_from_start_place_to_end(raw_trip_entry):
import emission.core.common as ecc

start_place_id = raw_trip_entry.data.start_place
start_place = esda.get_object(esda.RAW_PLACE_KEY, start_place_id)
dist = ecc.calDistance(start_place.location.coordinates,
raw_trip_entry.data.end_loc.coordinates)
def _get_distance_from_start_place_to_end(raw_trip_entry, raw_places):
start_place_id = raw_trip_entry['data']['start_place']
start_place_coords = None
for place in raw_places:
if place['_id'] == start_place_id:
logging.debug('start place found in list')
start_place_coords = place['data']['location']['coordinates']
if not start_place_coords:
logging.debug('start place not found in list, getting by id')
start_place = esda.get_object(esda.RAW_PLACE_KEY, start_place_id)
start_place_coords = start_place.location.coordinates
dist = ecc.calDistance(start_place_coords,
raw_trip_entry['data']['end_loc']['coordinates'])
logging.debug("Distance from raw_place %s to the end of raw_trip_entry %s = %s" %
(start_place_id, raw_trip_entry.get_id(), dist))
(start_place_id, raw_trip_entry['_id'], dist))
return dist

Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def is_flip_flop(self, start_motion, end_motion):
return True
else:
streak_locs = self.seg_method.filter_points_for_range(
self.seg_method.location_points, start_motion, end_motion)
self.seg_method.filtered_loc_df, start_motion, end_motion)
logging.debug("in is_flip_flop: len(streak_locs) = %d" % len(streak_locs))
if len(streak_locs) == 0:
return True
Expand Down Expand Up @@ -227,7 +227,7 @@ def check_no_location_walk(self, streak_start, streak_end):

ssm, sem = self.motion_changes[streak_start]
streak_locs = self.seg_method.filter_points_for_range(
self.seg_method.location_points, ssm, sem)
self.seg_method.filtered_loc_df, ssm, sem)
streak_unfiltered_locs = self.seg_method.filter_points_for_range(
self.seg_method.unfiltered_loc_df, ssm, sem)

Expand Down Expand Up @@ -297,17 +297,17 @@ def get_merge_direction(self, streak_start, streak_end):
return MergeResult(Direction.FORWARD, FinalMode.UNMERGED)

loc_points = self.seg_method.filter_points_for_range(
self.seg_method.location_points, ssm, eem)
self.seg_method.filtered_loc_df, ssm, eem)
loc_points.reset_index(inplace=True)
with_speed_loc_points = eaicl.add_dist_heading_speed(loc_points)

points_before = self.seg_method.filter_points_for_range(
self.seg_method.location_points, bsm, bem)
self.seg_method.filtered_loc_df, bsm, bem)
points_before.reset_index(inplace=True)
with_speed_points_before = eaicl.add_dist_heading_speed(points_before)

points_after = self.seg_method.filter_points_for_range(
self.seg_method.location_points, asm, aem)
self.seg_method.filtered_loc_df, asm, aem)
points_after.reset_index(inplace=True)
with_speed_points_after = eaicl.add_dist_heading_speed(points_after)

Expand Down Expand Up @@ -407,7 +407,7 @@ def validate_motorized(self, mcs, mce):

def get_median_speed(self, mcs, mce):
loc_df = self.seg_method.filter_points_for_range(
self.seg_method.location_points, mcs, mce)
self.seg_method.filtered_loc_df, mcs, mce)
if len(loc_df) > 0:
loc_df.reset_index(inplace=True)
with_speed_df = eaicl.add_dist_heading_speed(loc_df)
Expand Down
Loading

0 comments on commit 87aa882

Please sign in to comment.