diff --git a/bin/debug/load_multi_timeline_for_range.py b/bin/debug/load_multi_timeline_for_range.py index 79973fde0..cdd2bf6a8 100644 --- a/bin/debug/load_multi_timeline_for_range.py +++ b/bin/debug/load_multi_timeline_for_range.py @@ -116,6 +116,7 @@ def load_multi_timeline_for_range(file_prefix, info_only=None, verbose=None, con all_user_list = [] all_rerun_list = [] + (tsdb_count, ucdb_count) = (0,0) for i, filename in enumerate(sel_file_list): if "pipelinestate" in filename: @@ -154,6 +155,7 @@ def load_multi_timeline_for_range(file_prefix, info_only=None, verbose=None, con register_fake_users(prefix, unique_user_list, verbose) post_check(unique_user_list, all_rerun_list) + return (tsdb_count, ucdb_count) if __name__ == '__main__': parser = argparse.ArgumentParser() diff --git a/emission/storage/timeseries/cache_series.py b/emission/storage/timeseries/cache_series.py index 3d3cc0830..0417ac56e 100644 --- a/emission/storage/timeseries/cache_series.py +++ b/emission/storage/timeseries/cache_series.py @@ -13,6 +13,7 @@ from future import standard_library standard_library.install_aliases() from builtins import * +import logging import emission.core.get_database as edb import emission.net.usercache.abstract_usercache as enua @@ -65,5 +66,10 @@ def insert_entries(uuid, entry_it, continue_on_error): except pymongo.errors.DuplicateKeyError as e: if not continue_on_error: raise(e) + else: + if "write_fmt_time" in entry["metadata"]: + logging.info("ignoring duplicate key error while restoring timeseries entries") + else: + logging.info("ignoring duplicate key error while restoring usercache entries") return (tsdb_count, ucdb_count) diff --git a/emission/tests/exportTests/TestPurgeRestoreModule.py b/emission/tests/exportTests/TestPurgeRestoreModule.py index 74901cb56..7e8b1654c 100644 --- a/emission/tests/exportTests/TestPurgeRestoreModule.py +++ b/emission/tests/exportTests/TestPurgeRestoreModule.py @@ -2,11 +2,8 @@ standard_library.install_aliases() from builtins 
import * import os -from os import path -import tempfile import unittest import json -import bson.json_util as bju import pathlib as pl import emission.storage.timeseries.abstract_timeseries as esta import gzip @@ -19,6 +16,7 @@ import emission.purge_restore.purge_data as eprpd import bin.debug.load_multi_timeline_for_range as lmtfr import logging +import emission.storage.json_wrappers as esj class TestPurgeRestoreModule(unittest.TestCase): def setUp(self): @@ -78,7 +76,7 @@ def testPurgeRestoreModule(self): pdp.delete_timeseries_entries(self.testUUID, ts, time_query['startTs'], time_query['endTs'], export_queries) # Check how much data there is after - res = res = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID}) + res = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID}) logging.info(f"Purging complete: {res} entries remaining") self.assertEqual(res, 0) @@ -90,10 +88,18 @@ def testPurgeRestoreModule(self): lmtfr.load_multi_timeline_for_range(file_prefix=file_name, continue_on_error=True) # Check how much data there is after - res = res = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID}) + res = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID}) logging.info(f"Restoring complete: {res} entries restored") self.assertEqual(res, 1906) + ''' + Test 4 - Verify that restoring timeseries data is skipped if data already exists + Duplicate key error is ignored hence no entries should be inserted + ''' + logging.info("Attempting to load duplicate data...") + (tsdb_count, ucdb_count) = lmtfr.load_multi_timeline_for_range(file_prefix=file_name, continue_on_error=True) + self.assertEqual(tsdb_count, 0) + def testPurgeRestorePipeline(self): file_name = epp.run_purge_pipeline_for_user(self.testUUID, os.environ.get('DATA_DIR', 'emission/archived')) epr.run_restore_pipeline_for_user(self.testUUID, file_name)