Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DAR-5421] Support for non-axial NifTI annotation imports and exports #992

Merged
merged 9 commits into from
Feb 18, 2025
18 changes: 0 additions & 18 deletions darwin/dataset/remote_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -990,24 +990,6 @@ def import_annotation(self, item_id: ItemId, payload: Dict[str, Any]) -> None:
"""
...

@abstractmethod
def _get_remote_files_that_require_legacy_scaling(self) -> List[Path]:
"""
Get all remote files that have been scaled upon upload. These files require that
NifTI annotations are similarly scaled during import

Parameters
----------
dataset : RemoteDataset
The remote dataset to get the files from

Returns
-------
List[Path]
A list of full remote paths of dataset items that require NifTI annotations to be scaled
"""
...

@property
def remote_path(self) -> Path:
"""Returns an URL specifying the location of the remote dataset."""
Expand Down
46 changes: 0 additions & 46 deletions darwin/dataset/remote_dataset_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
Tuple,
Union,
)
import numpy as np
from pydantic import ValidationError
from requests.models import Response

Expand Down Expand Up @@ -873,51 +872,6 @@ def register_multi_slotted(
print(f"Reistration complete. Check your items in the dataset: {self.slug}")
return results

def _get_remote_files_that_require_legacy_scaling(
self,
) -> Dict[str, Dict[str, Any]]:
"""
Get all remote files that have been scaled upon upload. These files require that
NifTI annotations are similarly scaled during import.

The in-platform affines are returned for each legacy file, as this is required
to properly re-orient the annotations during import.

Parameters
----------
dataset : RemoteDataset
The remote dataset to get the files from

Returns
-------
Dict[str, Dict[str, Any]]
A dictionary of remote file full paths to their slot affine maps
"""
remote_files_that_require_legacy_scaling = {}
remote_files = self.fetch_remote_files(
filters={"statuses": ["new", "annotate", "review", "complete", "archived"]}
)
for remote_file in remote_files:
if not remote_file.slots[0].get("metadata", {}).get("medical", {}):
continue
if not (
remote_file.slots[0]
.get("metadata", {})
.get("medical", {})
.get("handler")
):
slot_affine_map = {}
for slot in remote_file.slots:
slot_affine_map[slot["slot_name"]] = np.array(
slot["metadata"]["medical"]["affine"],
dtype=np.float64,
)
remote_files_that_require_legacy_scaling[
Path(remote_file.full_path)
] = slot_affine_map

return remote_files_that_require_legacy_scaling


def _find_files_to_upload_as_multi_file_items(
search_files: List[PathLike],
Expand Down
80 changes: 42 additions & 38 deletions darwin/exporter/formats/nifti.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class Volume:
class_name: str
series_instance_uid: str
from_raster_layer: bool
primary_plane: str


def export(
Expand All @@ -78,11 +79,15 @@ def export(
"""
video_annotations = list(annotation_files)
for video_annotation in video_annotations:
slot_name = video_annotation.slots[0].name
try:
medical_metadata = video_annotation.slots[0].metadata
legacy = not medical_metadata.get("handler") == "MONAI" # type: ignore
plane_map = medical_metadata.get("plane_map", {slot_name: "AXIAL"})
primary_plane = plane_map.get(slot_name, "AXIAL")
except (KeyError, AttributeError):
legacy = True
primary_plane = "AXIAL"

