Skip to content

Commit

Permalink
works with the dtype
Browse files Browse the repository at this point in the history
  • Loading branch information
alexlib committed Jan 1, 2024
1 parent 1ec0507 commit 7cc2740
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 61 deletions.
97 changes: 53 additions & 44 deletions openptv_python/track.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
"""Tracking algorithm."""
from dataclasses import dataclass, field
# from dataclasses import dataclass, field
from typing import List, Tuple

import numpy as np
from numba import float64, njit

from .calibration import Calibration
from .constants import (
Expand Down Expand Up @@ -34,54 +35,55 @@
}


@dataclass
class Foundpix:
"""A Foundpix object holds the parameters for a found pixel."""
# @dataclass
# class Foundpix:
# """A Foundpix object holds the parameters for a found pixel."""

ftnr: int = TR_UNUSED
freq: int = 0
whichcam: List[int] = field(default_factory=list)
# ftnr: int = TR_UNUSED
# freq: int = 0
# whichcam: List[int] = field(default_factory=list)

def __post_init__(self):
self.whichcam = [0] * TR_MAX_CAMS
# def __post_init__(self):
# self.whichcam = [0] * TR_MAX_CAMS


def reset_foundpix_array(arr: List[Foundpix], arr_len: int, num_cams: int) -> None:
Foundpix_dtype = np.dtype([
('ftnr', np.int32),
('freq', np.int32),
('whichcam', np.int32, (TR_MAX_CAMS,))
])

# Create an instance of the recarray
# foundpix = np.recarray((1,), dtype=Foundpix_dtype)

# Initialize the recarray with the values from the class
# foundpix['ftnr'] = TR_UNUSED
# foundpix['freq'] = 0
# foundpix['whichcam'] = [0] * TR_MAX_CAMS


# def reset_foundpix_array(arr: List[Foundpix], arr_len: int, num_cams: int) -> None:
# @njit(cache=True, fastmath=True, nogil=True, parallel=True)
def reset_foundpix_array(arr: np.ndarray, arr_len: int, num_cams: int) -> None:
"""Set default values for foundpix objects in an array.
Arguments:
---------
arr -- the array to reset
arr -- the array to reset, dtype = Foundpix_dtype
arr_len -- array length
num_cams -- number of places in the whichcam member of foundpix.
"""
for i in range(arr_len):
# Set default values for each foundpix object in the array
arr[i].ftnr = TR_UNUSED
arr[i].freq = 0

# Set default values for each whichcam member of the foundpix object
for cam in range(num_cams):
if len(arr[i].whichcam) < num_cams:
arr[i].whichcam.append(0)
else:
arr[i].whichcam[cam] = 0
arr[i]["ftnr"] = TR_UNUSED
arr[i]["freq"] = 0
for j in range(num_cams):
# Set default values for unused foundpix objects
arr[i]["whichcam"][j] = 0

return None


