diff --git a/dp3/database/database.py b/dp3/database/database.py index a9220d0d..efb4b292 100644 --- a/dp3/database/database.py +++ b/dp3/database/database.py @@ -671,14 +671,30 @@ def get_raw_summary(self, etype: str, before: datetime) -> pymongo.cursor: except Exception as e: raise DatabaseError(f"Raw collection {raw_col_name} fetch failed: {e}") from e - def get_raw(self, etype: str, after: datetime, before: datetime) -> pymongo.cursor: + def get_raw( + self, etype: str, after: datetime, before: datetime, plain: bool = True + ) -> pymongo.cursor: + """Get raw datapoints where `t1` is in <`after`, `before`). + + If `plain` is `True`, then all plain datapoints will be returned (default). + """ raw_col_name = self._raw_col_name(etype) - return self._db[raw_col_name].find({"t1": {"$gte": after, "$lt": before}}) + query_filter = {"$or": [{"t1": {"$gte": after, "$lt": before}}]} + if plain: + query_filter["$or"].append({"t1": {"$exists": False}}) + return self._db[raw_col_name].find(query_filter) - def delete_old_raw_dps(self, etype: str, before: datetime): + def delete_old_raw_dps(self, etype: str, before: datetime, plain: bool = True): + """Delete raw datapoints older than `before`. + + Deletes all plain datapoints if `plain` is `True` (default). + """ raw_col_name = self._raw_col_name(etype) + query_filter = {"$or": [{"t1": {"$lt": before}}]} + if plain: + query_filter["$or"].append({"t1": {"$exists": False}}) try: - return self._db[raw_col_name].delete_many({"t1": {"$lt": before}}) + return self._db[raw_col_name].delete_many(query_filter) except Exception as e: raise DatabaseError(f"Delete of old datapoints failed: {e}") from e diff --git a/dp3/history_management/history_manager.py b/dp3/history_management/history_manager.py index 70d80a2d..c08efe54 100644 --- a/dp3/history_management/history_manager.py +++ b/dp3/history_management/history_manager.py @@ -189,7 +189,7 @@ def archive_old_dps(self): first = True for etype in self.model_spec.entities: - result_cursor = self.db.get_raw(etype, after=min_date, before=t_old) + result_cursor = self.db.get_raw(etype, after=min_date, before=t_old, plain=True) for dp in result_cursor: if first: logfile.write( @@ -209,7 +209,7 @@ def archive_old_dps(self): deleted_count = 0 for etype in self.model_spec.entities: - deleted_res = self.db.delete_old_raw_dps(etype, before=t_old) + deleted_res = self.db.delete_old_raw_dps(etype, before=t_old, plain=True) deleted_count += deleted_res.deleted_count self.log.info("Deleted %s datapoints", deleted_count)