From 2da891874b48960c3cf866c46b683cd8aaa31629 Mon Sep 17 00:00:00 2001 From: Jack Greenlee Date: Fri, 7 Jun 2024 11:16:09 -0400 Subject: [PATCH 1/4] for yyyy_mm_dd metrics, query by fmt_time instead of local_dt I recently rewrote the local dt queries used by TimeComponentQuery (https://github.com/e-mission/e-mission-server/pull/968) to make them behave as datetime range queries. But this results in some pretty complex queries which could have nested $and and $or conditions. It felt overengineered but that was the logic required if we were to query date ranges by looking at local dt objects where year, month, day, etc are all recorded separately. Unfortunately, those queries run super slowly on production. I think this is because the $and / $or conditions prevented MongoDB from optimizing efficiently via indexing. Looking for a different solution, I found something better and simpler. At first I didn't think using fmt_time fields with $lt and $gt comparisons would work because they're ISO strings, not numbers. But luckily MongoDB can compare strings this way. It performs a lexicographical comparison where 0-9 < A-Z < a-z (like ASCII). ISO has the standard format 0000-00-00T00:00:00. The dashes and the T will always be in the same places, so effectively, only the numbers will be compared. And the timezone info is at the end, so it doesn't get considered as long as we don't include it in the start and end inputs. The only thing that specifically needs to be handled is making the end range inclusive. If I query from "2024-06-01" to "2024-06-03", an entry like "2024-06-03T23:59:59-04:00" should match. But lexicographically, that comes after "2024-06-03". Appending a "Z" to the end range solves this. The result of all this is that the TimeQuery class (timequery.py) is now able to handle either timestamp or ISO dates, and this is what metrics.py will use for summarize_by_yyyy_mm_dd. 
--- emission/net/api/metrics.py | 11 +++------- emission/storage/timeseries/timequery.py | 27 +++++++++++++++--------- 2 files changed, 20 insertions(+), 18 deletions(-) diff --git a/emission/net/api/metrics.py b/emission/net/api/metrics.py index a427d9c31..eeb80a358 100644 --- a/emission/net/api/metrics.py +++ b/emission/net/api/metrics.py @@ -12,7 +12,7 @@ import emission.analysis.result.metrics.simple_metrics as earms import emission.storage.decorations.analysis_timeseries_queries as esda import emission.storage.decorations.local_date_queries as esdl -import emission.storage.timeseries.tcquery as esttc +import emission.storage.timeseries.timequery as esttq import emcommon.metrics.metrics_summaries as emcms @@ -25,14 +25,9 @@ def summarize_by_local_date(user_id, start_ld, end_ld, freq_name, metric_list, i return _call_group_fn(earmt.group_by_local_date, user_id, start_ld, end_ld, local_freq, metric_list, include_aggregate) -def summarize_by_yyyy_mm_dd(user_id, start_ymd, end_ymd, freq, metric_list, include_agg, app_config): - time_query = esttc.TimeComponentQuery( - "data.start_local_dt", - esdl.yyyy_mm_dd_to_local_date(start_ymd), - esdl.yyyy_mm_dd_to_local_date(end_ymd) - ) +def summarize_by_yyyy_mm_dd(user_id, start_ymd, end_ymd, freq, metric_list, include_agg, app_config): + time_query = esttq.TimeQuery("data.start_fmt_time", start_ymd, end_ymd) trips = esda.get_entries(esda.COMPOSITE_TRIP_KEY, None, time_query) - print('found ' + str([e for e in trips])) return emcms.generate_summaries(metric_list, trips, app_config) def _call_group_fn(group_fn, user_id, start_time, end_time, freq, metric_list, include_aggregate): diff --git a/emission/storage/timeseries/timequery.py b/emission/storage/timeseries/timequery.py index 950e0adb0..1c86be78a 100644 --- a/emission/storage/timeseries/timequery.py +++ b/emission/storage/timeseries/timequery.py @@ -8,19 +8,26 @@ from builtins import object class TimeQuery(object): """ - Object that encapsulates a query for a 
particular time (read_ts, write_ts, or processed_ts) + Object that encapsulates a query for a range of time [start_time, end_time] + Can query by Unix timestamps with a '*_ts' time_type (like "metadata.write_ts", "data.ts", or "data.start_ts") + e.g. TimeQuery("metadata.write_ts", 1234567890, 1234567900) + Or, can query by ISO datetime strings with a '*_fmt_time' time_type (like "data.fmt_time" or "data.start_fmt_time") + This is useful for querying based on the local date/time at which data was collected + e.g. TimeQuery("data.fmt_time", "2024-06-03T08:00", "2024-06-03T16:59") """ - def __init__(self, timeType, startTs, endTs): - self.timeType = timeType - self.startTs = startTs - self.endTs = endTs + def __init__(self, time_type, start_time, end_time): + self.time_type = time_type + self.start_time = start_time + # if end_time is an ISO string, append 'Z' to make the end range inclusive + # (because Z is greater than any other character that can appear in an ISO string) + self.end_time = end_time + 'Z' if isinstance(end_time, str) else end_time def get_query(self): - time_key = self.timeType - ret_query = {time_key : {"$lte": self.endTs}} - if (self.startTs is not None): - ret_query[time_key].update({"$gte": self.startTs}) + time_key = self.time_type + ret_query = {time_key : {"$lte": self.end_time}} + if (self.start_time is not None): + ret_query[time_key].update({"$gte": self.start_time}) return ret_query def __repr__(self): - return f"TimeQuery {self.timeType} with range [{self.startTs}, {self.endTs})" + return f"TimeQuery {self.time_type} with range [{self.start_time}, {self.end_time})" From cc3b5ba03e9cd2cf478ef89c05d9fb2105fb43f8 Mon Sep 17 00:00:00 2001 From: Jack Greenlee Date: Tue, 11 Jun 2024 13:14:43 -0400 Subject: [PATCH 2/4] separate class for fmt_time queries; update metrics.py to use new class MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 
https://github.com/e-mission/e-mission-server/pull/970#issuecomment-2155267140 > I realized that the TimeQuery on `e-mission-server` (which works w/ MongoDB) mirrors TimeQuery in the native code plugins (which work with SQL dbs). > > For that reason, I don't want to rename anything in `TimeQuery` right now. So instead of extending the functionality of `TimeQuery` to work with either ts or fmt_time, I'm going to make a new class called `FmtTimeQuery` (or similar) > > I briefly read up on SQL queries to make sure they work the same way when comparing strings. They do – so later on, we could implement `FmtTimeQuery` on the native code UserCache plugin too --- emission/net/api/metrics.py | 4 +-- emission/storage/timeseries/fmt_time_query.py | 27 +++++++++++++++++++ emission/storage/timeseries/timequery.py | 27 +++++++------------ 3 files changed, 39 insertions(+), 19 deletions(-) create mode 100644 emission/storage/timeseries/fmt_time_query.py diff --git a/emission/net/api/metrics.py b/emission/net/api/metrics.py index eeb80a358..abec0ccfc 100644 --- a/emission/net/api/metrics.py +++ b/emission/net/api/metrics.py @@ -12,7 +12,7 @@ import emission.analysis.result.metrics.simple_metrics as earms import emission.storage.decorations.analysis_timeseries_queries as esda import emission.storage.decorations.local_date_queries as esdl -import emission.storage.timeseries.timequery as esttq +import emission.storage.timeseries.fmt_time_query as estf import emcommon.metrics.metrics_summaries as emcms @@ -26,7 +26,7 @@ def summarize_by_local_date(user_id, start_ld, end_ld, freq_name, metric_list, i local_freq, metric_list, include_aggregate) def summarize_by_yyyy_mm_dd(user_id, start_ymd, end_ymd, freq, metric_list, include_agg, app_config): - time_query = esttq.TimeQuery("data.start_fmt_time", start_ymd, end_ymd) + time_query = estf.FmtTimeQuery("data.start_fmt_time", start_ymd, end_ymd) trips = esda.get_entries(esda.COMPOSITE_TRIP_KEY, None, time_query) return 
emcms.generate_summaries(metric_list, trips, app_config) diff --git a/emission/storage/timeseries/fmt_time_query.py b/emission/storage/timeseries/fmt_time_query.py new file mode 100644 index 000000000..46eb2798e --- /dev/null +++ b/emission/storage/timeseries/fmt_time_query.py @@ -0,0 +1,27 @@ +from builtins import object + +class FmtTimeQuery(object): + """ + Object that encapsulates a query for an inclusive range between two ISO-format strings. + Useful for querying based on the local date/time at which data was collected, + like with timeType of "data.fmt_time" or "data.start_fmt_time". + e.g. TimeQuery("data.fmt_time", "2024-01", "2024-03") # first quarter of 2024 + e.g. TimeQuery("data.fmt_time", "2024-05-01", "2024-05-31") # all of May 2024 + e.g. TimeQuery("data.fmt_time", "2024-06-03T08:00", "2024-06-03T16:59") # work hours on Jun 3 2024 + """ + def __init__(self, timeType: str, startIso: str, endIso: str) -> None: + self.timeType = timeType + self.startIso = startIso + # append 'Z' to make the end range inclusive + # (because Z is greater than any other character that can appear in an ISO string) + self.endIso = endIso + 'Z' + + def get_query(self) -> dict: + time_key = self.timeType + ret_query = {time_key: {"$lte": self.endIso}} + if (self.startIso is not None): + ret_query[time_key].update({"$gte": self.startIso}) + return ret_query + + def __repr__(self) -> str: + return f"FmtTimeQuery {self.timeType} with range [{self.startIso}, {self.endIso})" diff --git a/emission/storage/timeseries/timequery.py b/emission/storage/timeseries/timequery.py index 1c86be78a..950e0adb0 100644 --- a/emission/storage/timeseries/timequery.py +++ b/emission/storage/timeseries/timequery.py @@ -8,26 +8,19 @@ from builtins import object class TimeQuery(object): """ - Object that encapsulates a query for a range of time [start_time, end_time] - Can query by Unix timestamps with a '*_ts' time_type (like "metadata.write_ts", "data.ts", or "data.start_ts") - e.g. 
TimeQuery("metadata.write_ts", 1234567890, 1234567900) - Or, can query by ISO datetime strings with a '*_fmt_time' time_type (like "data.fmt_time" or "data.start_fmt_time") - This is useful for querying based on the local date/time at which data was collected - e.g. TimeQuery("data.fmt_time", "2024-06-03T08:00", "2024-06-03T16:59") + Object that encapsulates a query for a particular time (read_ts, write_ts, or processed_ts) """ - def __init__(self, time_type, start_time, end_time): - self.time_type = time_type - self.start_time = start_time - # if end_time is an ISO string, append 'Z' to make the end range inclusive - # (because Z is greater than any other character that can appear in an ISO string) - self.end_time = end_time + 'Z' if isinstance(end_time, str) else end_time + def __init__(self, timeType, startTs, endTs): + self.timeType = timeType + self.startTs = startTs + self.endTs = endTs def get_query(self): - time_key = self.time_type - ret_query = {time_key : {"$lte": self.end_time}} - if (self.start_time is not None): - ret_query[time_key].update({"$gte": self.start_time}) + time_key = self.timeType + ret_query = {time_key : {"$lte": self.endTs}} + if (self.startTs is not None): + ret_query[time_key].update({"$gte": self.startTs}) return ret_query def __repr__(self): - return f"TimeQuery {self.time_type} with range [{self.start_time}, {self.end_time})" + return f"TimeQuery {self.timeType} with range [{self.startTs}, {self.endTs})" From f9e411cb2433d33be8c00ea17235bf8a2c5338ad Mon Sep 17 00:00:00 2001 From: Jack Greenlee Date: Fri, 14 Jun 2024 12:25:12 -0400 Subject: [PATCH 3/4] fix docstring in fmt_time_query --- emission/storage/timeseries/fmt_time_query.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/emission/storage/timeseries/fmt_time_query.py b/emission/storage/timeseries/fmt_time_query.py index 46eb2798e..7c6e0b614 100644 --- a/emission/storage/timeseries/fmt_time_query.py +++ b/emission/storage/timeseries/fmt_time_query.py 
@@ -5,9 +5,9 @@ class FmtTimeQuery(object): Object that encapsulates a query for an inclusive range between two ISO-format strings. Useful for querying based on the local date/time at which data was collected, like with timeType of "data.fmt_time" or "data.start_fmt_time". - e.g. TimeQuery("data.fmt_time", "2024-01", "2024-03") # first quarter of 2024 - e.g. TimeQuery("data.fmt_time", "2024-05-01", "2024-05-31") # all of May 2024 - e.g. TimeQuery("data.fmt_time", "2024-06-03T08:00", "2024-06-03T16:59") # work hours on Jun 3 2024 + e.g. FmtTimeQuery("data.fmt_time", "2024-01", "2024-03") # first quarter of 2024 + e.g. FmtTimeQuery("data.fmt_time", "2024-05-01", "2024-05-31") # all of May 2024 + e.g. FmtTimeQuery("data.fmt_time", "2024-06-03T08:00", "2024-06-03T16:59") # work hours on Jun 3 2024 """ def __init__(self, timeType: str, startIso: str, endIso: str) -> None: self.timeType = timeType From 6277d6c1af67a7f75b586a4ca40f145970ab6b1e Mon Sep 17 00:00:00 2001 From: Jack Greenlee Date: Fri, 14 Jun 2024 12:30:58 -0400 Subject: [PATCH 4/4] revert TimeComponentQuery to the old 'set of filters' behavior https://github.com/e-mission/e-mission-server/pull/970#issuecomment-2155241066 reverts https://github.com/e-mission/e-mission-server/pull/968 back to the original implementation since we now have a faster way to do "range" queries via FmtTimeQuery. 
To make it clearer that FmtTimeQuery is for ranges and TimeComponentQuery is for filters, I renamed some functions and added more descriptive comments/docstrings --- emission/core/wrapper/localdate.py | 3 - .../storage/decorations/local_date_queries.py | 91 +++++++++---------- emission/storage/timeseries/tcquery.py | 9 +- .../storageTests/TestLocalDateQueries.py | 26 ++++-- 4 files changed, 67 insertions(+), 62 deletions(-) diff --git a/emission/core/wrapper/localdate.py b/emission/core/wrapper/localdate.py index a60eb04e4..1f71e737c 100644 --- a/emission/core/wrapper/localdate.py +++ b/emission/core/wrapper/localdate.py @@ -9,9 +9,6 @@ import arrow import emission.core.wrapper.wrapperbase as ecwb -# specify the order of time units, from largest to smallest -DATETIME_UNITS = ['year', 'month', 'day', 'hour', 'minute', 'second'] - class LocalDate(ecwb.WrapperBase): """ Supporting wrapper class that stores the expansions of the components diff --git a/emission/storage/decorations/local_date_queries.py b/emission/storage/decorations/local_date_queries.py index bc6e8bc53..de5ff8d60 100644 --- a/emission/storage/decorations/local_date_queries.py +++ b/emission/storage/decorations/local_date_queries.py @@ -11,52 +11,47 @@ import emission.core.wrapper.localdate as ecwl -def get_range_query(field_prefix, start_ld, end_ld): - units = [u for u in ecwl.DATETIME_UNITS if u in start_ld and u in end_ld] - logging.debug(f'get_range_query: units = {units}') - try: - gt_query = get_comparison_query(field_prefix, start_ld, end_ld, units, 'gt') - lt_query = get_comparison_query(field_prefix, end_ld, start_ld, units, 'lt') - logging.debug(f'get_range_query: gt_query = {gt_query}, lt_query = {lt_query}') - return { "$and": [gt_query, lt_query] } if gt_query and lt_query else {} - except AssertionError as e: - logging.error(f'Invalid range from {str(start_ld)} to {str(end_ld)}: {str(e)}') - return None - -def get_comparison_query(field_prefix, base_ld, limit_ld, units, gt_or_lt): - 
field_name = lambda i: f'{field_prefix}.{units[i]}' - and_conditions, or_conditions = [], [] - tiebreaker_index = -1 - for i, unit in enumerate(units): - # the range is inclusive, so if on the last unit we should use $lte / $gte instead of $lt / $gt - op = f'${gt_or_lt}e' if i == len(units)-1 else f'${gt_or_lt}' - if tiebreaker_index >= 0: - tiebreaker_conditions = [{ field_name(j): base_ld[units[j]] } for j in range(tiebreaker_index, i)] - tiebreaker_conditions.append({ field_name(i): { op: base_ld[unit] }}) - or_conditions.append({ "$and": tiebreaker_conditions }) - elif base_ld[unit] == limit_ld[unit]: - and_conditions.append({field_name(i): base_ld[unit]}) +def get_filter_query(field_name, start_local_dt, end_local_dt): + if list(start_local_dt.keys()) != list(end_local_dt.keys()): + raise RuntimeError("start_local_dt.keys() = %s does not match end_local_dt.keys() = %s" % + (list(start_local_dt.keys()), list(end_local_dt.keys()))) + query_result = {} + for key in start_local_dt: + curr_field = "%s.%s" % (field_name, key) + gte_lte_query = {} + try: + start_int = int(start_local_dt[key]) + except: + logging.info("start_local_dt[%s] = %s, not an integer, skipping" % + (key, start_local_dt[key])) + continue + + try: + end_int = int(end_local_dt[key]) + except: + logging.info("end_local_dt[%s] = %s, not an integer, skipping" % + (key, end_local_dt[key])) + continue + + is_rollover = start_int > end_int + + if is_rollover: + gte_lte_query = get_rollover_query(start_int, end_int) + else: + gte_lte_query = get_standard_query(start_int, end_int) + + if len(gte_lte_query) > 0: + query_result.update({curr_field: gte_lte_query}) else: - assert (base_ld[unit] < limit_ld[unit]) if gt_or_lt == 'gt' else (base_ld[unit] > limit_ld[unit]) - or_conditions.append({field_name(i): { op: base_ld[unit] }}) - tiebreaker_index = i - if and_conditions and or_conditions: - return { "$and": and_conditions + [{ "$or": or_conditions }] } - elif and_conditions: - return { "$and": 
and_conditions } - elif or_conditions: - return { "$or": or_conditions } - else: - return {} - -def yyyy_mm_dd_to_local_date(ymd: str) -> ecwl.LocalDate: - return ecwl.LocalDate({ - 'year': int(ymd[0:4]), - 'month': int(ymd[5:7]), - 'day': int(ymd[8:10]) - }) - -def get_yyyy_mm_dd_range_query(field_name, start_ymd: str, end_ymd: str) -> dict: - start_local_date = yyyy_mm_dd_to_local_date(start_ymd) - end_local_date = yyyy_mm_dd_to_local_date(end_ymd) - return get_range_query(field_name, start_local_date, end_local_date) + logging.info("key %s exists, skipping because upper AND lower bounds are missing" % key) + + logging.debug("In get_filter_query, returning query %s" % query_result) + return query_result + +def get_standard_query(start_int, end_int): + assert(start_int <= end_int) + return {'$gte': start_int, '$lte': end_int} + +def get_rollover_query(start_int, end_int): + assert(start_int > end_int) + return {'$not': {'$gt': end_int, '$lt': start_int}} diff --git a/emission/storage/timeseries/tcquery.py b/emission/storage/timeseries/tcquery.py index 830382dd2..88084bc08 100644 --- a/emission/storage/timeseries/tcquery.py +++ b/emission/storage/timeseries/tcquery.py @@ -11,8 +11,11 @@ class TimeComponentQuery(object): """ - Object that encapsulates a query for a particular time at the local time in - the timezone where the data was generated. + Object that encapsulates a query for filtering based on localdate objects. + This works as a set of filters for each localdate field, e.g. year, month, day, etc. + Useful for filtering on one or more localdate fields + e.g. TimeComponentQuery("data.start_local_dt", {"weekday": 0}, {"weekday": 4}) + For range queries, use FmtTimeQuery instead. 
""" def __init__(self, timeType, startLD, endLD): self.timeType = timeType @@ -20,4 +23,4 @@ def __init__(self, timeType, startLD, endLD): self.endLD = endLD def get_query(self): - return esdl.get_range_query(self.timeType, self.startLD, self.endLD) + return esdl.get_filter_query(self.timeType, self.startLD, self.endLD) diff --git a/emission/tests/storageTests/TestLocalDateQueries.py b/emission/tests/storageTests/TestLocalDateQueries.py index 68bb3fe42..f169be831 100644 --- a/emission/tests/storageTests/TestLocalDateQueries.py +++ b/emission/tests/storageTests/TestLocalDateQueries.py @@ -64,36 +64,46 @@ def testLocalDateReadWrite(self): self.assertEqual(ret_entry.data.local_dt.weekday, 2) self.assertEqual(ret_entry.data.fmt_time, "2016-04-13T15:32:09-07:00") - def testLocalRangeStandardQuery(self): + def testLocalDateFilterStandardQuery(self): """ Search for all entries between 8:18 and 8:20 local time, both inclusive """ start_local_dt = ecwl.LocalDate({'year': 2015, 'month': 8, 'hour': 8, 'minute': 18}) end_local_dt = ecwl.LocalDate({'year': 2015, 'month': 8, 'hour': 8, 'minute': 20}) final_query = {"user_id": self.testUUID} - final_query.update(esdl.get_range_query("data.local_dt", start_local_dt, end_local_dt)) + final_query.update(esdl.get_filter_query("data.local_dt", start_local_dt, end_local_dt)) entriesCnt = edb.get_timeseries_db().count_documents(final_query) self.assertEqual(15, entriesCnt) - def testLocalRangeRolloverQuery(self): + def testLocalDateFilterRolloverQuery(self): """ Search for all entries between 8:18 and 9:08 local time, both inclusive """ start_local_dt = ecwl.LocalDate({'year': 2015, 'month': 8, 'hour': 8, 'minute': 18}) end_local_dt = ecwl.LocalDate({'year': 2015, 'month': 8, 'hour': 9, 'minute': 8}) final_query = {"user_id": self.testUUID} - final_query.update(esdl.get_range_query("data.local_dt", start_local_dt, end_local_dt)) - entriesCnt = edb.get_timeseries_db().count_documents(final_query) - self.assertEqual(232, entriesCnt) + 
final_query.update(esdl.get_filter_query("data.local_dt", start_local_dt, end_local_dt)) + entries = edb.get_timeseries_db().find(final_query).sort('data.ts', pymongo.ASCENDING) + self.assertEqual(448, edb.get_timeseries_db().count_documents(final_query)) + + entries_list = list(entries) + + # Note that since this is a set of filters, as opposed to a range, this + # returns all entries between 18 and 8 in both hours. + # so 8:18 is valid, but so is 9:57 + self.assertEqual(ecwe.Entry(entries_list[0]).data.local_dt.hour, 8) + self.assertEqual(ecwe.Entry(entries_list[0]).data.local_dt.minute, 18) + self.assertEqual(ecwe.Entry(entries_list[-1]).data.local_dt.hour, 9) + self.assertEqual(ecwe.Entry(entries_list[-1]).data.local_dt.minute, 57) - def testLocalMatchingQuery(self): + def testLocalDateFilterMatchingQuery(self): """ Search for all entries that occur at minute = 8 from any hour """ start_local_dt = ecwl.LocalDate({'minute': 8}) end_local_dt = ecwl.LocalDate({'minute': 8}) final_query = {"user_id": self.testUUID} - final_query.update(esdl.get_range_query("data.local_dt", start_local_dt, end_local_dt)) + final_query.update(esdl.get_filter_query("data.local_dt", start_local_dt, end_local_dt)) entries_docs = edb.get_timeseries_db().find(final_query).sort("metadata.write_ts") self.assertEqual(20, edb.get_timeseries_db().count_documents(final_query)) entries = [ecwe.Entry(doc) for doc in entries_docs]