diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 82123f988a..2beb36144a 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -1419,21 +1419,37 @@ func (p *partitionProducer) Close() { <-cp.doneCh } +//nolint:all type sendRequest struct { - ctx context.Context - msg *ProducerMessage - callback func(MessageID, *ProducerMessage, error) - callbackOnce *sync.Once - publishTime time.Time - flushImmediately bool - blockCh chan struct{} - closeBlockChOnce *sync.Once - totalChunks int - chunkID int - uuid string - chunkRecorder *chunkRecorder - transaction *transaction - reservedMem int64 + ctx context.Context + msg *ProducerMessage + callback func(MessageID, *ProducerMessage, error) + callbackOnce *sync.Once + publishTime time.Time + flushImmediately bool + blockCh chan struct{} + closeBlockChOnce *sync.Once + totalChunks int + chunkID int + uuid string + chunkRecorder *chunkRecorder + transaction *transaction + reservedMem int64 + producer *partitionProducer + memLimit internal.MemoryLimitController + semaphore internal.Semaphore + reservedSemaphore int + sendAsBatch bool + schema Schema + schemaVersion []byte + uncompressedPayload []byte + uncompressedSize int64 + compressedPayload []byte + compressedSize int + payloadChunkSize int + mm *pb.MessageMetadata + deliverAt time.Time + maxMessageSize int32 } // stopBlock can be invoked multiple times safety @@ -1443,6 +1459,55 @@ func (sr *sendRequest) stopBlock() { }) } +//nolint:all +func (sr *sendRequest) done(msgID MessageID, err error) { + if err == nil { + sr.producer.metrics.PublishLatency.Observe(float64(time.Now().UnixNano()-sr.publishTime.UnixNano()) / 1.0e9) + sr.producer.metrics.MessagesPublished.Inc() + sr.producer.metrics.BytesPublished.Add(float64(sr.reservedMem)) + } + + if err != nil { + sr.producer.log.WithError(err). + WithField("size", sr.reservedMem). + WithField("properties", sr.msg.Properties) + } + + if err == errSendTimeout { + sr.producer.metrics.PublishErrorsTimeout.Inc() + } + + if err == errMessageTooLarge { + sr.producer.metrics.PublishErrorsMsgTooLarge.Inc() + } + + if sr.semaphore != nil { + for i := 0; i < sr.reservedSemaphore; i++ { + sr.semaphore.Release() + } + sr.producer.metrics.MessagesPending.Sub(float64(sr.reservedSemaphore)) + } + + if sr.memLimit != nil { + sr.memLimit.ReleaseMemory(sr.reservedMem) + sr.producer.metrics.BytesPending.Sub(float64(sr.reservedMem)) + } + + if sr.totalChunks <= 1 || sr.chunkID == sr.totalChunks-1 { + sr.callbackOnce.Do(func() { + runCallback(sr.callback, msgID, sr.msg, err) + }) + + if sr.transaction != nil { + sr.transaction.endSendOrAckOp(err) + } + + if sr.producer.options.Interceptors != nil && err == nil { + sr.producer.options.Interceptors.OnSendAcknowledgement(sr.producer, sr.msg, msgID) + } + } +} + type closeProducer struct { doneCh chan struct{} }