diff --git a/emission/core/timer.py b/emission/core/timer.py new file mode 100644 index 000000000..4e47c2293 --- /dev/null +++ b/emission/core/timer.py @@ -0,0 +1,17 @@ +from timeit import default_timer + +class Timer(object): + def __init__(self, verbose=False): + self.verbose = verbose + self.timer = default_timer + + def __enter__(self): + self.start = self.timer() + return self + + def __exit__(self, *args): + end = self.timer() + self.elapsed = end - self.start + self.elapsed_ms = self.elapsed * 1000 # millisecs + if self.verbose: + print 'elapsed time: %f ms' % self.elapsed_ms diff --git a/emission/core/wrapper/entry.py b/emission/core/wrapper/entry.py index 20e820bfd..fadbc188c 100644 --- a/emission/core/wrapper/entry.py +++ b/emission/core/wrapper/entry.py @@ -33,6 +33,13 @@ def _getData2Wrapper(): "config/sensor_config": "sensorconfig", "config/sync_config": "syncconfig", "config/consent": "consentconfig", + "stats/server_api_time": "statsevent", + "stats/server_api_error": "statsevent", + "stats/pipeline_time": "statsevent", + "stats/pipeline_error": "statsevent", + "stats/client_time": "statsevent", + "stats/client_nav_event": "statsevent", + "stats/client_error": "statsevent", "segmentation/raw_trip": "rawtrip", "segmentation/raw_place": "rawplace", "segmentation/raw_section": "section", diff --git a/emission/core/wrapper/statsevent.py b/emission/core/wrapper/statsevent.py new file mode 100644 index 000000000..140a1b1b6 --- /dev/null +++ b/emission/core/wrapper/statsevent.py @@ -0,0 +1,35 @@ +import logging +import emission.core.wrapper.wrapperbase as ecwb +import enum as enum + +# class StatsEvent(enum.Enum): +# """ +# Indicates that some event happened that we want to record. +# The event can have an associated duration, or can be a +# """ +# UNKNOWN = 0 +# DISCHARGING = 1 +# CHARGING = 2 +# FULL = 3 +# NOT_CHARGING = 4 # This is an android-only state - unsure how often we will encounter it +# + +class Statsevent(ecwb.WrapperBase): + # TODO: should this be a string or an enum + # making it an enum will require us to change code every time we add a new stat, but will + # make it easier to know the list of stats. Let's leave it as a string for now. + props = {"name": ecwb.WrapperBase.Access.WORM, # string representing the stat. + "reading": ecwb.WrapperBase.Access.WORM, # None or -1 if not present + "ts": ecwb.WrapperBase.Access.WORM, + "local_dt": ecwb.WrapperBase.Access.WORM, + "fmt_time": ecwb.WrapperBase.Access.WORM, + "client_app_version": ecwb.WrapperBase.Access.WORM, + "client_os_version": ecwb.WrapperBase.Access.WORM + } + enums = {} + geojson = [] + nullable = [] + local_dates = ['local_dt'] + + def _populateDependencies(self): + pass diff --git a/emission/net/api/cfc_webapp.py b/emission/net/api/cfc_webapp.py index 5c6d07182..a79a25415 100644 --- a/emission/net/api/cfc_webapp.py +++ b/emission/net/api/cfc_webapp.py @@ -42,9 +42,9 @@ import emission.core.wrapper.motionactivity as ecwm import emission.storage.timeseries.timequery as estt import emission.storage.timeseries.aggregate_timeseries as estag +import emission.core.timer as ect import emission.core.get_database as edb - config_file = open('conf/net/api/webserver.conf') config_data = json.load(config_file) static_path = config_data["paths"]["static_path"] @@ -573,16 +573,25 @@ def habiticaProxy(): def before_request(): print("START %s %s %s" % (datetime.now(), request.method, request.path)) request.params.start_ts = time.time() + request.params.timer = ect.Timer() + request.params.timer.__enter__() logging.debug("START %s %s" % (request.method, request.path)) @app.hook('after_request') def after_request(): msTimeNow = time.time() + request.params.timer.__exit__() duration = msTimeNow - request.params.start_ts + new_duration = request.params.timer.elapsed + if round(duration - new_duration, 3) > 0: + logging.error("old style duration %s != timer based duration %s" % (duration, new_duration)) + stats.store_server_api_error(request.params.user_uuid, "MISMATCH_%s_%s" % + (request.method, request.path), msTimeNow, duration - new_duration) + print("END %s %s %s %s %s " % (datetime.now(), request.method, request.path, request.params.user_uuid, duration)) logging.debug("END %s %s %s %s " % (request.method, request.path, request.params.user_uuid, duration)) # Keep track of the time and duration for each call - stats.storeServerEntry(request.params.user_uuid, "%s %s" % (request.method, request.path), + stats.store_server_api_time(request.params.user_uuid, "%s_%s" % (request.method, request.path), msTimeNow, duration) # Auth helpers BEGIN diff --git a/emission/net/api/stats.py b/emission/net/api/stats.py index a2ef7c6e9..5f95c9550 100644 --- a/emission/net/api/stats.py +++ b/emission/net/api/stats.py @@ -1,9 +1,105 @@ -# Standard imports import logging -import time +import emission.storage.decorations.stats_queries as esds + +def store_server_api_time(user_id, call, ts, reading): + esds.store_server_api_time(user_id, call, ts, reading) + +def store_server_api_error(user_id, call, ts, reading): + esds.store_server_api_error(user_id, call, ts, reading) + +# Backward compat to store old-style client stats until the phone upgrade +# has been pushed out to all phones +def setClientMeasurements(user_id, reportedVals): + logging.info("Received %d client keys and %d client readings for user_id %s" % (len(reportedVals['Readings']), + getClientMeasurementCount(reportedVals['Readings']), user_id)) + logging.debug("reportedVals = %s" % reportedVals) + metadata = reportedVals['Metadata'] + stats = reportedVals['Readings'] + for key in stats: + values = stats[key] + for value in values: + storeClientEntry(user_id, key, value[0], value[1], metadata) -# Our imports -from emission.core.get_database import get_client_stats_db_backup, get_server_stats_db_backup, get_result_stats_db_backup +def getClientMeasurementCount(readings): + retSum = 0 + for currReading in readings: + currArray = readings[currReading] + # logging.debug("currArray for reading %s is %s and its length is %d" % (currReading, currArray, len(currArray))) + retSum += len(currArray) + return retSum + +def storeClientEntry(user_id, key, ts, reading, metadata): + logging.debug("storing client entry for user_id %s, key %s at timestamp %s" % (user_id, key, ts)) + + old_style_data = createEntry(user_id, key, ts, reading) + old_style_data.update(metadata) + save_to_timeseries(old_style_data) + +def save_to_timeseries(old_style_data): + import emission.storage.timeseries.abstract_timeseries as esta + + user_id = old_style_data["user"] + new_entry = old2new(old_style_data) + return esta.TimeSeries.get_time_series(user_id).insert(new_entry) + +def old2new(old_style_data): + import emission.core.wrapper.entry as ecwe + import emission.core.wrapper.statsevent as ecws + import emission.core.wrapper.battery as ecwb + + int_with_none = lambda s: float(s) if s is not None else None + + user_id = old_style_data["user"] + del old_style_data["user"] + if old_style_data["stat"] == "battery_level": + new_style_data = ecwb.Battery({ + "battery_level_pct" : int_with_none(old_style_data["reading"]), + "ts": old_style_data["ts"] + }) + new_key = "background/battery" + else: + new_style_data = ecws.Statsevent() + new_style_data.name = old_style_data["stat"] + new_style_data.ts = old_style_data["ts"] + new_style_data.reading = int_with_none(old_style_data["reading"]) + new_style_data.client_app_version = old_style_data["client_app_version"] + new_style_data.client_os_version = old_style_data["client_os_version"] + new_key = stat2key(old_style_data["stat"]) + + new_entry = ecwe.Entry.create_entry(user_id, new_key, new_style_data) + # For legacy entries, make sure that the write_ts doesn't become the conversion + # time or the server arrival time + new_entry["metadata"]["write_ts"] = new_style_data.ts + del new_entry["metadata"]["write_local_dt"] + del new_entry["metadata"]["write_fmt_time"] + # We are not going to fill in the local_date and fmt_time entries because + # a) we don't know what they are for legacy entries + # b) we don't even know whether we need to use them + # c) even if we do, we don't know if we need to use them for older entries + # So let's leave the hacky reconstruction algorithm until we know that we really need it + return new_entry + +def stat2key(stat_name): + stat_name_mapping = { + "app_launched": "stats/client_nav_event", + "push_stats_duration": "stats/client_time", + "sync_duration": "stats/client_time", + "sync_launched": "stats/client_nav_event", + "button_sync_forced": "stats/client_nav_event", + "sync_pull_list_size": "stats/client_time", + "pull_duration": "stats/client_time" + } + return stat_name_mapping[stat_name] + + +def createEntry(user, stat, ts, reading): + return {'user': user, + 'stat': stat, + 'ts': float(ts), + 'reading': reading} + +# Dummy functions to keep the old, obsolete code happy. +# Will do a big purge over winter break STAT_TRIP_MGR_PCT_SHOWN = "tripManager.pctShown" STAT_TRIP_MGR_TRIPS_FOR_DAY = "tripManager.tripsForDay" @@ -24,101 +120,8 @@ STAT_GAME_SCORE = "game.score" STAT_VIEW_CHOICE = "view.choice" -# Store client measurements (currently into the database, but maybe in a log -# file in the future). The format of the stats received from the client is very -# similar to the input to SMAP, to make it easier to store them in a SMAP -# database in the future -def setClientMeasurements(user, reportedVals): - logging.info("Received %d client keys and %d client readings for user %s" % (len(reportedVals['Readings']), - getClientMeasurementCount(reportedVals['Readings']), user)) - logging.debug("reportedVals = %s" % reportedVals) - metadata = reportedVals['Metadata'] - metadata['reported_ts'] = time.time() - stats = reportedVals['Readings'] - for key in stats: - values = stats[key] - for value in values: - storeClientEntry(user, key, value[0], value[1], metadata) - -# metadata format is -def storeClientEntry(user, key, ts, reading, metadata): - logging.debug("storing client entry for user %s, key %s at timestamp %s" % (user, key, ts)) - response = None - # float first, because int doesn't recognize floats represented as strings. - # Android timestamps are in milliseconds, while Giles expects timestamps to be - # in seconds, so divide by 1000 when you hit this case. - # ios timestamps are in seconds. - ts = float(ts) - - if ts > 9999999999: - ts = ts/1000 - - currEntry = createEntry(user, key, ts, reading) - # Add the os and app versions from the metadata dict - currEntry.update(metadata) - - try: - # response = get_client_stats_db().insert(currEntry) - get_client_stats_db_backup().insert(currEntry) - except Exception as e: - logging.debug("failed to store client entry for user %s, key %s at timestamp %s" % (user, key, ts)) - logging.debug("exception was: %s" % (e)) - get_client_stats_db_backup().insert(currEntry) - return response != None - -# server measurements will call this directly since there's not much point in -# batching in a different location and then making a call here since it runs on -# the same server. Might change if we move engagement stats to a different server -# Note also that there is no server metadata since we currently have no -# versioning on the server. Should probably add some soon - -def storeServerEntry(user, key, ts, reading): - logging.debug("storing server entry %s for user %s, key %s at timestamp %s" % (reading, user, key, ts)) - response = None - currEntry = createEntry(user, key, ts, reading) - - try: - # response = get_server_stats_db().insert(currEntry) - get_server_stats_db_backup().insert(currEntry) - except Exception as e: - logging.debug("failed to store server entry %s for user %s, key %s at timestamp %s" % (reading, user, key, ts)) - logging.debug("exception was: %s" % (e)) - get_server_stats_db_backup().insert(currEntry) - - # Return boolean that tells you whether the insertion was successful or not - return response != None - -def storeResultEntry(user, key, ts, reading): - logging.debug("storing result entry %s for user %s, key %s at timestamp %s" % (reading, user, key, ts)) - response = None - - # Sometimes timestamp comes in as a float, represented as seconds.[somethign else]; truncate digits after the - # decimal - ts = int(ts) - currEntry = createEntry(user, key, ts, reading) - - try: - get_result_stats_db_backup().insert(currEntry) - # response = get_result_stats_db().insert(currEntry) - except Exception as e: - logging.debug("failed to store result entry %s for user %s, key %s at timestamp %s" % (reading, user, key, ts)) - logging.debug("exception was: %s" % (e)) - get_result_stats_db_backup().insert(currEntry) - - # Return boolean that tells you whether the insertion was successful or not - return response != None +def storeServerEntry(user, stat, ts, reading): + pass - -def getClientMeasurementCount(readings): - retSum = 0 - for currReading in readings: - currArray = readings[currReading] - # logging.debug("currArray for reading %s is %s and its length is %d" % (currReading, currArray, len(currArray))) - retSum = retSum + len(currArray) - return retSum - -def createEntry(user, stat, ts, reading): - return {'user': user, - 'stat': stat, - 'ts': ts, - 'reading': reading} +def storeResultEntry(user, stat, ts, reading): + pass diff --git a/emission/pipeline/intake_stage.py b/emission/pipeline/intake_stage.py index 4b3fc1dbb..a6bc93f0c 100644 --- a/emission/pipeline/intake_stage.py +++ b/emission/pipeline/intake_stage.py @@ -3,8 +3,12 @@ import numpy as np import arrow from uuid import UUID +import time import emission.core.get_database as edb +import emission.core.timer as ect + +import emission.core.wrapper.pipelinestate as ecwp import emission.net.usercache.abstract_usercache_handler as euah import emission.net.usercache.abstract_usercache as enua @@ -18,6 +22,8 @@ import emission.analysis.intake.cleaning.clean_and_resample as eaicr import emission.net.ext_service.habitica.executor as autocheck +import emission.storage.decorations.stats_queries as esds + def run_intake_pipeline(process_number, uuid_list): """ @@ -56,16 +62,21 @@ def run_intake_pipeline(process_number, uuid_list): try: run_intake_pipeline_for_user(uuid) except Exception as e: + esds.store_pipeline_error(uuid, "WHOLE_PIPELINE", time.time(), None) logging.exception("Found error %s while processing pipeline " "for user %s, skipping" % (e, uuid)) def run_intake_pipeline_for_user(uuid): uh = euah.UserCacheHandler.getUserCacheHandler(uuid) - logging.info("*" * 10 + "UUID %s: moving to long term" % uuid + "*" * 10) - print(str(arrow.now()) + "*" * 10 + "UUID %s: moving to long term" % uuid + "*" * 10) + with ect.Timer() as uct: + logging.info("*" * 10 + "UUID %s: moving to long term" % uuid + "*" * 10) + print(str(arrow.now()) + "*" * 10 + "UUID %s: moving to long term" % uuid + "*" * 10) + uh.moveToLongTerm() + + esds.store_pipeline_time(uuid, ecwp.PipelineStages.USERCACHE.name, + time.time(), uct.elapsed) - uh.moveToLongTerm() # Hack until we delete these spurious entries # https://github.com/e-mission/e-mission-server/issues/407#issuecomment-2484868 @@ -74,36 +85,58 @@ def run_intake_pipeline_for_user(uuid): logging.debug("Found no entries for %s, skipping" % uuid) return - logging.info("*" * 10 + "UUID %s: filter accuracy if needed" % uuid + "*" * 10) - print(str(arrow.now()) + "*" * 10 + "UUID %s: filter accuracy if needed" % uuid + "*" * 10) - eaicf.filter_accuracy(uuid) + with ect.Timer() as aft: + logging.info("*" * 10 + "UUID %s: filter accuracy if needed" % uuid + "*" * 10) + print(str(arrow.now()) + "*" * 10 + "UUID %s: filter accuracy if needed" % uuid + "*" * 10) + eaicf.filter_accuracy(uuid) + + esds.store_pipeline_time(uuid, ecwp.PipelineStages.ACCURACY_FILTERING.name, + time.time(), aft.elapsed) + + with ect.Timer() as tst: + logging.info("*" * 10 + "UUID %s: segmenting into trips" % uuid + "*" * 10) + print(str(arrow.now()) + "*" * 10 + "UUID %s: segmenting into trips" % uuid + "*" * 10) + eaist.segment_current_trips(uuid) - - logging.info("*" * 10 + "UUID %s: segmenting into trips" % uuid + "*" * 10) - print(str(arrow.now()) + "*" * 10 + "UUID %s: segmenting into trips" % uuid + "*" * 10) - eaist.segment_current_trips(uuid) + esds.store_pipeline_time(uuid, ecwp.PipelineStages.TRIP_SEGMENTATION.name, + time.time(), tst.elapsed) + with ect.Timer() as sst: + logging.info("*" * 10 + "UUID %s: segmenting into sections" % uuid + "*" * 10) + print(str(arrow.now()) + "*" * 10 + "UUID %s: segmenting into sections" % uuid + "*" * 10) + eaiss.segment_current_sections(uuid) - logging.info("*" * 10 + "UUID %s: segmenting into sections" % uuid + "*" * 10) - print(str(arrow.now()) + "*" * 10 + "UUID %s: segmenting into sections" % uuid + "*" * 10) - eaiss.segment_current_sections(uuid) + esds.store_pipeline_time(uuid, ecwp.PipelineStages.SECTION_SEGMENTATION.name, + time.time(), sst.elapsed) + with ect.Timer() as jst: + logging.info("*" * 10 + "UUID %s: smoothing sections" % uuid + "*" * 10) + print(str(arrow.now()) + "*" * 10 + "UUID %s: smoothing sections" % uuid + "*" * 10) + eaicl.filter_current_sections(uuid) - logging.info("*" * 10 + "UUID %s: smoothing sections" % uuid + "*" * 10) - print(str(arrow.now()) + "*" * 10 + "UUID %s: smoothing sections" % uuid + "*" * 10) - eaicl.filter_current_sections(uuid) + esds.store_pipeline_time(uuid, ecwp.PipelineStages.JUMP_SMOOTHING.name, + time.time(), jst.elapsed) + with ect.Timer() as crt: + logging.info("*" * 10 + "UUID %s: cleaning and resampling timeline" % uuid + "*" * 10) + print(str(arrow.now()) + "*" * 10 + "UUID %s: cleaning and resampling timeline" % uuid + "*" * 10) + eaicr.clean_and_resample(uuid) - logging.info("*" * 10 + "UUID %s: cleaning and resampling timeline" % uuid + "*" * 10) - print(str(arrow.now()) + "*" * 10 + "UUID %s: cleaning and resampling timeline" % uuid + "*" * 10) - eaicr.clean_and_resample(uuid) + esds.store_pipeline_time(uuid, ecwp.PipelineStages.CLEAN_RESAMPLING.name, + time.time(), crt.elapsed) + with ect.Timer() as act: + logging.info("*" * 10 + "UUID %s: checking active mode trips to autocheck habits" % uuid + "*" * 10) + print(str(arrow.now()) + "*" * 10 + "UUID %s: checking active mode trips to autocheck habits" % uuid + "*" * 10) + autocheck.give_points_for_all_tasks(uuid) - logging.info("*" * 10 + "UUID %s: checking active mode trips to autocheck habits" % uuid + "*" * 10) - print(str(arrow.now()) + "*" * 10 + "UUID %s: checking active mode trips to autocheck habits" % uuid + "*" * 10) - autocheck.give_points_for_all_tasks(uuid) + esds.store_pipeline_time(uuid, "AUTOCHECK_POINTS", + time.time(), act.elapsed) + with ect.Timer() as ogt: + logging.info("*" * 10 + "UUID %s: storing views to cache" % uuid + "*" * 10) + print(str(arrow.now()) + "*" * 10 + "UUID %s: storing views to cache" % uuid + "*" * 10) + uh.storeViewsToCache() - logging.info("*" * 10 + "UUID %s: storing views to cache" % uuid + "*" * 10) - print(str(arrow.now()) + "*" * 10 + "UUID %s: storing views to cache" % uuid + "*" * 10) - uh.storeViewsToCache() + esds.store_pipeline_time(uuid, ecwp.PipelineStages.OUTPUT_GEN.name, + time.time(), ogt.elapsed) diff --git a/emission/pipeline/scheduler.py b/emission/pipeline/scheduler.py index 80095dd67..f3c1f2bdc 100644 --- a/emission/pipeline/scheduler.py +++ b/emission/pipeline/scheduler.py @@ -37,7 +37,7 @@ def get_split_uuid_lists(n_splits, is_public_pipeline): sel_uuids = [u for u in all_uuids if u in estag.TEST_PHONE_IDS] else: sel_uuids = [u for u in all_uuids if u not in estag.TEST_PHONE_IDS] - # Add back the test phones for now so that we can test the data + # Add back the test phones for now so that we can test the data # collection changes before deploying them in the wild sel_uuids.extend(TEMP_HANDLED_PUBLIC_PHONES) diff --git a/emission/storage/decorations/stats_queries.py b/emission/storage/decorations/stats_queries.py new file mode 100644 index 000000000..9f921664f --- /dev/null +++ b/emission/storage/decorations/stats_queries.py @@ -0,0 +1,41 @@ +# Standard imports +import logging +import time + +# Our imports +import emission.storage.timeseries.abstract_timeseries as esta +import emission.core.wrapper.entry as ecwe + + +# metadata format is + +def store_server_api_time(user_id, call, ts, reading): + store_stats_entry(user_id, "stats/server_api_time", call, ts, reading) + +def store_server_api_error(user_id, call, ts, reading): + store_stats_entry(user_id, "stats/server_api_error", call, ts, reading) + +def store_pipeline_time(user_id, stage_string, ts, reading): + """ + + :param user_id: id of the user + :param stage: string representing a particular time. Typically the stage name, + but can also be used for sub sections of a stage + :param ts: timestamp at the time of the reading + :param reading: the duration of the stage in ms + :return: + """ + store_stats_entry(user_id, "stats/pipeline_time", stage_string, ts, reading) + +def store_pipeline_error(user_id, stage_string, ts, reading): + store_stats_entry(user_id, "stats/pipeline_error", stage_string, ts, reading) + +def store_stats_entry(user_id, metadata_key, name, ts, reading): + data = { + "name": name, + "ts": ts, + "reading": reading + } + new_entry = ecwe.Entry.create_entry(user_id, metadata_key, data) + return esta.TimeSeries.get_time_series(user_id).insert(new_entry) + diff --git a/emission/storage/timeseries/builtin_timeseries.py b/emission/storage/timeseries/builtin_timeseries.py index a454d8b8d..9e2eebe68 100644 --- a/emission/storage/timeseries/builtin_timeseries.py +++ b/emission/storage/timeseries/builtin_timeseries.py @@ -21,6 +21,16 @@ def __init__(self, user_id): self.user_query = {"user_id": self.user_id} # UUID is mandatory for this version self.timeseries_db = ts_enum_map[esta.EntryType.DATA_TYPE] self.analysis_timeseries_db = ts_enum_map[esta.EntryType.ANALYSIS_TYPE] + # Design question: Should the stats be a separate database, or should it be part + # of the timeseries database? Technically, it should be part of the timeseries + # database. However, I am concerned about the performance of the database + # with even more entries - it already takes 10 seconds to query for a document + # and I am not sure that adding a ton more data is going to make that better + # it is going to be easier to copy entries into the same database instead of + # splitting out, and we already support multiple indices, so I am tempted to put + # it separately. On the other hand, then the load/store from timeseries won't work + # if it is a separate database. Let's do the right thing and change the storage/ + # shift to a different timeseries if we need to self.ts_map = { "background/location": self.timeseries_db, "background/filtered_location": self.timeseries_db, @@ -30,6 +40,12 @@ def __init__(self, user_id): "config/sensor_config": self.timeseries_db, "config/sync_config": self.timeseries_db, "config/consent": self.timeseries_db, + "stats/server_api_time": self.timeseries_db, + "stats/server_api_error": self.timeseries_db, + "stats/pipeline_time": self.timeseries_db, + "stats/pipeline_error": self.timeseries_db, + "stats/client_time": self.timeseries_db, + "stats/client_nav_event": self.timeseries_db, "segmentation/raw_trip": self.analysis_timeseries_db, "segmentation/raw_place": self.analysis_timeseries_db, "segmentation/raw_section": self.analysis_timeseries_db,