From 1066f9116c0ad9c9ce069554688980ba70eb4622 Mon Sep 17 00:00:00 2001 From: TeachMeTW Date: Tue, 5 Nov 2024 14:35:36 -0800 Subject: [PATCH 01/10] Discussed with Jack, approved with this change to clarify pipeline stage --- emission/pipeline/intake_stage.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/emission/pipeline/intake_stage.py b/emission/pipeline/intake_stage.py index b681f93d5..424d85e32 100644 --- a/emission/pipeline/intake_stage.py +++ b/emission/pipeline/intake_stage.py @@ -199,11 +199,11 @@ def run_intake_pipeline_for_user(uuid, skip_if_no_new_data): time.time(), crt.elapsed) with ect.Timer() as gsr: - logging.info("*" * 10 + "UUID %s: generating store and range " % uuid + "*" * 10) - print(str(arrow.now()) + "*" * 10 + "UUID %s: generating store and range " % uuid + "*" * 10) + logging.info("*" * 10 + "UUID %s: storing user stats " % uuid + "*" * 10) + print(str(arrow.now()) + "*" * 10 + "UUID %s: storing user stats " % uuid + "*" * 10) _get_and_store_range(uuid, "analysis/composite_trip") - esds.store_pipeline_time(uuid, 'GENERATE_STORE_AND_RANGE', + esds.store_pipeline_time(uuid, 'STORE_USER_STATS', time.time(), gsr.elapsed) def _get_and_store_range(user_id, trip_key): From 8d45a278875b837d3bd2da19964529d56633a507 Mon Sep 17 00:00:00 2001 From: TeachMeTW Date: Fri, 8 Nov 2024 10:54:59 -0800 Subject: [PATCH 02/10] Added total, labeled, and last call --- emission/pipeline/intake_stage.py | 101 ++++++++++++++++++++++++++---- 1 file changed, 90 insertions(+), 11 deletions(-) diff --git a/emission/pipeline/intake_stage.py b/emission/pipeline/intake_stage.py index 424d85e32..0b3499e3c 100644 --- a/emission/pipeline/intake_stage.py +++ b/emission/pipeline/intake_stage.py @@ -13,6 +13,7 @@ from uuid import UUID import time import pymongo +from datetime import datetime import emission.core.get_database as edb import emission.core.timer as ect @@ -207,14 +208,92 @@ def run_intake_pipeline_for_user(uuid, skip_if_no_new_data): time.time(), gsr.elapsed) def _get_and_store_range(user_id, trip_key): - ts = esta.TimeSeries.get_time_series(user_id) - start_ts = ts.get_first_value_for_field(trip_key, "data.start_ts", pymongo.ASCENDING) - if start_ts == -1: - start_ts = None - end_ts = ts.get_first_value_for_field(trip_key, "data.end_ts", pymongo.DESCENDING) - if end_ts == -1: - end_ts = None - - user = ecwu.User(user_id) - user.update({"pipeline_range": {"start_ts": start_ts, "end_ts": end_ts}}) - logging.debug("After updating, new profiles is %s" % user.getProfile()) + """ + Extends the user profile with pipeline_range, total_trips, labeled_trips, and last_call. + + Parameters: + - user_id (str): The UUID of the user. + - trip_key (str): The key representing the trip data in the time series. + """ + + try: + logging.info(f"Starting _get_and_store_range for user_id: {user_id}, trip_key: {trip_key}") + + # Fetch the time series for the user + ts = esta.TimeSeries.get_time_series(user_id) + logging.debug("Fetched time series data.") + + # Get start timestamp + start_ts = ts.get_first_value_for_field(trip_key, "data.start_ts", pymongo.ASCENDING) + start_ts = None if start_ts == -1 else start_ts + logging.debug(f"Start timestamp: {start_ts}") + + # Get end timestamp + end_ts = ts.get_first_value_for_field(trip_key, "data.end_ts", pymongo.DESCENDING) + end_ts = None if end_ts == -1 else end_ts + logging.debug(f"End timestamp: {end_ts}") + + # Initialize counters + total_trips = 0 + labeled_trips = 0 + + # Retrieve trip entries as an iterator + trip_entries = ts.find_entries([trip_key], time_query=None) + + # Iterate through trip_entries once to count total_trips and labeled_trips + for trip in trip_entries: + total_trips += 1 + if trip.get('data', {}).get('user_input'): + labeled_trips += 1 + logging.info(f"Total trips: {total_trips}, Labeled trips: {labeled_trips}") + + # Retrieve last GET and PUT calls from stats/server_api_time + docs_cursor = edb.get_timeseries_db().find({ + "metadata.key": "stats/server_api_time", + }) + logging.debug("Fetched API call statistics.") + + last_get = None + last_put = None + + for doc in docs_cursor: + api_call_name = doc.get("data", {}).get("name", "") + api_call_ts = doc.get("data", {}).get("ts") + + if not api_call_ts: + logging.warning(f"Missing 'ts' in document: {doc}") + continue + + if api_call_name.startswith("GET_"): + if not last_get or api_call_ts > last_get: + last_get = api_call_ts + logging.debug(f"Updated last_get to: {last_get}") + elif api_call_name.startswith("PUT_"): + if not last_put or api_call_ts > last_put: + last_put = api_call_ts + logging.debug(f"Updated last_put to: {last_put}") + + # Determine the most recent call + if last_get and last_put: + last_call_ts = max(last_get, last_put) + else: + last_call_ts = last_get or last_put + + logging.info(f"Last call timestamp: {last_call_ts}") + + # Update the user profile with pipeline_range, total_trips, labeled_trips, and last_call + user = ecwu.User.fromUUID(user_id) + user.update({ + "pipeline_range": { + "start_ts": start_ts, + "end_ts": end_ts + }, + "total_trips": total_trips, + "labeled_trips": labeled_trips, + "last_call": last_call_ts + }) + logging.debug("User profile updated successfully.") + logging.debug("After updating, new profile is %s", user.getProfile()) + + except Exception as e: + logging.error(f"Error in _get_and_store_range for user_id {user_id}: {e}") \ No newline at end of file From 5f8d25a1a24cbcc87abb190036a5bdbef0d23cda Mon Sep 17 00:00:00 2001 From: TeachMeTW Date: Fri, 8 Nov 2024 14:04:37 -0800 Subject: [PATCH 03/10] Modified total and labeled trips to match op-admin implementation --- emission/pipeline/intake_stage.py | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/emission/pipeline/intake_stage.py b/emission/pipeline/intake_stage.py index 0b3499e3c..4e80683a5 100644 --- a/emission/pipeline/intake_stage.py +++ b/emission/pipeline/intake_stage.py @@ -233,23 +233,22 @@ def _get_and_store_range(user_id, trip_key): end_ts = None if end_ts == -1 else end_ts logging.debug(f"End timestamp: {end_ts}") - # Initialize counters - total_trips = 0 - labeled_trips = 0 - - # Retrieve trip entries as an iterator - trip_entries = ts.find_entries([trip_key], time_query=None) - - # Iterate through trip_entries once to count total_trips and labeled_trips - for trip in trip_entries: - total_trips += 1 - if trip.get('data', {}).get('user_input'): - labeled_trips += 1 - logging.info(f"Total trips: {total_trips}, Labeled trips: {labeled_trips}") + # Retrieve trip entries + total_trips = ts.find_entries_count( + key_list=["analysis/confirmed_trip"], + ) + + labeled_trips = ts.find_entries_count( + key_list=["analysis/confirmed_trip"], + extra_query_list=[{'data.user_input': {'$ne': {}}}] + ) + logging.info(f"Total trips: {total_trips}, Labeled trips: {labeled_trips}") + logging.info(type(user_id)) # Retrieve last GET and PUT calls from stats/server_api_time docs_cursor = edb.get_timeseries_db().find({ "metadata.key": "stats/server_api_time", + "user_id" : user_id }) logging.debug("Fetched API call statistics.") From a46ec97ca1f42a3b536de356978c01c2a10807ab Mon Sep 17 00:00:00 2001 From: TeachMeTW Date: Fri, 8 Nov 2024 14:12:18 -0800 Subject: [PATCH 04/10] Modified last call to mirror op-admin implementation --- emission/pipeline/intake_stage.py | 48 +++++++++++-------------------- 1 file changed, 17 insertions(+), 31 deletions(-) diff --git a/emission/pipeline/intake_stage.py b/emission/pipeline/intake_stage.py index 4e80683a5..2d85294ea 100644 --- a/emission/pipeline/intake_stage.py +++ b/emission/pipeline/intake_stage.py @@ -215,7 +215,7 @@ def _get_and_store_range(user_id, trip_key): - user_id (str): The UUID of the user. - trip_key (str): The key representing the trip data in the time series. """ - + time_format = 'YYYY-MM-DD HH:mm:ss' try: logging.info(f"Starting _get_and_store_range for user_id: {user_id}, trip_key: {trip_key}") @@ -245,43 +245,29 @@ def _get_and_store_range(user_id, trip_key): logging.info(f"Total trips: {total_trips}, Labeled trips: {labeled_trips}") logging.info(type(user_id)) - # Retrieve last GET and PUT calls from stats/server_api_time - docs_cursor = edb.get_timeseries_db().find({ - "metadata.key": "stats/server_api_time", - "user_id" : user_id - }) logging.debug("Fetched API call statistics.") - last_get = None - last_put = None - - for doc in docs_cursor: - api_call_name = doc.get("data", {}).get("name", "") - api_call_ts = doc.get("data", {}).get("ts") - - if not api_call_ts: - logging.warning(f"Missing 'ts' in document: {doc}") - continue - - if api_call_name.startswith("GET_"): - if not last_get or api_call_ts > last_get: - last_get = api_call_ts - logging.debug(f"Updated last_get to: {last_get}") - elif api_call_name.startswith("PUT_"): - if not last_put or api_call_ts > last_put: - last_put = api_call_ts - logging.debug(f"Updated last_put to: {last_put}") - - # Determine the most recent call - if last_get and last_put: - last_call_ts = max(last_get, last_put) - else: - last_call_ts = last_get or last_put + last_call_ts = ts.get_first_value_for_field( + key='stats/server_api_time', + field='data.ts', + sort_order=pymongo.DESCENDING + ) logging.info(f"Last call timestamp: {last_call_ts}") # Update the user profile with pipeline_range, total_trips, labeled_trips, and last_call user = ecwu.User.fromUUID(user_id) + if last_call_ts != -1: + # Format the timestamp using arrow + formatted_last_call = arrow.get(last_call_ts).format(time_format) + # Assign using attribute access or the update method + # Option 1: Attribute Assignment (if supported) + # user.last_call = formatted_last_call + + # Option 2: Using the update method + user.update({ + "last_call": formatted_last_call + }) user.update({ "pipeline_range": { "start_ts": start_ts, From 0758b8d2bdbe12b569d5cf3a5663c2c63c671443 Mon Sep 17 00:00:00 2001 From: TeachMeTW Date: Fri, 8 Nov 2024 14:24:17 -0800 Subject: [PATCH 05/10] Separated Get and Store into a separate file Addressed comments, reduced overkill on refactor Forgot to add last_call_ts --- emission/analysis/result/user_stat.py | 112 ++++++++++++++++++++++++++ emission/pipeline/intake_stage.py | 81 +------------------ 2 files changed, 115 insertions(+), 78 deletions(-) create mode 100644 emission/analysis/result/user_stat.py diff --git a/emission/analysis/result/user_stat.py b/emission/analysis/result/user_stat.py new file mode 100644 index 000000000..947c08fb6 --- /dev/null +++ b/emission/analysis/result/user_stat.py @@ -0,0 +1,112 @@ +# emission/analysis/result/user_stats.py + +import logging +import pymongo +import arrow +from typing import Optional, Dict, Any +import emission.storage.timeseries.abstract_timeseries as esta +import emission.core.wrapper.user as ecwu + +TIME_FORMAT = 'YYYY-MM-DD HH:mm:ss' + +def count_trips(ts: esta.TimeSeries, key_list: list, extra_query_list: Optional[list] = None) -> int: + """ + Counts the number of trips based on the provided query. + + :param ts: The time series object. + :type ts: esta.TimeSeries + :param key_list: List of keys to filter trips. + :type key_list: list + :param extra_query_list: Additional queries, defaults to None. + :type extra_query_list: Optional[list], optional + :return: The count of trips. + :rtype: int + """ + count = ts.find_entries_count(key_list=key_list, extra_query_list=extra_query_list) + logging.debug(f"Counted {len(key_list)} trips with additional queries {extra_query_list}: {count}") + return count + + +def get_last_call_timestamp(ts: esta.TimeSeries) -> Optional[int]: + """ + Retrieves the last API call timestamp. + + :param ts: The time series object. + :type ts: esta.TimeSeries + :return: The last call timestamp or None if not found. + :rtype: Optional[int] + """ + last_call_ts = ts.get_first_value_for_field( + key='stats/server_api_time', + field='data.ts', + sort_order=pymongo.DESCENDING + ) + logging.debug(f"Last call timestamp: {last_call_ts}") + return None if last_call_ts == -1 else last_call_ts + + +def update_user_profile(user_id: str, data: Dict[str, Any]) -> None: + """ + Updates the user profile with the provided data. + + :param user_id: The UUID of the user. + :type user_id: str + :param data: The data to update in the user profile. + :type data: Dict[str, Any] + :return: None + """ + user = ecwu.User.fromUUID(user_id) + user.update(data) + logging.debug(f"User profile updated with data: {data}") + logging.debug(f"New profile: {user.getProfile()}") + + +def get_and_store_user_stats(user_id: str, trip_key: str) -> None: + """ + Aggregates and stores user statistics into the user profile. + + :param user_id: The UUID of the user. + :type user_id: str + :param trip_key: The key representing the trip data in the time series. + :type trip_key: str + :return: None + """ + try: + logging.info(f"Starting get_and_store_user_stats for user_id: {user_id}, trip_key: {trip_key}") + + ts = esta.TimeSeries.get_time_series(user_id) + start_ts_result = ts.get_first_value_for_field(trip_key, "data.start_ts", pymongo.ASCENDING) + start_ts = None if start_ts_result == -1 else start_ts_result + + end_ts_result = ts.get_first_value_for_field(trip_key, "data.end_ts", pymongo.DESCENDING) + end_ts = None if end_ts_result == -1 else end_ts_result + + total_trips = count_trips(ts, key_list=["analysis/confirmed_trip"]) + labeled_trips = count_trips( + ts, + key_list=["analysis/confirmed_trip"], + extra_query_list=[{'data.user_input': {'$ne': {}}}] + ) + + logging.info(f"Total trips: {total_trips}, Labeled trips: {labeled_trips}") + logging.info(f"user_id type: {type(user_id)}") + + last_call_ts = get_last_call_timestamp(ts) + logging.info(f"Last call timestamp: {last_call_ts}") + + update_data = { + "pipeline_range": { + "start_ts": start_ts, + "end_ts": end_ts + }, + "total_trips": total_trips, + "labeled_trips": labeled_trips, + "last_call_ts": last_call_ts + } + + update_user_profile(user_id, update_data) + + logging.debug("User profile updated successfully.") + + except Exception as e: + logging.error(f"Error in get_and_store_user_stats for user_id {user_id}: {e}") \ No newline at end of file diff --git a/emission/pipeline/intake_stage.py b/emission/pipeline/intake_stage.py index 2d85294ea..d58be2b25 100644 --- a/emission/pipeline/intake_stage.py +++ b/emission/pipeline/intake_stage.py @@ -40,6 +40,7 @@ import emission.storage.decorations.stats_queries as esds import emission.core.wrapper.user as ecwu +import emission.analysis.result.user_stat as eaurs def run_intake_pipeline(process_number, uuid_list, skip_if_no_new_data=False): """ @@ -202,83 +203,7 @@ def run_intake_pipeline_for_user(uuid, skip_if_no_new_data): with ect.Timer() as gsr: logging.info("*" * 10 + "UUID %s: storing user stats " % uuid + "*" * 10) print(str(arrow.now()) + "*" * 10 + "UUID %s: storing user stats " % uuid + "*" * 10) - _get_and_store_range(uuid, "analysis/composite_trip") + eaurs.get_and_store_user_stats(uuid, "analysis/composite_trip") esds.store_pipeline_time(uuid, 'STORE_USER_STATS', - time.time(), gsr.elapsed) - -def _get_and_store_range(user_id, trip_key): - """ - Extends the user profile with pipeline_range, total_trips, labeled_trips, and last_call. - - Parameters: - - user_id (str): The UUID of the user. - - trip_key (str): The key representing the trip data in the time series. - """ - time_format = 'YYYY-MM-DD HH:mm:ss' - try: - logging.info(f"Starting _get_and_store_range for user_id: {user_id}, trip_key: {trip_key}") - - # Fetch the time series for the user - ts = esta.TimeSeries.get_time_series(user_id) - logging.debug("Fetched time series data.") - - # Get start timestamp - start_ts = ts.get_first_value_for_field(trip_key, "data.start_ts", pymongo.ASCENDING) - start_ts = None if start_ts == -1 else start_ts - logging.debug(f"Start timestamp: {start_ts}") - - # Get end timestamp - end_ts = ts.get_first_value_for_field(trip_key, "data.end_ts", pymongo.DESCENDING) - end_ts = None if end_ts == -1 else end_ts - logging.debug(f"End timestamp: {end_ts}") - - # Retrieve trip entries - total_trips = ts.find_entries_count( - key_list=["analysis/confirmed_trip"], - ) - - labeled_trips = ts.find_entries_count( - key_list=["analysis/confirmed_trip"], - extra_query_list=[{'data.user_input': {'$ne': {}}}] - ) - - logging.info(f"Total trips: {total_trips}, Labeled trips: {labeled_trips}") - logging.info(type(user_id)) - logging.debug("Fetched API call statistics.") - - last_call_ts = ts.get_first_value_for_field( - key='stats/server_api_time', - field='data.ts', - sort_order=pymongo.DESCENDING - ) - - logging.info(f"Last call timestamp: {last_call_ts}") - - # Update the user profile with pipeline_range, total_trips, labeled_trips, and last_call - user = ecwu.User.fromUUID(user_id) - if last_call_ts != -1: - # Format the timestamp using arrow - formatted_last_call = arrow.get(last_call_ts).format(time_format) - # Assign using attribute access or the update method - # Option 1: Attribute Assignment (if supported) - # user.last_call = formatted_last_call - - # Option 2: Using the update method - user.update({ - "last_call": formatted_last_call - }) - user.update({ - "pipeline_range": { - "start_ts": start_ts, - "end_ts": end_ts - }, - "total_trips": total_trips, - "labeled_trips": labeled_trips, - "last_call": last_call_ts - }) - logging.debug("User profile updated successfully.") - logging.debug("After updating, new profile is %s", user.getProfile()) - - except Exception as e: - logging.error(f"Error in _get_and_store_range for user_id {user_id}: {e}") \ No newline at end of file + time.time(), gsr.elapsed) From 25250f5d4d9e10e1e7615a77473e3e26a3832e93 Mon Sep 17 00:00:00 2001 From: TeachMeTW Date: Wed, 4 Dec 2024 12:42:11 -0800 Subject: [PATCH 06/10] Add initial unit tests for `get_and_store_user_stats` - Introduced comprehensive test suite to validate functionality of the `get_and_store_user_stats` method. - Covered core scenarios including: - Correct data aggregation and storage. - Handling cases with no trips or missing data. - Verifying behavior with partial data and multiple invocations. - handling of invalid UUID inputs. - setup and teardown to ensure test isolation and clean up user data. - Included data insertion for confirmed trips, composite trips, and server API timestamps to simulate realistic scenarios. This initial test suite establishes a baseline for ensuring reliability of the `get_and_store_user_stats` function. remove Modified test Refactored the test and simplified it; validated all new user stats --- .../analysisTests/intakeTests/TestUserStat.py | 141 ++++++++++++++++++ 1 file changed, 141 insertions(+) create mode 100644 emission/tests/analysisTests/intakeTests/TestUserStat.py diff --git a/emission/tests/analysisTests/intakeTests/TestUserStat.py b/emission/tests/analysisTests/intakeTests/TestUserStat.py new file mode 100644 index 000000000..ecf65d8a4 --- /dev/null +++ b/emission/tests/analysisTests/intakeTests/TestUserStat.py @@ -0,0 +1,141 @@ +from __future__ import unicode_literals, print_function, division, absolute_import +import unittest +import uuid +import logging +import json +import os +import time +import pandas as pd + +from builtins import * +from future import standard_library +standard_library.install_aliases() + +# Standard imports +import emission.storage.json_wrappers as esj + +# Our imports +import emission.core.get_database as edb +import emission.storage.timeseries.timequery as estt +import emission.storage.timeseries.abstract_timeseries as esta +import emission.storage.decorations.analysis_timeseries_queries as esda +import emission.core.wrapper.user as ecwu +import emission.net.api.stats as enac + +# Test imports +import emission.tests.common as etc + + +class TestUserStats(unittest.TestCase): + def setUp(self): + """ + Set up the test environment by loading real example data for both Android and users. + """ + # Configure logging for the test + etc.configLogging() + + # Retrieve a test user UUID from the database + get_example = pd.DataFrame(list(edb.get_uuid_db().find({}, {"user_email":1, "uuid": 1, "_id": 0}))) + if get_example.empty: + self.fail("No users found in the database to perform tests.") + test_user_id = get_example.iloc[0].uuid + self.testUUID = test_user_id + self.UUID = test_user_id + # Load example entries from a JSON file + with open("emission/tests/data/real_examples/shankari_2015-aug-27") as fp: + self.entries = json.load(fp, object_hook=esj.wrapped_object_hook) + + # Set up the real example data with entries + etc.setupRealExampleWithEntries(self) + + # Retrieve the user profile + profile = edb.get_profile_db().find_one({"user_id": self.UUID}) + if profile is None: + # Initialize the profile if it does not exist + edb.get_profile_db().insert_one({"user_id": self.UUID}) + + etc.runIntakePipeline(self.UUID) + + logging.debug("UUID = %s" % (self.UUID)) + + def tearDown(self): + """ + Clean up the test environment by removing analysis configuration and deleting test data from databases. + """ + + # Delete all time series entries for users + tsdb = edb.get_timeseries_db() + tsdb.delete_many({"user_id": self.UUID}) + + # Delete all pipeline state entries for users + pipeline_db = edb.get_pipeline_state_db() + pipeline_db.delete_many({"user_id": self.UUID}) + + # Delete all analysis time series entries for users + analysis_ts_db = edb.get_analysis_timeseries_db() + analysis_ts_db.delete_many({"user_id": self.UUID}) + + # Delete user profiles + profile_db = edb.get_profile_db() + profile_db.delete_one({"user_id": self.UUID}) + + def testGetAndStoreUserStats(self): + """ + Test get_and_store_user_stats for the user to ensure that user statistics + are correctly aggregated and stored in the user profile. + """ + + # Retrieve the updated user profile from the database + profile = edb.get_profile_db().find_one({"user_id": self.UUID}) + + # Ensure that the profile exists + self.assertIsNotNone(profile, "User profile should exist after storing stats.") + + # Verify that the expected fields are present + self.assertIn("total_trips", profile, "User profile should contain 'total_trips'.") + self.assertIn("labeled_trips", profile, "User profile should contain 'labeled_trips'.") + self.assertIn("pipeline_range", profile, "User profile should contain 'pipeline_range'.") + self.assertIn("last_call_ts", profile, "User profile should contain 'last_call_ts'.") + + expected_total_trips = 8 + expected_labeled_trips = 0 + + self.assertEqual(profile["total_trips"], expected_total_trips, + f"Expected total_trips to be {expected_total_trips}, got {profile['total_trips']}") + self.assertEqual(profile["labeled_trips"], expected_labeled_trips, + f"Expected labeled_trips to be {expected_labeled_trips}, got {profile['labeled_trips']}") + + # Verify pipeline range + pipeline_range = profile.get("pipeline_range", {}) + self.assertIn("start_ts", pipeline_range, "Pipeline range should contain 'start_ts'.") + self.assertIn("end_ts", pipeline_range, "Pipeline range should contain 'end_ts'.") + + expected_start_ts = 1440688739.672 + expected_end_ts = 1440729142.709 + + self.assertEqual(pipeline_range["start_ts"], expected_start_ts, + f"Expected start_ts to be {expected_start_ts}, got {pipeline_range['start_ts']}") + self.assertEqual(pipeline_range["end_ts"], expected_end_ts, + f"Expected end_ts to be {expected_end_ts}, got {pipeline_range['end_ts']}") + + def testLastCall(self): + # Call the function with all required arguments + enac.store_server_api_time(self.UUID, "test_call", 1440729142.709, 69) + + etc.runIntakePipeline(self.UUID) + + # Retrieve the profile from the database + profile = edb.get_profile_db().find_one({"user_id": self.UUID}) + + # Verify that last_call_ts is updated correctly + expected_last_call_ts = 1440729142.709 + actual_last_call_ts = profile.get("last_call_ts") + + self.assertEqual( + actual_last_call_ts, + expected_last_call_ts, + f"Expected last_call_ts to be {expected_last_call_ts}, got {actual_last_call_ts}" + ) + +if __name__ == '__main__': + unittest.main() From 179594b78ff1f51b03c4511bab8d1024521dcb0b Mon Sep 17 00:00:00 2001 From: TeachMeTW Date: Fri, 13 Dec 2024 12:07:47 -0800 Subject: [PATCH 07/10] added store to intake --- emission/tests/common.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/emission/tests/common.py b/emission/tests/common.py index baae6053c..23bb9ec7f 100644 --- a/emission/tests/common.py +++ b/emission/tests/common.py @@ -193,6 +193,7 @@ def runIntakePipeline(uuid): import emission.analysis.userinput.expectations as eaue import emission.analysis.classification.inference.labels.pipeline as eacilp import emission.analysis.plotting.composite_trip_creation as eapcc + import emission.analysis.result.user_stat as eaurs eaum.match_incoming_user_inputs(uuid) eaicf.filter_accuracy(uuid) @@ -205,6 +206,8 @@ def runIntakePipeline(uuid): eaue.populate_expectations(uuid) eaum.create_confirmed_objects(uuid) eapcc.create_composite_objects(uuid) + eaurs.get_and_store_user_stats(uuid, "analysis/composite_trip") + def configLogging(): """ From 3ef0517450407e3c87b71036e7ad478c99cc01fc Mon Sep 17 00:00:00 2001 From: TeachMeTW Date: Fri, 13 Dec 2024 20:33:48 -0800 Subject: [PATCH 08/10] Updated Based on requested changes --- .../analysisTests/intakeTests/TestUserStat.py | 59 ++++++------------- 1 file changed, 18 insertions(+), 41 deletions(-) diff --git a/emission/tests/analysisTests/intakeTests/TestUserStat.py b/emission/tests/analysisTests/intakeTests/TestUserStat.py index ecf65d8a4..260342d86 100644 --- a/emission/tests/analysisTests/intakeTests/TestUserStat.py +++ b/emission/tests/analysisTests/intakeTests/TestUserStat.py @@ -31,53 +31,28 @@ def setUp(self): """ Set up the test environment by loading real example data for both Android and users. """ - # Configure logging for the test - etc.configLogging() - - # Retrieve a test user UUID from the database - get_example = pd.DataFrame(list(edb.get_uuid_db().find({}, {"user_email":1, "uuid": 1, "_id": 0}))) - if get_example.empty: - self.fail("No users found in the database to perform tests.") - test_user_id = get_example.iloc[0].uuid - self.testUUID = test_user_id - self.UUID = test_user_id - # Load example entries from a JSON file - with open("emission/tests/data/real_examples/shankari_2015-aug-27") as fp: - self.entries = json.load(fp, object_hook=esj.wrapped_object_hook) - # Set up the real example data with entries - etc.setupRealExampleWithEntries(self) + etc.setupRealExample(self, "emission/tests/data/real_examples/shankari_2015-aug-27") # Retrieve the user profile - profile = edb.get_profile_db().find_one({"user_id": self.UUID}) + profile = edb.get_profile_db().find_one({"user_id": self.testUUID}) if profile is None: # Initialize the profile if it does not exist - edb.get_profile_db().insert_one({"user_id": self.UUID}) + edb.get_profile_db().insert_one({"user_id": self.testUUID}) - etc.runIntakePipeline(self.UUID) + etc.runIntakePipeline(self.testUUID) - logging.debug("UUID = %s" % (self.UUID)) + logging.debug("UUID = %s" % (self.testUUID)) def tearDown(self): """ Clean up the test environment by removing analysis configuration and deleting test data from databases. """ - # Delete all time series entries for users - tsdb = edb.get_timeseries_db() - tsdb.delete_many({"user_id": self.UUID}) - - # Delete all pipeline state entries for users - pipeline_db = edb.get_pipeline_state_db() - pipeline_db.delete_many({"user_id": self.UUID}) - - # Delete all analysis time series entries for users - analysis_ts_db = edb.get_analysis_timeseries_db() - analysis_ts_db.delete_many({"user_id": self.UUID}) - - # Delete user profiles - profile_db = edb.get_profile_db() - profile_db.delete_one({"user_id": self.UUID}) + edb.get_timeseries_db().delete_many({"user_id": self.testUUID}) + edb.get_pipeline_state_db().delete_many({"user_id": self.testUUID}) + edb.get_analysis_timeseries_db().delete_many({"user_id": self.testUUID}) + edb.get_profile_db().delete_one({"user_id": self.testUUID}) def testGetAndStoreUserStats(self): """ @@ -86,7 +61,7 @@ def testGetAndStoreUserStats(self): """ # Retrieve the updated user profile from the database - profile = edb.get_profile_db().find_one({"user_id": self.UUID}) + profile = edb.get_profile_db().find_one({"user_id": self.testUUID}) # Ensure that the profile exists self.assertIsNotNone(profile, "User profile should exist after storing stats.") @@ -120,15 +95,15 @@ def testGetAndStoreUserStats(self): def testLastCall(self): # Call the function with all required arguments - enac.store_server_api_time(self.UUID, "test_call", 1440729142.709, 69) - - etc.runIntakePipeline(self.UUID) + test_call_ts = time.time() + enac.store_server_api_time(self.testUUID, "test_call_ts", test_call_ts, 69420) + etc.runIntakePipeline(self.testUUID) # Retrieve the profile from the database - profile = edb.get_profile_db().find_one({"user_id": self.UUID}) + profile = edb.get_profile_db().find_one({"user_id": self.testUUID}) # Verify that last_call_ts is updated correctly - expected_last_call_ts = 1440729142.709 + expected_last_call_ts = test_call_ts actual_last_call_ts = profile.get("last_call_ts") self.assertEqual( @@ -138,4 +113,6 @@ def testLastCall(self): ) if __name__ == '__main__': - unittest.main() + # Configure logging for the test + etc.configLogging() + unittest.main() \ No newline at end of file From 3d688f2b63a3f7c738f671cffbe7b4d217fdcf95 Mon Sep 17 00:00:00 2001 From: TeachMeTW Date: Fri, 13 Dec 2024 22:18:19 -0800 Subject: [PATCH 09/10] Removed unnecessary wrapper --- emission/analysis/result/user_stat.py | 25 ++----------------------- 1 file changed, 2 insertions(+), 23 deletions(-) diff --git a/emission/analysis/result/user_stat.py b/emission/analysis/result/user_stat.py index 947c08fb6..fa1d7ac95 100644 --- a/emission/analysis/result/user_stat.py +++ b/emission/analysis/result/user_stat.py @@ -7,26 +7,6 @@ import emission.storage.timeseries.abstract_timeseries as esta import emission.core.wrapper.user as ecwu -TIME_FORMAT = 'YYYY-MM-DD HH:mm:ss' - -def count_trips(ts: esta.TimeSeries, key_list: list, extra_query_list: Optional[list] = None) -> int: - """ - Counts the number of trips based on the provided query. - - :param ts: The time series object. - :type ts: esta.TimeSeries - :param key_list: List of keys to filter trips. - :type key_list: list - :param extra_query_list: Additional queries, defaults to None. - :type extra_query_list: Optional[list], optional - :return: The count of trips. - :rtype: int - """ - count = ts.find_entries_count(key_list=key_list, extra_query_list=extra_query_list) - logging.debug(f"Counted {len(key_list)} trips with additional queries {extra_query_list}: {count}") - return count - - def get_last_call_timestamp(ts: esta.TimeSeries) -> Optional[int]: """ Retrieves the last API call timestamp. @@ -81,9 +61,8 @@ def get_and_store_user_stats(user_id: str, trip_key: str) -> None: end_ts_result = ts.get_first_value_for_field(trip_key, "data.end_ts", pymongo.DESCENDING) end_ts = None if end_ts_result == -1 else end_ts_result - total_trips = count_trips(ts, key_list=["analysis/confirmed_trip"]) - labeled_trips = count_trips( - ts, + total_trips = ts.find_entries_count(key_list=["analysis/confirmed_trip"]) + labeled_trips = ts.find_entries_count( key_list=["analysis/confirmed_trip"], extra_query_list=[{'data.user_input': {'$ne': {}}}] ) From e7a03ef0a4193d315807d592b88ec9a7423a4086 Mon Sep 17 00:00:00 2001 From: TeachMeTW Date: Mon, 16 Dec 2024 11:50:57 -0800 Subject: [PATCH 10/10] Fix file handling to prevent ValueError and terminal crashes - Replaced manual file open/close with `with` statement to ensure proper resource management. - Removed redundant file read operation after the file was closed. - Resolved `ValueError: I/O operation on closed file` and addressed terminal crashing issue during execution. --- .../net/ext_service/transit_matching/match_stops.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/emission/net/ext_service/transit_matching/match_stops.py b/emission/net/ext_service/transit_matching/match_stops.py index afa5d6abd..3e6deb1c3 100644 --- a/emission/net/ext_service/transit_matching/match_stops.py +++ b/emission/net/ext_service/transit_matching/match_stops.py @@ -15,12 +15,12 @@ url = "https://lz4.overpass-api.de/" try: - query_file = open('conf/net/ext_service/overpass_transit_stops_query_template') -except: + with open('conf/net/ext_service/overpass_transit_stops_query_template', 'r', encoding='UTF-8') as query_file: + query_string = "".join(query_file.readlines()) +except FileNotFoundError: print("transit stops query not configured, falling back to default") - query_file = open('conf/net/ext_service/overpass_transit_stops_query_template.sample') - -query_string = "".join(query_file.readlines()) + with open('conf/net/ext_service/overpass_transit_stops_query_template.sample', 'r', encoding='UTF-8') as query_file: + query_string = "".join(query_file.readlines()) RETRY = -1