diff --git a/emission/purge_restore/purge_data.py b/emission/purge_restore/purge_data.py
index 9e7e417c7..e39fa1233 100644
--- a/emission/purge_restore/purge_data.py
+++ b/emission/purge_restore/purge_data.py
@@ -48,32 +48,21 @@ def run_purge_data_pipeline(self, user_id, archive_dir, export_type):
         if os.path.isdir(archive_dir) == False:
             os.mkdir(archive_dir)
 
-        initStartTs = time_query.startTs
-        initEndTs = time_query.endTs
-        print("Inside: purge_data - Start time: %s" % initStartTs)
-        print("Inside: purge_data - End time: %s" % initEndTs)
+        init_start_ts, init_end_ts = time_query.startTs, time_query.endTs
+        logging.info(f"Purge data - Start time: {init_start_ts}, End time: {init_end_ts}")
 
         file_names = []
         entries_to_export = self.get_export_timeseries_entries(user_id, ts, time_query.startTs, time_query.endTs)
-        count_entries = len(entries_to_export)
+        # count_entries = len(entries_to_export)
 
         # If running the pipeline PURGE stage for first time, choose the first timestamp from the timeseries as the starting point
         # Otherwise cannot add 1 hour (3600 seconds) to a NoneType value if incremental option is selected
-        current_start_ts = initStartTs if initStartTs is not None else entries_to_export[0]['data']['ts']
+        current_start_ts = init_start_ts if init_start_ts is not None else entries_to_export[0]['data']['ts']
 
-        # while current_start_ts < initEndTs:
-        while True:
+        while current_start_ts < init_end_ts:
+            # while True:
             print("Inside while loop: current_start_ts = %s" % current_start_ts)
-            current_end_ts = min(current_start_ts + 3600, initEndTs) if export_type == 'incremental' else initEndTs
-
-            if export_type == 'incremental':
-                current_end_ts = min(current_start_ts + 3600, initEndTs)
-                print("Inside export_type incremental, increasing current_end_ts: %s" % current_end_ts)
-            elif export_type == 'full':
-                current_end_ts = initEndTs
-                print("Inside export_type full, setting current_end_ts to current time: %s" % current_end_ts)
-            else:
-                raise ValueError("Unknown export_type %s" % export_type)
+            current_end_ts = min(current_start_ts + 3600, init_end_ts) if export_type == 'incremental' else init_end_ts
 
             print(f"Processing data from {current_start_ts} to {current_end_ts}")
 
@@ -84,74 +73,44 @@ def run_purge_data_pipeline(self, user_id, archive_dir, export_type):
             entries_to_export_1 = self.get_export_timeseries_entries(user_id, ts, time_query.startTs, time_query.endTs)
             count_entries_1 = len(entries_to_export_1)
 
-            if export_queries is None and count_entries_1 > 0:
-                print("No entries found in current time range from %s to %s" % (current_start_ts, current_end_ts))
-                print("Incrementing time range by 1 hour")
-                current_start_ts = current_end_ts
-                continue
-            # if count_entries_2 == 0 and count_entries_1 == 0:
-            elif export_queries is None and count_entries_1 == 0:
-                # Didn't process anything new so start at the same point next time
-                # self._last_processed_ts = None
-                logging.debug("No new data to export, breaking out of while loop")
-                print("No new data to export, breaking out of while loop")
-                break
-
-            entries_to_export_2 = self.get_export_timeseries_entries(user_id, ts, current_start_ts, current_end_ts)
-            count_entries_2 = len(entries_to_export_2)
-            print("count_entries_2 = %s" % count_entries_2)
-
-
-            logging.debug("Exporting to file: %s" % file_name)
-            print("Exporting to file: %s" % file_name)
-            file_names.append(file_name)
-            print("File names: %s" % file_names)
-
-            self.delete_timeseries_entries(user_id, ts, current_start_ts, current_end_ts)
-
-            print("Total entries to export: %s" % count_entries)
-            print("Entries exported in timerange %s to %s: %s" % (current_start_ts, current_end_ts, count_entries_2))
-            print("New count entries to export: %s" % count_entries_1)
-            self._last_processed_ts = entries_to_export_2[-1]['data']['ts']
-            print("Updated last_processed_ts %s" % self._last_processed_ts)
+            if export_queries:
+                logging.debug("Exporting to file: %s" % file_name)
+                print("Exporting to file: %s" % file_name)
+                file_names.append(file_name)
+                print("File names: %s" % file_names)
+
+                self.delete_timeseries_entries(user_id, ts, current_start_ts, current_end_ts)
+
+                entries = self.get_export_timeseries_entries(user_id, ts, current_start_ts, current_end_ts)
+
+                if entries:
+                    self._last_processed_ts = entries[-1]['data']['ts']
+                    logging.debug(f"Exported {len(entries)} entries from {current_start_ts} to {current_end_ts}")
+                    logging.debug("Entries exported in timerange %s to %s: %s" % (current_start_ts, current_end_ts, len(entries)))
+                    logging.debug("Remaining entries to export: %s" % count_entries_1)
+                    logging.debug("Updated last_processed_ts %s" % self._last_processed_ts)
+                    print(f"Exported {len(entries)} entries from {current_start_ts} to {current_end_ts}")
+                    print("Entries exported in timerange %s to %s: %s" % (current_start_ts, current_end_ts, len(entries)))
+                    print("Remaining entries to export: %s" % count_entries_1)
+                    print("Updated last_processed_ts %s" % self._last_processed_ts)
+            else:
+                if count_entries_1 > 0:
+                    print("No entries found in current time range from %s to %s" % (current_start_ts, current_end_ts))
+                    print("Incrementing time range by 1 hour")
+                else:
+                    logging.info(f"No entries found from {current_start_ts} to {current_end_ts}")
+                    logging.debug("No new data to export, breaking out of while loop")
+                    print("No new data to export, breaking out of while loop")
+                    break
 
             current_start_ts = current_end_ts
