Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix memory issue on Compute Canada #43

Merged
merged 11 commits into from
Feb 18, 2025
20 changes: 20 additions & 0 deletions linumpy/utils/io.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# -*- coding:utf8 -*-
import multiprocessing


DEFAULT_N_CPUS = multiprocessing.cpu_count() - 1


def parse_processes_arg(n_processes):
if n_processes is None or n_processes <= 0:
return DEFAULT_N_CPUS
elif n_processes > DEFAULT_N_CPUS:
return DEFAULT_N_CPUS
return n_processes


def add_processes_arg(parser):
a = parser.add_argument('--n_processes', type=int, default=1,
help='Number of processes to use. -1 to use \n'
'all cores [%(default)s].')
return a
38 changes: 17 additions & 21 deletions scripts/linum_create_mosaic_grid_3d.py
Original file line number Diff line number Diff line change
@@ -5,9 +5,7 @@

import argparse
import multiprocessing
import shutil
from pathlib import Path
from os.path import join as pjoin

import numpy as np
import dask.array as da
@@ -19,8 +17,7 @@
from linumpy.microscope.oct import OCT
from tqdm.auto import tqdm

# Default number of processes is the number of cores minus 1
DEFAULT_N_CPUS = multiprocessing.cpu_count() - 1
from linumpy.utils.io import parse_processes_arg, add_processes_arg


def _build_arg_parser():
@@ -38,9 +35,10 @@ def _build_arg_parser():
help="Keep the galvo return signal (default=%(default)s)")
p.add_argument('--n_levels', type=int, default=5,
help='Number of levels in pyramid representation.')
p.add_argument('--n_processes', type=int,
help=f'Number of processes to launch [{DEFAULT_N_CPUS}].')

p.add_argument('--zarr_root',
help='Path to parent directory under which the zarr'
' temporary directory will be created [/tmp/].')
add_processes_arg(p)
return p


@@ -73,7 +71,7 @@ def process_tile(params: dict):
cmin = (my - my_min) * vol.shape[2]
rmax = rmin + vol.shape[1]
cmax = cmin + vol.shape[2]
mosaic[:, rmin:rmax, cmin:cmax] = vol
mosaic[0:tile_size[0], rmin:rmax, cmin:cmax] = vol


def main():
@@ -86,7 +84,7 @@ def main():
z = args.slice
output_resolution = args.resolution
crop = not args.keep_galvo_return
n_cpus = DEFAULT_N_CPUS if args.n_processes is None else args.n_processes
n_cpus = parse_processes_arg(args.n_processes)

# Analyze the tiles
tiles, tiles_pos = reconstruction.get_tiles_ids(tiles_directory, z=z)
@@ -115,11 +113,9 @@ def main():
mosaic_shape = [tile_size[0], n_mx * tile_size[1], n_my * tile_size[2]]

# Create the zarr persistent array
zarr_store = zarr.TempStore(suffix=".zarr")
process_sync_file = zarr_store.path.replace(".zarr", ".sync")
synchronizer = zarr.ProcessSynchronizer(process_sync_file)
mosaic = zarr.open(zarr_store, mode="w", shape=mosaic_shape, dtype=np.float32,
chunks=tile_size, synchronizer=synchronizer)
zarr_store = zarr.TempStore(dir=args.zarr_root, suffix=".zarr")
mosaic = zarr.open(zarr_store, mode="w", shape=mosaic_shape,
dtype=np.float32, chunks=tile_size)

# Create a params dictionary for every tile
params = []
@@ -133,19 +129,19 @@ def main():
"mxy_min": (mx_min, my_min)
})

# Process the tiles in parallel
with multiprocessing.Pool(n_cpus) as pool:
results = tqdm(pool.imap(process_tile, params), total=len(params))
tuple(results)
if n_cpus > 1: # process in parallel
with multiprocessing.Pool(n_cpus) as pool:
results = tqdm(pool.imap(process_tile, params), total=len(params))
tuple(results)
else: # Process the tiles sequentially
for p in tqdm(params):
process_tile(p)

# Convert to ome-zarr
mosaic_dask = da.from_zarr(mosaic)
save_zarr(mosaic_dask, args.output_zarr, scales=output_resolution,
chunks=tile_size, n_levels=args.n_levels)

