Skip to content

Commit

Permalink
Faster Trip Invalidation
Browse files Browse the repository at this point in the history
Instead of invalidating one out-of-order id (ooid) from the list at a time, use UpdateOne and bulk_write to
invalidate the entire list in a single call. This is supported by the findings in e-mission/e-mission-docs#1041 (comment)
  • Loading branch information
humbleOldSage committed Feb 4, 2024
1 parent 0a8c61d commit 3e1b451
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 7 deletions.
3 changes: 1 addition & 2 deletions emission/analysis/intake/segmentation/trip_segmentation.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,7 @@ def segment_current_trips(user_id):
# invalidate in the database.
out_of_order_id_list = out_of_order_points["_id"].tolist()
logging.debug("out_of_order_id_list = %s" % out_of_order_id_list)
for ooid in out_of_order_id_list:
ts.invalidate_raw_entry(ooid)
ts.invalidate_raw_entry(out_of_order_id_list)

filters_in_df = loc_df["filter"].dropna().unique()
logging.debug("Filters in the dataframe = %s" % filters_in_df)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ def has_trip_ended(self, lastPoint, currPoint, timeseries):
logging.debug("After dropping %d, filtered points = %s" %
(currPoint.idx, self.filtered_points_df.iloc[currPoint.idx - 5:currPoint.idx + 5][["valid", "fmt_time"]]))
logging.debug("remove huge invalid ts offset point = %s" % currPoint)
timeseries.invalidate_raw_entry(currPoint["_id"])
timeseries.invalidate_raw_entry([currPoint["_id"]])
# We currently re-retrieve the last point every time, so
# searching upwards is good enough but if we use
# lastPoint = currPoint, we should update currPoint here
Expand Down
2 changes: 1 addition & 1 deletion emission/storage/timeseries/abstract_timeseries.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,5 +108,5 @@ def update(entry):
def update_data(user_id, key, obj_id, data):
pass

def invalidate_raw_entry(self, obj_id):
def invalidate_raw_entry(self, list_obj_id):
pass
11 changes: 8 additions & 3 deletions emission/storage/timeseries/builtin_timeseries.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import pandas as pd
import pymongo
import itertools

from pymongo import UpdateOne
import emission.core.get_database as edb
import emission.storage.timeseries.abstract_timeseries as esta

Expand Down Expand Up @@ -440,8 +440,13 @@ def update_data(user_id, key, obj_id, data):
logging.debug("updating entry %s into timeseries" % new_entry)
edb.save(ts.get_timeseries_db(key), new_entry)

def invalidate_raw_entry(self, obj_id):
self.timeseries_db.update_one({"_id": obj_id, "user_id": self.user_id}, {"$set": {"invalid": True}})
def invalidate_raw_entry(self, out_of_order_id_list):
update_operations = [
UpdateOne({"_id": obj_id, "user_id": self.user_id}, {"$set": {"invalid": True}})
for obj_id in out_of_order_id_list
]
if update_operations:
self.timeseries_db.bulk_write(update_operations)

def find_entries_count(self, key_list = None, time_query = None, geo_query = None, extra_query_list = None):
"""
Expand Down

0 comments on commit 3e1b451

Please sign in to comment.