diff --git a/src/totalsegmri/utils/generate_laplace_images.py b/src/totalsegmri/utils/generate_laplace_images.py new file mode 100644 index 0000000..24344ab --- /dev/null +++ b/src/totalsegmri/utils/generate_laplace_images.py @@ -0,0 +1,166 @@ +import sys, argparse, textwrap +import multiprocessing as mp +from functools import partial +from tqdm.contrib.concurrent import process_map +from scipy.ndimage import laplace +from pathlib import Path +import nibabel as nib +import numpy as np +import warnings + +warnings.filterwarnings("ignore") + + +def main(): + + # Description and arguments + parser = argparse.ArgumentParser( + description=textwrap.dedent(f''' + This script processes NIfTI (Neuroimaging Informatics Technology Initiative) image and generate laplace trasformed images.''' + ), + epilog=textwrap.dedent(''' + Examples: + generate_laplace_images -i images -o images + For BIDS: + generate_laplace_images -i . -o . --image-suffix "" --output-image-suffix "_laplace" -d "sub-" -u "anat" + '''), + formatter_class=argparse.RawTextHelpFormatter + ) + parser.add_argument( + '--images-dir', '-i', type=Path, required=True, + help='The folder where input NIfTI images files are located (required).' + ) + parser.add_argument( + '--output-images-dir', '-o', type=Path, required=True, + help='The folder where output augmented images will be saved with _a1, _a2 etc. suffixes (required).' + ) + parser.add_argument( + '--subject-dir', '-d', type=str, default=None, nargs='?', const='', + help=textwrap.dedent(''' + Is every subject has its oen direcrory. + If this argument will be provided without value it will look for any directory in the segmentation directory. + If value also provided it will be used as a prefix to subject directory, defaults to False (no subjet directory). + '''), + ) + parser.add_argument( + '--subject-subdir', '-u', type=str, default='', + help='Subfolder inside subject folder containing masks, defaults to no subfolder.' + ) + parser.add_argument( + '--prefix', '-p', type=str, default='', + help='File prefix to work on.' + ) + parser.add_argument( + '--image-suffix', type=str, default='_0000', + help='Image suffix, defaults to "_0000".' + ) + parser.add_argument( + '--output-image-suffix', type=str, default='_0000', + help='Image suffix for output, defaults to "_0000".' + ) + parser.add_argument( + '--max-workers', '-w', type=int, default=mp.cpu_count(), + help='Max worker to run in parallel proccess, defaults to multiprocessing.cpu_count().' + ) + parser.add_argument( + '--verbose', '-v', type=int, default=1, choices=[0, 1], + help='verbose. 0: Display only errors/warnings, 1: Errors/warnings + info messages. Default is 1.' + ) + + # Parse the command-line arguments + try: + args = parser.parse_args() + except BaseException as e: + sys.exit() + + # Get the command-line argument values + images_path = args.images_dir + output_images_path = args.output_images_dir + subject_dir = args.subject_dir + subject_subdir = args.subject_subdir + prefix = args.prefix + image_suffix = args.image_suffix + output_image_suffix = args.output_image_suffix + max_workers = args.max_workers + verbose = args.verbose + + # Print the argument values if verbose is enabled + if verbose: + print(textwrap.dedent(f''' + Running {Path(__file__).stem} with the following params: + images_path = "{images_path}" + output_images_path = "{output_images_path}" + subject_dir = "{subject_dir}" + subject_subdir = "{subject_subdir}" + prefix = "{prefix}" + image_suffix = "{image_suffix}" + output_image_suffix = "{output_image_suffix}" + max_workers = {max_workers} + verbose = {verbose} + ''')) + + glob_pattern = "" + if subject_dir is not None: + glob_pattern += f"{subject_dir}*/" + if len(subject_subdir) > 0: + glob_pattern += f"{subject_subdir}/" + glob_pattern += f'{prefix}*{image_suffix}.nii.gz' + + # Process the NIfTI image and segmentation files + images_path_list = list(images_path.glob(glob_pattern)) + + # Create a partially-applied function with the extra arguments + partial_generate_laplace_images = partial( + generate_laplace_images, + images_path=images_path, + output_images_path=output_images_path, + image_suffix=image_suffix, + output_image_suffix=output_image_suffix, + ) + + with mp.Pool() as pool: + process_map(partial_generate_laplace_images, images_path_list, max_workers=max_workers) + + +def generate_laplace_images( + image_path, + images_path, + output_images_path, + image_suffix, + output_image_suffix, + ): + + image = nib.load(image_path) + + # Get the data type of the image + image_data_dtype = np.asanyarray(image.dataobj).dtype + image_header_dtype = getattr(np, image.get_data_dtype().name) + + image_data = image.get_fdata().astype(np.float64) + + output_image_data = laplace(image_data) + + # Rescale the image to the output data type if necessary + # code from https://github.com/spinalcordtoolbox/spinalcordtoolbox/blob/6.3/spinalcordtoolbox/image.py#L1217 + if "int" in np.dtype(image_header_dtype).name: + # get min/max from output type + min_out = np.iinfo(image_header_dtype).min + max_out = np.iinfo(image_header_dtype).max + min_in = output_image_data.min() + max_in = output_image_data.max() + if (min_in < min_out) or (max_in > max_out): + data_rescaled = output_image_data * (max_out - min_out) / (max_in - min_in) + output_image_data = data_rescaled - (data_rescaled.min() - min_out) + + output_image_path = output_images_path / image_path.relative_to(images_path).parent / image_path.name.replace(f'{image_suffix}.nii.gz', f'{output_image_suffix}.nii.gz') + + # Make sure output directory exists and save with original header image dtype + output_image_path.parent.mkdir(parents=True, exist_ok=True) + output_image = nib.Nifti1Image(output_image_data.astype(image_header_dtype), image.affine, image.header) + output_image.set_qform(image.affine) + output_image.set_sform(image.affine) + output_image.set_data_dtype(image_header_dtype) + nib.save(output_image, output_image_path) + +if __name__ == '__main__': + main() \ No newline at end of file