Skip to content

Commit

Permalink
Modified test
Browse files Browse the repository at this point in the history
  • Loading branch information
TeachMeTW committed Dec 12, 2024
1 parent f4c57f0 commit 823d0ad
Showing 1 changed file with 190 additions and 153 deletions.
343 changes: 190 additions & 153 deletions emission/tests/analysisTests/intakeTests/TestUserStat.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,180 +2,217 @@
import unittest
import uuid
import logging
from datetime import datetime
import json
import os
import time

import emission.analysis.result.user_stat as eaurs
from builtins import *
from future import standard_library
standard_library.install_aliases()

# Standard imports
import emission.storage.json_wrappers as esj

# Our imports
import emission.core.get_database as edb
import emission.storage.timeseries.timequery as estt
import emission.storage.timeseries.abstract_timeseries as esta
import emission.storage.decorations.analysis_timeseries_queries as esda
import emission.analysis.result.user_stat as eaurs
import emission.core.wrapper.user as ecwu
import emission.core.get_database as edb

# Test imports
import emission.tests.common as etc

class TestGetAndStoreUserStats(unittest.TestCase):

class TestUserStats(unittest.TestCase):
def setUp(self):
"""
Register a test user and insert initial time series data.
Set up the test environment by loading real example data for both Android and iOS users.
"""
# Configure logging for the test
etc.configLogging()
self.test_email = f'testuser_{uuid.uuid4()}@example.com'
self.user = ecwu.User.register(self.test_email)
self.test_uuid = self.user.uuid

# Ensure the user profile exists
if not self.user.getProfile():
ecwu.User.createProfile(self.test_uuid, datetime.now())

ts = esta.TimeSeries.get_time_series(self.test_uuid)

# Insert 10 'analysis/confirmed_trip' entries
for i in range(10):
trip_entry = {
"metadata": {"key": "analysis/confirmed_trip", "write_ts": datetime.now().timestamp()},
"data": {
"trip_id": str(uuid.uuid4()),
"user_input": {"input": "test"} if i < 7 else {}
}
}
ts.insert(trip_entry)

# Insert a single 'analysis/composite_trip' entry
composite_trip = {
"metadata": {"key": "analysis/composite_trip", "write_ts": datetime.now().timestamp()},
"data": {"start_ts": 1609459200, "end_ts": 1612137600}
}
ts.insert(composite_trip)

# Insert a 'stats/server_api_time' entry
server_api_time = {
"metadata": {"key": "stats/server_api_time", "write_ts": datetime.now().timestamp()},
"data": {"ts": 1614556800}
}
ts.insert(server_api_time)


# Set analysis configuration
self.analysis_conf_path = etc.set_analysis_config("intake.cleaning.filter_accuracy.enable", True)

# Setup Android real example
etc.setupRealExample(self, "emission/tests/data/real_examples/shankari_2015-aug-27")
self.androidUUID = self.testUUID

# Setup iOS real example
self.testUUID = uuid.UUID("c76a0487-7e5a-3b17-a449-47be666b36f6")
with open("emission/tests/data/real_examples/iphone_2015-11-06") as fp:
self.entries = json.load(fp, object_hook=esj.wrapped_object_hook)
etc.setupRealExampleWithEntries(self)
self.iosUUID = self.testUUID

# Apply filter accuracy on iOS UUID
import emission.analysis.intake.cleaning.filter_accuracy as eaicf
eaicf.filter_accuracy(self.iosUUID)

logging.debug("androidUUID = %s, iosUUID = %s" % (self.androidUUID, self.iosUUID))

def tearDown(self):
"""
Unregister the test user to clean up the database.
"""
try:
ecwu.User.unregister(self.test_email)
except Exception as e:
logging.error(f"Failed to unregister user {self.test_email}: {e}")

def test_correct_data(self):
"""
Verify that statistics are correctly aggregated when all data is present.
"""
eaurs.get_and_store_user_stats(self.test_uuid, "analysis/composite_trip")
profile = self.user.getProfile()

