🚀 Section Segmentation Optimization #1036

Merged

JGreenlee

@JGreenlee JGreenlee commented Mar 7, 2025

(Continuation of #1031)

read all entries upfront during section segmentation

Since DB calls are a bottleneck, we can reduce the number of DB calls by making upfront queries for all the required types of entries we'll need across the entire time range we are sectioning.
Then we can filter those down accordingly.

These include: segmentation/raw_place, background/bluetooth_ble, background/motion_activity, background/location, and background/filtered_location
(For some edge cases, statemachine/transition is used, but since it is only needed in those specific cases, it doesn't make sense to load it upfront.)

The motion activity and locations are used as dataframes later on, so I used get_data_df for those.
trips, places, and ble are queried with find_entries.

Then these are plumbed through to segment_trip_into_sections, filtered to the time range of the current trip, and ultimately passed into each SectionSegmentationMethod's segment_into_sections
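A minimal sketch of the "query once, filter per trip" pattern described above. This is not the code in this PR: `preload`, `slice_to_trip`, and the `query` callable are hypothetical names, and plain lists of dicts stand in for the entries and dataframes.

```python
from bisect import bisect_left, bisect_right


def preload(query, keys, start_ts, end_ts):
    """Run one query per key over the whole segmentation range.

    `query` is a hypothetical callable (key, start_ts, end_ts) -> list of
    dicts with a "ts" field; it stands in for the real DB access layer.
    """
    return {
        key: sorted(query(key, start_ts, end_ts), key=lambda e: e["ts"])
        for key in keys
    }


def slice_to_trip(entries, trip_start_ts, trip_end_ts):
    """Return the entries with trip_start_ts <= ts <= trip_end_ts.

    `entries` must already be sorted by "ts" (as preload() guarantees),
    so no further DB call is needed per trip.
    """
    ts_list = [e["ts"] for e in entries]
    lo = bisect_left(ts_list, trip_start_ts)
    hi = bisect_right(ts_list, trip_end_ts)
    return entries[lo:hi]
```

With this shape, the number of queries depends on the number of keys rather than the number of trips in the range.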

With the locations already pre-loaded as dataframes, I was able to remove get_location_streams_for_trip altogether

For _get_distance_from_start_place_to_end, we preload all places in the segmentation range, so we can usually find the trip's start place in memory by its _id. However, unit tests revealed edge cases where the first start place is before the current segmentation time range, so we must fall back to querying the DB via esda.get_object.
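The in-memory lookup with a DB fallback could look roughly like this. It is a sketch under assumptions: `index_by_id`, `get_start_place`, and `fetch_from_db` are hypothetical names; in the PR the fallback is esda.get_object, whose signature is not reproduced here.

```python
def index_by_id(places):
    """Build a dict keyed by _id from the preloaded place entries."""
    return {place["_id"]: place for place in places}


def get_start_place(place_id, places_by_id, fetch_from_db):
    """Look up a place in memory first; hit the DB only on a miss.

    `fetch_from_db` is a hypothetical callable standing in for the real
    DB lookup. It is only invoked when the place lies outside the
    preloaded segmentation range.
    """
    place = places_by_id.get(place_id)
    if place is None:
        place = fetch_from_db(place_id)
    return place
```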

insert section segmentation entries in bulk

We create segmentation/raw_section and segmentation/raw_stop entries in segment_trip_into_sections. Until now we inserted them as we created them (which sometimes required updating the previous entry when there was a stop), resulting in numerous insert and update DB calls.
Instead, we can keep a list of entries to be created and bulk_insert them only at the end of the stage, once we are done updating them.
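To illustrate the idea (not the actual implementation), here is a small sketch that builds sections and stops in memory, links them by updating the previous section in place, and writes everything with a single call. `FakeTimeseries`, `build_entries`, and the field names inside `data` are assumptions made up for the example.

```python
import itertools

_ids = itertools.count(1)  # stand-in for DB-generated ids


class FakeTimeseries:
    """Hypothetical stand-in for the DB layer; it only counts calls."""

    def __init__(self):
        self.docs = []
        self.calls = 0

    def bulk_insert(self, entries):
        self.calls += 1
        self.docs.extend(entries)


def build_entries(trip_id, spans):
    """Build section and stop entries for one trip without touching the DB.

    `spans` is a list of (start_ts, end_ts) tuples, one per section,
    in time order.
    """
    sections, stops = [], []
    for start_ts, end_ts in spans:
        section = {
            "_id": next(_ids),
            "key": "segmentation/raw_section",
            "data": {"trip_id": trip_id, "start_ts": start_ts, "end_ts": end_ts,
                     "start_stop": None, "end_stop": None},
        }
        if sections:  # not the first section: add a stop between the two
            prev = sections[-1]
            stop = {
                "_id": next(_ids),
                "key": "segmentation/raw_stop",
                "data": {"trip_id": trip_id,
                         "enter_ts": prev["data"]["end_ts"], "exit_ts": start_ts,
                         "ending_section": prev["_id"],
                         "starting_section": section["_id"]},
            }
            prev["data"]["end_stop"] = stop["_id"]  # in-memory update, no DB call
            section["data"]["start_stop"] = stop["_id"]
            stops.append(stop)
        sections.append(section)
    return sections + stops
```

Because the previous section is patched while it is still only in the list, the separate update call goes away and the stage ends with one bulk_insert.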


@JGreenlee JGreenlee mentioned this pull request Mar 7, 2025

@shankari shankari left a comment


LGTM barring some high-level cleanup comments.

I am now going to run this against our 3 canonical users from ccebike before merging.

Comment on lines +137 to +138
trip_start_loc = ecwl.Location(trip_filtered_loc_df.iloc[0])
trip_end_loc = ecwl.Location(trip_filtered_loc_df.iloc[-1])

For the record, I considered doing this for the trip segmentation as well as part of #1035

I think this will largely work except for the LocalDate part which is expanded in the dataframe and would have to be collapsed back into a structure in the Location object.

I even looked into "reverse json_normalize" 😄 but there was no one-line solution, so I gave up given the tight timeframe.
https://stackoverflow.com/questions/54776916/inverse-of-pandas-json-normalize

It is a good point that we don't really need the LocalDate fields here. However, to avoid confusion down the road ("here's a Location object, I expect to be able to access the start_local_dt field, but I can't"), I would suggest filing a cleanup issue to recreate those structured fields correctly and use the result both here and in trip_segmentation.

location in trip_segmentation:

start_loc = get_loc_for_row(start_loc_doc)

Fortunately, the testing shows that looking up by _id is fairly fast
#1035 (comment)

so this is not super high priority
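For the cleanup issue, a possible starting point for the "reverse json_normalize" step is sketched below. It assumes the flattened columns share a common prefix such as `local_dt_` (consistent with the `local_dt_timezone` column used in this PR); `collapse_prefix` is a hypothetical helper, not an existing function in the codebase or in pandas.

```python
def collapse_prefix(flat_row, prefix, sep="_"):
    """Nest all keys that start with prefix + sep under a single key.

    Example: {"local_dt_year": 2025, "ts": 1.0} with prefix "local_dt"
    becomes {"ts": 1.0, "local_dt": {"year": 2025}}.
    The input mapping is not modified.
    """
    head = prefix + sep
    nested, out = {}, {}
    for key, value in flat_row.items():
        if key.startswith(head):
            nested[key[len(head):]] = value
        else:
            out[key] = value
    if nested:
        out[prefix] = nested
    return out
```

For a dataframe row, the input could be obtained with `row.to_dict()` before handing the result to the Location wrapper. Note that this only handles one level of nesting and would misfire if an unrelated column happened to share the prefix.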

Comment on lines +150 to +151
section.trip_id = trip_entry['_id']
if len(section_entries) == 0:

note to self: we are now building the section list in memory before saving it, so if the list is empty, this is the first point.

Comment on lines +145 to +146
start_loc = ecwl.Location(start_loc_doc)
end_loc = ecwl.Location(end_loc_doc)

We could do this for trip_segmentation as well!



 def fill_section(section, start_loc, end_loc, sensed_mode, ble_sensed_mode=None):
     section.start_ts = start_loc.ts
-    section.start_local_dt = start_loc.local_dt
+    section.start_local_dt = ecwld.LocalDate.get_local_date(start_loc.ts, start_loc['local_dt_timezone'])

ah we do use the local date (I had forgotten although I saw it just last weekend). This is fine, but I do think we should refactor as part of cleanup

@@ -77,7 +77,7 @@ def mark_sectioning_done(user_id, last_trip_done):
         mark_stage_done(user_id, ps.PipelineStages.SECTION_SEGMENTATION, None)
     else:
         mark_stage_done(user_id, ps.PipelineStages.SECTION_SEGMENTATION,
-                        last_trip_done.data.end_ts + END_FUZZ_AVOID_LTE)
+                        last_trip_done['data']['end_ts'] + END_FUZZ_AVOID_LTE)

Note to self: we need this because the last_trip_done is no longer an entry.

Note to @JGreenlee: we can also wrap the trip in a Trip object like you did with location, in case we need to keep it consistent with other stages.
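To make the difference concrete: a plain dict from the DB only supports `['data']['end_ts']`, while a wrapper restores the attribute-style access used in other stages. The class below is a generic illustration of that idea only; `AttrView` is a made-up name and is not the project's Trip or Entry wrapper.

```python
class AttrView:
    """Read-only attribute-style view over a (possibly nested) dict."""

    def __init__(self, doc):
        self._doc = doc

    def __getattr__(self, name):
        # Only called for names not found the normal way, i.e. document keys.
        try:
            value = self._doc[name]
        except KeyError:
            raise AttributeError(name) from None
        return AttrView(value) if isinstance(value, dict) else value

    def __getitem__(self, name):
        # Keep dict-style access working as well.
        return self._doc[name]


trip_doc = {"_id": "abc", "data": {"end_ts": 100.0}}
wrapped = AttrView(trip_doc)
print(wrapped.data.end_ts, trip_doc["data"]["end_ts"])
```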

@shankari

shankari commented Mar 8, 2025

After #1035

Trip segmentation took minutes, but section segmentation took all day
With these changes, section segmentation took minutes as well, and the entire pipeline run took ~ 10 minutes

Fri Mar  7 15:39:03 PST 2025
Fri Mar  7 15:47:48 PST 2025

I don't see any errors in SECTION_SEGMENTATION (last_processed_ts is not None) so will merge this now. Exciting!

$ grep "For stage" intake_0.log
2025-03-07 15:39:36,213:INFO:8474691648:For stage PipelineStages.USERCACHE, start_ts is None
2025-03-07 15:39:44,148:INFO:8474691648:For stage PipelineStages.USERCACHE, last_ts_processed = 2024-11-26T22:29:59.992352
2025-03-07 15:39:44,182:INFO:8474691648:For stage PipelineStages.USER_INPUT_MATCH_INCOMING, start_ts is None
2025-03-07 15:39:48,389:INFO:8474691648:For stage PipelineStages.USER_INPUT_MATCH_INCOMING, last_ts_processed = 2024-11-26T18:50:38.508838
2025-03-07 15:39:53,354:INFO:8474691648:For stage PipelineStages.ACCURACY_FILTERING, start_ts is None
2025-03-07 15:39:53,359:INFO:8474691648:For stage PipelineStages.ACCURACY_FILTERING, last_ts_processed is unchanged
2025-03-07 15:39:53,366:INFO:8474691648:For stage PipelineStages.TRIP_SEGMENTATION, start_ts is None
2025-03-07 15:40:24,864:INFO:8474691648:For stage PipelineStages.TRIP_SEGMENTATION, last_ts_processed = 2024-11-26T23:47:34.319918
2025-03-07 15:40:25,075:INFO:8474691648:For stage PipelineStages.SECTION_SEGMENTATION, start_ts is None
2025-03-07 15:41:06,553:INFO:8474691648:For stage PipelineStages.SECTION_SEGMENTATION, last_ts_processed = 2024-11-26T23:33:34.999963
2025-03-07 15:41:06,686:INFO:8474691648:For stage PipelineStages.JUMP_SMOOTHING, start_ts is None
2025-03-07 15:41:30,934:INFO:8474691648:For stage PipelineStages.JUMP_SMOOTHING, last_ts_processed = 2024-11-26T23:33:34.999963
2025-03-07 15:41:30,965:INFO:8474691648:For stage PipelineStages.CLEAN_RESAMPLING, start_ts is None
2025-03-07 15:42:42,858:INFO:8474691648:For stage PipelineStages.LABEL_INFERENCE, start_ts is None
2025-03-07 15:42:42,863:INFO:8474691648:For stage PipelineStages.LABEL_INFERENCE, last_ts_processed is unchanged
2025-03-07 15:42:42,868:INFO:8474691648:For stage PipelineStages.EXPECTATION_POPULATION, start_ts is None
2025-03-07 15:42:42,873:INFO:8474691648:For stage PipelineStages.EXPECTATION_POPULATION, last_ts_processed is unchanged
2025-03-07 15:42:42,877:INFO:8474691648:For stage PipelineStages.CREATE_CONFIRMED_OBJECTS, start_ts is None
2025-03-07 15:42:42,883:INFO:8474691648:For stage PipelineStages.CREATE_CONFIRMED_OBJECTS, last_ts_processed is unchanged
2025-03-07 15:42:42,888:INFO:8474691648:For stage PipelineStages.CREATE_COMPOSITE_OBJECTS, start_ts is None
2025-03-07 15:42:42,977:INFO:8474691648:For stage PipelineStages.CREATE_COMPOSITE_OBJECTS, last_ts_processed is unchanged
$ grep "For stage" intake_1.log
2025-03-07 15:39:36,222:INFO:8474691648:For stage PipelineStages.USERCACHE, start_ts is None
2025-03-07 15:39:39,343:INFO:8474691648:For stage PipelineStages.USERCACHE, last_ts_processed = 2025-02-11T23:08:35.827000
2025-03-07 15:39:39,366:INFO:8474691648:For stage PipelineStages.USER_INPUT_MATCH_INCOMING, start_ts is None
2025-03-07 15:39:43,710:INFO:8474691648:For stage PipelineStages.USER_INPUT_MATCH_INCOMING, last_ts_processed = 2024-12-16T21:40:00.600000
2025-03-07 15:39:45,407:INFO:8474691648:For stage PipelineStages.ACCURACY_FILTERING, start_ts is None
2025-03-07 15:39:45,411:INFO:8474691648:For stage PipelineStages.ACCURACY_FILTERING, last_ts_processed is unchanged
2025-03-07 15:39:45,418:INFO:8474691648:For stage PipelineStages.TRIP_SEGMENTATION, start_ts is None
2025-03-07 15:42:20,621:INFO:8474691648:For stage PipelineStages.TRIP_SEGMENTATION, last_ts_processed = 2025-02-12T00:10:28.813000
2025-03-07 15:42:20,724:INFO:8474691648:For stage PipelineStages.SECTION_SEGMENTATION, start_ts is None
2025-03-07 15:42:50,307:INFO:8474691648:For stage PipelineStages.SECTION_SEGMENTATION, last_ts_processed = 2025-02-12T00:07:55.365000
2025-03-07 15:42:50,419:INFO:8474691648:For stage PipelineStages.JUMP_SMOOTHING, start_ts is None
2025-03-07 15:43:11,796:INFO:8474691648:For stage PipelineStages.JUMP_SMOOTHING, last_ts_processed = 2025-02-12T00:07:55.365000
2025-03-07 15:43:11,804:INFO:8474691648:For stage PipelineStages.CLEAN_RESAMPLING, start_ts is None
2025-03-07 15:44:53,915:INFO:8474691648:For stage PipelineStages.LABEL_INFERENCE, start_ts is None
2025-03-07 15:44:53,920:INFO:8474691648:For stage PipelineStages.LABEL_INFERENCE, last_ts_processed is unchanged
2025-03-07 15:44:53,923:INFO:8474691648:For stage PipelineStages.EXPECTATION_POPULATION, start_ts is None
2025-03-07 15:44:53,927:INFO:8474691648:For stage PipelineStages.EXPECTATION_POPULATION, last_ts_processed is unchanged
2025-03-07 15:44:53,930:INFO:8474691648:For stage PipelineStages.CREATE_CONFIRMED_OBJECTS, start_ts is None
2025-03-07 15:44:53,937:INFO:8474691648:For stage PipelineStages.CREATE_CONFIRMED_OBJECTS, last_ts_processed is unchanged
2025-03-07 15:44:53,940:INFO:8474691648:For stage PipelineStages.CREATE_COMPOSITE_OBJECTS, start_ts is None
2025-03-07 15:44:53,996:INFO:8474691648:For stage PipelineStages.CREATE_COMPOSITE_OBJECTS, last_ts_processed is unchanged
$ grep "For stage" intake_2.log
2025-03-07 15:39:36,163:INFO:8474691648:For stage PipelineStages.USERCACHE, start_ts is None
2025-03-07 15:39:38,897:INFO:8474691648:For stage PipelineStages.USERCACHE, last_ts_processed = 2025-02-11T22:17:10.589000
2025-03-07 15:39:38,908:INFO:8474691648:For stage PipelineStages.USER_INPUT_MATCH_INCOMING, start_ts is None
2025-03-07 15:39:43,191:INFO:8474691648:For stage PipelineStages.USER_INPUT_MATCH_INCOMING, last_ts_processed is unchanged
2025-03-07 15:39:45,389:INFO:8474691648:For stage PipelineStages.ACCURACY_FILTERING, start_ts is None
2025-03-07 15:39:45,394:INFO:8474691648:For stage PipelineStages.ACCURACY_FILTERING, last_ts_processed is unchanged
2025-03-07 15:39:45,403:INFO:8474691648:For stage PipelineStages.TRIP_SEGMENTATION, start_ts is None
2025-03-07 15:44:42,208:INFO:8474691648:For stage PipelineStages.TRIP_SEGMENTATION, last_ts_processed = 2025-02-12T00:06:25.235000
2025-03-07 15:44:42,592:INFO:8474691648:For stage PipelineStages.SECTION_SEGMENTATION, start_ts is None
2025-03-07 15:45:26,517:INFO:8474691648:For stage PipelineStages.SECTION_SEGMENTATION, last_ts_processed = 2025-02-12T00:03:01.496000
2025-03-07 15:45:26,808:INFO:8474691648:For stage PipelineStages.JUMP_SMOOTHING, start_ts is None
2025-03-07 15:45:46,458:INFO:8474691648:For stage PipelineStages.JUMP_SMOOTHING, last_ts_processed = 2025-02-12T00:03:01.496000
2025-03-07 15:45:46,465:INFO:8474691648:For stage PipelineStages.CLEAN_RESAMPLING, start_ts is None
2025-03-07 15:47:47,179:INFO:8474691648:For stage PipelineStages.LABEL_INFERENCE, start_ts is None
2025-03-07 15:47:47,183:INFO:8474691648:For stage PipelineStages.LABEL_INFERENCE, last_ts_processed is unchanged
2025-03-07 15:47:47,186:INFO:8474691648:For stage PipelineStages.EXPECTATION_POPULATION, start_ts is None
2025-03-07 15:47:47,189:INFO:8474691648:For stage PipelineStages.EXPECTATION_POPULATION, last_ts_processed is unchanged
2025-03-07 15:47:47,192:INFO:8474691648:For stage PipelineStages.CREATE_CONFIRMED_OBJECTS, start_ts is None
2025-03-07 15:47:47,196:INFO:8474691648:For stage PipelineStages.CREATE_CONFIRMED_OBJECTS, last_ts_processed is unchanged
2025-03-07 15:47:47,199:INFO:8474691648:For stage PipelineStages.CREATE_COMPOSITE_OBJECTS, start_ts is None
2025-03-07 15:47:47,252:INFO:8474691648:For stage PipelineStages.CREATE_COMPOSITE_OBJECTS, last_ts_processed is unchanged
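If it helps anyone reading these logs later, per-stage wall-clock time can be pulled out of the "For stage" lines with a few lines of Python. This is just a convenience sketch: `stage_durations` is a hypothetical helper, and it assumes every stage logs a `start_ts` line followed (on success) by a `last_ts_processed` line, as in the output above.

```python
import re
from datetime import datetime

LINE_RE = re.compile(
    r"^(?P<when>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}):INFO:\d+:"
    r"For stage PipelineStages\.(?P<stage>\w+), (?P<msg>.*)$"
)


def stage_durations(lines):
    """Map stage name -> seconds between its start and end log lines.

    Stages that never log a 'last_ts_processed' line (for example,
    because they failed) are left out of the result.
    """
    started, durations = {}, {}
    for line in lines:
        m = LINE_RE.match(line.strip())
        if m is None:
            continue
        when = datetime.strptime(m["when"], "%Y-%m-%d %H:%M:%S,%f")
        if m["msg"].startswith("start_ts"):
            started[m["stage"]] = when
        elif m["msg"].startswith("last_ts_processed") and m["stage"] in started:
            elapsed = when - started.pop(m["stage"])
            durations[m["stage"]] = elapsed.total_seconds()
    return durations


# A few lines copied from intake_0.log above.
sample = [
    "2025-03-07 15:39:53,366:INFO:8474691648:For stage PipelineStages.TRIP_SEGMENTATION, start_ts is None",
    "2025-03-07 15:40:24,864:INFO:8474691648:For stage PipelineStages.TRIP_SEGMENTATION, last_ts_processed = 2024-11-26T23:47:34.319918",
    "2025-03-07 15:40:25,075:INFO:8474691648:For stage PipelineStages.SECTION_SEGMENTATION, start_ts is None",
    "2025-03-07 15:41:06,553:INFO:8474691648:For stage PipelineStages.SECTION_SEGMENTATION, last_ts_processed = 2024-11-26T23:33:34.999963",
    "2025-03-07 15:41:30,965:INFO:8474691648:For stage PipelineStages.CLEAN_RESAMPLING, start_ts is None",
]

for stage, seconds in stage_durations(sample).items():
    print(f"{stage}: {seconds:.1f}s")
```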

@shankari shankari merged commit 87aa882 into e-mission:master Mar 8, 2025
5 checks passed
@shankari

shankari commented Mar 8, 2025

There was an error in CLEAN_AND_RESAMPLE earlier. To verify that this is not a regression, I copied over the production analysis config (which turns off the assertions) and reran.

This run finished in 10 mins

Fri Mar  7 16:43:36 PST 2025
Fri Mar  7 16:51:10 PST 2025

without any errors; all stages completed successfully

kshankar-41872s:tmp kshankar$ grep "For stage" intake_2.log 
2025-03-07 16:44:01,154:INFO:8474691648:For stage PipelineStages.USER_INPUT_MATCH_INCOMING, start_ts is None
2025-03-07 16:44:02,278:INFO:8474691648:For stage PipelineStages.USER_INPUT_MATCH_INCOMING, last_ts_processed is unchanged
2025-03-07 16:44:03,801:INFO:8474691648:For stage PipelineStages.ACCURACY_FILTERING, start_ts is None
2025-03-07 16:44:03,807:INFO:8474691648:For stage PipelineStages.ACCURACY_FILTERING, last_ts_processed is unchanged
2025-03-07 16:44:03,814:INFO:8474691648:For stage PipelineStages.TRIP_SEGMENTATION, start_ts is None
2025-03-07 16:48:44,896:INFO:8474691648:For stage PipelineStages.TRIP_SEGMENTATION, last_ts_processed = 2025-02-12T00:06:25.235000
2025-03-07 16:48:45,178:INFO:8474691648:For stage PipelineStages.SECTION_SEGMENTATION, start_ts is None
2025-03-07 16:49:29,438:INFO:8474691648:For stage PipelineStages.JUMP_SMOOTHING, start_ts is None
2025-03-07 16:49:29,448:INFO:8474691648:For stage PipelineStages.JUMP_SMOOTHING, last_ts_processed is unchanged
2025-03-07 16:49:29,451:INFO:8474691648:For stage PipelineStages.CLEAN_RESAMPLING, start_ts is None
2025-03-07 16:51:07,976:INFO:8474691648:For stage PipelineStages.CLEAN_RESAMPLING, last_ts_processed = 2025-02-12T00:03:01.496000
2025-03-07 16:51:07,982:INFO:8474691648:For stage PipelineStages.LABEL_INFERENCE, start_ts is None
2025-03-07 16:51:07,986:INFO:8474691648:For stage PipelineStages.LABEL_INFERENCE, last_ts_processed is unchanged
2025-03-07 16:51:07,990:INFO:8474691648:For stage PipelineStages.EXPECTATION_POPULATION, start_ts is None
2025-03-07 16:51:07,994:INFO:8474691648:For stage PipelineStages.EXPECTATION_POPULATION, last_ts_processed is unchanged
2025-03-07 16:51:07,997:INFO:8474691648:For stage PipelineStages.CREATE_CONFIRMED_OBJECTS, start_ts is None
2025-03-07 16:51:09,214:INFO:8474691648:For stage PipelineStages.CREATE_CONFIRMED_OBJECTS, last_ts_processed = 2025-01-04T06:44:41.419000
2025-03-07 16:51:09,219:INFO:8474691648:For stage PipelineStages.CREATE_COMPOSITE_OBJECTS, start_ts is None
2025-03-07 16:51:09,396:INFO:8474691648:For stage PipelineStages.CREATE_COMPOSITE_OBJECTS, last_ts_processed = 2025-01-04T06:44:41.419000
