⚡️ Use previously-read transition entries instead of re-reading them #1035

Merged
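At a high level, this change reads the `statemachine/transition` and `background/motion_activity` dataframes once in `segment_current_trips` and threads them through the segmentation helpers, which then filter them in memory rather than re-querying the timeseries. A minimal sketch of that pattern, using hypothetical stand-in data and function names rather than the real pipeline objects:

```python
import pandas as pd

def read_transitions_once():
    # Hypothetical stand-in for ts.get_data_df("statemachine/transition", time_query),
    # which the real pipeline now calls exactly once in segment_current_trips.
    return pd.DataFrame({"ts": [100.0, 200.0, 300.0, 400.0]})

def restart_check(transition_df, start_ts, end_ts):
    # Helpers receive the already-read dataframe and narrow it in memory ...
    return transition_df[(transition_df["ts"] >= start_ts) & (transition_df["ts"] <= end_ts)]

def combined_segmentation(transition_df, cutoff_ts):
    # ... instead of each issuing its own "statemachine/transition" query.
    return transition_df[transition_df["ts"] > cutoff_ts]

transition_df = read_transitions_once()
print(len(restart_check(transition_df, 150.0, 350.0)))   # -> 2
print(len(combined_segmentation(transition_df, 250.0)))  # -> 2
```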
14 changes: 4 additions & 10 deletions emission/analysis/intake/segmentation/restart_checking.py
@@ -61,7 +61,7 @@ def tracking_restarted_in_loc_df(loc_df, transition_df):
     return tracking_restarted
 
 
-def is_tracking_restarted_in_range(start_ts, end_ts, timeseries, transition_df=None):
+def is_tracking_restarted_in_range(start_ts, end_ts, transition_df=None):
     """
     Check to see if tracking was restarted between the times specified
     :param start_ts: the start of the time range to check
@@ -70,15 +70,9 @@ def is_tracking_restarted_in_range(start_ts, end_ts, timeseries, transition_df=None):
     :param transition_df: dataframe of transitions to use (if None, will be fetched from timeseries)
     :return:
     """
-    if transition_df is not None:
-        transition_df = transition_df[
-            (transition_df['ts'] >= start_ts) & (transition_df['ts'] <= end_ts)
-        ]
-    else:
-        import emission.storage.timeseries.timequery as estt
-        tq = estt.TimeQuery(timeType="data.ts", startTs=start_ts,
-                            endTs=end_ts)
-        transition_df = timeseries.get_data_df("statemachine/transition", tq)
+    transition_df = transition_df[
+        (transition_df['ts'] >= start_ts) & (transition_df['ts'] <= end_ts)
+    ]
 
     if len(transition_df) == 0:
         logging.debug("In range %s -> %s found no transitions" %
30 changes: 17 additions & 13 deletions emission/analysis/intake/segmentation/trip_segmentation.py
@@ -81,6 +81,10 @@ def segment_current_trips(user_id):
         epq.mark_segmentation_done(user_id, None)
         return
 
+    # Reading in the other sensor data
+    transition_df = ts.get_data_df("statemachine/transition", time_query)
+    motion_df = ts.get_data_df("background/motion_activity", time_query)
+
     out_of_order_points = loc_df[loc_df.ts.diff() < 0]
     if len(out_of_order_points) > 0:
         logging.info("Found out of order points!")
@@ -100,13 +104,11 @@ def segment_current_trips(user_id):
     if len(filters_in_df) == 1:
         # Common case - let's make it easy
         with ect.Timer() as t_segment_trips:
-            transition_df = ts.get_data_df("statemachine/transition", time_query)
-            motion_df = ts.get_data_df("background/motion_activity", time_query)
             segmentation_points = filter_methods[filters_in_df[0]].segment_into_trips(loc_df, transition_df, motion_df)
         esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips", time.time(), t_segment_trips.elapsed)
     else:
         with ect.Timer() as t_get_combined_segmentation:
-            segmentation_points = get_combined_segmentation_points(ts, loc_df, time_query,
+            segmentation_points = get_combined_segmentation_points(loc_df, transition_df, motion_df, time_query,
                                                                    filters_in_df,
                                                                    filter_methods)
         esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/get_combined_segmentation_points", time.time(), t_get_combined_segmentation.elapsed)
@@ -121,14 +123,14 @@ def segment_current_trips(user_id):
     else:
         with ect.Timer() as t_create_places_trips:
             try:
-                create_places_and_trips(user_id, segmentation_points, filter_method_names[filters_in_df[0]])
+                create_places_and_trips(user_id, segmentation_points, transition_df, filter_method_names[filters_in_df[0]])
                 epq.mark_segmentation_done(user_id, get_last_ts_processed(filter_methods))
             except:
                 logging.exception("Trip generation failed for user %s" % user_id)
                 epq.mark_segmentation_failed(user_id)
         esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/create_places_and_trips", time.time(), t_create_places_trips.elapsed)
 
-def get_combined_segmentation_points(ts, loc_df, time_query, filters_in_df, filter_methods):
+def get_combined_segmentation_points(loc_df, transition_df, motion_df, time_query, filters_in_df, filter_methods):
     """
     We can have mixed filters in a particular time range for multiple reasons.
     a) user switches phones from one platform to another
@@ -169,8 +171,10 @@ def get_combined_segmentation_points(ts, loc_df, time_query, filters_in_df, filter_methods):
                           (curr_filter, time_query.startTs, time_query.endTs))
         curr_filter_loc_df = loc_df.loc[startIndex:endIndex]
         curr_filter_loc_df.reset_index(drop=True, inplace=True)
-        curr_filter_transition_df = ts.get_data_df("statemachine/transition", time_query)
-        curr_filter_motion_df = ts.get_data_df("background/motion_activity", time_query)
+        curr_filter_transition_df = transition_df.query("@time_query.startTs <= metadata_write_ts <= @time_query.endTs")
+        curr_filter_transition_df.reset_index(drop=True, inplace=True)
+        curr_filter_motion_df = motion_df.query("@time_query.startTs <= metadata_write_ts <= @time_query.endTs")
+        curr_filter_motion_df.reset_index(drop=True, inplace=True)
         segmentation_map[time_query.startTs] = filter_methods[curr_filter].segment_into_trips(curr_filter_loc_df, curr_filter_transition_df, curr_filter_motion_df)
     logging.debug("After filtering, segmentation_map has keys %s" % list(segmentation_map.keys()))
     sortedStartTsList = sorted(segmentation_map.keys())
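`get_combined_segmentation_points` now slices the shared dataframes with `DataFrame.query`, where the `@` prefix makes the query string reference local Python variables (here the attributes of `time_query`). A minimal sketch of that pattern, with a hypothetical dataframe and time-query object in place of the real pipeline classes:

```python
import pandas as pd

class TimeQuery:
    # Hypothetical stand-in for the pipeline's time query object; only the
    # startTs/endTs attributes used in the query string are modeled.
    def __init__(self, startTs, endTs):
        self.startTs = startTs
        self.endTs = endTs

df = pd.DataFrame({"metadata_write_ts": [10.0, 20.0, 30.0, 40.0]})
time_query = TimeQuery(startTs=15.0, endTs=35.0)

# '@' references local variables inside the query string, mirroring the
# expression used in the diff above.
subset = df.query("@time_query.startTs <= metadata_write_ts <= @time_query.endTs")
subset = subset.reset_index(drop=True)
print(subset["metadata_write_ts"].tolist())  # -> [20.0, 30.0]
```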
@@ -193,7 +197,7 @@ def get_last_ts_processed(filter_methods):
     logging.info("Returning last_ts_processed = %s" % last_ts_processed)
     return last_ts_processed
 
-def create_places_and_trips(user_id, segmentation_points, segmentation_method_name):
+def create_places_and_trips(user_id, segmentation_points, transition_df, segmentation_method_name):
     # new segments, need to deal with them
     # First, retrieve the last place so that we can stitch it to the newly created trip.
     # Again, there are easy and hard. In the easy case, the trip was
@@ -236,7 +240,7 @@ def create_places_and_trips(user_id, segmentation_points, segmentation_method_name):
         new_place_entry = ecwe.Entry.create_entry(user_id,
                             "segmentation/raw_place", new_place, create_id = True)
 
-        if found_untracked_period(ts, last_place_entry.data, start_loc, segmentation_method_name):
+        if found_untracked_period(transition_df, last_place_entry.data, start_loc, segmentation_method_name):
             # Fill in the gap in the chain with an untracked period
             curr_untracked = ecwut.Untrackedtime()
             curr_untracked.source = segmentation_method_name
@@ -276,7 +280,7 @@ def _link_and_save(ts, last_place_entry, curr_trip_entry, new_place_entry, start
     # it will be lost
     ts.update(last_place_entry)
 
-def found_untracked_period(timeseries, last_place, start_loc, segmentation_method_name):
+def found_untracked_period(transition_df, last_place, start_loc, segmentation_method_name):
     """
     Check to see whether the two places are the same.
     This is a fix for https://github.com/e-mission/e-mission-server/issues/378
@@ -292,7 +296,7 @@ def found_untracked_period(timeseries, last_place, start_loc, segmentation_method_name):
         logging.debug("start of a chain, unable to check for restart from previous trip end, assuming not restarted")
         return False
 
-    if _is_tracking_restarted(last_place, start_loc, timeseries):
+    if _is_tracking_restarted(last_place, start_loc, transition_df):
         logging.debug("tracking has been restarted, returning True")
         return True
 
@@ -400,6 +404,6 @@ def stitch_together_end(new_place_entry, curr_trip_entry, end_loc):
     new_place_entry["data"] = new_place
     curr_trip_entry["data"] = curr_trip
 
-def _is_tracking_restarted(last_place, start_loc, timeseries):
-    return eaisr.is_tracking_restarted_in_range(last_place.enter_ts, start_loc.ts, timeseries)
+def _is_tracking_restarted(last_place, start_loc, transition_df):
+    return eaisr.is_tracking_restarted_in_range(last_place.enter_ts, start_loc.ts, transition_df)
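With this change the timeseries handle is no longer threaded through the restart checks at all; the leaf helper only ever sees the dataframe. A sketch of the resulting call chain, with hypothetical minimal stand-ins for the pipeline objects and a deliberately simplified restart check:

```python
import pandas as pd

def is_tracking_restarted_in_range(start_ts, end_ts, transition_df):
    # Mirrors the new helper's shape: pure in-memory filtering, no database access.
    in_range = transition_df[(transition_df["ts"] >= start_ts) & (transition_df["ts"] <= end_ts)]
    # Simplified stand-in for the real check, which inspects the filtered transitions.
    return len(in_range) > 0

def _is_tracking_restarted(last_place_enter_ts, start_loc_ts, transition_df):
    # The dataframe read once by the caller is simply passed down the chain.
    return is_tracking_restarted_in_range(last_place_enter_ts, start_loc_ts, transition_df)

transition_df = pd.DataFrame({"ts": [100.0, 200.0, 300.0]})
print(_is_tracking_restarted(150.0, 250.0, transition_df))  # -> True
```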