Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IO-2040][external] Fix for NifTI exports that include filenames with dots #705

Merged
merged 3 commits into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions darwin/exporter/formats/darwin.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,15 @@ def build_image_annotation(annotation_file: dt.AnnotationFile) -> Dict[str, Any]
print(annotations)
for annotation in annotation_file.annotations:
payload = {
annotation.annotation_class.annotation_type: _build_annotation_data(annotation),
annotation.annotation_class.annotation_type: _build_annotation_data(
annotation
),
"name": annotation.annotation_class.name,
}

if (
annotation.annotation_class.annotation_type == "complex_polygon" or annotation.annotation_class.annotation_type == "polygon"
annotation.annotation_class.annotation_type == "complex_polygon"
or annotation.annotation_class.annotation_type == "polygon"
) and "bounding_box" in annotation.data:
payload["bounding_box"] = annotation.data["bounding_box"]

Expand Down Expand Up @@ -83,7 +86,9 @@ def build_annotation_data(annotation: dt.Annotation) -> Dict[str, Any]:
return {"path": annotation.data["paths"]}

if annotation.annotation_class.annotation_type == "polygon":
return dict(filter(lambda item: item[0] != "bounding_box", annotation.data.items()))
return dict(
filter(lambda item: item[0] != "bounding_box", annotation.data.items())
)

return dict(annotation.data)

Expand All @@ -93,6 +98,8 @@ def _build_annotation_data(annotation: dt.Annotation) -> Dict[str, Any]:
return {"path": annotation.data["paths"]}

if annotation.annotation_class.annotation_type == "polygon":
return dict(filter(lambda item: item[0] != "bounding_box", annotation.data.items()))
return dict(
filter(lambda item: item[0] != "bounding_box", annotation.data.items())
)

return dict(annotation.data)
97 changes: 68 additions & 29 deletions darwin/exporter/formats/nifti.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,12 @@ def export(annotation_files: Iterable[dt.AnnotationFile], output_dir: Path) -> N
output_volumes = build_output_volumes(video_annotation)
slot_map = {slot.name: slot for slot in video_annotation.slots}
for annotation in video_annotation.annotations:
populate_output_volumes(annotation, output_dir, slot_map, output_volumes, image_id)
write_output_volume_to_disk(output_volumes, image_id=image_id, output_dir=output_dir)
populate_output_volumes(
annotation, output_dir, slot_map, output_volumes, image_id
)
write_output_volume_to_disk(
output_volumes, image_id=image_id, output_dir=output_dir
)


