Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stats Timing #986

Merged
merged 42 commits into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
acf7f3e
Typo Fix
TeachMeTW Oct 2, 2024
553a45d
Added Func Time Collection?
TeachMeTW Oct 4, 2024
d19392d
Fixed per feedback, removed decorator
TeachMeTW Oct 5, 2024
05bac58
Update emission/storage/decorations/stats_queries.py
TeachMeTW Oct 12, 2024
e23d4b6
Update emission/storage/decorations/stats_queries.py
TeachMeTW Oct 12, 2024
7fc3f81
Update emission/storage/decorations/stats_queries.py
TeachMeTW Oct 12, 2024
e7e50da
Update emission/storage/timeseries/builtin_timeseries.py
TeachMeTW Oct 12, 2024
dd253dd
Update emission/storage/decorations/stats_queries.py
TeachMeTW Oct 12, 2024
8957a06
Review Changes
TeachMeTW Oct 12, 2024
f428069
Changes
TeachMeTW Oct 13, 2024
4f636a9
Reverted some
TeachMeTW Oct 14, 2024
b2e3cb5
Added verification
TeachMeTW Oct 14, 2024
a1ff54e
resolve
TeachMeTW Oct 14, 2024
12d5e86
Fixed whitespace commits
TeachMeTW Oct 14, 2024
cf5b5a1
Removed unused imports
TeachMeTW Oct 14, 2024
2b5cefd
Removed unused imports
TeachMeTW Oct 14, 2024
5ac74fa
Delete miniconda.sh
TeachMeTW Oct 14, 2024
64cfc3f
Removed unnecessary
TeachMeTW Oct 14, 2024
a7884e5
Merge branch 'feature/stats_timing' of https://github.com/TeachMeTW/e…
TeachMeTW Oct 14, 2024
196e410
Timeseries
TeachMeTW Oct 14, 2024
fc3f0ee
Used TimeSeries Interface and made it so _get_query accepts dict and …
TeachMeTW Oct 15, 2024
d02f186
Used get entry at ts
TeachMeTW Oct 15, 2024
b6bb6b9
Refactor
TeachMeTW Oct 15, 2024
bc9f638
Turned into unit test
TeachMeTW Oct 15, 2024
f814102
Wrong comment
TeachMeTW Oct 15, 2024
5066c85
Update .gitignore
TeachMeTW Oct 15, 2024
9996707
Update emission/tests/funcTests/TestFunctionTiming.py
TeachMeTW Oct 15, 2024
fed4964
Update emission/storage/decorations/stats_queries.py
TeachMeTW Oct 15, 2024
09f93df
remvoed whitespace
TeachMeTW Oct 15, 2024
9c418c8
Used find_entries
TeachMeTW Oct 15, 2024
5ba94d7
Jack Changes
TeachMeTW Oct 15, 2024
7f12f67
Reverts
TeachMeTW Oct 16, 2024
4c7b80b
Reverted Non_user
TeachMeTW Oct 16, 2024
e843169
Added more tests
TeachMeTW Oct 16, 2024
9f9ec9e
Update emission/tests/funcTests/TestFunctionTiming.py
TeachMeTW Oct 16, 2024
d76f63a
Update emission/tests/funcTests/TestFunctionTiming.py
TeachMeTW Oct 16, 2024
f5b07a4
Implemented Review Changes
TeachMeTW Oct 16, 2024
a24b03b
simplify tests for store_dashboard_time / store_dashboard_error
JGreenlee Oct 17, 2024
39f763d
move TestFunctionTiming -> TestStatsQueries
JGreenlee Oct 17, 2024
1bdc922
Merge pull request #1 from JGreenlee/simpler_stats_timing
TeachMeTW Oct 17, 2024
db1e7bb
Added comments and changed param
TeachMeTW Oct 17, 2024
0b39803
Toned back comments
TeachMeTW Oct 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions emission/core/wrapper/entry.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,12 @@ def _getData2Wrapper():
"stats/server_api_error": "statsevent",
# pipeline stage time, measured on the server
"stats/pipeline_time": "statsevent",
# dashboard time, measured on the server
"stats/dashboard_time": "statsevent",
# intended to log the occurrence of errors in the pipeline
"stats/pipeline_error": "statsevent",
# intended to log the occurrence of errors in the dashboard
"stats/dashboard_error": "statsevent",
# time for various client operations, measured on the client
# comparison with the server_api_time can help debug networking issues
"stats/client_time": "statsevent",
Expand Down
2 changes: 1 addition & 1 deletion emission/pipeline/intake_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def run_intake_pipeline_for_user(uuid, skip_if_no_new_data):
eaum.match_incoming_user_inputs(uuid)