self.assertIn("pipeline_range", profile, "Missing 'pipeline_range' in profile.")
self.assertEqual(profile["pipeline_range"]["start_ts"], 1609459200, "Incorrect 'start_ts'.")
self.assertEqual(profile["pipeline_range"]["end_ts"], 1612137600, "Incorrect 'end_ts'.")

self.assertEqual(profile.get("total_trips", None), 10, "'total_trips' should be 10.")
self.assertEqual(profile.get("labeled_trips", None), 7, "'labeled_trips' should be 7.")

self.assertEqual(profile.get("last_call_ts", None), 1614556800, "'last_call_ts' mismatch.")

def test_no_trips(self):
"""
Ensure that statistics are zeroed out when there are no trips.
"""
tsdb = edb.get_timeseries_db()
tsdb.delete_many({"user_id": self.test_uuid, "metadata.key": "analysis/confirmed_trip"})

# Confirm deletion
remaining_trips = tsdb.count_documents({"user_id": self.test_uuid, "metadata.key": "analysis/confirmed_trip"})
self.assertEqual(remaining_trips, 0, "Confirmed trips were not deleted.")

eaurs.get_and_store_user_stats(self.test_uuid, "analysis/composite_trip")
profile = self.user.getProfile()

self.assertEqual(profile.get("total_trips", None), 0, "'total_trips' should be 0.")
self.assertEqual(profile.get("labeled_trips", None), 0, "'labeled_trips' should be 0.")

def test_no_last_call(self):
"""
Check that 'last_call_ts' is None when there is no server API time entry.
Clean up the test environment by removing analysis configuration and deleting test data from databases.
"""
# Remove the analysis configuration file
os.remove(self.analysis_conf_path)

# Delete all time series entries for Android and iOS users
tsdb = edb.get_timeseries_db()
tsdb.delete_many({"user_id": self.test_uuid, "metadata.key": "stats/server_api_time"})

# Confirm deletion
remaining_api_times = tsdb.count_documents({"user_id": self.test_uuid, "metadata.key": "stats/server_api_time"})
self.assertEqual(remaining_api_times, 0, "Server API time entries were not deleted.")

eaurs.get_and_store_user_stats(self.test_uuid, "analysis/composite_trip")
profile = self.user.getProfile()

self.assertIsNone(profile.get("last_call_ts", None), "'last_call_ts' should be None.")

def test_partial_data(self):
tsdb.delete_many({"user_id": self.androidUUID})
tsdb.delete_many({"user_id": self.iosUUID})

# Delete all pipeline state entries for Android and iOS users
pipeline_db = edb.get_pipeline_state_db()
pipeline_db.delete_many({"user_id": self.androidUUID})
pipeline_db.delete_many({"user_id": self.iosUUID})

# Delete all analysis time series entries for Android and iOS users
analysis_ts_db = edb.get_analysis_timeseries_db()
analysis_ts_db.delete_many({"user_id": self.androidUUID})
analysis_ts_db.delete_many({"user_id": self.iosUUID})

# Delete user profiles
profile_db = edb.get_profile_db()
profile_db.delete_one({"user_id": str(self.androidUUID)})
profile_db.delete_one({"user_id": str(self.iosUUID)})

def testGetAndStoreUserStatsAndroid(self):
"""
Verify behavior when 'analysis/composite_trip' data is missing.
Test get_and_store_user_stats for the Android user to ensure that user statistics
are correctly aggregated and stored in the user profile.
"""
tsdb = edb.get_timeseries_db()
tsdb.delete_many({"user_id": self.test_uuid, "metadata.key": "analysis/composite_trip"})

# Confirm deletion
remaining_composite_trips = tsdb.count_documents({"user_id": self.test_uuid, "metadata.key": "analysis/composite_trip"})
self.assertEqual(remaining_composite_trips, 0, "Composite trips were not deleted.")

eaurs.get_and_store_user_stats(self.test_uuid, "analysis/composite_trip")
profile = self.user.getProfile()

