Commit

Cleaned up duplicate code, log statements + Refactored export.py
Changes

1. Fetched only loc-like entries from the existing export data logic as the raw timeseries entries.
- Found many references indicating that trip and place entries are part of the analysis timeseries database.

Almost every place I've found uses data.start_ts for entries with "analysis/*" metadata keys.

In bin/debug/export_participants_trips_csv.py
```
    # "analysis/*" entries (e.g. confirmed trips) are queried by data.start_ts
    ts = esta.TimeSeries.get_time_series(user_id)
    trip_time_query = estt.TimeQuery("data.start_ts", start_day_ts, end_day_ts)
    ct_df = ts.get_data_df("analysis/confirmed_trip", trip_time_query)
```

---------

In bin/debug/label_stats.py
```
for t in list(edb.get_analysis_timeseries_db().find({"metadata.key": "analysis/inferred_trip", "user_id": sel_uuid})):
    if t["data"]["inferred_labels"] != []:
        confirmed_trip = edb.get_analysis_timeseries_db().find_one({"user_id": t["user_id"],
                "metadata.key": "analysis/confirmed_trip",
                "data.start_ts": t["data"]["start_ts"]})
```

Similarly for data.entry_ts.

-----------------

On the other hand, for data.ts the timeseries_db was used, since the entries involved have "background/*" metadata keys:

In emission/analysis/intake/segmentation/section_segmentation.py
```
    get_loc_for_ts = lambda time: ecwl.Location(ts.get_entry_at_ts("background/filtered_location", "data.ts", time)["data"])
    trip_start_loc = get_loc_for_ts(trip_entry.data.start_ts)
    trip_end_loc = get_loc_for_ts(trip_entry.data.end_ts)
```

----------------

In emission/analysis/intake/segmentation/trip_segmentation.py
```
            untracked_start_loc = ecwe.Entry(ts.get_entry_at_ts("background/filtered_location",
                                                     "data.ts", last_place_entry.data.enter_ts)).data

```

--------------------------------------

2. Refactored emission/export/export.py

- Added a separate function that returns the exported entries so that it can be reused in the purge pipeline code.
- This removed the repeated code for re-fetching exported entries.
- Also added a `databases` parameter for exporting data from specific databases. For the purge use case, `databases` should contain only 'timeseries_db'; see the sketch below.
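
A minimal usage sketch of the purge-pipeline call path, assuming `user_id`, `start_ts`, `end_ts`, and `file_name` are already available; the `eee` alias and `ma_bool=False` are illustrative, only the `databases` keyword comes from this change.
```
import emission.export.export as eee
import emission.storage.timeseries.abstract_timeseries as esta

ts = esta.TimeSeries.get_time_series(user_id)
# Restrict the export to raw entries; analysis_timeseries_db and
# usercache_db are skipped when databases == ['timeseries_db'].
exported_entries = eee.export(user_id, ts, start_ts, end_ts, file_name,
                              ma_bool=False, databases=['timeseries_db'])
```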

--------------------------------------

3. Added a raw_timeseries_only parameter to load_multi_timeline_for_range.py
- If this flag is set, pipeline states will not be loaded, since we do not want pipeline states to be restored while restoring raw timeseries data; see the sketch below.
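
A hedged sketch of restoring only raw timeseries data with the new flag; the file prefix is a placeholder and the import path is an assumption, while `--raw-timeseries-only` / `raw_timeseries_only=True` is what this change adds.
```
# CLI usage:
#   python bin/debug/load_multi_timeline_for_range.py /tmp/user_dump --raw-timeseries-only
#
# Programmatic equivalent, assuming the module is importable in your environment:
import bin.debug.load_multi_timeline_for_range as lmtfr  # assumed import path

lmtfr.load_multi_timeline_for_range("/tmp/user_dump",          # hypothetical file prefix
                                    raw_timeseries_only=True)  # skip load_pipeline_states on restore
```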

--------------------------------------

4. Cleaned up tests
- Reduced repetitive code by moving assertions into helper functions that are reused by both the full and incremental export tests; a sketch of the pattern follows.
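
A minimal sketch of the shared-assertion pattern, assuming a unittest-based test class; all names and the stubbed entries below are illustrative, not taken from the actual test files.
```
import unittest

class TestExportAssertions(unittest.TestCase):
    def assertEntriesExported(self, exported_entries, expected_keys):
        # Shared checks reused by both the full and incremental export tests
        self.assertGreater(len(exported_entries), 0)
        self.assertEqual({e["metadata"]["key"] for e in exported_entries}, expected_keys)

    def testFullExport(self):
        # In the real test these entries come from running a full export;
        # stubbed here so the sketch stays self-contained.
        entries = [{"metadata": {"key": "background/filtered_location"}}]
        self.assertEntriesExported(entries, {"background/filtered_location"})

    def testIncrementalExport(self):
        entries = [{"metadata": {"key": "background/filtered_location"}}]
        self.assertEntriesExported(entries, {"background/filtered_location"})

if __name__ == "__main__":
    unittest.main()
```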

--------------------------------------

5. Removed export_timeseries.py and import_timeseries.py
- No need for duplicate code, since the existing logic in load_multi_timeline_for_range.py and export.py is now used.

--------------------------------------
Mahadik, Mukul Chandrakant authored and Mahadik, Mukul Chandrakant committed Sep 16, 2024
1 parent 23734e5 commit 4703f04
Showing 8 changed files with 228 additions and 642 deletions.
13 changes: 9 additions & 4 deletions bin/debug/load_multi_timeline_for_range.py
@@ -109,7 +109,7 @@ def post_check(unique_user_list, all_rerun_list):
else:
logging.info("timeline contains a mixture of analysis results and raw data - complain to shankari!")

def load_multi_timeline_for_range(file_prefix, info_only=None, verbose=None, continue_on_error=None, mapfile=None, prefix=None, batch_size=10000):
def load_multi_timeline_for_range(file_prefix, info_only=None, verbose=None, continue_on_error=None, mapfile=None, prefix=None, batch_size=10000, raw_timeseries_only=False):
fn = file_prefix
logging.info("Loading file or prefix %s" % fn)
sel_file_list = common.read_files_with_prefix(fn)
@@ -144,11 +144,12 @@ def load_multi_timeline_for_range(file_prefix, info_only=None, verbose=None, con
logging.info("About to load range %s -> %s" % (curr_range[0], curr_range[1]))
wrapped_entries = [ecwe.Entry(e) for e in entries[curr_range[0]:curr_range[1]]]
(tsdb_count, ucdb_count) = estcs.insert_entries(curr_uuid, wrapped_entries, continue_on_error)
print("For uuid %s, finished loading %d entries into the usercache and %d entries into the timeseries" % (curr_uuid, ucdb_count, tsdb_count))
logging.debug("For uuid %s, finished loading %d entries into the usercache and %d entries into the timeseries" % (curr_uuid, ucdb_count, tsdb_count))

unique_user_list = set(all_user_list)
if not info_only:
load_pipeline_states(file_prefix, unique_user_list, continue_on_error, verbose)
if not raw_timeseries_only:
load_pipeline_states(file_prefix, unique_user_list, continue_on_error, verbose)
if mapfile is not None:
register_mapped_users(mapfile, unique_user_list, verbose)
elif prefix is not None:
@@ -176,6 +177,9 @@ def load_multi_timeline_for_range(file_prefix, info_only=None, verbose=None, con

parser.add_argument("-s", "--batch-size", default=10000, type=int,
help="batch size to use for the entries")

parser.add_argument("-t", "--raw-timeseries-only", default=False, action='store_true',
help="load only raw timeseries data; if not set load both raw and analysis timeseries data")

group = parser.add_mutually_exclusive_group(required=False)
group.add_argument("-p", "--prefix", default="user",
@@ -189,4 +193,5 @@ def load_multi_timeline_for_range(file_prefix, info_only=None, verbose=None, con
else:
logging.basicConfig(level=logging.INFO)

load_multi_timeline_for_range(args.file_prefix, args.info_only, args.verbose, args.continue_on_error, args.mapfile, args.prefix, args.batch_size)
# load_multi_timeline_for_range(args.file_prefix, args.info_only, args.verbose, args.continue_on_error, args.mapfile, args.prefix, args.batch_size)
load_multi_timeline_for_range(args.file_prefix, args.info_only, args.verbose, args.continue_on_error, args.mapfile, args.prefix, args.batch_size, args.raw_timeseries_only)
106 changes: 79 additions & 27 deletions emission/export/export.py
@@ -39,13 +39,40 @@ def get_with_retry(retrieve_call, in_query):
query.startTs = curr_batch[-1][timeTypeSplit[0]][timeTypeSplit[1]]
return list_so_far

def get_from_all_three_sources_with_retry(user_id, in_query):
def get_from_all_three_sources_with_retry(user_id, in_query, databases=None):
logging.info("In get_from_all_three_sources_with_retry: Databases = %s" % databases)

import emission.storage.timeseries.builtin_timeseries as estb

ts = estb.BuiltinTimeSeries(user_id)
uc = enua.UserCache.getUserCache(user_id)

sort_key = ts._get_sort_key(in_query)

source_db_calls = []

if databases is None or 'timeseries_db' in databases:
logging.info("Fetching from timeseries_db")
base_ts_call = lambda tq: ts._get_entries_for_timeseries(ts.timeseries_db, None, tq,
geo_query=None, extra_query_list=None, sort_key = sort_key)
source_db_calls.append(base_ts_call)
if databases is None or 'analysis_db' in databases:
logging.info("Fetching from analysis_timeseries_db")
analysis_ts_call = lambda tq: ts._get_entries_for_timeseries(ts.analysis_timeseries_db, None, tq,
geo_query=None, extra_query_list=None, sort_key = sort_key)
source_db_calls.append(analysis_ts_call)
if databases is None or 'usercache_db' in databases:
logging.info("Fetching from usercache_db")
uc_ts_call = lambda tq: (uc.getMessageCount(None, tq), uc.getMessage(None, tq))
source_db_calls.append(uc_ts_call)

retry_lists = []
for source_db_call in source_db_calls:
retry_lists = retry_lists + get_with_retry(source_db_call, in_query)

return retry_lists

'''
base_ts_call = lambda tq: ts._get_entries_for_timeseries(ts.timeseries_db, None, tq,
geo_query=None, extra_query_list=None, sort_key = sort_key)
analysis_ts_call = lambda tq: ts._get_entries_for_timeseries(ts.analysis_timeseries_db, None, tq,
@@ -55,48 +82,73 @@ def get_from_all_three_sources_with_retry(user_id, in_query):
return get_with_retry(base_ts_call, in_query) + \
get_with_retry(analysis_ts_call, in_query) + \
get_with_retry(uc_ts_call, in_query)
'''

def get_exported_timeseries_entries(user_id, ts, start_ts, end_ts, databases=None):
combined_list = []
entries_lists = {
"loc_entry_list": None,
"trip_entry_list": None,
"place_entry_list": None
}

def export(user_id, ts, start_ts, end_ts, file_name, ma_bool):
if databases == ['timeseries_db']:
loc_time_query = estt.TimeQuery("data.ts", start_ts, end_ts)
loc_entry_list = get_from_all_three_sources_with_retry(user_id, loc_time_query, databases)
combined_list = loc_entry_list
logging.info("Found %d loc-like entries = %d total entries" %
(len(loc_entry_list), len(combined_list)))
entries_lists["loc_entry_list"] = loc_entry_list
else:
loc_time_query = estt.TimeQuery("data.ts", start_ts, end_ts)
loc_entry_list = get_from_all_three_sources_with_retry(user_id, loc_time_query)
# Changing to estcs so that we will read the manual entries, which have data.start_ts and data.enter_ts
# from the usercache as well
trip_time_query = estt.TimeQuery("data.start_ts", start_ts, end_ts)
trip_entry_list = get_from_all_three_sources_with_retry(user_id, trip_time_query)
place_time_query = estt.TimeQuery("data.enter_ts", start_ts, end_ts)
place_entry_list = get_from_all_three_sources_with_retry(user_id, place_time_query)
# Handle the case of the first place, which has no enter_ts and won't be
# matched by the default query
first_place_extra_query = {'$and': [{'data.enter_ts': {'$exists': False}},
{'data.exit_ts': {'$exists': True}}]}
first_place_entry_list = list(ts.find_entries(key_list=None, time_query=None, extra_query_list=[first_place_extra_query]))
logging.info("First place entry list = %s" % first_place_entry_list)
combined_list = loc_entry_list + trip_entry_list + place_entry_list + first_place_entry_list
logging.info("Found %d loc-like entries, %d trip-like entries, %d place-like entries = %d total entries" %
(len(loc_entry_list), len(trip_entry_list), len(place_entry_list), len(combined_list)))
entries_lists["loc_entry_list"] = loc_entry_list
entries_lists["trip_entry_list"] = trip_entry_list
entries_lists["place_entry_list"] = place_entry_list

return entries_lists, combined_list

def export(user_id, ts, start_ts, end_ts, file_name, ma_bool, databases=None):
logging.info("In export: Databases = %s" % databases)
logging.info("Extracting timeline for user %s day %s -> %s and saving to file %s" %
(user_id, start_ts, end_ts, file_name))

entries_lists, combined_list = get_exported_timeseries_entries(user_id, ts, start_ts, end_ts, databases)

loc_time_query = estt.TimeQuery("data.ts", start_ts, end_ts)
loc_entry_list = get_from_all_three_sources_with_retry(user_id, loc_time_query)
# Changing to estcs so that we will read the manual entries, which have data.start_ts and data.enter_ts
# from the usercache as well
trip_time_query = estt.TimeQuery("data.start_ts", start_ts, end_ts)
trip_entry_list = get_from_all_three_sources_with_retry(user_id, trip_time_query)
place_time_query = estt.TimeQuery("data.enter_ts", start_ts, end_ts)
place_entry_list = get_from_all_three_sources_with_retry(user_id, place_time_query)
# Handle the case of the first place, which has no enter_ts and won't be
# matched by the default query
first_place_extra_query = {'$and': [{'data.enter_ts': {'$exists': False}},
{'data.exit_ts': {'$exists': True}}]}
first_place_entry_list = list(ts.find_entries(key_list=None, time_query=None, extra_query_list=[first_place_extra_query]))
logging.info("First place entry list = %s" % first_place_entry_list)

combined_list = loc_entry_list + trip_entry_list + place_entry_list + first_place_entry_list
logging.info("Found %d loc-like entries, %d trip-like entries, %d place-like entries = %d total entries" %
(len(loc_entry_list), len(trip_entry_list), len(place_entry_list), len(combined_list)))

validate_truncation(loc_entry_list, trip_entry_list, place_entry_list)
validate_truncation(entries_lists["loc_entry_list"], entries_lists["trip_entry_list"], entries_lists["place_entry_list"])

unique_key_list = set([e["metadata"]["key"] for e in combined_list])
logging.info("timeline has unique keys = %s" % unique_key_list)
if len(combined_list) == 0 or unique_key_list == set(['stats/pipeline_time']):
logging.info("No entries found in range for user %s, skipping save" % user_id)
return None
else:
combined_filename = "%s.gz" % (file_name)
with gzip.open(combined_filename, "wt") as gcfd:
json.dump(combined_list,
gcfd, default=esj.wrapped_default, allow_nan=False, indent=4)
return combined_list


def validate_truncation(loc_entry_list, trip_entry_list, place_entry_list):
def validate_truncation(loc_entry_list=None, trip_entry_list=None, place_entry_list=None):
MAX_LIMIT = 25 * 10000
if len(loc_entry_list) == MAX_LIMIT:
if loc_entry_list is not None and len(loc_entry_list) == MAX_LIMIT:
logging.warning("loc_entry_list length = %d, probably truncated" % len(loc_entry_list))
if len(trip_entry_list) == MAX_LIMIT:
if trip_entry_list is not None and len(trip_entry_list) == MAX_LIMIT:
logging.warning("trip_entry_list length = %d, probably truncated" % len(trip_entry_list))
if len(place_entry_list) == MAX_LIMIT:
if place_entry_list is not None and len(place_entry_list) == MAX_LIMIT:
logging.warning("place_entry_list length = %d, probably truncated" % len(place_entry_list))
124 changes: 0 additions & 124 deletions emission/purge_restore/export_timeseries.py

This file was deleted.
