diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index ca1a746234..d62d70e150 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -491,7 +491,8 @@ func (p *partitionProducer) internalSend(sr *sendRequest) { p.internalFlushCurrentBatch() // after flushing try again to add the current payload - ok := addRequestToBatch(smm, p, sr.uncompressedPayload, sr, sr.msg, sr.deliverAt, sr.schemaVersion, multiSchemaEnabled) + ok := addRequestToBatch(smm, p, sr.uncompressedPayload, sr, sr.msg, sr.deliverAt, + sr.schemaVersion, multiSchemaEnabled) if !ok { p.log.WithField("size", sr.uncompressedSize). WithField("properties", sr.msg.Properties). @@ -1514,12 +1515,12 @@ func (p *partitionProducer) _getConn() internal.Connection { return p.conn.Load().(internal.Connection) } -func (p *partitionProducer) releaseSemaphoreAndMem(size int64) { +func (p *partitionProducer) releaseSemaphoreAndMem(size int64) { //nolint:unused p.publishSemaphore.Release() p.client.memLimit.ReleaseMemory(size) } -func (p *partitionProducer) canAddToQueue(sr *sendRequest) bool { +func (p *partitionProducer) canAddToQueue(sr *sendRequest) bool { //nolint:unused if p.options.DisableBlockIfQueueFull { if !p.publishSemaphore.TryAcquire() { runCallback(sr.callback, nil, sr.msg, errSendQueueIsFull) @@ -1535,7 +1536,7 @@ func (p *partitionProducer) canAddToQueue(sr *sendRequest) bool { return true } -func (p *partitionProducer) canReserveMem(sr *sendRequest, size int64) bool { +func (p *partitionProducer) canReserveMem(sr *sendRequest, size int64) bool { //nolint:unused if p.options.DisableBlockIfQueueFull { if !p.client.memLimit.TryReserveMemory(size) { p.publishSemaphore.Release()