Skip to content

Commit

Permalink
Fixed escaped handler in PollForErrorResponse
Browse files Browse the repository at this point in the history
  • Loading branch information
aayushduwadi committed Jun 6, 2024
1 parent f28bedc commit 2fbaf67
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 14 deletions.
1 change: 1 addition & 0 deletions archive/archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ func NewArchive(options *Options, context *aeron.Context) (*Archive, error) {
archive.Control.archive = archive
archive.Control.fragmentAssembler = aeron.NewControlledFragmentAssembler(
archive.Control.onFragment, aeron.DefaultFragmentAssemblyBufferLength)
archive.Control.errorFragmentHandler = archive.Control.errorResponseFragmentHandler

// Setup the Proxy (publisher/request)
archive.Proxy = new(Proxy)
Expand Down
20 changes: 6 additions & 14 deletions archive/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ type Control struct {

archive *Archive // link to parent
fragmentAssembler *aeron.ControlledFragmentAssembler

errorFragmentHandler term.ControlledFragmentHandler
}

// ControlResults for holding state over a Control request/response
Expand Down Expand Up @@ -355,9 +357,6 @@ func ConnectionControlFragmentHandler(context *PollContext, buffer *atomic.Buffe
// Returns an error if we detect an archive operation in progress
// and a count of how many messages were consumed
func (control *Control) PollForErrorResponse() (int, error) {

logger.Debugf("PollForErrorResponse(%d)", control.archive.SessionID)
context := PollContext{control, 0}
received := 0

control.Results.ErrorResponse = nil
Expand All @@ -367,10 +366,7 @@ func (control *Control) PollForErrorResponse() (int, error) {

// Poll for async events, errors etc until the queue is drained
for {
ret := control.poll(
func(buf *atomic.Buffer, offset int32, length int32, header *logbuffer.Header) term.ControlledPollAction {
return errorResponseFragmentHandler(&context, buf, offset, length, header)
}, 10)
ret := control.poll(control.errorFragmentHandler, 10)
received += ret

// If we received a response with an error then return it
Expand All @@ -393,16 +389,12 @@ func (control *Control) PollForErrorResponse() (int, error) {
// ignore messages not on our session ID
// process recordingSignalEvents
// Log a warning if we have interrupted a synchronous event
func errorResponseFragmentHandler(context interface{}, buffer *atomic.Buffer, offset int32, length int32, header *logbuffer.Header) (action term.ControlledPollAction) {
func (control *Control) errorResponseFragmentHandler(buffer *atomic.Buffer, offset int32, length int32, header *logbuffer.Header) (action term.ControlledPollAction) {
action = term.ControlledPollActionContinue

pollContext, ok := context.(*PollContext)
if !ok {
logger.Errorf("context conversion failed")
return
}
pollContext := PollContext{control, 0}

if pollContext.control.Results.ErrorResponse != nil {
if control.Results.ErrorResponse != nil {
return term.ControlledPollActionAbort
}

Expand Down

0 comments on commit 2fbaf67

Please sign in to comment.