diff --git a/src/Disruptor/Dsl/ConsumerRepository.cs b/src/Disruptor/Dsl/ConsumerRepository.cs index d8cd5bd7..a94b808d 100644 --- a/src/Disruptor/Dsl/ConsumerRepository.cs +++ b/src/Disruptor/Dsl/ConsumerRepository.cs @@ -43,6 +43,25 @@ public void Add(WorkerPool workerPool, ISequenceBarrier sequenceBarrier) } } + public bool HasBacklog(long cursor, bool includeStopped) + { + foreach (var consumerInfo in _consumerInfos) + { + if ((includeStopped || consumerInfo.IsRunning) && consumerInfo.IsEndOfChain) + { + foreach (var sequence in consumerInfo.Sequences) + { + if (cursor > sequence.Value) + { + return true; + } + } + } + } + + return false; + } + public ISequence[] GetLastSequenceInChain(bool includeStopped) { var lastSequence = new List(); diff --git a/src/Disruptor/Dsl/Disruptor.cs b/src/Disruptor/Dsl/Disruptor.cs index 375f845c..3437c836 100644 --- a/src/Disruptor/Dsl/Disruptor.cs +++ b/src/Disruptor/Dsl/Disruptor.cs @@ -396,12 +396,7 @@ public void Shutdown(TimeSpan timeout) private bool HasBacklog() { var cursor = _ringBuffer.Cursor; - foreach (var sequence in _consumerRepository.GetLastSequenceInChain(false)) - { - if (cursor > sequence.Value) - return true; - } - return false; + return _consumerRepository.HasBacklog(cursor, false); } internal EventHandlerGroup CreateEventProcessors(ISequence[] barrierSequences, IEventHandler[] eventHandlers) diff --git a/src/Disruptor/Dsl/ValueDisruptor.cs b/src/Disruptor/Dsl/ValueDisruptor.cs index a3e7e768..f2cd47e4 100644 --- a/src/Disruptor/Dsl/ValueDisruptor.cs +++ b/src/Disruptor/Dsl/ValueDisruptor.cs @@ -314,12 +314,7 @@ public void Shutdown(TimeSpan timeout) private bool HasBacklog() { var cursor = _ringBuffer.Cursor; - foreach (var sequence in _consumerRepository.GetLastSequenceInChain(false)) - { - if (cursor > sequence.Value) - return true; - } - return false; + return _consumerRepository.HasBacklog(cursor, false); } internal ValueEventHandlerGroup CreateEventProcessors(ISequence[] barrierSequences, IValueEventHandler[] eventHandlers)