From 4703f044e32fbe153cd16b4bcaf2a03042e2ca97 Mon Sep 17 00:00:00 2001 From: "Mahadik, Mukul Chandrakant" Date: Mon, 16 Sep 2024 14:27:53 -0700 Subject: [PATCH] Cleaned up duplicate code, log statements + Refactored export.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Changes 1. Fetching only loc-like entries from the existing export data logic as the raw timeseries entries. - Found a lot of references that trip and place entries are a part of analysis timeseries database. Almost every place I’ve found uses data.start_ts for “analysis/*” metadata key entries In bin/debug/export_participants_trips_csv.py ``` ts = esta.TimeSeries.get_time_series(user_id) trip_time_query = estt.TimeQuery("data.start_ts", start_day_ts, end_day_ts) ct_df = ts.get_data_df("analysis/confirmed_trip", trip_time_query) ``` --------- In bin/debug/label_stats.py ``` for t in list(edb.get_analysis_timeseries_db().find({"metadata.key": "analysis/inferred_trip", "user_id": sel_uuid})): if t["data"]["inferred_labels"] != []: confirmed_trip = edb.get_analysis_timeseries_db().find_one({"user_id": t["user_id"], "metadata.key": "analysis/confirmed_trip", "data.start_ts": t["data"]["start_ts"]}) ``` Similarly for data.entry_ts. ----------------- On the other hand for data.ts, timeseries_db was used since “background/*” metadata key entries were used: In emission/analysis/intake/segmentation/section_segmentation.py ``` get_loc_for_ts = lambda time: ecwl.Location(ts.get_entry_at_ts("background/filtered_location", "data.ts", time)["data"]) trip_start_loc = get_loc_for_ts(trip_entry.data.start_ts) trip_end_loc = get_loc_for_ts(trip_entry.data.end_ts) ``` ---------------- In emission/analysis/intake/segmentation/trip_segmentation.py ``` untracked_start_loc = ecwe.Entry(ts.get_entry_at_ts("background/filtered_location", "data.ts", last_place_entry.data.enter_ts)).data ``` -------------------------------------- 2. Refactored emission/export/export.py - Added a separate function that returns exported entries so that this function can be reused in the purge pipeline code. - This helped to remove repeated code for re-fetching exported entries. - Also using databases parameter for exporting data from specific db. For the purge usecase, `databases` should only have 'timeseries_db' -------------------------------------- 3. Added raw_timeseries_only parameter to load_multi_timeline_for_range.py - If this argument is set, then pipeline_states will not be loaded since we don't want pipeline states to be restored during restoring raw timeseries data. -------------------------------------- 4. Cleaned up tests - Reduced repetitive code by moving assertion tests to functions that can be reused for both full and incremental export testing. -------------------------------------- 5. Removed export_timeseries.py and import_timeseries.py - No need to have duplicate code since now using existing scripts present in load_multi_timeline_for_range.py and export.py -------------------------------------- --- bin/debug/load_multi_timeline_for_range.py | 13 +- emission/export/export.py | 106 ++++-- emission/purge_restore/export_timeseries.py | 124 ------ emission/purge_restore/import_timeseries.py | 158 -------- emission/purge_restore/purge_data.py | 99 +++-- emission/purge_restore/restore_data.py | 14 +- emission/storage/pipeline_queries.py | 3 - .../exportTests/TestPurgeRestoreModule.py | 353 +++++------------- 8 files changed, 228 insertions(+), 642 deletions(-) delete mode 100644 emission/purge_restore/export_timeseries.py delete mode 100644 emission/purge_restore/import_timeseries.py diff --git a/bin/debug/load_multi_timeline_for_range.py b/bin/debug/load_multi_timeline_for_range.py index cdd2bf6a8..152736289 100644 --- a/bin/debug/load_multi_timeline_for_range.py +++ b/bin/debug/load_multi_timeline_for_range.py @@ -109,7 +109,7 @@ def post_check(unique_user_list, all_rerun_list): else: logging.info("timeline contains a mixture of analysis results and raw data - complain to shankari!") -def load_multi_timeline_for_range(file_prefix, info_only=None, verbose=None, continue_on_error=None, mapfile=None, prefix=None, batch_size=10000): +def load_multi_timeline_for_range(file_prefix, info_only=None, verbose=None, continue_on_error=None, mapfile=None, prefix=None, batch_size=10000, raw_timeseries_only=False): fn = file_prefix logging.info("Loading file or prefix %s" % fn) sel_file_list = common.read_files_with_prefix(fn) @@ -144,11 +144,12 @@ def load_multi_timeline_for_range(file_prefix, info_only=None, verbose=None, con logging.info("About to load range %s -> %s" % (curr_range[0], curr_range[1])) wrapped_entries = [ecwe.Entry(e) for e in entries[curr_range[0]:curr_range[1]]] (tsdb_count, ucdb_count) = estcs.insert_entries(curr_uuid, wrapped_entries, continue_on_error) - print("For uuid %s, finished loading %d entries into the usercache and %d entries into the timeseries" % (curr_uuid, ucdb_count, tsdb_count)) + logging.debug("For uuid %s, finished loading %d entries into the usercache and %d entries into the timeseries" % (curr_uuid, ucdb_count, tsdb_count)) unique_user_list = set(all_user_list) if not info_only: - load_pipeline_states(file_prefix, unique_user_list, continue_on_error, verbose) + if not raw_timeseries_only: + load_pipeline_states(file_prefix, unique_user_list, continue_on_error, verbose) if mapfile is not None: register_mapped_users(mapfile, unique_user_list, verbose) elif prefix is not None: @@ -176,6 +177,9 @@ def load_multi_timeline_for_range(file_prefix, info_only=None, verbose=None, con parser.add_argument("-s", "--batch-size", default=10000, type=int, help="batch size to use for the entries") + + parser.add_argument("-t", "--raw-timeseries-only", default=False, action='store_true', + help="load only raw timeseries data; if not set load both raw and analysis timeseries data") group = parser.add_mutually_exclusive_group(required=False) group.add_argument("-p", "--prefix", default="user", @@ -189,4 +193,5 @@ def load_multi_timeline_for_range(file_prefix, info_only=None, verbose=None, con else: logging.basicConfig(level=logging.INFO) - load_multi_timeline_for_range(args.file_prefix, args.info_only, args.verbose, args.continue_on_error, args.mapfile, args.prefix, args.batch_size) + # load_multi_timeline_for_range(args.file_prefix, args.info_only, args.verbose, args.continue_on_error, args.mapfile, args.prefix, args.batch_size) + load_multi_timeline_for_range(args.file_prefix, args.info_only, args.verbose, args.continue_on_error, args.mapfile, args.prefix, args.batch_size, args.raw_timeseries_only) diff --git a/emission/export/export.py b/emission/export/export.py index 614c3327e..a8af447f3 100644 --- a/emission/export/export.py +++ b/emission/export/export.py @@ -39,13 +39,40 @@ def get_with_retry(retrieve_call, in_query): query.startTs = curr_batch[-1][timeTypeSplit[0]][timeTypeSplit[1]] return list_so_far -def get_from_all_three_sources_with_retry(user_id, in_query): +def get_from_all_three_sources_with_retry(user_id, in_query, databases=None): + logging.info("In get_from_all_three_sources_with_retry: Databases = %s" % databases) + import emission.storage.timeseries.builtin_timeseries as estb ts = estb.BuiltinTimeSeries(user_id) uc = enua.UserCache.getUserCache(user_id) sort_key = ts._get_sort_key(in_query) + + source_db_calls = [] + + if databases is None or 'timeseries_db' in databases: + logging.info("Fetching from timeseries_db") + base_ts_call = lambda tq: ts._get_entries_for_timeseries(ts.timeseries_db, None, tq, + geo_query=None, extra_query_list=None, sort_key = sort_key) + source_db_calls.append(base_ts_call) + if databases is None or 'analysis_db' in databases: + logging.info("Fetching from analysis_timeseries_db") + analysis_ts_call = lambda tq: ts._get_entries_for_timeseries(ts.analysis_timeseries_db, None, tq, + geo_query=None, extra_query_list=None, sort_key = sort_key) + source_db_calls.append(analysis_ts_call) + if databases is None or 'usercache_db' in databases: + logging.info("Fetching from usercache_db") + uc_ts_call = lambda tq: (uc.getMessageCount(None, tq), uc.getMessage(None, tq)) + source_db_calls.append(uc_ts_call) + + retry_lists = [] + for source_db_call in source_db_calls: + retry_lists = retry_lists + get_with_retry(source_db_call, in_query) + + return retry_lists + + ''' base_ts_call = lambda tq: ts._get_entries_for_timeseries(ts.timeseries_db, None, tq, geo_query=None, extra_query_list=None, sort_key = sort_key) analysis_ts_call = lambda tq: ts._get_entries_for_timeseries(ts.analysis_timeseries_db, None, tq, @@ -55,48 +82,73 @@ def get_from_all_three_sources_with_retry(user_id, in_query): return get_with_retry(base_ts_call, in_query) + \ get_with_retry(analysis_ts_call, in_query) + \ get_with_retry(uc_ts_call, in_query) + ''' + +def get_exported_timeseries_entries(user_id, ts, start_ts, end_ts, databases=None): + combined_list = [] + entries_lists = { + "loc_entry_list": None, + "trip_entry_list": None, + "place_entry_list": None + } -def export(user_id, ts, start_ts, end_ts, file_name, ma_bool): + if databases == ['timeseries_db']: + loc_time_query = estt.TimeQuery("data.ts", start_ts, end_ts) + loc_entry_list = get_from_all_three_sources_with_retry(user_id, loc_time_query, databases) + combined_list = loc_entry_list + logging.info("Found %d loc-like entries = %d total entries" % + (len(loc_entry_list), len(combined_list))) + entries_lists["loc_entry_list"] = loc_entry_list + else: + loc_time_query = estt.TimeQuery("data.ts", start_ts, end_ts) + loc_entry_list = get_from_all_three_sources_with_retry(user_id, loc_time_query) + # Changing to estcs so that we will read the manual entries, which have data.start_ts and data.enter_ts + # from the usercache as well + trip_time_query = estt.TimeQuery("data.start_ts", start_ts, end_ts) + trip_entry_list = get_from_all_three_sources_with_retry(user_id, trip_time_query) + place_time_query = estt.TimeQuery("data.enter_ts", start_ts, end_ts) + place_entry_list = get_from_all_three_sources_with_retry(user_id, place_time_query) + # Handle the case of the first place, which has no enter_ts and won't be + # matched by the default query + first_place_extra_query = {'$and': [{'data.enter_ts': {'$exists': False}}, + {'data.exit_ts': {'$exists': True}}]} + first_place_entry_list = list(ts.find_entries(key_list=None, time_query=None, extra_query_list=[first_place_extra_query])) + logging.info("First place entry list = %s" % first_place_entry_list) + combined_list = loc_entry_list + trip_entry_list + place_entry_list + first_place_entry_list + logging.info("Found %d loc-like entries, %d trip-like entries, %d place-like entries = %d total entries" % + (len(loc_entry_list), len(trip_entry_list), len(place_entry_list), len(combined_list))) + entries_lists["loc_entry_list"] = loc_entry_list + entries_lists["trip_entry_list"] = trip_entry_list + entries_lists["place_entry_list"] = place_entry_list + + return entries_lists, combined_list + +def export(user_id, ts, start_ts, end_ts, file_name, ma_bool, databases=None): + logging.info("In export: Databases = %s" % databases) logging.info("Extracting timeline for user %s day %s -> %s and saving to file %s" % (user_id, start_ts, end_ts, file_name)) + + entries_lists, combined_list = get_exported_timeseries_entries(user_id, ts, start_ts, end_ts, databases) - loc_time_query = estt.TimeQuery("data.ts", start_ts, end_ts) - loc_entry_list = get_from_all_three_sources_with_retry(user_id, loc_time_query) - # Changing to estcs so that we will read the manual entries, which have data.start_ts and data.enter_ts - # from the usercache as well - trip_time_query = estt.TimeQuery("data.start_ts", start_ts, end_ts) - trip_entry_list = get_from_all_three_sources_with_retry(user_id, trip_time_query) - place_time_query = estt.TimeQuery("data.enter_ts", start_ts, end_ts) - place_entry_list = get_from_all_three_sources_with_retry(user_id, place_time_query) - # Handle the case of the first place, which has no enter_ts and won't be - # matched by the default query - first_place_extra_query = {'$and': [{'data.enter_ts': {'$exists': False}}, - {'data.exit_ts': {'$exists': True}}]} - first_place_entry_list = list(ts.find_entries(key_list=None, time_query=None, extra_query_list=[first_place_extra_query])) - logging.info("First place entry list = %s" % first_place_entry_list) - - combined_list = loc_entry_list + trip_entry_list + place_entry_list + first_place_entry_list - logging.info("Found %d loc-like entries, %d trip-like entries, %d place-like entries = %d total entries" % - (len(loc_entry_list), len(trip_entry_list), len(place_entry_list), len(combined_list))) - - validate_truncation(loc_entry_list, trip_entry_list, place_entry_list) + validate_truncation(entries_lists["loc_entry_list"], entries_lists["trip_entry_list"], entries_lists["place_entry_list"]) unique_key_list = set([e["metadata"]["key"] for e in combined_list]) logging.info("timeline has unique keys = %s" % unique_key_list) if len(combined_list) == 0 or unique_key_list == set(['stats/pipeline_time']): logging.info("No entries found in range for user %s, skipping save" % user_id) + return None else: combined_filename = "%s.gz" % (file_name) with gzip.open(combined_filename, "wt") as gcfd: json.dump(combined_list, gcfd, default=esj.wrapped_default, allow_nan=False, indent=4) + return combined_list - -def validate_truncation(loc_entry_list, trip_entry_list, place_entry_list): +def validate_truncation(loc_entry_list=None, trip_entry_list=None, place_entry_list=None): MAX_LIMIT = 25 * 10000 - if len(loc_entry_list) == MAX_LIMIT: + if loc_entry_list is not None and len(loc_entry_list) == MAX_LIMIT: logging.warning("loc_entry_list length = %d, probably truncated" % len(loc_entry_list)) - if len(trip_entry_list) == MAX_LIMIT: + if trip_entry_list is not None and len(trip_entry_list) == MAX_LIMIT: logging.warning("trip_entry_list length = %d, probably truncated" % len(trip_entry_list)) - if len(place_entry_list) == MAX_LIMIT: + if place_entry_list is not None and len(place_entry_list) == MAX_LIMIT: logging.warning("place_entry_list length = %d, probably truncated" % len(place_entry_list)) diff --git a/emission/purge_restore/export_timeseries.py b/emission/purge_restore/export_timeseries.py deleted file mode 100644 index fc2bc596e..000000000 --- a/emission/purge_restore/export_timeseries.py +++ /dev/null @@ -1,124 +0,0 @@ -import logging -logging.basicConfig(level=logging.DEBUG) -import gzip -import os -import copy - -import uuid -import json -import emission.storage.json_wrappers as esj -import emission.storage.timeseries.timequery as estt -import emission.storage.timeseries.abstract_timeseries as esta -import emission.storage.timeseries.cache_series as estcs -import emission.net.usercache.abstract_usercache as enua - -def get_with_retry(retrieve_call, in_query): - # Let's clone the query since we are going to modify it - query = copy.copy(in_query) - # converts "data.ts" = ["data", "ts"] - timeTypeSplit = query.timeType.split(".") - list_so_far = [] - done = False - while not done: - # if we don't sort this here, we simply concatenate the entries in the - # two timeseries and analysis databases so we could end up with a later - # timestamp from the analysis database as opposed to the timeseries - (curr_count, curr_batch_cursor) = retrieve_call(query) - # If this is the first call (as identified by `len(list_so_far) == 0` - # the count is the total count - total_count = curr_count if len(list_so_far) == 0 else total_count - curr_batch = list(curr_batch_cursor) - if len(list_so_far) > 0 and len(curr_batch) > 0 and curr_batch[0]["_id"] == list_so_far[-1]["_id"]: - logging.debug(f"first entry {curr_batch[0]['_id']} == last entry so far {list_so_far[-1]['_id']}, deleting") - del curr_batch[0] - list_so_far.extend(curr_batch) - logging.debug(f"Retrieved batch of size {len(curr_batch)}, cumulative {len(list_so_far)} entries of total {total_count} documents for {query}") - if len(list_so_far) >= total_count: - done = True - else: - query.startTs = curr_batch[-1][timeTypeSplit[0]][timeTypeSplit[1]] - - return list_so_far - -def get_from_all_three_sources_with_retry(user_id, in_query, databases=None): - import emission.storage.timeseries.builtin_timeseries as estb - - ts = estb.BuiltinTimeSeries(user_id) - - sort_key = ts._get_sort_key(in_query) - source_db_calls = [] - - logging.info("In get_from_all_three_sources_with_retry: Databases = %s" % databases) - - if databases is None or 'timeseries_db' in databases: - logging.info("Fetching from timeseries_db") - base_ts_call = lambda tq: ts._get_entries_for_timeseries(ts.timeseries_db, None, tq, - geo_query=None, extra_query_list=None, sort_key = sort_key) - source_db_calls.append(base_ts_call) - - retry_lists = [] - for source_db_call in source_db_calls: - retry_lists = retry_lists + get_with_retry(source_db_call, in_query) - - return retry_lists - -def export(user_id, ts, start_ts, end_ts, file_name, databases=None): - logging.info("In export: Databases = %s" % databases) - print("In export: Databases = %s" % databases) - - logging.info("Extracting timeline for user %s day %s -> %s and saving to file %s" % - (user_id, start_ts, end_ts, file_name)) - print("Extracting timeline for user %s day %s -> %s and saving to file %s" % - (user_id, start_ts, end_ts, file_name)) - - loc_time_query = estt.TimeQuery("data.ts", start_ts, end_ts) - loc_entry_list = get_from_all_three_sources_with_retry(user_id, loc_time_query, databases) - # trip_time_query = estt.TimeQuery("data.start_ts", start_ts, end_ts) - # trip_entry_list = get_from_all_three_sources_with_retry(user_id, trip_time_query, databases) - # place_time_query = estt.TimeQuery("data.enter_ts", start_ts, end_ts) - # place_entry_list = get_from_all_three_sources_with_retry(user_id, place_time_query, databases) - - # combined_list = loc_entry_list + trip_entry_list + place_entry_list - combined_list = loc_entry_list - # logging.info("Found %d loc-like entries, %d trip-like entries, %d place-like entries = %d total entries" % - # (len(loc_entry_list), len(trip_entry_list), len(place_entry_list), len(combined_list))) - logging.info("Found %d loc-like entries = %d total entries" % - (len(loc_entry_list), len(combined_list))) - - validate_truncation(loc_entry_list) - # validate_truncation(loc_entry_list, trip_entry_list, place_entry_list) - - unique_key_list = set([e["metadata"]["key"] for e in combined_list]) - logging.info("timeline has unique keys = %s" % unique_key_list) - if len(combined_list) == 0 or unique_key_list == set(['stats/pipeline_time']): - logging.info("No entries found in range for user %s, skipping save" % user_id) - print("No entries found in range for user %s, skipping save" % user_id) - print("Combined list length = %d" % len(combined_list)) - print("Unique key list = %s" % unique_key_list) - return None - else: - combined_filename = "%s.gz" % (file_name) - logging.info("Combined list:") - logging.info(len(combined_list)) - with gzip.open(combined_filename, "wt") as gcfd: - json.dump(combined_list, - gcfd, default=esj.wrapped_default, allow_nan=False, indent=4) - - # Returning these queries that were used to fetch the data entries that were exported. - # Need these for use in the purge_user_timeseries.py script so that we only delete those entries that were exported - return { - # 'trip_time_query': trip_time_query, - # 'place_time_query': place_time_query, - 'loc_time_query': loc_time_query - } - - -# def validate_truncation(loc_entry_list, trip_entry_list, place_entry_list): -def validate_truncation(loc_entry_list): - MAX_LIMIT = 25 * 10000 - if len(loc_entry_list) == MAX_LIMIT: - logging.warning("loc_entry_list length = %d, probably truncated" % len(loc_entry_list)) - # if len(trip_entry_list) == MAX_LIMIT: - # logging.warning("trip_entry_list length = %d, probably truncated" % len(trip_entry_list)) - # if len(place_entry_list) == MAX_LIMIT: - # logging.warning("place_entry_list length = %d, probably truncated" % len(place_entry_list)) diff --git a/emission/purge_restore/import_timeseries.py b/emission/purge_restore/import_timeseries.py deleted file mode 100644 index 6870ca5a3..000000000 --- a/emission/purge_restore/import_timeseries.py +++ /dev/null @@ -1,158 +0,0 @@ -from __future__ import unicode_literals -from __future__ import print_function -from __future__ import division -from __future__ import absolute_import -from future import standard_library -standard_library.install_aliases() -from builtins import zip -from builtins import str -from builtins import range -from builtins import * -import logging - -import json -import emission.storage.json_wrappers as esj -import argparse - -import bin.debug.common as common -import os - -import gzip - -import emission.storage.timeseries.abstract_timeseries as esta -import emission.core.wrapper.user as ecwu -import emission.core.wrapper.entry as ecwe -import emission.storage.timeseries.cache_series as estcs - -args = None - -def register_fake_users(prefix, unique_user_list, verbose): - logging.info("Creating user entries for %d users" % len(unique_user_list)) - - format_string = "{0}-%0{1}d".format(prefix, len(str(len(unique_user_list)))) - logging.info("pattern = %s" % format_string) - - for i, uuid in enumerate(unique_user_list): - username = (format_string % i) - if verbose is not None and i % verbose == 0: - logging.info("About to insert mapping %s -> %s" % (username, uuid)) - user = ecwu.User.registerWithUUID(username, uuid) - -def register_mapped_users(mapfile, unique_user_list, verbose): - uuid_entries = json.load(open(mapfile), object_hook=esj.wrapped_object_hook) - logging.info("Creating user entries for %d users from map of length %d" % (len(unique_user_list), len(mapfile))) - - lookup_map = dict([(eu["uuid"], eu) for eu in uuid_entries]) - - for i, uuid in enumerate(unique_user_list): - username = lookup_map[uuid]["user_email"] - # TODO: Figure out whether we should insert the entry directly or - # register this way - # Pro: will do everything that register does, including creating the profile - # Con: will insert only username and uuid - id and update_ts will be different - if verbose is not None and i % verbose == 0: - logging.info("About to insert mapping %s -> %s" % (username, uuid)) - user = ecwu.User.registerWithUUID(username, uuid) - -def get_load_ranges(entries, batch_size): - start_indices = list(range(0, len(entries), batch_size)) - ranges = list(zip(start_indices, start_indices[1:])) - ranges.append((start_indices[-1], len(entries))) - return ranges - -# def load_pipeline_states(file_prefix, all_uuid_list, continue_on_error, verbose): -# import emission.core.get_database as edb -# import pymongo - -# for curr_uuid in all_uuid_list: -# pipeline_filename = "%s_pipelinestate_%s.gz" % (file_prefix, curr_uuid) -# print("Loading pipeline state for %s from %s" % -# (curr_uuid, pipeline_filename)) -# with gzip.open(pipeline_filename) as gfd: -# states = json.load(gfd, object_hook = esj.wrapped_object_hook) -# if verbose: -# logging.debug("Loading states of length %s" % len(states)) -# if len(states) > 0: -# try: -# edb.get_pipeline_state_db().insert_many(states) -# except pymongo.errors.BulkWriteError as e: -# # print(e.__dict__.keys()) -# # print(e._OperationFailure__details.keys()) -# all_error_codes = list(set([we['code'] for we in e.details['writeErrors']])) -# if len(all_error_codes) == 1 and all_error_codes[0] == 11000 and continue_on_error: -# logging.info("ignoring duplicate key error while restoring pipeline state") -# else: -# logging.error(e.details['writeErrors']) -# raise(e) -# else: -# logging.info("No pipeline states found, skipping load") - -def post_check(unique_user_list, all_rerun_list): - import emission.core.get_database as edb - import numpy as np - - logging.info("For %s users, loaded %s raw entries, %s processed entries and %s pipeline states" % - (len(unique_user_list), - edb.get_timeseries_db().count_documents({"user_id": {"$in": list(unique_user_list)}}), - edb.get_analysis_timeseries_db().count_documents({"user_id": {"$in": list(unique_user_list)}}), - edb.get_pipeline_state_db().count_documents({"user_id": {"$in": list(unique_user_list)}}))) - - all_rerun_arr = np.array(all_rerun_list) - - # want to check if no entry needs a rerun? In this case we are done - # no entry needs a rerun = all entries are false, not(all entries) are true - if np.all(np.logical_not(all_rerun_list)): - logging.info("all entries in the timeline contain analysis results, no need to run the intake pipeline") - # if all entries need to be re-run, we must have had raw data throughout - elif np.all(all_rerun_list): - logging.info("all entries in the timeline contain only raw data, need to run the intake pipeline") - else: - logging.info("timeline contains a mixture of analysis results and raw data - complain to shankari!") - -def load_multi_timeline_for_range(file_prefix, info_only=None, verbose=None, continue_on_error=None, mapfile=None, prefix=None, batch_size=10000): - fn = file_prefix - logging.info("Loading file or prefix %s" % fn) - sel_file_list = common.read_files_with_prefix(fn) - - all_user_list = [] - all_rerun_list = [] - (tsdb_count, ucdb_count) = (0,0) - - for i, filename in enumerate(sel_file_list): - # if "pipelinestate" in filename: - # continue - logging.info("=" * 50) - logging.info("Loading data from file %s" % filename) - - entries = json.load(gzip.open(filename), object_hook = esj.wrapped_object_hook) - - # Obtain uuid and rerun information from entries - curr_uuid_list, needs_rerun = common.analyse_timeline(entries) - if len(curr_uuid_list) > 1: - logging.warning("Found %d users, %s in filename, aborting! " % - (len(curr_uuid_list), curr_uuid_list)) - raise RuntimeException("Found %d users, %s in filename, expecting 1, %s" % - (len(curr_uuid_list), curr_uuid_list, common.split_user_id(filename))) - curr_uuid = curr_uuid_list[0] - all_user_list.append(curr_uuid) - all_rerun_list.append(needs_rerun) - - load_ranges = get_load_ranges(entries, batch_size) - if not info_only: - for j, curr_range in enumerate(load_ranges): - if verbose is not None and j % verbose == 0: - logging.info("About to load range %s -> %s" % (curr_range[0], curr_range[1])) - wrapped_entries = [ecwe.Entry(e) for e in entries[curr_range[0]:curr_range[1]]] - (tsdb_count, ucdb_count) = estcs.insert_entries(curr_uuid, wrapped_entries, continue_on_error) - print("For uuid %s, finished loading %d entries into the usercache and %d entries into the timeseries" % (curr_uuid, ucdb_count, tsdb_count)) - - unique_user_list = set(all_user_list) - if not info_only: - # load_pipeline_states(file_prefix, unique_user_list, continue_on_error, verbose) - if mapfile is not None: - register_mapped_users(mapfile, unique_user_list, verbose) - elif prefix is not None: - register_fake_users(prefix, unique_user_list, verbose) - - post_check(unique_user_list, all_rerun_list) - return (tsdb_count, ucdb_count) diff --git a/emission/purge_restore/purge_data.py b/emission/purge_restore/purge_data.py index 04209fe3f..cd6de383f 100644 --- a/emission/purge_restore/purge_data.py +++ b/emission/purge_restore/purge_data.py @@ -5,7 +5,6 @@ import emission.storage.pipeline_queries as espq import emission.storage.timeseries.abstract_timeseries as esta import emission.storage.decorations.analysis_timeseries_queries as esda -import emission.purge_restore.export_timeseries as epret import gzip import json import os @@ -13,6 +12,7 @@ import emission.storage.json_wrappers as esj import emission.core.wrapper.entry as ecwe import emission.storage.timeseries.timequery as estt +import emission.export.export as eee def purge_data(user_id, archive_dir, export_type): file_names = None @@ -20,8 +20,6 @@ def purge_data(user_id, archive_dir, export_type): pdp = PurgeDataPipeline() pdp.user_id = user_id file_names = pdp.run_purge_data_pipeline(user_id, archive_dir, export_type) - logging.debug("last_processed_ts with entries_to_export logic = %s" % (pdp.last_processed_ts)) - print("last_processed_ts with entries_to_export logic = %s" % (pdp.last_processed_ts)) if pdp.last_processed_ts is None: logging.debug("After run, last_processed_ts == None, must be early return") espq.mark_purge_data_done(user_id, pdp.last_processed_ts) @@ -50,108 +48,96 @@ def run_purge_data_pipeline(self, user_id, archive_dir, export_type): initStartTs = time_query.startTs initEndTs = time_query.endTs - print("Inside: purge_data - Start time: %s" % initStartTs) - print("Inside: purge_data - End time: %s" % initEndTs) + logging.debug("Initial pipeline purge query range = start_time: %s , end_time: %s" % (initStartTs, initEndTs)) file_names = [] - entries_to_export = self.get_export_timeseries_entries(user_id, ts, time_query.startTs, time_query.endTs) - count_entries = len(entries_to_export) + total_entries_to_export = eee.get_exported_timeseries_entries(user_id, ts, time_query.startTs, time_query.endTs, ['timeseries_db'])[1] # If running the pipeline PURGE stage for first time, choose the first timestamp from the timeseries as the starting point # Otherwise cannot add 1 hour (3600 seconds) to a NoneType value if incremental option is selected - current_start_ts = initStartTs if initStartTs is not None else entries_to_export[0]['data']['ts'] + current_start_ts = initStartTs if initStartTs is not None else total_entries_to_export[0]['data']['ts'] - # while current_start_ts < initEndTs: while True: - print("Inside while loop: current_start_ts = %s" % current_start_ts) + logging.debug("Inside while loop: current_start_ts = %s" % current_start_ts) current_end_ts = min(current_start_ts + 3600, initEndTs) if export_type == 'incremental' else initEndTs if export_type == 'incremental': current_end_ts = min(current_start_ts + 3600, initEndTs) - print("Inside export_type incremental, increasing current_end_ts: %s" % current_end_ts) + logging.debug("Performing incremental export, increasing current_end_ts: %s" % current_end_ts) elif export_type == 'full': current_end_ts = initEndTs - print("Inside export_type full, setting current_end_ts to current time: %s" % current_end_ts) + logging.debug("Performing full export, setting current_end_ts to current time: %s" % current_end_ts) else: raise ValueError("Unknown export_type %s" % export_type) - print(f"Processing data from {current_start_ts} to {current_end_ts}") + logging.debug(f"Processing current batch from {current_start_ts} to {current_end_ts}") file_name = archive_dir + "/archive_%s_%s_%s" % (user_id, current_start_ts, current_end_ts) - export_queries = epret.export(user_id, ts, current_start_ts, current_end_ts, file_name) - # epret.export(user_id, ts, current_start_ts, current_end_ts, file_name) + current_batch_exported_entries = eee.export(user_id, ts, current_start_ts, current_end_ts, file_name, False, ['timeseries_db']) - entries_to_export_1 = self.get_export_timeseries_entries(user_id, ts, time_query.startTs, time_query.endTs) - count_entries_1 = len(entries_to_export_1) + # Recompute total entries from pipeline initial start time to end time since we are deleting entries iteratively + # This is used to keep a track of remaining entries to export + remaining_timeseries_entries = eee.get_exported_timeseries_entries(user_id, ts, time_query.startTs, time_query.endTs, ['timeseries_db'])[1] - if export_queries is None and count_entries_1 > 0: - print("No entries found in current time range from %s to %s" % (current_start_ts, current_end_ts)) - print("Incrementing time range by 1 hour") + if current_batch_exported_entries is None and len(remaining_timeseries_entries) > 0: + logging.debug("No entries found in current time range from %s to %s" % (current_start_ts, current_end_ts)) + logging.debug("Incrementing time range by 1 hour to process remaining timeseries entries") current_start_ts = current_end_ts continue - # if count_entries_2 == 0 and count_entries_1 == 0: - elif export_queries is None and count_entries_1 == 0: - # Didn't process anything new so start at the same point next time - # self._last_processed_ts = None + elif current_batch_exported_entries is None and len(remaining_timeseries_entries) == 0: logging.debug("No new data to export, breaking out of while loop") - print("No new data to export, breaking out of while loop") break - - entries_to_export_2 = self.get_export_timeseries_entries(user_id, ts, current_start_ts, current_end_ts) - count_entries_2 = len(entries_to_export_2) - print("count_entries_2 = %s" % count_entries_2) - - logging.debug("Exporting to file: %s" % file_name) - print("Exporting to file: %s" % file_name) + logging.debug("Exported to file: %s" % file_name) file_names.append(file_name) - print("File names: %s" % file_names) + logging.debug("List of exported file names: %s" % file_names) self.delete_timeseries_entries(user_id, ts, current_start_ts, current_end_ts) - print("Total entries to export: %s" % count_entries) - print("Entries exported in timerange %s to %s: %s" % (current_start_ts, current_end_ts, count_entries_2)) - print("New count entries to export: %s" % count_entries_1) - self._last_processed_ts = entries_to_export_2[-1]['data']['ts'] - print("Updated last_processed_ts %s" % self._last_processed_ts) + logging.debug("Total entries to export: %s" % len(total_entries_to_export)) + logging.debug("Entries exported in timerange %s to %s: %s" % (current_start_ts, current_end_ts, len(current_batch_exported_entries))) + logging.debug("Remaining entries to export: %s" % len(remaining_timeseries_entries)) + + self._last_processed_ts = current_batch_exported_entries[-1]['data']['ts'] + logging.debug("Updated last_processed_ts to last entry in current export batch = %s" % self._last_processed_ts) current_start_ts = current_end_ts if current_start_ts >= initEndTs: break - print("Exported data to %s files" % len(file_names)) - print("Exported file names: %s" % file_names) + logging.debug("Exported data to %s files: %s" % (len(file_names), file_names)) return file_names + - # def export_pipeline_states(self, user_id, file_name): - # pipeline_state_list = list(edb.get_pipeline_state_db().find({"user_id": user_id})) - # logging.info("Found %d pipeline states %s" % - # (len(pipeline_state_list), - # list([ps["pipeline_stage"] for ps in pipeline_state_list]))) - # pipeline_filename = "%s_pipelinestate_%s.gz" % (file_name, user_id) - # with gzip.open(pipeline_filename, "wt") as gpfd: - # json.dump(pipeline_state_list, - # gpfd, default=esj.wrapped_default, allow_nan=False, indent=4) + def delete_timeseries_entries(self, user_id, ts, start_ts_datetime, end_ts_datetime): + export_timequery = estt.TimeQuery("data.ts", start_ts_datetime, end_ts_datetime) + ts_query = ts._get_query(time_query=export_timequery) + delete_query = {"user_id": user_id, **ts_query} + count_entries_to_delete = ts.timeseries_db.count_documents(delete_query) + logging.debug(f"Number of matching entries for deletion = {count_entries_to_delete}") + + logging.debug("Deleting entries from database...") + result = ts.timeseries_db.delete_many(delete_query) + assert(result.deleted_count == count_entries_to_delete) + logging.debug("{} deleted entries from {} to {}".format(result.deleted_count, start_ts_datetime, end_ts_datetime)) + + ''' def delete_timeseries_entries(self, user_id, ts, start_ts_datetime, end_ts_datetime): export_queries = self.get_export_queries(start_ts_datetime, end_ts_datetime) for key, value in export_queries.items(): ts_query = ts._get_query(time_query=value) - print(ts_query) + logging.debug(ts_query) delete_query = {"user_id": user_id, **ts_query} count = ts.timeseries_db.count_documents(delete_query) logging.debug(f"Number of documents matching for {ts.timeseries_db} with {key} query: {count}") - print(f"Number of documents matching for {ts.timeseries_db} with {key} query: {count}") logging.debug("Deleting entries from database...") - print("Deleting entries from database...") result = ts.timeseries_db.delete_many(delete_query) logging.debug(f"Key query: {key}") - print(f"Key query: {key}") logging.debug("{} deleted entries from {} to {}".format(result.deleted_count, start_ts_datetime, end_ts_datetime)) - print("{} deleted entries from {} to {}".format(result.deleted_count, start_ts_datetime, end_ts_datetime)) - + def get_export_timeseries_entries(self, user_id, ts, start_ts_datetime, end_ts_datetime): entries_to_export = [] export_queries = self.get_export_queries(start_ts_datetime, end_ts_datetime) @@ -163,6 +149,7 @@ def get_export_timeseries_entries(self, user_id, ts, start_ts_datetime, end_ts_d logging.debug(f"Key query: {key}") logging.debug("{} fetched entries from {} to {}".format(ts_db_count, start_ts_datetime, end_ts_datetime)) return entries_to_export + def get_export_queries(self, start_ts, end_ts): export_queries = { @@ -171,4 +158,4 @@ def get_export_queries(self, start_ts, end_ts): 'loc_time_query': estt.TimeQuery("data.ts", start_ts, end_ts) } return export_queries - \ No newline at end of file + ''' diff --git a/emission/purge_restore/restore_data.py b/emission/purge_restore/restore_data.py index 2758336a9..71c402418 100644 --- a/emission/purge_restore/restore_data.py +++ b/emission/purge_restore/restore_data.py @@ -12,7 +12,6 @@ import emission.storage.json_wrappers as esj import emission.core.get_database as edb import emission.core.wrapper.pipelinestate as ecwp -import emission.purge_restore.import_timeseries as eprit def restore_data(user_id, file_names): try: @@ -38,18 +37,11 @@ def run_restore_data_pipeline(self, user_id, file_names): time_query = espq.get_time_range_for_restore_data(user_id) for file_name in file_names: entries_to_import = json.load(gzip.open(file_name + ".gz"), object_hook = esj.wrapped_object_hook) - # pipelineState = edb.get_pipeline_state_db().find_one({"user_id": user_id, - # "pipeline_stage": ecwp.PipelineStages.RESTORE_TIMESERIES_DATA.value}) - # self._last_processed_ts = pipelineState["last_processed_ts"] - # logging.debug("Restoring from file, last_processed_ts = %s" % (self._last_processed_ts)) - # (tsdb_count, ucdb_count) = lmtfr.load_multi_timeline_for_range(file_prefix=file_name, continue_on_error=True) - (tsdb_count, ucdb_count) = eprit.load_multi_timeline_for_range(file_prefix=file_name, continue_on_error=True) - print("After load, tsdb_count = %s, ucdb_count = %s" % (tsdb_count, ucdb_count)) + (tsdb_count, ucdb_count) = lmtfr.load_multi_timeline_for_range(file_prefix=file_name, continue_on_error=True, raw_timeseries_only=True) + logging.debug("After load, tsdb_count = %s, ucdb_count = %s" % (tsdb_count, ucdb_count)) if tsdb_count == 0: # Didn't process anything new so start at the same point next time self._last_processed_ts = None else: self._last_processed_ts = entries_to_import[-1]['data']['ts'] - print("After load, last_processed_ts = %s" % (self._last_processed_ts)) - # if self._last_processed_ts is None or self._last_processed_ts < entries_to_import[-1]['metadata']['write_ts']: - # self._last_processed_ts = entries_to_import[-1]['metadata']['write_ts'] + logging.debug("After load, last_processed_ts = %s" % (self._last_processed_ts)) diff --git a/emission/storage/pipeline_queries.py b/emission/storage/pipeline_queries.py index bc4ec72dc..6460c9a41 100644 --- a/emission/storage/pipeline_queries.py +++ b/emission/storage/pipeline_queries.py @@ -350,9 +350,6 @@ def get_time_range_for_stage(user_id, stage): Returns the start ts and the end ts of the entries in the stage """ curr_state = get_current_state(user_id, stage) - print("Inside get_time_range_for_stage") - if curr_state is not None: - print("curr_state last_processed_ts = %s" % curr_state.last_processed_ts) if curr_state is None: start_ts = None diff --git a/emission/tests/exportTests/TestPurgeRestoreModule.py b/emission/tests/exportTests/TestPurgeRestoreModule.py index be409764f..bc51eae3e 100644 --- a/emission/tests/exportTests/TestPurgeRestoreModule.py +++ b/emission/tests/exportTests/TestPurgeRestoreModule.py @@ -16,8 +16,6 @@ import emission.storage.timeseries.abstract_timeseries as esta import emission.pipeline.purge_stage as epp import emission.pipeline.restore_stage as epr -import emission.purge_restore.export_timeseries as epret -import emission.purge_restore.import_timeseries as eprit import emission.purge_restore.purge_data as eprpd import emission.storage.json_wrappers as esj import emission.storage.timeseries.timequery as estt @@ -26,22 +24,11 @@ class TestPurgeRestoreModule(unittest.TestCase): def setUp(self): self.testEmail = "testPurgeRestoreUser123" etc.setupRealExample(self, "emission/tests/data/real_examples/shankari_2015-07-22") - # etc.setupRealExample(self, "emission/tests/data/real_examples/shankari_2015-aug-27") - print("Test UUID for Purge: %s" % self.testUUID) + logging.debug("Test UUID for Purge: %s" % self.testUUID) etc.runIntakePipeline(self.testUUID) def tearDown(self): - print("Clearing entries for test UUID from database...") - etc.dropAllCollections(edb._get_current_db()) - # self.clearRelatedDb() - # self.clearAllDb() - - def clearRelatedDb(self): - edb.get_timeseries_db().delete_many({"user_id": self.testUUID}) - edb.get_analysis_timeseries_db().delete_many({"user_id": self.testUUID}) - edb.get_usercache_db().delete_many({'user_id': self.testUUID}) - edb.get_pipeline_state_db().delete_many({"user_id": self.testUUID}) - edb.get_uuid_db().delete_one({"user_email": self.testEmail}) + self.clearAllDb() def clearAllDb(self): edb.get_timeseries_db().delete_many({}) @@ -50,69 +37,7 @@ def clearAllDb(self): edb.get_pipeline_state_db().delete_many({}) edb.get_uuid_db().delete_one({}) - # def testPurgeRestoreModule(self): - # ts = esta.TimeSeries.get_time_series(self.testUUID) - # time_query = { - # 'startTs': 1437578093.881, - # 'endTs': 1437633635.069 # Exporting all 1906 entries - # # 'endTs': 1437597615.778 # Exporting first 650 entries - # } - # file_name = os.environ.get('DATA_DIR', 'emission/archived') + "/archive_%s_%s_%s" % (self.testUUID, time_query['startTs'], time_query['endTs']) - - # export_queries = epret.export(self.testUUID, ts, time_query['startTs'], time_query['endTs'], file_name, False) - # pdp = eprpd.PurgeDataPipeline() - # # pdp.export_pipeline_states(self.testUUID, file_name) - - # ''' - # Test 1 - Assert the file exists after the export process - # ''' - # self.assertTrue(pl.Path(file_name + ".gz").is_file()) - # # with gzip.open(file_name + ".gz", 'r') as ef: - # # exported_data = json.loads(ef.read().decode('utf-8')) - - # ''' - # Test 2 - Verify that purging timeseries data works with sample real data - # ''' - # # Check how much data there was before - # res = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID}) - # logging.info(f"About to purge {res} entries") - # print(f"About to purge {res} entries") - # self.assertEqual(res, 1906) - - # pdp.delete_timeseries_entries(self.testUUID, ts, time_query['startTs'], time_query['endTs']) - - # # Check how much data there is after - # res = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID}) - # logging.info(f"Purging complete: {res} entries remaining") - # print(f"Purging complete: {res} entries remaining") - # self.assertEqual(res, 0) - - # ''' - # Test 3 - Verify that restoring timeseries data works with sample real data - # ''' - # # Run the restore function - # logging.info(f"About to restore entries") - # print(f"About to restore entries") - # (tsdb_count, ucdb_count) = eprit.load_multi_timeline_for_range(file_prefix=file_name, continue_on_error=True) - # # lmtfr.load_multi_timeline_for_range(file_prefix=file_name, continue_on_error=True) - - # # Check how much data there is after - # res = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID}) - # logging.info(f"Restoring complete: {res} entries restored") - # print(f"Restoring complete: {res} entries restored") - # self.assertEqual(res, 1906) - - # ''' - # Test 4 - Verify that restoring timeseries data fails if data already exists - # Duplicate key error is ignored hence no entries should be inserted - # ''' - # logging.info("Attempting to load duplicate data...") - # print("Attempting to load duplicate data...") - # (tsdb_count, ucdb_count) = eprit.load_multi_timeline_for_range(file_prefix=file_name, continue_on_error=True) - # self.assertEqual(tsdb_count, 0) - - def testPurgeRestorePipelineFull(self): - with tempfile.TemporaryDirectory(dir='/tmp') as tmpdirname: + def getEntriesToExport(self, tmpdirname): self.assertTrue(os.path.isdir(tmpdirname)) #Set the envrionment variable @@ -125,194 +50,104 @@ def testPurgeRestorePipelineFull(self): sort_key = ts._get_sort_key(tq) (ts_db_count, ts_db_result) = ts._get_entries_for_timeseries(ts.timeseries_db, None, tq, geo_query=None, extra_query_list=None, sort_key = sort_key) entries_to_export = list(ts_db_result) + return entries_to_export + + def prePipelineTests(self): + ''' + Test 1 - Verify that purging timeseries data works with sample real data + ''' + # Check how much data there was before + res = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID}) + logging.debug(f"About to purge {res} entries") + self.assertEqual(res, 1906) + + def postPipelineTests(self, entries_to_export, file_names): + logging.debug("Exported file names: %s" % file_names) + ''' + Test 2 - Assert the file exists after the export process + ''' + exported_data = [] + for file_name in file_names: + self.assertTrue(pl.Path(file_name + ".gz").is_file()) + with gzip.open(file_name + ".gz", 'r') as ef: + exported_data.extend(json.loads(ef.read().decode('utf-8'))) + # exported_data = json.loads(ef.read().decode('utf-8')) + self.assertEqual(len(exported_data), 1906) + + ''' + Test 3 - Compare the first and last few entries in the exported file with the entries in the timeseries db + ''' + entries_from_db = entries_to_export + logging.debug("Entries from db size: %s" % len(entries_from_db)) + entries_from_db = entries_from_db[:5] + entries_from_db[-5:] + entries_from_file = exported_data[:5] + exported_data[-5:] + objectIds_from_db = [entry["_id"] for entry in entries_from_db] + objectIds_from_file = [ObjectId(entry["_id"]["$oid"]) for entry in entries_from_file] + logging.debug("Object ids from db: %s" % objectIds_from_db) + logging.debug("Object ids from file: %s" % objectIds_from_file) + self.assertEqual(objectIds_from_db, objectIds_from_file) + + ''' + Test 4 - Verify that purging timeseries data works with sample real data + ''' + # Check how much data there is after + entries = edb.get_timeseries_db().find({"user_id" : self.testUUID}) + res = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID}) + logging.debug(f"Purging complete: {res} entries remaining") - ''' - Test 1 - Verify that purging timeseries data works with sample real data - ''' - # Check how much data there was before - res = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID}) - logging.info(f"About to purge {res} entries") - print(f"About to purge {res} entries") - self.assertEqual(res, 1906) - - # Run the purge pipeline - file_names = epp.run_purge_pipeline_for_user(self.testUUID, os.environ.get('DATA_DIR', 'emission/archived'), "full") - print("Exported file names: %s" % file_names) - - ''' - Test 2 - Assert the file exists after the export process - ''' - self.assertTrue(pl.Path(file_names[0] + ".gz").is_file()) - with gzip.open(file_names[0] + ".gz", 'r') as ef: - exported_data = json.loads(ef.read().decode('utf-8')) - self.assertEqual(len(exported_data), 1906) - - ''' - Test 3 - Compare the first and last few entries in the exported file with the entries in the timeseries db - ''' - entries_from_db = entries_to_export - print("Entries from db size: %s" % len(entries_from_db)) - entries_from_db = entries_from_db[:5] + entries_from_db[-5:] - entries_from_file = exported_data[:5] + exported_data[-5:] - objectIds_from_db = [entry["_id"] for entry in entries_from_db] - objectIds_from_file = [ObjectId(entry["_id"]["$oid"]) for entry in entries_from_file] - print("Object ids from db: %s" % objectIds_from_db) - print("Object ids from file: %s" % objectIds_from_file) - self.assertEqual(objectIds_from_db, objectIds_from_file) - - ''' - Test 4 - Verify that purging timeseries data works with sample real data - ''' - # Check how much data there is after - entries = edb.get_timeseries_db().find({"user_id" : self.testUUID}) - res = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID}) - logging.info(f"Purging complete: {res} entries remaining") - print(f"Purging complete: {res} entries remaining") - - # A single entry with key 'stats/pipeline_time' should be present as this test involves running the pipeline - stat_pipeline_key = entries[0].get('metadata').get('key') - print(f"stat_pipeline_key = {stat_pipeline_key}") - self.assertEqual(stat_pipeline_key,'stats/pipeline_time') - self.assertEqual(res, 1) + # A single entry with key 'stats/pipeline_time' should be present as this test involves running the pipeline + stat_pipeline_key = entries[0].get('metadata').get('key') + logging.debug(f"stat_pipeline_key = {stat_pipeline_key}") + self.assertEqual(stat_pipeline_key,'stats/pipeline_time') + self.assertEqual(res, 1) + + # Run the restore pipeline + logging.debug(f"About to restore entries") + logging.debug("File names: %s" % file_names) + epr.run_restore_pipeline_for_user(self.testUUID, file_names) + + ''' + Test 5 - Verify that restoring timeseries data works with sample real data + ''' + # Check how much data there is after + res = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID}) + res_stats_count = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID, "metadata.key" : 'stats/pipeline_time'}) + logging.debug(f"Restoring complete: {res-2} entries restored") + + # Two additional entries with key 'stats/pipeline_time' should be present - one from the purge pipeline, other from the restore pipeline + self.assertEqual(res_stats_count, 2) + self.assertEqual(res, 1908) + + ''' + Test 6 - Verify that restoring timeseries data is skipped if data already exists + Duplicate key error is ignored in import_timeseries.py + Hence no entries should be inserted + ''' + logging.debug("Attempting to load duplicate data...") + epr.run_restore_pipeline_for_user(self.testUUID, file_names) + # Check how much data there is after + res = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID}) + res_stats_count = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID, "metadata.key" : 'stats/pipeline_time'}) + logging.debug(f"Restoring complete: {res-2} entries restored") + + # A third entry with key 'stats/pipeline_time' should be present after running the restore pipeline again + self.assertEqual(res_stats_count, 3) + self.assertEqual(res, 1909) - # Run the restore pipeline - logging.info(f"About to restore entries") - print(f"About to restore entries") - epr.run_restore_pipeline_for_user(self.testUUID, file_names) - ''' - Test 5 - Verify that restoring timeseries data works with sample real data - ''' - # Check how much data there is after - res = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID}) - res_stats_count = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID, "metadata.key" : 'stats/pipeline_time'}) - logging.info(f"Restoring complete: {res-2} entries restored") - print(f"Restoring complete: {res-2} entries restored") - - # Two additional entries with key 'stats/pipeline_time' should be present - one from the purge pipeline, other from the restore pipeline - self.assertEqual(res_stats_count, 2) - self.assertEqual(res, 1908) - - ''' - Test 6 - Verify that restoring timeseries data is skipped if data already exists - Duplicate key error is ignored in import_timeseries.py - Hence no entries should be inserted - ''' - logging.info("Attempting to load duplicate data...") - print("Attempting to load duplicate data...") - epr.run_restore_pipeline_for_user(self.testUUID, file_names) - # Check how much data there is after - res = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID}) - res_stats_count = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID, "metadata.key" : 'stats/pipeline_time'}) - logging.info(f"Restoring complete: {res-2} entries restored") - print(f"Restoring complete: {res-2} entries restored") - - # A third entry with key 'stats/pipeline_time' should be present after running the restore pipeline again - self.assertEqual(res_stats_count, 3) - self.assertEqual(res, 1909) + def testPurgeRestorePipelineFull(self): + with tempfile.TemporaryDirectory(dir='/tmp') as tmpdirname: + entries_to_export = self.getEntriesToExport(tmpdirname) + self.prePipelineTests() + file_names = epp.run_purge_pipeline_for_user(self.testUUID, os.environ.get('DATA_DIR', 'emission/archived'), "full") + self.postPipelineTests(entries_to_export, file_names) def testPurgeRestorePipelineIncremental(self): with tempfile.TemporaryDirectory(dir='/tmp') as tmpdirname: - self.assertTrue(os.path.isdir(tmpdirname)) - - #Set the envrionment variable - os.environ['DATA_DIR'] = tmpdirname - self.assertEqual(os.environ['DATA_DIR'], tmpdirname) - - # Fetch entries from timeseries db before purging to use in tests - ts = esta.TimeSeries.get_time_series(self.testUUID) - tq = estt.TimeQuery("data.ts", None, time.time() - 5) - sort_key = ts._get_sort_key(tq) - (ts_db_count, ts_db_result) = ts._get_entries_for_timeseries(ts.timeseries_db, None, tq, geo_query=None, extra_query_list=None, sort_key = sort_key) - entries_to_export = list(ts_db_result) - - ''' - Test 1 - Verify that purging timeseries data works with sample real data - ''' - # Check how much data there was before - res = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID}) - logging.info(f"About to purge {res} entries") - print(f"About to purge {res} entries") - self.assertEqual(res, 1906) - - # Run the purge pipeline + entries_to_export = self.getEntriesToExport(tmpdirname) + self.prePipelineTests() file_names = epp.run_purge_pipeline_for_user(self.testUUID, os.environ.get('DATA_DIR', 'emission/archived'), "incremental") - print("Exported file names: %s" % file_names) - - ''' - Test 2 - Assert the file exists after the export process and checking contents - ''' - exported_data = [] - for file_name in file_names: - self.assertTrue(pl.Path(file_name + ".gz").is_file()) - with gzip.open(file_name + ".gz", 'r') as ef: - exported_data.extend(json.loads(ef.read().decode('utf-8'))) - self.assertEqual(len(exported_data), 1906) - - ''' - Test 3 - Compare the first and last few entries in the exported file with the entries in the timeseries db - ''' - entries_from_db = entries_to_export - print("Entries from db size: %s" % len(entries_from_db)) - entries_from_db = entries_from_db[:5] + entries_from_db[-5:] - entries_from_file = exported_data[:5] + exported_data[-5:] - objectIds_from_db = [entry["_id"] for entry in entries_from_db] - objectIds_from_file = [ObjectId(entry["_id"]["$oid"]) for entry in entries_from_file] - print("Object ids from db: %s" % objectIds_from_db) - print("Object ids from file: %s" % objectIds_from_file) - self.assertEqual(objectIds_from_db, objectIds_from_file) - - ''' - Test 4 - Verify that purging timeseries data works with sample real data - ''' - # Check how much data there is after - entries = edb.get_timeseries_db().find({"user_id" : self.testUUID}) - res = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID}) - logging.info(f"Purging complete: {res} entries remaining") - print(f"Purging complete: {res} entries remaining") - - # A single entry with key 'stats/pipeline_time' should be present as this test involves running the pipeline - stat_pipeline_key = entries[0].get('metadata').get('key') - print(f"stat_pipeline_key = {stat_pipeline_key}") - self.assertEqual(stat_pipeline_key,'stats/pipeline_time') - self.assertEqual(res, 1) - - # Run the restore pipeline - logging.info(f"About to restore entries") - print(f"About to restore entries") - print("File names: %s" % file_names) - epr.run_restore_pipeline_for_user(self.testUUID, file_names) - - ''' - Test 5 - Verify that restoring timeseries data works with sample real data - ''' - # Check how much data there is after - res = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID}) - res_stats_count = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID, "metadata.key" : 'stats/pipeline_time'}) - logging.info(f"Restoring complete: {res-2} entries restored") - print(f"Restoring complete: {res-2} entries restored") - - # Two additional entries with key 'stats/pipeline_time' should be present - one from the purge pipeline, other from the restore pipeline - self.assertEqual(res_stats_count, 2) - self.assertEqual(res, 1908) - - ''' - Test 6 - Verify that restoring timeseries data is skipped if data already exists - Duplicate key error is ignored in import_timeseries.py - Hence no entries should be inserted - ''' - logging.info("Attempting to load duplicate data...") - print("Attempting to load duplicate data...") - epr.run_restore_pipeline_for_user(self.testUUID, file_names) - # Check how much data there is after - res = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID}) - res_stats_count = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID, "metadata.key" : 'stats/pipeline_time'}) - logging.info(f"Restoring complete: {res-2} entries restored") - print(f"Restoring complete: {res-2} entries restored") - - # A third entry with key 'stats/pipeline_time' should be present after running the restore pipeline again - self.assertEqual(res_stats_count, 3) - self.assertEqual(res, 1909) + self.postPipelineTests(entries_to_export, file_names) if __name__ == '__main__': etc.configLogging()