From ec162ad8141ee6a9606070a169ace620ff71d1b3 Mon Sep 17 00:00:00 2001 From: "Mahadik, Mukul Chandrakant" Date: Thu, 29 Aug 2024 15:36:09 -0700 Subject: [PATCH] Implemented Full + Incremental export + purge + restore MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Exporting and Purging entries in 1 hour time chunks into separate files with a defined start_ts and end_ts. Start_ts is the last_processed_ts for the user_id's Purge pipeline state. - If the pipeline has already been run for the user, then this would be a non-None value. - If pipeline hasn't been run, then it will be None; in this case, the earliest timestamp entry is chosen as the start ts. This helps avoid ValueErrors when adding 1 hour (3600 seconds) to start_ts for incremental export. End_ts differs for Full export and Incremental export: - Full export: current time at which pipeline is run will be the end ts; value returned by pipeline_queries on initiating pipeline stage is used. - Incremental export: First end_ts value would be 1 hour ahead of start_ts; this value would continue to be incremented by 1 hour as long as data exists for a user. If the value after adding 1 hour exceeds the current time, then the end_ts is set to the current time itself. The export + delete process continue as long as there is raw timeseries data for the user. ------- But what does 1 hour’s worth of data mean? - In any case, purge pipeline runs upto the current time or until no more raw timeseries entries present in db for the user. - If Purge pipeline running for the first time for a user, then it will export and purge all the timeseries data for a user from its first entry (which can be really old data and first purge run might take a lot of time) - If Purge pipeline has already been run before for a user, then it will set start_ts to last_processed_ts and export data from that point. - If purge pipeline run hourly, then it would eventually just have a small subset of entries. ------- Some points to consider: A. Numerous files; Less data quantity per file One observation is that current logic is creating multiple files in 1 hour chunks, which is okay. But these files don’t really have a lot of entries. What could be more efficient is to perhaps store more entries until a threshold say 5000 or 10000 (like batch_size in load_multi_timeline_for_range). If this default threshold batch size isn't reached, keep adding to the same file. Keeping updating the end_ts but start_ts would remain the same. Will attempt this next step. ------ B. Right approach for Export + Purge? Approach A 1. Export data in chunks to File 2. Delete exported data from DB. 3. Repeat until all data purged. Flow looks like: Export -> Delete -> Export -> Delete -> Export -> Delete —— Approach B 1. Export data in chunks to file. 2. Repeat until all data exported. 3. Delete all exported data from DB. Flow looks like: Export -> Export -> Export ... -> Delete --------------- C. Do we need all 3 types of entries: locations, trips, places? For now, commented out code from export_timeseries.py. If we only need location entries, can simplify code further to just work for these entries. If these are sort of like subsets of each other: location -> trip -> place. Then I can safely just take location. But this is valid only if all entries contain location and hence ts values. If only trip entries present or only place entries, then directly choosing latest ts is incorrect since trips use enter_ts while places use start_ts Searching codebase for references and read through Shankari’s thesis to start_ts and enter_ts. I’m getting hints that start_ts and enter_ts are analysis_timeseries entries? In that case, can ignore these since the purge + restore is concerned with raw timeseries data only. Trip entries created in emission/analysis/intake/segmentation/trip_segmentation.py —— Hint 1: Timeseries_Sample.ipynb - ct_df fetches analysis/cleaned_trip entries -> analysis DB ------ Hint 2: bin/historical/migrations/populate_local_dt.py - Looks like old code, some changes were last made 8 years ago. - The collection parameter refers to some non-time series databases as seen from the function calls. - The entry[start_ts] or entry[‘enter_ts’] values are then used in the find query by setting data.ts to this value. --------- D. Is pipeline_states export needed? Remove pipeline_states export if not needed. Currently being used in existing export + load scripts. --------- --- emission/pipeline/restore_stage.py | 8 +- emission/purge_restore/export_timeseries.py | 13 +-- emission/purge_restore/purge_data.py | 61 +++++++++----- emission/purge_restore/restore_data.py | 47 +++++------ .../storage/timeseries/builtin_timeseries.py | 2 - .../exportTests/TestPurgeRestoreModule.py | 80 ++++++++++++++++--- 6 files changed, 138 insertions(+), 73 deletions(-) diff --git a/emission/pipeline/restore_stage.py b/emission/pipeline/restore_stage.py index df6e42f17..1bd320761 100644 --- a/emission/pipeline/restore_stage.py +++ b/emission/pipeline/restore_stage.py @@ -20,7 +20,7 @@ import emission.storage.decorations.stats_queries as esds import emission.purge_restore.restore_data as eprrd -def run_restore_pipeline(process_number, uuid_list, file_name): +def run_restore_pipeline(process_number, uuid_list, file_names): try: with open("conf/log/restore.conf", "r") as cf: restore_log_config = json.load(cf) @@ -43,18 +43,18 @@ def run_restore_pipeline(process_number, uuid_list, file_name): continue try: - run_restore_pipeline_for_user(uuid, file_name) + run_restore_pipeline_for_user(uuid, file_names) except Exception as e: esds.store_pipeline_error(uuid, "WHOLE_PIPELINE", time.time(), None) logging.exception("Found error %s while processing pipeline " "for user %s, skipping" % (e, uuid)) -def run_restore_pipeline_for_user(uuid, file_name): +def run_restore_pipeline_for_user(uuid, file_names): with ect.Timer() as edt: logging.info("*" * 10 + "UUID %s: restoring timeseries data" % uuid + "*" * 10) print(str(arrow.now()) + "*" * 10 + "UUID %s: restoring timeseries data" % uuid + "*" * 10) - eprrd.restore_data(uuid, file_name) + eprrd.restore_data(uuid, file_names) esds.store_pipeline_time(uuid, ecwp.PipelineStages.RESTORE_TIMESERIES_DATA.name, time.time(), edt.elapsed) diff --git a/emission/purge_restore/export_timeseries.py b/emission/purge_restore/export_timeseries.py index fc6353c69..ae68b1c0e 100644 --- a/emission/purge_restore/export_timeseries.py +++ b/emission/purge_restore/export_timeseries.py @@ -63,9 +63,12 @@ def get_from_all_three_sources_with_retry(user_id, in_query, databases=None): def export(user_id, ts, start_ts, end_ts, file_name, ma_bool, databases=None): logging.info("In export: Databases = %s" % databases) + print("In export: Databases = %s" % databases) logging.info("Extracting timeline for user %s day %s -> %s and saving to file %s" % (user_id, start_ts, end_ts, file_name)) + print("Extracting timeline for user %s day %s -> %s and saving to file %s" % + (user_id, start_ts, end_ts, file_name)) loc_time_query = estt.TimeQuery("data.ts", start_ts, end_ts) loc_entry_list = get_from_all_three_sources_with_retry(user_id, loc_time_query, databases) @@ -110,11 +113,11 @@ def export(user_id, ts, start_ts, end_ts, file_name, ma_bool, databases=None): # def validate_truncation(loc_entry_list, trip_entry_list, place_entry_list): -def validate_truncation(loc_entry_list, trip_entry_list, place_entry_list): +def validate_truncation(loc_entry_list): MAX_LIMIT = 25 * 10000 if len(loc_entry_list) == MAX_LIMIT: logging.warning("loc_entry_list length = %d, probably truncated" % len(loc_entry_list)) - if len(trip_entry_list) == MAX_LIMIT: - logging.warning("trip_entry_list length = %d, probably truncated" % len(trip_entry_list)) - if len(place_entry_list) == MAX_LIMIT: - logging.warning("place_entry_list length = %d, probably truncated" % len(place_entry_list)) + # if len(trip_entry_list) == MAX_LIMIT: + # logging.warning("trip_entry_list length = %d, probably truncated" % len(trip_entry_list)) + # if len(place_entry_list) == MAX_LIMIT: + # logging.warning("place_entry_list length = %d, probably truncated" % len(place_entry_list)) diff --git a/emission/purge_restore/purge_data.py b/emission/purge_restore/purge_data.py index 4b2ffd8f8..f6d1badf2 100644 --- a/emission/purge_restore/purge_data.py +++ b/emission/purge_restore/purge_data.py @@ -21,6 +21,7 @@ def purge_data(user_id, archive_dir, export_type): pdp.user_id = user_id file_names = pdp.run_purge_data_pipeline(user_id, archive_dir, export_type) logging.debug("last_processed_ts with entries_to_export logic = %s" % (pdp.last_processed_ts)) + print("last_processed_ts with entries_to_export logic = %s" % (pdp.last_processed_ts)) if pdp.last_processed_ts is None: logging.debug("After run, last_processed_ts == None, must be early return") espq.mark_purge_data_done(user_id, pdp.last_processed_ts) @@ -55,19 +56,20 @@ def run_purge_data_pipeline(self, user_id, archive_dir, export_type): print("Inside: purge_data - Start time: %s" % initStartTs) print("Inside: purge_data - End time: %s" % initEndTs) - export_queries = { + export_queries_initial = { # 'trip_time_query': estt.TimeQuery("data.start_ts", initStartTs, initEndTs), # 'place_time_query': estt.TimeQuery("data.enter_ts", initStartTs, initEndTs), 'loc_time_query': estt.TimeQuery("data.ts", initStartTs, initEndTs) } file_names = [] + entries_to_export = self.get_export_timeseries_entries(user_id, ts, time_query.startTs, time_query.endTs, export_queries_initial) + count_entries = len(entries_to_export) if initStartTs is None: # If running the pipeline PURGE stage for first time, choosing the first timestamp # from the timeseries as the starting point # Else cannot add 1 hour (3600 seconds) to a NoneType value if incremental option is selected - entries_to_export = self.get_exported_timeseries_entries(user_id, ts, time_query.startTs, time_query.endTs, export_queries) print("Inside: purge_data - entries_to_export = %s" % entries_to_export[0]['data']['ts']) current_start_ts = entries_to_export[0]['data']['ts'] else: @@ -91,28 +93,43 @@ def run_purge_data_pipeline(self, user_id, archive_dir, export_type): print(f"Processing data from {current_start_ts} to {current_end_ts}") file_name = archive_dir + "/archive_%s_%s_%s" % (user_id, current_start_ts, current_end_ts) - print("Exporting to file: %s" % file_name) - export_queries = epret.export(user_id, ts, current_start_ts, current_end_ts, file_name, False) + # epret.export(user_id, ts, current_start_ts, current_end_ts, file_name, False) + + entries_to_export_1 = self.get_export_timeseries_entries(user_id, ts, time_query.startTs, time_query.endTs, export_queries_initial) + count_entries_1 = len(entries_to_export_1) + + if export_queries is None and count_entries_1 > 0: + print("No entries found in current time range from %s to %s" % (current_start_ts, current_end_ts)) + print("Incrementing time range by 1 hour") + current_start_ts = current_end_ts + continue + # if count_entries_2 == 0 and count_entries_1 == 0: + elif export_queries is None and count_entries_1 == 0: + # Didn't process anything new so start at the same point next time + # self._last_processed_ts = None + logging.debug("No new data to export, breaking out of while loop") + print("No new data to export, breaking out of while loop") + break + + entries_to_export_2 = self.get_export_timeseries_entries(user_id, ts, current_start_ts, current_end_ts, export_queries) + count_entries_2 = len(entries_to_export_2) + print("count_entries_2 = %s" % count_entries_2) + + + logging.debug("Exporting to file: %s" % file_name) + print("Exporting to file: %s" % file_name) file_names.append(file_name) + print("File names: %s" % file_names) - if export_queries is None: - logging.debug("No data to export, export_queries is None") - print("No data to export, export_queries is None") - break - else: - entries_to_export = self.get_exported_timeseries_entries(user_id, ts, current_start_ts, current_end_ts, export_queries) - self.export_pipeline_states(user_id, file_name) - self.delete_timeseries_entries(user_id, ts, current_start_ts, current_end_ts, export_queries) - - if len(entries_to_export) == 0: - # Didn't process anything new so start at the same point next time - self._last_processed_ts = None - logging.debug("No new data to export, breaking out of while loop") - print("No new data to export, breaking out of while loop") - break - else: - self._last_processed_ts = entries_to_export[-1]['data']['ts'] + self.export_pipeline_states(user_id, file_name) + self.delete_timeseries_entries(user_id, ts, current_start_ts, current_end_ts, export_queries) + + print("Total entries to export: %s" % count_entries) + print("Entries exported in timerange %s to %s: %s" % (current_start_ts, current_end_ts, count_entries_2)) + print("New count entries to export: %s" % count_entries_1) + self._last_processed_ts = entries_to_export_2[-1]['data']['ts'] + print("Updated last_processed_ts %s" % self._last_processed_ts) current_start_ts = current_end_ts if current_start_ts >= initEndTs: @@ -150,7 +167,7 @@ def delete_timeseries_entries(self, user_id, ts, start_ts_datetime, end_ts_datet logging.debug("{} deleted entries from {} to {}".format(result.deleted_count, start_ts_datetime, end_ts_datetime)) print("{} deleted entries from {} to {}".format(result.deleted_count, start_ts_datetime, end_ts_datetime)) - def get_exported_timeseries_entries(self, user_id, ts, start_ts_datetime, end_ts_datetime, export_queries): + def get_export_timeseries_entries(self, user_id, ts, start_ts_datetime, end_ts_datetime, export_queries): entries_to_export = [] for key, value in export_queries.items(): tq = value diff --git a/emission/purge_restore/restore_data.py b/emission/purge_restore/restore_data.py index f15fa41c5..a0bb11542 100644 --- a/emission/purge_restore/restore_data.py +++ b/emission/purge_restore/restore_data.py @@ -13,11 +13,11 @@ import emission.core.get_database as edb import emission.core.wrapper.pipelinestate as ecwp -def restore_data(user_id, file_name): +def restore_data(user_id, file_names): try: rdp = RestoreDataPipeline() rdp.user_id = user_id - rdp.run_restore_data_pipeline(user_id, file_name) + rdp.run_restore_data_pipeline(user_id, file_names) if rdp.last_processed_ts is None: logging.debug("After run, last_processed_ts == None, must be early return") espq.mark_restore_data_done(user_id, rdp.last_processed_ts) @@ -33,30 +33,21 @@ def __init__(self): def last_processed_ts(self): return self._last_processed_ts - def run_restore_data_pipeline(self, user_id, file_name): + def run_restore_data_pipeline(self, user_id, file_names): time_query = espq.get_time_range_for_restore_data(user_id) - entries_to_import = json.load(gzip.open(file_name + ".gz"), object_hook = esj.wrapped_object_hook) - ''' - PipelineState({ - '_id': ObjectId('66b15dd496328b58cca9486d'), - 'user_id': UUID('6f600819-2b42-47a8-a57c-c7db631a832a'), - 'pipeline_stage': 20, - 'curr_run_ts': None, - 'last_processed_ts': 1437633640.069, - 'last_ts_run': 1722899919.6985111 - }) - ''' - # pipelineState = edb.get_pipeline_state_db().find_one({"user_id": user_id, - # "pipeline_stage": ecwp.PipelineStages.RESTORE_TIMESERIES_DATA.value}) - # self._last_processed_ts = pipelineState["last_processed_ts"] - # logging.debug("Restoring from file, last_processed_ts = %s" % (self._last_processed_ts)) - (tsdb_count, ucdb_count) = lmtfr.load_multi_timeline_for_range(file_prefix=file_name, continue_on_error=True) - print("After load, tsdb_count = %s, ucdb_count = %s" % (tsdb_count, ucdb_count)) - if tsdb_count == 0: - # Didn't process anything new so start at the same point next time - self._last_processed_ts = None - else: - self._last_processed_ts = entries_to_import[-1]['data']['ts'] - print("After load, last_processed_ts = %s" % (self._last_processed_ts)) - # if self._last_processed_ts is None or self._last_processed_ts < entries_to_import[-1]['metadata']['write_ts']: - # self._last_processed_ts = entries_to_import[-1]['metadata']['write_ts'] + for file_name in file_names: + entries_to_import = json.load(gzip.open(file_name + ".gz"), object_hook = esj.wrapped_object_hook) + # pipelineState = edb.get_pipeline_state_db().find_one({"user_id": user_id, + # "pipeline_stage": ecwp.PipelineStages.RESTORE_TIMESERIES_DATA.value}) + # self._last_processed_ts = pipelineState["last_processed_ts"] + # logging.debug("Restoring from file, last_processed_ts = %s" % (self._last_processed_ts)) + (tsdb_count, ucdb_count) = lmtfr.load_multi_timeline_for_range(file_prefix=file_name, continue_on_error=True) + print("After load, tsdb_count = %s, ucdb_count = %s" % (tsdb_count, ucdb_count)) + if tsdb_count == 0: + # Didn't process anything new so start at the same point next time + self._last_processed_ts = None + else: + self._last_processed_ts = entries_to_import[-1]['data']['ts'] + print("After load, last_processed_ts = %s" % (self._last_processed_ts)) + # if self._last_processed_ts is None or self._last_processed_ts < entries_to_import[-1]['metadata']['write_ts']: + # self._last_processed_ts = entries_to_import[-1]['metadata']['write_ts'] diff --git a/emission/storage/timeseries/builtin_timeseries.py b/emission/storage/timeseries/builtin_timeseries.py index dad0bfec1..8204b4e29 100644 --- a/emission/storage/timeseries/builtin_timeseries.py +++ b/emission/storage/timeseries/builtin_timeseries.py @@ -135,8 +135,6 @@ def _get_query(self, key_list = None, time_query = None, geo_query = None, ret_query.update({"$or": key_query_list}) if time_query is not None: ret_query.update(time_query.get_query()) - print("Inside builtintimeseries") - print("time_query = %s" % time_query.get_query()) if geo_query is not None: ret_query.update(geo_query.get_query()) if extra_query_list is not None: diff --git a/emission/tests/exportTests/TestPurgeRestoreModule.py b/emission/tests/exportTests/TestPurgeRestoreModule.py index 07c87c73e..9818fb71f 100644 --- a/emission/tests/exportTests/TestPurgeRestoreModule.py +++ b/emission/tests/exportTests/TestPurgeRestoreModule.py @@ -107,7 +107,7 @@ def testPurgeRestoreModule(self): (tsdb_count, ucdb_count) = lmtfr.load_multi_timeline_for_range(file_prefix=file_name, continue_on_error=True) self.assertEqual(tsdb_count, 0) - def testPurgeRestorePipeline(self): + def testPurgeRestorePipelineFull(self): ''' Test 1 - Verify that purging timeseries data works with sample real data ''' @@ -115,19 +115,18 @@ def testPurgeRestorePipeline(self): res = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID}) logging.info(f"About to purge {res} entries") print(f"About to purge {res} entries") - # self.assertEqual(res, 1906) + self.assertEqual(res, 1906) # Run the purge pipeline - # file_names = epp.run_purge_pipeline_for_user(self.testUUID, os.environ.get('DATA_DIR', 'emission/archived'), "full") - file_names = epp.run_purge_pipeline_for_user(self.testUUID, os.environ.get('DATA_DIR', 'emission/archived'), "incremental") + file_names = epp.run_purge_pipeline_for_user(self.testUUID, os.environ.get('DATA_DIR', 'emission/archived'), "full") print("Exported file names: %s" % file_names) ''' Test 2 - Assert the file exists after the export process ''' - # self.assertTrue(pl.Path(file_name + ".gz").is_file()) - # with gzip.open(file_name + ".gz", 'r') as ef: - # exported_data = json.loads(ef.read().decode('utf-8')) + self.assertTrue(pl.Path(file_names[0] + ".gz").is_file()) + with gzip.open(file_names[0] + ".gz", 'r') as ef: + exported_data = json.loads(ef.read().decode('utf-8')) ''' Test 3 - Verify that purging timeseries data works with sample real data @@ -141,13 +140,13 @@ def testPurgeRestorePipeline(self): # A single entry with key 'stats/pipeline_time' should be present as this test involves running the pipeline stat_pipeline_key = entries[0].get('metadata').get('key') print(f"stat_pipeline_key = {stat_pipeline_key}") - # self.assertEqual(stat_pipeline_key,'stats/pipeline_time') - # self.assertEqual(res, 1) + self.assertEqual(stat_pipeline_key,'stats/pipeline_time') + self.assertEqual(res, 1) # Run the restore pipeline logging.info(f"About to restore entries") print(f"About to restore entries") - # epr.run_restore_pipeline_for_user(self.testUUID, file_names[0]) + epr.run_restore_pipeline_for_user(self.testUUID, file_names[0]) ''' Test 4 - Verify that restoring timeseries data works with sample real data @@ -160,8 +159,65 @@ def testPurgeRestorePipeline(self): # Two entries with key 'stats/pipeline_time' should be present - one from the purge pipeline, other from the restore pipeline print(f"res_stats_count = {res_stats_count}") - # self.assertEqual(res_stats_count, 2) - # self.assertEqual(res, 1908) + self.assertEqual(res_stats_count, 2) + self.assertEqual(res, 1908) + + def testPurgeRestorePipelineIncremental(self): + ''' + Test 1 - Verify that purging timeseries data works with sample real data + ''' + # Check how much data there was before + res = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID}) + logging.info(f"About to purge {res} entries") + print(f"About to purge {res} entries") + self.assertEqual(res, 1906) + + # Run the purge pipeline + file_names = epp.run_purge_pipeline_for_user(self.testUUID, os.environ.get('DATA_DIR', 'emission/archived'), "incremental") + print("Exported file names: %s" % file_names) + + ''' + Test 2 - Assert the file exists after the export process + ''' + for file_name in file_names: + self.assertTrue(pl.Path(file_name + ".gz").is_file()) + # with gzip.open(file_name + ".gz", 'r') as ef: + # exported_data = json.loads(ef.read().decode('utf-8')) + + ''' + Test 3 - Verify that purging timeseries data works with sample real data + ''' + # Check how much data there is after + entries = edb.get_timeseries_db().find({"user_id" : self.testUUID}) + res = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID}) + logging.info(f"Purging complete: {res} entries remaining") + print(f"Purging complete: {res} entries remaining") + + # A single entry with key 'stats/pipeline_time' should be present as this test involves running the pipeline + stat_pipeline_key = entries[0].get('metadata').get('key') + print(f"stat_pipeline_key = {stat_pipeline_key}") + self.assertEqual(stat_pipeline_key,'stats/pipeline_time') + self.assertEqual(res, 1) + + # Run the restore pipeline + logging.info(f"About to restore entries") + print(f"About to restore entries") + print("File names: %s" % file_names) + epr.run_restore_pipeline_for_user(self.testUUID, file_names) + + ''' + Test 4 - Verify that restoring timeseries data works with sample real data + ''' + # Check how much data there is after + res = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID}) + res_stats_count = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID, "metadata.key" : 'stats/pipeline_time'}) + logging.info(f"Restoring complete: {res} entries restored") + print(f"Restoring complete: {res} entries restored") + + # Two entries with key 'stats/pipeline_time' should be present - one from the purge pipeline, other from the restore pipeline + print(f"res_stats_count = {res_stats_count}") + # self.assertEqual(res_stats_count, 2) + # self.assertEqual(res, 1908) if __name__ == '__main__':