From 7487c82578e8933f4da8f9d3fa3522c102906c81 Mon Sep 17 00:00:00 2001 From: "K. Shankari" Date: Sun, 30 Oct 2016 19:38:00 -0700 Subject: [PATCH 1/3] Switch the stats to the timeseries, similar to the other data Back when we stored the raw data directly as sections and trips, we used to store the stats in a timeseries. Now that we are storing the raw data in a timeseries, and are planning to use the stats for analysis into user behavior, it would be more convenient to store the stats into the same timeseries. Then, the timeseries is really a full representation of the source of truth, and we can reuse similar code for analysis. This change: - defines a new wrapper class for the data - supports it in the entry and the timeseries - switches the server api tracking to the new timeseries - adds a backwards compat layer that stores old-style client stats to the new timeseries, while waiting for the clients to get updated In later changes: - migration script to migrate the older style stats to the new timeseries - new stats for the pipeline --- emission/core/timer.py | 17 ++ emission/core/wrapper/entry.py | 7 + emission/core/wrapper/statsevent.py | 35 +++ emission/net/api/cfc_webapp.py | 13 +- emission/net/api/stats.py | 214 ++++++++---------- emission/storage/decorations/stats_queries.py | 30 +++ .../storage/timeseries/builtin_timeseries.py | 16 ++ 7 files changed, 211 insertions(+), 121 deletions(-) create mode 100644 emission/core/timer.py create mode 100644 emission/core/wrapper/statsevent.py create mode 100644 emission/storage/decorations/stats_queries.py diff --git a/emission/core/timer.py b/emission/core/timer.py new file mode 100644 index 000000000..4e47c2293 --- /dev/null +++ b/emission/core/timer.py @@ -0,0 +1,17 @@ +from timeit import default_timer + +class Timer(object): + def __init__(self, verbose=False): + self.verbose = verbose + self.timer = default_timer + + def __enter__(self): + self.start = self.timer() + return self + + def __exit__(self, *args): + 
end = self.timer() + self.elapsed = end - self.start + self.elapsed_ms = self.elapsed * 1000 # millisecs + if self.verbose: + print 'elapsed time: %f ms' % self.elapsed_ms diff --git a/emission/core/wrapper/entry.py b/emission/core/wrapper/entry.py index 20e820bfd..fadbc188c 100644 --- a/emission/core/wrapper/entry.py +++ b/emission/core/wrapper/entry.py @@ -33,6 +33,13 @@ def _getData2Wrapper(): "config/sensor_config": "sensorconfig", "config/sync_config": "syncconfig", "config/consent": "consentconfig", + "stats/server_api_time": "statsevent", + "stats/server_api_error": "statsevent", + "stats/pipeline_time": "statsevent", + "stats/pipeline_error": "statsevent", + "stats/client_time": "statsevent", + "stats/client_nav_event": "statsevent", + "stats/client_error": "statsevent", "segmentation/raw_trip": "rawtrip", "segmentation/raw_place": "rawplace", "segmentation/raw_section": "section", diff --git a/emission/core/wrapper/statsevent.py b/emission/core/wrapper/statsevent.py new file mode 100644 index 000000000..140a1b1b6 --- /dev/null +++ b/emission/core/wrapper/statsevent.py @@ -0,0 +1,35 @@ +import logging +import emission.core.wrapper.wrapperbase as ecwb +import enum as enum + +# class StatsEvent(enum.Enum): +# """ +# Indicates that some event happened that we want to record. +# The event can have an associated duration, or can be a +# """ +# UNKNOWN = 0 +# DISCHARGING = 1 +# CHARGING = 2 +# FULL = 3 +# NOT_CHARGING = 4 # This is an android-only state - unsure how often we will encounter it +# + +class Statsevent(ecwb.WrapperBase): + # TODO: should this be a string or an enum + # making it an enum will require us to change code every time we add a new stat, but will + # make it easier to know the list of stats. Let's leave it as a string for now. + props = {"name": ecwb.WrapperBase.Access.WORM, # string representing the stat. 
+ "reading": ecwb.WrapperBase.Access.WORM, # None or -1 if not present + "ts": ecwb.WrapperBase.Access.WORM, + "local_dt": ecwb.WrapperBase.Access.WORM, + "fmt_time": ecwb.WrapperBase.Access.WORM, + "client_app_version": ecwb.WrapperBase.Access.WORM, + "client_os_version": ecwb.WrapperBase.Access.WORM + } + enums = {} + geojson = [] + nullable = [] + local_dates = ['local_dt'] + + def _populateDependencies(self): + pass diff --git a/emission/net/api/cfc_webapp.py b/emission/net/api/cfc_webapp.py index 5c6d07182..a79a25415 100644 --- a/emission/net/api/cfc_webapp.py +++ b/emission/net/api/cfc_webapp.py @@ -42,9 +42,9 @@ import emission.core.wrapper.motionactivity as ecwm import emission.storage.timeseries.timequery as estt import emission.storage.timeseries.aggregate_timeseries as estag +import emission.core.timer as ect import emission.core.get_database as edb - config_file = open('conf/net/api/webserver.conf') config_data = json.load(config_file) static_path = config_data["paths"]["static_path"] @@ -573,16 +573,25 @@ def habiticaProxy(): def before_request(): print("START %s %s %s" % (datetime.now(), request.method, request.path)) request.params.start_ts = time.time() + request.params.timer = ect.Timer() + request.params.timer.__enter__() logging.debug("START %s %s" % (request.method, request.path)) @app.hook('after_request') def after_request(): msTimeNow = time.time() + request.params.timer.__exit__() duration = msTimeNow - request.params.start_ts + new_duration = request.params.timer.elapsed + if round(duration - new_duration, 3) > 0: + logging.error("old style duration %s != timer based duration %s" % (duration, new_duration)) + stats.store_server_api_error(request.params.user_uuid, "MISMATCH_%s_%s" % + (request.method, request.path), msTimeNow, duration - new_duration) + print("END %s %s %s %s %s " % (datetime.now(), request.method, request.path, request.params.user_uuid, duration)) logging.debug("END %s %s %s %s " % (request.method, request.path, 
request.params.user_uuid, duration)) # Keep track of the time and duration for each call - stats.storeServerEntry(request.params.user_uuid, "%s %s" % (request.method, request.path), + stats.store_server_api_time(request.params.user_uuid, "%s_%s" % (request.method, request.path), msTimeNow, duration) # Auth helpers BEGIN diff --git a/emission/net/api/stats.py b/emission/net/api/stats.py index a2ef7c6e9..b162b7723 100644 --- a/emission/net/api/stats.py +++ b/emission/net/api/stats.py @@ -1,124 +1,100 @@ -# Standard imports import logging -import time - -# Our imports -from emission.core.get_database import get_client_stats_db_backup, get_server_stats_db_backup, get_result_stats_db_backup - -STAT_TRIP_MGR_PCT_SHOWN = "tripManager.pctShown" -STAT_TRIP_MGR_TRIPS_FOR_DAY = "tripManager.tripsForDay" - -STAT_MY_CARBON_FOOTPRINT = "footprint.my_carbon" -STAT_MY_CARBON_FOOTPRINT_NO_AIR = "footprint.my_carbon.no_air" -STAT_MY_OPTIMAL_FOOTPRINT = "footprint.optimal" -STAT_MY_OPTIMAL_FOOTPRINT_NO_AIR = "footprint.optimal.no_air" -STAT_MY_ALLDRIVE_FOOTPRINT = "footprint.alldrive" - -STAT_PCT_CLASSIFIED = "game.score.pct_classified" -STAT_MINE_MINUS_OPTIMAL = "game.score.mine_minus_optimal" -STAT_ALL_DRIVE_MINUS_MINE = "game.score.all_drive_minus_mine" -STAT_SB375_DAILY_GOAL = "game.score.sb375_daily_goal" - -STAT_MEAN_FOOTPRINT = "footprint.mean" -STAT_MEAN_FOOTPRINT_NO_AIR = "footprint.mean.no_air" -STAT_GAME_SCORE = "game.score" -STAT_VIEW_CHOICE = "view.choice" - -# Store client measurements (currently into the database, but maybe in a log -# file in the future). 
The format of the stats received from the client is very -# similar to the input to SMAP, to make it easier to store them in a SMAP -# database in the future -def setClientMeasurements(user, reportedVals): - logging.info("Received %d client keys and %d client readings for user %s" % (len(reportedVals['Readings']), - getClientMeasurementCount(reportedVals['Readings']), user)) - logging.debug("reportedVals = %s" % reportedVals) - metadata = reportedVals['Metadata'] - metadata['reported_ts'] = time.time() - stats = reportedVals['Readings'] - for key in stats: - values = stats[key] - for value in values: - storeClientEntry(user, key, value[0], value[1], metadata) - -# metadata format is -def storeClientEntry(user, key, ts, reading, metadata): - logging.debug("storing client entry for user %s, key %s at timestamp %s" % (user, key, ts)) - response = None - # float first, because int doesn't recognize floats represented as strings. - # Android timestamps are in milliseconds, while Giles expects timestamps to be - # in seconds, so divide by 1000 when you hit this case. - # ios timestamps are in seconds. - ts = float(ts) - - if ts > 9999999999: - ts = ts/1000 - - currEntry = createEntry(user, key, ts, reading) - # Add the os and app versions from the metadata dict - currEntry.update(metadata) - - try: - # response = get_client_stats_db().insert(currEntry) - get_client_stats_db_backup().insert(currEntry) - except Exception as e: - logging.debug("failed to store client entry for user %s, key %s at timestamp %s" % (user, key, ts)) - logging.debug("exception was: %s" % (e)) - get_client_stats_db_backup().insert(currEntry) - return response != None - -# server measurements will call this directly since there's not much point in -# batching in a different location and then making a call here since it runs on -# the same server. 
Might change if we move engagement stats to a different server -# Note also that there is no server metadata since we currently have no -# versioning on the server. Should probably add some soon - -def storeServerEntry(user, key, ts, reading): - logging.debug("storing server entry %s for user %s, key %s at timestamp %s" % (reading, user, key, ts)) - response = None - currEntry = createEntry(user, key, ts, reading) - - try: - # response = get_server_stats_db().insert(currEntry) - get_server_stats_db_backup().insert(currEntry) - except Exception as e: - logging.debug("failed to store server entry %s for user %s, key %s at timestamp %s" % (reading, user, key, ts)) - logging.debug("exception was: %s" % (e)) - get_server_stats_db_backup().insert(currEntry) - - # Return boolean that tells you whether the insertion was successful or not - return response != None - -def storeResultEntry(user, key, ts, reading): - logging.debug("storing result entry %s for user %s, key %s at timestamp %s" % (reading, user, key, ts)) - response = None - - # Sometimes timestamp comes in as a float, represented as seconds.[somethign else]; truncate digits after the - # decimal - ts = int(ts) - currEntry = createEntry(user, key, ts, reading) - - try: - get_result_stats_db_backup().insert(currEntry) - # response = get_result_stats_db().insert(currEntry) - except Exception as e: - logging.debug("failed to store result entry %s for user %s, key %s at timestamp %s" % (reading, user, key, ts)) - logging.debug("exception was: %s" % (e)) - get_result_stats_db_backup().insert(currEntry) - - # Return boolean that tells you whether the insertion was successful or not - return response != None - +import emission.storage.decorations.stats_queries as esds + +def store_server_api_time(user_id, call, ts, reading): + esds.store_server_api_time(user_id, call, ts, reading) + +def store_server_api_error(user_id, call, ts, reading): + esds.store_server_api_error(user_id, call, ts, reading) + +# Backward compat to 
store old-style client stats until the phone upgrade +# has been pushed out to all phones +def setClientMeasurements(user_id, reportedVals): + logging.info("Received %d client keys and %d client readings for user_id %s" % (len(reportedVals['Readings']), + getClientMeasurementCount(reportedVals['Readings']), user_id)) + logging.debug("reportedVals = %s" % reportedVals) + metadata = reportedVals['Metadata'] + stats = reportedVals['Readings'] + for key in stats: + values = stats[key] + for value in values: + storeClientEntry(user_id, key, value[0], value[1], metadata) def getClientMeasurementCount(readings): - retSum = 0 - for currReading in readings: - currArray = readings[currReading] - # logging.debug("currArray for reading %s is %s and its length is %d" % (currReading, currArray, len(currArray))) - retSum = retSum + len(currArray) - return retSum + retSum = 0 + for currReading in readings: + currArray = readings[currReading] + # logging.debug("currArray for reading %s is %s and its length is %d" % (currReading, currArray, len(currArray))) + retSum += len(currArray) + return retSum + +def storeClientEntry(user_id, key, ts, reading, metadata): + logging.debug("storing client entry for user_id %s, key %s at timestamp %s" % (user_id, key, ts)) + + old_style_data = createEntry(user_id, key, ts, reading) + old_style_data.update(metadata) + save_to_timeseries(old_style_data) + +def save_to_timeseries(old_style_data): + import emission.storage.timeseries.abstract_timeseries as esta + + user_id = old_style_data["user"] + new_entry = old2new(old_style_data) + return esta.TimeSeries.get_time_series(user_id).insert(new_entry) + +def old2new(old_style_data): + import emission.core.wrapper.entry as ecwe + import emission.core.wrapper.statsevent as ecws + import emission.core.wrapper.battery as ecwb + + int_with_none = lambda s: float(s) if s is not None else None + + user_id = old_style_data["user"] + del old_style_data["user"] + if old_style_data["stat"] == "battery_level": + 
new_style_data = ecwb.Battery({ + "battery_level_pct" : int_with_none(old_style_data["reading"]), + "ts": old_style_data["ts"] + }) + new_key = "background/battery" + else: + new_style_data = ecws.Statsevent() + new_style_data.name = old_style_data["stat"] + new_style_data.ts = old_style_data["ts"] + new_style_data.reading = int_with_none(old_style_data["reading"]) + new_style_data.client_app_version = old_style_data["client_app_version"] + new_style_data.client_os_version = old_style_data["client_os_version"] + new_key = stat2key(old_style_data["stat"]) + + new_entry = ecwe.Entry.create_entry(user_id, new_key, new_style_data) + # For legacy entries, make sure that the write_ts doesn't become the conversion + # time or the server arrival time + new_entry["metadata"]["write_ts"] = new_style_data.ts + del new_entry["metadata"]["write_local_dt"] + del new_entry["metadata"]["write_fmt_time"] + # We are not going to fill in the local_date and fmt_time entries because + # a) we don't know what they are for legacy entries + # b) we don't even know whether we need to use them + # c) even if we do, we don't know if we need to use them for older entries + # So let's leave the hacky reconstruction algorithm until we know that we really need it + return new_entry + +def stat2key(stat_name): + stat_name_mapping = { + "app_launched": "stats/client_nav_event", + "push_stats_duration": "stats/client_time", + "sync_duration": "stats/client_time", + "sync_launched": "stats/client_nav_event", + "button_sync_forced": "stats/client_nav_event", + "sync_pull_list_size": "stats/client_time", + "pull_duration": "stats/client_time" + } + return stat_name_mapping[stat_name] + def createEntry(user, stat, ts, reading): - return {'user': user, - 'stat': stat, - 'ts': ts, - 'reading': reading} + return {'user': user, + 'stat': stat, + 'ts': float(ts), + 'reading': reading} + diff --git a/emission/storage/decorations/stats_queries.py b/emission/storage/decorations/stats_queries.py new file mode 
100644 index 000000000..8f9b4a91b --- /dev/null +++ b/emission/storage/decorations/stats_queries.py @@ -0,0 +1,30 @@ +# Standard imports +import logging +import time + +# Our imports +import emission.storage.timeseries.abstract_timeseries as esta +import emission.core.wrapper.entry as ecwe +from emission.core.get_database import get_client_stats_db_backup, get_server_stats_db_backup, get_result_stats_db_backup + + +# metadata format is + +def store_server_api_time(user_id, call, ts, reading): + data = { + "name": call, + "ts": ts, + "reading": reading + } + new_entry = ecwe.Entry.create_entry(user_id, "stats/server_api_time", data) + return esta.TimeSeries.get_time_series(user_id).insert(new_entry) + +def store_server_api_error(user_id, call, ts, reading): + data = { + "name": call, + "ts": ts, + "reading": reading + } + new_entry = ecwe.Entry.create_entry(user_id, "stats/server_api_error", data) + return esta.TimeSeries.get_time_series(user_id).insert(new_entry) + diff --git a/emission/storage/timeseries/builtin_timeseries.py b/emission/storage/timeseries/builtin_timeseries.py index a454d8b8d..9e2eebe68 100644 --- a/emission/storage/timeseries/builtin_timeseries.py +++ b/emission/storage/timeseries/builtin_timeseries.py @@ -21,6 +21,16 @@ def __init__(self, user_id): self.user_query = {"user_id": self.user_id} # UUID is mandatory for this version self.timeseries_db = ts_enum_map[esta.EntryType.DATA_TYPE] self.analysis_timeseries_db = ts_enum_map[esta.EntryType.ANALYSIS_TYPE] + # Design question: Should the stats be a separate database, or should it be part + # of the timeseries database? Technically, it should be part of the timeseries + # database. 
However, I am concerned about the performance of the database + # with even more entries - it already takes 10 seconds to query for a document + # and I am not sure that adding a ton more data is going to make that better + # it is going to be easier to copy entries into the same database instead of + # splitting out, and we already support multiple indices, so I am tempted to put + # it separately. On the other hand, then the load/store from timeseries won't work + # if it is a separate database. Let's do the right thing and change the storage/ + # shift to a different timeseries if we need to self.ts_map = { "background/location": self.timeseries_db, "background/filtered_location": self.timeseries_db, @@ -30,6 +40,12 @@ def __init__(self, user_id): "config/sensor_config": self.timeseries_db, "config/sync_config": self.timeseries_db, "config/consent": self.timeseries_db, + "stats/server_api_time": self.timeseries_db, + "stats/server_api_error": self.timeseries_db, + "stats/pipeline_time": self.timeseries_db, + "stats/pipeline_error": self.timeseries_db, + "stats/client_time": self.timeseries_db, + "stats/client_nav_event": self.timeseries_db, "segmentation/raw_trip": self.analysis_timeseries_db, "segmentation/raw_place": self.analysis_timeseries_db, "segmentation/raw_section": self.analysis_timeseries_db, From ca7ce14fac190db96fda415eb6b6e9c5fcff4203 Mon Sep 17 00:00:00 2001 From: "K. Shankari" Date: Sun, 30 Oct 2016 20:46:04 -0700 Subject: [PATCH 2/3] Add stats for the time taken for each stage of the pipeline This can help us track which areas we need to focus on for performance improvements. 
--- emission/pipeline/intake_stage.py | 83 +++++++++++++------ emission/pipeline/scheduler.py | 2 +- emission/storage/decorations/stats_queries.py | 31 ++++--- 3 files changed, 80 insertions(+), 36 deletions(-) diff --git a/emission/pipeline/intake_stage.py b/emission/pipeline/intake_stage.py index 4b3fc1dbb..a6bc93f0c 100644 --- a/emission/pipeline/intake_stage.py +++ b/emission/pipeline/intake_stage.py @@ -3,8 +3,12 @@ import numpy as np import arrow from uuid import UUID +import time import emission.core.get_database as edb +import emission.core.timer as ect + +import emission.core.wrapper.pipelinestate as ecwp import emission.net.usercache.abstract_usercache_handler as euah import emission.net.usercache.abstract_usercache as enua @@ -18,6 +22,8 @@ import emission.analysis.intake.cleaning.clean_and_resample as eaicr import emission.net.ext_service.habitica.executor as autocheck +import emission.storage.decorations.stats_queries as esds + def run_intake_pipeline(process_number, uuid_list): """ @@ -56,16 +62,21 @@ def run_intake_pipeline(process_number, uuid_list): try: run_intake_pipeline_for_user(uuid) except Exception as e: + esds.store_pipeline_error(uuid, "WHOLE_PIPELINE", time.time(), None) logging.exception("Found error %s while processing pipeline " "for user %s, skipping" % (e, uuid)) def run_intake_pipeline_for_user(uuid): uh = euah.UserCacheHandler.getUserCacheHandler(uuid) - logging.info("*" * 10 + "UUID %s: moving to long term" % uuid + "*" * 10) - print(str(arrow.now()) + "*" * 10 + "UUID %s: moving to long term" % uuid + "*" * 10) + with ect.Timer() as uct: + logging.info("*" * 10 + "UUID %s: moving to long term" % uuid + "*" * 10) + print(str(arrow.now()) + "*" * 10 + "UUID %s: moving to long term" % uuid + "*" * 10) + uh.moveToLongTerm() + + esds.store_pipeline_time(uuid, ecwp.PipelineStages.USERCACHE.name, + time.time(), uct.elapsed) - uh.moveToLongTerm() # Hack until we delete these spurious entries # 
https://github.com/e-mission/e-mission-server/issues/407#issuecomment-2484868 @@ -74,36 +85,58 @@ def run_intake_pipeline_for_user(uuid): logging.debug("Found no entries for %s, skipping" % uuid) return - logging.info("*" * 10 + "UUID %s: filter accuracy if needed" % uuid + "*" * 10) - print(str(arrow.now()) + "*" * 10 + "UUID %s: filter accuracy if needed" % uuid + "*" * 10) - eaicf.filter_accuracy(uuid) + with ect.Timer() as aft: + logging.info("*" * 10 + "UUID %s: filter accuracy if needed" % uuid + "*" * 10) + print(str(arrow.now()) + "*" * 10 + "UUID %s: filter accuracy if needed" % uuid + "*" * 10) + eaicf.filter_accuracy(uuid) + + esds.store_pipeline_time(uuid, ecwp.PipelineStages.ACCURACY_FILTERING.name, + time.time(), aft.elapsed) + + with ect.Timer() as tst: + logging.info("*" * 10 + "UUID %s: segmenting into trips" % uuid + "*" * 10) + print(str(arrow.now()) + "*" * 10 + "UUID %s: segmenting into trips" % uuid + "*" * 10) + eaist.segment_current_trips(uuid) - - logging.info("*" * 10 + "UUID %s: segmenting into trips" % uuid + "*" * 10) - print(str(arrow.now()) + "*" * 10 + "UUID %s: segmenting into trips" % uuid + "*" * 10) - eaist.segment_current_trips(uuid) + esds.store_pipeline_time(uuid, ecwp.PipelineStages.TRIP_SEGMENTATION.name, + time.time(), tst.elapsed) + with ect.Timer() as sst: + logging.info("*" * 10 + "UUID %s: segmenting into sections" % uuid + "*" * 10) + print(str(arrow.now()) + "*" * 10 + "UUID %s: segmenting into sections" % uuid + "*" * 10) + eaiss.segment_current_sections(uuid) - logging.info("*" * 10 + "UUID %s: segmenting into sections" % uuid + "*" * 10) - print(str(arrow.now()) + "*" * 10 + "UUID %s: segmenting into sections" % uuid + "*" * 10) - eaiss.segment_current_sections(uuid) + esds.store_pipeline_time(uuid, ecwp.PipelineStages.SECTION_SEGMENTATION.name, + time.time(), sst.elapsed) + with ect.Timer() as jst: + logging.info("*" * 10 + "UUID %s: smoothing sections" % uuid + "*" * 10) + print(str(arrow.now()) + "*" * 10 + 
"UUID %s: smoothing sections" % uuid + "*" * 10) + eaicl.filter_current_sections(uuid) - logging.info("*" * 10 + "UUID %s: smoothing sections" % uuid + "*" * 10) - print(str(arrow.now()) + "*" * 10 + "UUID %s: smoothing sections" % uuid + "*" * 10) - eaicl.filter_current_sections(uuid) + esds.store_pipeline_time(uuid, ecwp.PipelineStages.JUMP_SMOOTHING.name, + time.time(), jst.elapsed) + with ect.Timer() as crt: + logging.info("*" * 10 + "UUID %s: cleaning and resampling timeline" % uuid + "*" * 10) + print(str(arrow.now()) + "*" * 10 + "UUID %s: cleaning and resampling timeline" % uuid + "*" * 10) + eaicr.clean_and_resample(uuid) - logging.info("*" * 10 + "UUID %s: cleaning and resampling timeline" % uuid + "*" * 10) - print(str(arrow.now()) + "*" * 10 + "UUID %s: cleaning and resampling timeline" % uuid + "*" * 10) - eaicr.clean_and_resample(uuid) + esds.store_pipeline_time(uuid, ecwp.PipelineStages.CLEAN_RESAMPLING.name, + time.time(), crt.elapsed) + with ect.Timer() as act: + logging.info("*" * 10 + "UUID %s: checking active mode trips to autocheck habits" % uuid + "*" * 10) + print(str(arrow.now()) + "*" * 10 + "UUID %s: checking active mode trips to autocheck habits" % uuid + "*" * 10) + autocheck.give_points_for_all_tasks(uuid) - logging.info("*" * 10 + "UUID %s: checking active mode trips to autocheck habits" % uuid + "*" * 10) - print(str(arrow.now()) + "*" * 10 + "UUID %s: checking active mode trips to autocheck habits" % uuid + "*" * 10) - autocheck.give_points_for_all_tasks(uuid) + esds.store_pipeline_time(uuid, "AUTOCHECK_POINTS", + time.time(), act.elapsed) + with ect.Timer() as ogt: + logging.info("*" * 10 + "UUID %s: storing views to cache" % uuid + "*" * 10) + print(str(arrow.now()) + "*" * 10 + "UUID %s: storing views to cache" % uuid + "*" * 10) + uh.storeViewsToCache() - logging.info("*" * 10 + "UUID %s: storing views to cache" % uuid + "*" * 10) - print(str(arrow.now()) + "*" * 10 + "UUID %s: storing views to cache" % uuid + "*" * 10) - 
uh.storeViewsToCache() + esds.store_pipeline_time(uuid, ecwp.PipelineStages.OUTPUT_GEN.name, + time.time(), ogt.elapsed) diff --git a/emission/pipeline/scheduler.py b/emission/pipeline/scheduler.py index 80095dd67..f3c1f2bdc 100644 --- a/emission/pipeline/scheduler.py +++ b/emission/pipeline/scheduler.py @@ -37,7 +37,7 @@ def get_split_uuid_lists(n_splits, is_public_pipeline): sel_uuids = [u for u in all_uuids if u in estag.TEST_PHONE_IDS] else: sel_uuids = [u for u in all_uuids if u not in estag.TEST_PHONE_IDS] - # Add back the test phones for now so that we can test the data + # Add back the test phones for now so that we can test the data # collection changes before deploying them in the wild sel_uuids.extend(TEMP_HANDLED_PUBLIC_PHONES) diff --git a/emission/storage/decorations/stats_queries.py b/emission/storage/decorations/stats_queries.py index 8f9b4a91b..9f921664f 100644 --- a/emission/storage/decorations/stats_queries.py +++ b/emission/storage/decorations/stats_queries.py @@ -5,26 +5,37 @@ # Our imports import emission.storage.timeseries.abstract_timeseries as esta import emission.core.wrapper.entry as ecwe -from emission.core.get_database import get_client_stats_db_backup, get_server_stats_db_backup, get_result_stats_db_backup # metadata format is def store_server_api_time(user_id, call, ts, reading): - data = { - "name": call, - "ts": ts, - "reading": reading - } - new_entry = ecwe.Entry.create_entry(user_id, "stats/server_api_time", data) - return esta.TimeSeries.get_time_series(user_id).insert(new_entry) + store_stats_entry(user_id, "stats/server_api_time", call, ts, reading) def store_server_api_error(user_id, call, ts, reading): + store_stats_entry(user_id, "stats/server_api_error", call, ts, reading) + +def store_pipeline_time(user_id, stage_string, ts, reading): + """ + + :param user_id: id of the user + :param stage: string representing a particular time. 
Typically the stage name, + but can also be used for sub sections of a stage + :param ts: timestamp at the time of the reading + :param reading: the duration of the stage in ms + :return: + """ + store_stats_entry(user_id, "stats/pipeline_time", stage_string, ts, reading) + +def store_pipeline_error(user_id, stage_string, ts, reading): + store_stats_entry(user_id, "stats/pipeline_error", stage_string, ts, reading) + +def store_stats_entry(user_id, metadata_key, name, ts, reading): data = { - "name": call, + "name": name, "ts": ts, "reading": reading } - new_entry = ecwe.Entry.create_entry(user_id, "stats/server_api_error", data) + new_entry = ecwe.Entry.create_entry(user_id, metadata_key, data) return esta.TimeSeries.get_time_series(user_id).insert(new_entry) From 4878654bd897da4a5741e88ecb439c177acabc90 Mon Sep 17 00:00:00 2001 From: "K. Shankari" Date: Sun, 30 Oct 2016 20:59:40 -0700 Subject: [PATCH 3/3] Restore some aspects of the previous interface To keep the old, obsolete code happy. Implementations are dummy since they are effectively unused. Will purge them in a giant lump over December break. --- emission/net/api/stats.py | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/emission/net/api/stats.py b/emission/net/api/stats.py index b162b7723..5f95c9550 100644 --- a/emission/net/api/stats.py +++ b/emission/net/api/stats.py @@ -98,3 +98,30 @@ def createEntry(user, stat, ts, reading): 'ts': float(ts), 'reading': reading} +# Dummy functions to keep the old, obsolete code happy. 
+# Will do a big purge over winter break + +STAT_TRIP_MGR_PCT_SHOWN = "tripManager.pctShown" +STAT_TRIP_MGR_TRIPS_FOR_DAY = "tripManager.tripsForDay" + +STAT_MY_CARBON_FOOTPRINT = "footprint.my_carbon" +STAT_MY_CARBON_FOOTPRINT_NO_AIR = "footprint.my_carbon.no_air" +STAT_MY_OPTIMAL_FOOTPRINT = "footprint.optimal" +STAT_MY_OPTIMAL_FOOTPRINT_NO_AIR = "footprint.optimal.no_air" +STAT_MY_ALLDRIVE_FOOTPRINT = "footprint.alldrive" + +STAT_PCT_CLASSIFIED = "game.score.pct_classified" +STAT_MINE_MINUS_OPTIMAL = "game.score.mine_minus_optimal" +STAT_ALL_DRIVE_MINUS_MINE = "game.score.all_drive_minus_mine" +STAT_SB375_DAILY_GOAL = "game.score.sb375_daily_goal" + +STAT_MEAN_FOOTPRINT = "footprint.mean" +STAT_MEAN_FOOTPRINT_NO_AIR = "footprint.mean.no_air" +STAT_GAME_SCORE = "game.score" +STAT_VIEW_CHOICE = "view.choice" + +def storeServerEntry(user, stat, ts, reading): + pass + +def storeResultEntry(user, stat, ts, reading): + pass