diff --git a/app_sidebar_collapsible.py b/app_sidebar_collapsible.py index da74ade..8966246 100644 --- a/app_sidebar_collapsible.py +++ b/app_sidebar_collapsible.py @@ -210,17 +210,19 @@ def make_controls(): 'flex-direction': 'column'} ) -page_content = dcc.Loading( - type='default', - fullscreen=True, - children=html.Div(dash.page_container, style={ +# Dcc Loading removed for Data Page Lazy Loading. +# TODO Figure out how to enable Loading on everything BUT Data Page UUIDs Tab +page_content = html.Div( + dash.page_container, + style={ "margin-left": "5rem", "margin-right": "2rem", "padding": "2rem 1rem", - }) + } ) + def make_home_page(): return [ sidebar, html.Div([make_controls(), page_content]) diff --git a/pages/data.py b/pages/data.py index f658202..478ea7d 100644 --- a/pages/data.py +++ b/pages/data.py @@ -3,12 +3,13 @@ Since the dcc.Location component is not in the layout when navigating to this page, it triggers the callback. The workaround is to check if the input value is None. """ -from dash import dcc, html, Input, Output, callback, register_page, dash_table, State +from dash import dcc, html, Input, Output, callback, register_page, dash_table, State, callback_context, Patch # Etc import logging +import time import pandas as pd from dash.exceptions import PreventUpdate - +from concurrent.futures import ThreadPoolExecutor, as_completed from utils import constants from utils import permissions as perm_utils from utils import db_utils @@ -28,10 +29,14 @@ dcc.Tab(label='Trajectories', value='tab-trajectories-datatable'), ]), html.Div(id='tabs-content'), + dcc.Interval(id='interval-load-more', interval=6000, n_intervals=0), + dcc.Store(id='store-uuids', data=[]), # Store to hold the original UUIDs data + dcc.Store(id='store-loaded-uuids', data={'data': [], 'loaded': False}) # Store to track loaded data ] ) + def clean_location_data(df): if 'data.start_loc.coordinates' in df.columns: df['data.start_loc.coordinates'] = df['data.start_loc.coordinates'].apply(lambda x: f'({x[0]}, {x[1]})') @@ -51,6 +56,8 @@ def update_store_trajectories(start_date: str, end_date: str, tz: str, excluded_ @callback( Output('tabs-content', 'children'), + Output('store-loaded-uuids', 'data'), + Output('interval-load-more', 'disabled'), # Disable interval when all data is loaded Input('tabs-datatable', 'value'), Input('store-uuids', 'data'), Input('store-excluded-uuids', 'data'), @@ -60,66 +67,110 @@ def update_store_trajectories(start_date: str, end_date: str, tz: str, excluded_ Input('date-picker', 'start_date'), Input('date-picker', 'end_date'), Input('date-picker-timezone', 'value'), + Input('interval-load-more', 'n_intervals'),# Interval to trigger the loading of more data + State('store-loaded-uuids', 'data'), # Use State to track already loaded data + State('store-loaded-uuids', 'loaded'), # Keep track if we have finished loading all data ) -def render_content(tab, store_uuids, store_excluded_uuids, store_trips, store_demographics, store_trajectories, start_date, end_date, timezone): - data, columns, has_perm = None, [], False - if tab == 'tab-uuids-datatable': - data = store_uuids["data"] - data = db_utils.add_user_stats(data) - columns = perm_utils.get_uuids_columns() - has_perm = perm_utils.has_permission('data_uuids') +def render_content(tab, store_uuids, store_excluded_uuids, store_trips, store_demographics, store_trajectories, start_date, end_date, timezone, n_intervals, loaded_uuids_store, all_data_loaded): + initial_batch_size = 10 # Define the batch size for loading UUIDs + + # Ensure store_uuids contains the key 'data' which is a list of dictionaries + if not isinstance(store_uuids, dict) or 'data' not in store_uuids: + logging.error(f"Expected store_uuids to be a dict with a 'data' key, but got {type(store_uuids)}") + return html.Div([html.P("Data structure error.")]), loaded_uuids_store, True + + # Extract the list of UUIDs from the dict + uuids_list = store_uuids['data'] + + # Ensure uuids_list is a list for slicing + if not isinstance(uuids_list, list): + logging.error(f"Expected store_uuids['data'] to be a list but got {type(uuids_list)}") + return html.Div([html.P("Data structure error.")]), loaded_uuids_store, True + + # Retrieve already loaded data from the store + loaded_data = loaded_uuids_store.get('data', []) + total_loaded = len(loaded_data) + + # Handle the UUIDs tab with lazy loading + if tab == 'tab-uuids-datatable' and not loaded_uuids_store.get('loaded', False): + total_to_load = total_loaded + initial_batch_size + total_to_load = min(total_to_load, len(uuids_list)) # Avoid loading more than available + + logging.debug(f"Loading next batch of UUIDs: {total_loaded} to {total_to_load}") + + # Slice the list of UUIDs from the dict + new_data = uuids_list[total_loaded:total_to_load] + + if new_data: + # Process and append the new data to the loaded store + processed_data = db_utils.add_user_stats(new_data, initial_batch_size) + loaded_data.extend(processed_data) + + # Create a Patch object to append data progressively + patched_data = Patch() + patched_data['data'] = processed_data + + # Update the store with the new data + loaded_uuids_store['data'] = loaded_data + loaded_uuids_store['loaded'] = len(loaded_data) >= len(uuids_list) # Mark all data as loaded if done + + logging.debug(f"New batch loaded. Total loaded: {len(loaded_data)}") + + # Prepare the data to be displayed + columns = perm_utils.get_uuids_columns() # Get the relevant columns + df = pd.DataFrame(loaded_data) + + if df.empty or not perm_utils.has_permission('data_uuids'): + logging.debug("No data or permission issues.") + return html.Div([html.P("No data available or you don't have permission.")]), loaded_uuids_store, False + + df = df.drop(columns=[col for col in df.columns if col not in columns]) + + # Use the Patch() object to append new data instead of fully replacing the table + logging.debug("Returning patched data to update the UI.") + return html.Div([populate_datatable(df)]), loaded_uuids_store, False if not loaded_uuids_store['loaded'] else True + + + # Handle other tabs normally elif tab == 'tab-trips-datatable': data = store_trips["data"] columns = perm_utils.get_allowed_trip_columns() - columns.update( - col['label'] for col in perm_utils.get_allowed_named_trip_columns() - ) + columns.update(col['label'] for col in perm_utils.get_allowed_named_trip_columns()) columns.update(store_trips["userinputcols"]) has_perm = perm_utils.has_permission('data_trips') + df = pd.DataFrame(data) if df.empty or not has_perm: - return None + return None, loaded_uuids_store, True - logging.debug(f"Final list of retained cols {columns=}") - logging.debug(f"Before dropping, {df.columns=}") df = df.drop(columns=[col for col in df.columns if col not in columns]) - logging.debug(f"After dropping, {df.columns=}") df = clean_location_data(df) - trips_table = populate_datatable(df,'trips-table') - #Return an HTML Div containing a button (button-clicked) and the populated datatable + trips_table = populate_datatable(df, 'trips-table') + logging.debug(f"Returning 3 values: {trips_table}, {loaded_uuids_store}, True") return html.Div([ - html.Button( - 'Display columns with raw units', - id='button-clicked', #identifier for the button - n_clicks=0, #initialize number of clicks to 0 - style={'marginLeft':'5px'} - ), - trips_table, #populated trips table component - ]) - + html.Button('Display columns with raw units', id='button-clicked', n_clicks=0, style={'marginLeft': '5px'}), + trips_table + ]), loaded_uuids_store, True + elif tab == 'tab-demographics-datatable': data = store_demographics["data"] has_perm = perm_utils.has_permission('data_demographics') - # if only one survey is available, process it without creating a subtab - if len(data) == 1: - # here data is a dictionary + + if len(data) == 1: data = list(data.values())[0] columns = list(data[0].keys()) - # for multiple survey, create subtabs for unique surveys elif len(data) > 1: - #returns subtab only if has_perm is True if not has_perm: - return None + return None, loaded_uuids_store return html.Div([ dcc.Tabs(id='subtabs-demographics', value=list(data.keys())[0], children=[ - dcc.Tab(label= key, value= key) for key in data - ]), + dcc.Tab(label=key, value=key) for key in data + ]), html.Div(id='subtabs-demographics-content') - ]) + ]), loaded_uuids_store, True + elif tab == 'tab-trajectories-datatable': - # Currently store_trajectories data is loaded only when the respective tab is selected - #Here we query for trajectory data once "Trajectories" tab is selected (start_date, end_date) = iso_to_date_only(start_date, end_date) if store_trajectories == {}: store_trajectories = update_store_trajectories(start_date, end_date, timezone, store_excluded_uuids) @@ -128,14 +179,17 @@ def render_content(tab, store_uuids, store_excluded_uuids, store_trips, store_de columns = list(data[0].keys()) columns = perm_utils.get_trajectories_columns(columns) has_perm = perm_utils.has_permission('data_trajectories') - - df = pd.DataFrame(data) - if df.empty or not has_perm: - return None - df = df.drop(columns=[col for col in df.columns if col not in columns]) + df = pd.DataFrame(data) + if df.empty or not has_perm: + return None, loaded_uuids_store + + df = df.drop(columns=[col for col in df.columns if col not in columns]) + return populate_datatable(df), loaded_uuids_store, True + + # Default case: if no data is loaded or the tab is not handled + return None, loaded_uuids_store, True - return populate_datatable(df) # handle subtabs for demographic table when there are multiple surveys @callback( @@ -177,7 +231,6 @@ def update_dropdowns_trips(n_clicks, button_label): #return the list of hidden columns and the updated button label return hidden_col, button_label - def populate_datatable(df, table_id=''): if not isinstance(df, pd.DataFrame): raise PreventUpdate diff --git a/utils/db_utils.py b/utils/db_utils.py index 555ed5d..24c48e9 100644 --- a/utils/db_utils.py +++ b/utils/db_utils.py @@ -1,10 +1,10 @@ import logging import arrow from uuid import UUID - +from concurrent.futures import ThreadPoolExecutor, as_completed import pandas as pd import pymongo - +import time import emission.core.get_database as edb import emission.storage.timeseries.abstract_timeseries as esta import emission.storage.timeseries.aggregate_timeseries as estag @@ -16,17 +16,32 @@ from utils import constants from utils import permissions as perm_utils from utils.datetime_utils import iso_range_to_ts_range +from functools import lru_cache -def df_to_filtered_records(df, col_to_filter=None, vals_to_exclude: list[str] = []): - """ - Returns a dictionary of df records, given a dataframe, a column to filter on, - and a list of values that rows in that column will be excluded if they match - """ - if df.empty: return [] - if col_to_filter and vals_to_exclude: # will only filter if both are not None or [] +def df_to_filtered_records(df, col_to_filter=None, vals_to_exclude=None): + start_time = time.time() + # Check if df is a valid DataFrame and if it is empty + if not isinstance(df, pd.DataFrame) or len(df) == 0: + return [] + + # Default to an empty list if vals_to_exclude is None + if vals_to_exclude is None: + vals_to_exclude = [] + + # Perform filtering if col_to_filter and vals_to_exclude are provided + if col_to_filter and vals_to_exclude: + # Ensure vals_to_exclude is a list of strings + if not isinstance(vals_to_exclude, list) or not all(isinstance(val, str) for val in vals_to_exclude): + raise ValueError("vals_to_exclude must be a list of strings.") df = df[~df[col_to_filter].isin(vals_to_exclude)] + + # Return the filtered DataFrame as a list of dictionaries + end_time = time.time() # End timing + execution_time = end_time - start_time + logging.debug(f'Time taken to df_to_filtered: {execution_time:.4f} seconds') return df.to_dict("records") + def query_uuids(start_date: str, end_date: str, tz: str): # As of now, time filtering does not apply to UUIDs; we just query all of them. # Vestigial code commented out and left below for future reference @@ -55,6 +70,7 @@ def query_uuids(start_date: str, end_date: str, tz: str): # I will write a couple of functions to get all the users in a time range # (although we should define what that time range should be) and to merge # that with the profile data + start_time = time.time() entries = edb.get_uuid_db().find() df = pd.json_normalize(list(entries)) if not df.empty: @@ -62,9 +78,13 @@ def query_uuids(start_date: str, end_date: str, tz: str): df['user_id'] = df['uuid'].apply(str) df['user_token'] = df['user_email'] df.drop(columns=["uuid", "_id"], inplace=True) + end_time = time.time() # End timing + execution_time = end_time - start_time + logging.debug(f'Time taken for Query_UUIDs: {execution_time:.4f} seconds') return df def query_confirmed_trips(start_date: str, end_date: str, tz: str): + start_time = time.time() (start_ts, end_ts) = iso_range_to_ts_range(start_date, end_date, tz) ts = esta.TimeSeries.get_aggregate_time_series() # Note to self, allow end_ts to also be null in the timequery @@ -94,7 +114,26 @@ def query_confirmed_trips(start_date: str, end_date: str, tz: str): # Add primary modes from the sensed, inferred and ble summaries. Note that we do this # **before** filtering the `all_trip_columns` because the # *_section_summary columns are not currently valid - get_max_mode_from_summary = lambda md: max(md["distance"], key=md["distance"].get) if len(md["distance"]) > 0 else "INVALID" + + # Check if 'md' is not a dictionary or does not contain the key 'distance' + # or if 'md["distance"]' is not a dictionary. + # If any of these conditions are true, return "INVALID". + get_max_mode_from_summary = lambda md: ( + "INVALID" + if not isinstance(md, dict) + or "distance" not in md + or not isinstance(md["distance"], dict) + # If 'md' is a dictionary and 'distance' is a valid key pointing to a dictionary: + else ( + # Get the maximum value from 'md["distance"]' using the values of 'md["distance"].get' as the key for 'max'. + # This operation only happens if the length of 'md["distance"]' is greater than 0. + # Otherwise, return "INVALID". + max(md["distance"], key=md["distance"].get) + if len(md["distance"]) > 0 + else "INVALID" + ) + ) + df["data.primary_sensed_mode"] = df.cleaned_section_summary.apply(get_max_mode_from_summary) df["data.primary_predicted_mode"] = df.inferred_section_summary.apply(get_max_mode_from_summary) if 'ble_sensed_summary' in df.columns: @@ -150,9 +189,13 @@ def query_confirmed_trips(start_date: str, end_date: str, tz: str): # logging.debug("After filtering, df columns are %s" % df.columns) # logging.debug("After filtering, the actual data is %s" % df.head()) # logging.debug("After filtering, the actual data is %s" % df.head().trip_start_time_str) + end_time = time.time() # End timing + execution_time = end_time - start_time + logging.debug(f'Time taken for Query_Confirmed_Trips: {execution_time:.4f} seconds') return (df, user_input_cols) def query_demographics(): + start_time = time.time() # Returns dictionary of df where key represent differnt survey id and values are df for each survey logging.debug("Querying the demographics for (no date range)") ts = esta.TimeSeries.get_aggregate_time_series() @@ -185,7 +228,10 @@ def query_demographics(): for col in constants.EXCLUDED_DEMOGRAPHICS_COLS: if col in df.columns: df.drop(columns= [col], inplace=True) - + + end_time = time.time() # End timing + execution_time = end_time - start_time + logging.debug(f'Time taken for Query Demographic: {execution_time:.4f} seconds') return dataframes def query_trajectories(start_date: str, end_date: str, tz: str): @@ -209,61 +255,95 @@ def query_trajectories(start_date: str, end_date: str, tz: str): df['data.mode_str'] = df['data.mode'].apply(lambda x: ecwm.MotionTypes(x).name if x in set(enum.value for enum in ecwm.MotionTypes) else 'UNKNOWN') return df +@lru_cache(maxsize=None) +def get_time_series_aggregate(): + return esta.TimeSeries.get_aggregate_time_series() -def add_user_stats(user_data): - for user in user_data: - user_uuid = UUID(user['user_id']) +@lru_cache(maxsize=None) +def get_user_profile(user_uuid): + return edb.get_profile_db().find_one({'user_id': user_uuid}) + +def add_user_stats(user_data, batch_size=5): + start_time = time.time() + time_format = 'YYYY-MM-DD HH:mm:ss' - total_trips = esta.TimeSeries.get_aggregate_time_series().find_entries_count( + def process_user(user): + user_uuid = UUID(user['user_id']) + + # Fetch aggregated data for all users once and cache it + ts_aggregate = get_time_series_aggregate() + + # Fetch data for the user, cached for repeated queries + profile_data = get_user_profile(user_uuid) + + total_trips = ts_aggregate.find_entries_count( key_list=["analysis/confirmed_trip"], extra_query_list=[{'user_id': user_uuid}] ) - user['total_trips'] = total_trips - - labeled_trips = esta.TimeSeries.get_aggregate_time_series().find_entries_count( + labeled_trips = ts_aggregate.find_entries_count( key_list=["analysis/confirmed_trip"], extra_query_list=[{'user_id': user_uuid}, {'data.user_input': {'$ne': {}}}] ) + + user['total_trips'] = total_trips user['labeled_trips'] = labeled_trips - profile_data = edb.get_profile_db().find_one({'user_id': user_uuid}) - user['platform'] = profile_data.get('curr_platform') - user['manufacturer'] = profile_data.get('manufacturer') - user['app_version'] = profile_data.get('client_app_version') - user['os_version'] = profile_data.get('client_os_version') - user['phone_lang'] = profile_data.get('phone_lang') - - - + if profile_data: + user['platform'] = profile_data.get('curr_platform') + user['manufacturer'] = profile_data.get('manufacturer') + user['app_version'] = profile_data.get('client_app_version') + user['os_version'] = profile_data.get('client_os_version') + user['phone_lang'] = profile_data.get('phone_lang') if total_trips > 0: - time_format = 'YYYY-MM-DD HH:mm:ss' ts = esta.TimeSeries.get_time_series(user_uuid) - start_ts = ts.get_first_value_for_field( + first_trip_ts = ts.get_first_value_for_field( key='analysis/confirmed_trip', field='data.end_ts', sort_order=pymongo.ASCENDING ) - if start_ts != -1: - user['first_trip'] = arrow.get(start_ts).format(time_format) + if first_trip_ts != -1: + user['first_trip'] = arrow.get(first_trip_ts).format(time_format) - end_ts = ts.get_first_value_for_field( + last_trip_ts = ts.get_first_value_for_field( key='analysis/confirmed_trip', field='data.end_ts', sort_order=pymongo.DESCENDING ) - if end_ts != -1: - user['last_trip'] = arrow.get(end_ts).format(time_format) + if last_trip_ts != -1: + user['last_trip'] = arrow.get(last_trip_ts).format(time_format) - last_call = ts.get_first_value_for_field( + last_call_ts = ts.get_first_value_for_field( key='stats/server_api_time', field='data.ts', sort_order=pymongo.DESCENDING ) - if last_call != -1: - user['last_call'] = arrow.get(last_call).format(time_format) + if last_call_ts != -1: + user['last_call'] = arrow.get(last_call_ts).format(time_format) + + return user + + def batch_process(users_batch): + with ThreadPoolExecutor() as executor: # Adjust max_workers based on CPU cores + futures = [executor.submit(process_user, user) for user in users_batch] + processed_batch = [future.result() for future in as_completed(futures)] + return processed_batch + + total_users = len(user_data) + processed_data = [] + + for i in range(0, total_users, batch_size): + batch = user_data[i:i + batch_size] + processed_batch = batch_process(batch) + processed_data.extend(processed_batch) + + logging.debug(f'Processed {len(processed_data)} users out of {total_users}') + + end_time = time.time() # End timing + execution_time = end_time - start_time + logging.debug(f'Time taken to add_user_stats: {execution_time:.4f} seconds') - return user_data + return processed_data def query_segments_crossing_endpoints(poly_region_start, poly_region_end, start_date: str, end_date: str, tz: str, excluded_uuids: list[str]): (start_ts, end_ts) = iso_range_to_ts_range(start_date, end_date, tz)