Added pipeline stats
TeachMeTW committed Dec 7, 2024
1 parent 3ff35d0 commit ff10cf1
Showing 5 changed files with 1,771 additions and 0 deletions.
Binary file added .DS_Store
Binary file not shown.
285 changes: 285 additions & 0 deletions emission/pipeline_stats/[broken] time_series_db_method_clean.ipynb
@@ -0,0 +1,285 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Set up the environment\n",
"%env DB_HOST=mongodb://localhost/openpath_stage\n",
"\n",
"# Import necessary libraries and modules\n",
"import emission.core.get_database as edb\n",
"import emission.storage.timeseries.abstract_timeseries as esta\n",
"import emission.storage.timeseries.builtin_timeseries as estb\n",
"import emission.storage.decorations.analysis_timeseries_queries as esda\n",
"import emission.core.wrapper.entry as ecwe\n",
"import emission.storage.decorations.trip_queries as esdt\n",
"import emission.storage.timeseries.timequery as estt\n",
"import pandas as pd\n",
"from datetime import datetime, timedelta\n",
"import pytz\n",
"import pprint\n",
"import os"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [],
"source": [
"def get_all_user_uuids():\n",
" try:\n",
" uuid_cursor = edb.get_uuid_db().find({}, {\"uuid\": 1, \"_id\": 0})\n",
" uuid_list = [doc['uuid'] for doc in uuid_cursor]\n",
" print(f\"Retrieved {len(uuid_list)} user UUIDs.\")\n",
" return uuid_list\n",
" except Exception as e:\n",
" print(f\"Error retrieving user UUIDs: {e}\")\n",
" return []\n",
"\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# THE MAIN ISSUE IS THAT `GET_DATA_DF` DOES NOT HAVE ANY OF THE NEW PIPELINE STATS?"
]
},
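{
"cell_type": "markdown",
"metadata": {},
"source": [
"A quick sanity check, as a sketch: before concluding that `get_data_df` drops the new stats, confirm that raw `stats/pipeline_time` entries exist in the timeseries at all. The cell below assumes `edb.get_timeseries_db()` exposes the pymongo collection backing the timeseries and that the new stats are stored under `metadata.key = 'stats/pipeline_time'`. If raw entries exist but `get_data_df` returns an empty frame, the problem is in the dataframe conversion rather than in data collection."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Sketch (assumptions noted above): count and inspect raw 'stats/pipeline_time' entries.\n",
"raw_ts = edb.get_timeseries_db()\n",
"n_raw = raw_ts.count_documents({'metadata.key': 'stats/pipeline_time'})\n",
"print(f'Raw stats/pipeline_time entries in the timeseries collection: {n_raw}')\n",
"# Look at one entry to see which fields the new stats actually carry\n",
"pprint.pprint(raw_ts.find_one({'metadata.key': 'stats/pipeline_time'}))"
]
},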
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [],
"source": [
"def fetch_pipeline_time_data(uuid_list):\n",
"\n",
" all_users_pipeline_dfs = []\n",
" total_users = len(uuid_list)\n",
" for idx, user_uuid in enumerate(uuid_list, start=1):\n",
" try:\n",
" ts = esta.TimeSeries.get_time_series(user_uuid)\n",
" pipeline_df = ts.get_data_df(\"stats/pipeline_time\", time_query=None)\n",
" if not pipeline_df.empty:\n",
" all_users_pipeline_dfs.append(pipeline_df)\n",
" print(f\"[{idx}/{total_users}] Fetched data for user {user_uuid}.\")\n",
" print(pipeline_df['name'].unique())\n",
" else:\n",
" print(f\"[{idx}/{total_users}] No 'stats/pipeline_time' data for user {user_uuid}.\")\n",
" except Exception as e:\n",
" print(f\"[{idx}/{total_users}] Error fetching data for user {user_uuid}: {e}\")\n",
" return all_users_pipeline_dfs\n",
"\n",
"\n"
]
},
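{
"cell_type": "markdown",
"metadata": {},
"source": [
"If `get_data_df` keeps coming back empty for the new stats, a lower-level fetch is sketched below. It assumes `TimeSeries.find_entries(key_list, time_query)` yields the raw entry dicts; `pd.json_normalize` then flattens them, so the new pipeline stats should surface even if the `get_data_df` conversion misses them. Note the resulting columns use nested names such as `data.name` and `metadata.write_ts` rather than the flattened `name`."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Sketch (assumptions noted above): fetch pipeline timing entries without get_data_df.\n",
"def fetch_pipeline_time_data_raw(uuid_list):\n",
"    all_dfs = []\n",
"    for user_uuid in uuid_list:\n",
"        ts = esta.TimeSeries.get_time_series(user_uuid)\n",
"        entries = list(ts.find_entries(['stats/pipeline_time'], time_query=None))\n",
"        if entries:\n",
"            # Columns come out as 'data.name', 'data.reading', 'metadata.write_ts', ...\n",
"            all_dfs.append(pd.json_normalize(entries))\n",
"    return all_dfs"
]
},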
{
"cell_type": "code",
"execution_count": 16,
"metadata": {},
"outputs": [],
"source": [
"def analyze_usercache(combined_df, start_date_str='2024-11-08'):\n",
" print(\"\\nAnalyzing 'USERCACHE' executions...\")\n",
" # Step 1: Filter for rows where name is \"USERCACHE\"\n",
" usercache_df = combined_df[combined_df['name'] == \"USERCACHE\"].copy()\n",
"\n",
" if usercache_df.empty:\n",
" print(\"No 'USERCACHE' entries found.\")\n",
" return\n",
"\n",
" # Step 2: Convert metawrite_ts to datetime\n",
" usercache_df['datetime'] = pd.to_datetime(usercache_df['metawrite_ts'], unit='s')\n",
"\n",
" # Step 3: Define the start date for filtering\n",
" start_date = pd.Timestamp(start_date_str)\n",
"\n",
" # Step 4: Filter for rows since the start date\n",
" usercache_df = usercache_df[usercache_df['datetime'] >= start_date]\n",
"\n",
" # Step 5: Group by hour and count executions\n",
" hourly_execution_counts = usercache_df.groupby(usercache_df['datetime'].dt.floor('H')).size()\n",
"\n",
" # Step 6: Output the results\n",
" if hourly_execution_counts.empty:\n",
" print(f\"No executions of 'USERCACHE' since {start_date_str}.\")\n",
" else:\n",
" print(f\"Hourly execution counts since {start_date_str}:\")\n",
" print(hourly_execution_counts)\n"
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {},
"outputs": [],
"source": [
"def process_function_level_data(combined_df, exclude_names, base_dir=os.getcwd()):\n",
" print(\"\\nProcessing function-level ..\")\n",
" # Step 1: Filter for function-level data only (entries with slashes in 'name') and exclude specified names\n",
" function_level_df = combined_df[\n",
" combined_df['name'].str.contains('/') &\n",
" ~combined_df['name'].isin(exclude_names)\n",
" ].copy()\n",
"\n",
" if function_level_df.empty:\n",
" print(\"No function-level data after filtering.\")\n",
" return\n",
"\n",
" # Step 2: Select the relevant columns\n",
" selected_columns = function_level_df[['reading', 'name']].copy()\n",
"\n",
" # Step 3: Data Cleaning\n",
" selected_columns.dropna(subset=['reading', 'name'], inplace=True)\n",
" selected_columns = selected_columns[pd.to_numeric(selected_columns['reading'], errors='coerce').notnull()]\n",
"\n",
" if selected_columns.empty:\n",
" print(\"No valid 'reading' after cleaning.\")\n",
" return\n",
"\n",
" # Step 4: Aggregate 'reading' by 'name'\n",
" aggregated_sum = selected_columns.groupby('name', as_index=False)['reading'].sum()\n",
" aggregated_sum.rename(columns={'reading': 'total_reading'}, inplace=True)\n",
"\n",
" aggregated_mean = selected_columns.groupby('name', as_index=False)['reading'].mean()\n",
" aggregated_mean.rename(columns={'reading': 'average_reading'}, inplace=True)\n",
"\n",
" # Step 5: Determine the 80th percentile threshold\n",
" threshold_sum = aggregated_sum['total_reading'].quantile(0.80)\n",
" threshold_mean = aggregated_mean['average_reading'].quantile(0.80)\n",
"\n",
" # Step 6: Split the DataFrame into top 20% and bottom 80%\n",
" top20_sum = aggregated_sum[aggregated_sum['total_reading'] >= threshold_sum].sort_values(by='total_reading', ascending=False)\n",
" bottom80_sum = aggregated_sum[aggregated_sum['total_reading'] < threshold_sum].sort_values(by='total_reading', ascending=False)\n",
"\n",
" top20_mean = aggregated_mean[aggregated_mean['average_reading'] >= threshold_mean].sort_values(by='average_reading', ascending=False)\n",
" bottom80_mean = aggregated_mean[aggregated_mean['average_reading'] < threshold_mean].sort_values(by='average_reading', ascending=False)\n",
"\n",
" # Step 7: Define file paths\n",
" aggregated_sum_path = os.path.join(base_dir, 'aggregated_sum_function_level.csv')\n",
" top20_sum_path = os.path.join(base_dir, 'top20_function_level_sum_sorted.csv')\n",
" bottom80_sum_path = os.path.join(base_dir, 'bottom80_function_level_sum_sorted.csv')\n",
"\n",
" aggregated_mean_path = os.path.join(base_dir, 'aggregated_mean_function_level.csv')\n",
" top20_mean_path = os.path.join(base_dir, 'top20_function_level_mean_sorted.csv')\n",
" bottom80_mean_path = os.path.join(base_dir, 'bottom80_function_level_mean_sorted.csv')\n",
"\n",
" # Step 8: Save to CSV\n",
" try:\n",
" aggregated_sum.to_csv(aggregated_sum_path, index=False)\n",
" top20_sum.to_csv(top20_sum_path, index=False)\n",
" bottom80_sum.to_csv(bottom80_sum_path, index=False)\n",
"\n",
" aggregated_mean.to_csv(aggregated_mean_path, index=False)\n",
" top20_mean.to_csv(top20_mean_path, index=False)\n",
" bottom80_mean.to_csv(bottom80_mean_path, index=False)\n",
"\n",
" print(f\"Aggregated Sum Function-Level Data saved to {aggregated_sum_path}\")\n",
" print(f\"Top 20% (Sum) function-level data saved to {top20_sum_path}\")\n",
" print(f\"Bottom 80% (Sum) function-level data saved to {bottom80_sum_path}\")\n",
"\n",
" print(f\"\\nAggregated Mean Function-Level Data saved to {aggregated_mean_path}\")\n",
" print(f\"Top 20% (Mean) function-level data saved to {top20_mean_path}\")\n",
" print(f\"Bottom 80% (Mean) function-level data saved to {bottom80_mean_path}\")\n",
" except Exception as e:\n",
" print(f\"Error saving aggregated data to CSV: {e}\")\n",
" return\n",
"\n",
" # Step 9: Verify the splits\n",
" print(f\"\\nSum Aggregation - Top 20% row count: {len(top20_sum)}\")\n",
" print(f\"Sum Aggregation - Bottom 80% row count: {len(bottom80_sum)}\")\n",
"\n",
" print(f\"\\nMean Aggregation - Top 20% row count: {len(top20_mean)}\")\n",
" print(f\"Mean Aggregation - Bottom 80% row count: {len(bottom80_mean)}\")\n",
"\n",
" # Step 10: Inspect some entries\n",
" print(\"\\nSample Top 20% Sum Aggregation Entries:\")\n",
" print(top20_sum.head())\n",
"\n",
" print(\"\\nSample Bottom 80% Sum Aggregation Entries:\")\n",
" print(bottom80_sum.head())\n",
"\n",
" print(\"\\nSample Top 20% Mean Aggregation Entries:\")\n",
" print(top20_mean.head())\n",
"\n",
" print(\"\\nSample Bottom 80% Mean Aggregation Entries:\")\n",
" print(bottom80_mean.head())"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def main():\n",
" # Step 1: Retrieve all user UUIDs\n",
" user_uuid_list = get_all_user_uuids()\n",
" #user_uuid_list = user_uuid_list[1:3]\n",
" if not user_uuid_list:\n",
" print(\"No user UUIDs retrieved. Exiting script.\")\n",
" return\n",
"\n",
" # Step 2: Fetch 'stats/pipeline_time' data for all users\n",
" all_users_pipeline_dfs = fetch_pipeline_time_data(user_uuid_list)\n",
"\n",
" if not all_users_pipeline_dfs:\n",
" print(\"No pipeline data fetched for any user.\")\n",
" return\n",
"\n",
" # Step 3: Combine all users' DataFrames\n",
" combined_pipeline_df = pd.concat(all_users_pipeline_dfs, ignore_index=True)\n",
" print(f\"\\nCombined Pipeline Data Shape: {combined_pipeline_df.shape}\")\n",
"\n",
" # Step 4: Describe and get info about the combined DataFrame\n",
" print(\"\\nCombined Pipeline Data Description:\")\n",
" print(combined_pipeline_df.describe())\n",
"\n",
" print(combined_pipeline_df.info())\n",
"\n",
" # Step 5: Get unique 'name' entries\n",
" unique_names = combined_pipeline_df['name'].unique()\n",
" print(f\"\\nUnique 'name' entries:\")\n",
" print(unique_names)\n",
"\n",
" # Step 6: Analyze 'USERCACHE' executions\n",
" analyze_usercache(combined_pipeline_df)\n",
"\n",
" # Step 7: Define the list of 'name' entries to exclude\n",
" exclude_data_names = [\n",
" 'TRIP_SEGMENTATION/segment_into_trips',\n",
" 'TRIP_SEGMENTATION/segment_into_trips_dist/loop'\n",
" ]\n",
"\n",
" # Step 8: Process function-level data\n",
" process_function_level_data(combined_pipeline_df, exclude_data_names)\n",
"\n",
"main()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.20"
}
},
"nbformat": 4,
"nbformat_minor": 2
}