Defluffed time filter and new one from dist filter
TeachMeTW committed Dec 8, 2024
1 parent 01402a9 commit f497ddf
Showing 2 changed files with 63 additions and 107 deletions.
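For context, both files change in the same way: a fetch wrapped in an ect.Timer context manager plus an esds.store_pipeline_time call is collapsed to the bare fetch. Below is a minimal, self-contained sketch of that before/after pattern. Timer, store_pipeline_time, and fetch_transitions here are stand-ins with assumed semantics (inferred from the removed lines), not the project's actual ect/esds implementations or timeseries API.

import time

class Timer:
    """Stand-in for ect.Timer as used in the removed lines: a context
    manager that exposes elapsed wall-clock time as .elapsed."""
    def __enter__(self):
        self._start = time.time()
        return self
    def __exit__(self, *exc):
        self.elapsed = time.time() - self._start
        return False

def store_pipeline_time(user_id, stage_step, ts, elapsed):
    """Stand-in for esds.store_pipeline_time: record one named timing entry."""
    print("%s %s @ %s: %.6fs" % (user_id, stage_step, ts, elapsed))

def fetch_transitions():
    """Hypothetical stand-in for timeseries.get_data_df("statemachine/transition", ...)."""
    return []

# Before this commit: each fetch was wrapped and timed individually.
with Timer() as t_get_transition_df:
    transition_df = fetch_transitions()
store_pipeline_time("user-0",
                    "TRIP_SEGMENTATION/segment_into_trips_time/get_transition_df",
                    time.time(), t_get_transition_df.elapsed)

# After this commit: the bare fetch, with the per-step timing removed.
transition_df = fetch_transitions()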
File 1 of 2 (dist filter):
@@ -62,14 +62,7 @@ def segment_into_trips(self, timeseries, time_query):

         self.filtered_points_df.loc[:, "valid"] = True

-        with ect.Timer() as t_get_transition_df:
-            self.transition_df = timeseries.get_data_df("statemachine/transition", time_query)
-        esds.store_pipeline_time(
-            user_id,
-            ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/get_transition_df",
-            time.time(),
-            t_get_transition_df.elapsed
-        )
+        self.transition_df = timeseries.get_data_df("statemachine/transition", time_query)

         if len(self.transition_df) > 0:
             logging.debug("self.transition_df = %s" % self.transition_df[["fmt_time", "transition"]])
File 2 of 2 (time filter):
@@ -65,40 +65,17 @@ def segment_into_trips(self, timeseries, time_query):
         data that they want from the sensor streams in order to determine the
         segmentation points.
         """
-        with ect.Timer() as t_get_filtered_points_pre:
-            filtered_points_pre_ts_diff_df = timeseries.get_data_df("background/filtered_location", time_query)
-            user_id = filtered_points_pre_ts_diff_df["user_id"].iloc[0]
-        esds.store_pipeline_time(
-            user_id,
-            ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/get_filtered_points_pre_ts_diff_df",
-            time.time(),
-            t_get_filtered_points_pre.elapsed
-        )
-
-        with ect.Timer() as t_filter_bogus_points:
-            # Sometimes, we can get bogus points because data.ts and
-            # metadata.write_ts are off by a lot. If we don't do this, we end up
-            # appearing to travel back in time
-            # https://github.com/e-mission/e-mission-server/issues/457
-            filtered_points_df = filtered_points_pre_ts_diff_df[
-                (filtered_points_pre_ts_diff_df.metadata_write_ts - filtered_points_pre_ts_diff_df.ts) < 1000
-            ]
-            filtered_points_df.reset_index(inplace=True)
-        esds.store_pipeline_time(
-            user_id,
-            ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/filter_bogus_points",
-            time.time(),
-            t_filter_bogus_points.elapsed
-        )
-
-        with ect.Timer() as t_get_transition_df:
-            transition_df = timeseries.get_data_df("statemachine/transition", time_query)
-        esds.store_pipeline_time(
-            user_id,
-            ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/get_transition_df",
-            time.time(),
-            t_get_transition_df.elapsed
-        )
+        filtered_points_pre_ts_diff_df = timeseries.get_data_df("background/filtered_location", time_query)
+        user_id = filtered_points_pre_ts_diff_df["user_id"].iloc[0]
+        # Sometimes, we can get bogus points because data.ts and
+        # metadata.write_ts are off by a lot. If we don't do this, we end up
+        # appearing to travel back in time
+        # https://github.com/e-mission/e-mission-server/issues/457
+        filtered_points_df = filtered_points_pre_ts_diff_df[
+            (filtered_points_pre_ts_diff_df.metadata_write_ts - filtered_points_pre_ts_diff_df.ts) < 1000
+        ]
+        filtered_points_df.reset_index(inplace=True)
+        transition_df = timeseries.get_data_df("statemachine/transition", time_query)

         if len(transition_df) > 0:
             logging.debug("transition_df = %s" % transition_df[["fmt_time", "transition"]])
@@ -135,47 +112,40 @@ def segment_into_trips(self, timeseries, time_query):
                     curr_trip_start_point = sel_point
                     just_ended = False

-                with ect.Timer() as t_calculations:
-                    last5MinsPoints_df = filtered_points_df[np.logical_and(
-                        np.logical_and(
-                            filtered_points_df.ts > currPoint.ts - self.time_threshold,
-                            filtered_points_df.ts < currPoint.ts
-                        ),
-                        filtered_points_df.ts >= curr_trip_start_point.ts
-                    )]
-                    # Using .loc here causes problems if we have filtered out some points and so the index is non-consecutive.
-                    # Using .iloc just ends up including points after this one.
-                    # So we reset_index upstream and use it here.
-                    # We are going to use the last 8 points for now.
-                    # TODO: Change this back to last 10 points once we normalize phone and this
-                    last10Points_df = filtered_points_df.iloc[
-                        max(idx - self.point_threshold, curr_trip_start_point.idx):idx + 1
-                    ]
-                    distanceToLast = lambda row: pf.calDistance(ad.AttrDict(row), currPoint)
-                    timeToLast = lambda row: currPoint.ts - ad.AttrDict(row).ts
-                    last5MinsDistances = last5MinsPoints_df.apply(distanceToLast, axis=1)
-                    logging.debug("last5MinsDistances = %s with length %d" % (last5MinsDistances.to_numpy(), len(last5MinsDistances)))
-                    last10PointsDistances = last10Points_df.apply(distanceToLast, axis=1)
-                    logging.debug("last10PointsDistances = %s with length %d, shape %s" % (
-                        last10PointsDistances.to_numpy(),
-                        len(last10PointsDistances),
-                        last10PointsDistances.shape
-                    ))
-
-                    # Fix for https://github.com/e-mission/e-mission-server/issues/348
-                    last5MinTimes = last5MinsPoints_df.apply(timeToLast, axis=1)
-
-                    logging.debug("len(last10PointsDistances) = %d, len(last5MinsDistances) = %d" %
-                        (len(last10PointsDistances), len(last5MinsDistances)))
-                    logging.debug("last5MinTimes.max() = %s, time_threshold = %s" %
-                        (last5MinTimes.max() if len(last5MinTimes) > 0 else np.NaN, self.time_threshold))
+                last5MinsPoints_df = filtered_points_df[np.logical_and(
+                    np.logical_and(
+                        filtered_points_df.ts > currPoint.ts - self.time_threshold,
+                        filtered_points_df.ts < currPoint.ts
+                    ),
+                    filtered_points_df.ts >= curr_trip_start_point.ts
+                )]
+                # Using .loc here causes problems if we have filtered out some points and so the index is non-consecutive.
+                # Using .iloc just ends up including points after this one.
+                # So we reset_index upstream and use it here.
+                # We are going to use the last 8 points for now.
+                # TODO: Change this back to last 10 points once we normalize phone and this
+                last10Points_df = filtered_points_df.iloc[
+                    max(idx - self.point_threshold, curr_trip_start_point.idx):idx + 1
+                ]
+                distanceToLast = lambda row: pf.calDistance(ad.AttrDict(row), currPoint)
+                timeToLast = lambda row: currPoint.ts - ad.AttrDict(row).ts
+                last5MinsDistances = last5MinsPoints_df.apply(distanceToLast, axis=1)
+                logging.debug("last5MinsDistances = %s with length %d" % (last5MinsDistances.to_numpy(), len(last5MinsDistances)))
+                last10PointsDistances = last10Points_df.apply(distanceToLast, axis=1)
+                logging.debug("last10PointsDistances = %s with length %d, shape %s" % (
+                    last10PointsDistances.to_numpy(),
+                    len(last10PointsDistances),
+                    last10PointsDistances.shape
+                ))
+
+                # Fix for https://github.com/e-mission/e-mission-server/issues/348
+                last5MinTimes = last5MinsPoints_df.apply(timeToLast, axis=1)
+
+                logging.debug("len(last10PointsDistances) = %d, len(last5MinsDistances) = %d" %
+                    (len(last10PointsDistances), len(last5MinsDistances)))
+                logging.debug("last5MinTimes.max() = %s, time_threshold = %s" %
+                    (last5MinTimes.max() if len(last5MinTimes) > 0 else np.NaN, self.time_threshold))

-                esds.store_pipeline_time(
-                    user_id,
-                    ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/calculations_per_iteration",
-                    time.time(),
-                    t_calculations.elapsed
-                )

                 with ect.Timer() as t_has_trip_ended:
                     if self.has_trip_ended(prevPoint, currPoint, timeseries, last10PointsDistances, last5MinsDistances, last5MinTimes):
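The per-point block above builds two lookback windows around the current point: everything within time_threshold seconds (but never before the current trip's start point) and the previous point_threshold rows. A rough, self-contained sketch of that windowing; the 300 s and 10 point thresholds are illustrative assumptions, since the real values come from the filter configuration:

import numpy as np
import pandas as pd

time_threshold = 300      # "last 5 minutes", illustrative value
point_threshold = 10      # "last 10 points", illustrative value

pts = pd.DataFrame({"ts": np.arange(0.0, 900.0, 30.0)})   # one fake point every 30 s
idx = 20                                                   # current row in the loop
curr_ts = pts.ts.iloc[idx]
trip_start_idx = 5
trip_start_ts = pts.ts.iloc[trip_start_idx]

# Time window: strictly before the current point, within time_threshold,
# and never earlier than the current trip's start point.
last5MinsPoints = pts[np.logical_and(
    np.logical_and(pts.ts > curr_ts - time_threshold, pts.ts < curr_ts),
    pts.ts >= trip_start_ts)]

# Count window: the previous point_threshold rows, also capped at the trip start.
last10Points = pts.iloc[max(idx - point_threshold, trip_start_idx):idx + 1]

print(len(last5MinsPoints), len(last10Points))   # 9 11 for these fake values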
@@ -216,31 +186,24 @@ def segment_into_trips(self, timeseries, time_query):
             t_loop.elapsed
         )

-        with ect.Timer() as t_post_loop:
-            logging.debug("Iterated over all points, just_ended = %s, len(transition_df) = %s" %
-                (just_ended, len(transition_df)))
-            if not just_ended and len(transition_df) > 0:
-                stopped_moving_after_last = transition_df[
-                    (transition_df.ts > currPoint.ts) & (transition_df.transition == 2)
-                ]
-                logging.debug("looking after %s, found transitions %s" %
-                    (currPoint.ts, stopped_moving_after_last))
-                if len(stopped_moving_after_last) > 0:
-                    (unused, last_trip_end_point) = self.get_last_trip_end_point(
-                        filtered_points_df,
-                        last10Points_df,
-                        None
-                    )
-                    segmentation_points.append((curr_trip_start_point, last_trip_end_point))
-                    logging.debug("Found trip end at %s" % last_trip_end_point.fmt_time)
-                    # We have processed everything up to the trip end by marking it as a completed trip
-                    self.last_ts_processed = currPoint.metadata_write_ts
-        esds.store_pipeline_time(
-            user_id,
-            ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/post_loop",
-            time.time(),
-            t_post_loop.elapsed
-        )
+        logging.debug("Iterated over all points, just_ended = %s, len(transition_df) = %s" %
+            (just_ended, len(transition_df)))
+        if not just_ended and len(transition_df) > 0:
+            stopped_moving_after_last = transition_df[
+                (transition_df.ts > currPoint.ts) & (transition_df.transition == 2)
+            ]
+            logging.debug("looking after %s, found transitions %s" %
+                (currPoint.ts, stopped_moving_after_last))
+            if len(stopped_moving_after_last) > 0:
+                (unused, last_trip_end_point) = self.get_last_trip_end_point(
+                    filtered_points_df,
+                    last10Points_df,
+                    None
+                )
+                segmentation_points.append((curr_trip_start_point, last_trip_end_point))
+                logging.debug("Found trip end at %s" % last_trip_end_point.fmt_time)
+                # We have processed everything up to the trip end by marking it as a completed trip
+                self.last_ts_processed = currPoint.metadata_write_ts

         return segmentation_points

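Finally, the post-loop block in the last hunk (unchanged in behavior, just unwrapped from its timer) closes out a trailing trip when the state machine logged a stop after the last processed point; transition == 2 appears to encode "stopped moving" here. A condensed sketch of that check with stand-in data:

import pandas as pd

# Stand-in transition log; transition == 2 is assumed to mean "stopped moving".
transition_df = pd.DataFrame({
    "ts":         [100.0, 450.0, 900.0],
    "transition": [1, 2, 2],
})
curr_point_ts = 500.0     # timestamp of the last processed location point
just_ended = False

if not just_ended and len(transition_df) > 0:
    stopped_moving_after_last = transition_df[
        (transition_df.ts > curr_point_ts) & (transition_df.transition == 2)
    ]
    if len(stopped_moving_after_last) > 0:
        # In the real code, get_last_trip_end_point() picks the end point and the
        # (start, end) pair is appended to segmentation_points before returning.
        print("trailing trip end detected after ts=%s" % curr_point_ts)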
