Skip to content

Commit

Permalink
Merge pull request #122 from NTIA/multiple-antenna-inputs
Browse files Browse the repository at this point in the history
Add test for multi-antennas
  • Loading branch information
aromanielloNTIA authored Jul 17, 2024
2 parents d7ace3b + 3b5d16f commit 23761e7
Show file tree
Hide file tree
Showing 8 changed files with 94 additions and 35 deletions.
8 changes: 4 additions & 4 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ default_language_version:
python: python3.8
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.5.0
rev: v4.6.0
hooks:
- id: check-ast
types: [file, python]
Expand All @@ -18,7 +18,7 @@ repos:
- id: end-of-file-fixer
- id: trailing-whitespace
- repo: https://github.com/asottile/pyupgrade
rev: v3.15.2
rev: v3.16.0
hooks:
- id: pyupgrade
args: ["--py38-plus"]
Expand All @@ -30,12 +30,12 @@ repos:
types: [file, python]
args: ["--profile", "black", "--filter-files", "--gitignore"]
- repo: https://github.com/psf/black
rev: 24.3.0
rev: 24.4.2
hooks:
- id: black
types: [file, python]
- repo: https://github.com/igorshubovych/markdownlint-cli
rev: v0.39.0
rev: v0.41.0
hooks:
- id: markdownlint
types: [file, markdown]
Expand Down
4 changes: 2 additions & 2 deletions scos_actions/actions/acquire_sea_data_product.py
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ def run(self, iqdata: np.ndarray) -> list:
retrieve the processed results. The order is [FFT, PVT, PFP, APD].
"""
# Filter IQ and place it in the object store
iqdata = ray.put(sosfilt(self.iir_sos, iqdata))
iqdata = ray.put(sosfilt(sos=self.iir_sos, x=iqdata))
# Compute PSD, PVT, PFP, and APD concurrently.
# Do not wait until they finish. Yield references to their results.
yield [worker.run.remote(iqdata) for worker in self.workers]
Expand Down Expand Up @@ -490,7 +490,7 @@ def __init__(self, parameters: dict):
self.iir_sb_edge_Hz,
self.sample_rate_Hz,
)
self.iir_numerators, self.iir_denominators = sos2tf(self.iir_sos)
self.iir_numerators, self.iir_denominators = sos2tf(sos=self.iir_sos)

# Remove IIR parameters which aren't needed after filter generation
for key in [
Expand Down
8 changes: 6 additions & 2 deletions scos_actions/actions/calibrate_y_factor.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,8 +307,12 @@ def calibrate(self, params: dict):
# Estimate of IIR filter ENBW does NOT account for passband ripple in sensor transfer function!
enbw_hz = self.iir_enbw_hz
logger.debug("Applying IIR filter to IQ captures")
noise_on_data = sosfilt(self.iir_sos, noise_on_measurement_result["data"])
noise_off_data = sosfilt(self.iir_sos, noise_off_measurement_result["data"])
noise_on_data = sosfilt(
sos=self.iir_sos, x=noise_on_measurement_result["data"]
)
noise_off_data = sosfilt(
sos=self.iir_sos, x=noise_off_measurement_result["data"]
)
else:
logger.debug("Skipping IIR filtering")
# Get ENBW from sensor calibration
Expand Down
22 changes: 22 additions & 0 deletions scos_actions/calibration/tests/test_differential_calibration.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,34 @@ def setup_differential_calibration_file(self, tmp_path: Path):
"calibration_reference": "antenna input",
"calibration_data": {3555e6: 11.5},
}
dict_to_json_multiple_antenna = {
"calibration_parameters": ["rf_path", "frequency"],
"calibration_reference": "antenna input",
"calibration_data": {
"antenna1": {3555e6: 11.5},
"antenna2": {3555e6: 21.5},
},
}
self.valid_file_path = tmp_path / "sample_diff_cal.json"
self.valid_file_path_multiple_antenna = (
tmp_path / "sample_diff_cal_multiple_antenna.json"
)
self.invalid_file_path = tmp_path / "sample_diff_cal_invalid.json"

self.sample_diff_cal = DifferentialCalibration(
file_path=self.valid_file_path, **dict_to_json
)
self.sample_diff_cal_multiple_antenna = DifferentialCalibration(
file_path=self.valid_file_path_multiple_antenna,
**dict_to_json_multiple_antenna,
)

with open(self.valid_file_path, "w") as f:
f.write(json.dumps(dict_to_json))

with open(self.valid_file_path_multiple_antenna, "w") as f:
f.write(json.dumps(dict_to_json_multiple_antenna))

dict_to_json.pop("calibration_reference", None)

with open(self.invalid_file_path, "w") as f:
Expand All @@ -35,6 +53,10 @@ def test_from_json(self):
"""Check from_json functionality with valid and invalid dummy data."""
diff_cal = DifferentialCalibration.from_json(self.valid_file_path)
assert diff_cal == self.sample_diff_cal
diff_cal_multiple_antenna = DifferentialCalibration.from_json(
self.valid_file_path_multiple_antenna
)
assert diff_cal_multiple_antenna == self.sample_diff_cal_multiple_antenna
with pytest.raises(Exception):
_ = DifferentialCalibration.from_json(self.invalid_file_path)

Expand Down
4 changes: 2 additions & 2 deletions scos_actions/signal_processing/fft.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def get_fft(
time_data *= fft_window

# Take the FFT
complex_fft = sp_fft(time_data, norm=norm, workers=workers)
complex_fft = sp_fft(x=time_data, norm=norm, workers=workers)

# Shift the frequencies if desired
if shift:
Expand Down Expand Up @@ -129,7 +129,7 @@ def get_fft_window(window_type: str, window_length: int) -> np.ndarray:
window_type = "hann"

# Get window samples
window = get_window(window_type, window_length, fftbins=True)
window = get_window(window=window_type, Nx=window_length, fftbins=True)

# Return the window
return window
Expand Down
40 changes: 28 additions & 12 deletions scos_actions/signal_processing/filtering.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,23 @@ def generate_elliptic_iir_low_pass_filter(
+ f"edge frequency {pb_edge_Hz} Hz."
)
ord, wn = ellipord(
pb_edge_Hz, sb_edge_Hz, gpass_dB, gstop_dB, False, sample_rate_Hz
wp=pb_edge_Hz,
ws=sb_edge_Hz,
gpass=gpass_dB,
gstop=gstop_dB,
analog=False,
fs=sample_rate_Hz,
)
sos = ellip(
N=ord,
rp=gpass_dB,
rs=gstop_dB,
Wn=wn,
btype="lowpass",
analog=False,
output="sos",
fs=sample_rate_Hz,
)
sos = ellip(ord, gpass_dB, gstop_dB, wn, "lowpass", False, "sos", sample_rate_Hz)
return sos


Expand All @@ -62,14 +76,16 @@ def generate_fir_low_pass_filter(
:param sample_rate_Hz: Sampling rate, in Hz.
:return: Coeffiecients of the FIR low pass filter.
"""
ord, beta = kaiserord(attenuation_dB, width_Hz / (0.5 * sample_rate_Hz))
ord, beta = kaiserord(
ripple=attenuation_dB, width=width_Hz / (0.5 * sample_rate_Hz)
)
taps = firwin(
ord + 1,
cutoff_Hz,
width_Hz,
("kaiser", beta),
"lowpass",
True,
numtaps=ord + 1,
cutoff=cutoff_Hz,
width=width_Hz,
window=("kaiser", beta),
pass_zero="lowpass",
scale=True,
fs=sample_rate_Hz,
)
return taps
Expand All @@ -91,7 +107,7 @@ def get_iir_frequency_response(
The second is the array containing the frequency response values, which
are complex values in linear units.
"""
w, h = sosfreqz(sos, worN, whole=True, fs=sample_rate_Hz)
w, h = sosfreqz(sos=sos, worN=worN, whole=True, fs=sample_rate_Hz)
return w, h


Expand All @@ -110,7 +126,7 @@ def get_iir_phase_response(
frequencies, in Hz, for which the phase response was calculated.
The second is the array containing the phase response values, in radians.
"""
w, h = sosfreqz(sos, worN, whole=False, fs=sample_rate_Hz)
w, h = sosfreqz(sos=sos, worN=worN, whole=False, fs=sample_rate_Hz)
angles = np.unwrap(np.angle(h))
return w, angles

Expand Down Expand Up @@ -156,6 +172,6 @@ def is_stable(sos: np.ndarray) -> bool:
:param sos: Second-order sections representation of the IIR filter.
:return: True if the filter is stable, False if not.
"""
_, poles, _ = sos2zpk(sos)
_, poles, _ = sos2zpk(sos=sos)
stable = all([p < 1 for p in np.square(np.abs(poles))])
return stable
12 changes: 6 additions & 6 deletions scos_actions/signal_processing/tests/test_fft.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def test_get_fft():
fft_size = 1024
num_ffts = 5
signal_amplitude = 500
window = get_window("flattop", fft_size, True)
window = get_window(window="flattop", Nx=fft_size, fftbins=True)
window_acf = window_amplitude_correction(window)

# Generated signal is constant: the FFT should be zero in all bins except
Expand Down Expand Up @@ -175,7 +175,7 @@ def test_get_fft_window():
# generated using SciPy
for w_type in supported_window_types:
win = fft.get_fft_window(w_type, 1024)
true_win = get_window(w_type, 1024, True)
true_win = get_window(window=w_type, Nx=1024, fftbins=True)
assert isinstance(win, np.ndarray)
assert win.size == 1024
assert np.array_equal(win, true_win)
Expand All @@ -193,10 +193,10 @@ def test_get_fft_window():
# Check that input formatting works as expected
for bad_w_type, true_w_type in window_alt_format_map.items():
win = fft.get_fft_window(bad_w_type, 1024)
true_win = get_window(true_w_type, 1024, True)
true_win = get_window(window=true_w_type, Nx=1024, fftbins=True)
assert np.array_equal(win, true_win)
with pytest.raises(ValueError, match="Unknown window type."):
_ = get_window(bad_w_type, 1024, True)
_ = get_window(window=bad_w_type, Nx=1024, fftbins=True)


def test_get_fft_window_correction():
Expand All @@ -216,7 +216,7 @@ def test_get_fft_window_correction():

# Function under test should raise a ValueError for invalid correction type
bad_correction_type = ["amp", "e", "both", "other"]
test_good_window = get_window("flattop", 1024, True)
test_good_window = get_window(window="flattop", Nx=1024, fftbins=True)
for ct in bad_correction_type:
with pytest.raises(ValueError, match=f"Invalid window correction type: {ct}"):
_ = fft.get_fft_window_correction(test_good_window, ct)
Expand Down Expand Up @@ -253,7 +253,7 @@ def test_get_fft_enbw():
fft_size = 1024
sample_rate__Hz = 14e6
for w_type in window_types:
window = get_window(w_type, fft_size, True)
window = get_window(window=w_type, Nx=fft_size, fftbins=True)
true_fft_bin_enbw__Hz = fft_bin_enbw(window, sample_rate__Hz)
test_fft_bin_enbw__Hz = fft.get_fft_enbw(window, sample_rate__Hz)
assert isinstance(test_fft_bin_enbw__Hz, float)
Expand Down
31 changes: 24 additions & 7 deletions scos_actions/signal_processing/tests/test_filtering.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,17 @@ def sr(): # Sample rate, Hz

@pytest.fixture
def example_sos(gpass, gstop, pb, sb, sr):
o, w = ellipord(pb, sb, gpass, gstop, False, sr)
return ellip(o, gpass, gstop, w, "lowpass", False, "sos", sr)
o, w = ellipord(wp=pb, ws=sb, gpass=gpass, gstop=gstop, analog=False, fs=sr)
return ellip(
N=o,
rp=gpass,
rs=gstop,
Wn=w,
btype="lowpass",
analog=False,
output="sos",
fs=sr,
)


def test_generate_elliptic_iir_low_pass_filter(example_sos, gpass, gstop, pb, sb, sr):
Expand All @@ -56,8 +65,16 @@ def test_generate_elliptic_iir_low_pass_filter(example_sos, gpass, gstop, pb, sb
def test_generate_fir_low_pass_filter():
# Same approach as above for IIR: basically duplicate the functionality here
att, wid, xoff, sr = 10, 100, 1000, 10e3
o, b = kaiserord(att, wid / (0.5 * sr))
true_taps = firwin(o + 1, xoff, wid, ("kaiser", b), "lowpass", True, fs=sr)
o, b = kaiserord(ripple=att, width=wid / (0.5 * sr))
true_taps = firwin(
numtaps=o + 1,
cutoff=xoff,
width=wid,
window=("kaiser", b),
pass_zero="lowpass",
scale=True,
fs=sr,
)
test_taps = filtering.generate_fir_low_pass_filter(att, wid, xoff, sr)
assert isinstance(test_taps, np.ndarray)
assert test_taps.shape == (o + 1,)
Expand All @@ -66,7 +83,7 @@ def test_generate_fir_low_pass_filter():

def test_get_iir_frequency_response(example_sos, pb, sb, sr):
for worN in [100, np.linspace(pb - 500, sb + 500, 3050)]:
true_w, true_h = sosfreqz(example_sos, worN, True, sr)
true_w, true_h = sosfreqz(sos=example_sos, worN=worN, whole=True, fs=sr)
test_w, test_h = filtering.get_iir_frequency_response(example_sos, worN, sr)
if isinstance(worN, int):
assert all(len(x) == worN for x in [test_w, test_h])
Expand All @@ -78,7 +95,7 @@ def test_get_iir_frequency_response(example_sos, pb, sb, sr):

def test_get_iir_phase_response(example_sos, pb, sb, sr):
for worN in [100, np.linspace(pb - 500, sb + 500, 3050)]:
true_w, h = sosfreqz(example_sos, worN, False, sr)
true_w, h = sosfreqz(sos=example_sos, worN=worN, whole=False, fs=sr)
true_angles = np.unwrap(np.angle(h))
test_w, test_angles = filtering.get_iir_phase_response(example_sos, worN, sr)
if isinstance(worN, int):
Expand All @@ -103,6 +120,6 @@ def test_is_stable(example_sos):
stable_test = filtering.is_stable(example_sos)
assert isinstance(stable_test, bool)
assert stable_test is True
_, poles, _ = sos2zpk(example_sos)
_, poles, _ = sos2zpk(sos=example_sos)
stable_true = all([p < 1 for p in np.square(np.abs(poles))])
assert stable_true == stable_test

0 comments on commit 23761e7

Please sign in to comment.