From 6b4443ee66828f34e02c92f31ca81f10f9f1d5c5 Mon Sep 17 00:00:00 2001 From: gunli Date: Wed, 30 Aug 2023 15:55:50 +0800 Subject: [PATCH] refactor: call sendRequest.done() in failPendingMessages() --- pulsar/producer_partition.go | 29 +++++++++-------------------- 1 file changed, 9 insertions(+), 20 deletions(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 112c8921b7..bbafb0cd00 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -1371,6 +1371,7 @@ func (p *partitionProducer) failPendingMessages() { if viewSize <= 0 { return } + p.log.Infof("Failing %d messages on closing producer", viewSize) lastViewItem := curViewItems[viewSize-1].(*pendingItem) @@ -1389,24 +1390,9 @@ func (p *partitionProducer) failPendingMessages() { pi.Lock() for _, i := range pi.sendRequests { - sr := i.(*sendRequest) - if sr.msg != nil { - size := len(sr.msg.Payload) - p.releaseSemaphoreAndMem(sr.reservedMem) - p.metrics.MessagesPending.Dec() - p.metrics.BytesPending.Sub(float64(size)) - p.log.WithError(errProducerClosed). - WithField("size", size). - WithField("properties", sr.msg.Properties) - } - - if sr.callback != nil { - sr.callbackOnce.Do(func() { - runCallback(sr.callback, nil, sr.msg, errProducerClosed) - }) - } - if sr.transaction != nil { - sr.transaction.endSendOrAckOp(nil) + sr, ok := i.(*sendRequest) + if ok { + sr.done(nil, errProducerClosed) } } @@ -1502,13 +1488,16 @@ func (sr *sendRequest) done(msgID MessageID, err error) { sr.producer.metrics.BytesPublished.Add(float64(sr.reservedMem)) } - if err == errSendTimeout { - sr.producer.metrics.PublishErrorsTimeout.Inc() + if err != nil { sr.producer.log.WithError(err). WithField("size", sr.reservedMem). WithField("properties", sr.msg.Properties) } + if err == errSendTimeout { + sr.producer.metrics.PublishErrorsTimeout.Inc() + } + if err == errMessageTooLarge { sr.producer.metrics.PublishErrorsMsgTooLarge.Inc() }