self.assertIsNone(profile["pipeline_range"].get("start_ts"), "'start_ts' should be None.")
self.assertIsNone(profile["pipeline_range"].get("end_ts"), "'end_ts' should be None.")

self.assertEqual(profile.get("total_trips", None), 10, "'total_trips' should remain 10.")
self.assertEqual(profile.get("labeled_trips", None), 7, "'labeled_trips' should remain 7.")

def test_multiple_calls(self):
# Invoke the function to get and store user stats
eaurs.get_and_store_user_stats(str(self.androidUUID), "analysis/composite_trip")

# Retrieve the updated user profile from the database
profile = edb.get_profile_db().find_one({"user_id": str(self.androidUUID)})

# Ensure that the profile exists
self.assertIsNotNone(profile, "User profile should exist after storing stats.")

# Verify that the expected fields are present
self.assertIn("total_trips", profile, "User profile should contain 'total_trips'.")
self.assertIn("labeled_trips", profile, "User profile should contain 'labeled_trips'.")
self.assertIn("pipeline_range", profile, "User profile should contain 'pipeline_range'.")
self.assertIn("last_call_ts", profile, "User profile should contain 'last_call_ts'.")

expected_total_trips = -
expected_labeled_trips = -

self.assertEqual(profile["total_trips"], expected_total_trips,
f"Expected total_trips to be {expected_total_trips}, got {profile['total_trips']}")
self.assertEqual(profile["labeled_trips"], expected_labeled_trips,
f"Expected labeled_trips to be {expected_labeled_trips}, got {profile['labeled_trips']}")

# Verify pipeline range
pipeline_range = profile.get("pipeline_range", {})
self.assertIn("start_ts", pipeline_range, "Pipeline range should contain 'start_ts'.")
self.assertIn("end_ts", pipeline_range, "Pipeline range should contain 'end_ts'.")

expected_start_ts = -
expected_end_ts = -

self.assertEqual(pipeline_range["start_ts"], expected_start_ts,
f"Expected start_ts to be {expected_start_ts}, got {pipeline_range['start_ts']}")
self.assertEqual(pipeline_range["end_ts"], expected_end_ts,
f"Expected end_ts to be {expected_end_ts}, got {pipeline_range['end_ts']}")

# Verify last_call_ts
expected_last_call_ts = -
self.assertEqual(profile["last_call_ts"], expected_last_call_ts,
f"Expected last_call_ts to be {expected_last_call_ts}, got {profile['last_call_ts']}")

def testGetAndStoreUserStatsIOS(self):
"""
Ensure that multiple invocations correctly update statistics without duplication.
Test get_and_store_user_stats for the iOS user to ensure that user statistics
are correctly aggregated and stored in the user profile.
"""
# Initial call
eaurs.get_and_store_user_stats(self.test_uuid, "analysis/composite_trip")

ts = esta.TimeSeries.get_time_series(self.test_uuid)

# Insert additional trips
for i in range(5):
trip_entry = {
"metadata": {"key": "analysis/confirmed_trip", "write_ts": datetime.now().timestamp()},
"data": {
"trip_id": str(uuid.uuid4()),
"user_input": {"input": "additional_test"} if i < 3 else {}
}
}
ts.insert(trip_entry)

# Insert new server API time entry
new_server_api_time = {
"metadata": {"key": "stats/server_api_time", "write_ts": datetime.now().timestamp()},
"data": {"ts": 1617235200}
}
ts.insert(new_server_api_time)

# Second call
eaurs.get_and_store_user_stats(self.test_uuid, "analysis/composite_trip")
profile = self.user.getProfile()

self.assertEqual(profile.get("total_trips", None), 15, "'total_trips' should be 15 after additional inserts.")
self.assertEqual(profile.get("labeled_trips", None), 10, "'labeled_trips' should be 10 after additional inserts.")
self.assertEqual(profile.get("last_call_ts", None), 1617235200, "'last_call_ts' should be updated to 1617235200.")

