Skip to content

Commit

Permalink
fixes from pyptv
Browse files Browse the repository at this point in the history
  • Loading branch information
alexlib committed Dec 16, 2023
1 parent e2d7cb4 commit 0bc357f
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 103 deletions.
18 changes: 12 additions & 6 deletions openptv_python/image_processing.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,26 @@
"""Image processing functions."""
import copy
from functools import partial

import numpy as np
from numba import njit

# from numba import njit
from scipy import ndimage
from scipy.ndimage import uniform_filter

from .parameters import ControlPar

filter_t = np.zeros((3, 3), dtype=float)

@njit
# @njit
def filter_3(img, kernel=None) -> np.ndarray:
"""Apply a 3x3 filter to an image."""
if kernel is None: # default is a low pass
kernel = np.ones((3, 3)) / 9
filtered_img = ndimage.convolve(img, kernel)
return filtered_img

@njit
# @njit
def lowpass_3(img: np.ndarray) -> np.ndarray:
"""Lowpass filter of 3x3."""
# Define the 3x3 lowpass filter kernel
Expand All @@ -29,7 +31,7 @@ def lowpass_3(img: np.ndarray) -> np.ndarray:

return img_lp

@njit
# @njit
def fast_box_blur(filt_span: int, src: np.ndarray, cpar: ControlPar) -> np.ndarray:
"""Fast box blur."""
n = 2 * filt_span + 1
Expand Down Expand Up @@ -57,7 +59,7 @@ def fast_box_blur(filt_span: int, src: np.ndarray, cpar: ControlPar) -> np.ndarr
# new_img = map_coordinates(img, [coords_y, coords_x], mode="constant", cval=0)
# return new_img

@njit
# @njit
def subtract_img(img1: np.ndarray, img2: np.ndarray, img_new: np.ndarray) -> None:
"""
Subtract img2 from img1 and store the result in img_new.
Expand All @@ -69,7 +71,7 @@ def subtract_img(img1: np.ndarray, img2: np.ndarray, img_new: np.ndarray) -> Non
"""
img_new[:] = ndimage.maximum(img1 - img2, 0)