image_id = check_for_error_and_return_imageid(video_annotation, output_dir)
if not isinstance(image_id, str):
Expand All @@ -103,6 +108,7 @@ def export(
class_names_to_export=polygon_class_names,
from_raster_layer=False,
mask_present=mask_present,
primary_plane=primary_plane,
)
slot_map = {slot.name: slot for slot in video_annotation.slots}
polygon_annotations = [
Expand Down Expand Up @@ -132,6 +138,7 @@ def export(
video_annotation,
class_names_to_export=list(mask_id_to_classname.values()),
from_raster_layer=True,
primary_plane=primary_plane,
)

# This assumes only one raster_layer annotation. If we allow multiple raster layers per annotation file we need to change this.
Expand Down Expand Up @@ -161,6 +168,7 @@ def build_output_volumes(
from_raster_layer: bool = False,
class_names_to_export: List[str] = None,
mask_present: Optional[bool] = False,
primary_plane: str = "AXIAL",
) -> Dict:
"""
This is a function to create the output volumes based on the whole annotation file
Expand All @@ -175,6 +183,8 @@ def build_output_volumes(
The list of class names to export
mask_present: bool
If mask annotations are present in the annotation
primary_plane: str
The primary plane of the annotation
Returns
-------
output_volumes: Dict
Expand All @@ -197,6 +207,7 @@ def build_output_volumes(
class_names_to_export = [
""
] # If there are no annotations to export, we still need to create an empty volume

output_volumes[series_instance_uid] = {
class_name: Volume(
pixel_array=np.zeros(volume_dims),
Expand All @@ -207,6 +218,7 @@ def build_output_volumes(
series_instance_uid=series_instance_uid,
class_name=class_name,
from_raster_layer=from_raster_layer,
primary_plane=primary_plane,
)
for class_name in class_names_to_export
}
Expand Down Expand Up @@ -289,7 +301,7 @@ def update_pixel_array(
volume: Dict,
annotation_class_name: str,
im_mask: np.ndarray,
plane: Plane,
primary_plane: str,
frame_idx: int,
) -> Dict:
"""Updates the pixel array of the given volume with the given mask.
Expand All @@ -302,7 +314,7 @@ def update_pixel_array(
Name of the annotation class
im_mask : np.ndarray
Mask to be added to the pixel array
plane : Plane
primary_plane : str
Plane of the mask
frame_idx : int
Frame index of the mask
Expand All @@ -313,12 +325,12 @@ def update_pixel_array(
Updated volume
"""
plane_to_slice = {
Plane.XY: np.s_[:, :, frame_idx],
Plane.XZ: np.s_[:, frame_idx, :],
Plane.YZ: np.s_[frame_idx, :, :],
"AXIAL": np.s_[:, :, frame_idx],
"CORONAL": np.s_[:, frame_idx, :],
"SAGITTAL": np.s_[frame_idx, :, :],
}
if plane in plane_to_slice:
slice_ = plane_to_slice[plane]
if primary_plane in plane_to_slice:
slice_ = plane_to_slice[primary_plane]
volume[annotation_class_name].pixel_array[slice_] = np.logical_or(
im_mask,
volume[annotation_class_name].pixel_array[slice_],
Expand Down Expand Up @@ -358,22 +370,27 @@ def populate_output_volumes_from_polygons(
frames = annotation.frames

for frame_idx in frames.keys():
plane = get_plane_from_slot_name(
slot_name, slot.metadata.get("orientation")
)
primary_plane = volume[annotation.annotation_class.name].primary_plane
dims = volume[annotation.annotation_class.name].dims
if plane == Plane.XY:
if primary_plane == "AXIAL":
height, width = dims[0], dims[1]
elif plane == Plane.XZ:
elif primary_plane == "CORONAL":
height, width = dims[0], dims[2]
elif plane == Plane.YZ:
elif primary_plane == "SAGITTAL":
height, width = dims[1], dims[2]
pixdims = volume[annotation.annotation_class.name].pixdims
frame_data = frames[frame_idx].data
if "paths" in frame_data:
# Dealing with a complex polygon
polygons = [
shift_polygon_coords(polygon_path, pixdims, legacy=legacy)
shift_polygon_coords(
polygon_path,
pixdims,
primary_plane=volume[
annotation.annotation_class.name
].primary_plane,
legacy=legacy,
)
for polygon_path in frame_data["paths"]
]
else:
Expand All @@ -383,7 +400,7 @@ def populate_output_volumes_from_polygons(
output_volumes[series_instance_uid],
annotation.annotation_class.name,
im_mask,
plane,
primary_plane,
frame_idx,
)

Expand Down Expand Up @@ -538,8 +555,17 @@ def _get_reoriented_nifti_image(


def shift_polygon_coords(
polygon: List[Dict], pixdim: List[Number], legacy: bool = False
polygon: List[Dict],
pixdim: List[Number],
primary_plane: str,
legacy: bool = False,
) -> List:
if primary_plane == "AXIAL":
pixdim = [pixdim[0], pixdim[1]]
elif primary_plane == "CORONAL":
pixdim = [pixdim[0], pixdim[2]]
elif primary_plane == "SAGITTAL":
pixdim = [pixdim[1], pixdim[2]]
if legacy:
# Need to make it clear that we flip x/y because we need to take the transpose later.
if pixdim[1] > pixdim[0]:
Expand Down Expand Up @@ -574,28 +600,6 @@ def get_view_idx(frame_idx: int, groups: List) -> int:
return view_idx


def get_plane_from_slot_name(slot_name: str, orientation: Union[str, None]) -> Plane:
    """Resolve the anatomical plane for a slot.

    Parameters
    ----------
    slot_name : str
        Slot name; used as a fallback key ("0.1"/"0.2"/"0.3") when no
        orientation string is available.
    orientation : Union[str, None]
        Orientation string ("AXIAL"/"SAGITTAL"/"CORONAL"), or None.

    Returns
    -------
    Plane
        Enum representing the plane; defaults to the plane with value 0
        when neither key is recognised.
    """
    if orientation is None:
        # Legacy items carry no orientation; infer the plane from the slot name.
        plane_index = {"0.1": 0, "0.2": 1, "0.3": 2}.get(slot_name, 0)
    else:
        plane_index = {"AXIAL": 0, "SAGITTAL": 1, "CORONAL": 2}.get(orientation, 0)
    return Plane(plane_index)


def process_metadata(metadata: Dict) -> Tuple:
"""Processes the metadata and returns the volume dimensions, pixel dimensions, affine and original affine.

Expand Down
Loading
Loading