Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PR-1071-5] refactor internalSend #1114

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
273 changes: 64 additions & 209 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"context"
"errors"
"fmt"
"math"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -478,231 +477,87 @@ func runCallback(cb func(MessageID, *ProducerMessage, error), id MessageID, msg
cb(id, msg, err)
}

func (p *partitionProducer) internalSend(request *sendRequest) {
p.log.Debug("Received send request: ", *request.msg)

msg := request.msg

// read payload from message
uncompressedPayload := msg.Payload

var schemaPayload []byte
var err error

// The block chan must be closed when returned with exception
defer request.stopBlock()
if !p.canAddToQueue(request) {
return
}
func (p *partitionProducer) internalSend(sr *sendRequest) {
if sr.sendAsBatch {
smm := p.genSingleMessageMetadataInBatch(sr.msg, int(sr.uncompressedSize))
multiSchemaEnabled := !p.options.DisableMultiSchema
added := addRequestToBatch(smm, p, sr.uncompressedPayload, sr, sr.msg, sr.deliverAt, sr.schemaVersion,
multiSchemaEnabled)
if !added {
// The current batch is full. flush it and retry
p.internalFlushCurrentBatch()

if p.options.DisableMultiSchema {
if msg.Schema != nil && p.options.Schema != nil &&
msg.Schema.GetSchemaInfo().hash() != p.options.Schema.GetSchemaInfo().hash() {
runCallback(request.callback, nil, request.msg, fmt.Errorf("msg schema can not match with producer schema"))
p.log.WithError(err).Errorf("The producer %s of the topic %s is disabled the `MultiSchema`", p.producerName, p.topic)
return
}
}
var schema Schema
var schemaVersion []byte
if msg.Schema != nil {
schema = msg.Schema
} else if p.options.Schema != nil {
schema = p.options.Schema
}
if msg.Value != nil {
// payload and schema are mutually exclusive
// try to get payload from schema value only if payload is not set
if uncompressedPayload == nil && schema != nil {
schemaPayload, err = schema.Encode(msg.Value)
if err != nil {
runCallback(request.callback, nil, request.msg, newError(SchemaFailure, err.Error()))
p.log.WithError(err).Errorf("Schema encode message failed %s", msg.Value)
// after flushing try again to add the current payload
ok := addRequestToBatch(smm, p, sr.uncompressedPayload, sr, sr.msg, sr.deliverAt,
sr.schemaVersion, multiSchemaEnabled)
if !ok {
p.log.WithField("size", sr.uncompressedSize).
WithField("properties", sr.msg.Properties).
Error("unable to add message to batch")
sr.done(nil, errFailAddToBatch)
return
}
}
}
if uncompressedPayload == nil {
uncompressedPayload = schemaPayload
}

if schema != nil {
schemaVersion = p.schemaCache.Get(schema.GetSchemaInfo())
if schemaVersion == nil {
schemaVersion, err = p.getOrCreateSchema(schema.GetSchemaInfo())
if err != nil {
p.log.WithError(err).Error("get schema version fail")
runCallback(request.callback, nil, request.msg, fmt.Errorf("get schema version fail, err: %w", err))
return
}
p.schemaCache.Put(schema.GetSchemaInfo(), schemaVersion)
if sr.flushImmediately {
p.internalFlushCurrentBatch()
}
}

uncompressedSize := len(uncompressedPayload)

// try to reserve memory for uncompressedPayload
if !p.canReserveMem(request, int64(uncompressedSize)) {
return
}

deliverAt := msg.DeliverAt
if msg.DeliverAfter.Nanoseconds() > 0 {
deliverAt = time.Now().Add(msg.DeliverAfter)
}

// set default ReplicationClusters when DisableReplication
if msg.DisableReplication {
msg.ReplicationClusters = []string{"__local__"}
}

mm := p.genMetadata(msg, uncompressedSize, deliverAt)

sendAsBatch := !p.options.DisableBatching &&
msg.ReplicationClusters == nil &&
deliverAt.UnixNano() < 0

// Once the batching is enabled, it can close blockCh early to make block finish
if sendAsBatch {
request.stopBlock()
} else {
// update sequence id for metadata, make the size of msgMetadata more accurate
// batch sending will update sequence ID in the BatchBuilder
p.updateMetadataSeqID(mm, msg)
}

maxMessageSize := int(p._getConn().GetMaxMessageSize())

// compress payload if not batching
var compressedPayload []byte
var compressedSize int
var checkSize int
if !sendAsBatch {
compressedPayload = p.compressionProvider.Compress(nil, uncompressedPayload)
compressedSize = len(compressedPayload)
checkSize = compressedSize

// set the compress type in msgMetaData
compressionType := pb.CompressionType(p.options.CompressionType)
if compressionType != pb.CompressionType_NONE {
mm.Compression = &compressionType
}
} else {
// final check for batching message is in serializeMessage
// this is a double check
checkSize = uncompressedSize
}

// if msg is too large and chunking is disabled
if checkSize > maxMessageSize && !p.options.EnableChunking {
p.releaseSemaphoreAndMem(int64(uncompressedSize))
runCallback(request.callback, nil, request.msg, errMessageTooLarge)
p.log.WithError(errMessageTooLarge).
WithField("size", checkSize).
WithField("properties", msg.Properties).
Errorf("MaxMessageSize %d", maxMessageSize)
p.metrics.PublishErrorsMsgTooLarge.Inc()
if sr.totalChunks <= 1 {
p.internalSingleSend(sr.mm, sr.compressedPayload, sr, uint32(sr.maxMessageSize))
return
}

var totalChunks int
// max chunk payload size
var payloadChunkSize int
if sendAsBatch || !p.options.EnableChunking {
totalChunks = 1
payloadChunkSize = int(p._getConn().GetMaxMessageSize())
} else {
payloadChunkSize = int(p._getConn().GetMaxMessageSize()) - proto.Size(mm)
if payloadChunkSize <= 0 {
p.releaseSemaphoreAndMem(int64(uncompressedSize))
runCallback(request.callback, nil, msg, errMetaTooLarge)
p.log.WithError(errMetaTooLarge).
WithField("metadata size", proto.Size(mm)).
WithField("properties", msg.Properties).
Errorf("MaxMessageSize %d", int(p._getConn().GetMaxMessageSize()))
p.metrics.PublishErrorsMsgTooLarge.Inc()
return
}
// set ChunkMaxMessageSize
if p.options.ChunkMaxMessageSize != 0 {
payloadChunkSize = int(math.Min(float64(payloadChunkSize), float64(p.options.ChunkMaxMessageSize)))
}
totalChunks = int(math.Max(1, math.Ceil(float64(compressedSize)/float64(payloadChunkSize))))
}

// set total chunks to send request
request.totalChunks = totalChunks

if !sendAsBatch {
if totalChunks > 1 {
var lhs, rhs int
uuid := fmt.Sprintf("%s-%s", p.producerName, strconv.FormatUint(*mm.SequenceId, 10))
mm.Uuid = proto.String(uuid)
mm.NumChunksFromMsg = proto.Int32(int32(totalChunks))
mm.TotalChunkMsgSize = proto.Int32(int32(compressedSize))
cr := newChunkRecorder()
for chunkID := 0; chunkID < totalChunks; chunkID++ {
lhs = chunkID * payloadChunkSize
if rhs = lhs + payloadChunkSize; rhs > compressedSize {
rhs = compressedSize
}
// update chunk id
mm.ChunkId = proto.Int32(int32(chunkID))
nsr := &sendRequest{
ctx: request.ctx,
msg: request.msg,
callback: request.callback,
callbackOnce: request.callbackOnce,
publishTime: request.publishTime,
blockCh: request.blockCh,
closeBlockChOnce: request.closeBlockChOnce,
totalChunks: totalChunks,
chunkID: chunkID,
uuid: uuid,
chunkRecorder: cr,
transaction: request.transaction,
reservedMem: int64(rhs - lhs),
}
// the permit of first chunk has acquired
if chunkID != 0 && !p.canAddToQueue(nsr) {
p.releaseSemaphoreAndMem(int64(uncompressedSize - lhs))
return
}
p.internalSingleSend(mm, compressedPayload[lhs:rhs], nsr, uint32(maxMessageSize))
}
// close the blockCh when all the chunks acquired permits
request.stopBlock()
} else {
// close the blockCh when totalChunks is 1 (it has acquired permits)
request.stopBlock()
p.internalSingleSend(mm, compressedPayload, request, uint32(maxMessageSize))
}
} else {
smm := p.genSingleMessageMetadataInBatch(msg, uncompressedSize)
multiSchemaEnabled := !p.options.DisableMultiSchema
added := addRequestToBatch(smm, p, uncompressedPayload, request, msg, deliverAt, schemaVersion,
multiSchemaEnabled)
if !added {
// The current batch is full. flush it and retry

p.internalFlushCurrentBatch()
var lhs, rhs int
uuid := fmt.Sprintf("%s-%s", p.producerName, strconv.FormatUint(*sr.mm.SequenceId, 10))
sr.mm.Uuid = proto.String(uuid)
sr.mm.NumChunksFromMsg = proto.Int32(int32(sr.totalChunks))
sr.mm.TotalChunkMsgSize = proto.Int32(int32(sr.compressedSize))
cr := newChunkRecorder()

// after flushing try again to add the current payload
if ok := addRequestToBatch(smm, p, uncompressedPayload, request, msg, deliverAt, schemaVersion,
multiSchemaEnabled); !ok {
p.releaseSemaphoreAndMem(int64(uncompressedSize))
runCallback(request.callback, nil, request.msg, errFailAddToBatch)
p.log.WithField("size", uncompressedSize).
WithField("properties", msg.Properties).
Error("unable to add message to batch")
return
}
for chunkID := 0; chunkID < sr.totalChunks; chunkID++ {
lhs = chunkID * sr.payloadChunkSize
rhs = lhs + sr.payloadChunkSize
if rhs > sr.compressedSize {
rhs = sr.compressedSize
}
if request.flushImmediately {

p.internalFlushCurrentBatch()

// update chunk id
sr.mm.ChunkId = proto.Int32(int32(chunkID))
nsr := &sendRequest{
producer: sr.producer,
ctx: sr.ctx,
msg: sr.msg,
callback: sr.callback,
callbackOnce: sr.callbackOnce,
publishTime: sr.publishTime,
flushImmediately: sr.flushImmediately,
totalChunks: sr.totalChunks,
chunkID: chunkID,
uuid: uuid,
chunkRecorder: cr,
transaction: sr.transaction,
memLimit: sr.memLimit,
semaphore: sr.semaphore,
reservedSemaphore: 1,
reservedMem: int64(rhs - lhs),
sendAsBatch: sr.sendAsBatch,
schema: sr.schema,
schemaVersion: sr.schemaVersion,
uncompressedPayload: sr.uncompressedPayload,
uncompressedSize: sr.uncompressedSize,
compressedPayload: sr.compressedPayload,
compressedSize: sr.compressedSize,
payloadChunkSize: sr.payloadChunkSize,
mm: sr.mm,
deliverAt: sr.deliverAt,
maxMessageSize: sr.maxMessageSize,
}

p.internalSingleSend(sr.mm, sr.compressedPayload[lhs:rhs], nsr, uint32(sr.maxMessageSize))
}
}

Expand Down