diff --git a/emission/core/wrapper/entry.py b/emission/core/wrapper/entry.py
index ac0c56837..4f6d80a44 100644
--- a/emission/core/wrapper/entry.py
+++ b/emission/core/wrapper/entry.py
@@ -64,6 +64,8 @@ def _getData2Wrapper():
         "stats/dashboard_time": "statsevent",
         # intended to log the occurrence of errors in the pipeline
         "stats/pipeline_error": "statsevent",
+        # intended to log the occurrence of errors in the dashboard
+        "stats/dashboard_error": "statsevent",
         # time for various client operations, measured on the client
         # comparison with the server_api_time can help debug networking issues
         "stats/client_time": "statsevent",
diff --git a/emission/storage/decorations/stats_queries.py b/emission/storage/decorations/stats_queries.py
index 9b5c4f215..f68a2f8db 100644
--- a/emission/storage/decorations/stats_queries.py
+++ b/emission/storage/decorations/stats_queries.py
@@ -56,12 +56,13 @@ def store_dashboard_time(code_fragment_name: str, ts: float, reading: float):
     - code_fragment_name (str): The name of the function being timed.
     - ts (float): The timestamp when the function execution started.
     - reading (float): The duration of the function execution in milliseconds.
+
     Returns:
     - InsertResult: The result of the insert operation.
     """
-    store_stats_entry(None, "stats/function_time", stage_string, ts, reading)
+    return store_stats_entry(None, "stats/dashboard_time", code_fragment_name, ts, reading)
 
-def store_function_error(user_id, stage_string, ts, reading):
-    store_stats_entry(user_id, "stats/function_time", stage_string, ts, reading)
+def store_dashboard_error(code_fragment_name: str, ts: float, reading: float):
+    store_stats_entry(None, "stats/dashboard_error", code_fragment_name, ts, reading)
diff --git a/emission/storage/timeseries/non_user_timeseries.py b/emission/storage/timeseries/non_user_timeseries.py
index 5e630356c..70c74d85d 100644
--- a/emission/storage/timeseries/non_user_timeseries.py
+++ b/emission/storage/timeseries/non_user_timeseries.py
@@ -5,10 +5,11 @@
 import emission.core.get_database as edb
 import emission.storage.timeseries.builtin_timeseries as bits
+import emission.core.wrapper.entry as ecwe  # Added missing import
 
 class NonUserTimeSeries(bits.BuiltinTimeSeries):
     def __init__(self):
-        super(AggregateTimeSeries, self).__init__(None)
+        super(NonUserTimeSeries, self).__init__(None)  # Corrected superclass name
         self.user_query = {}
         self.timeseries_db = edb.get_non_user_timeseries_db()
 
@@ -17,7 +18,7 @@ def get_uuid_list():
         return []
 
     def get_timeseries_db(self, key):
-        return self.timeseries_db
+        return self.timeseries_db  # Ensure this is intended to ignore 'key'
 
     # _get_query: not overridden
     # _get_sort_query: not overridden
@@ -25,18 +26,19 @@ def get_timeseries_db(self, key):
     # df_row_to_entry: not overridden
     # get_entry_from_id: not overridden
 
-    def find_entries(self, key_list = None, time_query = None, geo_query = None,
-                     extra_query_list=None):
+    def find_entries(self, key_list=None, time_query=None, geo_query=None,
+                     extra_query_list=None):
         sort_key = self._get_sort_key(time_query)
-        logging.debug("curr_query = %s, sort_key = %s" %
-                      (self._get_query(key_list, time_query, geo_query,
-                                       extra_query_list), sort_key))
-        ts_db_result = self._get_entries_for_timeseries(self.timeseries_db,
-                                                        key_list,
-                                                        time_query,
-                                                        geo_query,
-                                                        extra_query_list,
-                                                        sort_key)
+        current_query = self._get_query(key_list, time_query, geo_query, extra_query_list)
+        logging.debug(f"curr_query = {current_query}, sort_key = {sort_key}")
+        ts_db_result = self._get_entries_for_timeseries(
+            self.timeseries_db,
+            key_list,
+            time_query,
+            geo_query,
+            extra_query_list,
+            sort_key
+        )
         return ts_db_result
 
     # _get_entries_for_timeseries is unchanged
@@ -51,16 +53,28 @@ def insert(self, entry):
         Inserts the specified entry and returns the object ID
         """
         logging.debug("insert called")
-        if type(entry) == dict:
+        if isinstance(entry, dict):
             entry = ecwe.Entry(entry)
         if entry["user_id"] is not None:
-            raise AttributeError("Saving entry %s for %s in non_user_timeseries" %
-                                 (entry, entry["user_id"]))
+            raise AttributeError(
+                f"Saving entry {entry} for {entry['user_id']} in non_user_timeseries is not allowed."
+            )
         else:
             logging.debug("entry was fine, no need to fix it")
-        logging.debug("Inserting entry %s into timeseries" % entry)
-        return self.get_timeseries_db(entry.metadata.key).insert(entry)
+        # Get the collection and log its full name
+        collection = self.get_timeseries_db(entry.metadata.key)
+        logging.debug(f"Collection used for insertion: {collection.full_name}")
+
+        logging.debug(f"Inserting entry {entry} into timeseries")
+        try:
+            result = collection.insert_one(entry)
+            logging.debug(f"Inserted entry with ID: {result.inserted_id}")
+            return result.inserted_id
+        except pymongo.errors.PyMongoError as e:  # assumes pymongo is imported at module level
+            logging.error(f"Failed to insert entry: {e}")
+            raise
+
     # insert_data is unchanged
 
     def insert_error(self, entry):
@@ -87,4 +101,3 @@ def update_data(user_id, key, obj_id, data):
         versioned objects
         """
         raise AttributeError("non_user_timeseries does not support updates")
-
diff --git a/emission/tests/funcTests/TestFunctionTiming.py b/emission/tests/funcTests/TestFunctionTiming.py
index 5e0cea46b..9861ec90c 100644
--- a/emission/tests/funcTests/TestFunctionTiming.py
+++ b/emission/tests/funcTests/TestFunctionTiming.py
@@ -1,16 +1,18 @@
 # emission/tests/funcTests/TestFunctionTiming.py
-import json
 import logging
-import logging.config
-import os
 import time
-import numpy as np
-import arrow
-from contextlib import contextmanager
+from typing import Callable, List
+
+# Import the store_dashboard_time and store_dashboard_error functions
+from emission.storage.decorations.stats_queries import (
+    store_dashboard_time,
+    store_dashboard_error
+)
+
+# Import the existing Timer context manager
+from emission.core.timer import Timer as ECT_Timer
 
-# Import the run_function_pipeline function from time_functions.py
-from emission.functions.time_functions import run_function_pipeline
 
 # Define test functions
 def test_function_1():
@@ -20,7 +22,7 @@ def test_function_1():
 
 def test_function_2():
     logging.info("Executing test_function_2")
-    time.sleep(1)
+    time.sleep(2)
     return True
 
 def test_function_faulty():
@@ -30,20 +32,64 @@ def test_function_faulty():
 
 def test_function_3():
     logging.info("Executing test_function_3")
-    time.sleep(1)
+    time.sleep(3)
     return True
 
-if __name__ == "__main__":
-    # Ensure the logs directory exists
-    os.makedirs("logs", exist_ok=True)
-
+def execute_and_time_function(func: Callable[[], bool]):
+    """
+    Executes a given function, measures its execution time using ECT_Timer,
+    and stores the timing information using store_dashboard_time.
+    If the function raises an exception, it stores the error using store_dashboard_error.
+
+    Parameters:
+    - func (Callable[[], bool]): The test function to execute and time.
+    """
+    function_name = func.__name__
+    timestamp = time.time()
+
+    logging.info(f"Starting timing for function: {function_name}")
+
+    try:
+        with ECT_Timer() as timer:
+            result = func()  # Execute the test function
+
+        elapsed_seconds = timer.elapsed  # Accessing the float attribute directly
+        elapsed_ms = elapsed_seconds * 1000  # Convert to milliseconds
+
+        # Store the execution time
+        store_dashboard_time(
+            code_fragment_name=function_name,
+            ts=timestamp,
+            reading=elapsed_ms
+        )
+        print(f"Function '{function_name}' executed successfully in {elapsed_ms:.2f} ms.")
+        logging.info(f"Function '{function_name}' executed successfully in {elapsed_ms:.2f} ms.")
+
+    except Exception as e:
+        # Even if the function fails, capture the elapsed time up to the exception
+        elapsed_seconds = timer.elapsed if 'timer' in locals() else 0  # Accessing the float attribute directly
+        elapsed_ms = elapsed_seconds * 1000
+
+        # Store the error timing
+        store_dashboard_error(
+            code_fragment_name=function_name,
+            ts=timestamp,
+            reading=elapsed_ms
+        )
+        print(f"Function '{function_name}' failed after {elapsed_ms:.2f} ms with error: {e}")
+        logging.error(f"Function '{function_name}' failed after {elapsed_ms:.2f} ms with error: {e}")
+
+def main():
     # Define the list of test functions, including the faulty one
-    function_list = [
+    function_list: List[Callable[[], bool]] = [
         test_function_1,
         test_function_2,
-        test_function_faulty,  # This will raise an exception
+        # test_function_faulty,  # This will raise an exception
         test_function_3  # This should execute normally after the faulty function
     ]
-
-    # Run the pipeline with process number 1 and skip_if_no_new_data set to True
-    run_function_pipeline(process_number=1, function_list=function_list, skip_if_no_new_data=True)
+    # Execute and time each function
+    for func in function_list:
+        execute_and_time_function(func)
+
+if __name__ == "__main__":
+    main()
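As a minimal usage sketch (not part of the diff above), the new stats_queries helpers can be called around any dashboard code fragment. The fragment name generate_plots and the esds alias are hypothetical placeholders; the millisecond conversion mirrors the convention used in TestFunctionTiming.py:

    import time
    import logging

    import emission.storage.decorations.stats_queries as esds

    def generate_plots():  # hypothetical dashboard code fragment
        time.sleep(0.5)

    start_ts = time.time()
    try:
        generate_plots()
        # record a successful timing reading (in ms) under stats/dashboard_time
        esds.store_dashboard_time("generate_plots", start_ts,
                                  (time.time() - start_ts) * 1000)
    except Exception:
        logging.exception("generate_plots failed")
        # record the failure duration (in ms) under stats/dashboard_error
        esds.store_dashboard_error("generate_plots", start_ts,
                                   (time.time() - start_ts) * 1000)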