From b15fcb983c6b2f40e548f53550d417829a2f08fc Mon Sep 17 00:00:00 2001
From: TeachMeTW
Date: Wed, 2 Oct 2024 13:08:18 -0700
Subject: [PATCH] Add function timing and error logging with unit tests

- Implemented `store_dashboard_time` and `store_dashboard_error` functions to
  log execution times and errors in the database.
- Added unit tests in `TestStatsQueries.py` to verify correct timing and error
  logging for individual, multiple, and faulty function executions.
---
 emission/core/wrapper/entry.py                |   4 +
 emission/pipeline/intake_stage.py             |   2 +-
 emission/storage/decorations/stats_queries.py |  53 +++++++-
 .../storage/timeseries/builtin_timeseries.py  |   2 +
 .../tests/storageTests/TestStatsQueries.py    | 162 ++++++++++++++++++
 5 files changed, 220 insertions(+), 3 deletions(-)
 create mode 100644 emission/tests/storageTests/TestStatsQueries.py

diff --git a/emission/core/wrapper/entry.py b/emission/core/wrapper/entry.py
index c6f7b7dd3..4f6d80a44 100644
--- a/emission/core/wrapper/entry.py
+++ b/emission/core/wrapper/entry.py
@@ -60,8 +60,12 @@ def _getData2Wrapper():
         "stats/server_api_error": "statsevent",
         # pipeline stage time, measured on the server
         "stats/pipeline_time": "statsevent",
+        # dashboard time, measured on the server
+        "stats/dashboard_time": "statsevent",
         # intended to log the occurrence of errors in the pipeline
         "stats/pipeline_error": "statsevent",
+        # intended to log the occurrence of errors in the dashboard
+        "stats/dashboard_error": "statsevent",
         # time for various client operations, measured on the client
         # comparison with the server_api_time can help debug networking issues
         "stats/client_time": "statsevent",
diff --git a/emission/pipeline/intake_stage.py b/emission/pipeline/intake_stage.py
index efd600efa..a93ba2996 100644
--- a/emission/pipeline/intake_stage.py
+++ b/emission/pipeline/intake_stage.py
@@ -103,7 +103,7 @@ def run_intake_pipeline_for_user(uuid, skip_if_no_new_data):
         eaum.match_incoming_user_inputs(uuid)
 
         esds.store_pipeline_time(uuid, ecwp.PipelineStages.USER_INPUT_MATCH_INCOMING.name,
-                                 time.time(), uct.elapsed)
+                                 time.time(), uit.elapsed)
 
         # Hack until we delete these spurious entries
         # https://github.com/e-mission/e-mission-server/issues/407#issuecomment-2484868
diff --git a/emission/storage/decorations/stats_queries.py b/emission/storage/decorations/stats_queries.py
index b77b56ae9..163384342 100644
--- a/emission/storage/decorations/stats_queries.py
+++ b/emission/storage/decorations/stats_queries.py
@@ -2,16 +2,16 @@
 from __future__ import print_function
 from __future__ import division
 from __future__ import absolute_import
+import time
 # Standard imports
 from future import standard_library
 standard_library.install_aliases()
 from builtins import *
-import logging
-import time
 
 # Our imports
 import emission.storage.timeseries.abstract_timeseries as esta
 import emission.core.wrapper.entry as ecwe
+import emission.core.timer as ec_timer
 
 
 # metadata format is
@@ -46,3 +46,52 @@ def store_stats_entry(user_id, metadata_key, name, ts, reading):
     new_entry = ecwe.Entry.create_entry(user_id, metadata_key, data)
     return esta.TimeSeries.get_time_series(user_id).insert(new_entry)
 
+def store_dashboard_time(code_fragment_name: str, timer: ec_timer.Timer):
+    """
+    Stores statistics about execution times in dashboard code using a Timer object.
+    Both of our current dashboards generate _aggregate_ metrics; we do not work at a
+    per-user level in the Python dashboards, so we pass in only the name of the step
+    being instrumented and the timing information.
+
+    :param code_fragment_name (str): The name of the function or code fragment being timed.
+    :param timer (ec_timer.Timer): The Timer object that recorded the execution duration.
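+
+    Example (an illustrative sketch; `generate_summaries` stands in for a real
+    dashboard step and is not defined in this patch)::
+
+        with ec_timer.Timer() as t:
+            generate_summaries()
+        store_dashboard_time("generate_summaries", t)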
+    """
+    # Get the current timestamp in seconds since epoch
+    timestamp = time.time()
+
+    # Call the existing store_stats_entry function
+    store_stats_entry(
+        user_id=None,  # No user ID as per current dashboard design
+        metadata_key="stats/dashboard_time",
+        name=code_fragment_name,
+        ts=timestamp,
+        reading=timer.elapsed_ms
+    )
+
+
+def store_dashboard_error(code_fragment_name: str, timer: ec_timer.Timer):
+    """
+    Stores statistics about errors in dashboard code using a Timer object.
+
+    :param code_fragment_name (str): The name of the function or code fragment that failed.
+    :param timer (ec_timer.Timer): The Timer object covering the execution up to the error.
+    """
+    # Get the current timestamp in seconds since epoch
+    timestamp = time.time()
+
+    # Call the existing store_stats_entry function
+    store_stats_entry(
+        user_id=None,  # No user ID as per current dashboard design
+        metadata_key="stats/dashboard_error",
+        name=code_fragment_name,
+        ts=timestamp,
+        reading=timer.elapsed_ms
+    )
+
diff --git a/emission/storage/timeseries/builtin_timeseries.py b/emission/storage/timeseries/builtin_timeseries.py
index 15ba9f0ec..78d785719 100644
--- a/emission/storage/timeseries/builtin_timeseries.py
+++ b/emission/storage/timeseries/builtin_timeseries.py
@@ -54,6 +54,8 @@ def __init__(self, user_id):
             "stats/server_api_time": self.timeseries_db,
             "stats/server_api_error": self.timeseries_db,
             "stats/pipeline_time": self.timeseries_db,
+            "stats/dashboard_time": self.timeseries_db,
+            "stats/dashboard_error": self.timeseries_db,
             "stats/pipeline_error": self.timeseries_db,
             "stats/client_time": self.timeseries_db,
             "stats/client_nav_event": self.timeseries_db,
diff --git a/emission/tests/storageTests/TestStatsQueries.py b/emission/tests/storageTests/TestStatsQueries.py
new file mode 100644
index 000000000..0d04f6726
--- /dev/null
+++ b/emission/tests/storageTests/TestStatsQueries.py
@@ -0,0 +1,162 @@
+import unittest
+import logging
+import time
+
+import emission.core.get_database as edb
+import emission.core.timer as ect
+import emission.storage.decorations.stats_queries as esdsq
+import emission.storage.timeseries.abstract_timeseries as esta
+import emission.tests.common as etc
+
+
+class TestFunctionTiming(unittest.TestCase):
+    @classmethod
+    def setUpClass(cls):
+        """
+        Set up resources before any tests are run.
+        """
+        cls.timeseries_db = esta.TimeSeries.get_time_series(None)
+
+    def tearDown(self):
+        """
+        Clean up relevant database entries after each test to maintain isolation.
+        """
+        keys_to_clean = ["stats/dashboard_time", "stats/dashboard_error"]
+        edb.get_timeseries_db().delete_many(
+            {"metadata.key": {"$in": keys_to_clean}}
+        )
+        logging.debug(f"After test, cleared DB entries for {keys_to_clean}")
+
+    def verify_stats_entries(self, expected_entries: list[tuple[str, str, float]]):
+        """
+        Verify that each of the expected entries, given as (key, name, elapsed_ms)
+        tuples, is stored correctly in the database.
+
+        :param expected_entries: A list of tuples containing (key, name, expected_elapsed_ms).
+        """
+        logging.debug(f"Ensuring {len(expected_entries)} entries exist in DB.")
+        # Prepare keys for the database query based on the expected entries.
+        key_list = [key for (key, _, _) in expected_entries]
+        # Fetch matching entries from the timeseries database.
+        stored_entries = list(self.timeseries_db.find_entries(key_list))
+        # Check that the number of retrieved entries matches expectations.
+        self.assertEqual(len(stored_entries), len(expected_entries))
+
+        # Validate each stored entry against the expected data.
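+        # Each stored entry is expected to look roughly like this (an
+        # illustrative shape; the exact metadata fields are filled in by
+        # Entry.create_entry when the stats helpers store the reading):
+        #   {"metadata": {"key": "stats/dashboard_time", ...},
+        #    "data": {"name": "sample_function", "ts": <epoch secs>, "reading": <elapsed ms>}}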
+        for i in range(len(expected_entries)):
+            stored_entry = stored_entries[i]
+            expected_key, expected_name, expected_reading = expected_entries[i]
+            logging.debug(f"Comparing expected {expected_entries[i]} " +
+                          f"with stored {stored_entry['metadata']['key']} {stored_entry['data']}")
+            # Verify the key matches.
+            self.assertEqual(stored_entry['metadata']['key'], expected_key)
+            # Verify the name matches.
+            self.assertEqual(stored_entry['data']['name'], expected_name)
+            # Verify the elapsed time is as expected.
+            self.assertEqual(stored_entry['data']['reading'], expected_reading)
+
+    def test_single_function_timing(self):
+        """
+        Test the execution and timing of a single function.
+        This test measures how long 'sample_function' takes to execute and verifies
+        that the timing information is correctly stored in the database.
+        """
+        def sample_function():
+            logging.debug("Executing sample_function")
+            time.sleep(2)  # Simulate processing time by sleeping for 2 seconds.
+            return True
+
+        # Measure the execution time of 'sample_function'.
+        with ect.Timer() as timer:
+            sample_function()
+
+        # Record the timing data in the database.
+        esdsq.store_dashboard_time("sample_function", timer)
+
+        # Confirm the timing was recorded correctly.
+        self.verify_stats_entries([
+            ("stats/dashboard_time", "sample_function", timer.elapsed_ms)
+        ])
+
+    def test_multiple_functions_timing(self):
+        """
+        Test the execution and timing of two functions.
+        This test records and validates the time taken for:
+        (i) function_one,
+        (ii) function_two, and
+        (iii) both functions together.
+        """
+        def function_one():
+            # Simulate processing time by sleeping for 1 second.
+            return time.sleep(1)
+
+        def function_two():
+            # Simulate processing time by sleeping for 1.5 seconds.
+            return time.sleep(1.5)
+
+        # Track the total time for both functions.
+        with ect.Timer() as timer_both:
+            # Time 'function_one' execution.
+            with ect.Timer() as timer_one:
+                function_one()
+            # Record 'function_one' timing.
+            esdsq.store_dashboard_time("function_one", timer_one)
+
+            # Time 'function_two' execution.
+            with ect.Timer() as timer_two:
+                function_two()
+            # Record 'function_two' timing.
+            esdsq.store_dashboard_time("function_two", timer_two)
+
+        # Record the combined timing for both functions.
+        esdsq.store_dashboard_time("functions_one_and_two", timer_both)
+
+        # Validate the individual and combined timings.
+        self.assertAlmostEqual(timer_one.elapsed_ms, 1000, delta=100)
+        self.assertAlmostEqual(timer_two.elapsed_ms, 1500, delta=100)
+        self.assertAlmostEqual(timer_both.elapsed_ms, 2500, delta=100)
+
+        # Ensure all timing entries are correctly stored.
+        self.verify_stats_entries([
+            ("stats/dashboard_time", "function_one", timer_one.elapsed_ms),
+            ("stats/dashboard_time", "function_two", timer_two.elapsed_ms),
+            ("stats/dashboard_time", "functions_one_and_two", timer_both.elapsed_ms)
+        ])
+
+    def test_faulty_function_timing(self):
+        """
+        Test the execution and timing of a faulty function that is expected to raise
+        an exception. This test ensures that even when a function fails, the timing
+        information is correctly recorded as an error in the database.
+        """
+        def faulty_function():
+            logging.debug("Executing faulty_function")
+            time.sleep(1)  # Simulate processing time before the failure.
+            raise ValueError("Simulated error in faulty_function")
+
+        try:
+            # Attempt to execute and time the faulty function.
+            with ect.Timer() as timer:
+                faulty_function()
+        except ValueError as e:
+            # Handle the expected exception and record the timing as an error.
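+            # Note: this relies on ect.Timer's __exit__ having already recorded
+            # the elapsed time when the exception propagated out of the `with`
+            # block; otherwise timer.elapsed_ms would not be populated here.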
+            logging.info(f"Caught expected error: {e}")
+            esdsq.store_dashboard_error('faulty_function', timer)
+
+        # Verify that the error timing was recorded.
+        self.verify_stats_entries([
+            ("stats/dashboard_error", "faulty_function", timer.elapsed_ms)
+        ])
+
+
+if __name__ == '__main__':
+    etc.configLogging()
+    unittest.main()