Skip to content

Commit

Permalink
Compute HRV as exponentially weighted moving average
Browse files Browse the repository at this point in the history
  • Loading branch information
JanCBrammer committed Dec 15, 2024
1 parent 81ad261 commit a52613d
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 58 deletions.
12 changes: 5 additions & 7 deletions openhrv/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
from math import ceil


HRV_MEAN_WINDOW: Final[float] = 15.0 # seconds
IBI_MEDIAN_WINDOW: Final[int] = 11 # samples
EWMA_WEIGHT_CURRENT_SAMPLE: Final[float] = (
0.1 # set in range [0, 1], 1 being minimal smoothing, see https://en.wikipedia.org/wiki/Exponential_smoothing
)

MIN_BREATHING_RATE: Final[float] = 4.0 # breaths per minute
MAX_BREATHING_RATE: Final[float] = 7.0 # breaths per minute
Expand All @@ -16,16 +18,12 @@
MIN_PLOT_IBI: Final[int] = 300
MAX_PLOT_IBI: Final[int] = 1500


HRV_BUFFER_SIZE: Final[int] = 10 # samples
# IBI buffer must hold enough samples such that even if IBIs (on average) were
# MIN_IBI long, there'd be enough samples to display for IBI_HISTORY_DURATION seconds.
IBI_HISTORY_DURATION: Final[int] = 60 # seconds
IBI_BUFFER_SIZE: Final[int] = ceil(IBI_HISTORY_DURATION / (MIN_IBI / 1000)) # samples
MEANHRV_HISTORY_DURATION: Final[int] = 120 # seconds
MEANHRV_BUFFER_SIZE: Final[int] = ceil(
MEANHRV_HISTORY_DURATION / (MIN_IBI / 1000)
) # samples
HRV_HISTORY_DURATION: Final[int] = 120 # seconds
HRV_BUFFER_SIZE: Final[int] = ceil(HRV_HISTORY_DURATION / (MIN_IBI / 1000)) # samples

COMPATIBLE_SENSORS: Final[list[str]] = ["Polar", "Decathlon Dual HR"]

Expand Down
77 changes: 35 additions & 42 deletions openhrv/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from openhrv.utils import get_sensor_address, sign, NamedSignal
from openhrv.config import (
tick_to_breathing_rate,
MEANHRV_BUFFER_SIZE,
HRV_BUFFER_SIZE,
IBI_BUFFER_SIZE,
MAX_BREATHING_RATE,
Expand All @@ -16,13 +15,13 @@
IBI_MEDIAN_WINDOW,
MIN_HRV_TARGET,
MAX_HRV_TARGET,
HRV_MEAN_WINDOW,
EWMA_WEIGHT_CURRENT_SAMPLE,
)


class Model(QObject):
ibis_buffer_update = Signal(NamedSignal)
mean_hrv_update = Signal(NamedSignal)
hrv_update = Signal(NamedSignal)
addresses_update = Signal(NamedSignal)
pacer_rate_update = Signal(NamedSignal)
hrv_target_update = Signal(NamedSignal)
Expand All @@ -35,14 +34,16 @@ def __init__(self):
self.ibis_seconds: deque[float] = deque(
map(float, range(-IBI_BUFFER_SIZE, 1)), IBI_BUFFER_SIZE
)
self.mean_hrv_buffer: deque[float] = deque(
[-1] * MEANHRV_BUFFER_SIZE, MEANHRV_BUFFER_SIZE
self.hrv_buffer: deque[float] = deque([-1] * HRV_BUFFER_SIZE, HRV_BUFFER_SIZE)
self.hrv_seconds: deque[float] = deque(
map(float, range(-HRV_BUFFER_SIZE, 1)), HRV_BUFFER_SIZE
)
self.mean_hrv_seconds: deque[float] = deque(
map(float, range(-MEANHRV_BUFFER_SIZE, 1)), MEANHRV_BUFFER_SIZE
)
self._hrv_buffer: deque[int] = deque([-1] * HRV_BUFFER_SIZE, HRV_BUFFER_SIZE)

# Exponentially Weighted Moving Average:
# - https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
# - https://en.wikipedia.org/wiki/Exponential_smoothing
# - http://nestedsoftware.com/2018/04/04/exponential-moving-average-on-streaming-data-4hhl.24876.html
self.ewma_hrv: float = 1.0
self.sensors: list[QBluetoothDeviceInfo] = []
self.breathing_rate: float = float(MAX_BREATHING_RATE)
self.hrv_target: int = math.ceil((MIN_HRV_TARGET + MAX_HRV_TARGET) / 2)
Expand Down Expand Up @@ -80,9 +81,9 @@ def update_sensors(self, sensors: list[QBluetoothDeviceInfo]):
)

