Skip to content

Commit

Permalink
Refactor Control poll using ControlledPoll
Browse files Browse the repository at this point in the history
This iteration just returns action continue on the top level everywhere, so this shouldn't really have any impact so far.
  • Loading branch information
grddev committed Mar 6, 2024
1 parent 4301769 commit 54bafc0
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 6 deletions.
4 changes: 3 additions & 1 deletion archive/archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/lirm/aeron-go/aeron"
"github.com/lirm/aeron-go/aeron/atomic"
"github.com/lirm/aeron-go/aeron/logbuffer"
"github.com/lirm/aeron-go/aeron/logbuffer/term"
"github.com/lirm/aeron-go/aeron/logging"
"github.com/lirm/aeron-go/archive/codecs"
)
Expand Down Expand Up @@ -318,8 +319,9 @@ func NewArchive(options *Options, context *aeron.Context) (*Archive, error) {

for archive.Control.State.state != ControlStateConnected && archive.Control.State.err == nil {
fragments := archive.Control.poll(
func(buf *atomic.Buffer, offset int32, length int32, header *logbuffer.Header) {
func(buf *atomic.Buffer, offset int32, length int32, header *logbuffer.Header) term.ControlledPollAction {
ConnectionControlFragmentHandler(&pollContext, buf, offset, length, header)
return term.ControlledPollActionContinue
}, 1)
if fragments > 0 {
logger.Debugf("Read %d fragment(s)", fragments)
Expand Down
13 changes: 8 additions & 5 deletions archive/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,8 +354,9 @@ func (control *Control) PollForErrorResponse() (int, error) {
// Poll for async events, errors etc until the queue is drained
for {
ret := control.poll(
func(buf *atomic.Buffer, offset int32, length int32, header *logbuffer.Header) {
func(buf *atomic.Buffer, offset int32, length int32, header *logbuffer.Header) term.ControlledPollAction {
errorResponseFragmentHandler(&context, buf, offset, length, header)
return term.ControlledPollActionContinue
}, 1)
received += ret

Expand Down Expand Up @@ -506,15 +507,15 @@ func errorResponseFragmentHandler(context interface{}, buffer *atomic.Buffer, of

// poll provides the control response poller using local state to pass
// back data from the underlying subscription
func (control *Control) poll(handler term.FragmentHandler, fragmentLimit int) int {
func (control *Control) poll(handler term.ControlledFragmentHandler, fragmentLimit int) int {

// Update our globals in case they've changed so we use the current state in our callback
rangeChecking = control.archive.Options.RangeChecking

control.Results.ControlResponse = nil // Clear old results
control.Results.IsPollComplete = false // Clear completion flag

return control.Subscription.Poll(handler, fragmentLimit)
return control.Subscription.ControlledPoll(handler, fragmentLimit)
}

// Poll for control response events. Returns number of fragments read during the operation.
Expand Down Expand Up @@ -607,8 +608,9 @@ func (control *Control) PollForResponse(correlationID int64, sessionID int64) (i
start := time.Now()
context := PollContext{control, correlationID}

handler := aeron.NewFragmentAssembler(func(buf *atomic.Buffer, offset int32, length int32, header *logbuffer.Header) {
handler := aeron.NewControlledFragmentAssembler(func(buf *atomic.Buffer, offset int32, length int32, header *logbuffer.Header) term.ControlledPollAction {
controlFragmentHandler(&context, buf, offset, length, header)
return term.ControlledPollActionContinue
}, aeron.DefaultFragmentAssemblyBufferLength)
for {
ret := control.poll(handler.OnFragment, 1)
Expand Down Expand Up @@ -799,8 +801,9 @@ func (control *Control) PollForDescriptors(correlationID int64, sessionID int64,
for !control.Results.IsPollComplete {
logger.Debugf("PollForDescriptors(%d:%d, %d)", correlationID, sessionID, int(fragmentsWanted)-descriptorCount)
fragments := control.poll(
func(buf *atomic.Buffer, offset int32, length int32, header *logbuffer.Header) {
func(buf *atomic.Buffer, offset int32, length int32, header *logbuffer.Header) term.ControlledPollAction {
DescriptorFragmentHandler(&pollContext, buf, offset, length, header)
return term.ControlledPollActionContinue
}, int(fragmentsWanted)-descriptorCount)
logger.Debugf("Poll(%d:%d) returned %d fragments", correlationID, sessionID, fragments)
descriptorCount = len(control.Results.RecordingDescriptors) + len(control.Results.RecordingSubscriptionDescriptors)
Expand Down

0 comments on commit 54bafc0

Please sign in to comment.