diff --git a/examples/passthrough.py b/examples/passthrough.py index eb329ae..b9d1983 100755 --- a/examples/passthrough.py +++ b/examples/passthrough.py @@ -29,7 +29,16 @@ def __init__(self, args): self.idx = 0 self.total_bytes = 0 self.queue = queue.SimpleQueue() - self.data, self.sr = sf.read(args.input_file, dtype='int16', always_2d=True) + + info = sf.info(str(args.input_file)) + if info.subtype == 'PCM_16': + dtype = 'int16' + elif info.subtype == 'PCM_32': + dtype = 'int32' + else: + raise ValueError(f'WAV input data type must be either PCM_16 or PCM_32: Got {info.subtype}') + + self.data, self.sr = sf.read(args.input_file, dtype=dtype, always_2d=True) self.encoder = pyflac.StreamEncoder( write_callback=self.encoder_callback, diff --git a/pyflac/decoder.py b/pyflac/decoder.py index bc8b80a..625b957 100644 --- a/pyflac/decoder.py +++ b/pyflac/decoder.py @@ -160,6 +160,8 @@ def __init__(self, self._done = False self._buffer = deque() + self._event = threading.Event() + self._lock = threading.Lock() self.write_callback = write_callback rc = _lib.FLAC__stream_decoder_init_stream( @@ -201,7 +203,10 @@ def process(self, data: bytes): Args: data (bytes): Bytes of FLAC data """ + self._lock.acquire() self._buffer.append(data) + self._lock.release() + self._event.set() def finish(self): """ @@ -226,6 +231,7 @@ def finish(self): # Instruct the decoder to finish up and wait until it is done # -------------------------------------------------------------- self._done = True + self._event.set() self._thread.join(timeout=3) super().finish() if self._error: @@ -316,13 +322,11 @@ def _read_callback(_decoder, """ decoder = _ffi.from_handle(client_data) - while len(decoder._buffer) == 0 and not (decoder._error or decoder._done): - # ---------------------------------------------------------- - # Wait until there is something in the buffer, or an error - # occurs, or the end of the stream is reached. - # ---------------------------------------------------------- - time.sleep(0.01) - + # ---------------------------------------------------------- + # Wait until there is something in the buffer, or an error + # occurs, or the end of the stream is reached. + # ---------------------------------------------------------- + decoder._event.wait() if decoder._error: # ---------------------------------------------------------- # If an error has been issued via the error callback, then @@ -345,16 +349,28 @@ def _read_callback(_decoder, data = bytes() maximum_bytes = int(num_bytes[0]) if len(decoder._buffer[0]) <= maximum_bytes: + decoder._lock.acquire() data = decoder._buffer.popleft() + decoder._lock.release() maximum_bytes -= len(data) if len(decoder._buffer) > 0 and len(decoder._buffer[0]) > maximum_bytes: + decoder._lock.acquire() data += decoder._buffer[0][0:maximum_bytes] decoder._buffer[0] = decoder._buffer[0][maximum_bytes:] + decoder._lock.release() actual_bytes = len(data) num_bytes[0] = actual_bytes _ffi.memmove(byte_buffer, data, actual_bytes) + + # -------------------------------------------------------------- + # If there is no more data to process from the buffer, then + # clear the event, the thread will await more data to process. + # -------------------------------------------------------------- + if len(decoder._buffer) == 0 or (len(decoder._buffer) > 0 and len(decoder._buffer[0]) == 0): + decoder._event.clear() + return _lib.FLAC__STREAM_DECODER_READ_STATUS_CONTINUE @@ -452,3 +468,4 @@ def _error_callback(_decoder, _lib.FLAC__StreamDecoderErrorStatusString[status]).decode() decoder.logger.error(f'Error in libFLAC decoder: {message}') decoder._error = message + decoder._event.set()