Skip to content

Commit

Permalink
Merge pull request #443 from shankari/move_stats_to_timeseries
Browse files Browse the repository at this point in the history
Switch the stats to the timeseries, similar to the other data
  • Loading branch information
shankari authored Oct 31, 2016
2 parents f009ded + 4878654 commit cdfad07
Show file tree
Hide file tree
Showing 9 changed files with 290 additions and 129 deletions.
17 changes: 17 additions & 0 deletions emission/core/timer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from timeit import default_timer

class Timer(object):
def __init__(self, verbose=False):
self.verbose = verbose
self.timer = default_timer

def __enter__(self):
self.start = self.timer()
return self

def __exit__(self, *args):
end = self.timer()
self.elapsed = end - self.start
self.elapsed_ms = self.elapsed * 1000 # millisecs
if self.verbose:
print 'elapsed time: %f ms' % self.elapsed_ms
7 changes: 7 additions & 0 deletions emission/core/wrapper/entry.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ def _getData2Wrapper():
"config/sensor_config": "sensorconfig",
"config/sync_config": "syncconfig",
"config/consent": "consentconfig",
"stats/server_api_time": "statsevent",
"stats/server_api_error": "statsevent",
"stats/pipeline_time": "statsevent",
"stats/pipeline_error": "statsevent",
"stats/client_time": "statsevent",
"stats/client_nav_event": "statsevent",
"stats/client_error": "statsevent",
"segmentation/raw_trip": "rawtrip",
"segmentation/raw_place": "rawplace",
"segmentation/raw_section": "section",
Expand Down
35 changes: 35 additions & 0 deletions emission/core/wrapper/statsevent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import logging
import emission.core.wrapper.wrapperbase as ecwb
import enum as enum

# class StatsEvent(enum.Enum):
# """
# Indicates that some event happened that we want to record.
# The event can have an associated duration, or can be a
# """
# UNKNOWN = 0
# DISCHARGING = 1
# CHARGING = 2
# FULL = 3
# NOT_CHARGING = 4 # This is an android-only state - unsure how often we will encounter it
#

class Statsevent(ecwb.WrapperBase):
# TODO: should this be a string or an enum
# making it an enum will require us to change code every time we add a new stat, but will
# make it easier to know the list of stats. Let's leave it as a string for now.
props = {"name": ecwb.WrapperBase.Access.WORM, # string representing the stat.
"reading": ecwb.WrapperBase.Access.WORM, # None or -1 if not present
"ts": ecwb.WrapperBase.Access.WORM,
"local_dt": ecwb.WrapperBase.Access.WORM,
"fmt_time": ecwb.WrapperBase.Access.WORM,
"client_app_version": ecwb.WrapperBase.Access.WORM,
"client_os_version": ecwb.WrapperBase.Access.WORM
}
enums = {}
geojson = []
nullable = []
local_dates = ['local_dt']

def _populateDependencies(self):
pass
13 changes: 11 additions & 2 deletions emission/net/api/cfc_webapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@
import emission.core.wrapper.motionactivity as ecwm
import emission.storage.timeseries.timequery as estt
import emission.storage.timeseries.aggregate_timeseries as estag
import emission.core.timer as ect
import emission.core.get_database as edb


config_file = open('conf/net/api/webserver.conf')
config_data = json.load(config_file)
static_path = config_data["paths"]["static_path"]
Expand Down Expand Up @@ -573,16 +573,25 @@ def habiticaProxy():
def before_request():
print("START %s %s %s" % (datetime.now(), request.method, request.path))
request.params.start_ts = time.time()
request.params.timer = ect.Timer()
request.params.timer.__enter__()
logging.debug("START %s %s" % (request.method, request.path))

@app.hook('after_request')
def after_request():
msTimeNow = time.time()
request.params.timer.__exit__()
duration = msTimeNow - request.params.start_ts
new_duration = request.params.timer.elapsed
if round(duration - new_duration, 3) > 0:
logging.error("old style duration %s != timer based duration %s" % (duration, new_duration))
stats.store_server_api_error(request.params.user_uuid, "MISMATCH_%s_%s" %
(request.method, request.path), msTimeNow, duration - new_duration)

