diff --git a/pages/data.py b/pages/data.py index 29a5c86..8d0c81d 100644 --- a/pages/data.py +++ b/pages/data.py @@ -6,15 +6,15 @@ from dash import dcc, html, Input, Output, callback, register_page, dash_table, State, callback_context, Patch # Etc import logging -import time import pandas as pd from dash.exceptions import PreventUpdate -from concurrent.futures import ThreadPoolExecutor, as_completed -from utils import constants from utils import permissions as perm_utils from utils import db_utils from utils.db_utils import df_to_filtered_records, query_trajectories from utils.datetime_utils import iso_to_date_only +import emission.core.timer as ect +import emission.storage.decorations.stats_queries as esdsq + register_page(__name__, path="/data") intro = """## Data""" @@ -54,33 +54,125 @@ ) - def clean_location_data(df): - if 'data.start_loc.coordinates' in df.columns: - df['data.start_loc.coordinates'] = df['data.start_loc.coordinates'].apply(lambda x: f'({x[0]}, {x[1]})') - if 'data.end_loc.coordinates' in df.columns: - df['data.end_loc.coordinates'] = df['data.end_loc.coordinates'].apply(lambda x: f'({x[0]}, {x[1]})') + """ + Cleans the 'data.start_loc.coordinates' and 'data.end_loc.coordinates' columns + by formatting them as strings in the format '(latitude, longitude)'. + + :param df (pd.DataFrame): The DataFrame containing location data. + :return: The cleaned DataFrame with formatted coordinate strings. + """ + with ect.Timer(verbose=False) as total_timer: + # Stage 1: Clean 'data.start_loc.coordinates' + with ect.Timer(verbose=False) as stage1_timer: + if 'data.start_loc.coordinates' in df.columns: + df['data.start_loc.coordinates'] = df['data.start_loc.coordinates'].apply( + lambda x: f'({x[0]}, {x[1]})' if isinstance(x, (list, tuple)) and len(x) >= 2 else x + ) + esdsq.store_dashboard_time( + "data/clean_location_data/clean_start_loc_coordinates", + stage1_timer # Pass the Timer object + ) + + # Stage 2: Clean 'data.end_loc.coordinates' + with ect.Timer(verbose=False) as stage2_timer: + if 'data.end_loc.coordinates' in df.columns: + df['data.end_loc.coordinates'] = df['data.end_loc.coordinates'].apply( + lambda x: f'({x[0]}, {x[1]})' if isinstance(x, (list, tuple)) and len(x) >= 2 else x + ) + esdsq.store_dashboard_time( + "data/clean_location_data/clean_end_loc_coordinates", + stage2_timer # Pass the Timer object + ) + + esdsq.store_dashboard_time( + "data/clean_location_data/total_time", + total_timer # Pass the Timer object + ) + return df -def update_store_trajectories(start_date: str, end_date: str, tz: str, excluded_uuids, key_list): - df = query_trajectories(start_date=start_date, end_date=end_date, tz=tz, key_list=key_list) - records = df_to_filtered_records(df, 'user_id', excluded_uuids["data"]) - store = { - "data": records, - "length": len(records), - } + +def update_store_trajectories(start_date: str, end_date: str, tz: str, excluded_uuids, key_list): + """ + Queries trajectories within a specified date range and timezone, filters out excluded UUIDs, + and returns a store dictionary containing the filtered records and their count. + + :param start_date (str): Start date in ISO format. + :param end_date (str): End date in ISO format. + :param tz (str): Timezone string. + :param excluded_uuids (dict): Dictionary containing UUIDs to exclude. + :param key_list (list[str]): List of keys to filter trajectories. + :return: Dictionary with 'data' as filtered records and 'length' as the count of records. + """ + with ect.Timer(verbose=False) as total_timer: + # Stage 1: Query Trajectories + with ect.Timer(verbose=False) as stage1_timer: + df = query_trajectories(start_date=start_date, end_date=end_date, tz=tz, key_list=key_list) + esdsq.store_dashboard_time( + "data/update_store_trajectories/query_trajectories", + stage1_timer # Pass the Timer object + ) + + # Stage 2: Filter Records + with ect.Timer(verbose=False) as stage2_timer: + records = df_to_filtered_records(df, 'user_id', excluded_uuids["data"]) + esdsq.store_dashboard_time( + "data/update_store_trajectories/filter_records", + stage2_timer # Pass the Timer object + ) + + # Stage 3: Create Store Dictionary + with ect.Timer(verbose=False) as stage3_timer: + store = { + "data": records, + "length": len(records), + } + esdsq.store_dashboard_time( + "data/update_store_trajectories/create_store_dict", + stage3_timer # Pass the Timer object + ) + + esdsq.store_dashboard_time( + "data/update_store_trajectories/total_time", + total_timer # Pass the Timer object + ) + return store + @callback( Output('keylist-switch-container', 'style'), Input('tabs-datatable', 'value'), ) def show_keylist_switch(tab): - if tab == 'tab-trajectories-datatable': - return {'display': 'block'} - return {'display': 'none'} # Hide the keylist-switch on all other tabs + """ + Toggles the visibility of the keylist switch container based on the selected tab. + + :param tab (str): The currently selected tab. + :return: Dictionary with CSS style to show or hide the container. + """ + with ect.Timer(verbose=False) as total_timer: + # Stage 1: Determine Display Style + with ect.Timer(verbose=False) as stage1_timer: + if tab == 'tab-trajectories-datatable': + style = {'display': 'block'} + else: + style = {'display': 'none'} # Hide the keylist-switch on all other tabs + esdsq.store_dashboard_time( + "data/show_keylist_switch/determine_display_style", + stage1_timer # Pass the Timer object + ) + + esdsq.store_dashboard_time( + "data/show_keylist_switch/total_time", + total_timer # Pass the Timer object + ) + + return style + @callback( @@ -236,67 +328,158 @@ def render_content(tab, store_uuids, store_excluded_uuids, store_trips, store_de return None, loaded_uuids_store, True - -# handle subtabs for demographic table when there are multiple surveys @callback( Output('subtabs-demographics-content', 'children'), Input('subtabs-demographics', 'value'), Input('store-demographics', 'data'), ) - def update_sub_tab(tab, store_demographics): - data = store_demographics["data"] - if tab in data: - data = data[tab] - if data: - columns = list(data[0].keys()) - - df = pd.DataFrame(data) - if df.empty: - return None - - df = df.drop(columns=[col for col in df.columns if col not in columns]) + """ + Updates the content of the demographics subtabs based on the selected tab and stored demographics data. + + :param tab (str): The currently selected subtab. + :param store_demographics (dict): Dictionary containing demographics data. + :return: Dash components to populate the subtabs content. + """ + with ect.Timer(verbose=False) as total_timer: + # Stage 1: Extract Data for Selected Tab + with ect.Timer(verbose=False) as stage1_timer: + data = store_demographics.get("data", {}) + if tab in data: + data = data[tab] + if data: + columns = list(data[0].keys()) + else: + data = {} + columns = [] + esdsq.store_dashboard_time( + "data/update_sub_tab/extract_data_for_selected_tab", + stage1_timer # Pass the Timer object + ) + + # Stage 2: Create DataFrame + with ect.Timer(verbose=False) as stage2_timer: + df = pd.DataFrame(data) + esdsq.store_dashboard_time( + "data/update_sub_tab/create_dataframe", + stage2_timer # Pass the Timer object + ) + + # Stage 3: Check if DataFrame is Empty + with ect.Timer(verbose=False) as stage3_timer: + if df.empty: + return None + esdsq.store_dashboard_time( + "data/update_sub_tab/check_if_dataframe_empty", + stage3_timer # Pass the Timer object + ) + + # Stage 4: Drop Unnecessary Columns + with ect.Timer(verbose=False) as stage4_timer: + df = df.drop(columns=[col for col in df.columns if col not in columns]) + esdsq.store_dashboard_time( + "data/update_sub_tab/drop_unnecessary_columns", + stage4_timer # Pass the Timer object + ) + + # Stage 5: Populate DataTable + with ect.Timer(verbose=False) as stage5_timer: + table = populate_datatable(df) + esdsq.store_dashboard_time( + "data/update_sub_tab/populate_datatable", + stage5_timer # Pass the Timer object + ) + + esdsq.store_dashboard_time( + "data/update_sub_tab/total_time", + total_timer # Pass the Timer object + ) - return populate_datatable(df) + return table @callback( - Output('trips-table', 'hidden_columns'), # Output hidden columns in the trips-table - Output('button-clicked', 'children'), #updates button label - Input('button-clicked', 'n_clicks'), #number of clicks on the button - State('button-clicked', 'children') #State representing the current label of button + Output('trips-table', 'hidden_columns'), # Output hidden columns in the trips-table + Output('button-clicked', 'children'), # Updates button label + Input('button-clicked', 'n_clicks'), # Number of clicks on the button + State('button-clicked', 'children') # Current label of button ) -#Controls visibility of columns in trips table and updates the label of button based on the number of clicks. def update_dropdowns_trips(n_clicks, button_label): - if n_clicks % 2 == 0: - hidden_col = ["data.duration_seconds", "data.distance_meters","data.distance"] - button_label = 'Display columns with raw units' - else: - hidden_col = ["data.duration", "data.distance_miles", "data.distance_km", "data.distance"] - button_label = 'Display columns with humanzied units' - #return the list of hidden columns and the updated button label + """ + Controls the visibility of columns in the trips table and updates the button label based on the number of clicks. + + :param n_clicks (int): Number of times the button has been clicked. + :param button_label (str): Current label of the button. + :return: Tuple containing the list of hidden columns and the updated button label. + """ + with ect.Timer(verbose=False) as total_timer: + # Stage 1: Determine Hidden Columns and Button Label + with ect.Timer(verbose=False) as stage1_timer: + if n_clicks is None: + n_clicks = 0 # Handle initial state when button hasn't been clicked + if n_clicks % 2 == 0: + hidden_col = ["data.duration_seconds", "data.distance_meters", "data.distance"] + button_label = 'Display columns with raw units' + else: + hidden_col = ["data.duration", "data.distance_miles", "data.distance_km", "data.distance"] + button_label = 'Display columns with humanized units' + esdsq.store_dashboard_time( + "data/update_dropdowns_trips/determine_hidden_columns_and_button_label", + stage1_timer # Pass the Timer object + ) + + esdsq.store_dashboard_time( + "data/update_dropdowns_trips/total_time", + total_timer # Pass the Timer object + ) + return hidden_col, button_label + def populate_datatable(df, table_id=''): - if not isinstance(df, pd.DataFrame): - raise PreventUpdate - return dash_table.DataTable( - id= table_id, - # columns=[{"name": i, "id": i} for i in df.columns], - data=df.to_dict('records'), - export_format="csv", - filter_options={"case": "sensitive"}, - # filter_action="native", - sort_action="native", # give user capability to sort columns - sort_mode="single", # sort across 'multi' or 'single' columns - page_current=0, # page number that user is on - page_size=50, # number of rows visible per page - style_cell={ - 'textAlign': 'left', - # 'minWidth': '100px', - # 'width': '100px', - # 'maxWidth': '100px', - }, - style_table={'overflowX': 'auto'}, - css=[{"selector":".show-hide", "rule":"display:none"}] + """ + Populates a Dash DataTable with the provided DataFrame. + + :param df (pd.DataFrame): The DataFrame to display in the table. + :param table_id (str, optional): The ID to assign to the DataTable. + :return: Dash DataTable component. + """ + with ect.Timer(verbose=False) as total_timer: + # Stage 1: Validate DataFrame + with ect.Timer(verbose=False) as stage1_timer: + if not isinstance(df, pd.DataFrame): + raise PreventUpdate + esdsq.store_dashboard_time( + "data/populate_datatable/validate_dataframe", + stage1_timer # Pass the Timer object + ) + + # Stage 2: Create DataTable + with ect.Timer(verbose=False) as stage2_timer: + table = dash_table.DataTable( + id=table_id, + data=df.to_dict('records'), + columns=[{"name": i, "id": i} for i in df.columns], + export_format="csv", + filter_options={"case": "sensitive"}, + sort_action="native", # Give user capability to sort columns + sort_mode="single", # Sort across 'multi' or 'single' columns + page_current=0, # Page number that user is on + page_size=50, # Number of rows visible per page + style_cell={ + 'textAlign': 'left', + }, + style_table={'overflowX': 'auto'}, + css=[{"selector": ".show-hide", "rule": "display:none"}] + ) + esdsq.store_dashboard_time( + "data/populate_datatable/create_datatable", + stage2_timer # Pass the Timer object + ) + + esdsq.store_dashboard_time( + "data/populate_datatable/total_time", + total_timer # Pass the Timer object ) + + return table \ No newline at end of file diff --git a/pages/home.py b/pages/home.py index c0b75f9..bbd5133 100644 --- a/pages/home.py +++ b/pages/home.py @@ -16,6 +16,8 @@ # e-mission modules import emission.core.get_database as edb +import emission.core.timer as ect +import emission.storage.decorations.stats_queries as esdsq from utils.permissions import has_permission from utils.datetime_utils import iso_to_date_only @@ -52,69 +54,190 @@ def compute_sign_up_trend(uuid_df): - uuid_df['update_ts'] = pd.to_datetime(uuid_df['update_ts'], utc=True) - res_df = ( - uuid_df - .groupby(uuid_df['update_ts'].dt.date) - .size() - .reset_index(name='count') - .rename(columns={'update_ts': 'date'}) + """ + Computes the sign-up trend by counting the number of unique user sign-ups per day. + + :param uuid_df (pd.DataFrame): DataFrame containing user UUIDs with 'update_ts' timestamps. + :return: pandas DataFrame with columns ['date', 'count'] representing sign-ups per day. + """ + with ect.Timer() as total_timer: + # Stage 1: Convert 'update_ts' to datetime + with ect.Timer() as stage1_timer: + uuid_df['update_ts'] = pd.to_datetime(uuid_df['update_ts'], utc=True) + esdsq.store_dashboard_time( + "home/compute_sign_up_trend/convert_update_ts_to_datetime", + stage1_timer + ) + + # Stage 2: Group by date and count sign-ups + with ect.Timer() as stage2_timer: + res_df = ( + uuid_df + .groupby(uuid_df['update_ts'].dt.date) + .size() + .reset_index(name='count') + .rename(columns={'update_ts': 'date'}) + ) + esdsq.store_dashboard_time( + "home/compute_sign_up_trend/group_by_date_and_count", + stage2_timer + ) + + esdsq.store_dashboard_time( + "home/compute_sign_up_trend/total_time", + total_timer ) + return res_df def compute_trips_trend(trips_df, date_col): - trips_df[date_col] = pd.to_datetime(trips_df[date_col], utc=True) - trips_df[date_col] = pd.DatetimeIndex(trips_df[date_col]).date - res_df = ( - trips_df - .groupby(date_col) - .size() - .reset_index(name='count') - .rename(columns={date_col: 'date'}) + """ + Computes the trips trend by counting the number of trips per specified date column. + + :param trips_df (pd.DataFrame): DataFrame containing trip data with a date column. + :param date_col (str): The column name representing the date to group by. + :return: pandas DataFrame with columns ['date', 'count'] representing trips per day. + """ + with ect.Timer() as total_timer: + # Stage 1: Convert date_col to datetime and extract date + with ect.Timer() as stage1_timer: + trips_df[date_col] = pd.to_datetime(trips_df[date_col], utc=True) + trips_df[date_col] = pd.DatetimeIndex(trips_df[date_col]).date + esdsq.store_dashboard_time( + "home/compute_trips_trend/convert_date_col_to_datetime_and_extract_date", + stage1_timer + ) + + # Stage 2: Group by date and count trips + with ect.Timer() as stage2_timer: + res_df = ( + trips_df + .groupby(date_col) + .size() + .reset_index(name='count') + .rename(columns={date_col: 'date'}) + ) + esdsq.store_dashboard_time( + "home/compute_trips_trend/group_by_date_and_count_trips", + stage2_timer + ) + + esdsq.store_dashboard_time( + "home/compute_trips_trend/total_time", + total_timer ) + return res_df def find_last_get(uuid_list): - uuid_list = [UUID(npu) for npu in uuid_list] - last_item = list(edb.get_timeseries_db().aggregate([ - {'$match': {'user_id': {'$in': uuid_list}}}, - {'$match': {'metadata.key': 'stats/server_api_time'}}, - {'$match': {'data.name': 'POST_/usercache/get'}}, - {'$group': {'_id': '$user_id', 'write_ts': {'$max': '$metadata.write_ts'}}}, - ])) + """ + Finds the last 'POST_/usercache/get' API call timestamp for each user in the provided UUID list. + + :param uuid_list (list[str]): List of user UUIDs as strings. + :return: List of dictionaries with '_id' as user_id and 'write_ts' as the latest timestamp. + """ + with ect.Timer() as total_timer: + # Stage 1: Convert UUID strings to UUID objects + with ect.Timer() as stage1_timer: + uuid_objects = [UUID(npu) for npu in uuid_list] + esdsq.store_dashboard_time( + "home/find_last_get/convert_uuid_strings_to_objects", + stage1_timer + ) + + # Stage 2: Perform aggregate query to find the latest 'write_ts' per user + with ect.Timer() as stage2_timer: + pipeline = [ + {'$match': {'user_id': {'$in': uuid_objects}}}, + {'$match': {'metadata.key': 'stats/server_api_time'}}, + {'$match': {'data.name': 'POST_/usercache/get'}}, + {'$group': {'_id': '$user_id', 'write_ts': {'$max': '$metadata.write_ts'}}}, + ] + last_item = list(edb.get_timeseries_db().aggregate(pipeline)) + esdsq.store_dashboard_time( + "home/find_last_get/perform_aggregate_query", + stage2_timer + ) + + esdsq.store_dashboard_time( + "home/find_last_get/total_time", + total_timer + ) + return last_item def get_number_of_active_users(uuid_list, threshold): - last_get_entries = find_last_get(uuid_list) - number_of_active_users = 0 - for item in last_get_entries: - last_get = item['write_ts'] - if last_get is not None: - last_call_diff = arrow.get().timestamp() - last_get - if last_call_diff <= threshold: - number_of_active_users += 1 + """ + Determines the number of active users based on the time threshold since their last 'get' API call. + + :param uuid_list (list[str]): List of user UUIDs as strings. + :param threshold (int): Time threshold in seconds to consider a user as active. + :return: Integer representing the number of active users. + """ + with ect.Timer() as total_timer: + # Stage 1: Find last 'get' API call entries for users + with ect.Timer() as stage1_timer: + last_get_entries = find_last_get(uuid_list) + esdsq.store_dashboard_time( + "home/get_number_of_active_users/find_last_get_entries", + stage1_timer + ) + + # Stage 2: Calculate number of active users based on threshold + with ect.Timer() as stage2_timer: + number_of_active_users = 0 + current_timestamp = arrow.utcnow().timestamp() + for item in last_get_entries: + last_get = item.get('write_ts') + if last_get is not None: + last_call_diff = current_timestamp - last_get + if last_call_diff <= threshold: + number_of_active_users += 1 + esdsq.store_dashboard_time( + "home/get_number_of_active_users/calculate_active_users", + stage2_timer + ) + + esdsq.store_dashboard_time( + "home/get_number_of_active_users/total_time", + total_timer + ) + return number_of_active_users def generate_card(title_text, body_text, icon): - card = dbc.CardGroup([ + """ + Generates a Bootstrap CardGroup with a title, body text, and an icon. + + :param title_text (str): The title text for the card. + :param body_text (str): The body text for the card. + :param icon (str): The CSS class for the icon to display. + :return: A Dash Bootstrap CardGroup component. + """ + with ect.Timer() as total_timer: + card = dbc.CardGroup([ dbc.Card( dbc.CardBody( [ - html.H5(title_text, className="card-title"), - html.P(body_text, className="card-text",), - ] - ) - ), - dbc.Card( - html.Div(className=icon, style=card_icon), - className="bg-primary", - style={"maxWidth": 75}, - ), - ]) + html.H5(title_text, className="card-title"), + html.P(body_text, className="card-text"), + ] + ) + ), + dbc.Card( + html.Div(className=icon, style=card_icon), + className="bg-primary", + style={"maxWidth": 75}, + ), + ]) + esdsq.store_dashboard_time( + "home/generate_card/total_time", + total_timer + ) return card @@ -123,8 +246,33 @@ def generate_card(title_text, body_text, icon): Input('store-uuids', 'data'), ) def update_card_users(store_uuids): - number_of_users = store_uuids.get('length') if has_permission('overview_users') else 0 - card = generate_card("# Users", f"{number_of_users} users", "fa fa-users") + """ + Updates the '# Users' card with the number of users. + + :param store_uuids (dict): Dictionary containing user UUID data. + :return: Dash Bootstrap CardGroup component displaying the number of users. + """ + with ect.Timer() as total_timer: + # Stage 1: Retrieve Number of Users + with ect.Timer() as stage1_timer: + number_of_users = store_uuids.get('length') if has_permission('overview_users') else 0 + esdsq.store_dashboard_time( + "home/update_card_users/retrieve_number_of_users", + stage1_timer + ) + + # Stage 2: Generate User Card + with ect.Timer() as stage2_timer: + card = generate_card("# Users", f"{number_of_users} users", "fa fa-users") + esdsq.store_dashboard_time( + "home/update_card_users/generate_user_card", + stage2_timer + ) + + esdsq.store_dashboard_time( + "home/update_card_users/total_time", + total_timer + ) return card @@ -133,12 +281,44 @@ def update_card_users(store_uuids): Input('store-uuids', 'data'), ) def update_card_active_users(store_uuids): - uuid_df = pd.DataFrame(store_uuids.get('data')) - number_of_active_users = 0 - if not uuid_df.empty and has_permission('overview_active_users'): - one_day = 24 * 60 * 60 - number_of_active_users = get_number_of_active_users(uuid_df['user_id'], one_day) - card = generate_card("# Active users", f"{number_of_active_users} users", "fa fa-person-walking") + """ + Updates the '# Active users' card with the number of active users. + + :param store_uuids (dict): Dictionary containing user UUID data. + :return: Dash Bootstrap CardGroup component displaying the number of active users. + """ + with ect.Timer() as total_timer: + # Stage 1: Create DataFrame from UUID data + with ect.Timer() as stage1_timer: + uuid_df = pd.DataFrame(store_uuids.get('data')) + esdsq.store_dashboard_time( + "home/update_card_active_users/create_dataframe", + stage1_timer + ) + + # Stage 2: Calculate Number of Active Users + with ect.Timer() as stage2_timer: + number_of_active_users = 0 + if not uuid_df.empty and has_permission('overview_active_users'): + one_day = 24 * 60 * 60 + number_of_active_users = get_number_of_active_users(uuid_df['user_id'], one_day) + esdsq.store_dashboard_time( + "home/update_card_active_users/calculate_number_of_active_users", + stage2_timer + ) + + # Stage 3: Generate Active Users Card + with ect.Timer() as stage3_timer: + card = generate_card("# Active users", f"{number_of_active_users} users", "fa fa-person-walking") + esdsq.store_dashboard_time( + "home/update_card_active_users/generate_active_users_card", + stage3_timer + ) + + esdsq.store_dashboard_time( + "home/update_card_active_users/total_time", + total_timer + ) return card @@ -147,16 +327,55 @@ def update_card_active_users(store_uuids): Input('store-trips', 'data'), ) def update_card_trips(store_trips): - number_of_trips = store_trips.get('length') if has_permission('overview_trips') else 0 - card = generate_card("# Confirmed trips", f"{number_of_trips} trips", "fa fa-angles-right") + """ + Updates the '# Confirmed trips' card with the number of trips. + + :param store_trips (dict): Dictionary containing trip data. + :return: Dash Bootstrap CardGroup component displaying the number of confirmed trips. + """ + with ect.Timer() as total_timer: + # Stage 1: Retrieve Number of Trips + with ect.Timer() as stage1_timer: + number_of_trips = store_trips.get('length') if has_permission('overview_trips') else 0 + esdsq.store_dashboard_time( + "home/update_card_trips/retrieve_number_of_trips", + stage1_timer + ) + + # Stage 2: Generate Trips Card + with ect.Timer() as stage2_timer: + card = generate_card("# Confirmed trips", f"{number_of_trips} trips", "fa fa-angles-right") + esdsq.store_dashboard_time( + "home/update_card_trips/generate_trips_card", + stage2_timer + ) + + esdsq.store_dashboard_time( + "home/update_card_trips/total_time", + total_timer + ) return card def generate_barplot(data, x, y, title): - fig = px.bar() - if data is not None: - fig = px.bar(data, x=x, y=y) - fig.update_layout(title=title) + """ + Generates a Plotly bar plot based on the provided data. + + :param data (pd.DataFrame): The data to plot. + :param x (str): The column name for the x-axis. + :param y (str): The column name for the y-axis. + :param title (str): The title of the plot. + :return: A Plotly Figure object representing the bar plot. + """ + with ect.Timer() as total_timer: + fig = px.bar() + if data is not None: + fig = px.bar(data, x=x, y=y) + fig.update_layout(title=title) + esdsq.store_dashboard_time( + "home/generate_barplot/total_time", + total_timer + ) return fig @@ -165,25 +384,98 @@ def generate_barplot(data, x, y, title): Input('store-uuids', 'data'), ) def generate_plot_sign_up_trend(store_uuids): - df = pd.DataFrame(store_uuids.get("data")) - trend_df = None - if not df.empty and has_permission('overview_signup_trends'): - trend_df = compute_sign_up_trend(df) - fig = generate_barplot(trend_df, x = 'date', y = 'count', title = "Sign-ups trend") + """ + Generates a bar plot showing the sign-up trend over time. + + :param store_uuids (dict): Dictionary containing user UUID data. + :return: Plotly Figure object representing the sign-up trend bar plot. + """ + with ect.Timer() as total_timer: + # Stage 1: Convert UUID Data to DataFrame + with ect.Timer() as stage1_timer: + df = pd.DataFrame(store_uuids.get("data")) + esdsq.store_dashboard_time( + "home/generate_plot_sign_up_trend/convert_uuid_data_to_dataframe", + stage1_timer + ) + + # Stage 2: Compute Sign-Up Trend + with ect.Timer() as stage2_timer: + trend_df = None + if not df.empty and has_permission('overview_signup_trends'): + trend_df = compute_sign_up_trend(df) + esdsq.store_dashboard_time( + "home/generate_plot_sign_up_trend/compute_sign_up_trend", + stage2_timer + ) + + # Stage 3: Generate Bar Plot + with ect.Timer() as stage3_timer: + fig = generate_barplot(trend_df, x='date', y='count', title="Sign-ups trend") + esdsq.store_dashboard_time( + "home/generate_plot_sign_up_trend/generate_bar_plot", + stage3_timer + ) + + esdsq.store_dashboard_time( + "home/generate_plot_sign_up_trend/total_time", + total_timer + ) return fig @callback( Output('fig-trips-trend', 'figure'), Input('store-trips', 'data'), - Input('date-picker', 'start_date'), # these are ISO strings - Input('date-picker', 'end_date'), # these are ISO strings + Input('date-picker', 'start_date'), # these are ISO strings + Input('date-picker', 'end_date'), # these are ISO strings ) def generate_plot_trips_trend(store_trips, start_date, end_date): - df = pd.DataFrame(store_trips.get("data")) - trend_df = None - (start_date, end_date) = iso_to_date_only(start_date, end_date) - if not df.empty and has_permission('overview_trips_trend'): - trend_df = compute_trips_trend(df, date_col = "trip_start_time_str") - fig = generate_barplot(trend_df, x = 'date', y = 'count', title = f"Trips trend({start_date} to {end_date})") + """ + Generates a bar plot showing the trips trend over a specified date range. + + :param store_trips (dict): Dictionary containing trip data. + :param start_date (str): Start date in ISO format. + :param end_date (str): End date in ISO format. + :return: Plotly Figure object representing the trips trend bar plot. + """ + with ect.Timer() as total_timer: + # Stage 1: Convert Trip Data to DataFrame + with ect.Timer() as stage1_timer: + df = pd.DataFrame(store_trips.get("data")) + esdsq.store_dashboard_time( + "home/generate_plot_trips_trend/convert_trip_data_to_dataframe", + stage1_timer + ) + + # Stage 2: Convert and Extract Date Range + with ect.Timer() as stage2_timer: + (start_date, end_date) = iso_to_date_only(start_date, end_date) + esdsq.store_dashboard_time( + "home/generate_plot_trips_trend/convert_and_extract_date_range", + stage2_timer + ) + + # Stage 3: Compute Trips Trend + with ect.Timer() as stage3_timer: + trend_df = None + if not df.empty and has_permission('overview_trips_trend'): + trend_df = compute_trips_trend(df, date_col="trip_start_time_str") + esdsq.store_dashboard_time( + "home/generate_plot_trips_trend/compute_trips_trend", + stage3_timer + ) + + # Stage 4: Generate Bar Plot + with ect.Timer() as stage4_timer: + fig = generate_barplot(trend_df, x='date', y='count', title=f"Trips trend({start_date} to {end_date})") + esdsq.store_dashboard_time( + "home/generate_plot_trips_trend/generate_bar_plot", + stage4_timer + ) + + esdsq.store_dashboard_time( + "home/generate_plot_trips_trend/total_time", + total_timer + ) return fig diff --git a/utils/db_utils.py b/utils/db_utils.py index b76aa32..ef4e1ea 100644 --- a/utils/db_utils.py +++ b/utils/db_utils.py @@ -12,268 +12,465 @@ import emission.core.wrapper.motionactivity as ecwm import emission.storage.timeseries.geoquery as estg import emission.storage.decorations.section_queries as esds +import emission.core.timer as ect +import emission.storage.decorations.stats_queries as esdsq from utils import constants from utils import permissions as perm_utils from utils.datetime_utils import iso_range_to_ts_range -from functools import lru_cache def df_to_filtered_records(df, col_to_filter=None, vals_to_exclude=None): - start_time = time.time() - # Check if df is a valid DataFrame and if it is empty + """ + Filters a DataFrame based on specified column and exclusion values, then converts it to a list of records. + + :param df (pd.DataFrame): The DataFrame to filter. + :param col_to_filter (str, optional): The column name to apply the filter on. + :param vals_to_exclude (list[str], optional): List of values to exclude from the filter. + :return: List of dictionaries representing the filtered records. + """ + # Stage 1: Validate DataFrame and Set Defaults + with ect.Timer() as stage1_timer: + # Check if df is a valid DataFrame and if it is empty + if not isinstance(df, pd.DataFrame) or len(df) == 0: + # Exiting the context to set 'elapsed_ms' + pass # Do nothing here; handle after the 'with' block + else: + # Default to an empty list if vals_to_exclude is None + if vals_to_exclude is None: + vals_to_exclude = [] + # Store stage1 timing after exiting the 'with' block if not isinstance(df, pd.DataFrame) or len(df) == 0: + esdsq.store_dashboard_time( + "db_utils/df_to_filtered_records/validate_dataframe_and_set_defaults", + stage1_timer + ) return [] + else: + esdsq.store_dashboard_time( + "db_utils/df_to_filtered_records/validate_dataframe_and_set_defaults", + stage1_timer + ) - # Default to an empty list if vals_to_exclude is None - if vals_to_exclude is None: - vals_to_exclude = [] + # Stage 2: Perform Filtering + with ect.Timer() as stage2_timer: + # Perform filtering if col_to_filter and vals_to_exclude are provided + if col_to_filter and vals_to_exclude: + # Ensure vals_to_exclude is a list of strings + if not isinstance(vals_to_exclude, list) or not all(isinstance(val, str) for val in vals_to_exclude): + raise ValueError("vals_to_exclude must be a list of strings.") + df = df[~df[col_to_filter].isin(vals_to_exclude)] + # Store stage2 timing after exiting the 'with' block + esdsq.store_dashboard_time( + "db_utils/df_to_filtered_records/perform_filtering", + stage2_timer + ) - # Perform filtering if col_to_filter and vals_to_exclude are provided - if col_to_filter and vals_to_exclude: - # Ensure vals_to_exclude is a list of strings - if not isinstance(vals_to_exclude, list) or not all(isinstance(val, str) for val in vals_to_exclude): - raise ValueError("vals_to_exclude must be a list of strings.") - df = df[~df[col_to_filter].isin(vals_to_exclude)] + # Store total timing + with ect.Timer() as total_timer: + pass # No operations here; 'elapsed_ms' will capture the time from start to now + esdsq.store_dashboard_time( + "db_utils/df_to_filtered_records/total_time", + total_timer + ) - # Return the filtered DataFrame as a list of dictionaries - end_time = time.time() # End timing - execution_time = end_time - start_time - logging.debug(f'Time taken to df_to_filtered: {execution_time:.4f} seconds') return df.to_dict("records") + def query_uuids(start_date: str, end_date: str, tz: str): - # As of now, time filtering does not apply to UUIDs; we just query all of them. - # Vestigial code commented out and left below for future reference - - # logging.debug("Querying the UUID DB for %s -> %s" % (start_date,end_date)) - # query = {'update_ts': {'$exists': True}} - # if start_date is not None: - # # have arrow create a datetime using start_date and time 00:00:00 in UTC - # start_time = arrow.get(start_date).datetime - # query['update_ts']['$gte'] = start_time - # if end_date is not None: - # # have arrow create a datetime using end_date and time 23:59:59 in UTC - # end_time = arrow.get(end_date).replace(hour=23, minute=59, second=59).datetime - # query['update_ts']['$lt'] = end_time - # projection = { - # '_id': 0, - # 'user_id': '$uuid', - # 'user_token': '$user_email', - # 'update_ts': 1 - # } - - logging.debug("Querying the UUID DB for (no date range)") - - # This should actually use the profile DB instead of (or in addition to) - # the UUID DB so that we can see the app version, os, manufacturer... - # I will write a couple of functions to get all the users in a time range - # (although we should define what that time range should be) and to merge - # that with the profile data - start_time = time.time() - entries = edb.get_uuid_db().find() - df = pd.json_normalize(list(entries)) - if not df.empty: - df['update_ts'] = pd.to_datetime(df['update_ts']) - df['user_id'] = df['uuid'].apply(str) - df['user_token'] = df['user_email'] - df.drop(columns=["uuid", "_id"], inplace=True) - end_time = time.time() # End timing - execution_time = end_time - start_time - logging.debug(f'Time taken for Query_UUIDs: {execution_time:.4f} seconds') + """ + Queries UUIDs from the database within a specified date range and timezone. + + :param start_date (str): Start date in ISO format. + :param end_date (str): End date in ISO format. + :param tz (str): Timezone string. + :return: Processed pandas DataFrame of UUIDs. + """ + with ect.Timer() as total_timer: + # Stage 1: Log Debug Message + with ect.Timer() as stage1_timer: + logging.debug("Querying the UUID DB for (no date range)") + esdsq.store_dashboard_time( + "db_utils/query_uuids/log_debug_message", + stage1_timer + ) + + # Stage 2: Fetch Aggregate Time Series + with ect.Timer() as stage2_timer: + # This should actually use the profile DB instead of (or in addition to) + # the UUID DB so that we can see the app version, os, manufacturer... + # I will write a couple of functions to get all the users in a time range + # (although we should define what that time range should be) and to merge + # that with the profile data + entries = edb.get_uuid_db().find() + df = pd.json_normalize(list(entries)) + esdsq.store_dashboard_time( + "db_utils/query_uuids/fetch_aggregate_time_series", + stage2_timer + ) + + # Stage 3: Process DataFrame + with ect.Timer() as stage3_timer: + if not df.empty: + df['update_ts'] = pd.to_datetime(df['update_ts']) + df['user_id'] = df['uuid'].apply(str) + df['user_token'] = df['user_email'] + df.drop(columns=["uuid", "_id"], inplace=True) + esdsq.store_dashboard_time( + "db_utils/query_uuids/process_dataframe", + stage3_timer + ) + + esdsq.store_dashboard_time( + "db_utils/query_uuids/total_time", + total_timer + ) + return df def query_confirmed_trips(start_date: str, end_date: str, tz: str): - start_time = time.time() - (start_ts, end_ts) = iso_range_to_ts_range(start_date, end_date, tz) - ts = esta.TimeSeries.get_aggregate_time_series() - # Note to self, allow end_ts to also be null in the timequery - # we can then remove the start_time, end_time logic - df = ts.get_data_df("analysis/confirmed_trip", - time_query=estt.TimeQuery("data.start_ts", start_ts, end_ts), - ) - user_input_cols = [] - - logging.debug("Before filtering, df columns are %s" % df.columns) - if not df.empty: - # Since we use `get_data_df` instead of `pd.json_normalize`, - # we lose the "data" prefix on the fields and they are only flattened one level - # Here, we restore the prefix for the VALID_TRIP_COLS from constants.py - # for backwards compatibility. We do this for all columns since columns which don't exist are ignored by the rename command. - rename_cols = constants.VALID_TRIP_COLS - # the mapping is `{distance: data.distance, duration: data.duration} etc - rename_mapping = dict(zip([c.replace("data.", "") for c in rename_cols], rename_cols)) - logging.debug("Rename mapping is %s" % rename_mapping) - df.rename(columns=rename_mapping, inplace=True) - logging.debug("After renaming columns, they are %s" % df.columns) - - # Now copy over the coordinates - df['data.start_loc.coordinates'] = df['start_loc'].apply(lambda g: g["coordinates"]) - df['data.end_loc.coordinates'] = df['end_loc'].apply(lambda g: g["coordinates"]) - - # Add primary modes from the sensed, inferred and ble summaries. Note that we do this - # **before** filtering the `all_trip_columns` because the - # *_section_summary columns are not currently valid + """ + Queries confirmed trips within a specified date range and timezone. + + :param start_date (str): Start date in ISO format. + :param end_date (str): End date in ISO format. + :param tz (str): Timezone string. + :return: Tuple containing the processed DataFrame and list of user input columns. + """ + with ect.Timer() as total_timer: + # Stage 1: Convert Date Range to Timestamps + with ect.Timer() as stage1_timer: + (start_ts, end_ts) = iso_range_to_ts_range(start_date, end_date, tz) + esdsq.store_dashboard_time( + "db_utils/query_confirmed_trips/convert_date_range_to_timestamps", + stage1_timer + ) + + # Stage 2: Fetch Aggregate Time Series + with ect.Timer() as stage2_timer: + ts = esta.TimeSeries.get_aggregate_time_series() + esdsq.store_dashboard_time( + "db_utils/query_confirmed_trips/fetch_aggregate_time_series", + stage2_timer + ) - # Check if 'md' is not a dictionary or does not contain the key 'distance' - # or if 'md["distance"]' is not a dictionary. - # If any of these conditions are true, return "INVALID". - get_max_mode_from_summary = lambda md: ( - "INVALID" - if not isinstance(md, dict) - or "distance" not in md - or not isinstance(md["distance"], dict) - # If 'md' is a dictionary and 'distance' is a valid key pointing to a dictionary: - else ( - # Get the maximum value from 'md["distance"]' using the values of 'md["distance"].get' as the key for 'max'. - # This operation only happens if the length of 'md["distance"]' is greater than 0. - # Otherwise, return "INVALID". - max(md["distance"], key=md["distance"].get) - if len(md["distance"]) > 0 - else "INVALID" + # Stage 3: Fetch Confirmed Trip Entries + with ect.Timer() as stage3_timer: + # Note to self, allow end_ts to also be null in the timequery + # we can then remove the start_time, end_time logic + df = ts.get_data_df("analysis/confirmed_trip", + time_query=estt.TimeQuery("data.start_ts", start_ts, end_ts), ) + user_input_cols = [] + esdsq.store_dashboard_time( + "db_utils/query_confirmed_trips/fetch_confirmed_trip_entries", + stage3_timer ) - - df["data.primary_sensed_mode"] = df.cleaned_section_summary.apply(get_max_mode_from_summary) - df["data.primary_predicted_mode"] = df.inferred_section_summary.apply(get_max_mode_from_summary) - if 'ble_sensed_summary' in df.columns: - df["data.primary_ble_sensed_mode"] = df.ble_sensed_summary.apply(get_max_mode_from_summary) - else: - logging.debug("No BLE support found, not fleet version, ignoring...") - - # Expand the user inputs - user_input_df = pd.json_normalize(df.user_input) - df = pd.concat([df, user_input_df], axis='columns') - logging.debug(f"Before filtering {user_input_df.columns=}") - user_input_cols = [c for c in user_input_df.columns - if "metadata" not in c and - "xmlns" not in c and - "local_dt" not in c and - 'xmlResponse' not in c and - "_id" not in c] - logging.debug(f"After filtering {user_input_cols=}") - - combined_col_list = list(perm_utils.get_all_trip_columns()) + user_input_cols - logging.debug(f"Combined list {combined_col_list=}") - columns = [col for col in combined_col_list if col in df.columns] - df = df[columns] - logging.debug(f"After filtering against the combined list {df.columns=}") - # logging.debug("After getting all columns, they are %s" % df.columns) - for col in constants.BINARY_TRIP_COLS: - if col in df.columns: - df[col] = df[col].apply(str) - for named_col in perm_utils.get_all_named_trip_columns(): - if named_col['path'] in df.columns: - df[named_col['label']] = df[named_col['path']] - # df = df.drop(columns=[named_col['path']]) - # TODO: We should really display both the humanized value and the raw value - # humanized value for people to see the entries in real time - # raw value to support analyses on the downloaded data - # I still don't fully grok which columns are displayed - # https://github.com/e-mission/op-admin-dashboard/issues/29#issuecomment-1530105040 - # https://github.com/e-mission/op-admin-dashboard/issues/29#issuecomment-1530439811 - # so just replacing the distance and duration with the humanized values for now - df['data.distance_meters'] = df['data.distance'] - use_imperial = perm_utils.config.get("display_config", - {"use_imperial": False}).get("use_imperial", False) - # convert to km to humanize - df['data.distance_km'] = df['data.distance'] / 1000 - # convert km further to miles because this is the US, Liberia or Myanmar - # https://en.wikipedia.org/wiki/Mile - df['data.duration_seconds'] = df['data.duration'] - if use_imperial: - df['data.distance_miles'] = df['data.distance_km'] * 0.6213712 - - df['data.duration'] = df['data.duration'].apply(lambda d: arrow.utcnow().shift(seconds=d).humanize(only_distance=True)) - - # logging.debug("After filtering, df columns are %s" % df.columns) - # logging.debug("After filtering, the actual data is %s" % df.head()) - # logging.debug("After filtering, the actual data is %s" % df.head().trip_start_time_str) - end_time = time.time() # End timing - execution_time = end_time - start_time - logging.debug(f'Time taken for Query_Confirmed_Trips: {execution_time:.4f} seconds') + + if not df.empty: + # Stage 4: Convert Object Columns to Strings + with ect.Timer() as stage4_timer: + for col in df.columns: + if df[col].dtype == 'object': + df[col] = df[col].apply(str) + esdsq.store_dashboard_time( + "db_utils/query_confirmed_trips/convert_object_columns_to_strings", + stage4_timer + ) + + # Stage 5: Drop Metadata Columns + with ect.Timer() as stage5_timer: + # Drop metadata columns + columns_to_drop = [col for col in df.columns if col.startswith("metadata")] + df.drop(columns=columns_to_drop, inplace=True) + esdsq.store_dashboard_time( + "db_utils/query_confirmed_trips/drop_metadata_columns", + stage5_timer + ) + + # Stage 6: Drop or Modify Excluded Columns + with ect.Timer() as stage6_timer: + # Drop or modify excluded columns + for col in constants.EXCLUDED_TRAJECTORIES_COLS: + if col in df.columns: + df.drop(columns=[col], inplace=True) + esdsq.store_dashboard_time( + "db_utils/query_confirmed_trips/drop_or_modify_excluded_columns", + stage6_timer + ) + + # Stage 7: Handle 'background/location' Key + with ect.Timer() as stage7_timer: + # Check if 'background/location' is in the key_list + if 'background/location' in key_list: + if 'data.mode' in df.columns: + # Set the values in data.mode to blank ('') + df['data.mode'] = '' + else: + # Map mode to its corresponding string value + df['data.mode_str'] = df['data.mode'].apply( + lambda x: ecwm.MotionTypes(x).name if x in set(enum.value for enum in ecwm.MotionTypes) else 'UNKNOWN' + ) + esdsq.store_dashboard_time( + "db_utils/query_confirmed_trips/handle_background_location_key", + stage7_timer + ) + + # Stage 8: Clean and Modify DataFrames + with ect.Timer() as stage8_timer: + # Expand the user inputs + user_input_df = pd.json_normalize(df.user_input) + df = pd.concat([df, user_input_df], axis='columns') + user_input_cols = [ + c for c in user_input_df.columns + if "metadata" not in c and + "xmlns" not in c and + "local_dt" not in c and + 'xmlResponse' not in c and + "_id" not in c + ] + esdsq.store_dashboard_time( + "db_utils/query_confirmed_trips/clean_and_modify_dataframes", + stage8_timer + ) + + # Stage 9: Filter and Combine Columns + with ect.Timer() as stage9_timer: + combined_col_list = list(perm_utils.get_all_trip_columns()) + user_input_cols + columns = [col for col in combined_col_list if col in df.columns] + df = df[columns] + for col in constants.BINARY_TRIP_COLS: + if col in df.columns: + df[col] = df[col].apply(str) + for named_col in perm_utils.get_all_named_trip_columns(): + if named_col['path'] in df.columns: + df[named_col['label']] = df[named_col['path']] + # df = df.drop(columns=[named_col['path']]) + # TODO: We should really display both the humanized value and the raw value + # humanized value for people to see the entries in real time + # raw value to support analyses on the downloaded data + # I still don't fully grok which columns are displayed + # https://github.com/e-mission/op-admin-dashboard/issues/29#issuecomment-1530105040 + # https://github.com/e-mission/op-admin-dashboard/issues/29#issuecomment-1530439811 + # so just replacing the distance and duration with the humanized values for now + df['data.distance_meters'] = df['data.distance'] + use_imperial = perm_utils.config.get("display_config", + {"use_imperial": False}).get("use_imperial", False) + # convert to km to humanize + df['data.distance_km'] = df['data.distance'] / 1000 + # convert km further to miles because this is the US, Liberia or Myanmar + # https://en.wikipedia.org/wiki/Mile + df['data.duration_seconds'] = df['data.duration'] + if use_imperial: + df['data.distance_miles'] = df['data.distance_km'] * 0.6213712 + + df['data.duration'] = df['data.duration'].apply(lambda d: arrow.utcnow().shift(seconds=d).humanize(only_distance=True)) + esdsq.store_dashboard_time( + "db_utils/query_confirmed_trips/filter_and_combine_columns", + stage9_timer + ) + + esdsq.store_dashboard_time( + "db_utils/query_confirmed_trips/total_time", + total_timer + ) return (df, user_input_cols) def query_demographics(): - start_time = time.time() - # Returns dictionary of df where key represent differnt survey id and values are df for each survey - logging.debug("Querying the demographics for (no date range)") - ts = esta.TimeSeries.get_aggregate_time_series() - - entries = ts.find_entries(["manual/demographic_survey"]) - data = list(entries) - - available_key = {} - for entry in data: - survey_key = list(entry['data']['jsonDocResponse'].keys())[0] - if survey_key not in available_key: - available_key[survey_key] = [] - available_key[survey_key].append(entry) - - dataframes = {} - for key, json_object in available_key.items(): - df = pd.json_normalize(json_object) - dataframes[key] = df - - for key, df in dataframes.items(): - if not df.empty: - for col in constants.BINARY_DEMOGRAPHICS_COLS: - if col in df.columns: - df[col] = df[col].apply(str) - columns_to_drop = [col for col in df.columns if col.startswith("metadata")] - df.drop(columns= columns_to_drop, inplace=True) - modified_columns = perm_utils.get_demographic_columns(df.columns) - df.columns = modified_columns - df.columns=[col.rsplit('.',1)[-1] if col.startswith('data.jsonDocResponse.') else col for col in df.columns] - for col in constants.EXCLUDED_DEMOGRAPHICS_COLS: - if col in df.columns: - df.drop(columns= [col], inplace=True) - - end_time = time.time() # End timing - execution_time = end_time - start_time - logging.debug(f'Time taken for Query Demographic: {execution_time:.4f} seconds') + """ + Queries demographic survey data and organizes it into a dictionary of DataFrames. + Each key in the dictionary represents a different survey ID, and the corresponding + value is a DataFrame containing the survey responses. + + :return: Dictionary where keys are survey IDs and values are corresponding DataFrames. + """ + with ect.Timer() as total_timer: + # Stage 1: Log Debug Message + with ect.Timer() as stage1_timer: + # Returns dictionary of df where key represent different survey id and values are df for each survey + logging.debug("Querying the demographics for (no date range)") + esdsq.store_dashboard_time( + "db_utils/query_demographics/log_debug_message", + stage1_timer + ) + + # Stage 2: Fetch Aggregate Time Series + with ect.Timer() as stage2_timer: + ts = esta.TimeSeries.get_aggregate_time_series() + esdsq.store_dashboard_time( + "db_utils/query_demographics/fetch_aggregate_time_series", + stage2_timer + ) + + # Stage 3: Find Demographic Survey Entries + with ect.Timer() as stage3_timer: + entries = ts.find_entries(["manual/demographic_survey"]) + data = list(entries) + esdsq.store_dashboard_time( + "db_utils/query_demographics/find_demographic_survey_entries", + stage3_timer + ) + + # Stage 4: Organize Entries by Survey Key + with ect.Timer() as stage4_timer: + available_key = {} + for entry in data: + survey_key = list(entry['data']['jsonDocResponse'].keys())[0] + if survey_key not in available_key: + available_key[survey_key] = [] + available_key[survey_key].append(entry) + esdsq.store_dashboard_time( + "db_utils/query_demographics/organize_entries_by_survey_key", + stage4_timer + ) + + # Stage 5: Create DataFrames from Organized Entries + with ect.Timer() as stage5_timer: + dataframes = {} + for key, json_object in available_key.items(): + df = pd.json_normalize(json_object) + dataframes[key] = df + esdsq.store_dashboard_time( + "db_utils/query_demographics/create_dataframes_from_organized_entries", + stage5_timer + ) + + # Stage 6: Clean and Modify DataFrames + with ect.Timer() as stage6_timer: + for key, df in dataframes.items(): + if not df.empty: + # Convert binary demographic columns to strings + for col in constants.BINARY_DEMOGRAPHICS_COLS: + if col in df.columns: + df[col] = df[col].apply(str) + + # Drop metadata columns + columns_to_drop = [col for col in df.columns if col.startswith("metadata")] + df.drop(columns=columns_to_drop, inplace=True) + + # Modify column names + modified_columns = perm_utils.get_demographic_columns(df.columns) + df.columns = modified_columns + + # Simplify column names by removing prefixes + df.columns = [ + col.rsplit('.', 1)[-1] if col.startswith('data.jsonDocResponse.') else col + for col in df.columns + ] + + # Drop excluded demographic columns + for col in constants.EXCLUDED_DEMOGRAPHICS_COLS: + if col in df.columns: + df.drop(columns=[col], inplace=True) + esdsq.store_dashboard_time( + "db_utils/query_demographics/clean_and_modify_dataframes", + stage6_timer + ) + + esdsq.store_dashboard_time( + "db_utils/query_demographics/total_time", + total_timer + ) + return dataframes def query_trajectories(start_date: str, end_date: str, tz: str, key_list): - (start_ts, end_ts) = iso_range_to_ts_range(start_date, end_date, tz) - ts = esta.TimeSeries.get_aggregate_time_series() - - # Check if key_list contains 'background/location' - key_list = [key_list] - entries = ts.find_entries( - key_list=key_list, - time_query=estt.TimeQuery("data.ts", start_ts, end_ts), - ) - df = pd.json_normalize(list(entries)) - - if not df.empty: - for col in df.columns: - if df[col].dtype == 'object': - df[col] = df[col].apply(str) - - # Drop metadata columns - columns_to_drop = [col for col in df.columns if col.startswith("metadata")] - df.drop(columns=columns_to_drop, inplace=True) - - # Drop or modify excluded columns - for col in constants.EXCLUDED_TRAJECTORIES_COLS: - if col in df.columns: - df.drop(columns=[col], inplace=True) - - # Check if 'background/location' is in the key_list - if 'background/location' in key_list: - if 'data.mode' in df.columns: - # Set the values in data.mode to blank ('') - df['data.mode'] = '' - else: - # Map mode to its corresponding string value - df['data.mode_str'] = df['data.mode'].apply( - lambda x: ecwm.MotionTypes(x).name if x in set(enum.value for enum in ecwm.MotionTypes) else 'UNKNOWN' + """ + Queries trajectories within a specified date range and timezone based on provided key list. + + :param start_date (str): Start date in ISO format. + :param end_date (str): End date in ISO format. + :param tz (str): Timezone string. + :param key_list (list[str]): List of keys to query. + :return: Processed pandas DataFrame of trajectories. + """ + with ect.Timer() as total_timer: + # Stage 1: Convert Date Range to Timestamps + with ect.Timer() as stage1_timer: + (start_ts, end_ts) = iso_range_to_ts_range(start_date, end_date, tz) + esdsq.store_dashboard_time( + "db_utils/query_trajectories/convert_date_range_to_timestamps", + stage1_timer + ) + + # Stage 2: Fetch Aggregate Time Series + with ect.Timer() as stage2_timer: + ts = esta.TimeSeries.get_aggregate_time_series() + esdsq.store_dashboard_time( + "db_utils/query_trajectories/fetch_aggregate_time_series", + stage2_timer + ) + + # Stage 3: Fetch Trajectory Entries + with ect.Timer() as stage3_timer: + # Check if key_list contains 'background/location' + key_list = [key_list] + entries = ts.find_entries( + key_list=key_list, + time_query=estt.TimeQuery("data.ts", start_ts, end_ts), ) - + df = pd.json_normalize(list(entries)) + esdsq.store_dashboard_time( + "db_utils/query_trajectories/fetch_trajectory_entries", + stage3_timer + ) + + if not df.empty: + # Stage 4: Convert Object Columns to Strings + with ect.Timer() as stage4_timer: + for col in df.columns: + if df[col].dtype == 'object': + df[col] = df[col].apply(str) + esdsq.store_dashboard_time( + "db_utils/query_trajectories/convert_object_columns_to_strings", + stage4_timer + ) + + # Stage 5: Drop Metadata Columns + with ect.Timer() as stage5_timer: + # Drop metadata columns + columns_to_drop = [col for col in df.columns if col.startswith("metadata")] + df.drop(columns=columns_to_drop, inplace=True) + esdsq.store_dashboard_time( + "db_utils/query_trajectories/drop_metadata_columns", + stage5_timer + ) + + # Stage 6: Drop or Modify Excluded Columns + with ect.Timer() as stage6_timer: + # Drop or modify excluded columns + for col in constants.EXCLUDED_TRAJECTORIES_COLS: + if col in df.columns: + df.drop(columns=[col], inplace=True) + esdsq.store_dashboard_time( + "db_utils/query_trajectories/drop_or_modify_excluded_columns", + stage6_timer + ) + + # Stage 7: Handle 'background/location' Key + with ect.Timer() as stage7_timer: + # Check if 'background/location' is in the key_list + if 'background/location' in key_list: + if 'data.mode' in df.columns: + # Set the values in data.mode to blank ('') + df['data.mode'] = '' + else: + # Map mode to its corresponding string value + df['data.mode_str'] = df['data.mode'].apply( + lambda x: ecwm.MotionTypes(x).name if x in set(enum.value for enum in ecwm.MotionTypes) else 'UNKNOWN' + ) + esdsq.store_dashboard_time( + "db_utils/query_trajectories/handle_background_location_key", + stage7_timer + ) + + esdsq.store_dashboard_time( + "db_utils/query_trajectories/total_time", + total_timer + ) return df - +# unchanged for now -- since reverting def add_user_stats(user_data, batch_size=5): start_time = time.time() time_format = 'YYYY-MM-DD HH:mm:ss' @@ -356,45 +553,142 @@ def batch_process(users_batch): return processed_data -def query_segments_crossing_endpoints(poly_region_start, poly_region_end, start_date: str, end_date: str, tz: str, excluded_uuids: list[str]): - (start_ts, end_ts) = iso_range_to_ts_range(start_date, end_date, tz) - tq = estt.TimeQuery("data.ts", start_ts, end_ts) - not_excluded_uuid_query = {'user_id': {'$nin': [UUID(uuid) for uuid in excluded_uuids]}} - agg_ts = estag.AggregateTimeSeries().get_aggregate_time_series() - - locs_matching_start = agg_ts.get_data_df( - "analysis/recreated_location", - geo_query = estg.GeoQuery(['data.loc'], poly_region_start), - time_query = tq, - extra_query_list=[not_excluded_uuid_query] - ) - locs_matching_start = locs_matching_start.drop_duplicates(subset=['section']) - if locs_matching_start.empty: - return locs_matching_start - - locs_matching_end = agg_ts.get_data_df( - "analysis/recreated_location", - geo_query = estg.GeoQuery(['data.loc'], poly_region_end), - time_query = tq, - extra_query_list=[not_excluded_uuid_query] - ) - locs_matching_end = locs_matching_end.drop_duplicates(subset=['section']) - if locs_matching_end.empty: - return locs_matching_end - - merged = locs_matching_start.merge(locs_matching_end, how='outer', on=['section']) - filtered = merged.loc[merged['idx_x']= min_users_required: + logging.info( + f"Returning filtered segments with {number_user_seen} unique users." + ) + result = filtered + else: + logging.info( + f"Insufficient unique users ({number_user_seen}) to meet the " + f"minimum requirement ({min_users_required}). Returning empty DataFrame." + ) + result = pd.DataFrame.from_dict([]) + esdsq.store_dashboard_time( + "db_utils/query_segments_crossing_endpoints/evaluate_user_count_and_final_filtering", + stage6_timer + ) - if perm_utils.permissions.get("segment_trip_time_min_users", 0) <= number_user_seen: - return filtered - return pd.DataFrame.from_dict([]) + esdsq.store_dashboard_time( + "db_utils/query_segments_crossing_endpoints/total_time", + total_timer + ) + return result # The following query can be called multiple times, let's open db only once analysis_timeseries_db = edb.get_analysis_timeseries_db()