# Remove the process sync file
shutil.rmtree(process_sync_file)


if __name__ == "__main__":
main()
37 changes: 21 additions & 16 deletions scripts/linum_fix_illumination_3d.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""Detect and fix the lateral illumination inhomogeneities for each 3D tiles of a mosaic grid"""
"""
Detect and fix the lateral illumination inhomogeneities for each
3D tiles of a mosaic grid.
"""

from os import environ

environ["OMP_NUM_THREADS"] = "1"

import argparse
import multiprocessing
import shutil
import tempfile
from pathlib import Path
from os.path import join as pjoin

import dask.array as da

@@ -23,6 +24,7 @@
import numpy as np
from pqdm.processes import pqdm
from linumpy.io.zarr import save_zarr, read_omezarr
from linumpy.utils.io import add_processes_arg, parse_processes_arg

# TODO: add option to export the flatfields and darkfields

@@ -33,7 +35,7 @@ def _build_arg_parser():
help="Full path to the input zarr file")
p.add_argument("output_zarr",
help="Full path to the output zarr file")

add_processes_arg(p)
return p


@@ -43,7 +45,8 @@ def process_tile(params: dict):
z = params["z"]
tile_shape = params["tile_shape"]
vol = io.v3.imread(str(file))
file_output = Path(file).parent / file.name.replace(".tiff", "_corrected.tiff")
file_output = Path(file).parent / file.name.replace(".tiff",
"_corrected.tiff")

# Get the number of tiles
nx = vol.shape[0] // tile_shape[0]
@@ -94,12 +97,13 @@ def main():
# Parameters
input_zarr = Path(args.input_zarr)
output_zarr = Path(args.output_zarr)
n_cpus = multiprocessing.cpu_count() - 1
n_cpus = parse_processes_arg(args.n_processes)

# Prepare the data for the parallel processing
vol, resolution = read_omezarr(input_zarr, level=0)
n_slices = vol.shape[0]
tmp_dir = tempfile.TemporaryDirectory(suffix="_linum_fix_illumination_3d_slices", dir=output_zarr.parent)
tmp_dir = tempfile.TemporaryDirectory(
suffix="_linum_fix_illumination_3d_slices", dir=output_zarr.parent)
params_list = []
for z in tqdm(range(n_slices), "Preprocessing slices"):
slice_file = Path(tmp_dir.name) / f"slice_{z:03d}.tiff"
@@ -112,15 +116,19 @@ def main():
}
params_list.append(params)

# Process the tiles in parallel
corrected_files = pqdm(params_list, process_tile, n_jobs=n_cpus, desc="Processing tiles")
if n_cpus > 1:
# Process the tiles in parallel
corrected_files = pqdm(params_list, process_tile, n_jobs=n_cpus,
desc="Processing tiles")
else: # process sequentially
corrected_files = []
for param in tqdm(params_list):
corrected_files.append(process_tile(param))

# Retrieve the results and fix the volume
temp_store = zarr.TempStore(suffix=".zarr")
process_sync_file = temp_store.path.replace(".zarr", ".sync")
synchronizer = zarr.ProcessSynchronizer(process_sync_file)
vol_output = zarr.open(temp_store, mode="w", shape=vol.shape, dtype=vol.dtype,
chunks=vol.chunks, synchronizer=synchronizer)
vol_output = zarr.open(temp_store, mode="w", shape=vol.shape,
dtype=vol.dtype, chunks=vol.chunks)

# TODO: Rebuilding volume step could be faster
for z, f in tqdm(corrected_files, "Rebuilding volume"):
@@ -131,9 +139,6 @@ def main():
save_zarr(out_dask, output_zarr, scales=resolution,
chunks=vol.chunks)

# Remove the process sync file
shutil.rmtree(process_sync_file)

# Remove the temporary slice files used by the parallel processes
tmp_dir.cleanup()

10 changes: 5 additions & 5 deletions workflows/workflow_soct_3d_slice_reconstruction.config
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
nextflowVersion = '<= 23.10'
singularity{
autoMounts = true
enabled = true
autoMounts = true
enabled = true
}

process {
scratch=true
scratch = true
errorStrategy = { task.attempt <= 3 ? 'retry' : 'ignore' }
maxRetries = 3
stageInMode='copy'
stageInMode='symlink'
stageOutMode='rsync'
afterScript='sleep 1'
}
}
28 changes: 12 additions & 16 deletions workflows/workflow_soct_3d_slice_reconstruction.nf
Original file line number Diff line number Diff line change
@@ -1,25 +1,24 @@
#!/usr/bin/env nextflow
nextflow.enable.dsl=2
nextflow.enable.dsl = 2

// Workflow Description
// Creates 3D mosaic grid tiff at an isotropic resolution of 25um from raw data set tiles
// Creates a 3D volume from raw S-OCT tiles for a given slice index
// Input: Directory containing raw data set tiles
// Output: 3D mosaic grid tiff at an isotropic resolution of 25um
// Output: 3D reconstruction for a given slice index

// Parameters
params.inputDir = "/Users/jlefebvre/Downloads/tiles_lowestImmersion";
params.outputDir = "/Users/jlefebvre/Downloads/tiles_lowestImmersion_reconstruction";
params.inputDir = "";
params.outputDir = "";
params.resolution = 10; // Resolution of the reconstruction in micron/pixel
params.slice = 28; // Slice to process
params.processes = 8; // Maximum number of python processes per nextflow process
params.slice = 0; // Slice to process
params.processes = 1; // Maximum number of python processes per nextflow process

// Processes
process create_mosaic_grid {
input:
path inputDir
output:
path "*.ome.zarr"
//publishDir path: "${params.outputDir}", mode: 'copy'
script:
"""
linum_create_mosaic_grid_3d.py ${inputDir} mosaic_grid_3d_${params.resolution}um.ome.zarr --slice ${params.slice} --resolution ${params.resolution} --n_processes ${params.processes}
@@ -31,10 +30,9 @@ process fix_focal_curvature {
path mosaic_grid
output:
path "*_focalFix.ome.zarr"
//publishDir path: "${params.outputDir}", mode: 'copy'
script:
"""
linum_detect_focalCurvature.py ${mosaic_grid} mosaic_grid_3d_${params.resolution}um_focalFix.ome.zarr
linum_detect_focal_curvature.py ${mosaic_grid} mosaic_grid_3d_${params.resolution}um_focalFix.ome.zarr
"""
}

