Skip to content

Commit

Permalink
Improved Unit Test to run test pipeline and validation
Browse files Browse the repository at this point in the history
  • Loading branch information
TeachMeTW committed Dec 12, 2024
1 parent 823d0ad commit b3afe9b
Show file tree
Hide file tree
Showing 2 changed files with 288 additions and 0 deletions.
286 changes: 286 additions & 0 deletions emission/tests/analysisTests/intakeTests/TestUserStats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,286 @@
from __future__ import unicode_literals, print_function, division, absolute_import
import unittest
import uuid
import logging
import json
import os
import time
import pandas as pd

from builtins import *
from future import standard_library
standard_library.install_aliases()

# Standard imports
import emission.storage.json_wrappers as esj

# Our imports
import emission.core.get_database as edb
import emission.storage.timeseries.timequery as estt
import emission.storage.timeseries.abstract_timeseries as esta
import emission.storage.decorations.analysis_timeseries_queries as esda
import emission.analysis.result.user_stat as eaurs
import emission.core.wrapper.user as ecwu

# Test imports
import emission.tests.common as etc


class TestUserStats(unittest.TestCase):
def setUp(self):
"""
Set up the test environment by loading real example data for users.
"""
# Configure general logging for the test environment
# This is to help me figure out whats going on
etc.configLogging()

# Set up a dedicated logger for this test class
# The general logger is too noisy
self.logger = logging.getLogger('TestUserStats')
self.logger.setLevel(logging.DEBUG)

# Create a file handler for testuserstat.log
# This will log all debug messages to the file which is in current dir for ease
fh = logging.FileHandler('testuserstat.log')
fh.setLevel(logging.DEBUG)

# Create a formatter and set it for the handler
formatter = logging.Formatter('%(asctime)s:%(levelname)s:%(name)s:%(message)s')
fh.setFormatter(formatter)

# Add the handler to the logger
self.logger.addHandler(fh)
self.logger.propagate = False

# Log that the logger has been set up
self.logger.debug("Dedicated logger for TestUserStats initialized.")

# Set analysis configuration
self.analysis_conf_path = etc.set_analysis_config("intake.cleaning.filter_accuracy.enable", True)

# Retrieve a test user UUID from the database
get_example = pd.DataFrame(list(edb.get_uuid_db().find({}, {"user_email":1, "uuid": 1, "_id": 0})))
if get_example.empty:
self.fail("No users found in the database to perform tests.")
test_user_id = get_example.iloc[0].uuid
self.testUUID = test_user_id

# Load example entries from a JSON file
with open("emission/tests/data/real_examples/iphone_2015-11-06") as fp:
self.entries = json.load(fp, object_hook=esj.wrapped_object_hook)

# Set up the real example data with entries
etc.setupRealExampleWithEntries(self)
self.UUID = self.testUUID

# Retrieve the user profile
profile = edb.get_profile_db().find_one({"user_id": self.UUID})
if profile is None:
# Initialize the profile if it does not exist
edb.get_profile_db().insert_one({"user_id": self.UUID})
self.logger.debug(f"Initialized profile for user_id {self.UUID}")

# Apply filter accuracy on UUID
import emission.analysis.intake.cleaning.filter_accuracy as eaicf
eaicf.filter_accuracy(self.UUID)

# Log the UUID and profile status
profile = edb.get_profile_db().find_one({"user_id": self.UUID})
self.logger.debug(f"UUID set to {self.UUID} with profile: {profile}")

def tearDown(self):
"""
Clean up the test environment by removing analysis configuration and deleting test data from databases.
"""
# Remove the analysis configuration file
if os.path.exists(self.analysis_conf_path):
os.remove(self.analysis_conf_path)
self.logger.debug(f"Removed analysis configuration file at {self.analysis_conf_path}")

# Delete all time series entries for users
tsdb = edb.get_timeseries_db()
tsdb.delete_many({"user_id": self.UUID})
self.logger.debug(f"Deleted time series entries for user_id {self.UUID}")

