Skip to content

Commit

Permalink
python: Start of re-working thor and util functions for integer math.
Browse files Browse the repository at this point in the history
  • Loading branch information
ryanvolz committed Feb 12, 2021
1 parent eccf3d0 commit 3b3c113
Show file tree
Hide file tree
Showing 2 changed files with 144 additions and 11 deletions.
138 changes: 136 additions & 2 deletions python/digital_rf/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import ast
import datetime
import fractions

import dateutil.parser
import numpy as np
Expand All @@ -32,12 +33,141 @@
epoch = datetime.datetime(1970, 1, 1, tzinfo=pytz.utc)


def get_samplerate_frac(sr_or_numerator, denominator=None):
"""Convert argument sample rate to a rational Fraction.
Arguments are passed directly to the fractions.Fraction class, and the denominator
of the result is limited to 32 bits.
Parameters
----------
sr_or_numerator : int | float | numpy.number | Rational | Decimal | str
Sample rate in Hz, or the numerator of the sample rate if `denominator` is
given. Most numeric types are accepted, falling back to evaluating the argument
as a string if passing directly to fractions.Fraction fails. String arguments
can represent the sample rate or a rational expression like "123/456".
denominator: int | Rational, optional
Denominator of the sample rate in Hz, if not None. Must be an integer or
a Rational type, as expected by the `denominator` argument of
fractions.Fraction.
Returns
-------
frac : fractions.Fraction
Rational representation of the sample rate.
"""
try:
frac = fractions.Fraction(sr_or_numerator, denominator)
except TypeError:
# try converting sr to str, then to fraction (works for np.longdouble)
sr_frac = fractions.Fraction(str(sr_or_numerator))
frac = fractions.Fraction(sr_frac, denominator)
return frac.limit_denominator(2 ** 32)


def time_to_sample_ceil(timedelta, samples_per_second):
"""Convert a timedelta into a number of samples using a given sample rate.
Ceiling rounding is used so that the value is the whole number of samples
that spans *at least* the given `timedelta` but no more than
``timedelta + 1 / samples_per_second``. This complements the flooring in
`sample_to_time_floor`, so that::
time_to_sample_ceil(sample_to_time_floor(sample, sps), sps) == sample
Parameters
----------
timedelta : (second, picosecond) tuple | np.timedelta64 | datetime.timedelta | float
Time span to convert to a number of samples. To represent large time spans
with high accuracy, pass a 2-tuple of ints containing the number of whole
seconds and additional picoseconds. Float values are interpreted as a
number of seconds.
samples_per_second : fractions.Fraction | arg tuple for ``get_samplerate_frac``
Sample rate in Hz.
Returns
-------
nsamples : int
Number of samples in the `timedelta` time span at a rate of
`samples_per_second`, using ceiling rounding (up to the next whole sample).
"""
if isinstance(timedelta, tuple):
t_sec, t_psec = timedelta
elif isinstance(timedelta, np.timedelta64):
onesec = np.timedelta64(1, "s")
t_sec = timedelta // onesec
t_psec = (timedelta % onesec) // np.timedelta64(1, "ps")
elif isinstance(timedelta, datetime.timedelta):
t_sec = int(timedelta.total_seconds())
t_psec = 1000000 * timedelta.microseconds
else:
t_sec = int(timedelta)
t_psec = int(np.round((timedelta % 1) * 1e12))
# ensure that samples_per_seconds is a fractions.Fraction
if not isinstance(samples_per_second, fractions.Fraction):
samples_per_second = get_samplerate_frac(*samples_per_second)
# calculate rational values for the second and picosecond parts
s_frac = t_sec * samples_per_second + t_psec * samples_per_second / 10 ** 12
# get an integer value through ceiling rounding
return int(s_frac) + ((s_frac % 1) != 0)