@njit
# @njit
def subtract_mask(img: np.ndarray, img_mask: np.ndarray):
"""Subtract mask from image."""
img_new = np.where(img_mask == 0, 0, img)
Expand Down Expand Up @@ -136,6 +138,10 @@ def prepare_image(

return img_hp

def preprocess_image():
"""Decorate prepare_image with default parameters."""
return partial(prepare_image, dim_lp=1, filter_hp=0, filter_file="")


# def preprocess_image(
# input_img: np.ndarray,
Expand Down
8 changes: 4 additions & 4 deletions openptv_python/multimed.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
from typing import List, Tuple

import numpy as np
from numba import njit

# from numba import njit
from .calibration import Calibration, Exterior, Glass
from .parameters import (
ControlPar,
Expand Down Expand Up @@ -57,7 +57,7 @@ def multimed_r_nlay(cal: Calibration, mm: MultimediaPar, pos: np.ndarray) -> flo
return mmf


@njit
# @njit
def fast_multimed_r_nlay(
nlay:int,
n1: float,
Expand Down Expand Up @@ -203,7 +203,7 @@ def back_trans_point(

return pos

@njit
# @njit
def move_along_ray(glob_z: float, vertex: np.ndarray, direct: np.ndarray) -> np.ndarray:
"""Move along the ray to the global z plane.
Expand Down Expand Up @@ -328,7 +328,7 @@ def get_mmf_from_mmlut(cal: Calibration, pos: np.ndarray) -> float:

return fast_get_mmf_from_mmlut(rw, origin, data, nz, nr, pos)

@njit
# @njit
def fast_get_mmf_from_mmlut(
rw: int,
origin: np.ndarray,
Expand Down
16 changes: 15 additions & 1 deletion openptv_python/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -653,10 +653,18 @@ def from_file(cls, filename: str):

return ret

def get_grey_thresholds(self):
"""Return the grey thresholds."""
return self.gvthresh

def get_pixel_count_bounds(self):
"""Return the pixel count bounds."""
return (self.nnmin, self.nnmax)

def read_target_par(filename: str) -> TargetPar:
"""Read target parameters from file and returns target_par object."""
return TargetPar().from_file(filename)
tpar = TargetPar()
return tpar.from_file(filename)

def compare_target_par(targ1: TargetPar, targ2: TargetPar) -> bool:
"""Compare two target_par objects."""
Expand Down Expand Up @@ -799,6 +807,12 @@ def from_file(cls, file_path: str):
combine_flag = bool(int(file.readline().strip()))
return cls(examine_flag, combine_flag)

def read_examine_par(file_path: str) -> ExaminePar:
"""Read from examine.par file."""
with open(file_path, 'r', encoding="utf-8") as file:
examine_flag = bool(int(file.readline().strip()))
combine_flag = bool(int(file.readline().strip()))
return ExaminePar(examine_flag, combine_flag)

@dataclass
class PftVersionPar(Parameters):
Expand Down
2 changes: 1 addition & 1 deletion openptv_python/segmentation.py
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ def peak_fit_new(
peaks = []
for i in range(num_objects):
indices = np.argwhere(labeled == i + 1)
coordinates = [center_of_mass(mask[indices[:, 0], indices[:, 1]])[::-1]]
coordinates = np.array([center_of_mass(mask[indices[:, 0], indices[:, 1]])[::-1]])
intensity = smoothed[indices[:, 0], indices[:, 1]].max()
x, y = np.mean(coordinates, axis=0)
peaks.append(Peak(int(round(x)), int(round(y)), intensity))
Expand Down
183 changes: 92 additions & 91 deletions openptv_python/track.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
COORD_UNUSED,
CORRES_NONE,
MAX_CANDS,
MAX_TARGETS,
NEXT_NONE,
POS_INF,
PREV_NONE,
Expand All @@ -20,16 +21,16 @@
)
from .imgcoord import img_coord
from .orientation import point_position
from .parameters import ControlPar, TrackPar
from .parameters import ControlPar, SequencePar, TrackPar, VolumePar
from .tracking_frame_buf import Corres, Frame, Pathinfo, Target
from .tracking_run import TrackingRun
from .tracking_run import TrackingRun, tr_new
from .trafo import dist_to_flat, metric_to_pixel, pixel_to_metric
from .vec_utils import vec_copy, vec_diff_norm, vec_subt

default_naming = {
"corres": b"res/rt_is",
"linkage": b"res/ptv_is",
"prio": b"res/added",
"corres": "res/rt_is",
"linkage": "res/ptv_is",
"prio": "res/added",
}


Expand Down Expand Up @@ -1346,91 +1347,91 @@ def trackback_c(run_info: TrackingRun):
}


# class Tracker:
# """
# Workflow: instantiate, call restart() to initialize the frame buffer, then.
class Tracker:
"""
Workflow: instantiate, call restart() to initialize the frame buffer, then.
# call either ``step_forward()`` while it still return True, then call
# ``finalize()`` to finish the run. Alternatively, ``full_forward()`` will
# do all this for you.
# """
call either ``step_forward()`` while it still return True, then call
``finalize()`` to finish the run. Alternatively, ``full_forward()`` will
do all this for you.
"""

# def __init__(
# self,
# cpar: ControlPar,
# vpar: VolumePar,
# tpar: TrackPar,
# spar: SequencePar,
# cals: List[Calibration],
# naming: dict,
# flatten_tol: float = 0.0001,
# ):
# """
# Initialize the tracker.

# Arguments:
# ---------
# ControlPar cpar, VolumePar vpar, TrackPar tpar,
# SequencePar spar - the usual parameter objects, as read from
# anywhere.
# cals - a list of Calibratiopn objects.
# dict naming - a dictionary with naming rules for the frame buffer
# files. See the ``default_naming`` member (which is the default).
# """
# # We need to keep a reference to the Python objects so that their
# # allocations are not freed.
# self._keepalive = (cpar, vpar, tpar, spar, cals)

# self.run_info = tr_new(
# spar,
# tpar,
# vpar,
# cpar,
# TR_BUFSPACE,
# MAX_TARGETS,
# naming["corres"],
# naming["linkage"],
# naming["prio"],
# cals,
# flatten_tol,
# )
# self.step = self.run_info.seq_par.first

# def restart(self):
# """
# Prepare a tracking run. Sets up initial buffers and performs the.

# one-time calculations used throughout the loop.
# """
# self.step = self.run_info.seq_par.first
# track_forward_start(self.run_info)

# def step_forward(self):
# """Perform one tracking step for the current frame of iteration."""
# if self.step >= self.run_info.seq_par.last:
# return False

# trackcorr_c_loop(self.run_info, self.step)
# self.step += 1
# return True

# def finalize(self):
# """Finish a tracking run."""
# trackcorr_c_finish(self.run_info, self.step)

# def full_forward(self):
# """Do a full tracking run from restart to finalize."""
# track_forward_start(self.run_info)
# for step in range(self.run_info.seq_par.first, self.run_info.seq_par.last):
# trackcorr_c_loop(self.run_info, step)
# trackcorr_c_finish(self.run_info, self.run_info.seq_par.last)

# def full_backward(self):
# """Do a full backward run on existing tracking results. so make sure.

# results exist or it will explode in your face.
# """
# trackback_c(self.run_info)

# def current_step(self):
# return self.step
def __init__(
self,
cpar: ControlPar,
vpar: VolumePar,
tpar: TrackPar,
spar: SequencePar,
cals: List[Calibration],
naming: dict,
flatten_tol: float = 0.0001,
):
"""
Initialize the tracker.
Arguments:
---------
ControlPar cpar, VolumePar vpar, TrackPar tpar,
SequencePar spar - the usual parameter objects, as read from
anywhere.
cals - a list of Calibratiopn objects.
dict naming - a dictionary with naming rules for the frame buffer
files. See the ``default_naming`` member (which is the default).
"""
# We need to keep a reference to the Python objects so that their
# allocations are not freed.
self._keepalive = (cpar, vpar, tpar, spar, cals)

self.run_info = tr_new(
spar,
tpar,
vpar,
cpar,
TR_BUFSPACE,
MAX_TARGETS,
naming["corres"],
naming["linkage"],
naming["prio"],
cals,
flatten_tol,
)
self.step = self.run_info.seq_par.first

def restart(self):
"""
Prepare a tracking run. Sets up initial buffers and performs the.
one-time calculations used throughout the loop.
"""
self.step = self.run_info.seq_par.first
track_forward_start(self.run_info)

def step_forward(self):
"""Perform one tracking step for the current frame of iteration."""
if self.step >= self.run_info.seq_par.last:
return False

trackcorr_c_loop(self.run_info, self.step)
self.step += 1
return True

def finalize(self):
"""Finish a tracking run."""
trackcorr_c_finish(self.run_info, self.step)

def full_forward(self):
"""Do a full tracking run from restart to finalize."""
track_forward_start(self.run_info)
for step in range(self.run_info.seq_par.first, self.run_info.seq_par.last):
trackcorr_c_loop(self.run_info, step)
trackcorr_c_finish(self.run_info, self.run_info.seq_par.last)

def full_backward(self):
"""Do a full backward run on existing tracking results. so make sure.
results exist or it will explode in your face.
"""
trackback_c(self.run_info)

def current_step(self):
return self.step

0 comments on commit 0bc357f

Please sign in to comment.