From 6e34ad44e3ebbaa2a84c1a2991d576886e0f619f Mon Sep 17 00:00:00 2001 From: Joe Finney Date: Fri, 1 Nov 2024 16:24:40 +0000 Subject: [PATCH 1/6] Remove runtime configuration of bitrate for StreamSplitter - Add support for dynamic reconfiguraton of drop rate - Remove methods and events associates with runtime reconfiguration - Fix minor bug in dissemination of data rate --- inc/streams/DataStream.h | 15 --- inc/streams/EffectFilter.h | 1 - inc/streams/StreamFlowTrigger.h | 37 ------- inc/streams/StreamNormalizer.h | 2 - inc/streams/StreamSplitter.h | 14 +-- source/streams/DataStream.cpp | 11 --- source/streams/EffectFilter.cpp | 5 - source/streams/StreamFlowTrigger.cpp | 63 ------------ source/streams/StreamNormalizer.cpp | 9 +- source/streams/StreamSplitter.cpp | 138 +++++++++------------------ 10 files changed, 53 insertions(+), 242 deletions(-) delete mode 100644 inc/streams/StreamFlowTrigger.h delete mode 100644 source/streams/StreamFlowTrigger.cpp diff --git a/inc/streams/DataStream.h b/inc/streams/DataStream.h index dc69e5f6..55ce29ee 100644 --- a/inc/streams/DataStream.h +++ b/inc/streams/DataStream.h @@ -55,7 +55,6 @@ namespace codal class DataSink { public: - virtual int pullRequest(); }; @@ -72,7 +71,6 @@ namespace codal virtual int getFormat(); virtual int setFormat(int format); virtual float getSampleRate(); - virtual float requestSampleRate(float sampleRate); }; /** @@ -198,19 +196,6 @@ namespace codal */ virtual float getSampleRate() override; - /** - * Request a new sample rate on this stream. - * - * Most components will simply forward this call upstream, and upon reaching a data source, if possible the source should change - * the sample rate to accomodate the request. - * - * @warning Not all sample rates will be possible for all devices, so if the caller needs to know the _actual_ rate, they should check the returned value here - * - * @param sampleRate The requested sample rate, to be handled by the nearest component capable of doing so. - * @return float The actual sample rate this stream will now run at, may differ from the requested sample rate. - */ - virtual float requestSampleRate(float sampleRate) override; - private: /** * Issue a deferred pull request to our downstream component, if one has been registered. diff --git a/inc/streams/EffectFilter.h b/inc/streams/EffectFilter.h index f2de3834..6d5bee33 100644 --- a/inc/streams/EffectFilter.h +++ b/inc/streams/EffectFilter.h @@ -29,7 +29,6 @@ namespace codal virtual int setFormat( int format ); virtual float getSampleRate(); - virtual float requestSampleRate(float sampleRate); /** * Defines if this filter should perform a deep copy of incoming data, or update data in place. diff --git a/inc/streams/StreamFlowTrigger.h b/inc/streams/StreamFlowTrigger.h deleted file mode 100644 index f13c81f3..00000000 --- a/inc/streams/StreamFlowTrigger.h +++ /dev/null @@ -1,37 +0,0 @@ -#include "ManagedBuffer.h" -#include "DataStream.h" - -#ifndef STREAM_FLOW_TRIGGER_H -#define STREAM_FLOW_TRIGGER_H - -#define TRIGGER_PULL 1 -#define TRIGGER_REQUEST 2 - -namespace codal { - - class StreamFlowTrigger : public DataSource, public DataSink { - private: - - DataSink *downStream; - DataSource &upStream; - - void (*eventHandler)(int); - - public: - - StreamFlowTrigger( DataSource &source ); - ~StreamFlowTrigger(); - - void setDataHandler( void (*handler)(int) ); - - virtual ManagedBuffer pull(); - virtual int pullRequest(); - virtual void connect( DataSink &sink ); - bool isConnected(); - virtual void disconnect(); - virtual int getFormat(); - virtual int setFormat( int format ); - }; -} - -#endif \ No newline at end of file diff --git a/inc/streams/StreamNormalizer.h b/inc/streams/StreamNormalizer.h index 00d353a6..86dcdf2d 100644 --- a/inc/streams/StreamNormalizer.h +++ b/inc/streams/StreamNormalizer.h @@ -138,8 +138,6 @@ namespace codal{ int setOrMask(uint32_t mask); float getSampleRate(); - - float requestSampleRate(float sampleRate); /** * Determines if this source is connected to a downstream component diff --git a/inc/streams/StreamSplitter.h b/inc/streams/StreamSplitter.h index 976caff2..d1a5d7e9 100644 --- a/inc/streams/StreamSplitter.h +++ b/inc/streams/StreamSplitter.h @@ -58,14 +58,14 @@ namespace codal{ class SplitterChannel : public DataSource, public DataSink { private: StreamSplitter * parent; - float sampleRate; - unsigned int inUnderflow; + int sampleDropRate = 1; + int sampleDropPosition = 0; + int sampleSigma = 0; ManagedBuffer resample( ManagedBuffer _in, uint8_t * buffer = NULL, int length = -1 ); public: - int pullAttempts; // Number of failed pull request attempts - uint32_t sentBuffers; + DataSink * output; /** @@ -88,22 +88,21 @@ namespace codal{ virtual void disconnect(); virtual int getFormat(); virtual int setFormat(int format); + virtual int requestSampleDropRate(int sampleDropRate); virtual float getSampleRate(); - virtual float requestSampleRate(float sampleRate); }; class StreamSplitter : public DataSink, public CodalComponent { private: ManagedBuffer lastBuffer; // Buffer being processed - uint64_t __cycle; public: bool isActive; // Track if we need to emit activate/deactivate messages int channels; // Current number of channels Splitter is serving volatile int activeChannels; // Current number of /active/ channels this Splitter is serving DataSource &upstream; // The upstream component of this Splitter - SplitterChannel * outputChannels[CONFIG_MAX_CHANNELS]; // Array of SplitterChannels the Splitter is serving + SplitterChannel *outputChannels[CONFIG_MAX_CHANNELS]; // Array of SplitterChannels the Splitter is serving /** * Creates a component that distributes a single upstream datasource to many downstream datasinks @@ -121,6 +120,7 @@ namespace codal{ virtual SplitterChannel * createChannel(); virtual bool destroyChannel( SplitterChannel * channel ); virtual SplitterChannel * getChannel( DataSink * output ); + virtual float getSampleRate(); /** * Destructor. diff --git a/source/streams/DataStream.cpp b/source/streams/DataStream.cpp index 91539487..af6b55b9 100644 --- a/source/streams/DataStream.cpp +++ b/source/streams/DataStream.cpp @@ -61,11 +61,6 @@ float DataSource::getSampleRate() { return DATASTREAM_SAMPLE_RATE_UNKNOWN; } -float DataSource::requestSampleRate(float sampleRate) { - // Just consume this by default, we don't _have_ to honour requests for specific rates. - return DATASTREAM_SAMPLE_RATE_UNKNOWN; -} - int DataSink::pullRequest() { return DEVICE_NOT_SUPPORTED; @@ -208,9 +203,3 @@ float DataStream::getSampleRate() { return this->upStream->getSampleRate(); return DATASTREAM_SAMPLE_RATE_UNKNOWN; } - -float DataStream::requestSampleRate(float sampleRate) { - if( this->upStream != NULL ) - return this->upStream->requestSampleRate( sampleRate ); - return DATASTREAM_SAMPLE_RATE_UNKNOWN; -} \ No newline at end of file diff --git a/source/streams/EffectFilter.cpp b/source/streams/EffectFilter.cpp index 5e81ddb4..2ace4ef2 100644 --- a/source/streams/EffectFilter.cpp +++ b/source/streams/EffectFilter.cpp @@ -63,11 +63,6 @@ float EffectFilter::getSampleRate() return this->upStream.getSampleRate(); } -float EffectFilter::requestSampleRate(float sampleRate) -{ - return this->upStream.requestSampleRate( sampleRate ); -} - /** * Defines if this filter should perform a deep copy of incoming data, or update data in place. * diff --git a/source/streams/StreamFlowTrigger.cpp b/source/streams/StreamFlowTrigger.cpp deleted file mode 100644 index a6191163..00000000 --- a/source/streams/StreamFlowTrigger.cpp +++ /dev/null @@ -1,63 +0,0 @@ -#include "StreamFlowTrigger.h" -#include "ManagedBuffer.h" -#include "DataStream.h" -#include "CodalDmesg.h" - -using namespace codal; - -StreamFlowTrigger::StreamFlowTrigger( DataSource &source ) : upStream( source ) -{ - this->eventHandler = NULL; - this->downStream = NULL; - source.connect( *this ); -} - -StreamFlowTrigger::~StreamFlowTrigger() -{ - // NOP -} - -void StreamFlowTrigger::setDataHandler( void (*handler)(int) ) -{ - this->eventHandler = handler; -} - -ManagedBuffer StreamFlowTrigger::pull() -{ - (*this->eventHandler)( TRIGGER_PULL ); - return this->upStream.pull(); -} - -int StreamFlowTrigger::pullRequest() -{ - (*this->eventHandler)( TRIGGER_REQUEST ); - if( this->downStream != NULL ) - return this->downStream->pullRequest(); - - return DEVICE_BUSY; -} - -void StreamFlowTrigger::connect( DataSink &sink ) -{ - this->downStream = &sink; -} - -bool StreamFlowTrigger::isConnected() -{ - return this->downStream != NULL; -} - -void StreamFlowTrigger::disconnect() -{ - this->downStream = NULL; -} - -int StreamFlowTrigger::getFormat() -{ - return this->upStream.getFormat(); -} - -int StreamFlowTrigger::setFormat( int format ) -{ - return this->upStream.setFormat( format ); -} \ No newline at end of file diff --git a/source/streams/StreamNormalizer.cpp b/source/streams/StreamNormalizer.cpp index 9b8a04b8..7453f92d 100644 --- a/source/streams/StreamNormalizer.cpp +++ b/source/streams/StreamNormalizer.cpp @@ -177,7 +177,7 @@ ManagedBuffer StreamNormalizer::pull() else buffer = ManagedBuffer(samples * bytesPerSampleOut); - // Initialise input an doutput buffer pointers. + // Initialise input and output buffer pointers. data = &inputBuffer[0]; result = &buffer[0]; @@ -316,11 +316,8 @@ StreamNormalizer::~StreamNormalizer() } float StreamNormalizer::getSampleRate() { - return this->upstream.getSampleRate(); -} - -float StreamNormalizer::requestSampleRate(float sampleRate) { - return this->upstream.requestSampleRate( sampleRate ); + int v = this->upstream.getSampleRate(); + return v; } bool StreamNormalizer::isConnected() diff --git a/source/streams/StreamSplitter.cpp b/source/streams/StreamSplitter.cpp index 400b3dac..2ca6b056 100644 --- a/source/streams/StreamSplitter.cpp +++ b/source/streams/StreamSplitter.cpp @@ -33,12 +33,8 @@ using namespace codal; SplitterChannel::SplitterChannel( StreamSplitter * parent, DataSink * output = NULL ) { - this->sampleRate = DATASTREAM_SAMPLE_RATE_UNKNOWN; this->parent = parent; this->output = output; - this->pullAttempts = 0; - this->sentBuffers = 0; - this->inUnderflow = 0; } SplitterChannel::~SplitterChannel() @@ -47,98 +43,66 @@ SplitterChannel::~SplitterChannel() } int SplitterChannel::pullRequest() { - this->pullAttempts++; if( output != NULL ) return output->pullRequest(); + return DEVICE_BUSY; } -ManagedBuffer SplitterChannel::resample( ManagedBuffer _in, uint8_t * buffer, int length ) { +ManagedBuffer SplitterChannel::resample( ManagedBuffer _in, uint8_t *buffer, int length ) { - // Going the long way around - drop any extra samples... - float inRate = parent->upstream.getSampleRate(); - float outRate = sampleRate; - + // Fast path. Perform a shallow copy of the input buffer where possible. + // TODO: verify this is still a safe operation. + if (this->sampleDropRate == 1) + return _in; + + // Going the long way around - drop any excess samples... int inFmt = parent->upstream.getFormat(); int bytesPerSample = DATASTREAM_FORMAT_BYTES_PER_SAMPLE( inFmt ); int totalSamples = _in.length() / bytesPerSample; + int numOutputSamples = (totalSamples / sampleDropRate) + 1; + uint8_t *outPtr = NULL; - // Integer estimate the number of sample drops required - int byteDeficit = (int)inRate - (int)outRate; - int packetsPerSec = (int)inRate / totalSamples; - int dropPerPacket = byteDeficit / packetsPerSec; - int samplesPerOut = totalSamples - dropPerPacket; - - // If we're not supplied an external buffer, make our own... - uint8_t * output = buffer; - if( output == NULL ) { - output = (uint8_t *)malloc( samplesPerOut * bytesPerSample ); - length = samplesPerOut * bytesPerSample; - } else { - if (length > samplesPerOut * bytesPerSample) { - length = samplesPerOut * bytesPerSample; - } - } - - int oversample_offset = 0; - int oversample_step = (totalSamples * CONFIG_SPLITTER_OVERSAMPLE_STEP) / samplesPerOut; + ManagedBuffer output = ManagedBuffer(numOutputSamples * bytesPerSample); + outPtr = output.getBytes(); - uint8_t *inPtr = &_in[0]; - uint8_t *outPtr = output; - while( outPtr - output < length ) + for (int i = 0; i < totalSamples * bytesPerSample; i++) { - int a = StreamNormalizer::readSample[inFmt]( inPtr + ((int)(oversample_offset / CONFIG_SPLITTER_OVERSAMPLE_STEP) * bytesPerSample) ); - int b = StreamNormalizer::readSample[inFmt]( inPtr + (((int)(oversample_offset / CONFIG_SPLITTER_OVERSAMPLE_STEP) + 1) * bytesPerSample) ); - int s = a + ((int)((b - a)/CONFIG_SPLITTER_OVERSAMPLE_STEP) * (oversample_offset % CONFIG_SPLITTER_OVERSAMPLE_STEP)); + sampleSigma += StreamNormalizer::readSample[inFmt]( &_in[i]); + sampleDropPosition++; - oversample_offset += oversample_step; + if (sampleDropPosition >= sampleDropRate) + { + StreamNormalizer::writeSample[inFmt](outPtr, sampleSigma / sampleDropRate); + outPtr += bytesPerSample; - StreamNormalizer::writeSample[inFmt](outPtr, s); - outPtr += bytesPerSample; + sampleDropPosition = 0; + sampleSigma = 0; + } } - ManagedBuffer result = ManagedBuffer( output, length ); - - // Did we create this memory? If so, free it again. - if( buffer == NULL ) - free( output ); + output.truncate(outPtr - output.getBytes()); - return result; + return output; } uint8_t * SplitterChannel::pullInto( uint8_t * rawBuffer, int length ) { - this->pullAttempts = 0; - this->sentBuffers++; ManagedBuffer inData = parent->getBuffer(); - - // Shortcuts - we can't fabricate samples, so just pass on what we can if we don't know or can't keep up. - if( this->sampleRate == DATASTREAM_SAMPLE_RATE_UNKNOWN || this->sampleRate >= this->parent->upstream.getSampleRate() ) { - inData.readBytes( rawBuffer, 0, min(inData.length(), length) ); - return rawBuffer + min(inData.length(), length); - } - ManagedBuffer result = this->resample( inData, rawBuffer, length ); + return rawBuffer + result.length(); } ManagedBuffer SplitterChannel::pull() { - this->pullAttempts = 0; - this->sentBuffers++; ManagedBuffer inData = parent->getBuffer(); - - // Shortcuts - we can't fabricate samples, so just pass on what we can if we don't know or can't keep up. - if( this->sampleRate == DATASTREAM_SAMPLE_RATE_UNKNOWN || this->sampleRate >= this->parent->upstream.getSampleRate() ) - return inData; - return this->resample( inData ); // Autocreate the output buffer } void SplitterChannel::connect(DataSink &sink) { output = &sink; - Event e( parent->id, SPLITTER_CHANNEL_CONNECT ); } bool SplitterChannel::isConnected() @@ -149,7 +113,6 @@ bool SplitterChannel::isConnected() void SplitterChannel::disconnect() { output = NULL; - Event e( parent->id, SPLITTER_CHANNEL_DISCONNECT ); } int SplitterChannel::getFormat() @@ -162,33 +125,16 @@ int SplitterChannel::setFormat(int format) return parent->upstream.setFormat( format ); } -float SplitterChannel::getSampleRate() -{ - if( sampleRate != DATASTREAM_SAMPLE_RATE_UNKNOWN ) - return sampleRate; - return parent->upstream.getSampleRate(); -} - -float SplitterChannel::requestSampleRate( float sampleRate ) +int SplitterChannel::requestSampleDropRate( int sampleDropRate ) { - this->sampleRate = sampleRate; - - // Do we need to request a higher rate upstream? - if( parent->upstream.getSampleRate() < sampleRate ) { - - // Request it, and if we got less that we expected, report that rate - if( parent->upstream.requestSampleRate( sampleRate ) < sampleRate ) - return parent->upstream.getSampleRate(); - } + // TODO: Any validaiton to do here? Or do we permit any integer multiple? + this->sampleDropRate = sampleDropRate; + this->sampleDropPosition = 0; + this->sampleSigma = 0; - // Otherwise, report our own rate (we're matching or altering it ourselves) - return sampleRate; + return this->sampleDropRate; } - - - - /** * Creates a component that distributes a single upstream datasource to many downstream datasinks * @@ -206,9 +152,6 @@ StreamSplitter::StreamSplitter(DataSource &source, uint16_t id) : upstream(sourc outputChannels[i] = NULL; upstream.connect(*this); - - this->__cycle = 0; - //this->status |= DEVICE_COMPONENT_STATUS_SYSTEM_TICK; } StreamSplitter::~StreamSplitter() @@ -237,22 +180,16 @@ int StreamSplitter::pullRequest() if (outputChannels[i] != NULL) { if( outputChannels[i]->pullRequest() == DEVICE_OK ) { activeChannels++; - - if( !isActive ) - Event e( id, SPLITTER_ACTIVATE ); - isActive = true; } } } if( activeChannels == 0 && isActive ) { - Event e( id, SPLITTER_DEACTIVATE ); isActive = false; } lastBuffer = ManagedBuffer(); - Event e( id, SPLITTER_TICK ); return DEVICE_BUSY; } @@ -269,6 +206,7 @@ SplitterChannel * StreamSplitter::createChannel() break; } } + if(placed != -1) { channels++; return outputChannels[placed]; @@ -302,4 +240,14 @@ SplitterChannel * StreamSplitter::getChannel( DataSink * output ) { } return NULL; -} \ No newline at end of file +} + +float SplitterChannel::getSampleRate() { + int v = parent->upstream.getSampleRate() / sampleDropRate; + return v; +} + +float StreamSplitter::getSampleRate() { + int v = upstream.getSampleRate(); + return v; +} From 5f4473d1774e761332b66967b929f717d74468db Mon Sep 17 00:00:00 2001 From: Joe Finney Date: Fri, 22 Nov 2024 10:38:40 +0000 Subject: [PATCH 2/6] Remove legacy code. --- inc/streams/DataStream.h | 10 ---------- source/streams/DataStream.cpp | 11 ----------- 2 files changed, 21 deletions(-) diff --git a/inc/streams/DataStream.h b/inc/streams/DataStream.h index 55ce29ee..30b258bb 100644 --- a/inc/streams/DataStream.h +++ b/inc/streams/DataStream.h @@ -107,16 +107,6 @@ namespace codal */ ~DataStream(); - /** - * Controls if this component should emit flow state events. - * - * @warning Should not be called mutliple times with `id == 0`, as it will spuriously reallocate event IDs - * - * @param id If zero, this will auto-allocate a new event ID - * @return uint16_t The new event ID for this DataStream - */ - uint16_t emitFlowEvents( uint16_t id = 0 ); - /** * Determines if any of the data currently flowing through this stream is held in non-volatile (FLASH) memory. * @return true if one or more of the ManagedBuffers in this stream reside in FLASH memory, false otherwise. diff --git a/source/streams/DataStream.cpp b/source/streams/DataStream.cpp index af6b55b9..58d3e626 100644 --- a/source/streams/DataStream.cpp +++ b/source/streams/DataStream.cpp @@ -83,17 +83,6 @@ DataStream::~DataStream() { } -uint16_t DataStream::emitFlowEvents( uint16_t id ) -{ - if( this->flowEventCode == 0 ) { - if( id == 0 ) - this->flowEventCode = allocateNotifyEvent(); - else - this->flowEventCode = id; - } - return this->flowEventCode; -} - bool DataStream::isReadOnly() { if( this->hasPending ) From d3e69d460f7319cb00f732e06b53faa71c2b7814 Mon Sep 17 00:00:00 2001 From: Joe Finney Date: Tue, 26 Nov 2024 11:17:05 +0000 Subject: [PATCH 3/6] Introduce DataSourceSink base class - Refactor to have a common base class - Standardize implementation - reduce duplicted code n.b. this is a first cut, compiling but untestsed code. --- inc/streams/DataStream.h | 89 +++++++++------------- inc/streams/EffectFilter.h | 14 +--- inc/streams/FIFOStream.h | 10 +-- inc/streams/StreamNormalizer.h | 19 +---- inc/streams/StreamRecording.h | 12 +-- inc/streams/StreamSplitter.h | 15 +--- source/streams/DataStream.cpp | 112 +++++++++++++++++----------- source/streams/EffectFilter.cpp | 41 +--------- source/streams/FIFOStream.cpp | 28 +------ source/streams/StreamNormalizer.cpp | 32 +------- source/streams/StreamRecording.cpp | 39 +--------- source/streams/StreamSplitter.cpp | 39 ++-------- 12 files changed, 121 insertions(+), 329 deletions(-) diff --git a/inc/streams/DataStream.h b/inc/streams/DataStream.h index 30b258bb..13e79abc 100644 --- a/inc/streams/DataStream.h +++ b/inc/streams/DataStream.h @@ -63,6 +63,8 @@ namespace codal */ class DataSource { + bool dataIsWanted; + public: virtual ManagedBuffer pull(); virtual void connect(DataSink &sink); @@ -71,6 +73,39 @@ namespace codal virtual int getFormat(); virtual int setFormat(int format); virtual float getSampleRate(); + virtual int dataWanted(bool wanted); + virtual bool isWanted(); + }; + + /** + * This class acts as a base class for objects that serve both the DataSource and DataSink interfaces. + * Classes anre not required to use this, but are strongly encouraged to use this as a base class, in order + * to reduce complexity, ensure consistent behaviour, and reduce duplicated code. + */ + class DataSourceSink : public DataSource, public DataSink + { + + public: + DataSink *downStream; + DataSource &upStream; + + /** + * Constructor. + * Creates an empty DataSourceSink. + * + * @param upstream the component that will normally feed this datastream with data. + */ + DataSourceSink(DataSource &upstream); + virtual ~DataSourceSink(); + + virtual void connect(DataSink &sink); + virtual bool isConnected(); + virtual void disconnect(); + virtual int getFormat(); + virtual int setFormat(int format); + virtual float getSampleRate(); + virtual int dataWanted(bool wanted); + virtual int pullRequest(); }; /** @@ -78,19 +113,14 @@ namespace codal * A Datastream holds a number of ManagedBuffer references, provides basic flow control through a push/pull mechanism * and byte level access to the datastream, even if it spans different buffers. */ - class DataStream : public DataSource, public DataSink + class DataStream : public DataSourceSink { uint16_t pullRequestEventCode; - uint16_t flowEventCode; ManagedBuffer nextBuffer; bool hasPending; bool isBlocking; - unsigned int missedBuffers; int downstreamReturn; - DataSink *downStream; - DataSource *upStream; - public: /** @@ -113,42 +143,6 @@ namespace codal */ bool isReadOnly(); - /** - * Attempts to determine if another component downstream of this one is _actually_ pulling data, and thus, data - * is flowing. - * - * @return true If there is a count-match between `pullRequest` and `pull` calls. - * @return false If `pullRequest` calls are not currently being matched by `pull` calls. - */ - bool isFlowing(); - - /** - * Define a downstream component for data stream. - * - * @sink The component that data will be delivered to, when it is available - */ - virtual void connect(DataSink &sink) override; - - /** - * Determines if this source is connected to a downstream component - * - * @return true If a downstream is connected - * @return false If a downstream is not connected - */ - virtual bool isConnected(); - - /** - * Define a downstream component for data stream. - * - * @sink The component that data will be delivered to, when it is available - */ - virtual void disconnect() override; - - /** - * Determine the data format of the buffers streamed out of this component. - */ - virtual int getFormat() override; - /** * Determines if this stream acts in a synchronous, blocking mode or asynchronous mode. In blocking mode, writes to a full buffer * will result int he calling fiber being blocked until space is available. Downstream DataSinks will also attempt to process data @@ -175,17 +169,6 @@ namespace codal */ virtual int pullRequest(); - /** - * Query the stream for its current sample rate. - * - * If the current object is unable to determine this itself, it will pass the call upstream until it reaches a component can respond. - * - * @warning The sample rate for a stream may change during its lifetime. If a component is sensitive to this, it should periodically check. - * - * @return float The current sample rate for this stream, or `DATASTREAM_SAMPLE_RATE_UNKNOWN` if none is found. - */ - virtual float getSampleRate() override; - private: /** * Issue a deferred pull request to our downstream component, if one has been registered. diff --git a/inc/streams/EffectFilter.h b/inc/streams/EffectFilter.h index 6d5bee33..29a5ae82 100644 --- a/inc/streams/EffectFilter.h +++ b/inc/streams/EffectFilter.h @@ -7,12 +7,8 @@ namespace codal { - class EffectFilter : public DataSource, public DataSink + class EffectFilter : public DataSourceSink { - protected: - - DataSink *downStream; - DataSource &upStream; bool deepCopy; public: @@ -21,14 +17,6 @@ namespace codal ~EffectFilter(); virtual ManagedBuffer pull(); - virtual int pullRequest(); - virtual void connect( DataSink &sink ); - bool isConnected(); - virtual void disconnect(); - virtual int getFormat(); - virtual int setFormat( int format ); - - virtual float getSampleRate(); /** * Defines if this filter should perform a deep copy of incoming data, or update data in place. diff --git a/inc/streams/FIFOStream.h b/inc/streams/FIFOStream.h index a16503b2..f715a583 100644 --- a/inc/streams/FIFOStream.h +++ b/inc/streams/FIFOStream.h @@ -9,7 +9,7 @@ namespace codal { - class FIFOStream : public DataSource, public DataSink + class FIFOStream : public DataSourceSink { private: @@ -20,9 +20,6 @@ namespace codal { bool allowInput; bool allowOutput; - DataSink *downStream; - DataSource &upStream; - public: FIFOStream( DataSource &source ); @@ -30,11 +27,6 @@ namespace codal { virtual ManagedBuffer pull(); virtual int pullRequest(); - virtual void connect( DataSink &sink ); - bool isConnected(); - virtual void disconnect(); - virtual int getFormat(); - virtual int setFormat( int format ); int length(); void dumpState(); diff --git a/inc/streams/StreamNormalizer.h b/inc/streams/StreamNormalizer.h index 86dcdf2d..f37834a1 100644 --- a/inc/streams/StreamNormalizer.h +++ b/inc/streams/StreamNormalizer.h @@ -41,7 +41,7 @@ typedef void (*SampleWriteFn)(uint8_t *, int); namespace codal{ - class StreamNormalizer : public DataSink, public DataSource + class StreamNormalizer : public DataSourceSink { public: int outputFormat; // The format to output in. By default, this is the sme as the input. @@ -52,9 +52,7 @@ namespace codal{ bool normalize; // If set, will recalculate a zero offset. bool zeroOffsetValid; // Set to true after the first buffer has been processed. bool outputEnabled; // When set any bxuffer processed will be forwarded downstream. - DataSource &upstream; // The upstream component of this StreamNormalizer. DataStream output; // The downstream output stream of this StreamNormalizer. - //ManagedBuffer buffer; // The buffer being processed. static SampleReadFn readSample[9]; static SampleWriteFn writeSample[9]; @@ -94,11 +92,6 @@ namespace codal{ */ bool getNormalize(); - /** - * Determine the data format of the buffers streamed out of this component. - */ - virtual int getFormat(); - /** * Defines the data format of the buffers streamed out of this component. * @param format valid values include: @@ -137,16 +130,6 @@ namespace codal{ */ int setOrMask(uint32_t mask); - float getSampleRate(); - - /** - * Determines if this source is connected to a downstream component - * - * @return true If a downstream is connected - * @return false If a downstream is not connected - */ - bool isConnected(); - /** * Destructor. */ diff --git a/inc/streams/StreamRecording.h b/inc/streams/StreamRecording.h index 3e1b390f..c2803a18 100644 --- a/inc/streams/StreamRecording.h +++ b/inc/streams/StreamRecording.h @@ -27,12 +27,10 @@ namespace codal } }; - class StreamRecording : public DataSource, public DataSink + class StreamRecording : public DataSourceSink { private: - //ManagedBuffer buffer[REC_MAX_BUFFERS]; - //StreamRecording_Buffer_t * bufferChain; StreamRecording_Buffer * lastBuffer; StreamRecording_Buffer * readHead; uint32_t maxBufferLenth; @@ -41,9 +39,6 @@ namespace codal int state; float lastUpstreamRate; - DataSink *downStream; - DataSource &upStream; - void initialise(); public: @@ -65,11 +60,6 @@ namespace codal virtual ManagedBuffer pull(); virtual int pullRequest(); - virtual void connect( DataSink &sink ); - bool isConnected(); - virtual void disconnect(); - virtual int getFormat(); - virtual int setFormat( int format ); void printChain(); diff --git a/inc/streams/StreamSplitter.h b/inc/streams/StreamSplitter.h index d1a5d7e9..d6c5dff3 100644 --- a/inc/streams/StreamSplitter.h +++ b/inc/streams/StreamSplitter.h @@ -55,7 +55,7 @@ namespace codal{ class StreamSplitter; - class SplitterChannel : public DataSource, public DataSink { + class SplitterChannel : public DataSourceSink { private: StreamSplitter * parent; int sampleDropRate = 1; @@ -66,8 +66,6 @@ namespace codal{ public: - DataSink * output; - /** * @brief Construct a new Splitter Channel object. * @@ -80,12 +78,8 @@ namespace codal{ SplitterChannel( StreamSplitter *parent, DataSink *output ); virtual ~SplitterChannel(); - virtual int pullRequest(); uint8_t * pullInto( uint8_t * rawBuffer, int length ); virtual ManagedBuffer pull(); - virtual void connect(DataSink &sink); - bool isConnected(); - virtual void disconnect(); virtual int getFormat(); virtual int setFormat(int format); virtual int requestSampleDropRate(int sampleDropRate); @@ -110,6 +104,7 @@ namespace codal{ * @param source a DataSource to receive data from */ StreamSplitter(DataSource &source, uint16_t id = CodalComponent::generateDynamicID()); + virtual ~StreamSplitter(); /** * Callback provided when data is ready. @@ -120,12 +115,6 @@ namespace codal{ virtual SplitterChannel * createChannel(); virtual bool destroyChannel( SplitterChannel * channel ); virtual SplitterChannel * getChannel( DataSink * output ); - virtual float getSampleRate(); - - /** - * Destructor. - */ - virtual ~StreamSplitter(); friend SplitterChannel; diff --git a/source/streams/DataStream.cpp b/source/streams/DataStream.cpp index 58d3e626..7d8ec0da 100644 --- a/source/streams/DataStream.cpp +++ b/source/streams/DataStream.cpp @@ -61,59 +61,98 @@ float DataSource::getSampleRate() { return DATASTREAM_SAMPLE_RATE_UNKNOWN; } +int DataSource::dataWanted(bool wanted) +{ + dataIsWanted = wanted; + return DEVICE_OK; +} + +bool DataSource::isWanted() +{ + return dataIsWanted; +} + +//DataSink methods. int DataSink::pullRequest() { return DEVICE_NOT_SUPPORTED; } -DataStream::DataStream(DataSource &upstream) +// DataSourceSink methods. +DataSourceSink::DataSourceSink(DataSource &source) : upStream( source ) { - this->pullRequestEventCode = 0; - this->isBlocking = true; - this->hasPending = false; - this->missedBuffers = CODAL_DATASTREAM_HIGH_WATER_MARK; - this->downstreamReturn = DEVICE_OK; - this->flowEventCode = 0; + downStream = NULL; + source.connect( *this ); +} - this->downStream = NULL; - this->upStream = &upstream; +DataSourceSink::~DataSourceSink() +{ } -DataStream::~DataStream() +void DataSourceSink::connect(DataSink &sink) { + downStream = &sink; } -bool DataStream::isReadOnly() +bool DataSourceSink::isConnected() +{ + return downStream != NULL; +} + +void DataSourceSink::disconnect() { - if( this->hasPending ) - return this->nextBuffer.isReadOnly(); - return true; + downStream = NULL; } -bool DataStream::isFlowing() +int DataSourceSink::getFormat() { - return this->missedBuffers < CODAL_DATASTREAM_HIGH_WATER_MARK; + return upStream.getFormat(); } -void DataStream::connect(DataSink &sink) +int DataSourceSink::setFormat(int format) { - this->downStream = &sink; - this->upStream->connect(*this); + return upStream.setFormat( format ); } -bool DataStream::isConnected() +float DataSourceSink::getSampleRate() { - return this->downStream != NULL; + return upStream.getSampleRate(); } -int DataStream::getFormat() +int DataSourceSink::dataWanted(bool wanted) { - return upStream->getFormat(); + DataSource::dataWanted(wanted); + return upStream.dataWanted(wanted); } -void DataStream::disconnect() +int DataSourceSink::pullRequest() { - this->downStream = NULL; + if( this->downStream != NULL ) + return this->downStream->pullRequest(); + return DEVICE_BUSY; +} + +/** + * Definition for a DataStream class. This doresn't *really* belong in here, as its key role is to + * decouple a pipeline the straddles an interrupt context boundary... + */ +DataStream::DataStream(DataSource &upstream) : DataSourceSink(upstream) +{ + this->pullRequestEventCode = 0; + this->isBlocking = true; + this->hasPending = false; + this->downstreamReturn = DEVICE_OK; +} + +DataStream::~DataStream() +{ +} + +bool DataStream::isReadOnly() +{ + if( this->hasPending ) + return this->nextBuffer.isReadOnly(); + return true; } void DataStream::setBlocking(bool isBlocking) @@ -132,14 +171,9 @@ void DataStream::setBlocking(bool isBlocking) ManagedBuffer DataStream::pull() { - // 1, as we will normally be at '1' waiting buffer here if we're in-sync with the source - if( this->missedBuffers > 1 ) - Event evt( DEVICE_ID_NOTIFY, this->flowEventCode ); - - this->missedBuffers = 0; // Are we running in sync (blocking) mode? if( this->isBlocking ) - return this->upStream->pull(); + return this->upStream.pull(); this->hasPending = false; return ManagedBuffer( this->nextBuffer ); // Deep copy! @@ -161,12 +195,6 @@ bool DataStream::canPull(int size) int DataStream::pullRequest() { - // _Technically_ not a missed buffer... yet. But we can only check later. - if( this->missedBuffers < CODAL_DATASTREAM_HIGH_WATER_MARK ) - if( ++this->missedBuffers == CODAL_DATASTREAM_HIGH_WATER_MARK ) - if( this->flowEventCode != 0 ) - Event evt( DEVICE_ID_NOTIFY, this->flowEventCode ); - // Are we running in async (non-blocking) mode? if( !this->isBlocking ) { if( this->hasPending && this->downstreamReturn != DEVICE_OK ) { @@ -174,7 +202,7 @@ int DataStream::pullRequest() return this->downstreamReturn; } - this->nextBuffer = this->upStream->pull(); + this->nextBuffer = this->upStream.pull(); this->hasPending = true; Event evt( DEVICE_ID_NOTIFY, this->pullRequestEventCode ); @@ -185,10 +213,4 @@ int DataStream::pullRequest() return this->downStream->pullRequest(); return DEVICE_BUSY; -} - -float DataStream::getSampleRate() { - if( this->upStream != NULL ) - return this->upStream->getSampleRate(); - return DATASTREAM_SAMPLE_RATE_UNKNOWN; -} +} \ No newline at end of file diff --git a/source/streams/EffectFilter.cpp b/source/streams/EffectFilter.cpp index 2ace4ef2..77285e65 100644 --- a/source/streams/EffectFilter.cpp +++ b/source/streams/EffectFilter.cpp @@ -6,11 +6,9 @@ using namespace codal; -EffectFilter::EffectFilter(DataSource &source, bool deepCopy) : upStream( source ) +EffectFilter::EffectFilter(DataSource &source, bool deepCopy) : DataSourceSink( source ) { - this->downStream = NULL; this->deepCopy = deepCopy; - source.connect( *this ); } EffectFilter::~EffectFilter() @@ -26,43 +24,6 @@ ManagedBuffer EffectFilter::pull() return output; } -int EffectFilter::pullRequest() -{ - if( this->downStream != NULL ) - return this->downStream->pullRequest(); - return DEVICE_BUSY; -} - -void EffectFilter::connect(DataSink &sink) -{ - this->downStream = &sink; -} - -bool EffectFilter::isConnected() -{ - return this->downStream != NULL; -} - -void EffectFilter::disconnect() -{ - this->downStream = NULL; -} - -int EffectFilter::getFormat() -{ - return this->upStream.getFormat(); -} - -int EffectFilter::setFormat( int format ) -{ - return this->upStream.setFormat( format ); -} - -float EffectFilter::getSampleRate() -{ - return this->upStream.getSampleRate(); -} - /** * Defines if this filter should perform a deep copy of incoming data, or update data in place. * diff --git a/source/streams/FIFOStream.cpp b/source/streams/FIFOStream.cpp index 7f10ae6d..9491fc8d 100644 --- a/source/streams/FIFOStream.cpp +++ b/source/streams/FIFOStream.cpp @@ -7,7 +7,7 @@ using namespace codal; -FIFOStream::FIFOStream( DataSource &source ) : upStream( source ) +FIFOStream::FIFOStream( DataSource &source ) : DataSourceSink( source ) { this->bufferCount = 0; this->bufferLength = 0; @@ -22,7 +22,6 @@ FIFOStream::FIFOStream( DataSource &source ) : upStream( source ) FIFOStream::~FIFOStream() { - // } bool FIFOStream::canPull() @@ -92,31 +91,6 @@ int FIFOStream::pullRequest() return DEVICE_OK; } -void FIFOStream::connect( DataSink &sink ) -{ - this->downStream = &sink; -} - -bool FIFOStream::isConnected() -{ - return this->downStream != NULL; -} - -void FIFOStream::disconnect() -{ - this->downStream = NULL; -} - -int FIFOStream::getFormat() -{ - return this->upStream.getFormat(); -} - -int FIFOStream::setFormat( int format ) -{ - return this->upStream.setFormat( format ); -} - void FIFOStream::setInputEnable( bool state ) { this->allowInput = state; diff --git a/source/streams/StreamNormalizer.cpp b/source/streams/StreamNormalizer.cpp index 7453f92d..5b15f622 100644 --- a/source/streams/StreamNormalizer.cpp +++ b/source/streams/StreamNormalizer.cpp @@ -125,7 +125,7 @@ SampleWriteFn StreamNormalizer::writeSample[] = {write_sample_1, write_sample_1, * @param format The format to convert the input stream into * @param stabilisation the maximum change of zero-offset permitted between subsequent buffers before output is initiated. Set to zero to disable (default) */ -StreamNormalizer::StreamNormalizer(DataSource &source, float gain, bool normalize, int format, int stabilisation) : upstream(source), output(*this) +StreamNormalizer::StreamNormalizer(DataSource &source, float gain, bool normalize, int format, int stabilisation) : DataSourceSink(source), output(*this) { setFormat(format); setGain(gain); @@ -135,9 +135,6 @@ StreamNormalizer::StreamNormalizer(DataSource &source, float gain, bool normaliz this->zeroOffset = 0; this->stabilisation = stabilisation; this->outputEnabled = normalize && stabilisation ? false : true; - - // Register with our upstream component - source.connect(*this); } /** @@ -157,9 +154,9 @@ ManagedBuffer StreamNormalizer::pull() ManagedBuffer buffer; // The buffer being processed. // Determine the input format. - inputFormat = upstream.getFormat(); + inputFormat = upStream.getFormat(); - // If no output format has been selected, infer it from our upstream component. + // If no output format has been selected, infer it from our upStream component. if (outputFormat == DATASTREAM_FORMAT_UNKNOWN) outputFormat = inputFormat; @@ -168,7 +165,7 @@ ManagedBuffer StreamNormalizer::pull() bytesPerSampleOut = DATASTREAM_FORMAT_BYTES_PER_SAMPLE(outputFormat); // Acquire the buffer to be processed. - ManagedBuffer inputBuffer = upstream.pull(); + ManagedBuffer inputBuffer = upStream.pull(); samples = inputBuffer.length() / bytesPerSampleIn; // Use in place processing where possible, but allocate a new buffer when needed. @@ -251,16 +248,6 @@ bool StreamNormalizer::getNormalize() return normalize; } -/** - * Determine the data format of the buffers streamed out of this component. - */ -int StreamNormalizer::getFormat() -{ - if (outputFormat == DATASTREAM_FORMAT_UNKNOWN) - outputFormat = upstream.getFormat(); - - return outputFormat; -} /** * Defines the data format of the buffers streamed out of this component. @@ -314,14 +301,3 @@ int StreamNormalizer::setOrMask(uint32_t mask) StreamNormalizer::~StreamNormalizer() { } - -float StreamNormalizer::getSampleRate() { - int v = this->upstream.getSampleRate(); - return v; -} - -bool StreamNormalizer::isConnected() -{ - //return this->output.isConnected(); - return false; -} \ No newline at end of file diff --git a/source/streams/StreamRecording.cpp b/source/streams/StreamRecording.cpp index c5462dc9..45c04809 100644 --- a/source/streams/StreamRecording.cpp +++ b/source/streams/StreamRecording.cpp @@ -14,7 +14,7 @@ using namespace codal; ( sizeof(StreamRecording_Buffer) + sizeof(BufferData) + 2 * sizeof(PROCESSOR_WORD_TYPE)) -StreamRecording::StreamRecording( DataSource &source, uint32_t maxLength ) : upStream( source ) +StreamRecording::StreamRecording( DataSource &source, uint32_t maxLength ) : DataSourceSink( source ) { this->state = REC_STATE_STOPPED; @@ -24,14 +24,10 @@ StreamRecording::StreamRecording( DataSource &source, uint32_t maxLength ) : upS this->maxBufferLenth = maxLength + ( maxLength / 256 + 1) * CODAL_STREAM_RECORDING_BUFFER_OVERHEAD; initialise(); - - this->downStream = NULL; - upStream.connect( *this ); } StreamRecording::~StreamRecording() { - // } void StreamRecording::initialise() @@ -141,31 +137,6 @@ int StreamRecording::pullRequest() return DEVICE_NO_RESOURCES; } -void StreamRecording::connect( DataSink &sink ) -{ - this->downStream = &sink; -} - -bool StreamRecording::isConnected() -{ - return this->downStream != NULL; -} - -void StreamRecording::disconnect() -{ - this->downStream = NULL; -} - -int StreamRecording::getFormat() -{ - return this->upStream.getFormat(); -} - -int StreamRecording::setFormat( int format ) -{ - return this->upStream.setFormat( format ); -} - bool StreamRecording::recordAsync() { // Duplicate check from within erase(), but here for safety in case of later code edits... @@ -251,11 +222,3 @@ bool StreamRecording::isStopped() fiber_sleep(0); return this->state == REC_STATE_STOPPED; } - -float StreamRecording::getSampleRate() -{ - if( this->lastUpstreamRate == DATASTREAM_SAMPLE_RATE_UNKNOWN ) - return this->upStream.getSampleRate(); - - return this->lastUpstreamRate; -} \ No newline at end of file diff --git a/source/streams/StreamSplitter.cpp b/source/streams/StreamSplitter.cpp index 2ca6b056..e85d3f23 100644 --- a/source/streams/StreamSplitter.cpp +++ b/source/streams/StreamSplitter.cpp @@ -31,22 +31,14 @@ DEALINGS IN THE SOFTWARE. using namespace codal; -SplitterChannel::SplitterChannel( StreamSplitter * parent, DataSink * output = NULL ) +SplitterChannel::SplitterChannel( StreamSplitter * parent, DataSink * output = NULL ) : DataSourceSink(*(new DataSource())) { this->parent = parent; - this->output = output; + this->downStream = output; } SplitterChannel::~SplitterChannel() { - // -} - -int SplitterChannel::pullRequest() { - if( output != NULL ) - return output->pullRequest(); - - return DEVICE_BUSY; } ManagedBuffer SplitterChannel::resample( ManagedBuffer _in, uint8_t *buffer, int length ) { @@ -100,21 +92,6 @@ ManagedBuffer SplitterChannel::pull() return this->resample( inData ); // Autocreate the output buffer } -void SplitterChannel::connect(DataSink &sink) -{ - output = &sink; -} - -bool SplitterChannel::isConnected() -{ - return this->output != NULL; -} - -void SplitterChannel::disconnect() -{ - output = NULL; -} - int SplitterChannel::getFormat() { return parent->upstream.getFormat(); @@ -233,7 +210,7 @@ SplitterChannel * StreamSplitter::getChannel( DataSink * output ) { { if( outputChannels[i] != NULL ) { - if( outputChannels[i]->output == output ) { + if( outputChannels[i]->downStream == output ) { return outputChannels[i]; } } @@ -243,11 +220,5 @@ SplitterChannel * StreamSplitter::getChannel( DataSink * output ) { } float SplitterChannel::getSampleRate() { - int v = parent->upstream.getSampleRate() / sampleDropRate; - return v; -} - -float StreamSplitter::getSampleRate() { - int v = upstream.getSampleRate(); - return v; -} + return parent->upstream.getSampleRate() / sampleDropRate; +} \ No newline at end of file From efc7017237502dd63c6ae915450b568398643f62 Mon Sep 17 00:00:00 2001 From: Joe Finney Date: Tue, 26 Nov 2024 13:18:38 +0000 Subject: [PATCH 4/6] More refactoring. Compiles, but not tested. --- inc/streams/DataStream.h | 4 +-- inc/streams/StreamSplitter.h | 5 ++-- source/streams/DataStream.cpp | 5 ++-- source/streams/StreamSplitter.cpp | 41 ++++++++++++++++++++----------- 4 files changed, 33 insertions(+), 22 deletions(-) diff --git a/inc/streams/DataStream.h b/inc/streams/DataStream.h index 13e79abc..707e6664 100644 --- a/inc/streams/DataStream.h +++ b/inc/streams/DataStream.h @@ -73,7 +73,7 @@ namespace codal virtual int getFormat(); virtual int setFormat(int format); virtual float getSampleRate(); - virtual int dataWanted(bool wanted); + virtual void dataWanted(bool wanted); virtual bool isWanted(); }; @@ -104,7 +104,7 @@ namespace codal virtual int getFormat(); virtual int setFormat(int format); virtual float getSampleRate(); - virtual int dataWanted(bool wanted); + virtual void dataWanted(bool wanted); virtual int pullRequest(); }; diff --git a/inc/streams/StreamSplitter.h b/inc/streams/StreamSplitter.h index d6c5dff3..64fe7a71 100644 --- a/inc/streams/StreamSplitter.h +++ b/inc/streams/StreamSplitter.h @@ -84,6 +84,7 @@ namespace codal{ virtual int setFormat(int format); virtual int requestSampleDropRate(int sampleDropRate); virtual float getSampleRate(); + virtual void dataWanted(bool wanted); }; class StreamSplitter : public DataSink, public CodalComponent @@ -92,9 +93,7 @@ namespace codal{ ManagedBuffer lastBuffer; // Buffer being processed public: - bool isActive; // Track if we need to emit activate/deactivate messages int channels; // Current number of channels Splitter is serving - volatile int activeChannels; // Current number of /active/ channels this Splitter is serving DataSource &upstream; // The upstream component of this Splitter SplitterChannel *outputChannels[CONFIG_MAX_CHANNELS]; // Array of SplitterChannels the Splitter is serving @@ -110,6 +109,7 @@ namespace codal{ * Callback provided when data is ready. */ virtual int pullRequest(); + virtual void dataWanted(bool wanted); virtual ManagedBuffer getBuffer(); virtual SplitterChannel * createChannel(); @@ -117,7 +117,6 @@ namespace codal{ virtual SplitterChannel * getChannel( DataSink * output ); friend SplitterChannel; - }; } diff --git a/source/streams/DataStream.cpp b/source/streams/DataStream.cpp index 7d8ec0da..bd37f302 100644 --- a/source/streams/DataStream.cpp +++ b/source/streams/DataStream.cpp @@ -61,10 +61,9 @@ float DataSource::getSampleRate() { return DATASTREAM_SAMPLE_RATE_UNKNOWN; } -int DataSource::dataWanted(bool wanted) +void DataSource::dataWanted(bool wanted) { dataIsWanted = wanted; - return DEVICE_OK; } bool DataSource::isWanted() @@ -119,7 +118,7 @@ float DataSourceSink::getSampleRate() return upStream.getSampleRate(); } -int DataSourceSink::dataWanted(bool wanted) +void DataSourceSink::dataWanted(bool wanted) { DataSource::dataWanted(wanted); return upStream.dataWanted(wanted); diff --git a/source/streams/StreamSplitter.cpp b/source/streams/StreamSplitter.cpp index e85d3f23..10d6fdff 100644 --- a/source/streams/StreamSplitter.cpp +++ b/source/streams/StreamSplitter.cpp @@ -112,6 +112,13 @@ int SplitterChannel::requestSampleDropRate( int sampleDropRate ) return this->sampleDropRate; } +void SplitterChannel::dataWanted(bool wanted) +{ + DataSource::dataWanted(wanted); + return parent->dataWanted(wanted); +} + + /** * Creates a component that distributes a single upstream datasource to many downstream datasinks * @@ -121,8 +128,6 @@ StreamSplitter::StreamSplitter(DataSource &source, uint16_t id) : upstream(sourc { this->id = id; this->channels = 0; - this->activeChannels = 0; - this->isActive = false; // init array to NULL. for (int i = 0; i < CONFIG_MAX_CHANNELS; i++) @@ -136,9 +141,26 @@ StreamSplitter::~StreamSplitter() // Nop. } +void StreamSplitter::dataWanted(bool wanted) +{ + // Determine if any of our active splitter channels require data. + bool streamWanted = 0; + + for(int i=0; iisWanted()) + { + streamWanted = 1; + break; + } + } + + return upstream.dataWanted(streamWanted); +} + ManagedBuffer StreamSplitter::getBuffer() { - if( lastBuffer == ManagedBuffer() ) + if(lastBuffer == ManagedBuffer()) lastBuffer = upstream.pull(); return lastBuffer; @@ -149,22 +171,13 @@ ManagedBuffer StreamSplitter::getBuffer() */ int StreamSplitter::pullRequest() { - activeChannels = 0; - // For each downstream channel that exists in array outputChannels - make a pullRequest for (int i = 0; i < CONFIG_MAX_CHANNELS; i++) { - if (outputChannels[i] != NULL) { - if( outputChannels[i]->pullRequest() == DEVICE_OK ) { - activeChannels++; - } - } + if (outputChannels[i] != NULL) + outputChannels[i]->pullRequest(); } - if( activeChannels == 0 && isActive ) { - isActive = false; - } - lastBuffer = ManagedBuffer(); return DEVICE_BUSY; From 1cfa54de1dc09e806221a2a855bdf8e24e71a73c Mon Sep 17 00:00:00 2001 From: Joe Finney Date: Thu, 28 Nov 2024 17:03:15 +0000 Subject: [PATCH 5/6] Bugfix minor errors --- inc/streams/DataStream.h | 2 ++ inc/streams/StreamNormalizer.h | 2 +- source/streams/DataStream.cpp | 7 +++++++ source/streams/StreamNormalizer.cpp | 4 ++++ 4 files changed, 14 insertions(+), 1 deletion(-) diff --git a/inc/streams/DataStream.h b/inc/streams/DataStream.h index 707e6664..546e4fab 100644 --- a/inc/streams/DataStream.h +++ b/inc/streams/DataStream.h @@ -169,6 +169,8 @@ namespace codal */ virtual int pullRequest(); + virtual void connect(DataSink &sink); + private: /** * Issue a deferred pull request to our downstream component, if one has been registered. diff --git a/inc/streams/StreamNormalizer.h b/inc/streams/StreamNormalizer.h index f37834a1..b6c51c36 100644 --- a/inc/streams/StreamNormalizer.h +++ b/inc/streams/StreamNormalizer.h @@ -106,7 +106,7 @@ namespace codal{ * DATASTREAM_FORMAT_32BIT_SIGNED */ virtual int setFormat(int format); - + virtual int getFormat(); /** * Defines an optional gain to apply to the input, as a floating point multiple. * diff --git a/source/streams/DataStream.cpp b/source/streams/DataStream.cpp index bd37f302..4daa3752 100644 --- a/source/streams/DataStream.cpp +++ b/source/streams/DataStream.cpp @@ -28,6 +28,7 @@ DEALINGS IN THE SOFTWARE. #include "Event.h" #include "CodalFiber.h" #include "ErrorNo.h" +#include "CodalDmesg.h" using namespace codal; @@ -212,4 +213,10 @@ int DataStream::pullRequest() return this->downStream->pullRequest(); return DEVICE_BUSY; +} + +void DataStream::connect(DataSink &sink) +{ + DMESG("CONNECT REQUEST: this: %p, sink: %p", this, &sink); + this->downStream = &sink; } \ No newline at end of file diff --git a/source/streams/StreamNormalizer.cpp b/source/streams/StreamNormalizer.cpp index 5b15f622..0cffc276 100644 --- a/source/streams/StreamNormalizer.cpp +++ b/source/streams/StreamNormalizer.cpp @@ -248,6 +248,10 @@ bool StreamNormalizer::getNormalize() return normalize; } +int StreamNormalizer::getFormat() +{ + return outputFormat; +} /** * Defines the data format of the buffers streamed out of this component. From eca270157e0da6260ac9550f1bcd1ecc918a442d Mon Sep 17 00:00:00 2001 From: Joe Finney Date: Mon, 2 Dec 2024 17:56:24 +0000 Subject: [PATCH 6/6] Add tri-state for connectedness of audio filters - add - bugfix --- inc/streams/DataStream.h | 13 ++-- inc/streams/LevelDetectorSPL.h | 37 +++++----- inc/streams/StreamSplitter.h | 4 +- source/streams/DataStream.cpp | 9 +-- source/streams/LevelDetectorSPL.cpp | 108 ++++++++++++++++++---------- source/streams/StreamSplitter.cpp | 24 +++---- 6 files changed, 121 insertions(+), 74 deletions(-) diff --git a/inc/streams/DataStream.h b/inc/streams/DataStream.h index 546e4fab..012b6fea 100644 --- a/inc/streams/DataStream.h +++ b/inc/streams/DataStream.h @@ -47,6 +47,11 @@ DEALINGS IN THE SOFTWARE. #define DATASTREAM_SAMPLE_RATE_UNKNOWN 0.0f +#define DATASTREAM_DONT_CARE 0 +#define DATASTREAM_NOT_WANTED 1 +#define DATASTREAM_WANTED 2 + + namespace codal { /** @@ -63,7 +68,7 @@ namespace codal */ class DataSource { - bool dataIsWanted; + int dataIsWanted; public: virtual ManagedBuffer pull(); @@ -73,8 +78,8 @@ namespace codal virtual int getFormat(); virtual int setFormat(int format); virtual float getSampleRate(); - virtual void dataWanted(bool wanted); - virtual bool isWanted(); + virtual void dataWanted(int wanted); + virtual int isWanted(); }; /** @@ -104,7 +109,7 @@ namespace codal virtual int getFormat(); virtual int setFormat(int format); virtual float getSampleRate(); - virtual void dataWanted(bool wanted); + virtual void dataWanted(int wanted); virtual int pullRequest(); }; diff --git a/inc/streams/LevelDetectorSPL.h b/inc/streams/LevelDetectorSPL.h index f016cfcc..7bc9445c 100644 --- a/inc/streams/LevelDetectorSPL.h +++ b/inc/streams/LevelDetectorSPL.h @@ -36,6 +36,7 @@ DEALINGS IN THE SOFTWARE. #define LEVEL_DETECTOR_SPL_HIGH_THRESHOLD_PASSED 0x02 #define LEVEL_DETECTOR_SPL_LOW_THRESHOLD_PASSED 0x04 #define LEVEL_DETECTOR_SPL_CLAP 0x08 +#define LEVEL_DETECTOR_SPL_DATA_REQUESTED 0x10 /** @@ -82,6 +83,7 @@ DEALINGS IN THE SOFTWARE. #define LEVEL_DETECTOR_SPL_CLAP_MIN_LOUD_BLOCKS 2 // ensure noise not too short to be a clap #define LEVEL_DETECTOR_SPL_CLAP_MIN_QUIET_BLOCKS 20 // prevent very fast taps being registered as clap +#define LEVEL_DETECTOR_SPL_TIMEOUT 50 // Time in ms at which we request no further data. namespace codal{ class LevelDetectorSPL : public CodalComponent, public DataSink @@ -97,18 +99,18 @@ namespace codal{ int sigma; // Running total of the samples in the current window. float gain; float minValue; - bool activated; // Has this component been connected yet - bool enabled; // Is the component currently running - int unit; // The units to be returned from this level detector (e.g. dB or linear 8bit) - int quietBlockCount; // number of quiet blocks consecutively - used for clap detection - int noisyBlockCount; // number of noisy blocks consecutively - used for clap detection - bool inNoisyBlock; // if had noisy and waiting to lower beyond lower threshold - float maxRms; // maximum rms within a noisy block + bool enabled; // Is the component currently running. + int unit; // The units to be returned from this level detector (e.g. dB or linear 8bit). + int quietBlockCount; // number of quiet blocks consecutively - used for clap detection. + int noisyBlockCount; // number of noisy blocks consecutively - used for clap detection. + bool inNoisyBlock; // if had noisy and waiting to lower beyond lower threshold. + float maxRms; // maximum rms within a noisy block. private: - uint64_t timeout; // The timestamp at which this component will cease actively sampling the data stream - uint8_t bufferCount; // Used to track that enough buffers have been seen since activation to output a valid value/event - FiberLock resourceLock; + uint8_t bufferCount; // Used to track that enough buffers have been seen since activation to output a valid value/event. + uint8_t listenerCount; // The total number of active listeners to this component. + FiberLock resourceLock; // Fiberlock - used purely hold fibers requesting data before it is available. + uint64_t timestamp; // Timestamp of the last time someone requesed data from this component. public: /** @@ -122,9 +124,13 @@ namespace codal{ */ LevelDetectorSPL(DataSource &source, float highThreshold, float lowThreshold, float gain, float minValue = 52, - uint16_t id = DEVICE_ID_SYSTEM_LEVEL_DETECTOR_SPL, - bool activateImmediately = true); + uint16_t id = DEVICE_ID_SYSTEM_LEVEL_DETECTOR_SPL); + /** + * Periodic callback, every 6ms or so. + */ + void periodicCallback(); + /** * Callback provided when data is ready. */ @@ -139,11 +145,10 @@ namespace codal{ float getValue( int scale = -1 ); /** - * Keep this component active and processing buffers so that events can be produced - * - * @param state If set to true, this component will connect (if required) and start consuming buffers + * Callback when a listener to this component is added. + * n.b. we currently don't support removing listners (future work if necessary) */ - void activateForEvents( bool state ); + void listenerAdded(); /** * Disable component diff --git a/inc/streams/StreamSplitter.h b/inc/streams/StreamSplitter.h index 64fe7a71..c3301963 100644 --- a/inc/streams/StreamSplitter.h +++ b/inc/streams/StreamSplitter.h @@ -84,7 +84,7 @@ namespace codal{ virtual int setFormat(int format); virtual int requestSampleDropRate(int sampleDropRate); virtual float getSampleRate(); - virtual void dataWanted(bool wanted); + virtual void dataWanted(int wanted); }; class StreamSplitter : public DataSink, public CodalComponent @@ -109,7 +109,7 @@ namespace codal{ * Callback provided when data is ready. */ virtual int pullRequest(); - virtual void dataWanted(bool wanted); + virtual void dataWanted(int wanted); virtual ManagedBuffer getBuffer(); virtual SplitterChannel * createChannel(); diff --git a/source/streams/DataStream.cpp b/source/streams/DataStream.cpp index 4daa3752..3b153b25 100644 --- a/source/streams/DataStream.cpp +++ b/source/streams/DataStream.cpp @@ -62,12 +62,12 @@ float DataSource::getSampleRate() { return DATASTREAM_SAMPLE_RATE_UNKNOWN; } -void DataSource::dataWanted(bool wanted) +void DataSource::dataWanted(int wanted) { dataIsWanted = wanted; } -bool DataSource::isWanted() +int DataSource::isWanted() { return dataIsWanted; } @@ -83,6 +83,7 @@ DataSourceSink::DataSourceSink(DataSource &source) : upStream( source ) { downStream = NULL; source.connect( *this ); + dataWanted(DATASTREAM_DONT_CARE); } DataSourceSink::~DataSourceSink() @@ -119,7 +120,7 @@ float DataSourceSink::getSampleRate() return upStream.getSampleRate(); } -void DataSourceSink::dataWanted(bool wanted) +void DataSourceSink::dataWanted(int wanted) { DataSource::dataWanted(wanted); return upStream.dataWanted(wanted); @@ -133,7 +134,7 @@ int DataSourceSink::pullRequest() } /** - * Definition for a DataStream class. This doresn't *really* belong in here, as its key role is to + * Definition for a DataStream class. This doesn't *really* belong in here, as its key role is to * decouple a pipeline the straddles an interrupt context boundary... */ DataStream::DataStream(DataSource &upstream) : DataSourceSink(upstream) diff --git a/source/streams/LevelDetectorSPL.cpp b/source/streams/LevelDetectorSPL.cpp index 39032716..91b994fb 100644 --- a/source/streams/LevelDetectorSPL.cpp +++ b/source/streams/LevelDetectorSPL.cpp @@ -34,7 +34,7 @@ DEALINGS IN THE SOFTWARE. using namespace codal; -LevelDetectorSPL::LevelDetectorSPL(DataSource &source, float highThreshold, float lowThreshold, float gain, float minValue, uint16_t id, bool activateImmediately) : upstream(source), resourceLock(0) +LevelDetectorSPL::LevelDetectorSPL(DataSource &source, float highThreshold, float lowThreshold, float gain, float minValue, uint16_t id) : upstream(source), resourceLock(0) { this->id = id; this->level = 0; @@ -43,16 +43,9 @@ LevelDetectorSPL::LevelDetectorSPL(DataSource &source, float highThreshold, floa this->highThreshold = highThreshold; this->minValue = minValue; this->gain = gain; - this->status |= LEVEL_DETECTOR_SPL_INITIALISED; + this->status |= LEVEL_DETECTOR_SPL_INITIALISED | LEVEL_DETECTOR_SPL_DATA_REQUESTED; this->unit = LEVEL_DETECTOR_SPL_DB; - enabled = true; - if(activateImmediately){ - upstream.connect(*this); - this->activated = true; - } - else{ - this->activated = false; - } + this->enabled = true; this->quietBlockCount = 0; this->noisyBlockCount = 0; @@ -60,15 +53,55 @@ LevelDetectorSPL::LevelDetectorSPL(DataSource &source, float highThreshold, floa this->maxRms = 0; this->bufferCount = 0; - this->timeout = 0; + this->listenerCount = 0; + + // Request a periodic callback + status |= DEVICE_COMPONENT_STATUS_SYSTEM_TICK; + + source.connect(*this); +} + + +/** + * Periodic callback from Device system timer. + * Change the upstream active status accordingly. + */ +void LevelDetectorSPL::periodicCallback() +{ + // Ensure we don't timeout whilst waiting for data to stabilise. + if (this->bufferCount < LEVEL_DETECTOR_SPL_MIN_BUFFERS) + { + //DMESG("ALLOWING BUFFERS TO FILL..."); + return; + } + + // Calculate the time since the last request for data. + // If this is above the given threshold and our channel is active, request that the upstream generation of data be stopped. + if (status & LEVEL_DETECTOR_SPL_DATA_REQUESTED && !listenerCount && (system_timer->getTime() - this->timestamp >= LEVEL_DETECTOR_SPL_TIMEOUT)) + { + //DMESG("LevelDetectorSPL: CALLBACK: DATA NO LONGER REQUIRED..."); + this->status &= ~LEVEL_DETECTOR_SPL_DATA_REQUESTED; + upstream.dataWanted(DATASTREAM_NOT_WANTED); + } } int LevelDetectorSPL::pullRequest() { - // If we're not manually activated, not held active by a timeout, and we have no-one waiting on our data, bail. - if( !activated && !(system_timer_current_time() - this->timeout < CODAL_STREAM_IDLE_TIMEOUT_MS) && resourceLock.getWaitCount() == 0 ) { - this->bufferCount = 0; - return DEVICE_BUSY; + //DMESG("LevelDetectorSPL: PR"); + + // If we haven't requested data and there are no active listeners, there's nothing to do. + if( this->bufferCount < LEVEL_DETECTOR_SPL_MIN_BUFFERS ) { + this->bufferCount++; // Here to prevent this endlessly increasing + return DEVICE_OK; + } + + if( this->resourceLock.getWaitCount() > 0 ) + this->resourceLock.notifyAll(); + + if (!(status & LEVEL_DETECTOR_SPL_DATA_REQUESTED || listenerCount)) + { + //DMESG("LevelDetectorSPL: PR: ignoring data"); + return DEVICE_OK; } ManagedBuffer b = upstream.pull(); @@ -153,13 +186,6 @@ int LevelDetectorSPL::pullRequest() * EMIT EVENTS ******************************/ - if( this->bufferCount < LEVEL_DETECTOR_SPL_MIN_BUFFERS ) { - this->bufferCount++; // Here to prevent this endlessly increasing - return DEVICE_OK; - } - if( this->resourceLock.getWaitCount() > 0 ) - this->resourceLock.notifyAll(); - // HIGH THRESHOLD if ((!(status & LEVEL_DETECTOR_SPL_HIGH_THRESHOLD_PASSED)) && level > highThreshold) { @@ -217,31 +243,35 @@ int LevelDetectorSPL::pullRequest() float LevelDetectorSPL::getValue( int scale ) { - if( !this->upstream.isConnected() ) - this->upstream.connect( *this ); + // Update out timestamp. + this->timestamp = system_timer->getTime(); + + if (!(status & LEVEL_DETECTOR_SPL_DATA_REQUESTED)) + { + // We've just been asked for data after a (potentially) long wait. + // Let our upstream components know that we're interested in data again. + //DMESG("LevelDetectorSPL: getValue: dataWanted(1)"); + status |= LEVEL_DETECTOR_SPL_DATA_REQUESTED; + upstream.dataWanted(DATASTREAM_WANTED); + } // Lock the resource, THEN bump the timout, so we get consistent on-time - if( this->bufferCount < LEVEL_DETECTOR_SPL_MIN_BUFFERS ) + if(this->bufferCount < LEVEL_DETECTOR_SPL_MIN_BUFFERS) + { + //DMESG("WAITING ON LevelDetectorSPL::resourceLock..."); + //codal_dmesg_flush(); resourceLock.wait(); - - this->timeout = system_timer_current_time(); + //DMESG("Escaped!"); + //codal_dmesg_flush(); + } return splToUnit( this->level, scale ); } -void LevelDetectorSPL::activateForEvents( bool state ) -{ - this->activated = state; - if( this->activated && !this->upstream.isConnected() ) { - this->upstream.connect( *this ); - } -} - void LevelDetectorSPL::disable(){ enabled = false; } - int LevelDetectorSPL::setLowThreshold(float value) { // Convert specified unit into db if necessary @@ -353,6 +383,12 @@ float LevelDetectorSPL::unitToSpl(float level, int queryUnit) return level; } +void LevelDetectorSPL::listenerAdded() +{ + this->listenerCount++; + this->getValue(); +} + LevelDetectorSPL::~LevelDetectorSPL() { } diff --git a/source/streams/StreamSplitter.cpp b/source/streams/StreamSplitter.cpp index 10d6fdff..cb822c35 100644 --- a/source/streams/StreamSplitter.cpp +++ b/source/streams/StreamSplitter.cpp @@ -112,13 +112,17 @@ int SplitterChannel::requestSampleDropRate( int sampleDropRate ) return this->sampleDropRate; } -void SplitterChannel::dataWanted(bool wanted) +void SplitterChannel::dataWanted(int wanted) { - DataSource::dataWanted(wanted); - return parent->dataWanted(wanted); + // Only pass along the requets if our status has changed. + if (wanted != DataSource::isWanted()) + { + //DMESG("SplitterChannel[%p]: dataWanted: %d", this, wanted); + DataSource::dataWanted(wanted); + parent->dataWanted(wanted); + } } - /** * Creates a component that distributes a single upstream datasource to many downstream datasinks * @@ -138,21 +142,17 @@ StreamSplitter::StreamSplitter(DataSource &source, uint16_t id) : upstream(sourc StreamSplitter::~StreamSplitter() { - // Nop. } -void StreamSplitter::dataWanted(bool wanted) +void StreamSplitter::dataWanted(int wanted) { // Determine if any of our active splitter channels require data. - bool streamWanted = 0; + int streamWanted = DATASTREAM_DONT_CARE; for(int i=0; iisWanted()) - { - streamWanted = 1; - break; - } + if(outputChannels[i] && outputChannels[i]->isWanted() > streamWanted) + streamWanted = outputChannels[i]->isWanted(); } return upstream.dataWanted(streamWanted);