Pipeline stats for Emission Server #43

Open
wants to merge 3 commits into base: master
Binary file added .DS_Store
285 changes: 285 additions & 0 deletions emission/pipeline_stats/[broken] time_series_db_method_clean.ipynb
@@ -0,0 +1,285 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Set up the environment\n",
"%env DB_HOST=mongodb://localhost/openpath_stage\n",
"\n",
"# Import necessary libraries and modules\n",
"import emission.core.get_database as edb\n",
"import emission.storage.timeseries.abstract_timeseries as esta\n",
"import emission.storage.timeseries.builtin_timeseries as estb\n",
"import emission.storage.decorations.analysis_timeseries_queries as esda\n",
"import emission.core.wrapper.entry as ecwe\n",
"import emission.storage.decorations.trip_queries as esdt\n",
"import emission.storage.timeseries.timequery as estt\n",
"import pandas as pd\n",
"from datetime import datetime, timedelta\n",
"import pytz\n",
"import pprint\n",
"import os"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [],
"source": [
"def get_all_user_uuids():\n",
" try:\n",
" uuid_cursor = edb.get_uuid_db().find({}, {\"uuid\": 1, \"_id\": 0})\n",
" uuid_list = [doc['uuid'] for doc in uuid_cursor]\n",
" print(f\"Retrieved {len(uuid_list)} user UUIDs.\")\n",
" return uuid_list\n",
" except Exception as e:\n",
" print(f\"Error retrieving user UUIDs: {e}\")\n",
" return []\n",
"\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# THE MAIN ISSUE IS THAT `GET_DATA_DF` DOES NOT HAVE ANY OF THE NEW PIPELINE STATS?"
]
},
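{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Cross-check sketch (added for debugging; not part of the original analysis):\n",
"# compare the raw timeseries collection against find_entries and get_data_df for\n",
"# one user, to see whether 'stats/pipeline_time' entries exist in the DB but are\n",
"# lost on the dataframe path. Assumes the standard emission APIs\n",
"# (edb.get_timeseries_db, TimeSeries.find_entries) and a sample_uuid taken from\n",
"# get_all_user_uuids().\n",
"def compare_raw_vs_df(sample_uuid):\n",
"    raw_count = edb.get_timeseries_db().count_documents(\n",
"        {\"user_id\": sample_uuid, \"metadata.key\": \"stats/pipeline_time\"})\n",
"    ts = esta.TimeSeries.get_time_series(sample_uuid)\n",
"    entry_count = len(list(ts.find_entries([\"stats/pipeline_time\"], time_query=None)))\n",
"    df = ts.get_data_df(\"stats/pipeline_time\", time_query=None)\n",
"    print(f\"raw collection: {raw_count}, find_entries: {entry_count}, get_data_df rows: {len(df)}\")\n",
"    if raw_count > 0 and df.empty:\n",
"        print(\"Entries exist in the DB, but get_data_df returns nothing; the dataframe conversion path is the likely culprit.\")\n"
]
},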
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [],
"source": [
"def fetch_pipeline_time_data(uuid_list):\n",
"\n",
" all_users_pipeline_dfs = []\n",
" total_users = len(uuid_list)\n",
" for idx, user_uuid in enumerate(uuid_list, start=1):\n",
" try:\n",
" ts = esta.TimeSeries.get_time_series(user_uuid)\n",
" pipeline_df = ts.get_data_df(\"stats/pipeline_time\", time_query=None)\n",
" if not pipeline_df.empty:\n",
" all_users_pipeline_dfs.append(pipeline_df)\n",
" print(f\"[{idx}/{total_users}] Fetched data for user {user_uuid}.\")\n",
" print(pipeline_df['name'].unique())\n",
" else:\n",
" print(f\"[{idx}/{total_users}] No 'stats/pipeline_time' data for user {user_uuid}.\")\n",
" except Exception as e:\n",
" print(f\"[{idx}/{total_users}] Error fetching data for user {user_uuid}: {e}\")\n",
" return all_users_pipeline_dfs\n",
"\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {},
"outputs": [],
"source": [
"def analyze_usercache(combined_df, start_date_str='2024-11-08'):\n",
" print(\"\\nAnalyzing 'USERCACHE' executions...\")\n",
" # Step 1: Filter for rows where name is \"USERCACHE\"\n",
" usercache_df = combined_df[combined_df['name'] == \"USERCACHE\"].copy()\n",
"\n",
" if usercache_df.empty:\n",
" print(\"No 'USERCACHE' entries found.\")\n",
" return\n",
"\n",
" # Step 2: Convert metawrite_ts to datetime\n",
" usercache_df['datetime'] = pd.to_datetime(usercache_df['metawrite_ts'], unit='s')\n",
"\n",
" # Step 3: Define the start date for filtering\n",
" start_date = pd.Timestamp(start_date_str)\n",
"\n",
" # Step 4: Filter for rows since the start date\n",
" usercache_df = usercache_df[usercache_df['datetime'] >= start_date]\n",
"\n",
" # Step 5: Group by hour and count executions\n",
" hourly_execution_counts = usercache_df.groupby(usercache_df['datetime'].dt.floor('H')).size()\n",
"\n",
" # Step 6: Output the results\n",
" if hourly_execution_counts.empty:\n",
" print(f\"No executions of 'USERCACHE' since {start_date_str}.\")\n",
" else:\n",
" print(f\"Hourly execution counts since {start_date_str}:\")\n",
" print(hourly_execution_counts)\n"
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {},
"outputs": [],
"source": [
"def process_function_level_data(combined_df, exclude_names, base_dir=os.getcwd()):\n",
" print(\"\\nProcessing function-level ..\")\n",
" # Step 1: Filter for function-level data only (entries with slashes in 'name') and exclude specified names\n",
" function_level_df = combined_df[\n",
" combined_df['name'].str.contains('/') &\n",
" ~combined_df['name'].isin(exclude_names)\n",
" ].copy()\n",
"\n",
" if function_level_df.empty:\n",
" print(\"No function-level data after filtering.\")\n",
" return\n",
"\n",
" # Step 2: Select the relevant columns\n",
" selected_columns = function_level_df[['reading', 'name']].copy()\n",
"\n",
" # Step 3: Data Cleaning\n",
" selected_columns.dropna(subset=['reading', 'name'], inplace=True)\n",
" selected_columns = selected_columns[pd.to_numeric(selected_columns['reading'], errors='coerce').notnull()]\n",
"\n",
" if selected_columns.empty:\n",
" print(\"No valid 'reading' after cleaning.\")\n",
" return\n",
"\n",
" # Step 4: Aggregate 'reading' by 'name'\n",
" aggregated_sum = selected_columns.groupby('name', as_index=False)['reading'].sum()\n",
" aggregated_sum.rename(columns={'reading': 'total_reading'}, inplace=True)\n",
"\n",
" aggregated_mean = selected_columns.groupby('name', as_index=False)['reading'].mean()\n",
" aggregated_mean.rename(columns={'reading': 'average_reading'}, inplace=True)\n",
"\n",
" # Step 5: Determine the 80th percentile threshold\n",
" threshold_sum = aggregated_sum['total_reading'].quantile(0.80)\n",
" threshold_mean = aggregated_mean['average_reading'].quantile(0.80)\n",
"\n",
" # Step 6: Split the DataFrame into top 20% and bottom 80%\n",
" top20_sum = aggregated_sum[aggregated_sum['total_reading'] >= threshold_sum].sort_values(by='total_reading', ascending=False)\n",
" bottom80_sum = aggregated_sum[aggregated_sum['total_reading'] < threshold_sum].sort_values(by='total_reading', ascending=False)\n",
"\n",
" top20_mean = aggregated_mean[aggregated_mean['average_reading'] >= threshold_mean].sort_values(by='average_reading', ascending=False)\n",
" bottom80_mean = aggregated_mean[aggregated_mean['average_reading'] < threshold_mean].sort_values(by='average_reading', ascending=False)\n",
"\n",
" # Step 7: Define file paths\n",
" aggregated_sum_path = os.path.join(base_dir, 'aggregated_sum_function_level.csv')\n",
" top20_sum_path = os.path.join(base_dir, 'top20_function_level_sum_sorted.csv')\n",
" bottom80_sum_path = os.path.join(base_dir, 'bottom80_function_level_sum_sorted.csv')\n",
"\n",
" aggregated_mean_path = os.path.join(base_dir, 'aggregated_mean_function_level.csv')\n",
" top20_mean_path = os.path.join(base_dir, 'top20_function_level_mean_sorted.csv')\n",
" bottom80_mean_path = os.path.join(base_dir, 'bottom80_function_level_mean_sorted.csv')\n",
"\n",
" # Step 8: Save to CSV\n",
" try:\n",
" aggregated_sum.to_csv(aggregated_sum_path, index=False)\n",
" top20_sum.to_csv(top20_sum_path, index=False)\n",
" bottom80_sum.to_csv(bottom80_sum_path, index=False)\n",
"\n",
" aggregated_mean.to_csv(aggregated_mean_path, index=False)\n",
" top20_mean.to_csv(top20_mean_path, index=False)\n",
" bottom80_mean.to_csv(bottom80_mean_path, index=False)\n",
"\n",
" print(f\"Aggregated Sum Function-Level Data saved to {aggregated_sum_path}\")\n",
" print(f\"Top 20% (Sum) function-level data saved to {top20_sum_path}\")\n",
" print(f\"Bottom 80% (Sum) function-level data saved to {bottom80_sum_path}\")\n",
"\n",
" print(f\"\\nAggregated Mean Function-Level Data saved to {aggregated_mean_path}\")\n",
" print(f\"Top 20% (Mean) function-level data saved to {top20_mean_path}\")\n",
" print(f\"Bottom 80% (Mean) function-level data saved to {bottom80_mean_path}\")\n",
" except Exception as e:\n",
" print(f\"Error saving aggregated data to CSV: {e}\")\n",
" return\n",
"\n",
" # Step 9: Verify the splits\n",
" print(f\"\\nSum Aggregation - Top 20% row count: {len(top20_sum)}\")\n",
" print(f\"Sum Aggregation - Bottom 80% row count: {len(bottom80_sum)}\")\n",
"\n",
" print(f\"\\nMean Aggregation - Top 20% row count: {len(top20_mean)}\")\n",
" print(f\"Mean Aggregation - Bottom 80% row count: {len(bottom80_mean)}\")\n",
"\n",
" # Step 10: Inspect some entries\n",
" print(\"\\nSample Top 20% Sum Aggregation Entries:\")\n",
" print(top20_sum.head())\n",
"\n",
" print(\"\\nSample Bottom 80% Sum Aggregation Entries:\")\n",
" print(bottom80_sum.head())\n",
"\n",
" print(\"\\nSample Top 20% Mean Aggregation Entries:\")\n",
" print(top20_mean.head())\n",
"\n",
" print(\"\\nSample Bottom 80% Mean Aggregation Entries:\")\n",
" print(bottom80_mean.head())"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def main():\n",
" # Step 1: Retrieve all user UUIDs\n",
" user_uuid_list = get_all_user_uuids()\n",
" #user_uuid_list = user_uuid_list[1:3]\n",
" if not user_uuid_list:\n",
" print(\"No user UUIDs retrieved. Exiting script.\")\n",
" return\n",
"\n",
" # Step 2: Fetch 'stats/pipeline_time' data for all users\n",
" all_users_pipeline_dfs = fetch_pipeline_time_data(user_uuid_list)\n",
"\n",
" if not all_users_pipeline_dfs:\n",
" print(\"No pipeline data fetched for any user.\")\n",
" return\n",
"\n",
" # Step 3: Combine all users' DataFrames\n",
" combined_pipeline_df = pd.concat(all_users_pipeline_dfs, ignore_index=True)\n",
" print(f\"\\nCombined Pipeline Data Shape: {combined_pipeline_df.shape}\")\n",
"\n",
" # Step 4: Describe and get info about the combined DataFrame\n",
" print(\"\\nCombined Pipeline Data Description:\")\n",
" print(combined_pipeline_df.describe())\n",
"\n",
" print(combined_pipeline_df.info())\n",
"\n",
" # Step 5: Get unique 'name' entries\n",
" unique_names = combined_pipeline_df['name'].unique()\n",
" print(f\"\\nUnique 'name' entries:\")\n",
" print(unique_names)\n",
"\n",
" # Step 6: Analyze 'USERCACHE' executions\n",
" analyze_usercache(combined_pipeline_df)\n",
"\n",
" # Step 7: Define the list of 'name' entries to exclude\n",
" exclude_data_names = [\n",
" 'TRIP_SEGMENTATION/segment_into_trips',\n",
" 'TRIP_SEGMENTATION/segment_into_trips_dist/loop'\n",
" ]\n",
"\n",
" # Step 8: Process function-level data\n",
" process_function_level_data(combined_pipeline_df, exclude_data_names)\n",
"\n",
"main()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.20"
}
},
"nbformat": 4,
"nbformat_minor": 2
}