From 608dec0366bb0e40b6b210877350880a343fae09 Mon Sep 17 00:00:00 2001 From: Eric Larson Date: Wed, 29 Jan 2025 16:02:18 -0500 Subject: [PATCH] ENH: Add BDF support --- README.md | 2 +- edfio/_lazy_loading.py | 4 ++-- edfio/edf.py | 21 +++++++++++++++++++-- edfio/edf_annotations.py | 4 +++- edfio/edf_signal.py | 26 +++++++++++++++++--------- 5 files changed, 42 insertions(+), 15 deletions(-) diff --git a/README.md b/README.md index a7ea792..bb3dd1a 100644 --- a/README.md +++ b/README.md @@ -31,6 +31,7 @@ It requires Python>=3.9 and NumPy>=1.22 and is available on PyPI: - Slice recordings (by seconds or annotation texts) - Drop individual signals - Anonymize recordings +- BDF file ([BioSemi](https://www.biosemi.com/faq/file_format.htm)) support ## Known limitations @@ -38,7 +39,6 @@ It requires Python>=3.9 and NumPy>=1.22 and is available on PyPI: - The maximum data record size of 61440 bytes recommended by the [EDF specs](https://www.edfplus.info/specs/edf.html) is not enforced. - To write an EDF with a non-integer seconds duration, the data record duration has to be manually set to an appropriate value. - Slicing an EDF to a timespan that is not an integer multiple of the data record duration does not work. -- BDF files ([BioSemi](https://www.biosemi.com/faq/file_format.htm)) are not supported. ## Contributing diff --git a/edfio/_lazy_loading.py b/edfio/_lazy_loading.py index cc0d943..434f02d 100644 --- a/edfio/_lazy_loading.py +++ b/edfio/_lazy_loading.py @@ -20,7 +20,7 @@ class LazyLoader: def __init__( self, - buffer: Union[NDArray[np.int16], np.memmap[Any, np.dtype[np.int16]]], + buffer: Union[NDArray[np.int16 | np.int32], np.memmap[Any, np.dtype[np.int16 | np.int32]]], start_sample: int, end_sample: int, ) -> None: @@ -30,7 +30,7 @@ def __init__( def load( self, start_record: Optional[int] = None, end_record: Optional[int] = None - ) -> NDArray[np.int16]: + ) -> NDArray[np.int16 | np.int32]: """ Load signal data from the buffer. 
diff --git a/edfio/edf.py b/edfio/edf.py index 7a5f521..a57535f 100644 --- a/edfio/edf.py +++ b/edfio/edf.py @@ -79,6 +79,8 @@ class Edf: annotations : Iterable[EdfAnnotation] | None, default: None The annotations, consisting of onset, duration (optional), and text. If not `None`, an EDF+C file is created. + fmt : str, default "edf" + Can be "edf" or "bdf" to handle EDF or BDF data, respectively. """ _header_fields = ( @@ -104,6 +106,7 @@ def __init__( starttime: datetime.time | None = None, data_record_duration: float | None = None, annotations: Iterable[EdfAnnotation] | None = None, + fmt: Literal["edf", "bdf"] = "edf", ): if not signals and not annotations: raise ValueError("Edf must contain either signals or annotations") @@ -130,6 +133,7 @@ def __init__( self._set_reserved("") if starttime.microsecond and annotations is None: warnings.warn("Creating EDF+C to store microsecond starttime.") + self._fmt = fmt if annotations is not None or starttime.microsecond: signals = ( *signals, @@ -138,6 +142,7 @@ def __init__( num_data_records=self.num_data_records, data_record_duration=self.data_record_duration, subsecond_offset=starttime.microsecond / 1_000_000, + fmt=self._fmt, ), ) self._set_reserved("EDF+C") @@ -213,6 +218,7 @@ def _load_data( lens = [signal.samples_per_data_record for signal in self._signals] datarecord_len = sum(lens) truncated = False + # TODO: fix for BDF (3 bytes per sample); the 2-byte sample size assumed below is EDF-only if not isinstance(file, Path): data_bytes = file.read() actual_records = len(data_bytes) // (datarecord_len * 2) @@ -272,6 +278,11 @@ def signals(self) -> tuple[EdfSignal, ...]: def _set_signals(self, signals: Sequence[EdfSignal]) -> None: signals = tuple(signals) + for si, signal in enumerate(signals): + if signal._fmt != self._fmt: + raise ValueError( + f"Signal {si} ({signal}) has format {signal._fmt}, but EDF is {self._fmt}" + ) self._set_num_data_records_with_signals(signals) self._signals = signals self._set_bytes_in_header_record(256 * (len(signals) + 1)) @@ -362,10 +373,13 @@ 
def write(self, target: Path | str | io.BufferedWriter | io.BytesIO) -> None: lens = [signal.samples_per_data_record for signal in self._signals] ends = np.cumsum(lens) starts = ends - lens - data_record = np.empty((num_data_records, sum(lens)), dtype=np.int16) + dtype = " None: num_data_records=self.num_data_records, data_record_duration=self.data_record_duration, subsecond_offset=self.starttime.microsecond / 1_000_000, + fmt=self._fmt, ) self._set_signals((*self.signals, new_annotation_signal)) @@ -1065,6 +1081,7 @@ def _slice_annotations_signal( data_record_duration=self.data_record_duration, with_timestamps=is_timekeeping_signal, subsecond_offset=self._subsecond_offset + start - int(start), + fmt=self._fmt, ) diff --git a/edfio/edf_annotations.py b/edfio/edf_annotations.py index a13071f..6834fd7 100644 --- a/edfio/edf_annotations.py +++ b/edfio/edf_annotations.py @@ -61,6 +61,7 @@ def _create_annotations_signal( data_record_duration: float, with_timestamps: bool = True, subsecond_offset: float = 0, + fmt: Literal["edf", "bdf"] = "edf", ) -> EdfSignal: data_record_starts = np.arange(num_data_records) * data_record_duration annotations = sorted(annotations) @@ -92,7 +93,8 @@ def _create_annotations_signal( signal = EdfSignal( np.arange(1.0), # placeholder signal, as argument `data` is non-optional sampling_frequency=maxlen // 2 / divisor, - physical_range=(-32768, 32767), + physical_range=(-32768, 32767) if fmt == "edf" else (-8388608, 8388607), + fmt=fmt, ) signal._label = b"EDF Annotations " signal._set_samples_per_data_record(maxlen // 2) diff --git a/edfio/edf_signal.py b/edfio/edf_signal.py index 42526db..a785030 100644 --- a/edfio/edf_signal.py +++ b/edfio/edf_signal.py @@ -2,7 +2,7 @@ import math import warnings -from typing import Callable, NamedTuple +from typing import Callable, Literal, NamedTuple import numpy as np import numpy.typing as npt @@ -79,11 +79,14 @@ class EdfSignal: physical_range : tuple[float, float] | None, default: None The physical 
range given as a tuple of `(physical_min, physical_max)`. If `None`, this is determined from the data. - digital_range : tuple[int, int], default: `(-32768, 32767)` + digital_range : tuple[int, int] | None, default: None The digital range given as a tuple of `(digital_min, digital_max)`. Uses the - maximum resolution of 16-bit integers by default. + maximum resolution of 16-bit integers when fmt is "edf" and of 24-bit + integers when fmt is "bdf" by default. prefiltering : str, default: `""` The signal prefiltering, e.g., `"HP:0.1Hz LP:75Hz"`. + fmt : str, default `"edf"` + The data format. Can be `"edf"` or `"bdf"`. """ _header_fields = ( @@ -99,7 +102,7 @@ class EdfSignal: ("reserved", 32), ) - _digital: npt.NDArray[np.int16] | None = None + _digital: npt.NDArray[np.int16 | np.int32] | None = None _lazy_loader: LazyLoader | None = None def __init__( @@ -111,14 +114,18 @@ def __init__( transducer_type: str = "", physical_dimension: str = "", physical_range: tuple[float, float] | None = None, - digital_range: tuple[int, int] = (-32768, 32767), + digital_range: tuple[int, int] | None = None, prefiltering: str = "", + fmt: Literal["edf", "bdf"] = "edf", ): self._sampling_frequency = sampling_frequency self.label = label self.transducer_type = transducer_type self.physical_dimension = physical_dimension self.prefiltering = prefiltering + self._fmt = fmt + if digital_range is None: + digital_range = (-8388608, 8388607) if fmt == "bdf" else (-32768, 32767) self._set_reserved("") if not np.all(np.isfinite(data)): raise ValueError("Signal data must contain only finite values") @@ -301,7 +308,7 @@ def sampling_frequency(self) -> float: return self._sampling_frequency @property - def digital(self) -> npt.NDArray[np.int16]: + def digital(self) -> npt.NDArray[np.int16 | np.int32]: """ Numpy array containing the digital (uncalibrated) signal values as 16-bit integers. 
@@ -314,7 +321,7 @@ def digital(self) -> npt.NDArray[np.int16]: self._lazy_loader = None return self._digital - def _calibrate(self, digital: npt.NDArray[np.int16]) -> npt.NDArray[np.float64]: + def _calibrate(self, digital: npt.NDArray[np.int16 | np.int32]) -> npt.NDArray[np.float64]: try: gain, offset = _calculate_gain_and_offset( self.digital_min, @@ -352,7 +359,7 @@ def data(self) -> npt.NDArray[np.float64]: def get_digital_slice( self, start_second: float, stop_second: float - ) -> npt.NDArray[np.int16]: + ) -> npt.NDArray[np.int16 | np.int32]: """ Get a slice of the digital signal values. @@ -499,4 +506,5 @@ def _set_data(self, data: npt.NDArray[np.float64]) -> None: self.physical_min, self.physical_max, ) - self._digital = np.round(data / gain - offset).astype(np.int16) + dtype = np.int32 if self._fmt == "bdf" else np.int16 + self._digital = np.round(data / gain - offset).astype(dtype)