# Delete all pipeline state entries for users
pipeline_db = edb.get_pipeline_state_db()
pipeline_db.delete_many({"user_id": self.UUID})
self.logger.debug(f"Deleted pipeline state entries for user_id {self.UUID}")

# Delete all analysis time series entries for users
analysis_ts_db = edb.get_analysis_timeseries_db()
analysis_ts_db.delete_many({"user_id": self.UUID})
self.logger.debug(f"Deleted analysis time series entries for user_id {self.UUID}")

# Delete user profiles
profile_db = edb.get_profile_db()
profile_db.delete_one({"user_id": self.UUID})
self.logger.debug(f"Deleted user profile for user_id {self.UUID}")

# Remove handlers to clean up after tests
handlers = self.logger.handlers[:]
for handler in handlers:
handler.close()
self.logger.removeHandler(handler)

def testGetAndStoreUserStats(self):
"""
Test get_and_store_user_stats for the user to ensure that user statistics
are correctly aggregated and stored in the user profile.
"""
self.logger.debug("Starting testGetAndStoreUserStats")

etc.runIntakePipeline(self.UUID)

# Retrieve the updated user profile from the database
profile = edb.get_profile_db().find_one({"user_id": self.UUID})
self.logger.debug(f"Retrieved profile: {profile}")

# Ensure that the profile exists
self.assertIsNotNone(profile, "User profile should exist after storing stats.")

# Verify that the expected fields are present
self.assertIn("total_trips", profile, "User profile should contain 'total_trips'.")
self.assertIn("labeled_trips", profile, "User profile should contain 'labeled_trips'.")
self.assertIn("pipeline_range", profile, "User profile should contain 'pipeline_range'.")
self.assertIn("last_call_ts", profile, "User profile should contain 'last_call_ts'.")

expected_total_trips = 1
expected_labeled_trips = 0

self.assertEqual(profile["total_trips"], expected_total_trips,
f"Expected total_trips to be {expected_total_trips}, got {profile['total_trips']}")
self.assertEqual(profile["labeled_trips"], expected_labeled_trips,
f"Expected labeled_trips to be {expected_labeled_trips}, got {profile['labeled_trips']}")

# Verify pipeline range
pipeline_range = profile.get("pipeline_range", {})
self.assertIn("start_ts", pipeline_range, "Pipeline range should contain 'start_ts'.")
self.assertIn("end_ts", pipeline_range, "Pipeline range should contain 'end_ts'.")

expected_start_ts = 1446821491.9683647
expected_end_ts = 1446828217.125328

self.assertEqual(pipeline_range["start_ts"], expected_start_ts,
f"Expected start_ts to be {expected_start_ts}, got {pipeline_range['start_ts']}")
self.assertEqual(pipeline_range["end_ts"], expected_end_ts,
f"Expected end_ts to be {expected_end_ts}, got {pipeline_range['end_ts']}")

# Verify last_call_ts
expected_last_call_ts = None
self.assertEqual(profile["last_call_ts"], expected_last_call_ts,
f"Expected last_call_ts to be {expected_last_call_ts}, got {profile['last_call_ts']}")

self.logger.debug("Completed testGetAndStoreUserStats successfully.")

def testEmptyCall(self):
"""
Test get_and_store_user_stats with a dummy user UUID to ensure that it doesn't raise exceptions.
"""
self.logger.debug("Starting testEmptyCall")

dummyUserId = str(uuid.uuid4()) # Ensure it's a string UUID
try:
eaurs.get_and_store_user_stats(dummyUserId, "analysis/composite_trip")
self.logger.debug(f"Called get_and_store_user_stats with dummyUserId {dummyUserId}")
except Exception as e:
self.logger.error(f"get_and_store_user_stats raised an exception with dummy UUID: {e}")
self.fail(f"get_and_store_user_stats raised an exception with dummy UUID: {e}")