def validate_ibi(self, ibi: int) -> int:
validated_ibi = ibi
validated_ibi: int = ibi
if ibi < MIN_IBI or ibi > MAX_IBI:
median_ibi = math.ceil(
median_ibi: int = math.ceil(
statistics.median(
islice(
self.ibis_buffer,
Expand All @@ -97,11 +98,20 @@ def validate_ibi(self, ibi: int) -> int:
validated_ibi = MAX_IBI
else:
validated_ibi = median_ibi
print(f"Correcting invalid IBI: {ibi} to {validated_ibi}")
print(f"Correcting outlier IBI {ibi} to {validated_ibi}")

return validated_ibi

def validate_hrv(self, hrv: int) -> int:
validated_hrv: int = hrv
if hrv > MAX_HRV_TARGET:
validated_hrv = min(math.ceil(self.ewma_hrv), MAX_HRV_TARGET)
print(f"Correcting outlier HRV {hrv} to {validated_hrv}")

return validated_hrv

def compute_local_hrv(self):
"""https://doi.org/10.1038/s41598-019-44201-7 (Figure 2)"""
self._duration_current_phase += self.ibis_buffer[-1]
# 1: IBI rises, -1: IBI falls, 0: IBI constant
current_ibi_phase: int = sign(self.ibis_buffer[-1] - self.ibis_buffer[-2])
Expand All @@ -115,38 +125,21 @@ def compute_local_hrv(self):
self.update_hrv_buffer(local_hrv)

seconds_current_phase: float = self._duration_current_phase / 1000
self.update_mean_hrv_seconds(seconds_current_phase)
self.update_hrv_seconds(seconds_current_phase)
self._duration_current_phase = 0

self._last_ibi_extreme = current_ibi_extreme
self._last_ibi_phase = current_ibi_phase

def update_hrv_buffer(self, local_hrv):
if not self._hrv_buffer[0]:
return # wait until buffer is full
threshold = max(map(abs, self._hrv_buffer)) * 4
if local_hrv > threshold:
print(f"Correcting outlier HRV {local_hrv} to {threshold}")
local_hrv = threshold
self._hrv_buffer.append(local_hrv)
self.update_mean_hrv_buffer()

def update_mean_hrv_buffer(self):
seconds = self.ibis_seconds
hrv_buffer_seconds = islice(seconds, len(seconds) - HRV_BUFFER_SIZE, None)
start_idx_mean_window = next(
(
i
for i, second in enumerate(hrv_buffer_seconds)
if second >= -HRV_MEAN_WINDOW
),
-1,
def update_hrv_buffer(self, local_hrv: int):
self.ewma_hrv = (
EWMA_WEIGHT_CURRENT_SAMPLE * self.validate_hrv(local_hrv)
+ (1 - EWMA_WEIGHT_CURRENT_SAMPLE) * self.ewma_hrv
)
self.mean_hrv_buffer.append(
statistics.mean(islice(self._hrv_buffer, start_idx_mean_window, None))
)
self.mean_hrv_update.emit(
NamedSignal("MeanHrv", (self.mean_hrv_seconds, self.mean_hrv_buffer))

self.hrv_buffer.append(self.ewma_hrv)
self.hrv_update.emit(
NamedSignal("HeartRateVariability", (self.hrv_seconds, self.hrv_buffer))
)

def update_ibis_seconds(self, seconds: float):
Expand All @@ -155,8 +148,8 @@ def update_ibis_seconds(self, seconds: float):
)
self.ibis_seconds.append(0.0)

def update_mean_hrv_seconds(self, seconds: float):
self.mean_hrv_seconds = deque(
[i - seconds for i in self.mean_hrv_seconds], MEANHRV_BUFFER_SIZE
def update_hrv_seconds(self, seconds: float):
self.hrv_seconds = deque(
[i - seconds for i in self.hrv_seconds], HRV_BUFFER_SIZE
)
self.mean_hrv_seconds.append(0.0)
self.hrv_seconds.append(0.0)
14 changes: 7 additions & 7 deletions openhrv/view.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from openhrv.model import Model
from openhrv.config import (
breathing_rate_to_tick,
MEANHRV_HISTORY_DURATION,
HRV_HISTORY_DURATION,
IBI_HISTORY_DURATION,
MAX_BREATHING_RATE,
MIN_BREATHING_RATE,
Expand Down Expand Up @@ -162,7 +162,7 @@ def __init__(self, model: Model):

self.model = model
self.model.ibis_buffer_update.connect(self.plot_ibis)
self.model.mean_hrv_update.connect(self.plot_hrv)
self.model.hrv_update.connect(self.plot_hrv)
self.model.addresses_update.connect(self.list_addresses)
self.model.pacer_rate_update.connect(self.update_pacer_label)
self.model.hrv_target_update.connect(self.update_hrv_target)
Expand Down Expand Up @@ -194,7 +194,7 @@ def __init__(self, model: Model):
self.model.addresses_update.connect(self.logger.write_to_file)
self.model.pacer_rate_update.connect(self.logger.write_to_file)
self.model.hrv_target_update.connect(self.logger.write_to_file)
self.model.mean_hrv_update.connect(self.logger.write_to_file)
self.model.hrv_update.connect(self.logger.write_to_file)
self.signals.annotation.connect(self.logger.write_to_file)

self.ibis_widget = XYSeriesWidget(
Expand All @@ -211,13 +211,13 @@ def __init__(self, model: Model):
self.ibis_widget.y_axis.setRange(MIN_PLOT_IBI, MAX_PLOT_IBI)

self.hrv_widget = XYSeriesWidget(
self.model.mean_hrv_seconds, self.model.mean_hrv_buffer, WHITE
self.model.hrv_seconds, self.model.hrv_buffer, WHITE
)
self.hrv_widget.x_axis.setTitleText("Seconds")
# The time series displays only the samples within the last
# MEANHRV_HISTORY_DURATION seconds,
# even though there are more samples in self.model.mean_hrv_seconds.
self.hrv_widget.x_axis.setRange(-MEANHRV_HISTORY_DURATION, 0)
# HRV_HISTORY_DURATION seconds,
# even though there are more samples in self.model.hrv_seconds.
self.hrv_widget.x_axis.setRange(-HRV_HISTORY_DURATION, 0)
self.hrv_widget.y_axis.setTitleText("HRV (msec)")
self.hrv_widget.y_axis.setRange(0, self.model.hrv_target)
colorgrad = QLinearGradient(0, 0, 0, 1) # horizontal gradient
Expand Down
10 changes: 8 additions & 2 deletions test/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def __init__(self):
super().__init__()
# Polar sensor emits a (package of) IBI(s) about every second.
# Here we "emit" / simulate IBI(s) in quicker succession in order to push the rendering.
self.mean_ibi = 300
self.mean_ibi = 900
self.timer = QTimer()
self.timer.setInterval(self.mean_ibi)
self.timer.timeout.connect(self.simulate_ibi)
Expand All @@ -78,10 +78,16 @@ def simulate_ibi(self):
# in a sinusoidal pattern around `mean_ibi`,
# in a range of `range_ibi`.
breathing_rate = 6
range_ibi = 40 # HRV must settle at this value
range_ibi = 100 # without noise, HRV settles at this value
ibi = self.mean_ibi + (range_ibi / 2) * math.sin(
2 * math.pi * breathing_rate / 60 * time.time()
)
# add noise spikes
if randint(1, 30) == 1:
if randint(1, 2) == 1:
ibi += 500
else:
ibi -= 500
self.ibi_update.emit(ibi)


Expand Down

0 comments on commit a52613d

Please sign in to comment.