-            if current_start_ts >= initEndTs:
+            if current_start_ts >= init_end_ts:
                 break
 
-        print("Exported data to %s files" % len(file_names))
-        print("Exported file names: %s" % file_names)
+        print(f"Exported data to {len(file_names)} files")
+        print(f"Exported file names: {file_names}")
         return file_names
 
-        # new_entries = self.get_export_timeseries_entries(user_id, ts, current_start_ts, current_end_ts)
-
-        # if new_entries:
-        #     self._last_processed_ts = new_entries[-1]['data']['ts']
-        #     print(f"Updated last_processed_ts {self._last_processed_ts}")
-
-        #     if current_end_ts >= initEndTs:
-        #         file_name = archive_dir + "/archive_%s_%s_%s" % (user_id, current_start_ts, current_end_ts)
-        #         print(f"Exporting entries from {current_start_ts} to {current_end_ts} to file: {file_name}")
-        #         epret.export(user_id, ts, current_start_ts, current_end_ts, file_name)
-        #         file_names.append(file_name)
-        #         self.delete_timeseries_entries(user_id, ts, current_start_ts, current_end_ts)
-
-        #         current_start_ts = current_end_ts
-
-        #     else:
-        #         remaining_entries = self.get_export_timeseries_entries(user_id, ts, initStartTs, initEndTs)
-        #         if not remaining_entries:
-        #             print("No new data to export, breaking out of while loop")
-        #             break
-        #         else:
-        #             print(f"No entries found in current time range from {current_start_ts} to {current_end_ts}")
-        #             print("Incrementing time range")
-        #             current_start_ts = current_end_ts
-
-        #     print(f"Exported data to {len(file_names)} files")
-        #     print(f"Exported file names: {file_names}")
-        #     return file_names
-
     # def export_pipeline_states(self, user_id, file_name):
     #     pipeline_state_list = list(edb.get_pipeline_state_db().find({"user_id": user_id}))
     #     logging.info("Found %d pipeline states %s" %
logging.info("Found %d pipeline states %s" % @@ -195,8 +154,8 @@ def get_export_timeseries_entries(self, user_id, ts, start_ts_datetime, end_ts_d def get_export_queries(self, start_ts, end_ts): export_queries = { - # 'trip_time_query': estt.TimeQuery("data.start_ts", initStartTs, initEndTs), - # 'place_time_query': estt.TimeQuery("data.enter_ts", initStartTs, initEndTs), + # 'trip_time_query': estt.TimeQuery("data.start_ts", init_start_ts, init_end_ts), + # 'place_time_query': estt.TimeQuery("data.enter_ts", init_start_ts, init_end_ts), 'loc_time_query': estt.TimeQuery("data.ts", start_ts, end_ts) } return export_queries diff --git a/emission/tests/exportTests/TestPurgeRestoreModule.py b/emission/tests/exportTests/TestPurgeRestoreModule.py index c18e860be..770ff158c 100644 --- a/emission/tests/exportTests/TestPurgeRestoreModule.py +++ b/emission/tests/exportTests/TestPurgeRestoreModule.py @@ -124,11 +124,16 @@ def testPurgeRestorePipelineFull(self): print("Exported file names: %s" % file_names) ''' - Test 2 - Assert the file exists after the export process + Test 2 - Assert the file exists after the export process and checking contents ''' self.assertTrue(pl.Path(file_names[0] + ".gz").is_file()) - # with gzip.open(file_names[0] + ".gz", 'r') as ef: - # exported_data = json.loads(ef.read().decode('utf-8')) + with gzip.open(file_names[0] + ".gz", 'r') as ef: + exported_data = json.loads(ef.read().decode('utf-8')) + self.assertEqual(len(exported_data), 1906) + + first_few_objectIds = ['564e73d388f663199aabf0d2', '55afb7c67d65cb39ee976598', '55afb7c67d65cb39ee976599', '55b08d327d65cb39ee9769e1', '55afb7c67d65cb39ee97659a'] + for entry in exported_data[0:5]: + self.assertIn(entry.get('_id').get('$oid'), first_few_objectIds) ''' Test 3 - Verify that purging timeseries data works with sample real data @@ -180,10 +185,16 @@ def testPurgeRestorePipelineIncremental(self): ''' Test 2 - Assert the file exists after the export process ''' + exported_data = [] for file_name in file_names: self.assertTrue(pl.Path(file_name + ".gz").is_file()) - # with gzip.open(file_name + ".gz", 'r') as ef: - # exported_data = json.loads(ef.read().decode('utf-8')) + with gzip.open(file_name + ".gz", 'r') as ef: + exported_data.extend(json.loads(ef.read().decode('utf-8'))) + self.assertEqual(len(exported_data), 1906) + + last_few_objectIds = ['55b08d3e7d65cb39ee976def', '55b08d3e7d65cb39ee976df0', '55b08d3e7d65cb39ee976df1', '55b08e907d65cb39ee976e06', '55b08e907d65cb39ee976e07'] + for entry in exported_data[-5:]: + self.assertIn(entry.get('_id').get('$oid'), last_few_objectIds) ''' Test 3 - Verify that purging timeseries data works with sample real data