Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ PR-1071-2] add sendRequest.done() to release resource together #1110

Merged
merged 2 commits into from
Oct 24, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 79 additions & 14 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -1419,21 +1419,37 @@ func (p *partitionProducer) Close() {
<-cp.doneCh
}

//nolint:all
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ensure we remove this nolint in a following patch.

type sendRequest struct {
ctx context.Context
msg *ProducerMessage
callback func(MessageID, *ProducerMessage, error)
callbackOnce *sync.Once
publishTime time.Time
flushImmediately bool
blockCh chan struct{}
closeBlockChOnce *sync.Once
totalChunks int
chunkID int
uuid string
chunkRecorder *chunkRecorder
transaction *transaction
reservedMem int64
ctx context.Context
msg *ProducerMessage
callback func(MessageID, *ProducerMessage, error)
callbackOnce *sync.Once
publishTime time.Time
flushImmediately bool
blockCh chan struct{}
closeBlockChOnce *sync.Once
totalChunks int
chunkID int
uuid string
chunkRecorder *chunkRecorder
transaction *transaction
reservedMem int64
producer *partitionProducer
memLimit internal.MemoryLimitController
semaphore internal.Semaphore
reservedSemaphore int
sendAsBatch bool
schema Schema
schemaVersion []byte
uncompressedPayload []byte
uncompressedSize int64
compressedPayload []byte
compressedSize int
payloadChunkSize int
mm *pb.MessageMetadata
deliverAt time.Time
maxMessageSize int32
}

// stopBlock can be invoked multiple times safety
Expand All @@ -1443,6 +1459,55 @@ func (sr *sendRequest) stopBlock() {
})
}

//nolint:all
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

func (sr *sendRequest) done(msgID MessageID, err error) {
if err == nil {
sr.producer.metrics.PublishLatency.Observe(float64(time.Now().UnixNano()-sr.publishTime.UnixNano()) / 1.0e9)
sr.producer.metrics.MessagesPublished.Inc()
sr.producer.metrics.BytesPublished.Add(float64(sr.reservedMem))
}

if err != nil {
sr.producer.log.WithError(err).
WithField("size", sr.reservedMem).
WithField("properties", sr.msg.Properties)
}

if err == errSendTimeout {
sr.producer.metrics.PublishErrorsTimeout.Inc()
}

if err == errMessageTooLarge {
sr.producer.metrics.PublishErrorsMsgTooLarge.Inc()
}

if sr.semaphore != nil {
for i := 0; i < sr.reservedSemaphore; i++ {
sr.semaphore.Release()
}
sr.producer.metrics.MessagesPending.Sub(float64(sr.reservedSemaphore))
}

if sr.memLimit != nil {
sr.memLimit.ReleaseMemory(sr.reservedMem)
sr.producer.metrics.BytesPending.Sub(float64(sr.reservedMem))
}

if sr.totalChunks <= 1 || sr.chunkID == sr.totalChunks-1 {
sr.callbackOnce.Do(func() {
runCallback(sr.callback, msgID, sr.msg, err)
})

if sr.transaction != nil {
sr.transaction.endSendOrAckOp(err)
}

if sr.producer.options.Interceptors != nil && err == nil {
sr.producer.options.Interceptors.OnSendAcknowledgement(sr.producer, sr.msg, msgID)
}
}
}

type closeProducer struct {
doneCh chan struct{}
}
Expand Down