def test_exception_handling(self):
# Invoke the function to get and store user stats
eaurs.get_and_store_user_stats(str(self.iosUUID), "analysis/composite_trip")

# Retrieve the updated user profile from the database
profile = edb.get_profile_db().find_one({"user_id": str(self.iosUUID)})

# Ensure that the profile exists
self.assertIsNotNone(profile, "User profile should exist after storing stats.")

# Verify that the expected fields are present
self.assertIn("total_trips", profile, "User profile should contain 'total_trips'.")
self.assertIn("labeled_trips", profile, "User profile should contain 'labeled_trips'.")
self.assertIn("pipeline_range", profile, "User profile should contain 'pipeline_range'.")
self.assertIn("last_call_ts", profile, "User profile should contain 'last_call_ts'.")

expected_total_trips = -
expected_labeled_trips = -

self.assertEqual(profile["total_trips"], expected_total_trips,
f"Expected total_trips to be {expected_total_trips}, got {profile['total_trips']}")
self.assertEqual(profile["labeled_trips"], expected_labeled_trips,
f"Expected labeled_trips to be {expected_labeled_trips}, got {profile['labeled_trips']}")

# Verify pipeline range
pipeline_range = profile.get("pipeline_range", {})
self.assertIn("start_ts", pipeline_range, "Pipeline range should contain 'start_ts'.")
self.assertIn("end_ts", pipeline_range, "Pipeline range should contain 'end_ts'.")

expected_start_ts = -
expected_end_ts = -

self.assertEqual(pipeline_range["start_ts"], expected_start_ts,
f"Expected start_ts to be {expected_start_ts}, got {pipeline_range['start_ts']}")
self.assertEqual(pipeline_range["end_ts"], expected_end_ts,
f"Expected end_ts to be {expected_end_ts}, got {pipeline_range['end_ts']}")

# Verify last_call_ts
expected_last_call_ts = -
self.assertEqual(profile["last_call_ts"], expected_last_call_ts,
f"Expected last_call_ts to be {expected_last_call_ts}, got {profile['last_call_ts']}")


def testEmptyCall(self):
"""
Test handling of invalid UUID inputs.
Test get_and_store_user_stats with a dummy user UUID to ensure that it doesn't raise exceptions.
"""
invalid_uuid = "invalid-uuid-string"
dummyUserId = uuid.uuid4()
try:
eaurs.get_and_store_user_stats(invalid_uuid, "analysis/composite_trip")
eaurs.get_and_store_user_stats(str(dummyUserId), "analysis/composite_trip")
except Exception as e:
self.fail(f"get_and_store_user_stats raised an exception with invalid UUID: {e}")
else:
logging.debug("Handled invalid UUID without raising exceptions.")
self.fail(f"get_and_store_user_stats raised an exception with dummy UUID: {e}")

def testUpdateUserProfile(self):
"""
Test the update_user_profile function directly to ensure it correctly updates user profiles.
"""
# Define sample data to update
update_data = {
"total_trips": 10,
"labeled_trips": 7,
"pipeline_range": {
"start_ts": 1609459200, # 2021-01-01 00:00:00
"end_ts": 1609545600 # 2021-01-02 00:00:00
},
"last_call_ts": 1609632000 # 2021-01-03 00:00:00
}

# Invoke the function to update the user profile
eaurs.update_user_profile(str(self.androidUUID), update_data)

# Retrieve the updated user profile from the database
profile = edb.get_profile_db().find_one({"user_id": str(self.androidUUID)})

# Ensure that the profile exists
self.assertIsNotNone(profile, "User profile should exist after updating.")

# Verify that the profile contains the updated data
for key, value in update_data.items():
self.assertIn(key, profile, f"User profile should contain '{key}'.")
self.assertEqual(profile[key], value,
f"Expected '{key}' to be {value}, got {profile[key]}.")


if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
unittest.main()

0 comments on commit 823d0ad

Please sign in to comment.