Skip to content

Commit

Permalink
Use the EEG instrument at iEEG base to avoid different behaviours
Browse files Browse the repository at this point in the history
  • Loading branch information
kevincar committed Feb 1, 2024
1 parent 2a4c9fa commit f04a6c5
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 183 deletions.
12 changes: 12 additions & 0 deletions libbids/instruments/eeg_instrument.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def __init__(
record_duration: float = 1.0,
init_read_fn: Union[Tuple[str, list, Dict], Callable] = lambda: None,
read_fn: Union[Tuple[str, list, Dict], Callable] = lambda: None,
stop_fn: Union[Tuple[str, list, Dict], Callable] = lambda: None,
is_digital: bool = False,
**kwargs
):
Expand Down Expand Up @@ -69,6 +70,8 @@ def __init__(
callable, the function is simply called
read_fn : Union[Tuple[str, Dict], Callable]
Similar to `init_read_fn`, but used for sampling data from the device
stop_fn: Union[Tuple[str, List, Dict], Callable]
The function used to stop the actual hardware
is_digital : bool
Whether the data recorded from the device is in a digital format or
a physical floating point integer (e.g., µV)
Expand All @@ -91,6 +94,7 @@ def __init__(
self.record_duration: float = record_duration
self.init_read_fn: Union[Tuple[str, List, Dict], Callable] = init_read_fn
self.read_fn: Union[Tuple[str, List, Dict], Callable] = read_fn
self.stop_fn: Union[Tuple[str, List, Dict], Callable] = stop_fn
self.is_digital: bool = is_digital
self.modality_path.mkdir(exist_ok=True)
self.metadata: Dict = self._fixup_edf_metadata(kwargs)
Expand Down Expand Up @@ -124,6 +128,14 @@ def device_read(self) -> Union[np.ndarray, List]:
fn, args, kwargs = cast(Tuple, self.read_fn)
return self.device.__getattribute__(fn)(*args, **kwargs)

def device_stop(self) -> None:
"""Stop the device"""
if isinstance(self.stop_fn, Callable): # type: ignore
return case(Callable, self.stop_fn)()

fn, args, kwards = cast(Tuple, self.stop_fn)
return self.device.__getattribute__(fn)(*args, **kwargs)

def flush(self) -> None:
"""Read data from the device simply to discard"""
self.device_read()
Expand Down
203 changes: 20 additions & 183 deletions libbids/instruments/ieeg_instrument.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@
cast,
)

from .read_instrument import ReadInstrument
from .eeg_instrument import EEGInstrument
from ..enums import Modality

if TYPE_CHECKING:
from ..session import Session # type: ignore


class IEEGInstrument(ReadInstrument):
class IEEGInstrument(EEGInstrument):
"""An instrumentation device capable of recording intracranial
electroencephalograms such as electrocorticograms"""

Expand All @@ -36,6 +36,7 @@ def __init__(
record_duration: float = 1.0,
init_read_fn: Union[Tuple[str, list, Dict], Callable] = lambda: None,
read_fn: Union[Tuple[str, list, Dict], Callable] = lambda: None,
stop_fn: Union[Tuple[str, list, Dict], Callable] = lambda: None,
is_digital: bool = False,
**kwargs
):
Expand Down Expand Up @@ -70,6 +71,7 @@ def __init__(
callable, the function is simply called
read_fn : Union[Tuple[str, Dict], Callable]
Similar to `init_read_fn`, but used for sampling data from the device
stop_fn: Union[Tuple[str, List, Dict], Callable]
is_digital : bool
Whether the data recorded from the device is in a digital format or
a physical floating point integer (e.g., µV)
Expand All @@ -78,185 +80,20 @@ def __init__(
edf file header. <See
https://pyedflib.readthedocs.io/en/latest/_modules/pyedflib/edfwriter.html#EdfWriter.setHeader>
"""
super(IEEGInstrument, self).__init__(session, Modality.iEEG, file_ext="edf")
self.sfreqs: List[int] = sfreq if isinstance(sfreq, List) else [sfreq]
assert len(electrodes) > 1, "Must supply electrodes"
assert len(self.sfreqs) == 1 or len(self.sfreqs) == len(
electrodes
), "Must supply same number of sampling rates as electrodes"
self.device: Any = device
self.electrodes: List[str] = electrodes
self.physical_dimension: str = physical_dimension
self.physical_lim: Tuple = physical_lim
self.preamp_filter: str = preamp_filter
self.record_duration: float = record_duration
self.init_read_fn: Union[Tuple[str, List, Dict], Callable] = init_read_fn
self.read_fn: Union[Tuple[str, List, Dict], Callable] = read_fn
self.is_digital: bool = is_digital
super(IEEGInstrument, self).__init__(
session,
device,
sfreq,
electrodes,
physical_dimension,
physical_lim,
preamp_filter,
record_duration,
init_read_fn,
read_fn,
stop_fn,
is_digital,
**kwargs
)
super(EEGInstrument, self).__init__(session, Modality.iEEG, file_ext="edf")
self.modality_path.mkdir(exist_ok=True)
self.metadata: Dict = self._fixup_edf_metadata(kwargs)
self.buffer: np.ndarray
self.buffers: List[np.ndarray] = [np.array([]) for i in range(len(self.sfreqs))]

def annotate(self, onset: float, duration: float, description: str):
assert self.writer.writeAnnotation(onset, duration, description) == 0

def device_init_read(self):
"""Initializes reading on the device"""
if isinstance(self.init_read_fn, Callable):
self.init_read_fn()
else:
fn, args, kwargs = cast(Tuple, self.init_read_fn)
self.device.__getattribute__(fn)(*args, **kwargs)

def device_read(self) -> Union[np.ndarray, List]:
"""read data from the device
Returns
-------
np.ndarray
If all channels share the same sampling rate
List
If not all channels share the same sampling rate
"""
if isinstance(self.read_fn, Callable): # type: ignore
return cast(Callable, self.read_fn)()

fn, args, kwargs = cast(Tuple, self.read_fn)
return self.device.__getattribute__(fn)(*args, **kwargs)

def flush(self) -> None:
"""Read data from the device simply to discard"""
self.device_read()

def read(self, remainder: bool = False) -> Union[List, np.ndarray]:
"""Read data from the headset and return the data
Parameters
----------
remainder : bool
Because EDFWriter only allows full seconds to be written, the last
time this function should be called is with remainder set to true
Returns
-------
np.ndarray
A 2D array of data in the shape of (channels, time)
"""
if len(self.sfreqs) == 1:
sfreq: int = self.sfreqs[0]
period: int = int(sfreq * self.record_duration)
samples: np.ndarray = cast(np.ndarray, self.device_read())
self.buffer = np.c_[self.buffer, samples]
if (not remainder) and (self.buffer.shape[1] >= period):
n_periods: int = self.buffer.shape[1] // period
period_boundary: int = n_periods * period
writebuf: np.ndarray = self.buffer[:, :period_boundary]
self.buffer = self.buffer[:, period_boundary:]
self.writer.writeSamples(
np.ascontiguousarray(writebuf), digital=self.is_digital
)
elif remainder and (self.buffer.shape[1] > 0):
writebuf = self.buffer
self.writer.writeSamples(
np.ascontiguousarray(writebuf), digital=self.is_digital
)
return samples
else:
periods: List[int] = [int(f * self.record_duration) for f in self.sfreqs]
ch_samples: List = cast(List, self.device_read())
assert len(ch_samples) == len(
self.sfreqs
), "Data must be the same length as the number sfreqs"
self.buffers = [np.r_[i, j] for i, j in zip(self.buffers, ch_samples)]
period_met: np.bool_ = np.all(
[i.shape[0] >= j for i, j in zip(self.buffers, periods)]
)
has_data: np.bool_ = np.any([i.shape[0] > 0 for i in self.buffers])
if (not remainder) and period_met:
n_periods: List = [
i.shape[0] // j for i, j in zip(self.buffers, periods)
]
period_boundaries: List = [i * j for i, j in zip(n_periods, periods)]
writebufs: List = [
i[:j] for i, j in zip(self.buffers, period_boundaries)
]
self.buffers = [i[j:] for i, j in zip(self.buffers, period_boundaries)]
self.writer.writeSamples(writebufs, digital=self.is_digital)
elif remainder and has_data:
writebufs = self.buffers
self.writer.writeSamples(writebufs, digital=self.is_digital)
return ch_samples

def start(self, task: str, run_id: str):
"""Begin recording a run
Parameters
----------
task : str
The name of the task to be applied to the recorded file data
run_id : str
The id of the run that will be appended to the file
"""
super().start(task, run_id)
n_electrodes: int = len(self.electrodes)
self._initialize_edf_file()
self.buffer = np.empty(shape=(n_electrodes, 0))
self.writer.setStartdatetime(datetime.now())
self.device_init_read()

def stop(self):
"""Stop the run"""
super().stop()
self.writer.close()

def _fixup_edf_metadata(self, metadata: Dict):
"""A dictionary of values that will be used to store edf metadata
Parameters
----------
metadata : Dict
The dictionary that will be parsed
"""
required_keys: List[str] = [
"technician",
"recording_additional",
"patientname",
"patient_additional",
"patientcode",
"equipment",
"admincode",
"gender",
"sex",
"startdate",
"birthdate",
]
result: Dict = {}
for required_key in required_keys:
if required_key in metadata.keys():
value = metadata[required_key]
else:
value = ""
if required_key == "startdate":
value = datetime.now()
result.update({required_key: value})
return result

def _initialize_edf_file(self) -> None:
"""Initialize the EDF file that will save the data collected from this
instrument"""
edf_fp: str = str(self.filepath)
n_electrodes: int = len(self.electrodes)
self.writer: EdfWriter = EdfWriter(edf_fp, n_electrodes)
self.writer.setHeader(self.metadata)
self.writer.setDatarecordDuration(self.record_duration)
for i, el in enumerate(self.electrodes):
self.writer.setLabel(i, el)
self.writer.setPhysicalDimension(i, self.physical_dimension)
self.writer.setPhysicalMaximum(i, self.physical_lim[0])
self.writer.setPhysicalMinimum(i, self.physical_lim[1])
self.writer.setSamplefrequency(
i, self.sfreqs[0] if len(self.sfreqs) == 1 else self.sfreqs[i]
)
if "AUX" not in el:
self.writer.setPrefilter(i, self.preamp_filter)

0 comments on commit f04a6c5

Please sign in to comment.