Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ENH: Fine calibration support for more MEG systems #12966

Merged
merged 7 commits into from
Nov 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions doc/changes/devel/12966.newfeature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add ability to use :func:`mne.preprocessing.compute_fine_calibration` with non-Neuromag-style systems, as well as options to control the bad-angle and error tolerances, by `Eric Larson`_.
63 changes: 51 additions & 12 deletions mne/preprocessing/_fine_cal.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from functools import partial

import numpy as np
from scipy.optimize import fmin_cobyla
from scipy.optimize import minimize

from .._fiff.pick import pick_info, pick_types
from .._fiff.tag import _coil_trans_to_loc, _loc_to_coil_trans
Expand All @@ -16,6 +16,7 @@
from ..utils import (
_check_fname,
_check_option,
_clean_names,
_ensure_int,
_pl,
_reg_pinv,
Expand Down Expand Up @@ -43,6 +44,9 @@ def compute_fine_calibration(
origin=(0.0, 0.0, 0.0),
cross_talk=None,
calibration=None,
*,
angle_limit=5.0,
err_limit=5.0,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAICT err_limit isn't validated anywhere, there should at least be some check that it is non-negative right?

verbose=None,
):
"""Compute fine calibration from empty-room data.
Expand All @@ -68,6 +72,17 @@ def compute_fine_calibration(
Dictionary with existing calibration. If provided, the magnetometer
imbalances and adjusted normals will be used and only the gradiometer
imbalances will be estimated (see step 2 in Notes below).
angle_limit : float
The maximum permitted angle in degrees between the original and adjusted
magnetometer normals. If the angle is exceeded, the segment is treated as
an outlier and discarded.

.. versionadded:: 1.9
err_limit : float
The maximum error (in percent) for each channel in order for a segment to
be used.

.. versionadded:: 1.9
%(verbose)s

Returns
Expand Down Expand Up @@ -114,6 +129,12 @@ def compute_fine_calibration(
ext_order = _ensure_int(ext_order, "ext_order")
origin = _check_origin(origin, raw.info, "meg", disp=True)
_check_option("raw.info['bads']", raw.info["bads"], ([],))
_validate_type(err_limit, "numeric", "err_limit")
_validate_type(angle_limit, "numeric", "angle_limit")
for key, val in dict(err_limit=err_limit, angle_limit=angle_limit).items():
if val < 0:
raise ValueError(f"{key} must be greater than or equal to 0, got {val}")
# Fine cal should not include ref channels
picks = pick_types(raw.info, meg=True, ref_meg=False)
if raw.info["dev_head_t"] is not None:
raise ValueError(
Expand Down Expand Up @@ -144,7 +165,7 @@ def compute_fine_calibration(
locs = np.array([ch["loc"] for ch in info["chs"]])
zs = locs[mag_picks, -3:].copy()
if calibration is not None:
_, calibration, _ = _prep_fine_cal(info, calibration)
_, calibration, _ = _prep_fine_cal(info, calibration, ignore_ref=True)
for pi, pick in enumerate(mag_picks):
idx = calibration["ch_names"].index(info["ch_names"][pick])
cals[pick] = calibration["imb_cals"][idx].item()
Expand All @@ -164,7 +185,14 @@ def compute_fine_calibration(
data = raw[picks, start:stop][0]
if ctc is not None:
data = ctc.dot(data)
z, cal, good = _adjust_mag_normals(info, data, origin, ext_order)
z, cal, good = _adjust_mag_normals(
info,
data,
origin,
ext_order,
angle_limit=angle_limit,
err_limit=err_limit,
)
if good:
z_list.append(z)
cal_list.append(cal)
Expand Down Expand Up @@ -213,7 +241,8 @@ def compute_fine_calibration(
cals[ii : ii + 1] if ii in mag_picks else imb[ii]
for ii in range(len(info["ch_names"]))
]
calibration = dict(ch_names=info["ch_names"], locs=locs, imb_cals=imb_cals)
ch_names = _clean_names(info["ch_names"], remove_whitespace=True)
calibration = dict(ch_names=ch_names, locs=locs, imb_cals=imb_cals)
return calibration, count


Expand Down Expand Up @@ -252,7 +281,7 @@ def _vector_angle(x, y):
)


def _adjust_mag_normals(info, data, origin, ext_order):
def _adjust_mag_normals(info, data, origin, ext_order, *, angle_limit, err_limit):
"""Adjust coil normals using magnetometers and empty-room data."""
# in principle we could allow using just mag or mag+grad, but MF uses
# just mag so let's follow suit
Expand Down Expand Up @@ -317,9 +346,16 @@ def _adjust_mag_normals(info, data, origin, ext_order):
)

# Figure out the additive term for z-component
zs[cal_idx] = fmin_cobyla(
objective, old_z, cons=(), rhobeg=1e-3, rhoend=1e-4, disp=False
)
zs[cal_idx] = minimize(
objective,
old_z,
bounds=[(-2, 2)] * 3,
# BFGS is the default for minimize but COBYLA converges faster
method="COBYLA",
# Start with a small relative step because nominal geometry information
# should be fairly accurate to begin with
options=dict(rhobeg=1e-1),
).x

# Do in-place adjustment to all_coils
cals[cal_idx] = 1.0 / np.linalg.norm(zs[cal_idx])
Expand Down Expand Up @@ -347,12 +383,15 @@ def _adjust_mag_normals(info, data, origin, ext_order):
# Chunk is usable if all angles and errors are both small
reason = list()
max_angle = np.max(angles)
if max_angle >= 5.0:
reason.append(f"max angle {max_angle:0.2f} >= 5°")
if max_angle >= angle_limit:
reason.append(f"max angle {max_angle:0.2f} >= {angle_limit:0.1f}°")
each_err = _data_err(data, S_tot, cals, axis=-1)[picks_mag]
n_bad = (each_err > 5.0).sum()
n_bad = (each_err > err_limit).sum()
if n_bad:
reason.append(f"{n_bad} residual{_pl(n_bad)} > 5%")
reason.append(
f"{n_bad} residual{_pl(n_bad)} > {err_limit:0.1f}% "
f"(max: {each_err.max():0.2f}%)"
)
reason = ", ".join(reason)
if reason:
reason = f" ({reason})"
Expand Down
33 changes: 19 additions & 14 deletions mne/preprocessing/maxwell.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# License: BSD-3-Clause
# Copyright the MNE-Python contributors.

from collections import Counter, OrderedDict
from collections import Counter
from functools import partial
from math import factorial
from os import path as op
Expand Down Expand Up @@ -2070,7 +2070,7 @@ def _overlap_projector(data_int, data_res, corr):
return V_principal


def _prep_fine_cal(info, fine_cal):
def _prep_fine_cal(info, fine_cal, *, ignore_ref):
from ._fine_cal import read_fine_calibration

_validate_type(fine_cal, (dict, "path-like"))
Expand All @@ -2081,17 +2081,18 @@ def _prep_fine_cal(info, fine_cal):
extra = "dict"
logger.info(f" Using fine calibration {extra}")
ch_names = _clean_names(info["ch_names"], remove_whitespace=True)
info_to_cal = OrderedDict()
info_to_cal = dict()
missing = list()
for ci, name in enumerate(fine_cal["ch_names"]):
if name not in ch_names:
names_clean = _clean_names(fine_cal["ch_names"], remove_whitespace=True)
for ci, (name, name_clean) in enumerate(zip(fine_cal["ch_names"], names_clean)):
if name_clean not in ch_names:
missing.append(name)
else:
oi = ch_names.index(name)
oi = ch_names.index(name_clean)
info_to_cal[oi] = ci
meg_picks = pick_types(info, meg=True, exclude=[])
meg_picks = pick_types(info, meg=True, exclude=[], ref_meg=not ignore_ref)
if len(info_to_cal) != len(meg_picks):
bad = sorted({ch_names[pick] for pick in meg_picks} - set(fine_cal["ch_names"]))
bad = sorted({ch_names[pick] for pick in meg_picks} - set(names_clean))
raise RuntimeError(
f"Not all MEG channels found in fine calibration file, missing:\n{bad}"
)
Expand All @@ -2102,9 +2103,9 @@ def _prep_fine_cal(info, fine_cal):

def _update_sensor_geometry(info, fine_cal, ignore_ref):
"""Replace sensor geometry information and reorder cal_chs."""
info_to_cal, fine_cal, ch_names = _prep_fine_cal(info, fine_cal)
grad_picks = pick_types(info, meg="grad", exclude=())
mag_picks = pick_types(info, meg="mag", exclude=())
info_to_cal, fine_cal, _ = _prep_fine_cal(info, fine_cal, ignore_ref=ignore_ref)
grad_picks = pick_types(info, meg="grad", exclude=(), ref_meg=not ignore_ref)
mag_picks = pick_types(info, meg="mag", exclude=(), ref_meg=not ignore_ref)

# Determine gradiometer imbalances and magnetometer calibrations
grad_imbalances = np.array(
Expand Down Expand Up @@ -2134,7 +2135,11 @@ def _update_sensor_geometry(info, fine_cal, ignore_ref):
assert not used[oi]
used[oi] = True
info_ch = info["chs"][oi]
ch_num = int(fine_cal["ch_names"][ci].lstrip("MEG").lstrip("0"))
# This only works for VV-like names
try:
ch_num = int(fine_cal["ch_names"][ci].lstrip("MEG").lstrip("0"))
except ValueError: # invalid literal for int() with base 10
ch_num = oi
cal_chans.append([ch_num, info_ch["coil_type"]])

# Some .dat files might only rotate EZ, so we must check first that
Expand Down Expand Up @@ -2174,7 +2179,7 @@ def _update_sensor_geometry(info, fine_cal, ignore_ref):
# Channel positions are not changed
info_ch["loc"][3:] = cal_loc[3:]
assert info_ch["coord_frame"] == FIFF.FIFFV_COORD_DEVICE
meg_picks = pick_types(info, meg=True, exclude=())
meg_picks = pick_types(info, meg=True, exclude=(), ref_meg=not ignore_ref)
assert used[meg_picks].all()
assert not used[np.setdiff1d(np.arange(len(used)), meg_picks)].any()
# This gets written to the Info struct
Expand All @@ -2186,7 +2191,7 @@ def _update_sensor_geometry(info, fine_cal, ignore_ref):
np.clip(ang_shift, -1.0, 1.0, ang_shift)
np.rad2deg(np.arccos(ang_shift), ang_shift) # Convert to degrees
logger.info(
" Adjusted coil positions by (μ ± σ): "
" Adjusted coil orientations by (μ ± σ): "
f"{np.mean(ang_shift):0.1f}° ± {np.std(ang_shift):0.1f}° "
f"(max: {np.max(np.abs(ang_shift)):0.1f}°)"
)
Expand Down
Loading
Loading