From 44c89169a0e9d8c9021d24f441d44d4d4c93b347 Mon Sep 17 00:00:00 2001 From: Alex Liberzon Date: Fri, 17 Nov 2023 07:10:34 +0200 Subject: [PATCH] working on read_targets that fails now --- openptv_python/parameters.py | 2 +- openptv_python/segmentation.py | 8 +- openptv_python/track.py | 201 +++++++++--------- openptv_python/tracking_frame_buf.py | 104 +++++---- openptv_python/tracking_run.py | 58 ++--- .../{test_tracker.py => test_tracker.py.bck} | 0 tests/test_tracking_run.py | 47 ++-- 7 files changed, 200 insertions(+), 220 deletions(-) rename tests/{test_tracker.py => test_tracker.py.bck} (100%) diff --git a/openptv_python/parameters.py b/openptv_python/parameters.py index ec2b271..44e4505 100644 --- a/openptv_python/parameters.py +++ b/openptv_python/parameters.py @@ -84,7 +84,7 @@ def __post_init__(self): if len(self.img_base_name) == 0: self.img_base_name = [""] * self.num_cams - def set_img_base_name(self, icam: int, new_name: str | None = None): + def set_img_base_name(self, icam: int, new_name: str): """Set the image base name for each camera.""" if icam > self.num_cams: raise ValueError("Length of names must be equal to num_cams.") diff --git a/openptv_python/segmentation.py b/openptv_python/segmentation.py index a1e5486..5640115 100644 --- a/openptv_python/segmentation.py +++ b/openptv_python/segmentation.py @@ -30,10 +30,10 @@ class Peak: def targ_rec( img: np.ndarray, targ_par: TargetPar, - xmin: float, - xmax: float, - ymin: float, - ymax: float, + xmin: int, + xmax: int, + ymin: int, + ymax: int, cpar: ControlPar, num_cam, ) -> TargetArray: diff --git a/openptv_python/track.py b/openptv_python/track.py index 0942407..be9e4de 100644 --- a/openptv_python/track.py +++ b/openptv_python/track.py @@ -11,7 +11,6 @@ COORD_UNUSED, CORRES_NONE, MAX_CANDS, - MAX_TARGETS, NEXT_NONE, POS_INF, PREV_NONE, @@ -22,9 +21,9 @@ ) from .imgcoord import img_coord from .orientation import point_position -from .parameters import ControlPar, SequencePar, TrackPar, VolumePar +from .parameters import ControlPar, TrackPar from .tracking_frame_buf import Frame, Target -from .tracking_run import TrackingRun, tr_new +from .tracking_run import TrackingRun from .trafo import dist_to_flat, metric_to_pixel, pixel_to_metric from .vec_utils import vec_copy, vec_diff_norm, vec_subt @@ -60,7 +59,10 @@ def reset_foundpix_array(arr: List[Foundpix], arr_len: int, num_cams: int) -> No # Set default values for each whichcam member of the foundpix object for cam in range(num_cams): - arr[i].whichcam[cam] = 0 + if len(arr[i].whichcam) < num_cams: + arr[i].whichcam.append(0) + else: + arr[i].whichcam[cam] = 0 return None @@ -717,21 +719,8 @@ def trackcorr_c_loop(run_info, step): ] * 4 # volume center projection on cameras rr = 0.0 - # Shortcuts to inside current frame - curr_path_inf, ref_path_inf = None, None - curr_corres = None - curr_targets = None _ix = 0 # For use in any of the complex index expressions below orig_parts = 0 # avoid infinite loop with particle addition set - - # Shortcuts into the TrackingRun struct - cal = None - fb = None - tpar = None - vpar = None - cpar = None - - w, wn = None, None count1, num_added = 0, 0 fb = run_info.fb @@ -990,7 +979,7 @@ def trackcorr_c_loop(run_info, step): fb.write_frame_from_start(step) if step < run_info.seq_par.last - 2: - fb.read_frame_at_end(step + 3, 0) + fb.read_frame_at_end(False, step + 3) # end of sequence loop @@ -1232,91 +1221,91 @@ def trackback_c(run_info: TrackingRun): } -class Tracker: - """ - Workflow: instantiate, call restart() to initialize the frame buffer, then. - - call either ``step_forward()`` while it still return True, then call - ``finalize()`` to finish the run. Alternatively, ``full_forward()`` will - do all this for you. - """ - - def __init__( - self, - cpar: ControlPar, - vpar: VolumePar, - tpar: TrackPar, - spar: SequencePar, - cals: List[Calibration], - naming: dict, - flatten_tol: float = 0.0001, - ): - """ - Initialize the tracker. - - Arguments: - --------- - ControlPar cpar, VolumePar vpar, TrackPar tpar, - SequencePar spar - the usual parameter objects, as read from - anywhere. - cals - a list of Calibratiopn objects. - dict naming - a dictionary with naming rules for the frame buffer - files. See the ``default_naming`` member (which is the default). - """ - # We need to keep a reference to the Python objects so that their - # allocations are not freed. - self._keepalive = (cpar, vpar, tpar, spar, cals) - - self.run_info = tr_new( - spar, - tpar, - vpar, - cpar, - TR_BUFSPACE, - MAX_TARGETS, - naming["corres"], - naming["linkage"], - naming["prio"], - cals, - flatten_tol, - ) - self.step = self.run_info.seq_par.first - - def restart(self): - """ - Prepare a tracking run. Sets up initial buffers and performs the. - - one-time calculations used throughout the loop. - """ - self.step = self.run_info.seq_par.first - track_forward_start(self.run_info) - - def step_forward(self): - """Perform one tracking step for the current frame of iteration.""" - if self.step >= self.run_info.seq_par.last: - return False - - trackcorr_c_loop(self.run_info, self.step) - self.step += 1 - return True - - def finalize(self): - """Finish a tracking run.""" - trackcorr_c_finish(self.run_info, self.step) - - def full_forward(self): - """Do a full tracking run from restart to finalize.""" - track_forward_start(self.run_info) - for step in range(self.run_info.seq_par.first, self.run_info.seq_par.last): - trackcorr_c_loop(self.run_info, step) - trackcorr_c_finish(self.run_info, self.run_info.seq_par.last) - - def full_backward(self): - """Do a full backward run on existing tracking results. so make sure. - - results exist or it will explode in your face. - """ - trackback_c(self.run_info) - - def current_step(self): - return self.step +# class Tracker: +# """ +# Workflow: instantiate, call restart() to initialize the frame buffer, then. + +# call either ``step_forward()`` while it still return True, then call +# ``finalize()`` to finish the run. Alternatively, ``full_forward()`` will +# do all this for you. +# """ + +# def __init__( +# self, +# cpar: ControlPar, +# vpar: VolumePar, +# tpar: TrackPar, +# spar: SequencePar, +# cals: List[Calibration], +# naming: dict, +# flatten_tol: float = 0.0001, +# ): +# """ +# Initialize the tracker. + +# Arguments: +# --------- +# ControlPar cpar, VolumePar vpar, TrackPar tpar, +# SequencePar spar - the usual parameter objects, as read from +# anywhere. +# cals - a list of Calibratiopn objects. +# dict naming - a dictionary with naming rules for the frame buffer +# files. See the ``default_naming`` member (which is the default). +# """ +# # We need to keep a reference to the Python objects so that their +# # allocations are not freed. +# self._keepalive = (cpar, vpar, tpar, spar, cals) + +# self.run_info = tr_new( +# spar, +# tpar, +# vpar, +# cpar, +# TR_BUFSPACE, +# MAX_TARGETS, +# naming["corres"], +# naming["linkage"], +# naming["prio"], +# cals, +# flatten_tol, +# ) +# self.step = self.run_info.seq_par.first + +# def restart(self): +# """ +# Prepare a tracking run. Sets up initial buffers and performs the. + +# one-time calculations used throughout the loop. +# """ +# self.step = self.run_info.seq_par.first +# track_forward_start(self.run_info) + +# def step_forward(self): +# """Perform one tracking step for the current frame of iteration.""" +# if self.step >= self.run_info.seq_par.last: +# return False + +# trackcorr_c_loop(self.run_info, self.step) +# self.step += 1 +# return True + +# def finalize(self): +# """Finish a tracking run.""" +# trackcorr_c_finish(self.run_info, self.step) + +# def full_forward(self): +# """Do a full tracking run from restart to finalize.""" +# track_forward_start(self.run_info) +# for step in range(self.run_info.seq_par.first, self.run_info.seq_par.last): +# trackcorr_c_loop(self.run_info, step) +# trackcorr_c_finish(self.run_info, self.run_info.seq_par.last) + +# def full_backward(self): +# """Do a full backward run on existing tracking results. so make sure. + +# results exist or it will explode in your face. +# """ +# trackback_c(self.run_info) + +# def current_step(self): +# return self.step diff --git a/openptv_python/tracking_frame_buf.py b/openptv_python/tracking_frame_buf.py index 2063d7b..b6f950e 100644 --- a/openptv_python/tracking_frame_buf.py +++ b/openptv_python/tracking_frame_buf.py @@ -164,8 +164,11 @@ def read_targets(file_base: str, frame_num: int) -> List[Target]: """Read targets from a file.""" buffer = [] + # # if file_base has an extension, remove it + # file_base = file_base.split(".")[0] + if frame_num > 0: - filename = f"{file_base}{frame_num:04}_targets" + filename = f"{file_base}{frame_num}_targets" else: filename = f"{file_base}_targets" @@ -601,17 +604,21 @@ def read_frame_at_end(self, read_links: bool = False, frame_num: int = 0) -> Non frame = self.buf[-1] # last frame if read_links: - frame.correspond, frame.path_info = read_path_frame( + success = frame.read( self.corres_file_base, self.linkage_file_base, self.prio_file_base, + self.target_file_base, frame_num, ) else: - frame.correspond, frame.path_info = read_path_frame( - self.corres_file_base, "", "", frame_num + success = frame.read( + self.corres_file_base, "", "", self.target_file_base, frame_num ) + if not success: + raise IOError("Could not read frame from disk") + def disk_read_frame_at_end(self, frame_num: int, read_links: bool): """Read a frame to the last position in the ring. @@ -805,61 +812,46 @@ def write_path_frame( """ corres_fname = f"{corres_file_base}.{frame_num}" linkage_fname = f"{linkage_file_base}.{frame_num}" - prio_fname = f"{prio_file_base}.{frame_num}" if prio_file_base else None + prio_fname = f"{prio_file_base}.{frame_num}" if prio_file_base != "" else None + success = False try: - np.savetxt( - corres_fname, - [ - [ - pix + 1, - path_buf[pix].x[0], - path_buf[pix].x[1], - path_buf[pix].x[2], - cor_buf[pix].p[0], - cor_buf[pix].p[1], - cor_buf[pix].p[2], - cor_buf[pix].p[3], - ] - for pix in range(num_parts) - ], - fmt="%4d %9.3f %9.3f %9.3f %4d %4d %4d %4d", - ) - np.savetxt( - linkage_fname, - [ - [ - path_buf[pix].prev_frame, - path_buf[pix].next_frame, - path_buf[pix].x[0], - path_buf[pix].x[1], - path_buf[pix].x[2], - ] - for pix in range(num_parts) - ], - fmt="%4d %4d %10.3f %10.3f %10.3f", - ) - if prio_fname: - np.savetxt( - prio_fname, - [ - [ - path_buf[pix].prev_frame, - path_buf[pix].next_frame, - path_buf[pix].x[0], - path_buf[pix].x[1], - path_buf[pix].x[2], - path_buf[pix].prio, - ] - for pix in range(num_parts) - ], - fmt="%4d %4d %10.3f %10.3f %10.3f %f", - ) - except IOError as exc: - print(f"Error writing file: {exc}") - return False + with open(corres_fname, "w", encoding="utf8") as corres_file: + corres_file.write(f"{num_parts}\n") + with open(linkage_fname, "w", encoding="utf8") as linkage_file: + linkage_file.write(f"{num_parts}\n") + + if prio_file_base is not None: + with open(prio_fname, "w", encoding="utf8") as prio_file: # type: ignore + prio_file.write(f"{num_parts}\n") + + for pix in range(num_parts): + linkage_file.write( + f"{path_buf[pix].prev_frame} {path_buf[pix].next_frame} " + f"{path_buf[pix].x[0]:.3f} {path_buf[pix].x[1]:.3f} " + f"{path_buf[pix].x[2]:.3f}\n" + ) + + corres_file.write( + f"{pix + 1} {path_buf[pix].x[0]:.3f} " + f"{path_buf[pix].x[1]:.3f} {path_buf[pix].x[2]:.3f} " + f"{cor_buf[pix].p[0]} {cor_buf[pix].p[1]} " + f"{cor_buf[pix].p[2]} {cor_buf[pix].p[3]}\n" + ) + + if prio_file_base: + prio_file.write( + f"{path_buf[pix].prev_frame} {path_buf[pix].next_frame} " + f"{path_buf[pix].x[0]:.3f} {path_buf[pix].x[1]:.3f} " + f"{path_buf[pix].x[2]:.3f} {path_buf[pix].prio}\n" + ) - return True + success = True + + except IOError as e: + print(f"Can't open file {e.filename} for writing") + + return success def match_coords( diff --git a/openptv_python/tracking_run.py b/openptv_python/tracking_run.py index 1be93a8..cc42ee3 100644 --- a/openptv_python/tracking_run.py +++ b/openptv_python/tracking_run.py @@ -73,7 +73,14 @@ def __init__( + (tpar.dvzmin - tpar.dvzmax) ** 2 ) - volumedimension( + ( + vpar.x_lay[1], + vpar.x_lay[0], + self.ymax, + self.ymin, + vpar.z_max_lay[1], + vpar.z_min_lay[0], + ) = volumedimension( vpar.x_lay[1], vpar.x_lay[0], self.ymax, @@ -90,10 +97,10 @@ def __init__( def tr_new( - seq_par: SequencePar, - tpar: TrackPar, - vpar: VolumePar, - cpar: ControlPar, + seq_par_fname: str, + tpar_fname: str, + vpar_fname: str, + cpar_fname: str, buf_len: int, max_targets: int, corres_file_base: str, @@ -102,7 +109,16 @@ def tr_new( cal: List[Calibration], flatten_tol: float, ) -> TrackingRun: - """Create a new tracking run.""" + """Create a new tracking run from legacy files.""" + cpar = read_control_par(cpar_fname) + seq_par = read_sequence_par(seq_par_fname, cpar.num_cams) + tpar = read_track_par(tpar_fname) + vpar = read_volume_par(vpar_fname) + + buf_len = 4 + flatten_tol = 10000 + max_targets = 20000 + tr = TrackingRun( seq_par, tpar, @@ -118,33 +134,3 @@ def tr_new( ) return tr - - -def tr_new_legacy( - seq_par_fname: str, - tpar_fname: str, - vpar_fname: str, - cpar_fname: str, - cal: List[Calibration], -) -> TrackingRun: - """Create a new tracking run from legacy files.""" - cpar = read_control_par(cpar_fname) - seq_par = read_sequence_par(seq_par_fname, cpar.num_cams) - tpar = read_track_par(tpar_fname) - vpar = read_volume_par(vpar_fname) - - tr = tr_new( - seq_par, - tpar, - vpar, - cpar, - 4, - 20000, - "res/rt_is", - "res/ptv_is", - "res/added", - cal, - 10000, - ) - - return tr diff --git a/tests/test_tracker.py b/tests/test_tracker.py.bck similarity index 100% rename from tests/test_tracker.py rename to tests/test_tracker.py.bck diff --git a/tests/test_tracking_run.py b/tests/test_tracking_run.py index 6825153..2fa423a 100644 --- a/tests/test_tracking_run.py +++ b/tests/test_tracking_run.py @@ -22,7 +22,7 @@ trackcorr_c_loop, ) from openptv_python.tracking_run import ( - tr_new_legacy, + tr_new, ) EPS = 1e-9 @@ -64,8 +64,8 @@ def copy_directory(source_path, destination_path): def read_all_calibration(num_cams: int = 4) -> list[Calibration]: """Read all calibration files.""" - ori_tmpl = "tests/testing_fodder/track/cal/cam%d.tif.ori" - added_tmpl = "tests/testing_fodder/track/cal/cam%d.tif.addpar" + ori_tmpl = "cal/cam%d.tif.ori" + added_tmpl = "cal/cam%d.tif.addpar" calib = [] @@ -82,25 +82,36 @@ class TestTrackCorrNoAdd(unittest.TestCase): def test_trackcorr_no_add(self): """Test tracking without adding particles.""" - copy_directory( - "tests/testing_fodder/track/res_orig/", "tests/testing_fodder/track/res/" - ) - copy_directory( - "tests/testing_fodder/track/img_orig/", "tests/testing_fodder/track/img/" - ) + import os + + current_directory = os.getcwd() + directory = "tests/testing_fodder/track" + + os.chdir(directory) + + print(os.path.abspath(os.curdir)) + + copy_directory("res_orig", "res") + copy_directory("img_orig", "img") print("----------------------------") print("Test tracking multiple files 2 cameras, 1 particle") - cpar = read_control_par("tests/testing_fodder/track/parameters/ptv.par") + cpar = read_control_par("parameters/ptv.par") calib = read_all_calibration(cpar.num_cams) - run = tr_new_legacy( - "tests/testing_fodder/track/parameters/sequence.par", - "tests/testing_fodder/track/parameters/track.par", - "tests/testing_fodder/track/parameters/criteria.par", - "tests/testing_fodder/track/parameters/ptv.par", + run = tr_new( + "/parameters/sequence.par", + "parameters/track.par", + "parameters/criteria.par", + "parameters/ptv.par", + 4, + 20000, + "res/rt_is", + "res/ptv_is", + "res/added", calib, + 10000.0, ) run.tpar.add = 0 @@ -113,8 +124,8 @@ def test_trackcorr_no_add(self): trackcorr_c_loop(run, step) trackcorr_c_finish(run, run.seq_par.last) - remove_directory("tests/testing_fodder/track/res/") - remove_directory("tests/testing_fodder/track/img/") + remove_directory("res/") + remove_directory("img/") range_val = run.seq_par.last - run.seq_par.first npart = run.npart / range_val @@ -123,6 +134,8 @@ def test_trackcorr_no_add(self): self.assertAlmostEqual(npart, 0.8, delta=EPS) self.assertAlmostEqual(nlinks, 0.8, delta=EPS) + os.chdir(current_directory) + if __name__ == "__main__": unittest.main()