From 6fafc7467998012149254318598c97403e764c5a Mon Sep 17 00:00:00 2001 From: Gustav Munkby Date: Thu, 9 Nov 2023 15:39:07 +0100 Subject: [PATCH] Update PollForResponse and PollForErrorResponse to use ControlledPoll This updates the code to poll for up to 10 fragments at once, but we use the controlled poll as a mechanism to ensure we are not committing to more messages than what is needed. --- archive/control.go | 32 ++++++++++++++++++++++++-------- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/archive/control.go b/archive/control.go index 54621274..27b9380b 100644 --- a/archive/control.go +++ b/archive/control.go @@ -113,13 +113,19 @@ func init() { codecIds.recordingStopped = recordingStopped.SbeTemplateId() } -func controlFragmentHandler(context interface{}, buffer *atomic.Buffer, offset int32, length int32, header *logbuffer.Header) { +func controlFragmentHandler(context interface{}, buffer *atomic.Buffer, offset int32, length int32, header *logbuffer.Header) (action term.ControlledPollAction) { + action = term.ControlledPollActionContinue + pollContext, ok := context.(*PollContext) if !ok { logger.Errorf("context conversion failed") return } + if pollContext.control.Results.IsPollComplete { + return term.ControlledPollActionAbort + } + logger.Debugf("controlFragmentHandler: correlationID:%d offset:%d length:%d header:%#v", pollContext.correlationID, offset, length, header) var hdr codecs.SbeGoMessageHeader @@ -170,6 +176,8 @@ func controlFragmentHandler(context interface{}, buffer *atomic.Buffer, offset i logger.Debugf("controlFragmentHandler/controlResponse: received for sessionID:%d, correlationID:%d", controlResponse.ControlSessionId, controlResponse.CorrelationId) control.Results.ControlResponse = controlResponse control.Results.IsPollComplete = true + + return term.ControlledPollActionBreak } else { logger.Debugf("controlFragmentHandler/controlResponse ignoring sessionID:%d, correlationID:%d", controlResponse.ControlSessionId, controlResponse.CorrelationId) } @@ -198,6 +206,7 @@ func controlFragmentHandler(context interface{}, buffer *atomic.Buffer, offset i // This can happen when testing/adding new functionality fmt.Printf("controlFragmentHandler: Unexpected message type %d\n", hdr.TemplateId) } + return } // ConnectionControlFragmentHandler is the connection handling specific fragment handler. @@ -355,9 +364,8 @@ func (control *Control) PollForErrorResponse() (int, error) { for { ret := control.poll( func(buf *atomic.Buffer, offset int32, length int32, header *logbuffer.Header) term.ControlledPollAction { - errorResponseFragmentHandler(&context, buf, offset, length, header) - return term.ControlledPollActionContinue - }, 1) + return errorResponseFragmentHandler(&context, buf, offset, length, header) + }, 10) received += ret // If we received a response with an error then return it @@ -380,13 +388,19 @@ func (control *Control) PollForErrorResponse() (int, error) { // ignore messages not on our session ID // process recordingSignalEvents // Log a warning if we have interrupted a synchronous event -func errorResponseFragmentHandler(context interface{}, buffer *atomic.Buffer, offset int32, length int32, header *logbuffer.Header) { +func errorResponseFragmentHandler(context interface{}, buffer *atomic.Buffer, offset int32, length int32, header *logbuffer.Header) (action term.ControlledPollAction) { + action = term.ControlledPollActionContinue + pollContext, ok := context.(*PollContext) if !ok { logger.Errorf("context conversion failed") return } + if pollContext.control.Results.ErrorResponse != nil { + return term.ControlledPollActionAbort + } + logger.Debugf("errorResponseFragmentHandler: offset:%d length: %d", offset, length) var hdr codecs.SbeGoMessageHeader @@ -422,6 +436,7 @@ func errorResponseFragmentHandler(context interface{}, buffer *atomic.Buffer, of if controlResponse.ControlSessionId == pollContext.control.archive.SessionID { if controlResponse.Code == codecs.ControlResponseCode.ERROR { pollContext.control.Results.ErrorResponse = fmt.Errorf("PollForErrorResponse received a ControlResponse (correlationId:%d Code:ERROR error=\"%s\"", controlResponse.CorrelationId, controlResponse.ErrorMessage) + return term.ControlledPollActionBreak } } return @@ -503,6 +518,8 @@ func errorResponseFragmentHandler(context interface{}, buffer *atomic.Buffer, of default: fmt.Printf("errorResponseFragmentHandler: Insert decoder for type: %d", hdr.TemplateId) } + + return } // poll provides the control response poller using local state to pass @@ -609,11 +626,10 @@ func (control *Control) PollForResponse(correlationID int64, sessionID int64) (i context := PollContext{control, correlationID} handler := aeron.NewControlledFragmentAssembler(func(buf *atomic.Buffer, offset int32, length int32, header *logbuffer.Header) term.ControlledPollAction { - controlFragmentHandler(&context, buf, offset, length, header) - return term.ControlledPollActionContinue + return controlFragmentHandler(&context, buf, offset, length, header) }, aeron.DefaultFragmentAssemblyBufferLength) for { - ret := control.poll(handler.OnFragment, 1) + ret := control.poll(handler.OnFragment, 10) // Check result if control.Results.IsPollComplete {