Skip to content

Commit

Permalink
Update PollForResponse and PollForErrorResponse to use ControlledPoll
Browse files Browse the repository at this point in the history
This updates the code to poll for up to 10 fragments at once, but we use the controlled poll as a mechanism to ensure we are not committing to more messages than what is needed.
  • Loading branch information
grddev committed Mar 6, 2024
1 parent 54bafc0 commit 6fafc74
Showing 1 changed file with 24 additions and 8 deletions.
32 changes: 24 additions & 8 deletions archive/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,19 @@ func init() {
codecIds.recordingStopped = recordingStopped.SbeTemplateId()
}

func controlFragmentHandler(context interface{}, buffer *atomic.Buffer, offset int32, length int32, header *logbuffer.Header) {
func controlFragmentHandler(context interface{}, buffer *atomic.Buffer, offset int32, length int32, header *logbuffer.Header) (action term.ControlledPollAction) {
action = term.ControlledPollActionContinue

pollContext, ok := context.(*PollContext)
if !ok {
logger.Errorf("context conversion failed")
return
}

if pollContext.control.Results.IsPollComplete {
return term.ControlledPollActionAbort
}

logger.Debugf("controlFragmentHandler: correlationID:%d offset:%d length:%d header:%#v", pollContext.correlationID, offset, length, header)
var hdr codecs.SbeGoMessageHeader

Expand Down Expand Up @@ -170,6 +176,8 @@ func controlFragmentHandler(context interface{}, buffer *atomic.Buffer, offset i
logger.Debugf("controlFragmentHandler/controlResponse: received for sessionID:%d, correlationID:%d", controlResponse.ControlSessionId, controlResponse.CorrelationId)
control.Results.ControlResponse = controlResponse
control.Results.IsPollComplete = true

return term.ControlledPollActionBreak
} else {
logger.Debugf("controlFragmentHandler/controlResponse ignoring sessionID:%d, correlationID:%d", controlResponse.ControlSessionId, controlResponse.CorrelationId)
}
Expand Down Expand Up @@ -198,6 +206,7 @@ func controlFragmentHandler(context interface{}, buffer *atomic.Buffer, offset i
// This can happen when testing/adding new functionality
fmt.Printf("controlFragmentHandler: Unexpected message type %d\n", hdr.TemplateId)
}
return
}

// ConnectionControlFragmentHandler is the connection handling specific fragment handler.
Expand Down Expand Up @@ -355,9 +364,8 @@ func (control *Control) PollForErrorResponse() (int, error) {
for {
ret := control.poll(
func(buf *atomic.Buffer, offset int32, length int32, header *logbuffer.Header) term.ControlledPollAction {
errorResponseFragmentHandler(&context, buf, offset, length, header)
return term.ControlledPollActionContinue
}, 1)
return errorResponseFragmentHandler(&context, buf, offset, length, header)
}, 10)
received += ret

// If we received a response with an error then return it
Expand All @@ -380,13 +388,19 @@ func (control *Control) PollForErrorResponse() (int, error) {
// ignore messages not on our session ID
// process recordingSignalEvents
// Log a warning if we have interrupted a synchronous event
func errorResponseFragmentHandler(context interface{}, buffer *atomic.Buffer, offset int32, length int32, header *logbuffer.Header) {
func errorResponseFragmentHandler(context interface{}, buffer *atomic.Buffer, offset int32, length int32, header *logbuffer.Header) (action term.ControlledPollAction) {
action = term.ControlledPollActionContinue

pollContext, ok := context.(*PollContext)
if !ok {
logger.Errorf("context conversion failed")
return
}

if pollContext.control.Results.ErrorResponse != nil {
return term.ControlledPollActionAbort
}

logger.Debugf("errorResponseFragmentHandler: offset:%d length: %d", offset, length)

var hdr codecs.SbeGoMessageHeader
Expand Down Expand Up @@ -422,6 +436,7 @@ func errorResponseFragmentHandler(context interface{}, buffer *atomic.Buffer, of
if controlResponse.ControlSessionId == pollContext.control.archive.SessionID {
if controlResponse.Code == codecs.ControlResponseCode.ERROR {
pollContext.control.Results.ErrorResponse = fmt.Errorf("PollForErrorResponse received a ControlResponse (correlationId:%d Code:ERROR error=\"%s\"", controlResponse.CorrelationId, controlResponse.ErrorMessage)
return term.ControlledPollActionBreak
}
}
return
Expand Down Expand Up @@ -503,6 +518,8 @@ func errorResponseFragmentHandler(context interface{}, buffer *atomic.Buffer, of
default:
fmt.Printf("errorResponseFragmentHandler: Insert decoder for type: %d", hdr.TemplateId)
}

return
}

// poll provides the control response poller using local state to pass
Expand Down Expand Up @@ -609,11 +626,10 @@ func (control *Control) PollForResponse(correlationID int64, sessionID int64) (i
context := PollContext{control, correlationID}

handler := aeron.NewControlledFragmentAssembler(func(buf *atomic.Buffer, offset int32, length int32, header *logbuffer.Header) term.ControlledPollAction {
controlFragmentHandler(&context, buf, offset, length, header)
return term.ControlledPollActionContinue
return controlFragmentHandler(&context, buf, offset, length, header)
}, aeron.DefaultFragmentAssemblyBufferLength)
for {
ret := control.poll(handler.OnFragment, 1)
ret := control.poll(handler.OnFragment, 10)

// Check result
if control.Results.IsPollComplete {
Expand Down

0 comments on commit 6fafc74

Please sign in to comment.