def sample_to_time_floor(nsamples, samples_per_second):
"""Convert a number of samples into a timedelta using a given sample rate.
Floor rounding is used so that the given whole number of samples spans
*at least* the returned amount of time, accurate to the picosecond.
This complements the ceiling rounding in `time_to_sample_ceil`, so that::
time_to_sample_ceil(sample_to_time_floor(sample, sps), sps) == sample
Parameters
----------
nsamples : int
Whole number of samples to convert into a span of time.
samples_per_second : fractions.Fraction | arg tuple for ``get_samplerate_frac``
Sample rate in Hz.
Returns
-------
seconds : int
Number of whole seconds in the time span covered by `nsamples` at a rate
of `samples_per_second`.
picoseconds : int
Number of additional picoseconds in the time span covered by `nsamples`,
using floor rounding (down to the previous whole number of picoseconds).
"""
nsamples = int(nsamples)
# ensure that samples_per_seconds is a fractions.Fraction
if not isinstance(samples_per_second, fractions.Fraction):
samples_per_second = get_samplerate_frac(*samples_per_second)

# get the timedelta as a Fraction
t_frac = nsamples / samples_per_second

seconds = int(t_frac)
picoseconds = int((t_frac % 1) * 10 ** 12)

return (seconds, picoseconds)


def time_to_sample(time, samples_per_second):
"""Get a sample index from a time using a given sample rate.
Parameters
----------
time : datetime | float
Time corresponding to the desired sample index. If not given as a
datetime object, the numeric value is interpreted as a UTC timestamp
Expand All @@ -49,7 +179,6 @@ def time_to_sample(time, samples_per_second):
Returns
-------
sample_index : int
Index to the identified sample given in the number of samples since
the epoch (time_since_epoch*sample_per_second).
Expand Down Expand Up @@ -80,6 +209,11 @@ def sample_to_datetime(sample, samples_per_second):
samples_per_second : np.longdouble
Sample rate in Hz.
epoch : datetime, optional
Epoch time for converting absolute `time` values to a number of seconds
since `epoch`. If None, the Digital RF default (the Unix epoch,
January 1, 1970) is used.
Returns
-------
Expand Down
17 changes: 8 additions & 9 deletions python/tools/thor.py
Original file line number Diff line number Diff line change
Expand Up @@ -543,9 +543,9 @@ def _usrp_setup(self):
# (integer division of clock rate)
cr = op.clock_rates[0]
srdec = int(round(cr / samplerate))
op.samplerate_frac = Fraction(cr).limit_denominator(2 ** 32) / srdec
samplerate_ld = np.longdouble(cr) / srdec
op.samplerate = samplerate_ld
op.samplerate_frac = Fraction(cr).limit_denominator() / srdec

# set per-channel options
# set command time so settings are synced
Expand Down Expand Up @@ -667,7 +667,7 @@ def _usrp_setup(self):
info["lo_off"] = op.lo_offsets[ch_num]
info["lo_source"] = op.lo_sources[ch_num]
info["lo_export"] = op.lo_exports[ch_num]
info["sr"] = op.samplerate
info["sr"] = op.samplerate_frac
print(chinfo.format(**info))
print("-" * 78)

Expand All @@ -683,19 +683,18 @@ def _finalize_options(self):
op.channelizer_filter_taps = []
op.channelizer_filter_delays = []
for ko, (osr, nsc) in enumerate(zip(op.ch_samplerates, op.ch_nsubchannels)):
# get output sample rate fraction
# get output resampling ratio
# (op.samplerate_frac final value is set in _usrp_setup
# so can't get output sample rate until after that is done)
if osr is None:
ch_samplerate_frac = op.samplerate_frac
ratio = Fraction(1)
else:
ch_samplerate_frac = Fraction(osr).limit_denominator()
op.ch_samplerates_frac.append(ch_samplerate_frac)

# get resampling ratio
ratio = ch_samplerate_frac / op.samplerate_frac
ratio = (Fraction(osr) / op.samplerate_frac).limit_denominator(2 ** 16)
op.resampling_ratios.append(ratio)

# get output samplerate fraction
op.ch_samplerates_frac.append(op.samplerate_frac * ratio)

# get resampling low-pass filter taps
if ratio == 1:
op.resampling_filter_taps.append(np.zeros(0))
Expand Down

0 comments on commit 3b3c113

Please sign in to comment.