esds.store_pipeline_time(uuid, ecwp.PipelineStages.USER_INPUT_MATCH_INCOMING.name,
time.time(), uct.elapsed)
time.time(), uit.elapsed)

# Hack until we delete these spurious entries
# https://github.com/e-mission/e-mission-server/issues/407#issuecomment-2484868
Expand Down
39 changes: 37 additions & 2 deletions emission/storage/decorations/stats_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@
from __future__ import print_function
from __future__ import division
from __future__ import absolute_import
import time
# Standard imports
from future import standard_library
standard_library.install_aliases()
from builtins import *
import logging
import time

# Our imports
import emission.storage.timeseries.abstract_timeseries as esta
import emission.core.wrapper.entry as ecwe
import emission.core.timer as ec_timer


# metadata format is
Expand Down Expand Up @@ -46,3 +46,38 @@ def store_stats_entry(user_id, metadata_key, name, ts, reading):
new_entry = ecwe.Entry.create_entry(user_id, metadata_key, data)
return esta.TimeSeries.get_time_series(user_id).insert(new_entry)
TeachMeTW marked this conversation as resolved.
Show resolved Hide resolved

def store_dashboard_time(code_fragment_name: str, timer: ec_timer.Timer):
    """
    Record the elapsed time of an instrumented dashboard code fragment.

    Both of our current dashboards generate _aggregate_ metrics; the Python
    dashboard code does not operate at a per-user level, so only the name of
    the instrumented step and its timing information are stored (user_id=None).

    :param code_fragment_name (str): The name of the function or code fragment being timed.
    :param timer (ec_timer.Timer): The Timer object that records the execution duration.
    """
    # Stamp the entry with the current time (seconds since epoch).
    now = time.time()

    # Delegate persistence to the generic stats helper.
    store_stats_entry(
        user_id=None,  # aggregate metric: no per-user attribution in the dashboards
        metadata_key="stats/dashboard_time",
        name=code_fragment_name,
        ts=now,
        reading=timer.elapsed_ms,
    )


def store_dashboard_error(code_fragment_name: str, timer: ec_timer.Timer):
    """
    Stores statistics about the occurrence of errors in dashboard code using a Timer object.
    Mirrors store_dashboard_time, but records under the "stats/dashboard_error" key so
    failures can be tracked separately from successful timings. As with the timing
    entries, these are aggregate metrics (user_id=None) because the Python dashboards
    do not work at a per-user level.

    :param code_fragment_name (str): The name of the function or code fragment that failed.
    :param timer (ec_timer.Timer): The Timer object recording how long the fragment ran before failing.
    """
    # Get the current timestamp in seconds since epoch
    timestamp = time.time()

    # Call the existing store_stats_entry function
    store_stats_entry(
        user_id=None, # No user ID as per current dashboard design
        metadata_key="stats/dashboard_error",
        name=code_fragment_name,
        ts=timestamp,
        reading=timer.elapsed_ms
    )

2 changes: 2 additions & 0 deletions emission/storage/timeseries/builtin_timeseries.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ def __init__(self, user_id):
"stats/server_api_time": self.timeseries_db,
"stats/server_api_error": self.timeseries_db,
"stats/pipeline_time": self.timeseries_db,
"stats/dashboard_time": self.timeseries_db,
TeachMeTW marked this conversation as resolved.
Show resolved Hide resolved
"stats/dashboard_error": self.timeseries_db,
"stats/pipeline_error": self.timeseries_db,
"stats/client_time": self.timeseries_db,
"stats/client_nav_event": self.timeseries_db,
Expand Down
157 changes: 157 additions & 0 deletions emission/tests/storageTests/TestStatsQueries.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
import unittest
import logging
import time

import emission.core.get_database as edb
import emission.core.timer as ect
import emission.storage.decorations.stats_queries as esdsq
import emission.storage.timeseries.abstract_timeseries as esta
import emission.tests.common as etc