def build_output_volumes(video_annotation: dt.AnnotationFile) -> Dict:
Expand Down Expand Up @@ -97,7 +101,9 @@ def build_output_volumes(video_annotation: dt.AnnotationFile) -> Dict:
for slot in video_annotation.slots:
slot_metadata = slot.metadata
assert slot_metadata is not None
series_instance_uid = slot_metadata.get("SeriesInstanceUID", "SeriesIntanceUIDNotProvided")
series_instance_uid = slot_metadata.get(
"SeriesInstanceUID", "SeriesIntanceUIDNotProvided"
NooshinGhavami marked this conversation as resolved.
Show resolved Hide resolved
)
# Builds output volumes per class
volume_dims, pixdims, affine, original_affine = process_metadata(slot.metadata)
output_volumes[series_instance_uid] = {
Expand All @@ -115,7 +121,9 @@ def build_output_volumes(video_annotation: dt.AnnotationFile) -> Dict:
return output_volumes


def check_for_error_and_return_imageid(video_annotation: dt.AnnotationFile, output_dir: Path):
def check_for_error_and_return_imageid(
video_annotation: dt.AnnotationFile, output_dir: Path
):
"""
Given the video_annotation file and the output directory, checks for a range of errors and
returns messages accordingly.
Expand All @@ -135,16 +143,19 @@ def check_for_error_and_return_imageid(video_annotation: dt.AnnotationFile, outp

output_volumes = None
filename = Path(video_annotation.filename)
suffixes = filename.suffixes
if len(suffixes) > 2:
return create_error_message_json(
"Misconfigured filename, contains too many suffixes", output_dir, str(filename)
)
elif len(suffixes) == 2:
try:
suffixes = filename.suffixes[-2:]
except IndexError:
suffixes = filename.suffixes
if len(suffixes) == 2:
NooshinGhavami marked this conversation as resolved.
Show resolved Hide resolved
if suffixes[0] == ".nii" and suffixes[1] == ".gz":
image_id = str(filename).strip("".join(suffixes))
image_id = str(filename).rstrip("".join(suffixes))
NooshinGhavami marked this conversation as resolved.
Show resolved Hide resolved
else:
return create_error_message_json("Two suffixes found but not ending in .nii.gz", output_dir, str(filename))
return create_error_message_json(
"Two suffixes found but not ending in .nii.gz",
output_dir,
str(filename),
)
elif len(suffixes) == 1:
if suffixes[0] == ".nii" or suffixes[0] == ".dcm":
image_id = filename.stem
Expand All @@ -162,14 +173,18 @@ def check_for_error_and_return_imageid(video_annotation: dt.AnnotationFile, outp
str(filename),
)
if video_annotation is None:
return create_error_message_json("video_annotation not found", output_dir, image_id)
return create_error_message_json(
"video_annotation not found", output_dir, image_id
)

for slot in video_annotation.slots:
# Pick the first slot to take the metadata from. We assume that all slots have the same metadata.
metadata = slot.metadata
if metadata is None:
return create_error_message_json(
f"No metadata found for {str(filename)}, are you sure this is medical data?", output_dir, image_id
f"No metadata found for {str(filename)}, are you sure this is medical data?",
output_dir,
image_id,
)

volume_dims, pixdim, affine, _ = process_metadata(metadata)
Expand Down Expand Up @@ -214,7 +229,9 @@ def populate_output_volumes(

slot_name = annotation.slot_names[0]
slot = slot_map[slot_name]
series_instance_uid = slot.metadata.get("SeriesInstanceUID", "SeriesIntanceUIDNotProvided")
series_instance_uid = slot.metadata.get(
"SeriesInstanceUID", "SeriesIntanceUIDNotProvided"
)
volume = output_volumes.get(series_instance_uid)
frames = annotation.frames
frame_new = {}
Expand All @@ -226,7 +243,9 @@ def populate_output_volumes(

for frame_idx in frames.keys():
frame_new[frame_idx] = frames
view_idx = get_view_idx_from_slot_name(slot_name, slot.metadata.get("orientation"))
view_idx = get_view_idx_from_slot_name(
slot_name, slot.metadata.get("orientation")
)
if view_idx == XYPLANE:
height, width = (
volume[annotation.annotation_class.name].dims[0],
Expand All @@ -245,34 +264,48 @@ def populate_output_volumes(
if "paths" in frames[frame_idx].data:
# Dealing with a complex polygon
polygons = [
shift_polygon_coords(polygon_path, volume[annotation.annotation_class.name].pixdims)
shift_polygon_coords(
polygon_path, volume[annotation.annotation_class.name].pixdims
)
for polygon_path in frames[frame_idx].data["paths"]
]
elif "path" in frames[frame_idx].data:
# Dealing with a simple polygon
polygons = shift_polygon_coords(
frames[frame_idx].data["path"], volume[annotation.annotation_class.name].pixdims
frames[frame_idx].data["path"],
volume[annotation.annotation_class.name].pixdims,
)
else:
continue
class_name = frames[frame_idx].annotation_class.name
im_mask = convert_polygons_to_mask(polygons, height=height, width=width)
volume = output_volumes[series_instance_uid]
if view_idx == 0:
volume[annotation.annotation_class.name].pixel_array[:, :, frame_idx] = np.logical_or(
im_mask, volume[annotation.annotation_class.name].pixel_array[:, :, frame_idx]
volume[annotation.annotation_class.name].pixel_array[
:, :, frame_idx
] = np.logical_or(
im_mask,
volume[annotation.annotation_class.name].pixel_array[:, :, frame_idx],
)
elif view_idx == 1:
volume[annotation.annotation_class.name].pixel_array[:, frame_idx, :] = np.logical_or(
im_mask, volume[annotation.annotation_class.name].pixel_array[:, frame_idx, :]
volume[annotation.annotation_class.name].pixel_array[
:, frame_idx, :
] = np.logical_or(
im_mask,
volume[annotation.annotation_class.name].pixel_array[:, frame_idx, :],
)
elif view_idx == 2:
volume[annotation.annotation_class.name].pixel_array[frame_idx, :, :] = np.logical_or(
im_mask, volume[annotation.annotation_class.name].pixel_array[frame_idx, :, :]
volume[annotation.annotation_class.name].pixel_array[
frame_idx, :, :
] = np.logical_or(
im_mask,
volume[annotation.annotation_class.name].pixel_array[frame_idx, :, :],
)


def write_output_volume_to_disk(output_volumes: Dict, image_id: str, output_dir: Union[str, Path]) -> None:
def write_output_volume_to_disk(
output_volumes: Dict, image_id: str, output_dir: Union[str, Path]
) -> None:
# volumes are the values of this nested dict
def unnest_dict_to_list(d: Dict) -> List:
result = []
Expand All @@ -290,11 +323,15 @@ def unnest_dict_to_list(d: Dict) -> List:
affine=volume.affine,
)
if volume.original_affine is not None:
orig_ornt = io_orientation(volume.original_affine) # Get orientation of current affine
orig_ornt = io_orientation(
volume.original_affine
) # Get orientation of current affine
img_ornt = io_orientation(volume.affine) # Get orientation of RAS affine
from_canonical = ornt_transform(img_ornt, orig_ornt) # Get transform from RAS to current affine
from_canonical = ornt_transform(
img_ornt, orig_ornt
) # Get transform from RAS to current affine
img = img.as_reoriented(from_canonical)
output_path = Path(output_dir) / f"{image_id}_{volume.series_instance_uid}_{volume.class_name}.nii.gz"
NooshinGhavami marked this conversation as resolved.
Show resolved Hide resolved
output_path = Path(output_dir) / f"{image_id}_{volume.class_name}.nii.gz"
if not output_path.parent.exists():
output_path.parent.mkdir(parents=True)
nib.save(img=img, filename=output_path)
Expand Down Expand Up @@ -362,7 +399,9 @@ def process_affine(affine):
return affine


def create_error_message_json(error_message: str, output_dir: Union[str, Path], image_id: str) -> bool:
def create_error_message_json(
error_message: str, output_dir: Union[str, Path], image_id: str
) -> bool:
output_path = Path(output_dir) / f"{image_id}_error.json"
if not output_path.parent.exists():
output_path.parent.mkdir(parents=True)
Expand Down