Skip to content

Commit

Permalink
HistoryManager: Improved aggregation of multi-value attributes.
Browse files Browse the repository at this point in the history
  • Loading branch information
xsedla1o committed Sep 6, 2023
1 parent aec970d commit d734c5a
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 3 deletions.
2 changes: 1 addition & 1 deletion dp3/database/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ def insert_datapoints(
raise DatabaseError(f"Update of master record failed: {e}\n{dps}") from e

def update_master_records(self, etype: str, eids: list[str], records: list[dict]) -> None:
"""Replace master record of `etype`:`eid` with the provided `record`.
"""Replace master records of `etype`:`eid` with the provided `records`.
Raises DatabaseError when update fails.
"""
Expand Down
82 changes: 80 additions & 2 deletions dp3/history_management/history_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,12 @@

from pydantic import BaseModel, Extra, validator

from dp3.common.attrspec import AttrSpecType, AttrType, ObservationsHistoryParams
from dp3.common.attrspec import (
AttrSpecObservations,
AttrSpecType,
AttrType,
ObservationsHistoryParams,
)
from dp3.common.callback_registrar import CallbackRegistrar
from dp3.common.config import CronExpression, PlatformConfig
from dp3.common.utils import parse_time_duration
Expand Down Expand Up @@ -286,7 +291,80 @@ def aggregate_master_doc(attr_specs: dict[str, AttrSpecType], master_document: d
if spec.t != AttrType.OBSERVATIONS or not spec.history_params.aggregate:
continue

master_document[attr] = aggregate_dp_history_on_equal(history, spec.history_params)
if spec.multi_value:
master_document[attr] = aggregate_multivalue_dp_history_on_equal(history, spec)
else:
master_document[attr] = aggregate_dp_history_on_equal(history, spec.history_params)


def aggregate_multivalue_dp_history_on_equal(history: list[dict], spec: AttrSpecObservations):
"""
Merge multivalue datapoints in the history with equal values and overlapping time validity.
Avergages the confidence.
Will keep a pool of "active" datapoints and merge them with the next datapoint
if they have the same value and overlapping time validity.
"""
history = sorted(history, key=lambda x: x["t1"])
aggregated_history = []
pre = spec.history_params.pre_validity
post = spec.history_params.post_validity

if spec.data_type.hashable:
current_dps = {}

for dp in history:
v = dp["v"]
if v in current_dps:
current_dp = current_dps[v]
if current_dp["t2"] + post >= dp["t1"] - pre: # Merge with current_dp
current_dp["t2"] = max(dp["t2"], current_dp["t2"])
current_dp["c"] += dp["c"]
current_dp["cnt"] += 1
else: # No overlap, finalize current_dp and reset
current_dp["c"] /= current_dp["cnt"]
del current_dp["cnt"]
aggregated_history.append(current_dp)
current_dps[v] = dp
current_dps[v]["cnt"] = 1
else: # New value, finalize initialize current_dp
current_dps[v] = dp
current_dps[v]["cnt"] = 1

for _v, current_dp in current_dps.items(): # Finalize remaining dps
current_dp["c"] /= current_dp["cnt"]
del current_dp["cnt"]
aggregated_history.append(current_dp)
return aggregated_history
else:
current_dps = []

for dp in history:
v = dp["v"]
for i, current_dp in enumerate(current_dps):
if current_dp["v"] != v:
continue

if current_dp["t2"] + post >= dp["t1"] - pre: # Merge with current_dp
current_dp["t2"] = max(dp["t2"], current_dp["t2"])
current_dp["c"] += dp["c"]
current_dp["cnt"] += 1
else: # No overlap, finalize current_dp and reset
current_dp["c"] /= current_dp["cnt"]
del current_dp["cnt"]
aggregated_history.append(current_dp)
dp["cnt"] = 1
current_dps[i] = dp
break
else: # New value, finalize initialize current_dp
dp["cnt"] = 1
current_dps.append(dp)

for current_dp in current_dps: # Finalize remaining dps
current_dp["c"] /= current_dp["cnt"]
del current_dp["cnt"]
aggregated_history.append(current_dp)
return aggregated_history


def aggregate_dp_history_on_equal(history: list[dict], spec: ObservationsHistoryParams):
Expand Down

0 comments on commit d734c5a

Please sign in to comment.