@@ -43,10 +41,9 @@ process fix_illumination {
path mosaic_grid
output:
path "*_illuminationFix.ome.zarr"
//publishDir path: "${params.outputDir}", mode: 'copy'
script:
"""
linum_fix_illumination_3d.py ${mosaic_grid} mosaic_grid_3d_${params.resolution}um_illuminationFix.ome.zarr
linum_fix_illumination_3d.py ${mosaic_grid} mosaic_grid_3d_${params.resolution}um_illuminationFix.ome.zarr --n_processes ${params.processes}
"""
}

@@ -55,7 +52,6 @@ process generate_aip {
path mosaic_grid
output:
path "aip.ome.zarr"
//publishDir path: "${params.outputDir}", mode: 'copy'
script:
"""
linum_aip.py ${mosaic_grid} aip.ome.zarr
@@ -67,7 +63,6 @@ process estimate_xy_transformation {
path aip
output:
path "transform_xy.npy"
//ublishDir path: "${params.outputDir}", mode: 'copy'
script:
"""
linum_estimate_transform.py ${aip} transform_xy.npy
@@ -118,7 +113,8 @@ process compute_attenuation_bias {
publishDir path: "${params.outputDir}", mode: 'copy'
script:
"""
linum_compute_attenuationBiasField.py ${slice_attn} "slice_z${params.slice}_${params.resolution}um_bias.ome.zarr"
# NOTE: --isInCM argument is required, else we get data overflow
linum_compute_attenuation_bias_field.py ${slice_attn} "slice_z${params.slice}_${params.resolution}um_bias.ome.zarr" --isInCM
"""
}

@@ -164,4 +160,4 @@ workflow{

// Compensate attenuation
compensate_attenuation(compensate_psf.out.combine(compute_attenuation_bias.out))
}
}