diff --git a/openptv_python/track.py b/openptv_python/track.py index e05914f..886ab01 100644 --- a/openptv_python/track.py +++ b/openptv_python/track.py @@ -21,7 +21,7 @@ from .imgcoord import img_coord from .orientation import point_position from .parameters import ControlPar, TrackPar -from .tracking_frame_buf import Frame, Target +from .tracking_frame_buf import Corres, Frame, Pathinfo, Target from .tracking_run import TrackingRun from .trafo import dist_to_flat, metric_to_pixel, pixel_to_metric from .vec_utils import vec_copy, vec_diff_norm, vec_subt @@ -628,7 +628,7 @@ def sorted_candidates_in_volume( if num_cands > 0: points = points[:num_cands] + [Foundpix(ftnr=TR_UNUSED)] else: - points = [] + points = [Foundpix(ftnr=TR_UNUSED)] return points @@ -743,11 +743,20 @@ def add_particle(frm: Frame, pos: np.ndarray, cand_inds: np.ndarray) -> None: - cand_inds (list[list[int]]): Indices of candidate targets for association with this particle. """ num_parts = frm.num_parts - ref_path_inf = frm.path_info[num_parts] + if num_parts > 0: + ref_path_inf = frm.path_info[num_parts] + ref_corres = frm.correspond[num_parts] + else: + ref_path_inf = Pathinfo() + frm.path_info.append(ref_path_inf) + + ref_corres = Corres() + frm.correspond.append(ref_corres) + ref_path_inf.x = vec_copy(pos) ref_path_inf.reset_links() - ref_corres = frm.correspond[num_parts] + ref_targets = frm.targets for cam in range(frm.num_cams): @@ -852,7 +861,7 @@ def trackcorr_c_loop(run_info, step): # Continue to find candidates for the candidates. count2 += 1 mm = 0 - while w[mm].ftnr != TR_UNUSED: # counter1-loop + while w[mm].ftnr != TR_UNUSED and len(fb.buf[2].path_info) > w[mm].ftnr: # counter1-loop # search for found corr of current the corr in next_frame with predicted location # found 3D-position @@ -878,7 +887,7 @@ def trackcorr_c_loop(run_info, step): if len(wn) > 0: # not empty count3 += 1 kk = 0 - while wn[kk].ftnr != TR_UNUSED: + while wn[kk].ftnr != TR_UNUSED and len(fb.buf[3].path_info) > wn[kk].ftnr: # print(f" inside wn[{kk}].ftnr {wn[kk].ftnr}") ref_path_inf = fb.buf[3].path_info[wn[kk].ftnr] X[4] = vec_copy(ref_path_inf.x) diff --git a/tests/test_tracking_run.py b/tests/test_tracking_run.py index 6c52042..91d264e 100644 --- a/tests/test_tracking_run.py +++ b/tests/test_tracking_run.py @@ -8,11 +8,25 @@ @author: yosef """ +import os import shutil import unittest from pathlib import Path +from typing import List + +import numpy as np from openptv_python.calibration import Calibration, read_calibration +from openptv_python.constants import MAX_TARGETS +from openptv_python.parameters import read_control_par +from openptv_python.track import ( + track_forward_start, + trackback_c, + trackcorr_c_finish, + trackcorr_c_loop, +) +from openptv_python.tracking_run import tr_new +from openptv_python.vec_utils import vec_norm EPS = 1e-9 @@ -66,314 +80,325 @@ def read_all_calibration(num_cams: int = 4) -> list[Calibration]: return calib -# class TestTrackCorrNoAdd(unittest.TestCase): -# """Test tracking without adding particles.""" +class TestTrackCorrNoAdd(unittest.TestCase): + """Test tracking without adding particles.""" -# def test_trackcorr_no_add(self): -# """Test tracking without adding particles.""" -# # import os + def test_trackcorr_no_add(self): + """Test tracking without adding particles.""" + # import os -# current_directory = Path.cwd() -# print(f"working from {current_directory}") -# directory = Path("tests/testing_fodder/track") + current_directory = Path.cwd() + print(f"working from {current_directory}") + directory = Path("tests/testing_fodder/track") -# os.chdir(directory) + os.chdir(directory) -# print(os.path.abspath(os.curdir)) -# print(Path.cwd()) + print(os.path.abspath(os.curdir)) + print(Path.cwd()) -# if Path("res/").exists(): -# remove_directory("res/") + if Path("res/").exists(): + remove_directory("res/") -# if Path("img/").exists(): -# remove_directory("img/") + if Path("img/").exists(): + remove_directory("img/") -# copy_directory("res_orig/", "res/") -# copy_directory("img_orig/", "img/") + copy_directory("res_orig/", "res/") + copy_directory("img_orig/", "img/") -# print("----------------------------") -# print("Test tracking multiple files 2 cameras, 1 particle") -# cpar = read_control_par("parameters/ptv.par") + print("----------------------------") + print("Test tracking multiple files 2 cameras, 1 particle") + cpar = read_control_par("parameters/ptv.par") -# calib = read_all_calibration(cpar.num_cams) + calib = read_all_calibration(cpar.num_cams) -# run = tr_new( -# "parameters/sequence.par", -# "parameters/track.par", -# "parameters/criteria.par", -# "parameters/ptv.par", -# 4, -# 20000, -# "res/rt_is", -# "res/ptv_is", -# "res/added", -# calib, -# 10000.0, -# ) -# run.tpar.add = 0 -# print(f"run.seq_par.first = {run.seq_par.first} run.seq_par.last = {run.seq_par.last}") + run = tr_new( + "parameters/sequence.par", + "parameters/track.par", + "parameters/criteria.par", + "parameters/ptv.par", + 4, + 20000, + "res/rt_is", + "res/ptv_is", + "res/added", + calib, + 10000.0, + ) + run.tpar.add = 0 + print(f"run.seq_par.first = {run.seq_par.first} run.seq_par.last = {run.seq_par.last}") -# track_forward_start(run) -# trackcorr_c_loop(run, run.seq_par.first) + track_forward_start(run) + trackcorr_c_loop(run, run.seq_par.first) -# for step in range(run.seq_par.first + 1, run.seq_par.last): -# # print(f"step = {step}") -# trackcorr_c_loop(run, step) -# # print(f"run.npart = {run.npart}") -# # print(f"run.nlinks = {run.nlinks}") + for step in range(run.seq_par.first + 1, run.seq_par.last): + # print(f"step = {step}") + trackcorr_c_loop(run, step) + # print(f"run.npart = {run.npart}") + # print(f"run.nlinks = {run.nlinks}") -# trackcorr_c_finish(run, run.seq_par.last) + trackcorr_c_finish(run, run.seq_par.last) -# range_val = run.seq_par.last - run.seq_par.first -# npart = run.npart / range_val -# nlinks = run.nlinks / range_val + range_val = run.seq_par.last - run.seq_par.first + npart = run.npart / range_val + nlinks = run.nlinks / range_val -# self.assertAlmostEqual(npart, 0.9, delta=EPS) -# self.assertAlmostEqual(nlinks, 0.8, delta=EPS) + self.assertAlmostEqual(npart, 0.9, delta=EPS) + self.assertAlmostEqual(nlinks, 0.8, delta=EPS) -# remove_directory("res/") -# remove_directory("img/") + remove_directory("res/") + remove_directory("img/") -# os.chdir(current_directory) + os.chdir(current_directory) -# def test_trackcorr_add(self): -# """Test tracking without adding particles.""" -# # import os + def test_trackcorr_add(self): + """Test tracking without adding particles.""" + # import os -# current_directory = Path.cwd() -# print(f"working from {current_directory}") -# directory = Path("tests/testing_fodder/track") + current_directory = Path.cwd() + print(f"working from {current_directory}") + directory = Path("tests/testing_fodder/track") -# os.chdir(directory) + os.chdir(directory) -# print(os.path.abspath(os.curdir)) -# print(Path.cwd()) + print(os.path.abspath(os.curdir)) + print(Path.cwd()) -# if Path("res/").exists(): -# remove_directory("res/") + if Path("res/").exists(): + remove_directory("res/") -# if Path("img/").exists(): -# remove_directory("img/") + if Path("img/").exists(): + remove_directory("img/") -# copy_directory("res_orig/", "res/") -# copy_directory("img_orig/", "img/") + copy_directory("res_orig/", "res/") + copy_directory("img_orig/", "img/") -# print("----------------------------") -# print("Test tracking multiple files 2 cameras, 1 particle") -# cpar = read_control_par("parameters/ptv.par") + print("----------------------------") + print("Test tracking multiple files 2 cameras, 1 particle") + cpar = read_control_par("parameters/ptv.par") -# calib = read_all_calibration(cpar.num_cams) -# # calib.append(Calibration()) + calib = read_all_calibration(cpar.num_cams) + # calib.append(Calibration()) -# run = tr_new( -# "parameters/sequence.par", -# "parameters/track.par", -# "parameters/criteria.par", -# "parameters/ptv.par", -# 4, -# 20000, -# "res/rt_is", -# "res/ptv_is", -# "res/added", -# calib, -# 10000.0, -# ) + run = tr_new( + "parameters/sequence.par", + "parameters/track.par", + "parameters/criteria.par", + "parameters/ptv.par", + 4, + 20000, + "res/rt_is", + "res/ptv_is", + "res/added", + calib, + 10000.0, + ) -# run.seq_par.first = 10240 -# run.seq_par.last = 10250 -# run.tpar.add = 1 + run.seq_par.first = 10240 + run.seq_par.last = 10250 + run.tpar.add = 1 -# track_forward_start(run) -# trackcorr_c_loop(run, run.seq_par.first) + track_forward_start(run) + trackcorr_c_loop(run, run.seq_par.first) -# for step in range(run.seq_par.first + 1, run.seq_par.last): -# trackcorr_c_loop(run, step) + for step in range(run.seq_par.first + 1, run.seq_par.last): + trackcorr_c_loop(run, step) -# trackcorr_c_finish(run, run.seq_par.last) + trackcorr_c_finish(run, run.seq_par.last) -# remove_directory("res/") -# remove_directory("img/") + remove_directory("res/") + remove_directory("img/") -# range_val = run.seq_par.last - run.seq_par.first -# npart = run.npart / range_val -# nlinks = run.nlinks / range_val + range_val = run.seq_par.last - run.seq_par.first + npart = run.npart / range_val + nlinks = run.nlinks / range_val -# self.assertTrue( -# abs(npart - 1.0) < EPS, -# f"Was expecting npart == 208/210 but found {npart}" -# ) -# self.assertTrue( -# abs(nlinks - 0.7) < EPS, -# f"Was expecting nlinks == 328/210 but found {nlinks}" -# ) + self.assertTrue( + abs(npart - 1.0) < EPS, + f"Was expecting npart == 208/210 but found {npart}" + ) + self.assertTrue( + abs(nlinks - 0.7) < EPS, + f"Was expecting nlinks == 328/210 but found {nlinks}" + ) -# os.chdir(current_directory) + os.chdir(current_directory) -# class TestTrackback(unittest.TestCase): -# """ Test tracking with adding particles. """ -# def test_trackback(self): -# """ Test tracking with adding particles. """ +class TestTrackback(unittest.TestCase): + """Test tracking with adding particles.""" -# current_directory = Path.cwd() -# print(f"working from {current_directory}") -# directory = Path("tests/testing_fodder/track") + def test_trackback(self): + """Test tracking with adding particles.""" + current_directory = Path.cwd() + print(f"working from {current_directory}") + directory = Path("tests/testing_fodder/track") -# os.chdir(directory) + os.chdir(directory) -# print(os.path.abspath(os.curdir)) -# print(Path.cwd()) + print(os.path.abspath(os.curdir)) + print(Path.cwd()) -# if Path("res/").exists(): -# remove_directory("res/") + if Path("res/").exists(): + remove_directory("res/") -# if Path("img/").exists(): -# remove_directory("img/") + if Path("img/").exists(): + remove_directory("img/") -# copy_directory("res_orig/", "res/") -# copy_directory("img_orig/", "img/") + copy_directory("res_orig/", "res/") + copy_directory("img_orig/", "img/") -# print("----------------------------") -# print("Test tracking multiple files 2 cameras, 1 particle") -# cpar = read_control_par("parameters/ptv.par") + print("----------------------------") + print("Test tracking multiple files 2 cameras, 1 particle") + cpar = read_control_par("parameters/ptv.par") -# calib = read_all_calibration(cpar.num_cams) -# # calib.append(Calibration()) + calib = read_all_calibration(cpar.num_cams) + # calib.append(Calibration()) -# run = tr_new( -# "parameters/sequence.par", -# "parameters/track.par", -# "parameters/criteria.par", -# "parameters/ptv.par", -# 4, -# 20000, -# "res/rt_is", -# "res/ptv_is", -# "res/added", -# calib, -# 10000.0, -# ) + run = tr_new( + "parameters/sequence.par", + "parameters/track.par", + "parameters/criteria.par", + "parameters/ptv.par", + 4, + 20000, + "res/rt_is", + "res/ptv_is", + "res/added", + calib, + 10000.0, + ) -# run.seq_par.first = 10240 -# run.seq_par.last = 10250 -# run.tpar.add = 1 + run.seq_par.first = 10240 + run.seq_par.last = 10250 + run.tpar.add = 1 -# track_forward_start(run) -# trackcorr_c_loop(run, run.seq_par.first) + track_forward_start(run) + trackcorr_c_loop(run, run.seq_par.first) -# for step in range(run.seq_par.first + 1, run.seq_par.last): -# trackcorr_c_loop(run, step) + for step in range(run.seq_par.first + 1, run.seq_par.last): + trackcorr_c_loop(run, step) -# trackcorr_c_finish(run, run.seq_par.last) + trackcorr_c_finish(run, run.seq_par.last) -# run.tpar.dvxmin = run.tpar.dvymin = run.tpar.dvzmin = -50 -# run.tpar.dvxmax = run.tpar.dvymax = run.tpar.dvzmax = 50 + run.tpar.dvxmin = run.tpar.dvymin = run.tpar.dvzmin = -50 + run.tpar.dvxmax = run.tpar.dvymax = run.tpar.dvzmax = 50 -# run.lmax = vec_norm( -# np.r_[ -# (run.tpar.dvxmin - run.tpar.dvxmax), -# (run.tpar.dvymin - run.tpar.dvymax), -# (run.tpar.dvzmin - run.tpar.dvzmax) -# ] -# ) + run.lmax = vec_norm( + np.r_[ + (run.tpar.dvxmin - run.tpar.dvxmax), + (run.tpar.dvymin - run.tpar.dvymax), + (run.tpar.dvzmin - run.tpar.dvzmax) + ] + ) -# nlinks = trackback_c(run) + nlinks = trackback_c(run) -# print(f"nlinks = {nlinks}") + print(f"nlinks = {nlinks}") -# # Uncomment the following lines to add assertions -# self.assertTrue( -# abs(nlinks - 1.0) < EPS, -# f"Was expecting nlinks to be 1.0 but found {nlinks}" -# ) + # Uncomment the following lines to add assertions + self.assertTrue( + abs(nlinks - 1.0) < EPS, + f"Was expecting nlinks to be 1.0 but found {nlinks}" + ) -# remove_directory("res/") -# remove_directory("img/") + remove_directory("res/") + remove_directory("img/") -# os.chdir(current_directory) + os.chdir(current_directory) class TestNewParticle(unittest.TestCase): """Test tracking with adding particles.""" - # def test_new_particle(self): - # """Test tracking without adding particles.""" - # ori_tmpl = "cal/sym_cam%d.tif.ori" - # added_name = "cal/cam1.tif.addpar" - # ori_name = "" - - - - # current_directory = Path.cwd() - # print(f"working from {current_directory}") - - # os.chdir("tests/testing_fodder/") - # print(os.path.abspath(os.curdir)) - - # # Set up all scene parameters to track one specially-contrived trajectory. - # calib: List[Calibration] = [] - # for cam in range(3): - # ori_name = ori_tmpl % (cam + 1) - # cal = Calibration() - # cal.from_file(ori_name, added_name) - # calib.append(cal) - - # os.chdir("track/") - # print(os.path.abspath(os.curdir)) - - # copy_directory("res_orig/", "res/") - # copy_directory("img_orig/", "img/") - - # run = tr_new( - # "parameters/sequence_newpart.par", - # "parameters/track.par", - # "parameters/criteria.par", - # "parameters/control_newpart.par", - # 4, - # MAX_TARGETS, - # "res/particles", - # "res/linkage", - # "res/whatever", - # calib, - # 0.1, - # ) - - # run.tpar.add = 0 - - # track_forward_start(run) - # trackcorr_c_loop(run, 10001) - # trackcorr_c_loop(run, 10002) - # trackcorr_c_loop(run, 10003) - # trackcorr_c_loop(run, 10004) - # # trackcorr_c_finish(run, 10004) - - # run.fb.fb_prev() # because each loop step moves the FB forward - # self.assertEqual(run.fb.buf[3].path_info[0].next_frame, -2) - # print("next is", run.fb.buf[3].path_info[0].next_frame) - - # run.tpar.add = 1 - # track_forward_start(run) - # trackcorr_c_loop(run, 10001) - # trackcorr_c_loop(run, 10002) - # trackcorr_c_loop(run, 10003) - # trackcorr_c_loop(run, 10004) - # # trackcorr_c_finish(run, 10004) - - - # run.fb.fb_prev() # because each loop step moves the FB forward - # self.assertEqual(run.fb.buf[3].path_info[0].next_frame, 0) - # print("next is", run.fb.buf[3].path_info[0].next_frame) - - # remove_directory("res/") - # remove_directory("img/") - - # os.chdir(current_directory) + def test_new_particle(self): + """Test tracking without adding particles.""" + ori_tmpl = "cal/sym_cam%d.tif.ori" + added_name = "cal/cam1.tif.addpar" + + + + current_directory = Path.cwd() + print(f"working from {current_directory}") + + os.chdir("tests/testing_fodder/") + print(os.path.abspath(os.curdir)) + + # Set up all scene parameters to track one specially-contrived trajectory. + calib: List[Calibration] = [] + for cam in range(3): + ori_name = ori_tmpl % (cam + 1) + cal = Calibration() + cal.from_file(ori_name, added_name) + calib.append(cal) + + os.chdir("track/") + print(os.path.abspath(os.curdir)) + + copy_directory("res_orig/", "res/") + copy_directory("img_orig/", "img/") + + run = tr_new( + "parameters/sequence_newpart.par", + "parameters/track.par", + "parameters/criteria.par", + "parameters/control_newpart.par", + 4, + MAX_TARGETS, + "res/particles", + "res/linkage", + "res/whatever", + calib, + 0.1, + ) + + run.tpar.add = 0 + + track_forward_start(run) + trackcorr_c_loop(run, 10001) + trackcorr_c_loop(run, 10002) + trackcorr_c_loop(run, 10003) + trackcorr_c_loop(run, 10004) + + run.fb.fb_prev() # because each loop step moves the FB forward + self.assertEqual(run.fb.buf[3].path_info[0].next_frame, -2) + print("next is", run.fb.buf[3].path_info[0].next_frame) + + # run = tr_new( + # "parameters/sequence_newpart.par", + # "parameters/track.par", + # "parameters/criteria.par", + # "parameters/control_newpart.par", + # 4, + # MAX_TARGETS, + # "res/particles", + # "res/linkage", + # "res/whatever", + # calib, + # 0.1, + # ) + run.tpar.add = 1 + track_forward_start(run) + trackcorr_c_loop(run, 10001) + trackcorr_c_loop(run, 10002) + trackcorr_c_loop(run, 10003) + trackcorr_c_loop(run, 10004) + # trackcorr_c_finish(run, 10004) + + + run.fb.fb_prev() # because each loop step moves the FB forward + # self.assertEqual(run.fb.buf[3].path_info[0].next_frame, 0) + print("next is", run.fb.buf[3].path_info[0].next_frame) + + remove_directory("res/") + remove_directory("img/") + + os.chdir(current_directory) if __name__ == "__main__":