diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index ebd292ec11..82123f988a 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -862,7 +862,7 @@ type pendingItem struct { sequenceID uint64 sentAt time.Time sendRequests []interface{} - completed bool + isDone bool flushCallback func(err error) } @@ -1001,7 +1001,7 @@ func (p *partitionProducer) failTimeoutMessages() { } // flag the sending has completed with error, flush make no effect - pi.Complete(errSendTimeout) + pi.done(errSendTimeout) pi.Unlock() // finally reached the last view item, current iteration ends @@ -1076,7 +1076,7 @@ func (p *partitionProducer) internalFlush(fr *flushRequest) { pi.Lock() defer pi.Unlock() - if pi.completed { + if pi.isDone { // The last item in the queue has been completed while we were // looking at it. It's safe at this point to assume that every // message enqueued before Flush() was called are now persisted @@ -1281,7 +1281,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) } // Mark this pending item as done - pi.Complete(nil) + pi.done(nil) } } @@ -1365,7 +1365,7 @@ func (p *partitionProducer) failPendingMessages() { } // flag the sending has completed with error, flush make no effect - pi.Complete(errProducerClosed) + pi.done(errProducerClosed) pi.Unlock() // finally reached the last view item, current iteration ends @@ -1452,11 +1452,11 @@ type flushRequest struct { err error } -func (i *pendingItem) Complete(err error) { - if i.completed { +func (i *pendingItem) done(err error) { + if i.isDone { return } - i.completed = true + i.isDone = true buffersPool.Put(i.buffer) if i.flushCallback != nil { i.flushCallback(err)