print("END %s %s %s %s %s " % (datetime.now(), request.method, request.path, request.params.user_uuid, duration))
logging.debug("END %s %s %s %s " % (request.method, request.path, request.params.user_uuid, duration))
# Keep track of the time and duration for each call
stats.storeServerEntry(request.params.user_uuid, "%s %s" % (request.method, request.path),
stats.store_server_api_time(request.params.user_uuid, "%s_%s" % (request.method, request.path),
msTimeNow, duration)

# Auth helpers BEGIN
Expand Down
205 changes: 104 additions & 101 deletions emission/net/api/stats.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,105 @@
# Standard imports
import logging
import time
import emission.storage.decorations.stats_queries as esds

def store_server_api_time(user_id, call, ts, reading):
esds.store_server_api_time(user_id, call, ts, reading)

def store_server_api_error(user_id, call, ts, reading):
esds.store_server_api_error(user_id, call, ts, reading)

# Backward compat to store old-style client stats until the phone upgrade
# has been pushed out to all phones
def setClientMeasurements(user_id, reportedVals):
logging.info("Received %d client keys and %d client readings for user_id %s" % (len(reportedVals['Readings']),
getClientMeasurementCount(reportedVals['Readings']), user_id))
logging.debug("reportedVals = %s" % reportedVals)
metadata = reportedVals['Metadata']
stats = reportedVals['Readings']
for key in stats:
values = stats[key]
for value in values:
storeClientEntry(user_id, key, value[0], value[1], metadata)

# Our imports
from emission.core.get_database import get_client_stats_db_backup, get_server_stats_db_backup, get_result_stats_db_backup
def getClientMeasurementCount(readings):
retSum = 0
for currReading in readings:
currArray = readings[currReading]
# logging.debug("currArray for reading %s is %s and its length is %d" % (currReading, currArray, len(currArray)))
retSum += len(currArray)
return retSum

def storeClientEntry(user_id, key, ts, reading, metadata):
logging.debug("storing client entry for user_id %s, key %s at timestamp %s" % (user_id, key, ts))

old_style_data = createEntry(user_id, key, ts, reading)
old_style_data.update(metadata)
save_to_timeseries(old_style_data)

def save_to_timeseries(old_style_data):
import emission.storage.timeseries.abstract_timeseries as esta

user_id = old_style_data["user"]
new_entry = old2new(old_style_data)
return esta.TimeSeries.get_time_series(user_id).insert(new_entry)

def old2new(old_style_data):
import emission.core.wrapper.entry as ecwe
import emission.core.wrapper.statsevent as ecws
import emission.core.wrapper.battery as ecwb

int_with_none = lambda s: float(s) if s is not None else None

user_id = old_style_data["user"]
del old_style_data["user"]
if old_style_data["stat"] == "battery_level":
new_style_data = ecwb.Battery({
"battery_level_pct" : int_with_none(old_style_data["reading"]),
"ts": old_style_data["ts"]
})
new_key = "background/battery"
else:
new_style_data = ecws.Statsevent()
new_style_data.name = old_style_data["stat"]
new_style_data.ts = old_style_data["ts"]
new_style_data.reading = int_with_none(old_style_data["reading"])
new_style_data.client_app_version = old_style_data["client_app_version"]
new_style_data.client_os_version = old_style_data["client_os_version"]
new_key = stat2key(old_style_data["stat"])

new_entry = ecwe.Entry.create_entry(user_id, new_key, new_style_data)
# For legacy entries, make sure that the write_ts doesn't become the conversion
# time or the server arrival time
new_entry["metadata"]["write_ts"] = new_style_data.ts
del new_entry["metadata"]["write_local_dt"]
del new_entry["metadata"]["write_fmt_time"]
# We are not going to fill in the local_date and fmt_time entries because
# a) we don't know what they are for legacy entries
# b) we don't even know whether we need to use them
# c) even if we do, we don't know if we need to use them for older entries
# So let's leave the hacky reconstruction algorithm until we know that we really need it
return new_entry

def stat2key(stat_name):
stat_name_mapping = {
"app_launched": "stats/client_nav_event",
"push_stats_duration": "stats/client_time",
"sync_duration": "stats/client_time",
"sync_launched": "stats/client_nav_event",
"button_sync_forced": "stats/client_nav_event",
"sync_pull_list_size": "stats/client_time",
"pull_duration": "stats/client_time"
}
return stat_name_mapping[stat_name]


def createEntry(user, stat, ts, reading):
return {'user': user,
'stat': stat,
'ts': float(ts),
'reading': reading}

# Dummy functions to keep the old, obsolete code happy.
# Will do a big purge over winter break

STAT_TRIP_MGR_PCT_SHOWN = "tripManager.pctShown"
STAT_TRIP_MGR_TRIPS_FOR_DAY = "tripManager.tripsForDay"
Expand All @@ -24,101 +120,8 @@
STAT_GAME_SCORE = "game.score"
STAT_VIEW_CHOICE = "view.choice"

# Store client measurements (currently into the database, but maybe in a log
# file in the future). The format of the stats received from the client is very
# similar to the input to SMAP, to make it easier to store them in a SMAP
# database in the future
def setClientMeasurements(user, reportedVals):
logging.info("Received %d client keys and %d client readings for user %s" % (len(reportedVals['Readings']),
getClientMeasurementCount(reportedVals['Readings']), user))
logging.debug("reportedVals = %s" % reportedVals)
metadata = reportedVals['Metadata']
metadata['reported_ts'] = time.time()
stats = reportedVals['Readings']
for key in stats:
values = stats[key]
for value in values:
storeClientEntry(user, key, value[0], value[1], metadata)

# metadata format is
def storeClientEntry(user, key, ts, reading, metadata):
logging.debug("storing client entry for user %s, key %s at timestamp %s" % (user, key, ts))
response = None
# float first, because int doesn't recognize floats represented as strings.
# Android timestamps are in milliseconds, while Giles expects timestamps to be
# in seconds, so divide by 1000 when you hit this case.
# ios timestamps are in seconds.
ts = float(ts)

if ts > 9999999999:
ts = ts/1000

currEntry = createEntry(user, key, ts, reading)
# Add the os and app versions from the metadata dict
currEntry.update(metadata)

try:
# response = get_client_stats_db().insert(currEntry)
get_client_stats_db_backup().insert(currEntry)
except Exception as e:
logging.debug("failed to store client entry for user %s, key %s at timestamp %s" % (user, key, ts))
logging.debug("exception was: %s" % (e))
get_client_stats_db_backup().insert(currEntry)
return response != None

# server measurements will call this directly since there's not much point in
# batching in a different location and then making a call here since it runs on
# the same server. Might change if we move engagement stats to a different server
# Note also that there is no server metadata since we currently have no
# versioning on the server. Should probably add some soon

def storeServerEntry(user, key, ts, reading):
logging.debug("storing server entry %s for user %s, key %s at timestamp %s" % (reading, user, key, ts))
response = None
currEntry = createEntry(user, key, ts, reading)

try:
# response = get_server_stats_db().insert(currEntry)
get_server_stats_db_backup().insert(currEntry)
except Exception as e:
logging.debug("failed to store server entry %s for user %s, key %s at timestamp %s" % (reading, user, key, ts))
logging.debug("exception was: %s" % (e))
get_server_stats_db_backup().insert(currEntry)

# Return boolean that tells you whether the insertion was successful or not
return response != None

def storeResultEntry(user, key, ts, reading):
logging.debug("storing result entry %s for user %s, key %s at timestamp %s" % (reading, user, key, ts))
response = None

# Sometimes timestamp comes in as a float, represented as seconds.[somethign else]; truncate digits after the
# decimal
ts = int(ts)
currEntry = createEntry(user, key, ts, reading)

try:
get_result_stats_db_backup().insert(currEntry)
# response = get_result_stats_db().insert(currEntry)
except Exception as e:
logging.debug("failed to store result entry %s for user %s, key %s at timestamp %s" % (reading, user, key, ts))
logging.debug("exception was: %s" % (e))
get_result_stats_db_backup().insert(currEntry)

# Return boolean that tells you whether the insertion was successful or not
return response != None
def storeServerEntry(user, stat, ts, reading):
pass


def getClientMeasurementCount(readings):
retSum = 0
for currReading in readings:
currArray = readings[currReading]
# logging.debug("currArray for reading %s is %s and its length is %d" % (currReading, currArray, len(currArray)))
retSum = retSum + len(currArray)
return retSum

def createEntry(user, stat, ts, reading):
return {'user': user,
'stat': stat,
'ts': ts,
'reading': reading}
def storeResultEntry(user, stat, ts, reading):
pass
Loading

0 comments on commit cdfad07

Please sign in to comment.