self.logger.debug("Completed testEmptyCall successfully.")

def testUpdateUserProfile(self):
"""
Test the update_user_profile function directly to ensure it correctly updates user profiles.
"""
self.logger.debug("Starting testUpdateUserProfile")

# Define sample data to update
update_data = {
"total_trips": 10,
"labeled_trips": 7,
"pipeline_range": {
"start_ts": 1609459200, # 2021-01-01 00:00:00
"end_ts": 1609545600 # 2021-01-02 00:00:00
},
"last_call_ts": 1609632000 # 2021-01-03 00:00:00
}

self.logger.debug(f"Update data: {update_data}")

# Invoke the function to update the user profile
eaurs.update_user_profile(self.UUID, update_data)
self.logger.debug("Called update_user_profile")

# Retrieve the updated user profile from the database
profile = edb.get_profile_db().find_one({"user_id": self.UUID})
self.logger.debug(f"Retrieved profile after update: {profile}")

# Ensure that the profile exists
self.assertIsNotNone(profile, "User profile should exist after updating.")

# Verify that the profile contains the updated data
for key, value in update_data.items():
self.assertIn(key, profile, f"User profile should contain '{key}'.")
self.assertEqual(profile[key], value,
f"Expected '{key}' to be {value}, got {profile[key]}.")

self.logger.debug("Completed testUpdateUserProfile successfully.")


# Sample Implementations of `get_and_store_user_stats` and `update_user_profile`
# These should be part of your `emission.analysis.result.user_stat` module.
# Ensure that these functions handle profile creation using `upsert=True`.

# emission/analysis/result/user_stat.py

def get_and_store_user_stats(user_id, analysis_type):
"""
Aggregates user statistics and stores them in the user profile.
"""
logger = logging.getLogger('TestUserStats')
try:
# Example aggregation logic (replace with actual implementation)
# Here, we simulate aggregation by setting some dummy values
aggregated_stats = {
"total_trips": 3,
"labeled_trips": 3,
"pipeline_range": {
"start_ts": 1446847590,
"end_ts": 1446860343603
},
"last_call_ts": 3
}

# Update the user profile with the aggregated statistics
edb.get_profile_db().update_one(
{"user_id": user_id},
{"$set": aggregated_stats},
upsert=True # Creates the profile if it does not exist
)
logger.debug(f"Successfully updated profile for user_id {user_id} with stats: {aggregated_stats}")
except Exception as e:
logger.error(f"Failed to update profile for user_id {user_id}: {e}")
raise


def update_user_profile(user_id, update_data):
"""
Directly updates the user profile with the provided data.
"""
logger = logging.getLogger('TestUserStats')
try:
edb.get_profile_db().update_one(
{"user_id": user_id},
{"$set": update_data},
upsert=True # Creates the profile if it does not exist
)
logger.debug(f"Successfully updated profile for user_id {user_id} with data: {update_data}")
except Exception as e:
logger.error(f"Failed to update profile for user_id {user_id}: {e}")
raise

# To run the tests, ensure that this script is executable and that all dependencies are correctly installed.
if __name__ == '__main__':
unittest.main()
2 changes: 2 additions & 0 deletions emission/tests/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from emission.core.get_database import get_client_db, get_section_db
import emission.core.get_database as edb
import emission.core.wrapper.user as ecwu
import emission.analysis.result.user_stat as eaurs

def makeValid(client):
client.clientJSON['start_date'] = str(datetime.now() + timedelta(days=-2))
Expand Down Expand Up @@ -205,6 +206,7 @@ def runIntakePipeline(uuid):
eaue.populate_expectations(uuid)
eaum.create_confirmed_objects(uuid)
eapcc.create_composite_objects(uuid)
eaurs.get_and_store_user_stats(uuid, "analysis/confirmed_trip")

def configLogging():
"""
Expand Down

0 comments on commit b3afe9b

Please sign in to comment.