Skip to content

Commit

Permalink
Merge pull request #970 from JGreenlee/faster_localdate_queries
Browse files Browse the repository at this point in the history
for yyyy_mm_dd metrics, query by fmt_time instead of local_dt
  • Loading branch information
shankari authored Jun 25, 2024
2 parents 20acdd9 + 6277d6c commit f55f223
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 70 deletions.
3 changes: 0 additions & 3 deletions emission/core/wrapper/localdate.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@
import arrow
import emission.core.wrapper.wrapperbase as ecwb

# specify the order of time units, from largest to smallest
DATETIME_UNITS = ['year', 'month', 'day', 'hour', 'minute', 'second']

class LocalDate(ecwb.WrapperBase):
"""
Supporting wrapper class that stores the expansions of the components
Expand Down
11 changes: 3 additions & 8 deletions emission/net/api/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import emission.analysis.result.metrics.simple_metrics as earms
import emission.storage.decorations.analysis_timeseries_queries as esda
import emission.storage.decorations.local_date_queries as esdl
import emission.storage.timeseries.tcquery as esttc
import emission.storage.timeseries.fmt_time_query as estf

import emcommon.metrics.metrics_summaries as emcms

Expand All @@ -25,14 +25,9 @@ def summarize_by_local_date(user_id, start_ld, end_ld, freq_name, metric_list, i
return _call_group_fn(earmt.group_by_local_date, user_id, start_ld, end_ld,
local_freq, metric_list, include_aggregate)

def summarize_by_yyyy_mm_dd(user_id, start_ymd, end_ymd, freq, metric_list, include_agg, app_config):
time_query = esttc.TimeComponentQuery(
"data.start_local_dt",
esdl.yyyy_mm_dd_to_local_date(start_ymd),
esdl.yyyy_mm_dd_to_local_date(end_ymd)
)
def summarize_by_yyyy_mm_dd(user_id, start_ymd, end_ymd, freq, metric_list, include_agg, app_config):
time_query = estf.FmtTimeQuery("data.start_fmt_time", start_ymd, end_ymd)
trips = esda.get_entries(esda.COMPOSITE_TRIP_KEY, None, time_query)
print('found ' + str([e for e in trips]))
return emcms.generate_summaries(metric_list, trips, app_config)

def _call_group_fn(group_fn, user_id, start_time, end_time, freq, metric_list, include_aggregate):
Expand Down
91 changes: 43 additions & 48 deletions emission/storage/decorations/local_date_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,52 +11,47 @@

import emission.core.wrapper.localdate as ecwl

def get_range_query(field_prefix, start_ld, end_ld):
units = [u for u in ecwl.DATETIME_UNITS if u in start_ld and u in end_ld]
logging.debug(f'get_range_query: units = {units}')
try:
gt_query = get_comparison_query(field_prefix, start_ld, end_ld, units, 'gt')
lt_query = get_comparison_query(field_prefix, end_ld, start_ld, units, 'lt')
logging.debug(f'get_range_query: gt_query = {gt_query}, lt_query = {lt_query}')
return { "$and": [gt_query, lt_query] } if gt_query and lt_query else {}
except AssertionError as e:
logging.error(f'Invalid range from {str(start_ld)} to {str(end_ld)}: {str(e)}')
return None

def get_comparison_query(field_prefix, base_ld, limit_ld, units, gt_or_lt):
field_name = lambda i: f'{field_prefix}.{units[i]}'
and_conditions, or_conditions = [], []
tiebreaker_index = -1
for i, unit in enumerate(units):
# the range is inclusive, so if on the last unit we should use $lte / $gte instead of $lt / $gt
op = f'${gt_or_lt}e' if i == len(units)-1 else f'${gt_or_lt}'
if tiebreaker_index >= 0:
tiebreaker_conditions = [{ field_name(j): base_ld[units[j]] } for j in range(tiebreaker_index, i)]
tiebreaker_conditions.append({ field_name(i): { op: base_ld[unit] }})
or_conditions.append({ "$and": tiebreaker_conditions })
elif base_ld[unit] == limit_ld[unit]:
and_conditions.append({field_name(i): base_ld[unit]})
def get_filter_query(field_name, start_local_dt, end_local_dt):
if list(start_local_dt.keys()) != list(end_local_dt.keys()):
raise RuntimeError("start_local_dt.keys() = %s does not match end_local_dt.keys() = %s" %
(list(start_local_dt.keys()), list(end_local_dt.keys())))
query_result = {}
for key in start_local_dt:
curr_field = "%s.%s" % (field_name, key)
gte_lte_query = {}
try:
start_int = int(start_local_dt[key])
except:
logging.info("start_local_dt[%s] = %s, not an integer, skipping" %
(key, start_local_dt[key]))
continue

try:
end_int = int(end_local_dt[key])
except:
logging.info("end_local_dt[%s] = %s, not an integer, skipping" %
(key, end_local_dt[key]))
continue

is_rollover = start_int > end_int

if is_rollover:
gte_lte_query = get_rollover_query(start_int, end_int)
else:
gte_lte_query = get_standard_query(start_int, end_int)

if len(gte_lte_query) > 0:
query_result.update({curr_field: gte_lte_query})
else:
assert (base_ld[unit] < limit_ld[unit]) if gt_or_lt == 'gt' else (base_ld[unit] > limit_ld[unit])
or_conditions.append({field_name(i): { op: base_ld[unit] }})
tiebreaker_index = i
if and_conditions and or_conditions:
return { "$and": and_conditions + [{ "$or": or_conditions }] }
elif and_conditions:
return { "$and": and_conditions }
elif or_conditions:
return { "$or": or_conditions }
else:
return {}

def yyyy_mm_dd_to_local_date(ymd: str) -> ecwl.LocalDate:
return ecwl.LocalDate({
'year': int(ymd[0:4]),
'month': int(ymd[5:7]),
'day': int(ymd[8:10])
})

def get_yyyy_mm_dd_range_query(field_name, start_ymd: str, end_ymd: str) -> dict:
start_local_date = yyyy_mm_dd_to_local_date(start_ymd)
end_local_date = yyyy_mm_dd_to_local_date(end_ymd)
return get_range_query(field_name, start_local_date, end_local_date)
logging.info("key %s exists, skipping because upper AND lower bounds are missing" % key)

logging.debug("In get_filter_query, returning query %s" % query_result)
return query_result

def get_standard_query(start_int, end_int):
assert(start_int <= end_int)
return {'$gte': start_int, '$lte': end_int}

def get_rollover_query(start_int, end_int):
assert(start_int > end_int)
return {'$not': {'$gt': end_int, '$lt': start_int}}
27 changes: 27 additions & 0 deletions emission/storage/timeseries/fmt_time_query.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from builtins import object

class FmtTimeQuery(object):
"""
Object that encapsulates a query for an inclusive range between two ISO-format strings.
Useful for querying based on the local date/time at which data was collected,
like with timeType of "data.fmt_time" or "data.start_fmt_time".
e.g. FmtTimeQuery("data.fmt_time", "2024-01", "2024-03") # first quarter of 2024
e.g. FmtTimeQuery("data.fmt_time", "2024-05-01", "2024-05-31") # all of May 2024
e.g. FmtTimeQuery("data.fmt_time", "2024-06-03T08:00", "2024-06-03T16:59") # work hours on Jun 3 2024
"""
def __init__(self, timeType: str, startIso: str, endIso: str) -> None:
self.timeType = timeType
self.startIso = startIso
# append 'Z' to make the end range inclusive
# (because Z is greater than any other character that can appear in an ISO string)
self.endIso = endIso + 'Z'

def get_query(self) -> dict:
time_key = self.timeType
ret_query = {time_key: {"$lte": self.endIso}}
if (self.startIso is not None):
ret_query[time_key].update({"$gte": self.startIso})
return ret_query

def __repr__(self) -> str:
return f"FmtTimeQuery {self.timeType} with range [{self.startIso}, {self.endIso})"
9 changes: 6 additions & 3 deletions emission/storage/timeseries/tcquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@

class TimeComponentQuery(object):
"""
Object that encapsulates a query for a particular time at the local time in
the timezone where the data was generated.
Object that encapsulates a query for filtering based on localdate objects.
This works as a set of filters for each localdate field, e.g. year, month, day, etc.
Useful for filtering on one or more localdate fields
e.g. TimeComponentQuery("data.start_local_dt", {"weekday": 0}, {"weekday": 4})
For range queries, use FmtTimeQuery instead.
"""
def __init__(self, timeType, startLD, endLD):
self.timeType = timeType
self.startLD = startLD
self.endLD = endLD

def get_query(self):
return esdl.get_range_query(self.timeType, self.startLD, self.endLD)
return esdl.get_filter_query(self.timeType, self.startLD, self.endLD)
26 changes: 18 additions & 8 deletions emission/tests/storageTests/TestLocalDateQueries.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,36 +64,46 @@ def testLocalDateReadWrite(self):
self.assertEqual(ret_entry.data.local_dt.weekday, 2)
self.assertEqual(ret_entry.data.fmt_time, "2016-04-13T15:32:09-07:00")

def testLocalRangeStandardQuery(self):
def testLocalDateFilterStandardQuery(self):
"""
Search for all entries between 8:18 and 8:20 local time, both inclusive
"""
start_local_dt = ecwl.LocalDate({'year': 2015, 'month': 8, 'hour': 8, 'minute': 18})
end_local_dt = ecwl.LocalDate({'year': 2015, 'month': 8, 'hour': 8, 'minute': 20})
final_query = {"user_id": self.testUUID}
final_query.update(esdl.get_range_query("data.local_dt", start_local_dt, end_local_dt))
final_query.update(esdl.get_filter_query("data.local_dt", start_local_dt, end_local_dt))
entriesCnt = edb.get_timeseries_db().count_documents(final_query)
self.assertEqual(15, entriesCnt)

def testLocalRangeRolloverQuery(self):
def testLocalDateFilterRolloverQuery(self):
"""
Search for all entries between 8:18 and 9:08 local time, both inclusive
"""
start_local_dt = ecwl.LocalDate({'year': 2015, 'month': 8, 'hour': 8, 'minute': 18})
end_local_dt = ecwl.LocalDate({'year': 2015, 'month': 8, 'hour': 9, 'minute': 8})
final_query = {"user_id": self.testUUID}
final_query.update(esdl.get_range_query("data.local_dt", start_local_dt, end_local_dt))
entriesCnt = edb.get_timeseries_db().count_documents(final_query)
self.assertEqual(232, entriesCnt)
final_query.update(esdl.get_filter_query("data.local_dt", start_local_dt, end_local_dt))
entries = edb.get_timeseries_db().find(final_query).sort('data.ts', pymongo.ASCENDING)
self.assertEqual(448, edb.get_timeseries_db().count_documents(final_query))

entries_list = list(entries)

# Note that since this is a set of filters, as opposed to a range, this
# returns all entries between 18 and 8 in both hours.
# so 8:18 is valid, but so is 9:57
self.assertEqual(ecwe.Entry(entries_list[0]).data.local_dt.hour, 8)
self.assertEqual(ecwe.Entry(entries_list[0]).data.local_dt.minute, 18)
self.assertEqual(ecwe.Entry(entries_list[-1]).data.local_dt.hour, 9)
self.assertEqual(ecwe.Entry(entries_list[-1]).data.local_dt.minute, 57)

def testLocalMatchingQuery(self):
def testLocalDateFilterMatchingQuery(self):
"""
Search for all entries that occur at minute = 8 from any hour
"""
start_local_dt = ecwl.LocalDate({'minute': 8})
end_local_dt = ecwl.LocalDate({'minute': 8})
final_query = {"user_id": self.testUUID}
final_query.update(esdl.get_range_query("data.local_dt", start_local_dt, end_local_dt))
final_query.update(esdl.get_filter_query("data.local_dt", start_local_dt, end_local_dt))
entries_docs = edb.get_timeseries_db().find(final_query).sort("metadata.write_ts")
self.assertEqual(20, edb.get_timeseries_db().count_documents(final_query))
entries = [ecwe.Entry(doc) for doc in entries_docs]
Expand Down

0 comments on commit f55f223

Please sign in to comment.