diff --git a/emission/analysis/intake/segmentation/trip_segmentation.py b/emission/analysis/intake/segmentation/trip_segmentation.py index d6828af77..c81f1596b 100644 --- a/emission/analysis/intake/segmentation/trip_segmentation.py +++ b/emission/analysis/intake/segmentation/trip_segmentation.py @@ -81,8 +81,7 @@ def segment_current_trips(user_id): # invalidate in the database. out_of_order_id_list = out_of_order_points["_id"].tolist() logging.debug("out_of_order_id_list = %s" % out_of_order_id_list) - for ooid in out_of_order_id_list: - ts.invalidate_raw_entry(ooid) + ts.invalidate_raw_entry(out_of_order_id_list) filters_in_df = loc_df["filter"].dropna().unique() logging.debug("Filters in the dataframe = %s" % filters_in_df) diff --git a/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_dist_filter.py b/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_dist_filter.py index ea53c9abb..ce888fea3 100644 --- a/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_dist_filter.py +++ b/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_dist_filter.py @@ -211,7 +211,7 @@ def has_trip_ended(self, lastPoint, currPoint, timeseries): logging.debug("After dropping %d, filtered points = %s" % (currPoint.idx, self.filtered_points_df.iloc[currPoint.idx - 5:currPoint.idx + 5][["valid", "fmt_time"]])) logging.debug("remove huge invalid ts offset point = %s" % currPoint) - timeseries.invalidate_raw_entry(currPoint["_id"]) + timeseries.invalidate_raw_entry([currPoint["_id"]]) # We currently re-retrieve the last point every time, so # searching upwards is good enough but if we use # lastPoint = currPoint, we should update currPoint here diff --git a/emission/storage/timeseries/abstract_timeseries.py b/emission/storage/timeseries/abstract_timeseries.py index 93e384c78..3a91d153b 100644 --- a/emission/storage/timeseries/abstract_timeseries.py +++ b/emission/storage/timeseries/abstract_timeseries.py @@ -108,5 +108,5 @@ def update(entry): def update_data(user_id, key, obj_id, data): pass - def invalidate_raw_entry(self, obj_id): + def invalidate_raw_entry(self, list_obj_id): pass diff --git a/emission/storage/timeseries/builtin_timeseries.py b/emission/storage/timeseries/builtin_timeseries.py index 8204b4e29..7ebb5c403 100644 --- a/emission/storage/timeseries/builtin_timeseries.py +++ b/emission/storage/timeseries/builtin_timeseries.py @@ -9,7 +9,7 @@ import pandas as pd import pymongo import itertools - +from pymongo import UpdateOne import emission.core.get_database as edb import emission.storage.timeseries.abstract_timeseries as esta @@ -440,8 +440,13 @@ def update_data(user_id, key, obj_id, data): logging.debug("updating entry %s into timeseries" % new_entry) edb.save(ts.get_timeseries_db(key), new_entry) - def invalidate_raw_entry(self, obj_id): - self.timeseries_db.update_one({"_id": obj_id, "user_id": self.user_id}, {"$set": {"invalid": True}}) + def invalidate_raw_entry(self, out_of_order_id_list): + update_operations = [ + UpdateOne({"_id": obj_id, "user_id": self.user_id}, {"$set": {"invalid": True}}) + for obj_id in out_of_order_id_list + ] + if update_operations: + self.timeseries_db.bulk_write(update_operations) def find_entries_count(self, key_list = None, time_query = None, geo_query = None, extra_query_list = None): """