Skip to content

Commit

Permalink
working version e2e no tests in
Browse files Browse the repository at this point in the history
  • Loading branch information
Thomas Varsavsky authored and shernshiou committed Feb 19, 2024
1 parent 2f0cccd commit 45e3716
Showing 1 changed file with 213 additions and 94 deletions.
307 changes: 213 additions & 94 deletions darwin/exporter/formats/nifti.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class Volume:
pixdims: List
class_name: str
series_instance_uid: str
from_raster_layer: bool


def export(annotation_files: Iterable[dt.AnnotationFile], output_dir: Path) -> None:
Expand All @@ -58,42 +59,88 @@ def export(annotation_files: Iterable[dt.AnnotationFile], output_dir: Path) -> N
image_id = check_for_error_and_return_imageid(video_annotation, output_dir)
if not isinstance(image_id, str):
continue
output_volumes = build_output_volumes(video_annotation)
polygon_class_names = [
ann.annotation_class.name
for ann in video_annotation.annotations
if ann.annotation_class.annotation_type == "polygon"
]
output_volumes = build_output_volumes(
video_annotation,
class_names_to_export=polygon_class_names,
from_raster_layer=False,
)
slot_map = {slot.name: slot for slot in video_annotation.slots}
for annotation in video_annotation.annotations:
populate_output_volumes(
annotation, output_dir, slot_map, output_volumes, image_id
)
polygon_annotations = [
ann
for ann in video_annotation.annotations
if ann.annotation_class.annotation_type == "polygon"
]
populate_output_volumes_from_polygons(
polygon_annotations, slot_map, output_volumes
)
write_output_volume_to_disk(
output_volumes, image_id=image_id, output_dir=output_dir
)
# Check if there are any rasters in the annotation, these are created with a _m suffix
# in addition to those created from polygons.
annotation_types = [
a.annotation_class.annotation_type for a in video_annotation.annotations
]
# Need to map raster layers to SeriesInstanceUIDs
if "raster_layer" in annotation_types and "mask" in annotation_types:
mask_id_to_classname = {
ann.id: ann.annotation_class.name
for ann in video_annotation.annotations
if ann.annotation_class.annotation_type == "mask"
}
raster_output_volumes = build_output_volumes(
video_annotation,
class_names_to_export=list(mask_id_to_classname.values()),
from_raster_layer=True,
)

# This assumes only one raster_layer annotation. If we allow multiple raster layers per annotation file we need to change this.
raster_layer_annotation = [
ann
for ann in video_annotation.annotations
if ann.annotation_class.annotation_type == "raster_layer"
][0]
if raster_layer_annotation:
populate_output_volumes_from_raster_layer(
annotation=raster_layer_annotation,
mask_id_to_classname=mask_id_to_classname,
slot_map=slot_map,
output_volumes=raster_output_volumes,
)
write_output_volume_to_disk(
raster_output_volumes, image_id=image_id, output_dir=output_dir
)

def build_output_volumes(video_annotation: dt.AnnotationFile) -> Dict:

def build_output_volumes(
video_annotation: dt.AnnotationFile,
from_raster_layer: bool = False,
class_names_to_export: List[str] = None,
) -> Dict:
"""
This is a function to create the output volumes based on the whole annotation file
Parameters
----------
video_annotation : dt.AnnotationFile
The ``AnnotationFile``\\s to be exported.
from_raster_layer : bool
Whether the output volumes are being built from raster layers or not
class_names_to_export : List[str]
The list of class names to export
Returns
-------
output_volumes: Dict
The output volume built per class
"""
# Builds a map of class to integer
class_map = {}
class_count = 1
for annotation in video_annotation.annotations:
assert isinstance(annotation, dt.VideoAnnotation)
frames = annotation.frames
for frame_idx in frames.keys():
class_name = frames[frame_idx].annotation_class.name
if class_name not in class_map:
class_map[class_name] = class_count
class_count += 1
# Builds a map of class to integer, if its a polygon we use the class name as is
# for the mask annotations we append a suffix _m to ensure backwards compatibility

output_volumes = {}
for slot in video_annotation.slots:
Expand All @@ -113,8 +160,9 @@ def build_output_volumes(video_annotation: dt.AnnotationFile) -> Dict:
pixdims=pixdims,
series_instance_uid=series_instance_uid,
class_name=class_name,
from_raster_layer=from_raster_layer,
)
for class_name in class_map.keys()
for class_name in class_names_to_export
}
return output_volumes

