[DAR-5370][External] Axially-agnostic pixdim scaling on import for medical files that require it #991

Open · wants to merge 4 commits into master
18 changes: 0 additions & 18 deletions darwin/dataset/remote_dataset.py
@@ -996,24 +996,6 @@ def import_annotation(self, item_id: ItemId, payload: Dict[str, Any]) -> None:
"""
...

@abstractmethod
def _get_remote_files_that_require_legacy_scaling(self) -> List[Path]:
"""
Get all remote files that have been scaled upon upload. These files require that
NifTI annotations are similarly scaled during import
Parameters
----------
dataset : RemoteDataset
The remote dataset to get the files from
Returns
-------
List[Path]
A list of full remote paths of dataset items that require NifTI annotations to be scaled
"""
...

@property
def remote_path(self) -> Path:
"""Returns an URL specifying the location of the remote dataset."""
46 changes: 0 additions & 46 deletions darwin/dataset/remote_dataset_v2.py
@@ -11,7 +11,6 @@
Tuple,
Union,
)
import numpy as np
from pydantic import ValidationError
from requests.models import Response

@@ -873,51 +872,6 @@ def register_multi_slotted(
print(f"Reistration complete. Check your items in the dataset: {self.slug}")
return results

def _get_remote_files_that_require_legacy_scaling(
self,
) -> Dict[str, Dict[str, Any]]:
"""
Get all remote files that have been scaled upon upload. These files require that
NifTI annotations are similarly scaled during import.

The in-platform affines are returned for each legacy file, as this is required
to properly re-orient the annotations during import.

Parameters
----------
dataset : RemoteDataset
The remote dataset to get the files from

Returns
-------
Dict[str, Dict[str, Any]]
A dictionary of remote file full paths to their slot affine maps
"""
remote_files_that_require_legacy_scaling = {}
remote_files = self.fetch_remote_files(
filters={"statuses": ["new", "annotate", "review", "complete", "archived"]}
)
for remote_file in remote_files:
if not remote_file.slots[0].get("metadata", {}).get("medical", {}):
continue
if not (
remote_file.slots[0]
.get("metadata", {})
.get("medical", {})
.get("handler")
):
slot_affine_map = {}
for slot in remote_file.slots:
slot_affine_map[slot["slot_name"]] = np.array(
slot["metadata"]["medical"]["affine"],
dtype=np.float64,
)
remote_files_that_require_legacy_scaling[
Path(remote_file.full_path)
] = slot_affine_map

return remote_files_that_require_legacy_scaling


def _find_files_to_upload_as_multi_file_items(
search_files: List[PathLike],
49 changes: 49 additions & 0 deletions darwin/datatypes.py
@@ -244,6 +244,55 @@ def get_sub(self, annotation_type: str) -> Optional[SubAnnotation]:
return sub
return None

def scale_coordinates(self, x_scale: float, y_scale: float) -> None:
"""
Multiplies the coordinates of the annotation by the given scale factors.
Parameters
----------
x_scale : float
Scale factor for x coordinates
y_scale : float
Scale factor for y coordinates
"""
annotation_type = (
self.annotation_class.annotation_type
if hasattr(self, "annotation_class")
else None
)

if annotation_type == "bounding_box":
self.data["x"] *= x_scale
self.data["y"] *= y_scale
self.data["w"] *= x_scale
self.data["h"] *= y_scale

elif annotation_type == "polygon":
for path in self.data["paths"]:
for point in path:
point["x"] *= x_scale
point["y"] *= y_scale

elif annotation_type == "ellipse":
self.data["center"]["x"] *= x_scale
self.data["center"]["y"] *= y_scale
self.data["radius"]["x"] *= x_scale
self.data["radius"]["y"] *= y_scale

elif annotation_type == "line":
for point in self.data["path"]:
point["x"] *= x_scale
point["y"] *= y_scale

elif annotation_type == "keypoint":
self.data["x"] *= x_scale
self.data["y"] *= y_scale

elif annotation_type == "skeleton":
for node in self.data["nodes"]:
node["x"] *= x_scale
node["y"] *= y_scale


@dataclass(frozen=False, eq=True)
class VideoAnnotation:
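For reviewers, a minimal sketch of the new scale_coordinates behaviour on a bounding box, assuming the existing make_bounding_box factory in darwin.datatypes keeps its (class_name, x, y, w, h) signature; the class name and spacings are illustrative only:

from darwin.datatypes import make_bounding_box

# Hypothetical 0.5 mm x-spacing and 0.75 mm y-spacing.
annotation = make_bounding_box("lesion", x=10.0, y=20.0, w=30.0, h=40.0)
annotation.scale_coordinates(x_scale=0.5, y_scale=0.75)

assert annotation.data["x"] == 5.0
assert annotation.data["y"] == 15.0
assert annotation.data["w"] == 15.0
assert annotation.data["h"] == 30.0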
211 changes: 204 additions & 7 deletions darwin/importer/importer.py
@@ -20,7 +20,7 @@
Tuple,
Union,
)

import numpy as np

from darwin.datatypes import (
AnnotationFile,
@@ -117,7 +117,9 @@ def _find_and_parse( # noqa: C901
console: Optional[Console] = None,
use_multi_cpu: bool = True,
cpu_limit: int = 1,
remote_files_that_require_legacy_scaling: Optional[List[Path]] = None,
remote_files_that_require_legacy_scaling: Optional[
Dict[str, Dict[str, Any]]
] = None,
) -> Optional[Iterable[dt.AnnotationFile]]:
is_console = console is not None

@@ -185,6 +187,9 @@ def maybe_console(*args: Union[str, int, float]) -> None:
maybe_console("Finished.")
# Sometimes we have a list of lists of AnnotationFile, sometimes we have a list of AnnotationFile
# We flatten the list of lists
if not parsed_files:
return None

if isinstance(parsed_files, list):
if isinstance(parsed_files[0], list):
parsed_files = [item for sublist in parsed_files for item in sublist]
@@ -1252,21 +1257,34 @@ def import_annotations( # noqa: C901
console.print("Retrieving local annotations ...", style="info")
local_files = []
local_files_missing_remotely = []

remote_files_targeted_by_import = _get_remote_files_targeted_by_import(
importer, file_paths, dataset, console, use_multi_cpu, cpu_limit
)
(
remote_files_that_require_legacy_nifti_scaling,
remote_files_that_require_pixel_to_mm_transform,
) = _get_remote_medical_file_transform_requirements(remote_files_targeted_by_import)

if importer.__module__ == "darwin.importer.formats.nifti":
remote_files_that_require_legacy_scaling = (
dataset._get_remote_files_that_require_legacy_scaling()
)
maybe_parsed_files: Optional[Iterable[dt.AnnotationFile]] = _find_and_parse(
importer,
file_paths,
console,
use_multi_cpu,
cpu_limit,
remote_files_that_require_legacy_scaling,
remote_files_that_require_legacy_nifti_scaling,
)
else:
maybe_parsed_files: Optional[Iterable[dt.AnnotationFile]] = _find_and_parse(
importer, file_paths, console, use_multi_cpu, cpu_limit
importer,
file_paths,
console,
use_multi_cpu,
cpu_limit,
)
maybe_parsed_files = _scale_coordinates_by_pixdims(
maybe_parsed_files, remote_files_that_require_pixel_to_mm_transform
)

if not maybe_parsed_files:
@@ -2312,3 +2330,182 @@ def _split_payloads(
payloads.append(current_payload)

return payloads


def _get_remote_files_targeted_by_import(
importer: Callable[[Path], Union[List[dt.AnnotationFile], dt.AnnotationFile, None]],
file_paths: List[PathLike],
dataset: "RemoteDataset",
console: Optional[Console] = None,
use_multi_cpu: bool = True,
cpu_limit: int = 1,
) -> List[DatasetItem]:
"""
Parses local annotation files for import and returns the remote dataset items
targeted by the import. Requests are chunked when many files are targeted, to
avoid URL length issues.
Parameters
----------
importer: Callable[[Path], Union[List[dt.AnnotationFile], dt.AnnotationFile, None]]
The importer used to parse local annotation files
file_paths: List[PathLike]
A list of local annotation files to be uploaded
dataset: RemoteDataset
The remote dataset to fetch files from
console: Optional[Console]
The console object
use_multi_cpu: bool
Whether to use multi-CPU processing
cpu_limit: int
The number of CPUs to use for processing
Returns
-------
List[DatasetItem]
A list of remote dataset items targeted by the import
Raises
------
ValueError
If no files could be parsed or if the URL becomes too long even with minimum chunk size
"""
maybe_parsed_files = _find_and_parse(
importer, file_paths, console, use_multi_cpu, cpu_limit
)
if not maybe_parsed_files:
raise ValueError("Not able to parse any files.")

remote_filenames = list({file.filename for file in maybe_parsed_files})
remote_filepaths = [file.full_path for file in maybe_parsed_files]

chunk_size = 100
all_remote_files: List[DatasetItem] = []
while chunk_size > 0:
all_remote_files = []  # reset so a retry after a failed pass does not duplicate results
try:
for i in range(0, len(remote_filenames), chunk_size):
chunk = remote_filenames[i : i + chunk_size]
remote_files = dataset.fetch_remote_files(filters={"item_names": chunk})
all_remote_files.extend(remote_files)
break
except RequestEntitySizeExceeded:
chunk_size -= 8
if chunk_size <= 0:
raise ValueError(
"Unable to fetch remote file list - URL too long even with minimum chunk size."
)
return [
remote_file
for remote_file in all_remote_files
if remote_file.full_path in remote_filepaths
]
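The chunked fetch above shrinks the chunk size and retries whenever the request URL grows too long. A stripped-down sketch of the same pattern with a stand-in fetch callable (hypothetical; RequestEntitySizeExceeded is assumed importable from darwin.exceptions, as used in this module):

from typing import Callable, List

from darwin.exceptions import RequestEntitySizeExceeded


def fetch_in_chunks(
    names: List[str],
    fetch: Callable[[List[str]], List[dict]],
    chunk_size: int = 100,
) -> List[dict]:
    while chunk_size > 0:
        results: List[dict] = []  # reset on each pass so retries do not duplicate
        try:
            for i in range(0, len(names), chunk_size):
                results.extend(fetch(names[i : i + chunk_size]))
            return results
        except RequestEntitySizeExceeded:
            chunk_size -= 8  # smaller chunks mean shorter request URLs
    raise ValueError("URL too long even with minimum chunk size.")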


def _get_remote_medical_file_transform_requirements(
remote_files_targeted_by_import: List[DatasetItem],
) -> Tuple[Dict[str, Dict[str, Any]], Dict[str, Dict[str, List[str]]]]:
"""
This function parses the remote files targeted by the import. For each medical file,
it checks whether the file requires legacy NifTI scaling or a pixel-to-mm transform.
If a file requires a pixel-to-mm transform, it selects the correct pixdim values
based on the axis of acquisition.
Parameters
----------
remote_files_targeted_by_import: List[DatasetItem]
The remote files targeted by the import
Returns
-------
Tuple[Dict[str, Dict[str, Any]], Dict[str, Dict[str, List[str]]]]
A tuple of 2 dictionaries:
- remote_files_that_require_legacy_nifti_scaling: A dictionary of remote files
that require legacy NifTI scaling, mapping each file's full path to its slot
name to affine matrix map
- remote_files_that_require_pixel_to_mm_transform: A dictionary of remote files
that require a pixel-to-mm transform, mapping each file's full path to its slot
name to (x, y) pixdim map
"""
remote_files_that_require_legacy_nifti_scaling = {}
remote_files_that_require_pixel_to_mm_transform = {}
for remote_file in remote_files_targeted_by_import:
if not remote_file.slots:
continue
slot_pixdim_map = {}
slot_affine_map = {}
for slot in remote_file.slots:
if not slot_is_medical(slot):
continue
if slot_is_handled_by_monai(slot):
slot_name = slot["slot_name"]
primary_plane = slot["metadata"]["medical"]["plane_map"][slot_name]
pixdims = slot["metadata"]["medical"]["pixdims"]
if primary_plane == "AXIAL":
pixdims = [pixdims[0], pixdims[1]]
elif primary_plane == "CORONAL":
pixdims = [pixdims[0], pixdims[2]]
elif primary_plane == "SAGITTAL":
pixdims = [pixdims[1], pixdims[2]]
slot_pixdim_map[slot_name] = pixdims
else:
slot_affine_map[slot["slot_name"]] = np.array(
slot["metadata"]["medical"]["affine"], dtype=np.float64
)
if slot_pixdim_map:
remote_files_that_require_pixel_to_mm_transform[remote_file.full_path] = (
slot_pixdim_map
)
if slot_affine_map:
remote_files_that_require_legacy_nifti_scaling[remote_file.full_path] = (
slot_affine_map
)

return (
remote_files_that_require_legacy_nifti_scaling,
remote_files_that_require_pixel_to_mm_transform,
)
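A quick sanity check of the plane-to-pixdim mapping above. The pixdims list holds the (x, y, z) voxel spacings from slot metadata (string-typed here as an assumption, which is why the import path casts with float()):

pixdims = ["0.5", "0.6", "3.0"]  # (x, y, z) spacings, illustrative values

# The two in-plane spacings kept for each primary plane:
in_plane = {
    "AXIAL": [pixdims[0], pixdims[1]],  # x/y plane, slices stacked along z
    "CORONAL": [pixdims[0], pixdims[2]],  # x/z plane, slices stacked along y
    "SAGITTAL": [pixdims[1], pixdims[2]],  # y/z plane, slices stacked along x
}

assert in_plane["SAGITTAL"] == ["0.6", "3.0"]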


def _scale_coordinates_by_pixdims(
maybe_parsed_files: List[dt.AnnotationFile],
remote_files_that_require_pixel_to_mm_transform: Dict[str, Dict[str, List[str]]],
) -> List[dt.AnnotationFile]:
"""
This function scales annotation coordinates by the (x, y) pixdims of the slot they belong to.
Parameters
----------
maybe_parsed_files: List[dt.AnnotationFile]
The parsed annotation files
remote_files_that_require_pixel_to_mm_transform: Dict[str, Dict[str, List[str]]]
The remote files that require a pixel-to-mm transform, mapping each file's full path to its slot name to (x, y) pixdim map
Returns
-------
List[dt.AnnotationFile]
The parsed annotation files with coordinates scaled by the pixdims of the (x, y) axes
"""
if not remote_files_that_require_pixel_to_mm_transform:
return maybe_parsed_files
for file in maybe_parsed_files:
if file.full_path in remote_files_that_require_pixel_to_mm_transform:
for annotation in file.annotations:
slot_name = annotation.slot_names[0]
pixdims = remote_files_that_require_pixel_to_mm_transform[
file.full_path
][slot_name]
if isinstance(annotation, dt.VideoAnnotation):
for frame_annotation in annotation.frames.values():
frame_annotation.scale_coordinates(
float(pixdims[0]), float(pixdims[1])
)
elif isinstance(annotation, dt.Annotation):
annotation.scale_coordinates(float(pixdims[0]), float(pixdims[1]))
return maybe_parsed_files


def slot_is_medical(slot: Dict[str, Any]) -> bool:
return slot.get("metadata", {}).get("medical") is not None


def slot_is_handled_by_monai(slot: Dict[str, Any]) -> bool:
return slot.get("metadata", {}).get("medical", {}).get("handler") == "MONAI"
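A minimal check of the two predicates on hand-built slot dicts shaped like the metadata this diff reads; the field layout is inferred from the code above, the values are illustrative, and the import assumes this PR's module layout:

from darwin.importer.importer import slot_is_handled_by_monai, slot_is_medical

monai_slot = {"slot_name": "0", "metadata": {"medical": {"handler": "MONAI"}}}
legacy_slot = {
    "slot_name": "0",
    "metadata": {
        "medical": {"affine": [[1, 0, 0, 0], [0, 1, 0, 0], [0, 0, 1, 0], [0, 0, 0, 1]]}
    },
}
plain_slot = {"slot_name": "0", "metadata": {}}

assert slot_is_medical(monai_slot) and slot_is_handled_by_monai(monai_slot)
assert slot_is_medical(legacy_slot) and not slot_is_handled_by_monai(legacy_slot)
assert not slot_is_medical(plain_slot)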