diff --git a/amodem/__main__.py b/amodem/__main__.py index dc946a9..ca14a99 100644 --- a/amodem/__main__.py +++ b/amodem/__main__.py @@ -34,8 +34,7 @@ def __init__(self, stream): def read(self, size): while True: - data = self.stream.read(size) - if data: + if data := self.stream.read(size): result = self.obj.compress(data) if not result: # compression is too good :) continue # try again (since falsy data = EOF) @@ -88,12 +87,12 @@ def opener(fname): def get_volume_cmd(args): - volume_controllers = [ - dict(test='pactl --version', - send='pactl set-sink-volume @DEFAULT_SINK@', - recv='pactl set-source-volume @DEFAULT_SOURCE@') - ] if args.calibrate == 'auto': + volume_controllers = [ + dict(test='pactl --version', + send='pactl set-sink-volume @DEFAULT_SINK@', + recv='pactl set-source-volume @DEFAULT_SOURCE@') + ] for c in volume_controllers: if os.system(c['test']) == 0: return c[args.command] diff --git a/amodem/calib.py b/amodem/calib.py index 59f7f45..260d1bb 100644 --- a/amodem/calib.py +++ b/amodem/calib.py @@ -70,11 +70,7 @@ def detector(config, src, frame_length=200): flags = [total > 0.1, peak < 1.0, coherency > 0.99] success = all(flags) - if success: - msg = 'good signal' - else: - msg = f'too {errors[flags.index(False)]} signal' - + msg = 'good signal' if success else f'too {errors[flags.index(False)]} signal' yield dict( freq=freq, rms=rms, peak=peak, coherency=coherency, total=total, success=success, msg=msg @@ -123,10 +119,13 @@ def recv_iter(config, src, volume_cmd=None, dump_audio=None): result_iterator = volume_calibration(result_iterator, volume_ctl) for _prev, curr, _next in iter_window(result_iterator, size=3): # don't log errors during frequency changes - if _prev['success'] and _next['success']: - if _prev['freq'] != _next['freq']: - if not curr['success']: - curr['msg'] = 'frequency change' + if ( + _prev['success'] + and _next['success'] + and _prev['freq'] != _next['freq'] + and not curr['success'] + ): + curr['msg'] = 'frequency change' yield curr diff --git a/amodem/detect.py b/amodem/detect.py index be47285..d99a3c4 100644 --- a/amodem/detect.py +++ b/amodem/detect.py @@ -92,8 +92,7 @@ def find_start(self, buf): index = np.argmax(coeffs) log.info('Carrier coherence: %.3f%%', coeffs[index] * 100) - offset = index + len(zeroes) - return offset + return index + len(zeroes) def estimate(self, buf, skip=5): filt = dsp.exp_iwt(-self.omega, self.Nsym) / (0.5 * self.Nsym) diff --git a/amodem/dsp.py b/amodem/dsp.py index c0afd45..ae8cc5a 100644 --- a/amodem/dsp.py +++ b/amodem/dsp.py @@ -54,9 +54,7 @@ def coherence(x, omega): n = len(x) Hc = exp_iwt(-omega, n) / np.sqrt(0.5*n) norm_x = norm(x) - if not norm_x: - return 0.0 - return np.dot(Hc, x) / norm_x + return np.dot(Hc, x) / norm_x if norm_x else 0.0 def linear_regression(x, y): diff --git a/amodem/equalizer.py b/amodem/equalizer.py index 51f2227..11ecd2b 100644 --- a/amodem/equalizer.py +++ b/amodem/equalizer.py @@ -21,10 +21,10 @@ def train_symbols(self, length, constant_prefix=16): r = dsp.prbs(reg=1, poly=0x1100b, bits=2) constellation = [1, 1j, -1, -1j] - symbols = [] - for _ in range(length): - symbols.append([constellation[next(r)] for _ in range(self.Nfreq)]) - + symbols = [ + [constellation[next(r)] for _ in range(self.Nfreq)] + for _ in range(length) + ] symbols = np.array(symbols) # Constant symbols (for analog debugging) symbols[:constant_prefix, :] = 1 @@ -32,9 +32,7 @@ def train_symbols(self, length, constant_prefix=16): def modulator(self, symbols): gain = 1.0 / len(self.carriers) - result = [] - for s in symbols: - result.append(np.dot(s, self.carriers)) + result = [np.dot(s, self.carriers) for s in symbols] result = np.concatenate(result).real * gain assert np.max(np.abs(result)) <= 1 return result diff --git a/amodem/framing.py b/amodem/framing.py index 946e4b9..e816375 100644 --- a/amodem/framing.py +++ b/amodem/framing.py @@ -95,8 +95,8 @@ def __init__(self): bits = [index & (2 ** k) for k in range(self.byte_size)] bits_list.append(tuple((1 if b else 0) for b in bits)) - self.to_bits = dict((i, bits) for i, bits in enumerate(bits_list)) - self.to_byte = dict((bits, i) for i, bits in enumerate(bits_list)) + self.to_bits = dict(enumerate(bits_list)) + self.to_byte = {bits: i for i, bits in enumerate(bits_list)} @chain_wrapper diff --git a/amodem/recv.py b/amodem/recv.py index 0ccc272..d72fcdf 100644 --- a/amodem/recv.py +++ b/amodem/recv.py @@ -130,7 +130,7 @@ def _handler(received, decoded, freq): log.info('Starting demodulation') for i, block_of_bits in enumerate(stream, 1): for bits in block_of_bits: - self.stats['rx_bits'] = self.stats['rx_bits'] + len(bits) + self.stats['rx_bits'] += len(bits) yield bits if i % self.iters_per_update == 0: diff --git a/amodem/sampling.py b/amodem/sampling.py index 8bcfdd9..a9705ae 100644 --- a/amodem/sampling.py +++ b/amodem/sampling.py @@ -26,7 +26,7 @@ def __init__(self, resolution=1024, width=128): lengths = [len(f) for f in self.filt] self.coeff_len = 2 * width - assert set(lengths) == set([self.coeff_len]) # verify same lengths + assert set(lengths) == {self.coeff_len} assert len(self.filt) == resolution diff --git a/amodem/stream.py b/amodem/stream.py index e542dbe..c81b448 100644 --- a/amodem/stream.py +++ b/amodem/stream.py @@ -19,8 +19,7 @@ def __iter__(self): def next(self): block = bytearray() if self.eof: - data = self.fd.read(self.bufsize) - if data: + if data := self.fd.read(self.bufsize): self.total += len(data) block.extend(data) return block @@ -29,8 +28,7 @@ def next(self): finish_time = time.time() + self.timeout while time.time() <= finish_time: left = self.bufsize - len(block) - data = self.fd.read(left) - if data: + if data := self.fd.read(left): self.total += len(data) block.extend(data) diff --git a/amodem/tests/test_calib.py b/amodem/tests/test_calib.py index 8da5b4e..9f7971d 100644 --- a/amodem/tests/test_calib.py +++ b/amodem/tests/test_calib.py @@ -54,7 +54,7 @@ def test_too_weak(): def test_too_noisy(): r = random.Random(0) # generate random binary signal - signal = np.array([r.choice([-1, 1]) for i in range(int(config.Fs))]) + signal = np.array([r.choice([-1, 1]) for _ in range(int(config.Fs))]) src = BytesIO(common.dumps(signal * 0.5)) for r in calib.detector(config, src=src): assert not r['success'] diff --git a/amodem/tests/test_common.py b/amodem/tests/test_common.py index 34745e2..add0c83 100644 --- a/amodem/tests/test_common.py +++ b/amodem/tests/test_common.py @@ -5,10 +5,9 @@ def iterlist(x, *args, **kwargs): x = np.array(x) - return list( - (i, list(x)) - for i, x in common.iterate(x, index=True, *args, **kwargs) - ) + return [ + (i, list(x)) for i, x in common.iterate(x, index=True, *args, **kwargs) + ] def test_iterate(): @@ -35,9 +34,7 @@ def test_split(): def test_icapture(): x = range(100) y = [] - z = [] - for i in common.icapture(x, result=y): - z.append(i) + z = list(common.icapture(x, result=y)) assert list(x) == y assert list(x) == z diff --git a/amodem/tests/test_dsp.py b/amodem/tests/test_dsp.py index 9a16e58..7d5c89f 100644 --- a/amodem/tests/test_dsp.py +++ b/amodem/tests/test_dsp.py @@ -69,7 +69,7 @@ def quantize(q, s): def test_overflow(): q = dsp.MODEM(config.symbols) r = np.random.RandomState(seed=0) - for i in range(10000): + for _ in range(10000): s = 10*(r.normal() + 1j * r.normal()) quantize(q, s) @@ -88,6 +88,5 @@ def test_prbs(): assert r == [1, 2, 0, 1, 3, 3, 2, 1] period = 2 ** 16 - 1 - r = list(itertools.islice(dsp.prbs(reg=1, poly=0x1100b, bits=16), period)) - r.sort() + r = sorted(itertools.islice(dsp.prbs(reg=1, poly=0x1100b, bits=16), period)) assert r == list(range(1, 2 ** 16)) diff --git a/amodem/tests/test_framing.py b/amodem/tests/test_framing.py index 1465d05..8eec725 100644 --- a/amodem/tests/test_framing.py +++ b/amodem/tests/test_framing.py @@ -10,7 +10,7 @@ def concat(iterable): r = random.Random(0) -blob = bytearray(r.randrange(0, 256) for i in range(64 * 1024)) +blob = bytearray(r.randrange(0, 256) for _ in range(64 * 1024)) @pytest.fixture(params=[b'', b'abc', b'1234567890', blob, blob[:12345]]) diff --git a/amodem/tests/test_stream.py b/amodem/tests/test_stream.py index 9f8de83..667f8dc 100644 --- a/amodem/tests/test_stream.py +++ b/amodem/tests/test_stream.py @@ -23,12 +23,9 @@ def test_read(): result = list(zip(range(10), f)) p.kill() - j = 0 - for i, buf in result: + for j, (i, buf) in enumerate(result): assert i == j assert len(buf) == f.bufsize - j += 1 - try: next(f) except IOError as e: