Skip to content

Commit

Permalink
Delegate HasBacklog to ConsumerRepository
Browse files Browse the repository at this point in the history
  • Loading branch information
ocoanet committed Aug 31, 2019
1 parent 38f50b0 commit 99d2934
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 12 deletions.
19 changes: 19 additions & 0 deletions src/Disruptor/Dsl/ConsumerRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,25 @@ public void Add<T>(WorkerPool<T> workerPool, ISequenceBarrier sequenceBarrier)
}
}

public bool HasBacklog(long cursor, bool includeStopped)
{
foreach (var consumerInfo in _consumerInfos)
{
if ((includeStopped || consumerInfo.IsRunning) && consumerInfo.IsEndOfChain)
{
foreach (var sequence in consumerInfo.Sequences)
{
if (cursor > sequence.Value)
{
return true;
}
}
}
}

return false;
}

public ISequence[] GetLastSequenceInChain(bool includeStopped)
{
var lastSequence = new List<ISequence>();
Expand Down
7 changes: 1 addition & 6 deletions src/Disruptor/Dsl/Disruptor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -396,12 +396,7 @@ public void Shutdown(TimeSpan timeout)
private bool HasBacklog()
{
var cursor = _ringBuffer.Cursor;
foreach (var sequence in _consumerRepository.GetLastSequenceInChain(false))
{
if (cursor > sequence.Value)
return true;
}
return false;
return _consumerRepository.HasBacklog(cursor, false);
}

internal EventHandlerGroup<T> CreateEventProcessors(ISequence[] barrierSequences, IEventHandler<T>[] eventHandlers)
Expand Down
7 changes: 1 addition & 6 deletions src/Disruptor/Dsl/ValueDisruptor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -314,12 +314,7 @@ public void Shutdown(TimeSpan timeout)
private bool HasBacklog()
{
var cursor = _ringBuffer.Cursor;
foreach (var sequence in _consumerRepository.GetLastSequenceInChain(false))
{
if (cursor > sequence.Value)
return true;
}
return false;
return _consumerRepository.HasBacklog(cursor, false);
}

internal ValueEventHandlerGroup<T> CreateEventProcessors(ISequence[] barrierSequences, IValueEventHandler<T>[] eventHandlers)
Expand Down

0 comments on commit 99d2934

Please sign in to comment.