Skip to content

Commit

Permalink
Support for non-axial NifTI annotation imports and exports
Browse files Browse the repository at this point in the history
  • Loading branch information
JBWilkie committed Jan 15, 2025
1 parent 90c3fc4 commit 3913727
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 52 deletions.
76 changes: 37 additions & 39 deletions darwin/exporter/formats/nifti.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def _console_theme() -> Theme:
import numpy as np

import darwin.datatypes as dt
from darwin.utils import convert_polygons_to_mask
from darwin.utils import convert_polygons_to_mask, get_primary_plane_from_nifti


class Plane(Enum):
Expand All @@ -54,6 +54,7 @@ class Volume:
class_name: str
series_instance_uid: str
from_raster_layer: bool
primary_plane: str


def export(
Expand Down Expand Up @@ -197,6 +198,10 @@ def build_output_volumes(
class_names_to_export = [
""
] # If there are no annotations to export, we still need to create an empty volume

# Determine primary plane from affine matrix
primary_plane = get_primary_plane_from_nifti(affine)

output_volumes[series_instance_uid] = {
class_name: Volume(
pixel_array=np.zeros(volume_dims),
Expand All @@ -207,6 +212,7 @@ def build_output_volumes(
series_instance_uid=series_instance_uid,
class_name=class_name,
from_raster_layer=from_raster_layer,
primary_plane=primary_plane,
)
for class_name in class_names_to_export
}
Expand Down Expand Up @@ -289,7 +295,7 @@ def update_pixel_array(
volume: Dict,
annotation_class_name: str,
im_mask: np.ndarray,
plane: Plane,
primary_plane: str,
frame_idx: int,
) -> Dict:
"""Updates the pixel array of the given volume with the given mask.
Expand All @@ -302,7 +308,7 @@ def update_pixel_array(
Name of the annotation class
im_mask : np.ndarray
Mask to be added to the pixel array
plane : Plane
primary_plane : str
Plane of the mask
frame_idx : int
Frame index of the mask
Expand All @@ -313,12 +319,12 @@ def update_pixel_array(
Updated volume
"""
plane_to_slice = {
Plane.XY: np.s_[:, :, frame_idx],
Plane.XZ: np.s_[:, frame_idx, :],
Plane.YZ: np.s_[frame_idx, :, :],
"AXIAL": np.s_[:, :, frame_idx],
"CORONAL": np.s_[:, frame_idx, :],
"SAGITTAL": np.s_[frame_idx, :, :],
}
if plane in plane_to_slice:
slice_ = plane_to_slice[plane]
if primary_plane in plane_to_slice:
slice_ = plane_to_slice[primary_plane]
volume[annotation_class_name].pixel_array[slice_] = np.logical_or(
im_mask,
volume[annotation_class_name].pixel_array[slice_],
Expand Down Expand Up @@ -358,22 +364,27 @@ def populate_output_volumes_from_polygons(
frames = annotation.frames

for frame_idx in frames.keys():
plane = get_plane_from_slot_name(
slot_name, slot.metadata.get("orientation")
)
primary_plane = volume[annotation.annotation_class.name].primary_plane
dims = volume[annotation.annotation_class.name].dims
if plane == Plane.XY:
if primary_plane == "AXIAL":
height, width = dims[0], dims[1]
elif plane == Plane.XZ:
elif primary_plane == "CORONAL":
height, width = dims[0], dims[2]
elif plane == Plane.YZ:
elif primary_plane == "SAGITTAL":
height, width = dims[1], dims[2]
pixdims = volume[annotation.annotation_class.name].pixdims
frame_data = frames[frame_idx].data
if "paths" in frame_data:
# Dealing with a complex polygon
polygons = [
shift_polygon_coords(polygon_path, pixdims, legacy=legacy)
shift_polygon_coords(
polygon_path,
pixdims,
primary_plane=volume[
annotation.annotation_class.name
].primary_plane,
legacy=legacy,
)
for polygon_path in frame_data["paths"]
]
else:
Expand All @@ -383,7 +394,7 @@ def populate_output_volumes_from_polygons(
output_volumes[series_instance_uid],
annotation.annotation_class.name,
im_mask,
plane,
primary_plane,
frame_idx,
)

Expand Down Expand Up @@ -538,8 +549,17 @@ def _get_reoriented_nifti_image(


def shift_polygon_coords(
polygon: List[Dict], pixdim: List[Number], legacy: bool = False
polygon: List[Dict],
pixdim: List[Number],
primary_plane: str,
legacy: bool = False,
) -> List:
if primary_plane == "AXIAL":
pixdim = [pixdim[0], pixdim[1]]
elif primary_plane == "CORONAL":
pixdim = [pixdim[0], pixdim[2]]
elif primary_plane == "SAGITTAL":
pixdim = [pixdim[1], pixdim[2]]
if legacy:
# Need to make it clear that we flip x/y because we need to take the transpose later.
if pixdim[1] > pixdim[0]:
Expand Down Expand Up @@ -574,28 +594,6 @@ def get_view_idx(frame_idx: int, groups: List) -> int:
return view_idx


def get_plane_from_slot_name(slot_name: str, orientation: Union[str, None]) -> Plane:
"""Returns the plane from the given slot name and orientation.
Parameters
----------
slot_name : str
Slot name
orientation : Union[str, None]
Orientation
Returns
-------
Plane
Enum representing the plane
"""
if orientation is None:
orientation_dict = {"0.1": 0, "0.2": 1, "0.3": 2}
return Plane(orientation_dict.get(slot_name, 0))
orientation_dict = {"AXIAL": 0, "SAGITTAL": 1, "CORONAL": 2}
return Plane(orientation_dict.get(orientation, 0))


def process_metadata(metadata: Dict) -> Tuple:
"""Processes the metadata and returns the volume dimensions, pixel dimensions, affine and original affine.
Expand Down
31 changes: 18 additions & 13 deletions darwin/importer/formats/nifti.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from rich.console import Console

from darwin.utils import attempt_decode
from darwin.utils import attempt_decode, get_primary_plane_from_nifti

console = Console()
try:
Expand Down Expand Up @@ -106,7 +106,7 @@ def _parse_nifti(
remote_file_path: Path,
remote_files_that_require_legacy_scaling: Dict[Path, Dict[str, Any]] = {},
) -> dt.AnnotationFile:
img, pixdims = process_nifti(
img, pixdims, primary_plane = process_nifti(
nib.load(nifti_path),
remote_file_path=remote_file_path,
remote_files_that_require_legacy_scaling=remote_files_that_require_legacy_scaling,
Expand All @@ -129,6 +129,7 @@ def _parse_nifti(
slot_names=slot_names,
is_mpr=is_mpr,
pixdims=pixdims,
primary_plane=primary_plane,
legacy=legacy,
)
if _video_annotations:
Expand All @@ -144,6 +145,7 @@ def _parse_nifti(
slot_names=slot_names,
is_mpr=is_mpr,
pixdims=pixdims,
primary_plane=primary_plane,
legacy=legacy,
)
if _video_annotations is None:
Expand Down Expand Up @@ -193,6 +195,7 @@ def get_polygon_video_annotations(
slot_names: List[str],
is_mpr: bool,
pixdims: Tuple[float],
primary_plane: str,
legacy: bool = False,
) -> Optional[List[dt.VideoAnnotation]]:
if not is_mpr:
Expand All @@ -201,7 +204,7 @@ def get_polygon_video_annotations(
class_name,
class_idxs,
slot_names,
view_idx=2,
primary_plane=primary_plane,
pixdims=pixdims,
legacy=legacy,
)
Expand All @@ -213,7 +216,7 @@ def get_polygon_video_annotations(
class_name,
class_idxs,
[slot_name],
view_idx=view_idx,
primary_plane=primary_plane,
pixdims=pixdims,
legacy=legacy,
)
Expand Down Expand Up @@ -329,19 +332,20 @@ def nifti_to_video_polygon_annotation(
class_name: str,
class_idxs: List[int],
slot_names: List[str],
view_idx: int = 2,
pixdims: Tuple[int, int, int] = (1, 1, 1),
primary_plane: str,
pixdims: Tuple[float, float, float] = (1, 1, 1),
legacy: bool = False,
) -> Optional[List[dt.VideoAnnotation]]:
frame_annotations = OrderedDict()
for i in range(volume.shape[view_idx]):
if view_idx == 2:
view_idxs = {"AXIAL": 2, "CORONAL": 1, "SAGITTAL": 0}
for i in range(volume.shape[view_idxs[primary_plane]]):
if primary_plane == "AXIAL":
slice_mask = volume[:, :, i].astype(np.uint8)
_pixdims = [pixdims[0], pixdims[1]]
elif view_idx == 1:
elif primary_plane == "CORONAL":
slice_mask = volume[:, i, :].astype(np.uint8)
_pixdims = [pixdims[0], pixdims[2]]
elif view_idx == 0:
elif primary_plane == "SAGITTAL":
slice_mask = volume[i, :, :].astype(np.uint8)
_pixdims = [pixdims[1], pixdims[2]]
class_mask = np.isin(slice_mask, class_idxs).astype(np.uint8).copy()
Expand Down Expand Up @@ -524,7 +528,7 @@ def process_nifti(
ornt: Optional[List[List[float]]] = [[0.0, -1.0], [1.0, -1.0], [2.0, -1.0]],
remote_file_path: Path = Path("/"),
remote_files_that_require_legacy_scaling: Dict[Path, Dict[str, Any]] = {},
) -> Tuple[np.ndarray, Tuple[float]]:
) -> Tuple[np.ndarray, Tuple[float], str]:
"""
Converts a NifTI object of any orientation to the passed ornt orientation.
The default ornt is LPI.
Expand Down Expand Up @@ -553,6 +557,7 @@ def process_nifti(
Returns:
data_array: pixel array with orientation ornt.
pixdims: tuple of nifti header zoom values.
primary_plane: string indicating the primary anatomical plane.
"""
img = correct_nifti_header_if_necessary(input_data)
orig_ax_codes = nib.orientations.aff2axcodes(img.affine)
Expand All @@ -567,8 +572,8 @@ def process_nifti(
reoriented_img = img.as_reoriented(transform)
data_array = reoriented_img.get_fdata()
pixdims = reoriented_img.header.get_zooms()

return data_array, pixdims
primary_plane = get_primary_plane_from_nifti(reoriented_img.affine)
return data_array, pixdims, primary_plane


def convert_to_dense_rle(raster: np.ndarray) -> List[int]:
Expand Down
34 changes: 34 additions & 0 deletions darwin/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1643,3 +1643,37 @@ def grouped(iterable, n):
path.append({"x": x, "y": y})
polygons.append(path)
return {"path": polygons}


def get_primary_plane_from_nifti(affine: np.ndarray) -> str:
"""
Determine the primary orientation of a NIfTI image from its affine matrix.
Parameters
----------
affine : numpy.ndarray
4x4 affine matrix from NIfTI file
Returns
-------
str
The primary anatomical plane - 'AXIAL', 'SAGITTAL', or 'CORONAL'
"""
# Extract the rotation/scaling matrix (first 3x3 elements)
rotation_scale = affine[:3, :3]

# Get absolute values to determine magnitude of each direction
abs_rotation = np.abs(rotation_scale)

# Find which axis has the largest magnitude for each dimension
primary_directions = np.argmax(abs_rotation, axis=1)

# Find which dimension has the largest overall magnitude
magnitudes = [abs_rotation[i, primary_directions[i]] for i in range(3)]
primary_plane = np.argmax(magnitudes)
if primary_plane == 0:
return "SAGITTAL"
elif primary_plane == 1:
return "CORONAL"
else:
return "AXIAL"

0 comments on commit 3913727

Please sign in to comment.