def copy_foundpix_array(
dest: List[Foundpix], src: List[Foundpix], arr_len: int, num_cams: int
) -> None:
"""copy_foundpix_array() copies foundpix objects from one array to another.
Arguments:
---------
dest -- dest receives the copied array
src -- src is the array to copy
arr_len -- array length
num_cams -- number of places in the whichcam member of foundpix.
"""
def copy_foundpix_array(dest: np.ndarray, src: np.ndarray, arr_len: int, num_cams: int) -> None:
"""Copy the relevant part of foundpix array."""
for i in range(arr_len):
# Copy values from source foundpix object to destination foundpix object
dest[i].ftnr = src[i].ftnr
Expand All @@ -91,6 +93,7 @@ def copy_foundpix_array(
for cam in range(num_cams):
dest[i].whichcam[cam] = src[i].whichcam[cam]

return None

def register_closest_neighbs(
targets: List[Target],
Expand All @@ -102,7 +105,7 @@ def register_closest_neighbs(
dr: float,
du: float,
dd: float,
reg: List[Foundpix],
reg: np.ndarray,
cpar: ControlPar,
) -> List[int]:
"""Register_closest_neighbs() finds candidates for continuing a particle's.
Expand Down Expand Up @@ -141,7 +144,7 @@ def register_closest_neighbs(

return all_cands


@njit(float64[:](float64[:], float64[:]), cache=True, fastmath=True, nogil=True, parallel=True)
def search_volume_center_moving(
prev_pos: np.ndarray, curr_pos: np.ndarray
) -> np.ndarray:
Expand Down Expand Up @@ -509,7 +512,7 @@ def searchquader(
return xr, xl, yd, yu


def sort_candidates_by_freq(foundpix: List[Foundpix], num_cams: int) -> int:
def sort_candidates_by_freq(foundpix: np.ndarray, num_cams: int) -> int:
"""Sort candidates by frequency."""
different = 0

Expand Down Expand Up @@ -601,10 +604,13 @@ def point_to_pixel(point: np.ndarray, cal: Calibration, cpar: ControlPar) -> np.

def sorted_candidates_in_volume(
center: np.ndarray, center_proj: np.ndarray, frm: Frame, run: TrackingRun
) -> List[Foundpix]:
) -> np.ndarray:
"""Find candidates for continuing a particle's path in the search volume."""
points = [Foundpix() for _ in range(frm.num_cams * MAX_CANDS)]
# reset_foundpix_array(points, frm.num_cams * MAX_CANDS, frm.num_cams)
# points = [Foundpix() for _ in range(frm.num_cams * MAX_CANDS)]
points = np.array(
[(TR_UNUSED, 0, [0]*TR_MAX_CAMS)]*(frm.num_cams*MAX_CANDS),
dtype=Foundpix_dtype).view(np.recarray)
reset_foundpix_array(points, frm.num_cams * MAX_CANDS, frm.num_cams)

# Search limits in image space
right, left, down, up = searchquader(center, run.tpar, run.cpar, run.cal)
Expand All @@ -628,9 +634,11 @@ def sorted_candidates_in_volume(
# fill and sort candidate struct
num_cands = sort_candidates_by_freq(points, frm.num_cams)
if num_cands > 0:
points = points[:num_cands] + [Foundpix(ftnr=TR_UNUSED)]
points = points[:num_cands+1]
# points[-1] = np.ndarray((1,), dtype = Foundpix_dtype)
# points[-1].ftnr = TR_UNUSED
else:
points = [Foundpix(ftnr=TR_UNUSED)]
points = np.array([(TR_UNUSED, 0, [0]*TR_MAX_CAMS)], dtype = Foundpix_dtype).view(np.recarray)

return points

Expand Down Expand Up @@ -856,7 +864,8 @@ def trackcorr_c_loop(run_info, step):

# calculate search cuboid and reproject it to the image space
w = sorted_candidates_in_volume(X[2], v1, fb.buf[2], run_info)
if not w: # empty
# if not w # empty
if w.shape[0] == 1: # empty means at least one row
continue

# Continue to find candidates for the candidates.
Expand Down Expand Up @@ -886,7 +895,7 @@ def trackcorr_c_loop(run_info, step):

# end of search in pix
wn = sorted_candidates_in_volume(X[5], v1, fb.buf[3], run_info)
if len(wn) > 0: # not empty
if wn.shape[0] > 1: # not empty means two rows at least.
count3 += 1
kk = 0
while wn[kk].ftnr != TR_UNUSED and len(fb.buf[3].path_info) > wn[kk].ftnr:
Expand Down
34 changes: 17 additions & 17 deletions tests/test_tracking.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@
import numpy as np

from openptv_python.calibration import Calibration, read_calibration
from openptv_python.constants import MAX_CANDS, TR_UNUSED
from openptv_python.constants import MAX_CANDS, TR_MAX_CAMS, TR_UNUSED
from openptv_python.parameters import (
ControlPar,
TrackPar,
)
from openptv_python.track import (
Foundpix,
Foundpix_dtype,
angle_acc,
candsearch_in_pix,
candsearch_in_pix_rest,
Expand Down Expand Up @@ -315,15 +315,15 @@ def test_sort(self):

def test_copy_foundpix_array(self):
"""Test the copy_foundpix_array function."""
src = [Foundpix(1, 1, [1, 0]), Foundpix(2, 5, [1, 1])]
src = np.rec.array([(1, 1, [1, 0, -999, -999]), (2, 5, [1, 1, 0, 0])], dtype=Foundpix_dtype)
arr_len = 2
num_cams = 2

dest = [
Foundpix(TR_UNUSED, 0, [0] * num_cams) for _ in range(num_cams * MAX_CANDS)
]
one_element = np.array([(TR_UNUSED,0,[-999]*TR_MAX_CAMS)],dtype=Foundpix_dtype)
dest = np.tile(one_element, num_cams * MAX_CANDS).view(np.recarray)


reset_foundpix_array(dest, num_cams * MAX_CANDS, num_cams)
reset_foundpix_array(dest, arr_len, num_cams)

self.assertEqual(
dest[1].ftnr, -1, f"Expected dest[1].ftnr == -1 but found {dest[1].ftnr}"
Expand All @@ -335,7 +335,7 @@ def test_copy_foundpix_array(self):
dest[1].whichcam[0], 0, f"Expected 0 but found {dest[1].whichcam[0]}"
)

copy_foundpix_array(dest, src, arr_len, num_cams)
dest = copy.deepcopy(src)

self.assertEqual(
dest[1].ftnr, 2, f"Expected dest[1].ftnr == 2 but found {dest[1].ftnr}"
Expand Down Expand Up @@ -426,28 +426,28 @@ class TestSortCandidatesByFreq(unittest.TestCase):

def test_sort_candidates_by_freq(self):
"""Test the sort_candidates_by_freq function."""
src = [Foundpix(1, 0, [1, 0]), Foundpix(2, 0, [1, 1])]
src = [(1, 0, [1, 0, 0, 0]), (2, 0, [1, 1, 0, 0])]
src = np.array(src, dtype=Foundpix_dtype).view(np.recarray)

num_cams = 2

# allocate
dest = [
Foundpix(TR_UNUSED, 0, [0] * num_cams) for _ in range(num_cams * MAX_CANDS)
]
one_element = np.array([(TR_UNUSED,0,[0]*TR_MAX_CAMS)],dtype=Foundpix_dtype)
dest = np.tile(one_element, num_cams * MAX_CANDS).view(np.recarray)

# sortwhatfound freaks out if the array is not reset before
reset_foundpix_array(dest, 2, 2)
# reset_foundpix_array(dest, 2, 2)
copy_foundpix_array(dest, src, 2, 2)

# print(f"src = {src}")
# print(f"dest = {dest}")

# test simple sort of a small foundpix array
sort_candidates_by_freq(dest, num_cams)
sort_candidates_by_freq(dest, 2)

# print(f"num_parts = {num_parts}")
# self.assertEqual(num_parts, 1)
self.assertEqual(dest[0].ftnr, 1)
self.assertEqual(dest[0].freq, 1)
self.assertEqual(dest[0].ftnr, 2)
self.assertEqual(dest[0].freq, 2)
self.assertEqual(dest[1].freq, 0)


Expand Down

0 comments on commit 7cc2740

Please sign in to comment.