diff --git a/pkg/stream/producer.go b/pkg/stream/producer.go index 22ec8e60..aef085f1 100644 --- a/pkg/stream/producer.go +++ b/pkg/stream/producer.go @@ -114,19 +114,22 @@ func (s *standardProducer) doSend(ctx context.Context) error { s.rawClientMu.Lock() defer s.rawClientMu.Unlock() - if s.accumulator.isEmpty() { + if !s.canSend() { + // TODO: maybe log a message return nil } // TODO: explore if we can have this buffer in a sync.Pool messages := make([]PublishingMessage, 0, s.opts.MaxBufferedMessages) - for batchSize := 0; batchSize < s.opts.MaxBufferedMessages; batchSize++ { + for batchSize := 0; batchSize < s.opts.MaxBufferedMessages && s.canSend(); batchSize++ { pm := s.accumulator.get() if pm == nil { break } messages = append(messages, pm) + // the smart layer only supports AMQP 1.0 message formats + // we should never panic here m := pm.Message().(*amqp.Message) err := s.unconfirmedMessage.addWithTimeout(&MessageConfirmation{ publishingId: pm.PublishingId(), @@ -141,6 +144,12 @@ func (s *standardProducer) doSend(ctx context.Context) error { } } + if len(messages) == 0 { + // this case happens when pending confirms == max in-flight messages + // we return so that we don't send an empty Publish frame + return nil + } + err := s.rawClient.Send(ctx, s.publisherId, messages) if err != nil { return err @@ -149,6 +158,11 @@ func (s *standardProducer) doSend(ctx context.Context) error { return nil } +func (s *standardProducer) canSend() bool { + return len(s.unconfirmedMessage.unconfirmedMessagesSemaphore) != s.opts.MaxInFlight && + !s.accumulator.isEmpty() +} + func (s *standardProducer) sendLoopAsync(ctx context.Context) { var publishingDelay time.Duration if s.opts.BatchPublishingDelay == 0 { diff --git a/pkg/stream/producer_test.go b/pkg/stream/producer_test.go index 5a284732..180c02ca 100644 --- a/pkg/stream/producer_test.go +++ b/pkg/stream/producer_test.go @@ -261,7 +261,7 @@ var _ = Describe("Smart Producer", func() { }) When("the pending confirmations is greater or equal than max in flight", func() { - It("returns an error after a timeout", func() { + It("does not send the message and keeps the message in the message buffer", func() { // setup wait := make(chan struct{}) fakeRawClient.EXPECT(). @@ -276,7 +276,7 @@ var _ = Describe("Smart Producer", func() { p = newStandardProducer(12, fakeRawClient, &ProducerOptions{ MaxInFlight: 3, MaxBufferedMessages: 3, - BatchPublishingDelay: time.Millisecond * 5000, // we want to publish on max buffered + BatchPublishingDelay: time.Millisecond * 1500, // we want to publish on max buffered EnqueueTimeout: time.Millisecond * 10, // we want to time out quickly ConfirmationHandler: nil, stream: "test-stream", @@ -294,31 +294,12 @@ var _ = Describe("Smart Producer", func() { Fail("time out waiting for the mock to unblock us") } - //Expect(p.Send(context.Background(), message)).To(MatchError(ErrEnqueueTimeout)) Expect(p.Send(context.Background(), message)).To(Succeed()) - }) - - It("waits until some messages are confirmed", Pending, func() { - // setup - wait := make(chan struct{}) - fakeRawClient.EXPECT(). - Send(gomock.AssignableToTypeOf(ctxType), - gomock.AssignableToTypeOf(uint8(42)), - gomock.AssignableToTypeOf([]common.PublishingMessager{}), - ).DoAndReturn(func(_ context.Context, _ uint8, _ []common.PublishingMessager) error { - close(wait) - return nil - }) - - p = newStandardProducer(12, fakeRawClient, &ProducerOptions{ - MaxInFlight: 2, - MaxBufferedMessages: 3, - BatchPublishingDelay: time.Millisecond * 5000, // we want to publish on max buffered - EnqueueTimeout: time.Millisecond * 10, // we want to time out quickly - ConfirmationHandler: nil, - stream: "test-stream", - }) - + Consistently(p.accumulator.messages). + WithPolling(time.Millisecond * 200). + Within(time.Millisecond * 1600). + Should(HaveLen(1)) // 3 messages were sent, 1 message is buffered + Expect(p.unconfirmedMessage.messages).To(HaveLen(3)) // 3 messages were sent, 3 confirmations are pending }) }) })