Expand Down Expand Up @@ -192,13 +240,11 @@ def check_for_error_and_return_imageid(
return image_id


def populate_output_volumes(
annotation: Union[dt.Annotation, dt.VideoAnnotation],
output_dir: Union[str, Path],
def populate_output_volumes_from_polygons(
annotations: List[Union[dt.Annotation, dt.VideoAnnotation]],
slot_map: Dict,
output_volumes: Dict,
image_id: str,
) -> None:
) -> Dict:
"""
Exports the given ``AnnotationFile``\\s into nifti format inside of the given
``output_dir``. Deletes everything within ``output_dir/masks`` before writting to it.
Expand All @@ -207,95 +253,152 @@ def populate_output_volumes(
----------
annotation : Union[dt.Annotation, dt.VideoAnnotation]
The Union of these two files used to populate the volume with
output_dir : Path
The folder where the new instance mask files will be.
slot_map : Dict
Dictionary of the different slots within the annotation file
output_volumes : Dict
volumes created from the build_output_volumes file
image_id : str
Returns
-------
volume : dict
Returns the populated volume
Returns dict of volumes with class names as keys and volumes as values
"""
for annotation in annotations:
slot_name = annotation.slot_names[0]
slot = slot_map[slot_name]
series_instance_uid = slot.metadata.get(
"SeriesInstanceUID", "SeriesIntanceUIDNotProvided"
)
volume = output_volumes.get(series_instance_uid)
frames = annotation.frames

# define the different planes
XYPLANE = 0
XZPLANE = 1
YZPLANE = 2

for frame_idx in frames.keys():
view_idx = get_view_idx_from_slot_name(
slot_name, slot.metadata.get("orientation")
)
if view_idx == XYPLANE:
height, width = (
volume[annotation.annotation_class.name].dims[0],
volume[annotation.annotation_class.name].dims[1],
)
elif view_idx == XZPLANE:
height, width = (
volume[annotation.annotation_class.name].dims[0],
volume[annotation.annotation_class.name].dims[2],
)
elif view_idx == YZPLANE:
height, width = (
volume[annotation.annotation_class.name].dims[1],
volume[annotation.annotation_class.name].dims[2],
)
if "paths" in frames[frame_idx].data:
# Dealing with a complex polygon
polygons = [
shift_polygon_coords(
polygon_path, volume[annotation.annotation_class.name].pixdims
)
for polygon_path in frames[frame_idx].data["paths"]
]
elif "path" in frames[frame_idx].data:
# Dealing with a simple polygon
polygons = shift_polygon_coords(
frames[frame_idx].data["path"],
volume[annotation.annotation_class.name].pixdims,
)
else:
continue
frames[frame_idx].annotation_class.name
im_mask = convert_polygons_to_mask(polygons, height=height, width=width)
volume = output_volumes[series_instance_uid]
if view_idx == 0:
volume[annotation.annotation_class.name].pixel_array[
:, :, frame_idx
] = np.logical_or(
im_mask,
volume[annotation.annotation_class.name].pixel_array[
:, :, frame_idx
],
)
elif view_idx == 1:
volume[annotation.annotation_class.name].pixel_array[
:, frame_idx, :
] = np.logical_or(
im_mask,
volume[annotation.annotation_class.name].pixel_array[
:, frame_idx, :
],
)
elif view_idx == 2:
volume[annotation.annotation_class.name].pixel_array[
frame_idx, :, :
] = np.logical_or(
im_mask,
volume[annotation.annotation_class.name].pixel_array[
frame_idx, :, :
],
)
return volume


def populate_output_volumes_from_raster_layer(
annotation: dt.Annotation,
mask_id_to_classname: Dict,
slot_map: Dict,
output_volumes: Dict,
) -> Dict:
"""
Populates the output volumes provided with the raster layer annotations
Parameters
----------
annotation : Union[dt.Annotation, dt.VideoAnnotation]
The Union of these two files used to populate the volume with
mask_id_to_classname : Dict
Map from mask id to class names
slot_map: Dict
Dictionary of the different slots within the annotation file
output_volumes : Dict
volumes created from the build_output_volumes file
Returns
-------
volume : dict
Returns dict of volumes with class names as keys and volumes as values
"""
slot_name = annotation.slot_names[0]
slot = slot_map[slot_name]
series_instance_uid = slot.metadata.get(
"SeriesInstanceUID", "SeriesIntanceUIDNotProvided"
)
volume = output_volumes.get(series_instance_uid)
frames = annotation.frames
frame_new = {}

# define the different planes
XYPLANE = 0
XZPLANE = 1
YZPLANE = 2

for frame_idx in frames.keys():
frame_new[frame_idx] = frames
view_idx = get_view_idx_from_slot_name(
slot_name, slot.metadata.get("orientation")
mask_annotation_ids_mapping = {}
multilabel_volume = np.zeros(slot.metadata["shape"][1:])
for frame_idx in sorted(frames.keys()):
frame_idx = int(frame_idx)
frame_data = annotation.frames[frame_idx]
dense_rle = frame_data.data["dense_rle"]
mask_2d = decode_rle(dense_rle, slot.width, slot.height)
multilabel_volume[:, :, frame_idx] = mask_2d.T
mask_annotation_ids_mapping.update(
frame_data.data["mask_annotation_ids_mapping"]
)
if view_idx == XYPLANE:
height, width = (
volume[annotation.annotation_class.name].dims[0],
volume[annotation.annotation_class.name].dims[1],
)
elif view_idx == XZPLANE:
height, width = (
volume[annotation.annotation_class.name].dims[0],
volume[annotation.annotation_class.name].dims[2],
)
elif view_idx == YZPLANE:
height, width = (
volume[annotation.annotation_class.name].dims[1],
volume[annotation.annotation_class.name].dims[2],
)
if "paths" in frames[frame_idx].data:
# Dealing with a complex polygon
polygons = [
shift_polygon_coords(
polygon_path, volume[annotation.annotation_class.name].pixdims
)
for polygon_path in frames[frame_idx].data["paths"]
]
elif "path" in frames[frame_idx].data:
# Dealing with a simple polygon
polygons = shift_polygon_coords(
frames[frame_idx].data["path"],
volume[annotation.annotation_class.name].pixdims,
)
else:
continue
frames[frame_idx].annotation_class.name
im_mask = convert_polygons_to_mask(polygons, height=height, width=width)
# Now we convert this multilabel array into this dictionary of output volumes
# in order to re-use the write_output_volume_to_disk function.
for mask_id, class_name in mask_id_to_classname.items():
volume = output_volumes[series_instance_uid]
if view_idx == 0:
volume[annotation.annotation_class.name].pixel_array[
:, :, frame_idx
] = np.logical_or(
im_mask,
volume[annotation.annotation_class.name].pixel_array[:, :, frame_idx],
)
elif view_idx == 1:
volume[annotation.annotation_class.name].pixel_array[
:, frame_idx, :
] = np.logical_or(
im_mask,
volume[annotation.annotation_class.name].pixel_array[:, frame_idx, :],
)
elif view_idx == 2:
volume[annotation.annotation_class.name].pixel_array[
frame_idx, :, :
] = np.logical_or(
im_mask,
volume[annotation.annotation_class.name].pixel_array[frame_idx, :, :],
)
mask_int_id = mask_annotation_ids_mapping[mask_id]
# We want to create a binary mask for each class
volume[class_name].pixel_array = np.where(
multilabel_volume == int(mask_int_id), 1, volume[class_name].pixel_array
)
return volume


def write_output_volume_to_disk(
Expand Down Expand Up @@ -326,7 +429,10 @@ def unnest_dict_to_list(d: Dict) -> List:
img_ornt, orig_ornt
) # Get transform from RAS to current affine
img = img.as_reoriented(from_canonical)
output_path = Path(output_dir) / f"{image_id}_{volume.class_name}.nii.gz"
if volume.from_raster_layer:
output_path = Path(output_dir) / f"{image_id}_{volume.class_name}_m.nii.gz"
else:
output_path = Path(output_dir) / f"{image_id}_{volume.class_name}.nii.gz"
if not output_path.parent.exists():
output_path.parent.mkdir(parents=True)
nib.save(img=img, filename=output_path)
Expand Down Expand Up @@ -404,3 +510,16 @@ def create_error_message_json(
native_json.dump({"error": error_message}, f)

return False


def decode_rle(rle_data, width, height):
"""Decodes run-length encoding (RLE) data into a mask array."""
total_pixels = width * height
mask = np.zeros(total_pixels, dtype=np.uint8)
pos = 0
for i in range(0, len(rle_data), 2):
value = rle_data[i]
length = rle_data[i + 1]
mask[pos : pos + length] = value
pos += length
return mask.reshape(height, width)

0 comments on commit 45e3716

Please sign in to comment.