Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ENH: Add BDF support #63

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ It requires Python>=3.9 and NumPy>=1.22 and is available on PyPI:
- Slice recordings (by seconds or annotation texts)
- Drop individual signals
- Anonymize recordings
- BDF file ([BioSemi](https://www.biosemi.com/faq/file_format.htm)) support


## Known limitations
- Discontiguous files (EDF+D) are treated as contiguous ones.
- The maximum data record size of 61440 bytes recommended by the [EDF specs](https://www.edfplus.info/specs/edf.html) is not enforced.
- To write an EDF with a non-integer seconds duration, the data record duration has to be manually set to an appropriate value.
- Slicing an EDF to a timespan that is not an integer multiple of the data record duration does not work.
- BDF files ([BioSemi](https://www.biosemi.com/faq/file_format.htm)) are not supported.


## Contributing
Expand Down
4 changes: 2 additions & 2 deletions edfio/_lazy_loading.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class LazyLoader:

def __init__(
self,
buffer: Union[NDArray[np.int16], np.memmap[Any, np.dtype[np.int16]]],
buffer: Union[NDArray[np.int16 | np.int32], np.memmap[Any, np.dtype[np.int16 | np.int32]]],
start_sample: int,
end_sample: int,
) -> None:
Expand All @@ -30,7 +30,7 @@ def __init__(

def load(
self, start_record: Optional[int] = None, end_record: Optional[int] = None
) -> NDArray[np.int16]:
) -> NDArray[np.int16 | np.int32]:
"""
Load signal data from the buffer.

Expand Down
21 changes: 19 additions & 2 deletions edfio/edf.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ class Edf:
annotations : Iterable[EdfAnnotation] | None, default: None
The annotations, consisting of onset, duration (optional), and text. If not
`None`, an EDF+C file is created.
fmt : str, default "edf"
Can be "edf" or "bdf" to handle EDF or BDF data, respectively.
"""

_header_fields = (
Expand All @@ -104,6 +106,7 @@ def __init__(
starttime: datetime.time | None = None,
data_record_duration: float | None = None,
annotations: Iterable[EdfAnnotation] | None = None,
fmt: Literal["edf", "bdf"] = "edf",
):
if not signals and not annotations:
raise ValueError("Edf must contain either signals or annotations")
Expand All @@ -130,6 +133,7 @@ def __init__(
self._set_reserved("")
if starttime.microsecond and annotations is None:
warnings.warn("Creating EDF+C to store microsecond starttime.")
self._fmt = fmt
if annotations is not None or starttime.microsecond:
signals = (
*signals,
Expand All @@ -138,6 +142,7 @@ def __init__(
num_data_records=self.num_data_records,
data_record_duration=self.data_record_duration,
subsecond_offset=starttime.microsecond / 1_000_000,
fmt=self._fmt,
),
)
self._set_reserved("EDF+C")
Expand Down Expand Up @@ -213,6 +218,7 @@ def _load_data(
lens = [signal.samples_per_data_record for signal in self._signals]
datarecord_len = sum(lens)
truncated = False
# TODO: THIS NEEDS FIXING FOR BDF
if not isinstance(file, Path):
data_bytes = file.read()
actual_records = len(data_bytes) // (datarecord_len * 2)
Expand Down Expand Up @@ -272,6 +278,11 @@ def signals(self) -> tuple[EdfSignal, ...]:

def _set_signals(self, signals: Sequence[EdfSignal]) -> None:
signals = tuple(signals)
for si, signal in enumerate(signals):
if signal._fmt != self._fmt:
raise ValueError(
f"Signal {si} ({signal}) has format {signal._fmt}, but EDF is {self._fmt}"
)
self._set_num_data_records_with_signals(signals)
self._signals = signals
self._set_bytes_in_header_record(256 * (len(signals) + 1))
Expand Down Expand Up @@ -362,10 +373,13 @@ def write(self, target: Path | str | io.BufferedWriter | io.BytesIO) -> None:
lens = [signal.samples_per_data_record for signal in self._signals]
ends = np.cumsum(lens)
starts = ends - lens
data_record = np.empty((num_data_records, sum(lens)), dtype=np.int16)
dtype = "<i2" if self._fmt == "edf" else "<i4"
data_record = np.empty((num_data_records, sum(lens)), dtype=dtype)
for signal, start, end in zip(self._signals, starts, ends):
data_record[:, start:end] = signal.digital.reshape((-1, end - start))

if self._fmt == "bdf":
data_record[data_record < 0] += 1 << 24
data_record = data_record.view(np.uint8).reshape(-1, 4)[:, :3]
if isinstance(target, str):
target = Path(target)
if isinstance(target, io.BufferedWriter):
Expand Down Expand Up @@ -659,6 +673,7 @@ def _update_record_duration_in_annotation_signals(
data_record_duration=data_record_duration,
with_timestamps=signal is self._timekeeping_signal,
subsecond_offset=self._subsecond_offset,
fmt=self._fmt,
)
self._signals = tuple(signals)

Expand Down Expand Up @@ -877,6 +892,7 @@ def set_annotations(self, annotations: Iterable[EdfAnnotation]) -> None:
num_data_records=self.num_data_records,
data_record_duration=self.data_record_duration,
subsecond_offset=self.starttime.microsecond / 1_000_000,
fmt=self._fmt,
)
self._set_signals((*self.signals, new_annotation_signal))

Expand Down Expand Up @@ -1065,6 +1081,7 @@ def _slice_annotations_signal(
data_record_duration=self.data_record_duration,
with_timestamps=is_timekeeping_signal,
subsecond_offset=self._subsecond_offset + start - int(start),
fmt=self._fmt,
)


Expand Down
4 changes: 3 additions & 1 deletion edfio/edf_annotations.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ def _create_annotations_signal(
data_record_duration: float,
with_timestamps: bool = True,
subsecond_offset: float = 0,
fmt: Literal["edf", "bdf"] = "edf",
) -> EdfSignal:
data_record_starts = np.arange(num_data_records) * data_record_duration
annotations = sorted(annotations)
Expand Down Expand Up @@ -92,7 +93,8 @@ def _create_annotations_signal(
signal = EdfSignal(
np.arange(1.0), # placeholder signal, as argument `data` is non-optional
sampling_frequency=maxlen // 2 / divisor,
physical_range=(-32768, 32767),
physical_range=(-32768, 32767) if fmt == "edf" else (-8388608, 8388607),
fmt=fmt,
)
signal._label = b"EDF Annotations "
signal._set_samples_per_data_record(maxlen // 2)
Expand Down
26 changes: 17 additions & 9 deletions edfio/edf_signal.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import math
import warnings
from typing import Callable, NamedTuple
from typing import Callable, Literal, NamedTuple

import numpy as np
import numpy.typing as npt
Expand Down Expand Up @@ -79,11 +79,14 @@ class EdfSignal:
physical_range : tuple[float, float] | None, default: None
The physical range given as a tuple of `(physical_min, physical_max)`. If
`None`, this is determined from the data.
digital_range : tuple[int, int], default: `(-32768, 32767)`
digital_range : tuple[int, int] | None, default: None
The digital range given as a tuple of `(digital_min, digital_max)`. Uses the
maximum resolution of 16-bit integers by default.
maximum resolution of 16-bit integers when fmt is "edf" and for 24-bit
integers when fmt is "bdf" by default.
prefiltering : str, default: `""`
The signal prefiltering, e.g., `"HP:0.1Hz LP:75Hz"`.
fmt : str, default `"edf"`
The data format. Can be `"edf"` or `"bdf"`.
"""

_header_fields = (
Expand All @@ -99,7 +102,7 @@ class EdfSignal:
("reserved", 32),
)

_digital: npt.NDArray[np.int16] | None = None
_digital: npt.NDArray[np.int16 | np.int32] | None = None
_lazy_loader: LazyLoader | None = None

def __init__(
Expand All @@ -111,14 +114,18 @@ def __init__(
transducer_type: str = "",
physical_dimension: str = "",
physical_range: tuple[float, float] | None = None,
digital_range: tuple[int, int] = (-32768, 32767),
digital_range: tuple[int, int] | None = None,
prefiltering: str = "",
fmt: Literal["edf", "bdf"] = "edf",
):
self._sampling_frequency = sampling_frequency
self.label = label
self.transducer_type = transducer_type
self.physical_dimension = physical_dimension
self.prefiltering = prefiltering
self._fmt = fmt
if digital_range is None:
digital_range = (-8388608, 8388607) if fmt == "bdf" else (-32768, 32767)
self._set_reserved("")
if not np.all(np.isfinite(data)):
raise ValueError("Signal data must contain only finite values")
Expand Down Expand Up @@ -301,7 +308,7 @@ def sampling_frequency(self) -> float:
return self._sampling_frequency

@property
def digital(self) -> npt.NDArray[np.int16]:
def digital(self) -> npt.NDArray[np.int16 | np.int32]:
"""
Numpy array containing the digital (uncalibrated) signal values as 16-bit integers.

Expand All @@ -314,7 +321,7 @@ def digital(self) -> npt.NDArray[np.int16]:
self._lazy_loader = None
return self._digital

def _calibrate(self, digital: npt.NDArray[np.int16]) -> npt.NDArray[np.float64]:
def _calibrate(self, digital: npt.NDArray[np.int16 | np.int32]) -> npt.NDArray[np.float64]:
try:
gain, offset = _calculate_gain_and_offset(
self.digital_min,
Expand Down Expand Up @@ -352,7 +359,7 @@ def data(self) -> npt.NDArray[np.float64]:

def get_digital_slice(
self, start_second: float, stop_second: float
) -> npt.NDArray[np.int16]:
) -> npt.NDArray[np.int16 | np.int32]:
"""
Get a slice of the digital signal values.

Expand Down Expand Up @@ -499,4 +506,5 @@ def _set_data(self, data: npt.NDArray[np.float64]) -> None:
self.physical_min,
self.physical_max,
)
self._digital = np.round(data / gain - offset).astype(np.int16)
dtype = np.int32 if self._fmt == "bdf" else np.int16
self._digital = np.round(data / gain - offset).astype(dtype)
Loading