Skip to content

Commit

Permalink
WIP - message confirmation
Browse files Browse the repository at this point in the history
Signed-off-by: Aitor Perez Cedres <[email protected]>
  • Loading branch information
Zerpet committed Oct 24, 2023
1 parent 6e09e69 commit 967fda5
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 28 deletions.
18 changes: 16 additions & 2 deletions pkg/stream/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,19 +114,22 @@ func (s *standardProducer) doSend(ctx context.Context) error {
s.rawClientMu.Lock()
defer s.rawClientMu.Unlock()

if s.accumulator.isEmpty() {
if !s.canSend() {
// TODO: maybe log a message
return nil
}

// TODO: explore if we can have this buffer in a sync.Pool
messages := make([]PublishingMessage, 0, s.opts.MaxBufferedMessages)
for batchSize := 0; batchSize < s.opts.MaxBufferedMessages; batchSize++ {
for batchSize := 0; batchSize < s.opts.MaxBufferedMessages && s.canSend(); batchSize++ {
pm := s.accumulator.get()
if pm == nil {
break
}
messages = append(messages, pm)

// the smart layer only supports AMQP 1.0 message formats
// we should never panic here
m := pm.Message().(*amqp.Message)
err := s.unconfirmedMessage.addWithTimeout(&MessageConfirmation{
publishingId: pm.PublishingId(),
Expand All @@ -141,6 +144,12 @@ func (s *standardProducer) doSend(ctx context.Context) error {
}
}

if len(messages) == 0 {
// this case happens when pending confirms == max in-flight messages
// we return so that we don't send an empty Publish frame
return nil
}

err := s.rawClient.Send(ctx, s.publisherId, messages)
if err != nil {
return err
Expand All @@ -149,6 +158,11 @@ func (s *standardProducer) doSend(ctx context.Context) error {
return nil
}

func (s *standardProducer) canSend() bool {
return len(s.unconfirmedMessage.unconfirmedMessagesSemaphore) != s.opts.MaxInFlight &&
!s.accumulator.isEmpty()
}

func (s *standardProducer) sendLoopAsync(ctx context.Context) {
var publishingDelay time.Duration
if s.opts.BatchPublishingDelay == 0 {
Expand Down
33 changes: 7 additions & 26 deletions pkg/stream/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ var _ = Describe("Smart Producer", func() {
})

When("the pending confirmations is greater or equal than max in flight", func() {
It("returns an error after a timeout", func() {
It("does not send the message and keeps the message in the message buffer", func() {
// setup
wait := make(chan struct{})
fakeRawClient.EXPECT().
Expand All @@ -276,7 +276,7 @@ var _ = Describe("Smart Producer", func() {
p = newStandardProducer(12, fakeRawClient, &ProducerOptions{
MaxInFlight: 3,
MaxBufferedMessages: 3,
BatchPublishingDelay: time.Millisecond * 5000, // we want to publish on max buffered
BatchPublishingDelay: time.Millisecond * 1500, // we want to publish on max buffered
EnqueueTimeout: time.Millisecond * 10, // we want to time out quickly
ConfirmationHandler: nil,
stream: "test-stream",
Expand All @@ -294,31 +294,12 @@ var _ = Describe("Smart Producer", func() {
Fail("time out waiting for the mock to unblock us")
}

//Expect(p.Send(context.Background(), message)).To(MatchError(ErrEnqueueTimeout))
Expect(p.Send(context.Background(), message)).To(Succeed())
})

It("waits until some messages are confirmed", Pending, func() {
// setup
wait := make(chan struct{})
fakeRawClient.EXPECT().
Send(gomock.AssignableToTypeOf(ctxType),
gomock.AssignableToTypeOf(uint8(42)),
gomock.AssignableToTypeOf([]common.PublishingMessager{}),
).DoAndReturn(func(_ context.Context, _ uint8, _ []common.PublishingMessager) error {
close(wait)
return nil
})

p = newStandardProducer(12, fakeRawClient, &ProducerOptions{
MaxInFlight: 2,
MaxBufferedMessages: 3,
BatchPublishingDelay: time.Millisecond * 5000, // we want to publish on max buffered
EnqueueTimeout: time.Millisecond * 10, // we want to time out quickly
ConfirmationHandler: nil,
stream: "test-stream",
})

Consistently(p.accumulator.messages).
WithPolling(time.Millisecond * 200).
Within(time.Millisecond * 1600).
Should(HaveLen(1)) // 3 messages were sent, 1 message is buffered
Expect(p.unconfirmedMessage.messages).To(HaveLen(3)) // 3 messages were sent, 3 confirmations are pending
})
})
})
Expand Down

0 comments on commit 967fda5

Please sign in to comment.