Commit
TeachMeTW committed Oct 13, 2024
1 parent 8957a06 commit f428069
Showing 4 changed files with 103 additions and 41 deletions.
2 changes: 2 additions & 0 deletions emission/core/wrapper/entry.py
@@ -64,6 +64,8 @@ def _getData2Wrapper():
"stats/dashboard_time": "statsevent",
# intended to log the occurrence of errors in the pipeline
"stats/pipeline_error": "statsevent",
# intended to log the occurrence of errors in the dashboard
"stats/dashboard_error": "statsevent",
# time for various client operations, measured on the client
# comparison with the server_api_time can help debug networking issues
"stats/client_time": "statsevent",
7 changes: 4 additions & 3 deletions emission/storage/decorations/stats_queries.py
@@ -56,12 +56,13 @@ def store_dashboard_time(code_fragment_name: str, ts: float, reading: float):
- code_fragment_name (str): The name of the function being timed.
- ts (float): The timestamp when the function execution started.
- reading (float): The duration of the function execution in milliseconds.
Returns:
- InsertResult: The result of the insert operation.
"""
store_stats_entry(None, "stats/function_time", stage_string, ts, reading)
return store_stats_entry(None, "stats/dashboard_time", code_fragment_name, ts, reading)


def store_function_error(user_id, stage_string, ts, reading):
store_stats_entry(user_id, "stats/function_time", stage_string, ts, reading)
def store_dashboard_error(code_fragment_name: str, ts: float, reading: float):
store_stats_entry(None, "stats/dashboard_error", code_fragment_name, ts, reading)

51 changes: 32 additions & 19 deletions emission/storage/timeseries/non_user_timeseries.py
@@ -5,10 +5,11 @@

import emission.core.get_database as edb
import emission.storage.timeseries.builtin_timeseries as bits
import emission.core.wrapper.entry as ecwe # Added missing import

class NonUserTimeSeries(bits.BuiltinTimeSeries):
def __init__(self):
super(AggregateTimeSeries, self).__init__(None)
super(NonUserTimeSeries, self).__init__(None) # Corrected superclass name
self.user_query = {}
self.timeseries_db = edb.get_non_user_timeseries_db()

@@ -17,26 +18,27 @@ def get_uuid_list():
return []

def get_timeseries_db(self, key):
return self.timeseries_db
return self.timeseries_db # Ensure this is intended to ignore 'key'

# _get_query: not overridden
# _get_sort_query: not overridden
# _to_df_entry: not overridden
# df_row_to_entry: not overridden
# get_entry_from_id: not overridden

def find_entries(self, key_list = None, time_query = None, geo_query = None,
extra_query_list=None):
def find_entries(self, key_list=None, time_query=None, geo_query=None,
extra_query_list=None):
sort_key = self._get_sort_key(time_query)
logging.debug("curr_query = %s, sort_key = %s" %
(self._get_query(key_list, time_query, geo_query,
extra_query_list), sort_key))
ts_db_result = self._get_entries_for_timeseries(self.timeseries_db,
key_list,
time_query,
geo_query,
extra_query_list,
sort_key)
current_query = self._get_query(key_list, time_query, geo_query, extra_query_list)
logging.debug(f"curr_query = {current_query}, sort_key = {sort_key}")
ts_db_result = self._get_entries_for_timeseries(
self.timeseries_db,
key_list,
time_query,
geo_query,
extra_query_list,
sort_key
)
return ts_db_result

# _get_entries_for_timeseries is unchanged
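For illustration, a sketch of how the reformatted find_entries might be exercised against the non-user timeseries (assuming a configured database connection; the key list is the pair of dashboard stats keys added in this commit, and each result is assumed to iterate as a raw document with metadata/data sub-documents):

import emission.storage.timeseries.non_user_timeseries as estnu

nuts = estnu.NonUserTimeSeries()
# time_query and geo_query default to None, which the signature above allows
entries = nuts.find_entries(key_list=["stats/dashboard_time", "stats/dashboard_error"])
for entry in entries:
    print(entry["metadata"]["key"], entry["data"])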
@@ -51,16 +53,28 @@ def insert(self, entry):
Inserts the specified entry and returns the object ID
"""
logging.debug("insert called")
if type(entry) == dict:
if isinstance(entry, dict):
entry = ecwe.Entry(entry)
if entry["user_id"] is not None:
raise AttributeError("Saving entry %s for %s in non_user_timeseries" %
(entry, entry["user_id"]))
raise AttributeError(
f"Saving entry {entry} for {entry['user_id']} in non_user_timeseries is not allowed."
)
else:
logging.debug("entry was fine, no need to fix it")

logging.debug("Inserting entry %s into timeseries" % entry)
return self.get_timeseries_db(entry.metadata.key).insert(entry)
# Get the collection and log its full name
collection = self.get_timeseries_db(entry.metadata.key)
logging.debug(f"Collection used for insertion: {collection.full_name}")

logging.debug(f"Inserting entry {entry} into timeseries")
try:
result = collection.insert_one(entry)
logging.debug(f"Inserted entry with ID: {result.inserted_id}")
return result.inserted_id
except pymongo.errors.PyMongoError as e:
logging.error(f"Failed to insert entry: {e}")
raise
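Note that the except clause relies on pymongo being imported at module level, which is not shown in this hunk. The same insert-and-log pattern, written against a plain pymongo collection so it is self-contained (the connection details, database name, and collection name below are assumptions, not the project's actual configuration):

import logging
import pymongo
import pymongo.errors

client = pymongo.MongoClient("localhost", 27017)  # hypothetical connection
collection = client["Stage_database"]["Stage_timeseries_non_user"]  # assumed names

doc = {
    "user_id": None,  # non-user entries must not carry a user_id
    "metadata": {"key": "stats/dashboard_error", "write_ts": 0.0},
    "data": {"name": "demo_fragment", "ts": 0.0, "reading": 12.5},
}
try:
    result = collection.insert_one(doc)
    logging.debug("Inserted entry with ID: %s", result.inserted_id)
except pymongo.errors.PyMongoError as e:
    logging.error("Failed to insert entry: %s", e)
    raise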


# insert_data is unchanged
def insert_error(self, entry):
@@ -87,4 +101,3 @@ def update_data(user_id, key, obj_id, data):
versioned objects
"""
raise AttributeError("non_user_timeseries does not support updates")

84 changes: 65 additions & 19 deletions emission/tests/funcTests/TestFunctionTiming.py
@@ -1,16 +1,18 @@
# emission/tests/funcTests/TestFunctionTiming.py

import json
import logging
import logging.config
import os
import time
import numpy as np
import arrow
from contextlib import contextmanager
from typing import Callable, List

# Import the store_dashboard_time and store_dashboard_error functions
from emission.storage.decorations.stats_queries import (
store_dashboard_time,
store_dashboard_error
)

# Import the existing Timer context manager
from emission.core.timer import Timer as ECT_Timer

# Import the run_function_pipeline function from time_functions.py
from emission.functions.time_functions import run_function_pipeline

# Define test functions
def test_function_1():
@@ -20,7 +22,7 @@ def test_function_1():

def test_function_2():
logging.info("Executing test_function_2")
time.sleep(1)
time.sleep(2)
return True

def test_function_faulty():
@@ -30,20 +32,64 @@

def test_function_3():
logging.info("Executing test_function_3")
time.sleep(1)
time.sleep(3)
return True

if __name__ == "__main__":
# Ensure the logs directory exists
os.makedirs("logs", exist_ok=True)

def execute_and_time_function(func: Callable[[], bool]):
"""
Executes a given function, measures its execution time using ECT_Timer,
and stores the timing information using store_dashboard_time.
If the function raises an exception, it stores the error using store_dashboard_error.
Parameters:
- func (Callable[[], bool]): The test function to execute and time.
"""
function_name = func.__name__
timestamp = time.time()

logging.info(f"Starting timing for function: {function_name}")

try:
with ECT_Timer() as timer:
result = func() # Execute the test function

elapsed_seconds = timer.elapsed # Accessing the float attribute directly
elapsed_ms = elapsed_seconds * 1000 # Convert to milliseconds

# Store the execution time
store_dashboard_time(
code_fragment_name=function_name,
ts=timestamp,
reading=elapsed_ms
)
print(f"Function '{function_name}' executed successfully in {elapsed_ms:.2f} ms.")
logging.info(f"Function '{function_name}' executed successfully in {elapsed_ms:.2f} ms.")

except Exception as e:
# Even if the function fails, capture the elapsed time up to the exception
elapsed_seconds = timer.elapsed if 'timer' in locals() else 0 # Accessing the float attribute directly
elapsed_ms = elapsed_seconds * 1000

# Store the error timing
store_dashboard_error(
code_fragment_name=function_name,
ts=timestamp,
reading=elapsed_ms
)
print(f"Function '{function_name}' failed after {elapsed_ms:.2f} ms with error: {e}")
logging.error(f"Function '{function_name}' failed after {elapsed_ms:.2f} ms with error: {e}")

def main():
# Define the list of test functions, including the faulty one
function_list = [
function_list: List[Callable[[], bool]] = [
test_function_1,
test_function_2,
test_function_faulty, # This will raise an exception
# test_function_faulty, # This will raise an exception
test_function_3 # This should execute normally after the faulty function
]

# Run the pipeline with process number 1 and skip_if_no_new_data set to True
run_function_pipeline(process_number=1, function_list=function_list, skip_if_no_new_data=True)
# Execute and time each function
for func in function_list:
execute_and_time_function(func)

if __name__ == "__main__":
main()
