diff --git a/.gitignore b/.gitignore index e1a0cf045..1b467ec07 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,7 @@ *.swp *debug.log .DS_Store +.vscode CFC_WebApp/config.json CFC_WebApp/keys.json diff --git a/bin/build_label_model.py b/bin/build_label_model.py index 7ba3fe066..3afd62901 100644 --- a/bin/build_label_model.py +++ b/bin/build_label_model.py @@ -5,13 +5,13 @@ import argparse import uuid -import copy import emission.pipeline.reset as epr import emission.core.get_database as edb import emission.core.wrapper.user as ecwu import emission.storage.timeseries.abstract_timeseries as esta -import emission.analysis.modelling.tour_model_first_only.build_save_model as eamtb +import emission.analysis.modelling.trip_model.run_model as eamur +import emission.analysis.modelling.trip_model.config as eamtc def _get_user_list(args): if args.all: @@ -64,4 +64,10 @@ def _email_2_user_list(email_list): logging.info("received list with %s users" % user_list) for user_id in user_list: logging.info("building model for user %s" % user_id) - eamtb.build_user_model(user_id) + # these can come from the application config as default values + + model_type = eamtc.get_model_type() + model_storage = eamtc.get_model_storage() + min_trips = eamtc.get_minimum_trips() + ## Rebuild and save the trip model with the specified parameters + eamur.update_trip_model(user_id, model_type, model_storage, min_trips) diff --git a/conf/analysis/trip_model.conf.json.sample b/conf/analysis/trip_model.conf.json.sample new file mode 100644 index 000000000..845e67a6a --- /dev/null +++ b/conf/analysis/trip_model.conf.json.sample @@ -0,0 +1,13 @@ +{ + "model_type": "greedy", + "model_storage": "document_database", + "minimum_trips": 14, + "model_parameters": { + "greedy": { + "metric": "od_similarity", + "similarity_threshold_meters": 500, + "apply_cutoff": false, + "incremental_evaluation": false + } + } +} \ No newline at end of file diff --git a/emission/analysis/classification/inference/labels/inferrers.py b/emission/analysis/classification/inference/labels/inferrers.py index 6ce4c7702..c6b939671 100644 --- a/emission/analysis/classification/inference/labels/inferrers.py +++ b/emission/analysis/classification/inference/labels/inferrers.py @@ -6,6 +6,8 @@ import copy import emission.analysis.modelling.tour_model_first_only.load_predict as lp +import emission.analysis.modelling.trip_model.run_model as eamur +import emission.analysis.modelling.trip_model.config as eamtc # A set of placeholder predictors to allow pipeline development without a real inference algorithm. # For the moment, the system is configured to work with two labels, "mode_confirm" and @@ -140,7 +142,10 @@ def n_to_confidence_coeff(n, max_confidence=None, first_confidence=None, confide # predict_two_stage_bin_cluster but with the above reduction in confidence def predict_cluster_confidence_discounting(trip, max_confidence=None, first_confidence=None, confidence_multiplier=None): - labels, n = lp.predict_labels_with_n(trip) + # load application config + model_type = eamtc.get_model_type() + model_storage = eamtc.get_model_storage() + labels, n = eamur.predict_labels_with_n(trip, model_type, model_storage) if n <= 0: # No model data or trip didn't match a cluster logging.debug(f"In predict_cluster_confidence_discounting: n={n}; returning as-is") return labels diff --git a/emission/analysis/modelling/similarity/__init__.py b/emission/analysis/modelling/similarity/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/emission/analysis/modelling/similarity/confirmed_trip_feature_extraction.py b/emission/analysis/modelling/similarity/confirmed_trip_feature_extraction.py new file mode 100644 index 000000000..029359424 --- /dev/null +++ b/emission/analysis/modelling/similarity/confirmed_trip_feature_extraction.py @@ -0,0 +1,64 @@ +from typing import List +import emission.core.wrapper.confirmedtrip as ecwc + + +def origin_features(trip: ecwc.Confirmedtrip) -> List[float]: + """extract the trip origin coordinates. + + :param trip: trip to extract features from + :return: origin coordinates + """ + try: + origin = trip['data']['start_loc']["coordinates"] + return origin + except KeyError as e: + msg = 'Confirmedtrip expected to have path data.start_loc.coordinates' + raise KeyError(msg) from e + +def destination_features(trip: ecwc.Confirmedtrip) -> List[float]: + """extract the trip destination coordinates. + + :param trip: trip to extract features from + :return: destination coordinates + """ + try: + destination = trip['data']['end_loc']["coordinates"] + return destination + except KeyError as e: + msg = 'Confirmedtrip expected to have path data.end_loc.coordinates' + raise KeyError(msg) from e + + +def od_features(trip: ecwc.Confirmedtrip) -> List[float]: + """extract both origin and destination coordinates. + + :param trip: trip to extract features from + :return: od coordinates + """ + o_lat, o_lon = origin_features(trip) + d_lat, d_lon = destination_features(trip) + return [o_lat, o_lon, d_lat, d_lon] + +def distance_feature(trip: ecwc.Confirmedtrip) -> List[float]: + """provided for forward compatibility. + + :param trip: trip to extract features from + :return: distance feature + """ + try: + return [trip['data']['distance']] + except KeyError as e: + msg = 'Confirmedtrip expected to have path data.distance' + raise KeyError(msg) from e + +def duration_feature(trip: ecwc.Confirmedtrip) -> List[float]: + """provided for forward compatibility. + + :param trip: trip to extract features from + :return: duration feature + """ + try: + return [trip['data']['duration']] + except KeyError as e: + msg = 'Confirmedtrip expected to have path data.duration' + raise KeyError(msg) from e diff --git a/emission/analysis/modelling/similarity/od_similarity.py b/emission/analysis/modelling/similarity/od_similarity.py new file mode 100644 index 000000000..3b84bd764 --- /dev/null +++ b/emission/analysis/modelling/similarity/od_similarity.py @@ -0,0 +1,21 @@ +from typing import List +import emission.analysis.modelling.similarity.similarity_metric as eamss +import emission.analysis.modelling.similarity.confirmed_trip_feature_extraction as ctfe +import emission.core.wrapper.confirmedtrip as ecwc +import emission.core.common as ecc + + +class OriginDestinationSimilarity(eamss.SimilarityMetric): + """ + similarity metric which compares, for two trips, + the distance for origin to origin, and destination to destination, + in meters. + """ + + def extract_features(self, trip: ecwc.Confirmedtrip) -> List[float]: + return ctfe.od_features(trip) + + def similarity(self, a: List[float], b: List[float]) -> List[float]: + o_dist = ecc.calDistance([a[0], a[1]], [b[0], b[1]]) + d_dist = ecc.calDistance([a[2], a[3]], [b[2], b[3]]) + return [o_dist, d_dist] \ No newline at end of file diff --git a/emission/analysis/modelling/similarity/similarity_metric.py b/emission/analysis/modelling/similarity/similarity_metric.py new file mode 100644 index 000000000..6be00216f --- /dev/null +++ b/emission/analysis/modelling/similarity/similarity_metric.py @@ -0,0 +1,41 @@ +from abc import ABCMeta, abstractmethod +from typing import List +import logging + +import emission.core.wrapper.confirmedtrip as ecwc + + +class SimilarityMetric(metaclass=ABCMeta): + + @abstractmethod + def extract_features(self, trip: ecwc.Confirmedtrip) -> List[float]: + """extracts the features we want to compare for similarity + + :param trip: a confirmed trip + :return: the features to compare + """ + pass + + @abstractmethod + def similarity(self, a: List[float], b: List[float]) -> List[float]: + """compares the features, producing their similarity + as computed by this similarity metric + + :param a: features for a trip + :param b: features for another trip + :return: for each feature, the similarity of these features + """ + pass + + def similar(self, a: List[float], b: List[float], thresh: float) -> bool: + """compares the features, returning true if they are similar + within some threshold + + :param a: features for a trip + :param b: features for another trip + :param thresh: threshold for similarity + :return: true if the feature similarity is within some threshold + """ + similarity_values = self.similarity(a, b) + is_similar = all(map(lambda sim: sim <= thresh, similarity_values)) + return is_similar diff --git a/emission/analysis/modelling/similarity/similarity_metric_type.py b/emission/analysis/modelling/similarity/similarity_metric_type.py new file mode 100644 index 000000000..6f3f4c776 --- /dev/null +++ b/emission/analysis/modelling/similarity/similarity_metric_type.py @@ -0,0 +1,49 @@ +from __future__ import annotations +import enum + + +import emission.analysis.modelling.similarity.od_similarity as eamso +import emission.analysis.modelling.similarity.similarity_metric as eamss + +class SimilarityMetricType(enum.Enum): + OD_SIMILARITY = 0 + + def build(self) -> eamss.SimilarityMetric: + """ + + hey YOU! add future similarity metric types here please! + + :raises KeyError: if the SimilarityMetricType isn't found in the below dictionary + :return: the associated similarity metric + """ + metrics = { + SimilarityMetricType.OD_SIMILARITY: eamso.OriginDestinationSimilarity() + } + + metric = metrics.get(self) + if metric is None: + names = "{" + ",".join(SimilarityMetricType.names) + "}" + msg = f"unknown metric type {metric}, must be one of {names}" + raise KeyError(msg) + else: + return metric + + + @classmethod + def names(cls): + return list(map(lambda e: e.name, list(cls))) + + @classmethod + def from_str(cls, str): + """attempts to match the provided string to a known SimilarityMetricType. + not case sensitive. + + :param str: a string name of a SimilarityMetricType + """ + try: + str_caps = str.upper() + return cls[str_caps] + except KeyError: + names = "{" + ",".join(cls.names) + "}" + msg = f"{str} is not a known SimilarityMetricType, must be one of {names}" + raise KeyError(msg) \ No newline at end of file diff --git a/emission/analysis/modelling/tour_model/label_processing.py b/emission/analysis/modelling/tour_model/label_processing.py index e69707305..1384b6ebb 100644 --- a/emission/analysis/modelling/tour_model/label_processing.py +++ b/emission/analysis/modelling/tour_model/label_processing.py @@ -34,14 +34,14 @@ def map_labels_mode(user_input_df): # convert mode if "replaced_mode" in user_input_df.columns: same_mode_df = user_input_df[user_input_df.replaced_mode == "same_mode"] - logging.debug("The following rows will be changed %s" % - same_mode_df.index) - for a in range(len(user_input_df)): - if user_input_df.iloc[a]["replaced_mode"] == "same_mode": - # to see which row will be converted - # logging.debug("The following rows will be changed: %s", user_input_df.iloc[a]) - user_input_df.iloc[a]["replaced_mode"] = user_input_df.iloc[a]['mode_confirm'] - logging.debug("Finished changing all rows") + if len(same_mode_df) > 0: + logging.debug("The following rows will be changed %s" % same_mode_df.index) + for a in range(len(user_input_df)): + if user_input_df.iloc[a]["replaced_mode"] == "same_mode": + # to see which row will be converted + # logging.debug("The following rows will be changed: %s", user_input_df.iloc[a]) + user_input_df.iloc[a]["replaced_mode"] = user_input_df.iloc[a]['mode_confirm'] + logging.debug("Finished changing all rows") else: logging.info("map_labels_mode: no replaced mode column found, early return") return user_input_df diff --git a/emission/analysis/modelling/trip_model/__init__.py b/emission/analysis/modelling/trip_model/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/emission/analysis/modelling/trip_model/config.py b/emission/analysis/modelling/trip_model/config.py new file mode 100644 index 000000000..76b3c6e6d --- /dev/null +++ b/emission/analysis/modelling/trip_model/config.py @@ -0,0 +1,79 @@ +import json +import re +from this import d +from typing import Optional +import logging +from numpy import isin + +import emission.analysis.modelling.trip_model.model_storage as eamums +import emission.analysis.modelling.trip_model.model_type as eamumt + +config_filename = "" + +def load_config(): + global config_filename + try: + config_filename = 'conf/analysis/trip_model.conf.json' + config_file = open(config_filename) + except: + print("analysis.trip_model.conf.json not configured, falling back to sample, default configuration") + config_filename = 'conf/analysis/trip_model.conf.json.sample' + config_file = open('conf/analysis/trip_model.conf.json.sample') + ret_val = json.load(config_file) + config_file.close() + return ret_val + +config_data = load_config() + +def reload_config(): + global config_data + config_data = load_config() + +def get_config(): + return config_data + +def get_optional_config_value(key) -> Optional[str]: + """ + get a config value at the provided path/key + + :param key: a key name or a dot-delimited path to some key within the config object + :return: the value at the key, or, None if not found + """ + cursor = config_data + path = key.split(".") + for k in path: + cursor = cursor.get(k) + if cursor is None: + return None + return cursor + +def get_config_value_or_raise(key): + logging.debug(f'getting key {key} in config') + value = get_optional_config_value(key) + if value is None: + logging.debug('config object:') + logging.debug(json.dumps(config_data, indent=2)) + msg = f"expected config key {key} not found in config file {config_filename}" + raise KeyError(msg) + else: + return value + +def get_model_type(): + model_type_str = get_config_value_or_raise('model_type') + model_type = eamumt.ModelType.from_str(model_type_str) + return model_type + +def get_model_storage(): + model_storage_str = get_config_value_or_raise('model_storage') + model_storage = eamums.ModelStorage.from_str(model_storage_str) + return model_storage + +def get_minimum_trips(): + minimum_trips = get_config_value_or_raise('minimum_trips') + if not isinstance(minimum_trips, int): + msg = f"config key 'minimum_trips' not an integer in config file {config_filename}" + raise TypeError(msg) + return minimum_trips + + + diff --git a/emission/analysis/modelling/trip_model/greedy_similarity_binning.py b/emission/analysis/modelling/trip_model/greedy_similarity_binning.py new file mode 100644 index 000000000..d750a451e --- /dev/null +++ b/emission/analysis/modelling/trip_model/greedy_similarity_binning.py @@ -0,0 +1,302 @@ +import logging +from tokenize import group +from typing import Dict, List, Optional, Tuple + +import emission.analysis.modelling.similarity.similarity_metric_type as eamssmt +import emission.analysis.modelling.tour_model.label_processing as lp +import emission.analysis.modelling.trip_model.trip_model as eamuu +import emission.analysis.modelling.trip_model.util as util +import emission.analysis.modelling.trip_model.config as eamtc +import emission.core.wrapper.confirmedtrip as ecwc +import pandas as pd + + +class GreedySimilarityBinning(eamuu.TripModel): + + is_incremental: bool = False # overwritten during __init__ + + def __init__(self, config=None): + """ + instantiate a clustering model for a user. + + replaces the original similarity class + [https://github.com/e-mission/e-mission-server/blob/5b9e608154de15e32df4f70a07a5b95477e7dbf5/emission/analysis/modelling/tour_model/similarity.py#L67] + + this technique employs a greedy similarity heuristic to associate + trips with collections of probabilistic class labels. new bins are + created when the next feature vector is not similar to any existing bins. + for a new feature vector to be similar to an existing bin, it must be + similar to all of the previous feature vectors found in that bin, by way + of a provided similarity metric and threshold value. + + in pseudocode: + + # fit + for each bin_id, bin in bins: + for each bin_feature_row in bin.feature_rows: + if not similar(trip.feature_row, bin_feature_row): + return + append trip to bin + + the prediction of labels for some input trip takes a similar form, + where the first bin that is found to be similar is treated as the + class label to apply: + + # prediction + for each bin_id, bin in bins: + for each bin_feature_row in bin.feature_rows: + if not similar(trip.feature_row, bin_feature_row): + break + return bin_id + + to train the predictions, label sets are aggregated within a bin so that + the occurences of some unique label combination is counted. the probability + of a specific unique label combination is assigned by the proportion + of counts of this unique label set to the total number of trips stored at + this bin. the set of unique label sets and their prediction value are then + returned during prediction. + + in terms of the data structure of the model, each bin is a Dictionary with + three fields, "feature_rows", "labels", and "predictions", each a list. + whereas the number and index of "feature_rows" and "labels" are assumed to + match and be idempotent across multiple training calls, the "predictions" + are over-written at each call of "fit" and are not assumed to match the number + of "feature_rows" or "labels" stored in a bin. + + historical note: the original similarity class (link above) used a nested list data + structure to capture the notion of binning. this was then copied into + a Dict when the model needed to be saved. the same technique can be re-written to + work directly on Dictionaries with no loss in the algorithm's time complexity. this + also helps when running in incremental mode to persist relevant training data and to + minimize codec + serialization errors. + + the data takes the form: + { + bin_id: { + "feature_rows": [ + [f1, f2, .., fn], + ... + ], + "labels": [ + { label1: value1, ... } + ], + "predictions": [ + { "labels": { label1: value1, ... }, 'p': p_val } + ] + } + } + where + - bin_id: str index of a bin containing similar trips, as a string + (string type for bin_id comes from mongodb object key type requirements) + - f_x: float feature value (an ordinate such as origin.x) + - label_x: str OpenPATH user label category such as "mode_confirm" + - value_x: str user-provided label for a category + - p_val: float probability of a prediction, real number in [0, 1] + + :param config: if provided, a manual configuration for testing purposes. these + values should be provided by the config file when running OpenPATH. + see config.py for more details. + """ + + if config is None: + config = eamtc.get_config_value_or_raise('model_parameters.greedy') + logging.debug(f'GreedySimilarityBinning loaded model config from file') + else: + logging.debug(f'GreedySimilarityBinning using model config argument') + + expected_keys = [ + 'metric', + 'similarity_threshold_meters', + 'apply_cutoff', + 'incremental_evaluation' + ] + for k in expected_keys: + if config.get(k) is None: + msg = f"greedy trip model config missing expected key {k}" + raise KeyError(msg) + + self.metric = eamssmt.SimilarityMetricType.from_str(config['metric']).build() + self.sim_thresh = config['similarity_threshold_meters'] + self.apply_cutoff = config['apply_cutoff'] + self.is_incremental = config['incremental_evaluation'] + + self.bins: Dict[str, Dict] = {} + + + def fit(self, trips: List[ecwc.Confirmedtrip]): + """train the model by passing data, where each row in the data + corresponds to a label at the matching index of the label input + + :param trips: 2D array of features to train from + """ + + logging.debug(f'fit called with {len(trips)} trips') + unlabeled = list(filter(lambda t: len(t['data']['user_input']) == 0, trips)) + if len(unlabeled) > 0: + msg = f'model.fit cannot be called with unlabeled trips, found {len(unlabeled)}' + raise Exception(msg) + self._assign_bins(trips) + if len(self.bins) > 1 and self.apply_cutoff: + self._apply_cutoff() + self._generate_predictions() + + logging.info(f"greedy binning model fit to {len(trips)} rows of trip data") + + def predict(self, trip: ecwc.Confirmedtrip) -> Tuple[List[Dict], int]: + + logging.debug(f"running greedy similarity clustering") + predicted_bin_id, predicted_bin_record = self._nearest_bin(trip) + if predicted_bin_id is None: + logging.debug(f"unable to predict bin for trip {trip}") + return [], 0 + else: + predictions = predicted_bin_record['predictions'] + n_features = len(predicted_bin_record['feature_rows']) + logging.debug(f"found cluster {predicted_bin_id} with predictions {predictions}") + return predictions, n_features + + def to_dict(self) -> Dict: + return self.bins + + def from_dict(self, model: Dict): + self.bins = model + + def extract_features(self, trip: ecwc.Confirmedtrip) -> List[float]: + features = self.metric.extract_features(trip) + return features + + def _assign_bins(self, trips: List[ecwc.Confirmedtrip]): + """ + assigns each trip to a bin by greedy similarity search + [see https://github.com/e-mission/e-mission-server/blob/5b9e608154de15e32df4f70a07a5b95477e7dbf5/emission/analysis/modelling/tour_model/similarity.py#L118] + + :param data: trips to assign to bins + :type data: List[Confirmedtrip] + """ + logging.debug(f"_assign_bins called with {len(trips)} trips") + for trip in trips: + trip_features = self.extract_features(trip) + trip_labels = trip['data']['user_input'] + + bin_id = self._find_matching_bin_id(trip_features) + if bin_id is not None: + # add to existing bin + logging.debug(f"adding trip to bin {bin_id} with features {trip_features}") + self.bins[bin_id]['feature_rows'].append(trip_features) + self.bins[bin_id]['labels'].append(trip_labels) + else: + # create new bin + new_bin_id = str(len(self.bins)) + new_bin_record = { + 'feature_rows': [trip_features], + 'labels': [trip_labels], + 'predictions': [] + } + logging.debug(f"creating new bin {new_bin_id} at location {trip_features}") + self.bins[new_bin_id] = new_bin_record + + def _find_matching_bin_id(self, trip_features: List[float]) -> Optional[str]: + """ + finds an existing bin where all bin features are "similar" to the incoming + trip features. + + :param trip_features: feature row for the incoming trip + :return: the id of a bin if a match was found, otherwise None + """ + for bin_id, bin_record in self.bins.items(): + matches_bin = all([self.metric.similar(trip_features, bin_sample, self.sim_thresh) + for bin_sample in bin_record['feature_rows']]) + if matches_bin: + return bin_id + return None + + def _nearest_bin(self, trip: ecwc.Confirmedtrip) -> Tuple[Optional[int], Optional[Dict]]: + """ + finds a bin which contains at least all matching features. the + first record matching by similarity measure is returned. if + none are found, (None, None) is returned. + + [see https://github.com/e-mission/e-mission-server/blob/10772f892385d44e11e51e796b0780d8f6609a2c/emission/analysis/modelling/tour_model_first_only/load_predict.py#L46] + + :param trip: incoming trip features to test with + :return: nearest bin record, if found + """ + logging.debug(f"_nearest_bin called") + + trip_features = self.extract_features(trip) + + for bin_id, bin_record in self.bins.items(): + for bin_features in bin_record['feature_rows']: + if self.metric.similar(trip_features, bin_features, self.sim_thresh): + logging.debug(f"found nearest bin id {bin_id}") + logging.debug(f"similar: {trip_features}, {bin_features}") + return bin_id, bin_record + + return None, None + + def _apply_cutoff(self): + """ + removes small clusters by an "elbow search" heuristic. see + https://stackoverflow.com/a/2022348/4803266. + Copied over from https://github.com/e-mission/e-mission-server/blob/5b9e608154de15e32df4f70a07a5b95477e7dbf5/emission/analysis/modelling/tour_model/similarity.py#L158 + """ + # the cutoff point is an index along the sorted bins. any bin with a gte + # index value is removed, as that bin has been found to be smaller than the cutoff. + # This was the last line of calc_cutoff_bins in the old code, and is moved to the equivalent of delete_bins in the new code + bins_sorted = self.bins.sort(key=lambda bin: len(bin['feature_rows']), reverse=True) + + + +# The first two lines below correspond to the original lines below in the original elbow_distance +# y = [0] * len(self.bins) +# for i in range(len(self.bins)): +# y[i] = len(self.bins[i]) + num_bins = len(bins_sorted) + bin_sizes = [len(bin_rec['feature_rows']) for bin_rec in bins_sorted.values()] + _, cutoff_bin_size = util.find_knee_point(bin_sizes) + logging.debug( + "bins = %s, elbow distance = %s" % (num_bins, cutoff_bin_size) + ) + + updated_bins = {bin_id: bin_rec + for bin_id, bin_rec in bins_sorted.items() + if len(bin_rec['feature_rows']) >= cutoff_bin_size} + + removed = len(bins_sorted) - len(updated_bins) + logging.debug( + f"removed %s bins with less than %s entries" + % (removed, cutoff_bin_size) + ) + # previous version held onto the removed bins for analysis, + # we could do that here if that use case is still relevant + self.bins = updated_bins + + def _generate_predictions(self): + """ + helper function to transform binned features and labels into predictions. + taken from [https://github.com/e-mission/e-mission-server/blob/10772f892385d44e11e51e796b0780d8f6609a2c/emission/analysis/modelling/tour_model_first_only/build_save_model.py#L40] + + for each bin, the unique label combinations are counted. their + probability is estimated with label_count / total_labels. + """ + for _, bin_record in self.bins.items(): + user_label_df = pd.DataFrame(bin_record['labels']) + user_label_df = lp.map_labels(user_label_df).dropna() + # compute the sum of trips in this cluster + sum_trips = len(user_label_df) + # compute unique label sets and their probabilities in one cluster + # 'p' refers to probability + group_cols = user_label_df.columns.tolist() + unique_labels = user_label_df.groupby(group_cols).size().reset_index(name='uniqcount') + unique_labels['p'] = unique_labels.uniqcount / sum_trips + labels_columns = user_label_df.columns.to_list() + bin_label_combo_list = [] + for i in range(len(unique_labels)): + one_set_labels = {} + # e.g. labels_only={'mode_confirm': 'pilot_ebike', 'purpose_confirm': 'work', 'replaced_mode': 'walk'} + labels_only = {column: unique_labels.iloc[i][column] for column in labels_columns} + one_set_labels["labels"] = labels_only + one_set_labels['p'] = unique_labels.iloc[i]['p'] + # e.g. one_set_labels = {'labels': {'mode_confirm': 'walk', 'replaced_mode': 'walk', 'purpose_confirm': 'exercise'}, 'p': 1.0} + bin_label_combo_list.append(one_set_labels) + bin_record['predictions'] = bin_label_combo_list diff --git a/emission/analysis/modelling/trip_model/model_storage.py b/emission/analysis/modelling/trip_model/model_storage.py new file mode 100644 index 000000000..8e89f2419 --- /dev/null +++ b/emission/analysis/modelling/trip_model/model_storage.py @@ -0,0 +1,143 @@ +from enum import Enum +from typing import Dict, Optional +import logging +import json + +import emission.analysis.modelling.trip_model.model_type as eamum +import emission.core.wrapper.tripmodel as ecwu +import emission.storage.decorations.analysis_timeseries_queries as esda +import emission.storage.timeseries.abstract_timeseries as esta +import emission.storage.timeseries.builtin_timeseries as estb +import pymongo + + +class ModelStorage(Enum): + """ + enumeration of model storage destinations. currently restricted to + DOCUMENT_DATABASE only. + """ + DOCUMENT_DATABASE = 0 + @classmethod + def names(cls): + return list(map(lambda e: e.name, list(cls))) + + @classmethod + def from_str(cls, str): + """ + attempts to match the provided string to a known ModelStorage type. + not case sensitive. + + :param str: a string name of a ModelType + """ + try: + str_caps = str.upper() + return cls[str_caps] + except KeyError: + names = "{" + ",".join(cls.names) + "}" + msg = f"{str} is not a known ModelStorage, must be one of {names}" + raise KeyError(msg) + +def load_model(user_id, model_type: eamum.ModelType, model_storage: ModelStorage) -> Optional[Dict]: + """load a user label model from a model storage location + + :param user_id: the user to request a model for + :param model_type: expected type of model stored + :param model_storage: storage format + :return: the model representation as a Python Dict or None + :raises: TypeError if loaded model has different type than expected type + KeyError if the ModelType is not known + """ + if model_storage == ModelStorage.DOCUMENT_DATABASE: + + # retrieve stored model with timestamp that matches/exceeds the most + # recent PipelineState.TRIP_MODEL entry + ts = esda.get_timeseries_for_user(user_id) + if not isinstance(ts, estb.BuiltinTimeSeries): + raise Exception('user model storage requires BuiltInTimeSeries') + latest_model_entry = ts.get_first_entry( + key=esda.TRIP_MODEL_STORE_KEY, + field='metadata.write_ts', + sort_order=pymongo.DESCENDING + ) + + if latest_model_entry is None: + logging.debug(f'no {model_type.name} model found for user {user_id}') + return None + + write_ts = latest_model_entry['metadata']['write_ts'] + logging.debug(f'retrieved latest trip model recorded at timestamp {write_ts}') + logging.debug(latest_model_entry) + + # parse str to enum for ModelType + latest_model_type_str = latest_model_entry.get('data', {}).get('model_type') + if latest_model_type_str is None: + raise TypeError('stored model does not have a model type') + latest_model_type = eamum.ModelType.from_str(latest_model_type_str) + + # validate and return + if latest_model_entry is None: + return None + elif latest_model_type != model_type: + msg = ( + f"loading model for user {user_id} has model type '{latest_model_type.name}' " + f"but was expected to have model type {model_type.name}" + ) + raise TypeError(msg) + else: + return latest_model_entry['data']['model'] + + else: + storage_types_str = ",".join(ModelStorage.names()) + msg = ( + f"unknown model storage type {model_storage}, must be one of " + f"{{{storage_types_str}}}" + ) + raise TypeError(msg) + +def save_model( + user_id, + model_type: eamum.ModelType, + model_data: Dict, + model_timestamp: int, + model_storage: ModelStorage = ModelStorage.DOCUMENT_DATABASE): + """saves a model to storage + + :param user_id: user associated with this model + :param model_type: type of model stored + :param model_data: data for this model to store, should be a dict + :param model_timestamp: time that model is current to + :param model_storage: type of storage to load from, defaults to ModelStorage.DATABASE + :raises TypeError: unknown ModelType + :raises IOError: failure when writing to storage medium + """ + if len(model_data) == 0: + # this wouldn't be good, esp for incremental models, because it can + # wipe out all of a model's history. save_model should be avoided at the + # call site when the model is empty. + msg = f'trip model for user {user_id} is empty but save_model called' + raise Exception(msg) + + if model_storage == ModelStorage.DOCUMENT_DATABASE: + + row = ecwu.Tripmodel() + row.model_ts = model_timestamp + row.model_type = model_type + row.model = model_data + + try: + ts = esta.TimeSeries.get_time_series(user_id) + ts.insert_data(user_id, esda.TRIP_MODEL_STORE_KEY, row) + except Exception as e: + msg = ( + f"failure storing model for user {user_id}, model {model_type.name} " + f"to the database" + ) + raise IOError(msg) from e + + else: + storage_types_str = ",".join(ModelStorage.names()) + msg = ( + f"unknown model storage type {model_storage}, must be one of " + f"{{{storage_types_str}}}" + ) + raise TypeError(msg) diff --git a/emission/analysis/modelling/trip_model/model_type.py b/emission/analysis/modelling/trip_model/model_type.py new file mode 100644 index 000000000..b5e761fb0 --- /dev/null +++ b/emission/analysis/modelling/trip_model/model_type.py @@ -0,0 +1,75 @@ +from __future__ import annotations +from enum import Enum +import emission.analysis.modelling.trip_model.trip_model as eamuu +import emission.analysis.modelling.similarity.od_similarity as eamso +import emission.analysis.modelling.trip_model.greedy_similarity_binning as eamug + + +SIMILARITY_THRESHOLD_METERS=500 + + +class ModelType(Enum): + # ENUM_NAME_CAPS = 'SHORTHAND_NAME_CAPS' + GREEDY_SIMILARITY_BINNING = 'GREEDY' + + def build(self, config=None) -> eamuu.TripModel: + """ + instantiates the requested user model type with the configured + parameters. + + hey YOU! if future model types are created, they should be added here! + + :param model_type: internally-used model name (an enum) + :return: a user label prediction model + :raises KeyError: if the requested model name does not exist + """ + # Dict[ModelType, TripModel] + MODELS = { + ModelType.GREEDY_SIMILARITY_BINNING: eamug.GreedySimilarityBinning(config) + } + model = MODELS.get(self) + if model is None: + model_names = list(lambda e: e.name, MODELS.keys()) + models = ",".join(model_names) + raise KeyError(f"ModelType {self.name} not found in factory, please add to build method") + + return model + + @classmethod + def names(cls): + return list(map(lambda e: e.name, list(cls))) + + @property + def model_name(self): + """ + used in filenames, database tables, etc. should be + a POSIX-compliant name. + + when adding new model types, this should be set on the + right-hand side of the enum, above. + + :return: a simple name for this model type + :rtype: str + """ + return self.value + + @classmethod + def from_str(cls, str): + """attempts to match the provided string to a known ModelType + since a short name is 'nicer', we attempt to match on the enum + value first (for example, 'greedy'). as a fallback, we attempt + to match on the full ModelType name (for example, + 'GREEDY_SIMILARITY_BINNING'). not case sensitive. + + :param str: a string name of a ModelType + """ + try: + str_caps = str.upper() + return cls(str_caps) + except ValueError: + try: + return cls[str_caps] + except KeyError: + names_list = '{' + ','.join(cls.names) + '}' + msg = f'{str} is not a known ModelType, should be one of {names_list}' + raise KeyError(msg) \ No newline at end of file diff --git a/emission/analysis/modelling/trip_model/run_model.py b/emission/analysis/modelling/trip_model/run_model.py new file mode 100644 index 000000000..e3e2b1c4e --- /dev/null +++ b/emission/analysis/modelling/trip_model/run_model.py @@ -0,0 +1,170 @@ +import logging +from typing import List, Optional +from uuid import UUID + +import time +import emission.storage.timeseries.timequery as estt +import emission.analysis.modelling.trip_model.model_storage as eamums +import emission.analysis.modelling.trip_model.model_type as eamumt +import emission.analysis.modelling.trip_model.trip_model as eamuu +import emission.core.wrapper.confirmedtrip as ecwc +import emission.storage.decorations.analysis_timeseries_queries as esda +import emission.storage.pipeline_queries as epq +import emission.storage.timeseries.abstract_timeseries as esta +import emission.storage.timeseries.timequery as estt + + +def update_trip_model( + user_id, + model_type: eamumt.ModelType, + model_storage: eamums.ModelStorage = eamums.ModelStorage.DOCUMENT_DATABASE, + min_trips: int = 14, + model_config = None + ): + """ + create/update a user label model for a user. + + updating the user label model occurs as a background task for the server. + trips for the user are collected and the data is fit to the requested model type. + if the model type is "incremental", only the newest trips are used. + + :param user_id: id of user + :param model_type: type of model to build. this is also stored on the database. if + there is a mismatch, an exception is thrown + :param model_storage: storage destination for built model (default DATABASE) + :param min_trips: minimum number of labeled trips per user to apply prediction (default 14) + :param model_config: optional configuration for model, for debugging purposes + """ + try: + # this timestamp is used for recording the state of the updated model + timestamp = time.time() + model = model_type.build(model_config) + + # if a previous model exists, deserialize the stored model + model_data_prev = eamums.load_model(user_id, model_type, model_storage) + stored_model_exists = model_data_prev is not None + if stored_model_exists: + model.from_dict(model_data_prev) + logging.debug(f"loaded {model_type.name} user label model for user {user_id}") + else: + logging.debug(f"building first {model_type.name} user label model for user {user_id}") + + # must call this regardless of whether model is incremental or not as it has + # the additional effect of marking the start state of the pipeline execution + time_query_from_pipeline = epq.get_time_query_for_trip_model(user_id) + time_query = time_query_from_pipeline if model.is_incremental else None + logging.debug(f'model type {model_type.name} is incremental? {model.is_incremental}') + logging.debug(f'time query for training data collection: {time_query}') + + trips = _get_training_data(user_id, time_query) + + # don't start training for a user that doesn't have at least $trips many trips + # (assume if a stored model exists for the user, that they met this requirement previously) + if len(trips) == 0: + msg = f"no new confirmed trips found in database to train model for user {user_id}" + logging.debug(msg) + epq.mark_trip_model_failed(user_id) + elif not stored_model_exists and not len(trips) >= min_trips: + msg = ( + f"Total: {len(trips)}, labeled: {len(trips)}, user " + f"{user_id} doesn't have enough valid trips for further analysis." + ) + logging.debug(msg) + epq.mark_trip_model_failed(user_id) + else: + + # train and store the model + model.fit(trips) + model_data_next = model.to_dict() + + if len(model_data_next) == 0: + epq.mark_trip_model_failed(user_id) + msg = f"trip model for user {user_id} is empty" + raise Exception(msg) + + last_done_ts = _latest_timestamp(trips) + eamums.save_model(user_id, model_type, model_data_next, last_done_ts, model_storage) + logging.debug(f"{model_type.name} label prediction model built for user {user_id} with timestamp {last_done_ts}") + + epq.mark_trip_model_done(user_id, last_done_ts) + + except Exception as e: + epq.mark_trip_model_failed(user_id) + msg = ( + f"failure updating user label pipeline state for user {user_id}" + ) + raise IOError(msg) from e + + +def predict_labels_with_n( + trip: ecwc.Confirmedtrip, + model_type = eamumt.ModelType.GREEDY_SIMILARITY_BINNING, + model_storage = eamums.ModelStorage.DOCUMENT_DATABASE, + model_config = None): + """ + invoke the user label prediction model to predict labels for a trip. + + :param trip: the trip to predict labels for + :param model_type: type of prediction model to run + :param model_storage: location to read/write models + :param model_config: optional configuration for model, for debugging purposes + :return: a list of predictions + """ + user_id = trip['user_id'] + model = _load_stored_trip_model(user_id, model_type, model_storage, model_config) + if model is None: + return [], -1 + else: + predictions, n = model.predict(trip) + return predictions, n + + +def _get_training_data(user_id: UUID, time_query: Optional[estt.TimeQuery]): + """ + load the labeled trip data for this user, subject to a time query. if the user + does not have at least $min_trips trips with labels, then return an empty list. + + :param user_id: user to collect trips from + :param time_query: query to restrict the time (optional) + """ + + ts = esta.TimeSeries.get_time_series(user_id) + trips = list(ts.find_entries([esda.CONFIRMED_TRIP_KEY], time_query=time_query)) + print(f'found {len(trips)} training rows') + labeled_trips = [trip for trip in trips if trip['data']['user_input'] != {}] + + logging.debug(f'found {len(labeled_trips)} labeled trips for user {user_id}') + return labeled_trips + + +def _load_stored_trip_model( + user_id, + model_type: eamumt.ModelType, + model_storage: eamums.ModelStorage, + model_config = None) -> Optional[eamuu.TripModel]: + """helper to build a user label prediction model class with the + contents of a stored model for some user. + + :param user_id: user to retrieve the model for + :param model_type: TripModel type configured for this OpenPATH server + :param model_storage: storage type + :param model_config: optional configuration for model, for debugging purposes + :return: model, or None if no model is stored for this user + """ + model_dict = eamums.load_model(user_id, model_type, model_storage) + if model_dict is None: + return None + else: + model = model_type.build(model_config) + model.from_dict(model_dict) + return model + + +def _latest_timestamp(trips: List[ecwc.Confirmedtrip]) -> float: + """extract the latest timestamp observed from a list of trips + + :param trips: the trips to review + :return: the latest timestamp + """ + ts = sorted(trips, key=lambda r: r['data']['end_ts'], reverse=True)[0]['data']['end_ts'] + return ts \ No newline at end of file diff --git a/emission/analysis/modelling/trip_model/trip_model.py b/emission/analysis/modelling/trip_model/trip_model.py new file mode 100644 index 000000000..b0ad2ab3a --- /dev/null +++ b/emission/analysis/modelling/trip_model/trip_model.py @@ -0,0 +1,80 @@ +from abc import ABCMeta, abstractmethod +from typing import Dict, List, Tuple + +import emission.core.wrapper.confirmedtrip as ecwc + + +class TripModel(metaclass=ABCMeta): + + @abstractmethod + def fit(data: List[List[float]]): + """ + train the model on data in an unsupervised learning setting. + + :param data: 2D array of features to train from + :type data: List[List[float]] + """ + pass + + @abstractmethod + def predict(self, data: List[float]) -> Tuple[List[Dict], int]: + """use this model to predict labels for some data + + :param data: a single row of features in the model's feature space + :type data: List[float] + :return: the predictions and the total count of observations + :rtype: Tuple[List[Prediction], int] + """ + pass + + @abstractmethod + def to_dict(self) -> Dict: + """ + export the model as a python Dict, to be stored via the file + system or a document database. + + should be serializable. supported types at this time + (2022-05-19) include all built-in Python types and Numpy types. + + :return: the model as a Dict + :rtype: Dict + """ + pass + + @abstractmethod + def from_dict(self, model: Dict): + """ + import the model from a python Dict that was stored in the file + system or a database. forms a codec which should be idempotent + when composed with to_dict. + + :param model: the model as a python Dict + :type model: Dict + """ + pass + + @property + @abstractmethod + def is_incremental(self) -> bool: + """ + whether this model requires the complete user history to build (False), + or, if only the incremental data since last execution is required (True). + + :return: if the model is incremental. the current timestamp will be recorded + in the analysis pipeline. the next call to this model will only include + trip data for trips later than the recorded timestamp. + :rtype: bool + """ + pass + + @abstractmethod + def extract_features(self, trip: ecwc.Confirmedtrip) -> List[float]: + """ + extract the relevant features for learning from a trip for this model instance + + :param trip: the trip to extract features from + :type trip: Confirmedtrip + :return: a vector containing features to predict from + :rtype: List[float] + """ + pass diff --git a/emission/analysis/modelling/trip_model/util.py b/emission/analysis/modelling/trip_model/util.py new file mode 100644 index 000000000..7d22b5d22 --- /dev/null +++ b/emission/analysis/modelling/trip_model/util.py @@ -0,0 +1,41 @@ +from typing import List, Tuple +from past.utils import old_div +import numpy +from numpy.linalg import norm + + +def find_knee_point(values: List[float]) -> Tuple[float, int]: + """for a list of values, find the value which represents the cut-off point + or "elbow" in the function when values are sorted. + + copied from original similarity algorithm. permalink: + [https://github.com/e-mission/e-mission-server/blob/5b9e608154de15e32df4f70a07a5b95477e7dbf5/emission/analysis/modelling/tour_model/similarity.py#L256] + + with `y` passed in as `values` + based on this stack overflow answer: https://stackoverflow.com/a/2022348/4803266 + And summarized by the statement: "A quick way of finding the elbow is to draw a + line from the first to the last point of the curve and then find the data point + that is farthest away from that line." + + :param values: list of values from which to select a cut-off + :type values: List[float] + :return: the index and value to use as a cutoff + :rtype: Tuple[int, float] + """ + N = len(values) + x = list(range(N)) + max = 0 + index = -1 + a = numpy.array([x[0], values[0]]) + b = numpy.array([x[-1], values[-1]]) + n = norm(b - a) + new_y = [] + for i in range(0, N): + p = numpy.array([x[i], values[i]]) + dist = old_div(norm(numpy.cross(p - a, p - b)), n) + new_y.append(dist) + if dist > max: + max = dist + index = i + value = values[index] + return [index, value] diff --git a/emission/core/common.py b/emission/core/common.py index a5d7c777b..4d97ce681 100644 --- a/emission/core/common.py +++ b/emission/core/common.py @@ -40,8 +40,14 @@ def travel_date_time(time1,time2): return travel_time.seconds def calDistance(point1, point2, coordinates=False): + """haversine distance - earthRadius = 6371000 + :param point1: a coordinate in degrees WGS84 + :param point2: another coordinate in degrees WGS84 + :param coordinates: if false, expect a list of coordinates, defaults to False + :return: distance approximately in meters + """ + earthRadius = 6371000 # meters # SHANKARI: Why do we have two calDistance() functions? # Need to combine into one # points are now in geojson format (lng,lat) diff --git a/emission/core/wrapper/entry.py b/emission/core/wrapper/entry.py index a421279c5..635a03947 100644 --- a/emission/core/wrapper/entry.py +++ b/emission/core/wrapper/entry.py @@ -131,6 +131,8 @@ def _getData2Wrapper(): "inference/prediction": "modeprediction", # the predicted labels for a particular trip (one entry per algorithm) "inference/labels": "labelprediction", + # the serialized trip model for user label prediction + "inference/trip_model": "tripmodel", # equivalent of cleaned_section, but with the mode set to the # inferred mode instead of just walk/bike/motorized # used for consistency and to make the client work whether or not we were diff --git a/emission/core/wrapper/pipelinestate.py b/emission/core/wrapper/pipelinestate.py index dc5d882ba..1622675a9 100644 --- a/emission/core/wrapper/pipelinestate.py +++ b/emission/core/wrapper/pipelinestate.py @@ -19,6 +19,7 @@ class PipelineStages(enum.Enum): CLEAN_RESAMPLING = 11 MODE_INFERENCE = 4 LABEL_INFERENCE = 14 + TRIP_MODEL = 16 EXPECTATION_POPULATION = 15 CREATE_CONFIRMED_OBJECTS = 13 TOUR_MODEL = 5 diff --git a/emission/core/wrapper/tripmodel.py b/emission/core/wrapper/tripmodel.py new file mode 100644 index 000000000..d2c16198a --- /dev/null +++ b/emission/core/wrapper/tripmodel.py @@ -0,0 +1,20 @@ +# Based on modeprediction.py +from emission.analysis.modelling.trip_model.model_type import ModelType +import emission.core.wrapper.wrapperbase as ecwb + + +class Tripmodel(ecwb.WrapperBase): + props = { + "model_type": ecwb.WrapperBase.Access.WORM, # emission.analysis.modelling.trip_model.model_type.py + "model": ecwb.WrapperBase.Access.WORM, # the (serialized) state of the model for this trip + "model_ts": ecwb.WrapperBase.Access.WORM, # timestamp that model is "current" to wrt input data + } + + enums = { + "model_type": ModelType + } + geojson = {} + local_dates = {} + + def _populateDependencies(self): + pass diff --git a/emission/storage/decorations/analysis_timeseries_queries.py b/emission/storage/decorations/analysis_timeseries_queries.py index 839ebedfc..9f8ab6a70 100644 --- a/emission/storage/decorations/analysis_timeseries_queries.py +++ b/emission/storage/decorations/analysis_timeseries_queries.py @@ -37,6 +37,7 @@ METRICS_DAILY_USER_MEDIAN_SPEED = "metrics/daily_user_median_speed" METRICS_DAILY_MEAN_MEDIAN_SPEED = "metrics/daily_mean_median_speed" INFERRED_LABELS_KEY = "inference/labels" +TRIP_MODEL_STORE_KEY = "inference/trip_model" # General methods diff --git a/emission/storage/pipeline_queries.py b/emission/storage/pipeline_queries.py index ae77199bd..b154b5f47 100644 --- a/emission/storage/pipeline_queries.py +++ b/emission/storage/pipeline_queries.py @@ -121,6 +121,22 @@ def mark_mode_inference_complete(user_id): def mark_mode_inference_failed(user_id): mark_stage_failed(user_id, ps.PipelineStages.MODE_INFERENCE) +def get_time_query_for_trip_model(user_id): + tq = get_time_range_for_stage(user_id, ps.PipelineStages.TRIP_MODEL) + if tq.startTs is None: + # time_query=None, request all confirmed trips for user + return None + else: + # key off of Confirmedtrip end timestamp for the provided time range + tq.timeType = 'data.end_ts' + return tq + +def mark_trip_model_done(user_id, last_ts): + mark_stage_done(user_id, ps.PipelineStages.TRIP_MODEL, last_ts + END_FUZZ_AVOID_LTE) + +def mark_trip_model_failed(user_id): + mark_stage_failed(user_id, ps.PipelineStages.TRIP_MODEL) + def get_time_range_for_confirmed_object_creation(user_id): tq = get_time_range_for_stage(user_id, ps.PipelineStages.CREATE_CONFIRMED_OBJECTS) tq.timeType = "data.end_ts" diff --git a/emission/storage/timeseries/builtin_timeseries.py b/emission/storage/timeseries/builtin_timeseries.py index ac95aa0a1..9a6f756c3 100644 --- a/emission/storage/timeseries/builtin_timeseries.py +++ b/emission/storage/timeseries/builtin_timeseries.py @@ -86,6 +86,7 @@ def __init__(self, user_id): "metrics/daily_mean_median_speed": self.analysis_timeseries_db, "inference/prediction": self.analysis_timeseries_db, "inference/labels": self.analysis_timeseries_db, + "inference/trip_model": self.analysis_timeseries_db, "analysis/inferred_section": self.analysis_timeseries_db, "analysis/inferred_labels": self.analysis_timeseries_db, "analysis/inferred_trip": self.analysis_timeseries_db, @@ -298,6 +299,26 @@ def to_data_df(key, entry_it, map_fn = None): return deduped_df.reset_index(drop=True) + def get_first_entry(self, key, field, sort_order, time_query=None): + """gets the first entry with the provided key when sorted by some field + + :param key: the metadata key for the entries, used to identify the stream + :param field: the field in the stream whose max value we want. + :param sort_order: pymongo.ASCENDING or pymongon.DESCENDING + :param time_query: the time range in which to search the stream + :return: a database row, or None if no match is found + """ + find_query = self._get_query([key], time_query) + result_it = self.get_timeseries_db(key).find(find_query).sort(field, sort_order).limit(1) + result_list = list(result_it) + if len(result_list) == 0: + return None + else: + first_entry = result_list[0] + del first_entry['_id'] + return first_entry + + def get_first_value_for_field(self, key, field, sort_order, time_query=None): """ Currently used to get the max value of the location values so that we can send data @@ -310,13 +331,11 @@ def get_first_value_for_field(self, key, field, sort_order, time_query=None): It is assumed that the values for the field are sortable. :return: the max value for the field in the stream identified by key. -1 if there are no entries for the key. """ - result_it = self.get_timeseries_db(key).find(self._get_query([key], time_query), - {"_id": False, field: True}).sort(field, sort_order).limit(1) - result_list = list(result_it) - if len(result_list) == 0: + retVal = self.get_first_entry(key, field, sort_order, time_query) + if retVal is None: return -1 - retVal = result_list[0] + # extract the specified field from the entry that was found field_parts = field.split(".") for part in field_parts: retVal = retVal[part] diff --git a/emission/storage/timeseries/timequery.py b/emission/storage/timeseries/timequery.py index c3378417b..950e0adb0 100644 --- a/emission/storage/timeseries/timequery.py +++ b/emission/storage/timeseries/timequery.py @@ -22,3 +22,5 @@ def get_query(self): ret_query[time_key].update({"$gte": self.startTs}) return ret_query + def __repr__(self): + return f"TimeQuery {self.timeType} with range [{self.startTs}, {self.endTs})" diff --git a/emission/tests/modellingTests/TestBackwardsCompat.py b/emission/tests/modellingTests/TestBackwardsCompat.py new file mode 100644 index 000000000..b81b5f529 --- /dev/null +++ b/emission/tests/modellingTests/TestBackwardsCompat.py @@ -0,0 +1,207 @@ +import unittest +import emission.analysis.modelling.tour_model_first_only.load_predict as lp +import emission.analysis.modelling.tour_model.similarity as oursim +import emission.analysis.modelling.trip_model.greedy_similarity_binning as eamtg +import emission.tests.modellingTests.modellingTestAssets as etmm +import emission.analysis.modelling.similarity.od_similarity as eamso +import emission.analysis.modelling.tour_model_first_only.build_save_model as eamtb +import emission.analysis.modelling.tour_model_first_only.load_predict as eamtl +import json +import logging +import numpy as np +import pandas as pd +import emission.core.common as ecc +import emission.core.wrapper.entry as ecwe + +# +# Test to see if the new implementations are consistent with the old implementations +# + +class TestBackwardsCompat(unittest.TestCase): + def setUp(self) -> None: + logging.basicConfig(format='%(asctime)s:%(levelname)s:%(message)s', + level=logging.DEBUG) + + def testAnyVsAllWhilePredicting(self): + trip_coords = (8,12) + trips = [] + for i in range(trip_coords[0], trip_coords[1], 1): + trips.append(ecwe.Entry({"data": {"start_loc": {"coordinates": [i/10,i/10]}, + "end_loc": {"coordinates": [i/10+0.1, i/10+0.1]}, + "user_input": {"mode_confirm": "walk", "purpose_confirm": "exercise"}}, + "metadata": {"key": "analysis/confirmed_trip"}})) + distanceMatrix = np.zeros((len(trips), len(trips))) + for i, trip1 in enumerate(trips): + for j, trip2 in enumerate(trips): + distanceMatrix[i][j] = ecc.calDistance( + trip1.data.start_loc["coordinates"], + trip2.data.start_loc["coordinates"]) + logging.debug("For the test trips, distance matrix is") + logging.debug("%s" % pd.DataFrame(distanceMatrix)) + +# 0 1 2 3 4 +# 0 0.000000 15724.471142 31448.726739 47172.742840 62896.495491 +# 1 15724.471142 0.000000 15724.255604 31448.271720 47172.024395 +# 2 31448.726739 15724.255604 0.000000 15724.016124 31447.768817 +# 3 47172.742840 31448.271720 15724.016124 0.000000 15723.752703 +# 4 62896.495491 47172.024395 31447.768817 15723.752703 0.000000 +# . +# So let's pick a threshold of 16000. With the "any" approach, all of them will +# be in one bin, with the "all" approach, we will end up with multiple bins + old_builder = oursim.similarity(trips, 16000, + shouldFilter=False, cutoff=False) + old_builder.bin_data() + old_bins = old_builder.bins + logging.debug("old bins = %s" % old_bins) +# old bins = [[0, 1], [2, 3]] + + model_config = { + "metric": "od_similarity", + "similarity_threshold_meters": 16000, # meters, + "apply_cutoff": False, + "incremental_evaluation": False + } + new_builder = eamtg.GreedySimilarityBinning(model_config) + new_builder.fit(trips) + new_bins = new_builder.bins + logging.debug("new bins = %s" % new_bins) + self.assertEqual(len(old_bins), len(new_bins), + f"old bins = {old_bins} but new_bins = {new_bins}") + + @staticmethod + def old_predict_with_n(trip, bin_locations, user_labels, cluster_sizes, RADIUS): + logging.debug(f"At stage: first round prediction") + pred_bin = eamtl.find_bin(trip, bin_locations, RADIUS) + logging.debug(f"At stage: matched with bin {pred_bin}") + + if pred_bin == -1: + logging.info(f"No match found for {trip['data']['start_loc']} early return") + return [], 0 + + user_input_pred_list = user_labels[pred_bin] + this_cluster_size = cluster_sizes[pred_bin] + logging.debug(f"At stage: looked up user input {user_input_pred_list}") + return user_input_pred_list, this_cluster_size + + def testRandomTripsWithinTheSameThreshold(self): + label_data = { + "mode_confirm": ['walk', 'bike', 'transit'], + "purpose_confirm": ['work', 'home', 'school'], + "replaced_mode": ['drive'] + } + + n = 60 + trips = etmm.generate_mock_trips( + user_id="joe", + trips=n, + origin=(0, 0), + destination=(1, 1), + label_data=label_data, + threshold=0.001, # ~ 111 meters in degrees WGS84 + ) + + # These fields should ignored for the first round, but are extracted anyway + # So let's fill them in with dummy values + for t in trips: + t["data"]["distance"] = 1000 + t["data"]["duration"] = 10 + + train = trips[0:50] + test = trips[50:60] + + model_config = { + "metric": "od_similarity", + "similarity_threshold_meters": 500, # meters, + "apply_cutoff": False, + "incremental_evaluation": False + } + new_model = eamtg.GreedySimilarityBinning(model_config) + new_model.fit(train) + + old_builder = oursim.similarity(train, 500, + shouldFilter=False, cutoff=False) + old_builder.fit() + + self.assertEqual(len(old_builder.bins), len(new_model.bins), + f"old bins = {old_builder.bins} but new_bins = {new_model.bins}") + + self.assertEqual(len(old_builder.bins), 1, + f"all trips within threshold, so expected one bin, found {len(old_builder.bins)}") + + old_user_inputs = eamtb.create_user_input_map(train, old_builder.bins) + old_location_map = eamtb.create_location_map(train, old_builder.bins) + old_cluster_sizes = {k: len(old_location_map[k]) for k in old_location_map} + + for test_trip in test: + new_results, new_n = new_model.predict(test_trip) + old_results, old_n = TestBackwardsCompat.old_predict_with_n(test_trip, + old_location_map, old_user_inputs, old_cluster_sizes, 500) + + self.assertEqual(old_n, new_n, + f"for test trip {test_trip} old n = {old_n} and new_n = {new_n}") + + self.assertEqual(old_results, new_results, + f"for test trip {test_trip} old result = {old_results} and new result = {new_results}") + + def testRandomTripsOutsideTheSameThreshold(self): + label_data = { + "mode_confirm": ['walk', 'bike', 'transit'], + "purpose_confirm": ['work', 'home', 'school'], + "replaced_mode": ['drive'] + } + + n = 60 + trips = etmm.generate_mock_trips( + user_id="joe", + trips=n, + origin=(0, 0), + destination=(1, 1), + label_data=label_data, + threshold=0.1, # Much bigger than the 500m threshold, so we will get multiple bins + ) + + # These fields should ignored for the first round, but are extracted anyway + # So let's fill them in with dummy values + for t in trips: + t["data"]["distance"] = 1000 + t["data"]["duration"] = 10 + + train = trips[0:50] + test = trips[50:60] + + model_config = { + "metric": "od_similarity", + "similarity_threshold_meters": 500, # meters, + "apply_cutoff": False, + "incremental_evaluation": False + } + new_model = eamtg.GreedySimilarityBinning(model_config) + new_model.fit(train) + + old_builder = oursim.similarity(train, 500, + shouldFilter=False, cutoff=False) + old_builder.fit() + + logging.debug(f"old bins = {len(old_builder.bins)} but new_bins = {len(new_model.bins)}") + + self.assertEqual(len(old_builder.bins), len(new_model.bins), + f"old bins = {old_builder.bins} but new_bins = {new_model.bins}") + + old_user_inputs = eamtb.create_user_input_map(train, old_builder.bins) + old_location_map = eamtb.create_location_map(train, old_builder.bins) + old_cluster_sizes = {k: len(old_location_map[k]) for k in old_location_map} + + for test_trip in test: + new_results, new_n = new_model.predict(test_trip) + old_results, old_n = TestBackwardsCompat.old_predict_with_n(test_trip, + old_location_map, old_user_inputs, old_cluster_sizes, 500) + + self.assertEqual(old_n, new_n, + f"for test trip {test_trip} old n = {old_n} and new_n = {new_n}") + + self.assertEqual(old_results, new_results, + f"for test trip {test_trip} old result = {old_results} and new result = {new_results}") +if __name__ == '__main__': + unittest.main() + + diff --git a/emission/tests/modellingTests/TestGreedySimilarityBinning.py b/emission/tests/modellingTests/TestGreedySimilarityBinning.py new file mode 100644 index 000000000..32bed47aa --- /dev/null +++ b/emission/tests/modellingTests/TestGreedySimilarityBinning.py @@ -0,0 +1,129 @@ +import unittest +import emission.analysis.modelling.trip_model.greedy_similarity_binning as eamtg +import emission.tests.modellingTests.modellingTestAssets as etmm +import logging + + +class TestGreedySimilarityBinning(unittest.TestCase): + + def setUp(self) -> None: + logging.basicConfig(format='%(asctime)s:%(levelname)s:%(message)s', + level=logging.DEBUG) + + def testBinning(self): + """ + when $should_be_grouped trips are the same, they should appear in a bin + """ + label_data = { + "mode_confirm": ['walk', 'bike', 'transit'], + "purpose_confirm": ['work', 'home', 'school'], + "replaced_mode": ['drive'] + } + + # generate $n trips. $m of them should have origin and destinations sampled + # within a radius that should have them binned. + n = 20 + m = 5 + trips = etmm.generate_mock_trips( + user_id="joe", + trips=n, + origin=(0, 0), + destination=(1, 1), + label_data=label_data, + within_threshold=m, + threshold=0.001, # ~ 111 meters in degrees WGS84 + ) + + # pass in a test configuration to the binning algorithm + model_config = { + "metric": "od_similarity", + "similarity_threshold_meters": 500, # meters, + "apply_cutoff": False, + "incremental_evaluation": False + } + model = eamtg.GreedySimilarityBinning(model_config) + + model.fit(trips) + + # $m trip features should appear together in one bin + at_least_one_large_bin = any(map(lambda b: len(b['feature_rows']) == m, model.bins.values())) + self.assertTrue(at_least_one_large_bin, "at least one bin should have at least 5 features in it") + + def testPrediction(self): + """ + training and testing with similar trips should lead to a positive bin match + """ + label_data = { + "mode_confirm": ['skipping'], + "purpose_confirm": ['pizza_party'], + "replaced_mode": ['crabwalking'] + } + + n = 6 + trips = etmm.generate_mock_trips( + user_id="joe", + trips=n, + origin=(0, 0), + destination=(1, 1), + label_data=label_data, + threshold=0.001, # ~ 111 meters in degrees WGS84 + ) + + model_config = { + "metric": "od_similarity", + "similarity_threshold_meters": 500, # meters, + "apply_cutoff": False, + "incremental_evaluation": False + } + model = eamtg.GreedySimilarityBinning(model_config) + + train = trips[0:5] + test = trips[5] + + model.fit(train) + results, n = model.predict(test) + + self.assertEqual(len(results), 1, "should have found a matching bin") + self.assertEqual(n, len(train), "that bin should have had the whole train set in it") + + def testNoPrediction(self): + """ + when trained on trips in Colorado, shouldn't have a prediction for a trip in Alaska + """ + label_data = { + "mode_confirm": ['skipping'], + "purpose_confirm": ['pizza_party'], + "replaced_mode": ['crabwalking'] + } + + n = 5 + train = etmm.generate_mock_trips( + user_id="joe", + trips=n, + origin=(39.7645187, -104.9951944), # Denver, CO + destination=(39.7435206, -105.2369292), # Golden, CO + label_data=label_data, + threshold=0.001, # ~ 111 meters in degrees WGS84 + ) + test = etmm.generate_mock_trips( + user_id="joe", + trips=1, + origin=(61.1042262, -150.5611644), # Anchorage, AK + destination=(62.2721466, -150.3233046), # Talkeetna, AK + label_data=label_data, + threshold=0.001, # ~ 111 meters in degrees WGS84 + ) + + model_config = { + "metric": "od_similarity", + "similarity_threshold_meters": 500, # meters, + "apply_cutoff": False, + "incremental_evaluation": False + } + model = eamtg.GreedySimilarityBinning(model_config) + + model.fit(train) + results, n = model.predict(test[0]) + + self.assertEqual(len(results), 0, "should not have found a matching bin") + self.assertEqual(n, 0, "the number of features in an empty bin is zero") diff --git a/emission/tests/modellingTests/TestRunGreedyIncrementalModel.py b/emission/tests/modellingTests/TestRunGreedyIncrementalModel.py new file mode 100644 index 000000000..9f8b78254 --- /dev/null +++ b/emission/tests/modellingTests/TestRunGreedyIncrementalModel.py @@ -0,0 +1,198 @@ +import unittest +import logging +import json +import numpy as np +import uuid +import time +import pandas as pd +import bson.json_util as bju + +import emission.analysis.modelling.trip_model.model_storage as eamums +import emission.analysis.modelling.trip_model.model_type as eamumt +import emission.analysis.modelling.similarity.od_similarity as eamso +import emission.analysis.modelling.trip_model.run_model as eamur +import emission.storage.timeseries.abstract_timeseries as esta +import emission.storage.decorations.analysis_timeseries_queries as esdatq +import emission.tests.modellingTests.modellingTestAssets as etmm +import emission.core.wrapper.entry as ecwe +import emission.core.get_database as edb + + +class TestRunGreedyModel(unittest.TestCase): + + def setUp(self): + """ + sets up the end-to-end run model test with Confirmedtrip data from a + test set of Confirmedtrip entries + """ + logging.basicConfig( + format='%(asctime)s:%(levelname)s:%(message)s', + level=logging.DEBUG) + + # emission/tests/data/real_examples/shankari_2016-06-20.expected_confirmed_trips + self.user_id = uuid.UUID('aa9fdec9-2944-446c-8ee2-50d79b3044d3') + self.ts = esta.TimeSeries.get_time_series(self.user_id) + self.new_trips_per_invocation = 3 + self.model_type = eamumt.ModelType.GREEDY_SIMILARITY_BINNING + self.model_storage = eamums.ModelStorage.DOCUMENT_DATABASE + sim_threshold = 500 # meters + self.greedy_model_config = { + "metric": "od_similarity", + "similarity_threshold_meters": sim_threshold, + "apply_cutoff": False, + "incremental_evaluation": True + } + + existing_entries_for_user = list(self.ts.find_entries([esdatq.CONFIRMED_TRIP_KEY])) + if len(existing_entries_for_user) != 0: + raise Exception(f"test invariant failed, there should be no entries for user {self.user_id}") + + # load in trips from a test file source + input_file = 'emission/tests/data/real_examples/shankari_2016-06-20.expected_confirmed_trips' + with open(input_file, 'r') as f: + trips_json = json.loads(f.read(), object_hook=bju.object_hook) + trips = [ecwe.Entry(r) for r in trips_json] + logging.debug(f'loaded {len(trips)} trips from {input_file}') + self.ts.bulk_insert(trips) + + # confirm write to database succeeded + self.initial_data = list(self.ts.find_entries([esdatq.CONFIRMED_TRIP_KEY])) + if len(self.initial_data) == 0: + logging.debug(f'test setup failed while loading trips from file') + self.fail() + + logging.debug('writing initial trip model') + # there are 4 labelled trips in the file. 2 of these trips are "similar" + # within 500 meters, the other two are not. + eamur.update_trip_model( + user_id=self.user_id, + model_type=self.model_type, + model_storage=self.model_storage, + min_trips=4, + model_config=self.greedy_model_config + ) + + logging.debug(f'setup: found {len(self.initial_data)} trips in database') + + # determine which trips are similar, and find the + # centroid of their origins and destinations to build + # new similar trips from + metric = eamso.OriginDestinationSimilarity() + features = [] + for trip in self.initial_data: + f = metric.extract_features(trip) + features.append(f) + + # 2022-07-07 rjf: the Confirmedtrip dataset used here has 6 trips (initially) + # but only 2 are "similar" within 500 meters. here we dynamically dis- + # include trip 6. set up like this in case we have to switch datasets + # in the future (as long as the outliers are not similar!) + # 2022-07-11 rjf: ooh, let's remove the ones without labels too + similarity_matrix = [[metric.similar(t1, t2, sim_threshold) + for t1 in features] + for t2 in features] + + # let's see what's going on here + trips_df = pd.DataFrame(similarity_matrix) + trips_df['labels?'] = [len(t['data']['user_input']) > 0 for t in self.initial_data] + logging.debug("test data similarity matrix") + logging.debug("\n%s" % trips_df) + + # 0 1 2 3 4 5 labels? + # 0 True True True True False False True + # 1 True True True True False False False + # 2 True True True True False False False + # 3 True True True True False False True + # 4 False False False False True False True + # 5 False False False False False True True + + # trip 0 and 3 are similar and will form bin 0 + # trip 1 and 2 have no labels and will be ignored + # trips 4 and 5 are both dis-similar from the rest and will form singleton bins + + self.similar_trips = [] + self.similar_features = [] + for idx, f in enumerate(self.initial_data): + has_labels = len(self.initial_data[idx]['data']['user_input']) > 0 + sim = [similarity_matrix[idx][i] for i in range(len(features)) if i != idx] + similar = any(sim) + if has_labels and similar: + self.similar_trips.append(self.initial_data[idx]) + self.similar_features.append(features[idx]) + + # after running, how many trips should be stored together in a similar bin? + self.initial_similar_trips = len(self.similar_trips) + self.expected_trips = self.initial_similar_trips + self.new_trips_per_invocation + logging.debug(f"end of test, expecting {self.expected_trips} trips") + + # find the centroid of the similar trip data + src_x, src_y, dst_x, dst_y = np.mean(self.similar_features, axis=0) + self.origin = [src_x, src_y] + self.destination = [dst_x, dst_y] + + + def tearDown(self): + """ + clean up database entries related to this test + """ + edb.get_analysis_timeseries_db().delete_many({'user_id': self.user_id}) + edb.get_pipeline_state_db().delete_many({'user_id': self.user_id}) + + def testIncrementalRun(self): + """ + incremental trip models train from Confirmedtrip entries at most + once. to test this behavior, a model is built based on a small + Confirmedtrip dataset stored at a file location (See setUp, above). + this happens once and is not repeated when the test is re-run, + unless a new database instance is spun up. within the test method, + an additional few mock trips are generated with a later timestamp. + the training model should 1) only see the new trips, 2) have been + trained on the expected number of trips at completion. + """ + # create a new trip sampling from the centroid of the trips that + # are in bin '0', which has two similar and labeled trips. + label_data = etmm.extract_trip_labels(self.similar_trips) + new_trips = etmm.generate_mock_trips( + user_id=self.user_id, + trips=self.new_trips_per_invocation, + origin=self.origin, + destination=self.destination, + label_data=label_data, + threshold=0.0001, # ~10m, + start_ts=time.time() - 20, + end_ts=time.time() - 10 + ) + + self.ts.bulk_insert(new_trips) + all_trips = list(self.ts.find_entries([esdatq.CONFIRMED_TRIP_KEY])) + logging.debug(f'total of {len(all_trips)} now stored in database') + + # train the new model on the complete collection of trips + eamur.update_trip_model( + user_id=self.user_id, + model_type=self.model_type, + model_storage=self.model_storage, + min_trips=self.initial_similar_trips, + model_config=self.greedy_model_config + ) + updated_model = eamur._load_stored_trip_model( + self.user_id, + model_type=self.model_type, + model_storage=self.model_storage, + model_config=self.greedy_model_config + ) + + # the 5th and 6th trip in the original dataset were outliers and should form their own cluster + self.assertEqual(len(updated_model.bins), 3, + 'there should be three bins, one with 2 similar trips, and two singleton bins') + + trips_in_bin = len(updated_model.bins['0']['feature_rows']) + print(f'trips in bins: {[len(x["feature_rows"]) for x in updated_model.bins.values()]}') + self.assertEqual(trips_in_bin, self.expected_trips, + 'expected number of trips stored in bin') + + self.assertEqual(len(updated_model.bins['1']['feature_rows']), 1, + 'the second bin should have exactly one entry (an outlier)') + self.assertEqual(len(updated_model.bins['2']['feature_rows']), 1, + 'the third bin should have exactly one entry (an outlier)') + \ No newline at end of file diff --git a/emission/tests/modellingTests/TestRunGreedyModel.py b/emission/tests/modellingTests/TestRunGreedyModel.py new file mode 100644 index 000000000..c5b20deb0 --- /dev/null +++ b/emission/tests/modellingTests/TestRunGreedyModel.py @@ -0,0 +1,172 @@ +import unittest +import logging + +import emission.analysis.modelling.trip_model.model_storage as eamums +import emission.analysis.modelling.trip_model.model_type as eamumt +import emission.analysis.modelling.trip_model.run_model as eamur +import emission.storage.timeseries.abstract_timeseries as esta +import emission.tests.modellingTests.modellingTestAssets as etmm +import emission.storage.decorations.analysis_timeseries_queries as esda +import emission.core.get_database as edb +import emission.storage.pipeline_queries as epq +import emission.core.wrapper.pipelinestate as ecwp + + +class TestRunGreedyModel(unittest.TestCase): + """these tests were copied forward during a refactor of the tour model + [https://github.com/e-mission/e-mission-server/blob/10772f892385d44e11e51e796b0780d8f6609a2c/emission/analysis/modelling/tour_model_first_only/load_predict.py#L114] + + it's uncertain what condition they are in besides having been refactored to + use the more recent tour modeling code. + """ + + def setUp(self): + """ + sets up the end-to-end run model test with Confirmedtrip data + """ + logging.basicConfig(format='%(asctime)s:%(levelname)s:%(message)s', + level=logging.DEBUG) + + # configuration for randomly-generated test data + self.user_id = user_id = 'TestRunGreedyModel-TestData' + self.origin = (-105.1705977, 39.7402654,) + self.destination = (-105.1755606, 39.7673075) + self.min_trips = 14 + self.total_trips = 100 + self.clustered_trips = 33 # bins must have at least self.min_trips similar trips by default + self.has_label_percent = 0.9 # let's make a few that don't have a label, but invariant + # $clustered_trips * $has_label_percent > self.min_trips + # must be correct or else this test could fail under some random test cases. + + # for a negative test, below + self.unused_user_id = 'asdjfkl;asdfjkl;asd08234ur13fi4jhf2103mkl' + + # test data can be saved between test invocations, check if data exists before generating + ts = esta.TimeSeries.get_time_series(user_id) + test_data = list(ts.find_entries(["analysis/confirmed_trip"])) + if len(test_data) == 0: + # generate test data for the database + logging.debug(f"inserting mock Confirmedtrips into database") + + # generate labels with a known sample weight that we can rely on in the test + label_data = { + "mode_confirm": ['ebike', 'bike'], + "purpose_confirm": ['happy-hour', 'dog-park'], + "replaced_mode": ['walk'], + "mode_weights": [0.9, 0.1], + "purpose_weights": [0.1, 0.9] + } + + train = etmm.generate_mock_trips( + user_id=user_id, + trips=self.total_trips, + origin=self.origin, + destination=self.destination, + label_data=label_data, + within_threshold=self.clustered_trips, + threshold=0.004, # ~400m + has_label_p=self.has_label_percent + ) + + ts.bulk_insert(train) + + # confirm data write did not fail + test_data = esda.get_entries(key="analysis/confirmed_trip", user_id=user_id, time_query=None) + if len(test_data) != self.total_trips: + logging.debug(f'test invariant failed after generating test data') + self.fail() + else: + logging.debug(f'found {self.total_trips} trips in database') + + def tearDown(self): + """ + clean up database + """ + edb.get_analysis_timeseries_db().delete_many({'user_id': self.user_id}) + edb.get_pipeline_state_db().delete_many({'user_id': self.user_id}) + + def testBuildGreedyModelFromConfig(self): + """ + greedy model takes config arguments via the constructor for testing + purposes but will load from a file in /conf/analysis/ which is tested here + """ + + eamumt.ModelType.GREEDY_SIMILARITY_BINNING.build() + # success if it didn't throw + + def testTrainGreedyModelWithZeroTrips(self): + """ + greedy model takes config arguments via the constructor for testing + purposes but will load from a file in /conf/analysis/ which is tested here + """ + + # pass along debug model configuration + greedy_model_config = { + "metric": "od_similarity", + "similarity_threshold_meters": 500, + "apply_cutoff": False, + "incremental_evaluation": False + } + + logging.debug(f'~~~~ do nothing ~~~~') + eamur.update_trip_model( + user_id=self.unused_user_id, + model_type=eamumt.ModelType.GREEDY_SIMILARITY_BINNING, + model_storage=eamums.ModelStorage.DOCUMENT_DATABASE, + min_trips=self.min_trips, + model_config=greedy_model_config + ) + + # user had no entries so their pipeline state should not have been set + # if it was set, the time query here would + stage = ecwp.PipelineStages.TRIP_MODEL + pipeline_state = epq.get_current_state(self.unused_user_id, stage) + self.assertIsNone( + pipeline_state['curr_run_ts'], + "pipeline should not have a current timestamp for the test user") + + + def test1RoundTripGreedySimilarityBinning(self): + """ + train a model, save it, load it, and use it for prediction, using + the high-level training/testing API provided via + run_model.py:update_trip_model() # train + run_model.py:predict_labels_with_n() # test + + for clustering, use the default greedy similarity binning model + """ + + # pass along debug model configuration + greedy_model_config = { + "metric": "od_similarity", + "similarity_threshold_meters": 500, + "apply_cutoff": False, + "incremental_evaluation": False + } + + logging.debug(f'(TRAIN) creating a model based on trips in database') + eamur.update_trip_model( + user_id=self.user_id, + model_type=eamumt.ModelType.GREEDY_SIMILARITY_BINNING, + model_storage=eamums.ModelStorage.DOCUMENT_DATABASE, + min_trips=self.min_trips, + model_config=greedy_model_config + ) + + logging.debug(f'(TEST) testing prediction of stored model') + test = etmm.build_mock_trip( + user_id=self.user_id, + origin=self.origin, + destination=self.destination + ) + prediction, n = eamur.predict_labels_with_n( + trip = test, + model_type=eamumt.ModelType.GREEDY_SIMILARITY_BINNING, + model_storage=eamums.ModelStorage.DOCUMENT_DATABASE, + model_config=greedy_model_config + ) + + [logging.debug(p) for p in sorted(prediction, key=lambda r: r['p'], reverse=True)] + + self.assertNotEqual(len(prediction), 0, "should have a prediction") + diff --git a/emission/tests/modellingTests/TestSimilarityMetric.py b/emission/tests/modellingTests/TestSimilarityMetric.py new file mode 100644 index 000000000..ae37fc39a --- /dev/null +++ b/emission/tests/modellingTests/TestSimilarityMetric.py @@ -0,0 +1,31 @@ +import unittest +import emission.tests.modellingTests.modellingTestAssets as etmm +import emission.analysis.modelling.similarity.od_similarity as eamso + +class TestSimilarityMetric(unittest.TestCase): + + def testODsAreSimilar(self): + generate_points_thresh = 0.001 # approx. 111 meters + similarity_threshold = 500 # + # random, but, points are sampled within a circle and should always be < sim threshold + trips = etmm.generate_mock_trips('bob', 2, [0, 0], [1, 1], threshold=generate_points_thresh) + metric = eamso.OriginDestinationSimilarity() + coords0 = metric.extract_features(trips[0]) + coords1 = metric.extract_features(trips[1]) + similar = metric.similar(coords0, coords1, similarity_threshold) + self.assertTrue(similar) + + def testODsAreNotSimilar(self): + generate_points_thresh = 0.001 # approx. 111 meters + similarity_threshold = 500 # + + trips0 = etmm.generate_mock_trips('bob', 1, [0, 0], [1, 1], threshold=generate_points_thresh) + trips1 = etmm.generate_mock_trips('alice', 1, [2, 2], [3, 3], threshold=generate_points_thresh) + metric = eamso.OriginDestinationSimilarity() + coords0 = metric.extract_features(trips0[0]) + coords1 = metric.extract_features(trips1[0]) + similar = metric.similar(coords0, coords1, similarity_threshold) + self.assertFalse(similar) + +if __name__ == '__main__': + unittest.main() diff --git a/emission/tests/modellingTests/__init__.py b/emission/tests/modellingTests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/emission/tests/modellingTests/modellingTestAssets.py b/emission/tests/modellingTests/modellingTestAssets.py new file mode 100644 index 000000000..879a3a2ca --- /dev/null +++ b/emission/tests/modellingTests/modellingTestAssets.py @@ -0,0 +1,204 @@ +import random +from typing import Optional, Tuple, List, Dict +from uuid import UUID +import emission.analysis.modelling.trip_model.trip_model as eamtm +import emission.core.wrapper.confirmedtrip as ecwc + +import emission.core.wrapper.entry as ecwe +import time +import math + + +def generate_trip_coordinates( + ref_coords: Tuple[float, float], + within_threshold: bool, + threshold: float, + max: float = 0.1 # approx. 10km in WGS84 + ) -> Tuple[float, float]: + """generates trip coordinate data to use when mocking a set of trip data. + + :param ref_coords: reference coordinates to use as the center of the sampling circle + :param within_threshold: how many of these trips are within some distance threshold + :param threshold: the distance threshold, in WGS84 + :param max: max distance, in WGS84, defaults to 0.1 (approx. 10km) + :return: generated coordinate pairs sampled in a + circle from some coordinates up to some threshold + """ + angle = 2 * math.pi * random.random() + radius_threshold = threshold / 2 + radius = random.uniform(0, radius_threshold) if within_threshold else random.uniform(radius_threshold, max) + x = radius * math.cos(angle) + ref_coords[0] + y = radius * math.sin(angle) + ref_coords[1] + return (x, y) + + +def extract_trip_labels(trips: List[ecwc.Confirmedtrip]) -> Dict: + """ + helper to build the `label_data` argument for the generate_mock_trips + function below. reads all entries from a list of Confirmedtrip entries. + + :param trips: the trips to read from + :return: label_data + """ + keys = ['mode_confirm', 'purpose_confirm', 'replaced_mode'] + result = {k: set() for k in keys} + for k in keys: + for t in trips: + entry = t['data']['user_input'].get(k) + if entry is not None: + result[k].add(entry) + for k in result.keys(): + result[k] = list(result[k]) + return result + + +def sample_trip_labels( + mode_labels, + purpose_labels, + replaced_mode_labels, + mode_weights=None, + purpose_weights=None, + replaced_mode_weights=None): + """samples trip labels + + :param mode_labels: labels for mode_confirm + :param purpose_labels: labels for purpose_confirm + :param replaced_mode_labels: labels for replaced_mode + :param mode_weights: sample weights, defaults to None, see random.choices "weights" + :param purpose_weights: sample weights, defaults to None for uniform sampling + :param replaced_mode_weights: sample weights, defaults to None + :return: sampled trip labels + """ + user_inputs = [ + ('mode_confirm', mode_labels, mode_weights), + ('replaced_mode', replaced_mode_labels, replaced_mode_weights), + ('purpose_confirm', purpose_labels, purpose_weights) + ] + + result = {} + for key, labels, weights in user_inputs: + if len(labels) > 0: + if weights is None: + weights = [1.0 / len(labels) for i in range(len(labels))] + samples = random.choices(population=labels,k=1,weights=weights) + result[key] = samples[0] + + return result + + +def build_mock_trip( + user_id: UUID, + origin, + destination, + labels: Optional[Dict] = {}, + start_ts: Optional[float] = None, + end_ts: Optional[float] = None) -> ecwc.Confirmedtrip: + """repackages mock data as a Confirmedtrip Entry type + + NOTE: these mock objects **do not** include all fields. see Trip and Confirmedtrip + classes for the complete list and expand if necessary. + + :param user_id: the user id UUID + :param origin: trip origin coordinates + :param destination: trip destination coordinates + :param labels: user labels for the trip, optional, default none + :param start_ts: optional timestamp for trip start, otherwise NOW + :param end_ts: optional timestamp for trip end, otherwise NOW + :return: a mock Confirmedtrip entry + """ + start_ts = start_ts if start_ts is not None else time.time() + end_ts = end_ts if end_ts is not None else time.time() + key = "analysis/confirmed_trip" + data = { + "start_ts": start_ts, + "start_loc": { + "type": "Point", + "coordinates": origin + }, + "end_ts": end_ts, + "end_loc": { + "type": "Point", + "coordinates": destination + }, + "user_input": labels + } + + return ecwe.Entry.create_fake_entry(user_id, key, data, write_ts=time.time()) + + +def generate_mock_trips( + user_id, + trips, + origin, + destination, + label_data = None, + within_threshold = None, + start_ts: None = None, + end_ts: None = None, + threshold = 0.01, + max = 0.1, + has_label_p = 1.0, + seed = 0): + """mocking function that generates multiple trips for a user. some are sampled + within a threshold from the provided o/d pair, and some have labels. some other + ones can be sampled to appear outside of the threshold of the o/d locations. + + label_data is an optional dictionary with labels and sample weights, for example: + { + "mode_confirm": ['walk', 'bike'], + "replaced_mode": ['drive', 'tnc'], + "purpose_confirm": ['home', 'work'], + "mode_weights": [0.8, 0.2], + "replaced_mode_weights": [0.4, 0.6], + "purpose_weights": [0.1, 0.9] + } + + weights entries are optional and result in uniform sampling. + + :param user_id: user UUID + :param trips: number of trips + :param origin: origin coordinates + :param destination: destination coordinates + :param label_data: dictionary of label data, see above, defaults to None + :param within_threshold: number of trips that should fall within the provided + distance threshold in degrees WGS84, defaults to None + :param threshold: distance threshold in WGS84 for sampling, defaults to 0.01 + :param max: maximum distance beyond the threshold for trips sampled that + are not within the threshold, defaults to 0.1 degrees WGS84 + :param has_label_p: probability a trip has labels, defaults to 1.0 + :param seed: random seed, defaults to 0 + :return: randomly sampled trips + """ + + random.seed(seed) + within = within_threshold if within_threshold is not None else trips + trips_within_threshold = [i < within for i in range(trips)] + result = [] + for within in trips_within_threshold: + o = generate_trip_coordinates(origin, within, threshold, max) + d = generate_trip_coordinates(destination, within, threshold, max) + labels = {} if label_data is None or random.random() > has_label_p \ + else sample_trip_labels( + mode_labels=label_data.get('mode_confirm'), + replaced_mode_labels=label_data.get('replaced_mode'), + purpose_labels=label_data.get('purpose_confirm'), + mode_weights=label_data.get('mode_weights'), + replaced_mode_weights=label_data.get('replaced_mode_weights'), + purpose_weights=label_data.get('purpose_weights') + ) + trip = build_mock_trip(user_id, o, d, labels, start_ts, end_ts) + result.append(trip) + + random.shuffle(result) + return result + + +if __name__ == '__main__': + label_data = { + "mode_confirm": ['walk', 'bike', 'drive'], + "purpose_confirm": ['work', 'home', 'school'], + "replaced_mode": ['walk', 'bike', 'drive'] + } + result = generate_mock_trips('joe-bob', 14, [0, 0], [1,1], label_data, 6) + for r in result: + print(r) \ No newline at end of file