diff --git a/openptv_python/image_processing.py b/openptv_python/image_processing.py index 4875f7c..ca6d824 100644 --- a/openptv_python/image_processing.py +++ b/openptv_python/image_processing.py @@ -1,8 +1,10 @@ """Image processing functions.""" import copy +from functools import partial import numpy as np -from numba import njit + +# from numba import njit from scipy import ndimage from scipy.ndimage import uniform_filter @@ -10,7 +12,7 @@ filter_t = np.zeros((3, 3), dtype=float) -@njit +# @njit def filter_3(img, kernel=None) -> np.ndarray: """Apply a 3x3 filter to an image.""" if kernel is None: # default is a low pass @@ -18,7 +20,7 @@ def filter_3(img, kernel=None) -> np.ndarray: filtered_img = ndimage.convolve(img, kernel) return filtered_img -@njit +# @njit def lowpass_3(img: np.ndarray) -> np.ndarray: """Lowpass filter of 3x3.""" # Define the 3x3 lowpass filter kernel @@ -29,7 +31,7 @@ def lowpass_3(img: np.ndarray) -> np.ndarray: return img_lp -@njit +# @njit def fast_box_blur(filt_span: int, src: np.ndarray, cpar: ControlPar) -> np.ndarray: """Fast box blur.""" n = 2 * filt_span + 1 @@ -57,7 +59,7 @@ def fast_box_blur(filt_span: int, src: np.ndarray, cpar: ControlPar) -> np.ndarr # new_img = map_coordinates(img, [coords_y, coords_x], mode="constant", cval=0) # return new_img -@njit +# @njit def subtract_img(img1: np.ndarray, img2: np.ndarray, img_new: np.ndarray) -> None: """ Subtract img2 from img1 and store the result in img_new. @@ -69,7 +71,7 @@ def subtract_img(img1: np.ndarray, img2: np.ndarray, img_new: np.ndarray) -> Non """ img_new[:] = ndimage.maximum(img1 - img2, 0) -@njit +# @njit def subtract_mask(img: np.ndarray, img_mask: np.ndarray): """Subtract mask from image.""" img_new = np.where(img_mask == 0, 0, img) @@ -136,6 +138,10 @@ def prepare_image( return img_hp +def preprocess_image(): + """Decorate prepare_image with default parameters.""" + return partial(prepare_image, dim_lp=1, filter_hp=0, filter_file="") + # def preprocess_image( # input_img: np.ndarray, diff --git a/openptv_python/multimed.py b/openptv_python/multimed.py index 089ca63..502adce 100644 --- a/openptv_python/multimed.py +++ b/openptv_python/multimed.py @@ -2,8 +2,8 @@ from typing import List, Tuple import numpy as np -from numba import njit +# from numba import njit from .calibration import Calibration, Exterior, Glass from .parameters import ( ControlPar, @@ -57,7 +57,7 @@ def multimed_r_nlay(cal: Calibration, mm: MultimediaPar, pos: np.ndarray) -> flo return mmf -@njit +# @njit def fast_multimed_r_nlay( nlay:int, n1: float, @@ -203,7 +203,7 @@ def back_trans_point( return pos -@njit +# @njit def move_along_ray(glob_z: float, vertex: np.ndarray, direct: np.ndarray) -> np.ndarray: """Move along the ray to the global z plane. @@ -328,7 +328,7 @@ def get_mmf_from_mmlut(cal: Calibration, pos: np.ndarray) -> float: return fast_get_mmf_from_mmlut(rw, origin, data, nz, nr, pos) -@njit +# @njit def fast_get_mmf_from_mmlut( rw: int, origin: np.ndarray, diff --git a/openptv_python/parameters.py b/openptv_python/parameters.py index c3d0c2f..344f724 100644 --- a/openptv_python/parameters.py +++ b/openptv_python/parameters.py @@ -653,10 +653,18 @@ def from_file(cls, filename: str): return ret + def get_grey_thresholds(self): + """Return the grey thresholds.""" + return self.gvthresh + + def get_pixel_count_bounds(self): + """Return the pixel count bounds.""" + return (self.nnmin, self.nnmax) def read_target_par(filename: str) -> TargetPar: """Read target parameters from file and returns target_par object.""" - return TargetPar().from_file(filename) + tpar = TargetPar() + return tpar.from_file(filename) def compare_target_par(targ1: TargetPar, targ2: TargetPar) -> bool: """Compare two target_par objects.""" @@ -799,6 +807,12 @@ def from_file(cls, file_path: str): combine_flag = bool(int(file.readline().strip())) return cls(examine_flag, combine_flag) +def read_examine_par(file_path: str) -> ExaminePar: + """Read from examine.par file.""" + with open(file_path, 'r', encoding="utf-8") as file: + examine_flag = bool(int(file.readline().strip())) + combine_flag = bool(int(file.readline().strip())) + return ExaminePar(examine_flag, combine_flag) @dataclass class PftVersionPar(Parameters): diff --git a/openptv_python/segmentation.py b/openptv_python/segmentation.py index a33edd1..5e50e06 100644 --- a/openptv_python/segmentation.py +++ b/openptv_python/segmentation.py @@ -532,7 +532,7 @@ def peak_fit_new( peaks = [] for i in range(num_objects): indices = np.argwhere(labeled == i + 1) - coordinates = [center_of_mass(mask[indices[:, 0], indices[:, 1]])[::-1]] + coordinates = np.array([center_of_mass(mask[indices[:, 0], indices[:, 1]])[::-1]]) intensity = smoothed[indices[:, 0], indices[:, 1]].max() x, y = np.mean(coordinates, axis=0) peaks.append(Peak(int(round(x)), int(round(y)), intensity)) diff --git a/openptv_python/track.py b/openptv_python/track.py index 842b05a..258c876 100644 --- a/openptv_python/track.py +++ b/openptv_python/track.py @@ -10,6 +10,7 @@ COORD_UNUSED, CORRES_NONE, MAX_CANDS, + MAX_TARGETS, NEXT_NONE, POS_INF, PREV_NONE, @@ -20,16 +21,16 @@ ) from .imgcoord import img_coord from .orientation import point_position -from .parameters import ControlPar, TrackPar +from .parameters import ControlPar, SequencePar, TrackPar, VolumePar from .tracking_frame_buf import Corres, Frame, Pathinfo, Target -from .tracking_run import TrackingRun +from .tracking_run import TrackingRun, tr_new from .trafo import dist_to_flat, metric_to_pixel, pixel_to_metric from .vec_utils import vec_copy, vec_diff_norm, vec_subt default_naming = { - "corres": b"res/rt_is", - "linkage": b"res/ptv_is", - "prio": b"res/added", + "corres": "res/rt_is", + "linkage": "res/ptv_is", + "prio": "res/added", } @@ -1346,91 +1347,91 @@ def trackback_c(run_info: TrackingRun): } -# class Tracker: -# """ -# Workflow: instantiate, call restart() to initialize the frame buffer, then. +class Tracker: + """ + Workflow: instantiate, call restart() to initialize the frame buffer, then. -# call either ``step_forward()`` while it still return True, then call -# ``finalize()`` to finish the run. Alternatively, ``full_forward()`` will -# do all this for you. -# """ + call either ``step_forward()`` while it still return True, then call + ``finalize()`` to finish the run. Alternatively, ``full_forward()`` will + do all this for you. + """ -# def __init__( -# self, -# cpar: ControlPar, -# vpar: VolumePar, -# tpar: TrackPar, -# spar: SequencePar, -# cals: List[Calibration], -# naming: dict, -# flatten_tol: float = 0.0001, -# ): -# """ -# Initialize the tracker. - -# Arguments: -# --------- -# ControlPar cpar, VolumePar vpar, TrackPar tpar, -# SequencePar spar - the usual parameter objects, as read from -# anywhere. -# cals - a list of Calibratiopn objects. -# dict naming - a dictionary with naming rules for the frame buffer -# files. See the ``default_naming`` member (which is the default). -# """ -# # We need to keep a reference to the Python objects so that their -# # allocations are not freed. -# self._keepalive = (cpar, vpar, tpar, spar, cals) - -# self.run_info = tr_new( -# spar, -# tpar, -# vpar, -# cpar, -# TR_BUFSPACE, -# MAX_TARGETS, -# naming["corres"], -# naming["linkage"], -# naming["prio"], -# cals, -# flatten_tol, -# ) -# self.step = self.run_info.seq_par.first - -# def restart(self): -# """ -# Prepare a tracking run. Sets up initial buffers and performs the. - -# one-time calculations used throughout the loop. -# """ -# self.step = self.run_info.seq_par.first -# track_forward_start(self.run_info) - -# def step_forward(self): -# """Perform one tracking step for the current frame of iteration.""" -# if self.step >= self.run_info.seq_par.last: -# return False - -# trackcorr_c_loop(self.run_info, self.step) -# self.step += 1 -# return True - -# def finalize(self): -# """Finish a tracking run.""" -# trackcorr_c_finish(self.run_info, self.step) - -# def full_forward(self): -# """Do a full tracking run from restart to finalize.""" -# track_forward_start(self.run_info) -# for step in range(self.run_info.seq_par.first, self.run_info.seq_par.last): -# trackcorr_c_loop(self.run_info, step) -# trackcorr_c_finish(self.run_info, self.run_info.seq_par.last) - -# def full_backward(self): -# """Do a full backward run on existing tracking results. so make sure. - -# results exist or it will explode in your face. -# """ -# trackback_c(self.run_info) - -# def current_step(self): -# return self.step + def __init__( + self, + cpar: ControlPar, + vpar: VolumePar, + tpar: TrackPar, + spar: SequencePar, + cals: List[Calibration], + naming: dict, + flatten_tol: float = 0.0001, + ): + """ + Initialize the tracker. + + Arguments: + --------- + ControlPar cpar, VolumePar vpar, TrackPar tpar, + SequencePar spar - the usual parameter objects, as read from + anywhere. + cals - a list of Calibratiopn objects. + dict naming - a dictionary with naming rules for the frame buffer + files. See the ``default_naming`` member (which is the default). + """ + # We need to keep a reference to the Python objects so that their + # allocations are not freed. + self._keepalive = (cpar, vpar, tpar, spar, cals) + + self.run_info = tr_new( + spar, + tpar, + vpar, + cpar, + TR_BUFSPACE, + MAX_TARGETS, + naming["corres"], + naming["linkage"], + naming["prio"], + cals, + flatten_tol, + ) + self.step = self.run_info.seq_par.first + + def restart(self): + """ + Prepare a tracking run. Sets up initial buffers and performs the. + + one-time calculations used throughout the loop. + """ + self.step = self.run_info.seq_par.first + track_forward_start(self.run_info) + + def step_forward(self): + """Perform one tracking step for the current frame of iteration.""" + if self.step >= self.run_info.seq_par.last: + return False + + trackcorr_c_loop(self.run_info, self.step) + self.step += 1 + return True + + def finalize(self): + """Finish a tracking run.""" + trackcorr_c_finish(self.run_info, self.step) + + def full_forward(self): + """Do a full tracking run from restart to finalize.""" + track_forward_start(self.run_info) + for step in range(self.run_info.seq_par.first, self.run_info.seq_par.last): + trackcorr_c_loop(self.run_info, step) + trackcorr_c_finish(self.run_info, self.run_info.seq_par.last) + + def full_backward(self): + """Do a full backward run on existing tracking results. so make sure. + + results exist or it will explode in your face. + """ + trackback_c(self.run_info) + + def current_step(self): + return self.step