diff --git a/logic/biometrics.py b/logic/biometrics.py index c101696..edeb25e 100644 --- a/logic/biometrics.py +++ b/logic/biometrics.py @@ -1,8 +1,8 @@ from logic.base_logic import OptionalBaseLogic from brainflow.board_shim import BoardShim, BrainFlowPresets -from brainflow.data_filter import DataFilter, AggOperations, NoiseTypes, FilterTypes, DetrendOperations, WindowOperations -from scipy.signal import find_peaks +from brainflow.data_filter import DataFilter, WaveletTypes +from scipy.signal import find_peaks, butter, filtfilt import numpy as np import utils @@ -14,7 +14,7 @@ class Biometrics(OptionalBaseLogic): RESP_FREQ = "BreathsPerSecond" RESP_BPM = "BreathsPerMinute" - def __init__(self, board, supported=True, fft_size=1024, ema_decay=0.025): + def __init__(self, board, supported=True, window_seconds=10, ema_decay=0.025): super().__init__(board, supported) if supported: @@ -25,46 +25,46 @@ def __init__(self, board, supported=True, fft_size=1024, ema_decay=0.025): self.ppg_sampling_rate = BoardShim.get_sampling_rate( board_id, BrainFlowPresets.ANCILLARY_PRESET) - self.window_seconds = int(fft_size / self.ppg_sampling_rate) + 1 + self.window_seconds = window_seconds self.max_sample_size = self.ppg_sampling_rate * self.window_seconds - self.fft_size = fft_size + + # heart rate filter params + lowcut = 30 / 60 + highcut = 150 / 60 + order = 2 + b, a = butter(order, (lowcut, highcut), btype="bandpass", fs=self.ppg_sampling_rate) + self.hr_filter = lambda data: filtfilt(b, a, data) + self.min_distance = self.ppg_sampling_rate / highcut # ema smoothing variables self.current_values = None self.ema_decay = ema_decay - def estimate_heart_rate(self, hr_ir, hr_red, ppg_ambient): + def estimate_heart_rate(self, ppg_ir, ppg_red, ppg_ambient): # do not modify data - hr_ir, hr_red, hr_ambient = np.copy(hr_ir), np.copy(hr_red), np.copy(ppg_ambient) - - # Possible min and max heart rate in hz - lowcut = 0.5 - highcut = 4.25 - order = 4 + ppg_ir, ppg_red, ppg_ambient = np.copy(ppg_ir), np.copy(ppg_red), np.copy(ppg_ambient) # remove ambient light - hr_ir = np.clip(hr_ir - hr_ambient, 0, None) - hr_red = np.clip(hr_red - hr_ambient, 0, None) - - # detrend and filter down to possible heart rates - DataFilter.detrend(hr_red, DetrendOperations.LINEAR) - DataFilter.detrend(hr_ir, DetrendOperations.LINEAR) - DataFilter.perform_bandpass(hr_red, self.ppg_sampling_rate, lowcut, highcut, order, FilterTypes.BUTTERWORTH, 0) - DataFilter.perform_bandpass(hr_ir, self.ppg_sampling_rate, lowcut, highcut, order, FilterTypes.BUTTERWORTH, 0) - + ppg_ir -= ppg_ambient + ppg_red -= ppg_ambient + + # Denoise and Filter to possible heart rates + DataFilter.perform_wavelet_denoising(ppg_ir, WaveletTypes.DB4, 4) + DataFilter.perform_wavelet_denoising(ppg_red, WaveletTypes.DB4, 4) + ppg_ir = self.hr_filter(ppg_ir) + ppg_red = self.hr_filter(ppg_red) + # find peaks in signal - red_peaks, _ = find_peaks(hr_red, distance=self.ppg_sampling_rate/2) - ir_peaks, _ = find_peaks(hr_ir, distance=self.ppg_sampling_rate/2) + red_peaks, _ = find_peaks(ppg_red, distance=self.min_distance) + ir_peaks, _ = find_peaks(ppg_ir, distance=self.min_distance) - # get inter-peak intervals - red_ipis = np.diff(red_peaks) / self.ppg_sampling_rate - ir_ipis = np.diff(ir_peaks) / self.ppg_sampling_rate - ipis = np.concatenate((red_ipis, ir_ipis)) + # get inter-peak sample intervals + sample_ipis = np.concatenate((np.diff(red_peaks), np.diff(ir_peaks))) - # get bpm from mean inter-peak interval - average_ipi = np.mean(ipis) + # get bpm from mean inter-peak sample interval + average_ipi = np.mean(sample_ipis) / self.ppg_sampling_rate heart_bpm = 60 / average_ipi - + return heart_bpm def calculate_data_dict(self): diff --git a/main.py b/main.py index 7e0cd07..640b264 100644 --- a/main.py +++ b/main.py @@ -124,9 +124,7 @@ def BoardInit(args: argparse.Namespace) -> tuple[BoardShim, list[BaseLogic], int ### Logic Modules ### has_muse_ppg = master_board_id in (BoardIds.MUSE_2_BOARD, BoardIds.MUSE_S_BOARD) - - fft_size= 64 * 10 # TODO: Make this configurable - biometrics_logic = Biometrics(board, has_muse_ppg, fft_size=fft_size, ema_decay=ema_decay) + biometrics_logic = Biometrics(board, has_muse_ppg, ema_decay=ema_decay) logics = [ Info(board, window_seconds=window_seconds), @@ -146,9 +144,6 @@ def BoardInit(args: argparse.Namespace) -> tuple[BoardShim, list[BaseLogic], int if args.enable_action: logics.append(MLAction(board, ema_decay = ema_decay * args.action_ema_multiplier)) - ### Adding one second to startup time for adaptive filters ### - startup_time += 1 - BoardShim.log_message(LogLevels.LEVEL_INFO.value, 'Intializing (wait {}s)'.format(startup_time)) board.start_stream(streamer_params=args.streamer_params) time.sleep(startup_time)