Skip to content

Commit

Permalink
Refactor message confirmation
Browse files Browse the repository at this point in the history
At the raw layer, the client uses two different commands for publish
confirmation and publish error. At the smart layer, we abstract that
away and consider a publish error a "nack" or negative publish
confirmation. We have to adapt the channel in the producer, so that it
can receive publish confirmations or publish errors in the same channel.

Signed-off-by: Aitor Perez Cedres <[email protected]>
  • Loading branch information
Zerpet committed Oct 25, 2023
1 parent 967fda5 commit 80ec3cc
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 18 deletions.
1 change: 0 additions & 1 deletion internal/constants.go

This file was deleted.

2 changes: 2 additions & 0 deletions pkg/raw/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,8 @@ func (tc *Client) handleIncoming(ctx context.Context) error {
for {
select {
case <-ctx.Done():
// FIXME: most of the time, cancelling the context will be OK and the intended shutdown
// probably should not return an error, just log that frame handler is stopping
log.Info("context cancelled", "reason", ctx.Err())
return ctx.Err()
default:
Expand Down
51 changes: 41 additions & 10 deletions pkg/stream/confirmation.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package stream
import (
"fmt"
"github.com/rabbitmq/rabbitmq-stream-go-client/v2/pkg/codecs/amqp"
"github.com/rabbitmq/rabbitmq-stream-go-client/v2/pkg/raw"
"sync"
"time"
)
Expand All @@ -11,20 +12,25 @@ type ConfirmationStatus int

const (
// WaitingConfirmation for a publishing message
WaitingConfirmation ConfirmationStatus = 0
WaitingConfirmation ConfirmationStatus = iota
// Confirmed and received message by the server
Confirmed ConfirmationStatus = 1
Confirmed
// ClientTimeout and gave up waiting for a confirmation
ClientTimeout ConfirmationStatus = 2
ClientTimeout
// NotAvailable Stream, often because stream was deleted
NotAvailable ConfirmationStatus = 6
NotAvailable
// InternalError from the server
InternalError ConfirmationStatus = 15
// TODO: do we need this?
AccessRefused ConfirmationStatus = 16
PreconditionFailed ConfirmationStatus = 17
PublisherDoesNotExist ConfirmationStatus = 18
UndefinedError ConfirmationStatus = 200
InternalError
// AccessRefused user did not have permissions to publish to the stream
AccessRefused
// PreconditionFailed means that certain conditions required to publish
// were not met e.g. stream does not exist
PreconditionFailed
// PublisherDoesNotExist happens when the client tries to publish before the
// publisher was registered and assigned an ID
PublisherDoesNotExist
// UndefinedError is for any other error
UndefinedError
)

// TODO: docs
Expand Down Expand Up @@ -58,6 +64,31 @@ func (m *MessageConfirmation) Stream() string {
return m.stream
}

type publishConfirmOrError struct {
publishingId uint64
statusCode uint16
}

// status translates a raw response code into a ConfirmationStatus
func (p *publishConfirmOrError) status() ConfirmationStatus {
switch p.statusCode {
case raw.ResponseCodeOK:
return Confirmed
case raw.ResponseCodeStreamDoesNotExist, raw.ResponseCodeStreamNotAvailable:
return NotAvailable
case raw.ResponseCodeInternalError:
return InternalError
case raw.ResponseCodeAccessRefused:
return AccessRefused
case raw.ResponseCodePublisherDoesNotExist:
return PublisherDoesNotExist
case raw.ResponseCodePreconditionFailed:
return PreconditionFailed
default:
return UndefinedError
}
}

type confirmationTracker struct {
mapMu *sync.Mutex
messages map[uint64]*MessageConfirmation
Expand Down
10 changes: 5 additions & 5 deletions pkg/stream/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ type standardProducer struct {
unconfirmedMessage *confirmationTracker
// this channel is used by the producer manager to send confirmation notifications
// the end-user does not have access to this channel
confirmedPublish chan uint64
confirmedPublish chan *publishConfirmOrError
}

func newStandardProducer(publisherId uint8, rawClient raw.Clienter, opts *ProducerOptions) *standardProducer {
Expand All @@ -90,7 +90,7 @@ func newStandardProducer(publisherId uint8, rawClient raw.Clienter, opts *Produc
},
done: make(chan struct{}),
unconfirmedMessage: newConfirmationTracker(opts.MaxInFlight),
confirmedPublish: make(chan uint64),
confirmedPublish: make(chan *publishConfirmOrError),
}

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -200,14 +200,14 @@ func (s *standardProducer) confirmationListenerLoop() {
select {
case <-s.done:
return
case id := <-s.confirmedPublish:
msgConfirm, err := s.unconfirmedMessage.confirm(id)
case confirmOrError := <-s.confirmedPublish:
msgConfirm, err := s.unconfirmedMessage.confirm(confirmOrError.publishingId)
if err != nil {
// TODO: log the error instead
panic(err)
}
msgConfirm.status = confirmOrError.status()
if s.opts.ConfirmationHandler != nil {
msgConfirm.status = Confirmed
s.opts.ConfirmationHandler(msgConfirm)
}
// TODO: do we need an else { msgConfirm = nil } to ease the job of the GC?
Expand Down
7 changes: 5 additions & 2 deletions pkg/stream/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,11 @@ var _ = Describe("Smart Producer", func() {
Fail("expected to be unblocked by the mock, but we are still waiting")
}

// faking the producer manager receiving and forwarding a publish confirm
p.confirmedPublish <- 0
// faking the producer manager receiving and forwarding a 'publish confirm'
p.confirmedPublish <- &publishConfirmOrError{
publishingId: 0,
statusCode: 1,
}

var mc MessageConfirmation
Eventually(pingBack).Should(Receive(&mc))
Expand Down

0 comments on commit 80ec3cc

Please sign in to comment.