class TestFunctionTiming(unittest.TestCase):
    # NOTE(review): the class name predates the move of this file to
    # TestStatsQueries.py; kept as-is so external references do not break.

    @classmethod
    def setUpClass(cls):
        """
        Set up shared resources once, before any tests are run.
        """
        # Dashboard stats are aggregate (user_id=None), so use the aggregate timeseries.
        cls.timeseries_db = esta.TimeSeries.get_time_series(None)

    def tearDown(self):
        """
        Clean up relevant database entries after each test to maintain isolation.
        """
        keys_to_clean = ["stats/dashboard_time", "stats/dashboard_error"]
        edb.get_timeseries_db().delete_many(
            {"metadata.key": {"$in": keys_to_clean}}
        )
        logging.debug(f"After test, cleared DB entries for {keys_to_clean}")

    def verify_stats_entries(self, expected_entries: list[tuple[str, str, float]]):
        """
        Verifies that each of the expected entries, in the form of (key, name, elapsed_ms),
        are stored correctly in the database.

        :param expected_entries: A list of tuples containing (key, name, expected_elapsed_ms).
        """
        logging.debug(f"Ensuring {len(expected_entries)} entries exist in DB.")
        # Prepare keys for database query based on expected entries.
        key_list = [key for (key, _, _) in expected_entries]
        # Fetch matching entries from the timeseries database.
        # NOTE(review): assumes find_entries returns entries in the same order
        # as they were stored, matching expected_entries — confirm if this flakes.
        stored_entries = list(self.timeseries_db.find_entries(key_list))
        # Check if the number of retrieved entries matches expectations.
        self.assertEqual(len(stored_entries), len(expected_entries))

        # Validate each stored entry against the expected data.
        for stored_entry, expected in zip(stored_entries, expected_entries):
            expected_key, expected_name, expected_reading = expected
            logging.debug(f"Comparing expected {expected} " +
                f"with stored {stored_entry['metadata']['key']} {stored_entry['data']}")
            # Verify the key matches.
            self.assertEqual(stored_entry['metadata']['key'], expected_key)
            # Verify the name matches.
            self.assertEqual(stored_entry['data']['name'], expected_name)
            # Verify the elapsed time is as expected.
            self.assertEqual(stored_entry['data']['reading'], expected_reading)

    def test_single_function_timing(self):
        """
        Test the execution and timing of a single function.
        This test measures how long 'sample_function' takes to execute and verifies
        that the timing information is correctly stored in the database.
        """
        def sample_function():
            logging.debug("Executing sample_function")
            time.sleep(2)  # Simulate processing time by sleeping for 2 seconds.
            return True

        # Measure the execution time of 'sample_function'.
        with ect.Timer() as timer:
            sample_function()

        # Record the timing data in the database.
        esdsq.store_dashboard_time("sample_function", timer)

        # Confirm the timing was recorded correctly.
        self.verify_stats_entries([
            ("stats/dashboard_time", "sample_function", timer.elapsed_ms)
        ])

    def test_multiple_functions_timing(self):
        """
        Test the execution and timing of two functions.
        This test records and validates the time taken for:
        (i) function_one,
        (ii) function_two, and
        (iii) both functions together.
        """
        def function_one():
            # Simulate processing time by sleeping for 1 second.
            return time.sleep(1)

        def function_two():
            # Simulate processing time by sleeping for 1.5 seconds.
            return time.sleep(1.5)

        # Track the total time for both functions.
        with ect.Timer() as timer_both:
            # Time 'function_one' execution.
            with ect.Timer() as timer_one:
                function_one()
            # Record 'function_one' timing.
            esdsq.store_dashboard_time("function_one", timer_one)

            # Time 'function_two' execution.
            with ect.Timer() as timer_two:
                function_two()
            # Record 'function_two' timing.
            esdsq.store_dashboard_time("function_two", timer_two)

        # Record the combined timing for both functions.
        esdsq.store_dashboard_time("functions_one_and_two", timer_both)

        # Validate individual and combined timings.
        self.assertAlmostEqual(timer_one.elapsed_ms, 1000, delta=100)
        self.assertAlmostEqual(timer_two.elapsed_ms, 1500, delta=100)
        self.assertAlmostEqual(timer_both.elapsed_ms, 2500, delta=100)

        # Ensure all timing entries are correctly stored.
        self.verify_stats_entries([
            ("stats/dashboard_time", "function_one", timer_one.elapsed_ms),
            ("stats/dashboard_time", "function_two", timer_two.elapsed_ms),
            ("stats/dashboard_time", "functions_one_and_two", timer_both.elapsed_ms)
        ])

    def test_faulty_function_timing(self):
        """
        Test the execution and timing of a faulty function that is expected to raise an exception.
        This test ensures that even when a function fails, the timing information is correctly
        recorded as an error in the database.
        """
        def faulty_function():
            logging.debug("Executing faulty_function")
            time.sleep(1)  # Simulate processing time before failure.
            raise ValueError("Simulated error in faulty_function")

        # assertRaises makes the test fail loudly if faulty_function does NOT
        # raise (a plain try/except would silently fall through and fail later
        # with a confusing "missing entry" message).
        with self.assertRaises(ValueError):
            with ect.Timer() as timer:
                faulty_function()

        # Record the timing of the failed fragment as an error.
        esdsq.store_dashboard_error('faulty_function', timer)

        # Verify that the error timing was recorded.
        self.verify_stats_entries([
            ("stats/dashboard_error", "faulty_function", timer.elapsed_ms)
        ])



if __name__ == '__main__':
    # Configure the project's standard test logging before running the suite.
    etc.configLogging()
    unittest.main()
Loading