Implemented Full + Incremental export + purge + restore
Entries are exported and purged in 1-hour time chunks into separate files, each with a defined start_ts and end_ts.

start_ts is the last_processed_ts from the user's PURGE pipeline state.
- If the pipeline has already been run for the user, this will be a non-None value.
- If the pipeline hasn't been run, it will be None; in this case, the timestamp of the earliest entry is chosen as the start_ts. This avoids errors when adding 1 hour (3600 seconds) to start_ts for the incremental export.

end_ts differs between full export and incremental export:
- Full export: end_ts is the current time at which the pipeline is run, i.e. the value returned by pipeline_queries when the pipeline stage is initiated.
- Incremental export: the first end_ts is 1 hour ahead of start_ts, and it keeps being incremented by 1 hour as long as data exists for the user. If adding 1 hour would go past the current time, end_ts is capped at the current time. The export + delete process continues as long as there is raw timeseries data for the user (see the sketch after this list).
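
Roughly, the chunk-bound selection looks like this (a minimal sketch; the helper name and parameters are mine, the actual logic lives in run_purge_data_pipeline in emission/purge_restore/purge_data.py):

import time

ONE_HOUR = 3600  # seconds

def next_chunk_bounds(last_processed_ts, earliest_entry_ts, export_type, now=None):
    # start_ts: last_processed_ts if the purge pipeline has run before for this user,
    # otherwise the timestamp of the user's earliest raw timeseries entry
    now = time.time() if now is None else now
    start_ts = last_processed_ts if last_processed_ts is not None else earliest_entry_ts
    if export_type == "full":
        end_ts = now                            # full export: everything up to "now"
    else:
        end_ts = min(start_ts + ONE_HOUR, now)  # incremental: 1-hour chunk, capped at "now"
    return start_ts, end_ts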

-------

But what does 1 hour's worth of data mean?
- In any case, the purge pipeline runs up to the current time, or until no more raw timeseries entries are present in the DB for the user.
- If the purge pipeline is running for the first time for a user, it exports and purges all of that user's timeseries data starting from the first entry (which can be very old data, so the first purge run might take a long time).
- If the purge pipeline has already been run before for a user, it sets start_ts to last_processed_ts and exports data from that point onwards.
    - If the purge pipeline is run hourly, each run would eventually handle only a small subset of entries.

-------

Some points to consider:

A. Numerous files; little data per file

One observation is that the current logic creates multiple files in 1-hour chunks, which is okay, but these files don't contain many entries.
It could be more efficient to keep accumulating entries into the same file until a threshold is reached, say 5000 or 10000 entries (like batch_size in load_multi_timeline_for_range).
Until this threshold batch size is reached, keep adding to the same file: keep updating the end_ts while the start_ts remains the same.

Will attempt this as the next step; a rough sketch of the idea follows.
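
A hypothetical sketch only, not what the current code does; BATCH_SIZE_THRESHOLD and the (start_ts, end_ts, entries) chunk tuples are assumptions:

BATCH_SIZE_THRESHOLD = 10000

def batch_chunks(chunks):
    """Merge consecutive 1-hour chunks until the entry count reaches the threshold.

    `chunks` is an iterable of (start_ts, end_ts, entries) tuples in time order.
    Yields (start_ts, end_ts, entries) batches; every batch except possibly the
    last has at least BATCH_SIZE_THRESHOLD entries.
    """
    batch_start, batch_end, batch_entries = None, None, []
    for start_ts, end_ts, entries in chunks:
        if batch_start is None:
            batch_start = start_ts          # start_ts stays fixed for the whole batch
        batch_end = end_ts                  # end_ts keeps moving forward
        batch_entries.extend(entries)
        if len(batch_entries) >= BATCH_SIZE_THRESHOLD:
            yield batch_start, batch_end, batch_entries
            batch_start, batch_end, batch_entries = None, None, []
    if batch_entries:
        yield batch_start, batch_end, batch_entries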

------

B. Right approach for Export + Purge?

Approach A
1. Export data in chunks to File
2. Delete exported data from DB.
3. Repeat until all data purged.

Flow looks like: Export -> Delete -> Export -> Delete -> Export -> Delete

——

Approach B
1. Export data in chunks to file.
2. Repeat until all data exported.
3. Delete all exported data from DB.

Flow looks like: Export -> Export -> Export ... -> Delete
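
Schematically, as a pseudo-Python sketch (export_chunk and delete_chunk are hypothetical stand-ins for the export and delete steps in purge_data.py):

def purge_approach_a(chunks, export_chunk, delete_chunk):
    # Export -> Delete -> Export -> Delete -> ...
    # `chunks` is a list of already-computed chunk descriptors
    for chunk in chunks:
        export_chunk(chunk)
        delete_chunk(chunk)     # each chunk is removed as soon as it is safely on disk

def purge_approach_b(chunks, export_chunk, delete_chunk):
    # Export -> Export -> ... -> Delete
    for chunk in chunks:
        export_chunk(chunk)     # nothing is deleted yet
    for chunk in chunks:
        delete_chunk(chunk)     # delete only after every chunk has been exported

The main trade-off appears to be when data leaves the DB: immediately after each chunk (A) vs. only after the full export succeeds (B).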

---------------

C. Do we need all 3 types of entries: locations, trips, places?

For now, I have commented out the trip and place code in export_timeseries.py.
If we only need location entries, the code can be simplified further to work with just these entries.

If these are roughly subsets of each other (location -> trip -> place), then I can safely take just location.
But this is valid only if all entries contain location data and hence data.ts values.
If only trip entries or only place entries are present, then directly choosing the latest ts is incorrect, since trips use start_ts while places use enter_ts.
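
For reference, these are the three time queries as they appear in purge_data.py (the trip and place ones now commented out); the estt alias for emission.storage.timeseries.timequery and the start_ts/end_ts bounds here are illustrative assumptions:

import time
import emission.storage.timeseries.timequery as estt

start_ts, end_ts = 0, time.time()   # illustrative bounds only

loc_time_query   = estt.TimeQuery("data.ts",       start_ts, end_ts)   # location entries
trip_time_query  = estt.TimeQuery("data.start_ts", start_ts, end_ts)   # trip entries
place_time_query = estt.TimeQuery("data.enter_ts", start_ts, end_ts)   # place entries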

Searched the codebase for references to start_ts and enter_ts, and read through Shankari's thesis.

I'm getting hints that entries with start_ts and enter_ts are analysis_timeseries entries.
In that case, they can be ignored, since purge + restore is concerned with raw timeseries data only.

Trip entries are created in emission/analysis/intake/segmentation/trip_segmentation.py.

——

Hint 1: Timeseries_Sample.ipynb

- ct_df fetches analysis/cleaned_trip entries -> analysis DB

------

Hint 2: bin/historical/migrations/populate_local_dt.py

- Looks like old code; the last changes were made about 8 years ago.
- The collection parameter refers to some non-timeseries databases, as seen from the function calls.
- The entry['start_ts'] or entry['enter_ts'] values are then used in the find query by setting data.ts to this value.
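
A hedged sketch of that lookup pattern (the helper name and entry structure are assumptions; see bin/historical/migrations/populate_local_dt.py for the actual code):

def find_matching_timeseries_entry(ts_db, entry):
    # Trips carry start_ts and places carry enter_ts; whichever is present
    # is used as the data.ts value in the find query.
    ts_value = entry["data"].get("start_ts", entry["data"].get("enter_ts"))
    return ts_db.find_one({"data.ts": ts_value})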

---------

D. Is pipeline_states export needed?

Remove the pipeline_states export if it is not needed.
It is currently used in the existing export + load scripts.

---------
Mahadik, Mukul Chandrakant authored and Mahadik, Mukul Chandrakant committed Aug 29, 2024
1 parent 661a222 commit ec162ad
Showing 6 changed files with 138 additions and 73 deletions.
8 changes: 4 additions & 4 deletions emission/pipeline/restore_stage.py
@@ -20,7 +20,7 @@
import emission.storage.decorations.stats_queries as esds
import emission.purge_restore.restore_data as eprrd

def run_restore_pipeline(process_number, uuid_list, file_name):
def run_restore_pipeline(process_number, uuid_list, file_names):
try:
with open("conf/log/restore.conf", "r") as cf:
restore_log_config = json.load(cf)
@@ -43,18 +43,18 @@ def run_restore_pipeline(process_number, uuid_list, file_name):
continue

try:
run_restore_pipeline_for_user(uuid, file_name)
run_restore_pipeline_for_user(uuid, file_names)
except Exception as e:
esds.store_pipeline_error(uuid, "WHOLE_PIPELINE", time.time(), None)
logging.exception("Found error %s while processing pipeline "
"for user %s, skipping" % (e, uuid))


def run_restore_pipeline_for_user(uuid, file_name):
def run_restore_pipeline_for_user(uuid, file_names):
with ect.Timer() as edt:
logging.info("*" * 10 + "UUID %s: restoring timeseries data" % uuid + "*" * 10)
print(str(arrow.now()) + "*" * 10 + "UUID %s: restoring timeseries data" % uuid + "*" * 10)
eprrd.restore_data(uuid, file_name)
eprrd.restore_data(uuid, file_names)

esds.store_pipeline_time(uuid, ecwp.PipelineStages.RESTORE_TIMESERIES_DATA.name,
time.time(), edt.elapsed)
13 changes: 8 additions & 5 deletions emission/purge_restore/export_timeseries.py
@@ -63,9 +63,12 @@ def get_from_all_three_sources_with_retry(user_id, in_query, databases=None):

def export(user_id, ts, start_ts, end_ts, file_name, ma_bool, databases=None):
logging.info("In export: Databases = %s" % databases)
print("In export: Databases = %s" % databases)

logging.info("Extracting timeline for user %s day %s -> %s and saving to file %s" %
(user_id, start_ts, end_ts, file_name))
print("Extracting timeline for user %s day %s -> %s and saving to file %s" %
(user_id, start_ts, end_ts, file_name))

loc_time_query = estt.TimeQuery("data.ts", start_ts, end_ts)
loc_entry_list = get_from_all_three_sources_with_retry(user_id, loc_time_query, databases)
@@ -110,11 +113,11 @@ def export(user_id, ts, start_ts, end_ts, file_name, ma_bool, databases=None):


# def validate_truncation(loc_entry_list, trip_entry_list, place_entry_list):
def validate_truncation(loc_entry_list, trip_entry_list, place_entry_list):
def validate_truncation(loc_entry_list):
MAX_LIMIT = 25 * 10000
if len(loc_entry_list) == MAX_LIMIT:
logging.warning("loc_entry_list length = %d, probably truncated" % len(loc_entry_list))
if len(trip_entry_list) == MAX_LIMIT:
logging.warning("trip_entry_list length = %d, probably truncated" % len(trip_entry_list))
if len(place_entry_list) == MAX_LIMIT:
logging.warning("place_entry_list length = %d, probably truncated" % len(place_entry_list))
# if len(trip_entry_list) == MAX_LIMIT:
# logging.warning("trip_entry_list length = %d, probably truncated" % len(trip_entry_list))
# if len(place_entry_list) == MAX_LIMIT:
# logging.warning("place_entry_list length = %d, probably truncated" % len(place_entry_list))
61 changes: 39 additions & 22 deletions emission/purge_restore/purge_data.py
@@ -21,6 +21,7 @@ def purge_data(user_id, archive_dir, export_type):
pdp.user_id = user_id
file_names = pdp.run_purge_data_pipeline(user_id, archive_dir, export_type)
logging.debug("last_processed_ts with entries_to_export logic = %s" % (pdp.last_processed_ts))
print("last_processed_ts with entries_to_export logic = %s" % (pdp.last_processed_ts))
if pdp.last_processed_ts is None:
logging.debug("After run, last_processed_ts == None, must be early return")
espq.mark_purge_data_done(user_id, pdp.last_processed_ts)
@@ -55,19 +56,20 @@ def run_purge_data_pipeline(self, user_id, archive_dir, export_type):
print("Inside: purge_data - Start time: %s" % initStartTs)
print("Inside: purge_data - End time: %s" % initEndTs)

export_queries = {
export_queries_initial = {
# 'trip_time_query': estt.TimeQuery("data.start_ts", initStartTs, initEndTs),
# 'place_time_query': estt.TimeQuery("data.enter_ts", initStartTs, initEndTs),
'loc_time_query': estt.TimeQuery("data.ts", initStartTs, initEndTs)
}

file_names = []
entries_to_export = self.get_export_timeseries_entries(user_id, ts, time_query.startTs, time_query.endTs, export_queries_initial)
count_entries = len(entries_to_export)

if initStartTs is None:
# If running the pipeline PURGE stage for first time, choosing the first timestamp
# from the timeseries as the starting point
# Else cannot add 1 hour (3600 seconds) to a NoneType value if incremental option is selected
entries_to_export = self.get_exported_timeseries_entries(user_id, ts, time_query.startTs, time_query.endTs, export_queries)
print("Inside: purge_data - entries_to_export = %s" % entries_to_export[0]['data']['ts'])
current_start_ts = entries_to_export[0]['data']['ts']
else:
@@ -91,28 +93,43 @@ def run_purge_data_pipeline(self, user_id, archive_dir, export_type):
print(f"Processing data from {current_start_ts} to {current_end_ts}")

file_name = archive_dir + "/archive_%s_%s_%s" % (user_id, current_start_ts, current_end_ts)
print("Exporting to file: %s" % file_name)

export_queries = epret.export(user_id, ts, current_start_ts, current_end_ts, file_name, False)
# epret.export(user_id, ts, current_start_ts, current_end_ts, file_name, False)

entries_to_export_1 = self.get_export_timeseries_entries(user_id, ts, time_query.startTs, time_query.endTs, export_queries_initial)
count_entries_1 = len(entries_to_export_1)

if export_queries is None and count_entries_1 > 0:
print("No entries found in current time range from %s to %s" % (current_start_ts, current_end_ts))
print("Incrementing time range by 1 hour")
current_start_ts = current_end_ts
continue
# if count_entries_2 == 0 and count_entries_1 == 0:
elif export_queries is None and count_entries_1 == 0:
# Didn't process anything new so start at the same point next time
# self._last_processed_ts = None
logging.debug("No new data to export, breaking out of while loop")
print("No new data to export, breaking out of while loop")
break

entries_to_export_2 = self.get_export_timeseries_entries(user_id, ts, current_start_ts, current_end_ts, export_queries)
count_entries_2 = len(entries_to_export_2)
print("count_entries_2 = %s" % count_entries_2)


logging.debug("Exporting to file: %s" % file_name)
print("Exporting to file: %s" % file_name)
file_names.append(file_name)
print("File names: %s" % file_names)

if export_queries is None:
logging.debug("No data to export, export_queries is None")
print("No data to export, export_queries is None")
break
else:
entries_to_export = self.get_exported_timeseries_entries(user_id, ts, current_start_ts, current_end_ts, export_queries)
self.export_pipeline_states(user_id, file_name)
self.delete_timeseries_entries(user_id, ts, current_start_ts, current_end_ts, export_queries)

if len(entries_to_export) == 0:
# Didn't process anything new so start at the same point next time
self._last_processed_ts = None
logging.debug("No new data to export, breaking out of while loop")
print("No new data to export, breaking out of while loop")
break
else:
self._last_processed_ts = entries_to_export[-1]['data']['ts']
self.export_pipeline_states(user_id, file_name)
self.delete_timeseries_entries(user_id, ts, current_start_ts, current_end_ts, export_queries)

print("Total entries to export: %s" % count_entries)
print("Entries exported in timerange %s to %s: %s" % (current_start_ts, current_end_ts, count_entries_2))
print("New count entries to export: %s" % count_entries_1)
self._last_processed_ts = entries_to_export_2[-1]['data']['ts']
print("Updated last_processed_ts %s" % self._last_processed_ts)

current_start_ts = current_end_ts
if current_start_ts >= initEndTs:
@@ -150,7 +167,7 @@ def delete_timeseries_entries(self, user_id, ts, start_ts_datetime, end_ts_datet
logging.debug("{} deleted entries from {} to {}".format(result.deleted_count, start_ts_datetime, end_ts_datetime))
print("{} deleted entries from {} to {}".format(result.deleted_count, start_ts_datetime, end_ts_datetime))

def get_exported_timeseries_entries(self, user_id, ts, start_ts_datetime, end_ts_datetime, export_queries):
def get_export_timeseries_entries(self, user_id, ts, start_ts_datetime, end_ts_datetime, export_queries):
entries_to_export = []
for key, value in export_queries.items():
tq = value
47 changes: 19 additions & 28 deletions emission/purge_restore/restore_data.py
@@ -13,11 +13,11 @@
import emission.core.get_database as edb
import emission.core.wrapper.pipelinestate as ecwp

def restore_data(user_id, file_name):
def restore_data(user_id, file_names):
try:
rdp = RestoreDataPipeline()
rdp.user_id = user_id
rdp.run_restore_data_pipeline(user_id, file_name)
rdp.run_restore_data_pipeline(user_id, file_names)
if rdp.last_processed_ts is None:
logging.debug("After run, last_processed_ts == None, must be early return")
espq.mark_restore_data_done(user_id, rdp.last_processed_ts)
@@ -33,30 +33,21 @@ def __init__(self):
def last_processed_ts(self):
return self._last_processed_ts

def run_restore_data_pipeline(self, user_id, file_name):
def run_restore_data_pipeline(self, user_id, file_names):
time_query = espq.get_time_range_for_restore_data(user_id)
entries_to_import = json.load(gzip.open(file_name + ".gz"), object_hook = esj.wrapped_object_hook)
'''
PipelineState({
'_id': ObjectId('66b15dd496328b58cca9486d'),
'user_id': UUID('6f600819-2b42-47a8-a57c-c7db631a832a'),
'pipeline_stage': 20,
'curr_run_ts': None,
'last_processed_ts': 1437633640.069,
'last_ts_run': 1722899919.6985111
})
'''
# pipelineState = edb.get_pipeline_state_db().find_one({"user_id": user_id,
# "pipeline_stage": ecwp.PipelineStages.RESTORE_TIMESERIES_DATA.value})
# self._last_processed_ts = pipelineState["last_processed_ts"]
# logging.debug("Restoring from file, last_processed_ts = %s" % (self._last_processed_ts))
(tsdb_count, ucdb_count) = lmtfr.load_multi_timeline_for_range(file_prefix=file_name, continue_on_error=True)
print("After load, tsdb_count = %s, ucdb_count = %s" % (tsdb_count, ucdb_count))
if tsdb_count == 0:
# Didn't process anything new so start at the same point next time
self._last_processed_ts = None
else:
self._last_processed_ts = entries_to_import[-1]['data']['ts']
print("After load, last_processed_ts = %s" % (self._last_processed_ts))
# if self._last_processed_ts is None or self._last_processed_ts < entries_to_import[-1]['metadata']['write_ts']:
# self._last_processed_ts = entries_to_import[-1]['metadata']['write_ts']
for file_name in file_names:
entries_to_import = json.load(gzip.open(file_name + ".gz"), object_hook = esj.wrapped_object_hook)
# pipelineState = edb.get_pipeline_state_db().find_one({"user_id": user_id,
# "pipeline_stage": ecwp.PipelineStages.RESTORE_TIMESERIES_DATA.value})
# self._last_processed_ts = pipelineState["last_processed_ts"]
# logging.debug("Restoring from file, last_processed_ts = %s" % (self._last_processed_ts))
(tsdb_count, ucdb_count) = lmtfr.load_multi_timeline_for_range(file_prefix=file_name, continue_on_error=True)
print("After load, tsdb_count = %s, ucdb_count = %s" % (tsdb_count, ucdb_count))
if tsdb_count == 0:
# Didn't process anything new so start at the same point next time
self._last_processed_ts = None
else:
self._last_processed_ts = entries_to_import[-1]['data']['ts']
print("After load, last_processed_ts = %s" % (self._last_processed_ts))
# if self._last_processed_ts is None or self._last_processed_ts < entries_to_import[-1]['metadata']['write_ts']:
# self._last_processed_ts = entries_to_import[-1]['metadata']['write_ts']
2 changes: 0 additions & 2 deletions emission/storage/timeseries/builtin_timeseries.py
@@ -135,8 +135,6 @@ def _get_query(self, key_list = None, time_query = None, geo_query = None,
ret_query.update({"$or": key_query_list})
if time_query is not None:
ret_query.update(time_query.get_query())
print("Inside builtintimeseries")
print("time_query = %s" % time_query.get_query())
if geo_query is not None:
ret_query.update(geo_query.get_query())
if extra_query_list is not None:
80 changes: 68 additions & 12 deletions emission/tests/exportTests/TestPurgeRestoreModule.py
@@ -107,27 +107,26 @@ def testPurgeRestoreModule(self):
(tsdb_count, ucdb_count) = lmtfr.load_multi_timeline_for_range(file_prefix=file_name, continue_on_error=True)
self.assertEqual(tsdb_count, 0)

def testPurgeRestorePipeline(self):
def testPurgeRestorePipelineFull(self):
'''
Test 1 - Verify that purging timeseries data works with sample real data
'''
# Check how much data there was before
res = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID})
logging.info(f"About to purge {res} entries")
print(f"About to purge {res} entries")
# self.assertEqual(res, 1906)
self.assertEqual(res, 1906)

# Run the purge pipeline
# file_names = epp.run_purge_pipeline_for_user(self.testUUID, os.environ.get('DATA_DIR', 'emission/archived'), "full")
file_names = epp.run_purge_pipeline_for_user(self.testUUID, os.environ.get('DATA_DIR', 'emission/archived'), "incremental")
file_names = epp.run_purge_pipeline_for_user(self.testUUID, os.environ.get('DATA_DIR', 'emission/archived'), "full")
print("Exported file names: %s" % file_names)

'''
Test 2 - Assert the file exists after the export process
'''
# self.assertTrue(pl.Path(file_name + ".gz").is_file())
# with gzip.open(file_name + ".gz", 'r') as ef:
# exported_data = json.loads(ef.read().decode('utf-8'))
self.assertTrue(pl.Path(file_names[0] + ".gz").is_file())
with gzip.open(file_names[0] + ".gz", 'r') as ef:
exported_data = json.loads(ef.read().decode('utf-8'))

'''
Test 3 - Verify that purging timeseries data works with sample real data
@@ -141,13 +140,13 @@ def testPurgeRestorePipeline(self):
# A single entry with key 'stats/pipeline_time' should be present as this test involves running the pipeline
stat_pipeline_key = entries[0].get('metadata').get('key')
print(f"stat_pipeline_key = {stat_pipeline_key}")
# self.assertEqual(stat_pipeline_key,'stats/pipeline_time')
# self.assertEqual(res, 1)
self.assertEqual(stat_pipeline_key,'stats/pipeline_time')
self.assertEqual(res, 1)

# Run the restore pipeline
logging.info(f"About to restore entries")
print(f"About to restore entries")
# epr.run_restore_pipeline_for_user(self.testUUID, file_names[0])
epr.run_restore_pipeline_for_user(self.testUUID, file_names[0])

'''
Test 4 - Verify that restoring timeseries data works with sample real data
@@ -160,8 +159,65 @@ def testPurgeRestorePipeline(self):

# Two entries with key 'stats/pipeline_time' should be present - one from the purge pipeline, other from the restore pipeline
print(f"res_stats_count = {res_stats_count}")
# self.assertEqual(res_stats_count, 2)
# self.assertEqual(res, 1908)
self.assertEqual(res_stats_count, 2)
self.assertEqual(res, 1908)

def testPurgeRestorePipelineIncremental(self):
'''
Test 1 - Verify that purging timeseries data works with sample real data
'''
# Check how much data there was before
res = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID})
logging.info(f"About to purge {res} entries")
print(f"About to purge {res} entries")
self.assertEqual(res, 1906)

# Run the purge pipeline
file_names = epp.run_purge_pipeline_for_user(self.testUUID, os.environ.get('DATA_DIR', 'emission/archived'), "incremental")
print("Exported file names: %s" % file_names)

'''
Test 2 - Assert the file exists after the export process
'''
for file_name in file_names:
self.assertTrue(pl.Path(file_name + ".gz").is_file())
# with gzip.open(file_name + ".gz", 'r') as ef:
# exported_data = json.loads(ef.read().decode('utf-8'))

'''
Test 3 - Verify that purging timeseries data works with sample real data
'''
# Check how much data there is after
entries = edb.get_timeseries_db().find({"user_id" : self.testUUID})
res = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID})
logging.info(f"Purging complete: {res} entries remaining")
print(f"Purging complete: {res} entries remaining")

# A single entry with key 'stats/pipeline_time' should be present as this test involves running the pipeline
stat_pipeline_key = entries[0].get('metadata').get('key')
print(f"stat_pipeline_key = {stat_pipeline_key}")
self.assertEqual(stat_pipeline_key,'stats/pipeline_time')
self.assertEqual(res, 1)

# Run the restore pipeline
logging.info(f"About to restore entries")
print(f"About to restore entries")
print("File names: %s" % file_names)
epr.run_restore_pipeline_for_user(self.testUUID, file_names)

'''
Test 4 - Verify that restoring timeseries data works with sample real data
'''
# Check how much data there is after
res = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID})
res_stats_count = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID, "metadata.key" : 'stats/pipeline_time'})
logging.info(f"Restoring complete: {res} entries restored")
print(f"Restoring complete: {res} entries restored")

# Two entries with key 'stats/pipeline_time' should be present - one from the purge pipeline, other from the restore pipeline
print(f"res_stats_count = {res_stats_count}")
# self.assertEqual(res_stats_count, 2)
# self.assertEqual(res, 1908)


if __name__ == '__main__':
