From 803af709f4564d9af0f1b8c97d6170f9d15da07a Mon Sep 17 00:00:00 2001
From: gunli
Date: Tue, 24 Oct 2023 12:39:11 +0800
Subject: [PATCH] refactor internalSend

---
 pulsar/producer_partition.go | 273 ++++++++---------------------------
 1 file changed, 64 insertions(+), 209 deletions(-)

diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index ebd292ec11..1f321de80b 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -21,7 +21,6 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"math"
 	"strconv"
 	"strings"
 	"sync"
@@ -478,231 +477,87 @@ func runCallback(cb func(MessageID, *ProducerMessage, error), id MessageID, msg
 	cb(id, msg, err)
 }
 
-func (p *partitionProducer) internalSend(request *sendRequest) {
-	p.log.Debug("Received send request: ", *request.msg)
-
-	msg := request.msg
-
-	// read payload from message
-	uncompressedPayload := msg.Payload
-
-	var schemaPayload []byte
-	var err error
-
-	// The block chan must be closed when returned with exception
-	defer request.stopBlock()
-	if !p.canAddToQueue(request) {
-		return
-	}
+func (p *partitionProducer) internalSend(sr *sendRequest) {
+	if sr.sendAsBatch {
+		smm := p.genSingleMessageMetadataInBatch(sr.msg, int(sr.uncompressedSize))
+		multiSchemaEnabled := !p.options.DisableMultiSchema
+		added := addRequestToBatch(smm, p, sr.uncompressedPayload, sr, sr.msg, sr.deliverAt, sr.schemaVersion,
+			multiSchemaEnabled)
+		if !added {
+			// The current batch is full. flush it and retry
+			p.internalFlushCurrentBatch()
 
-	if p.options.DisableMultiSchema {
-		if msg.Schema != nil && p.options.Schema != nil &&
-			msg.Schema.GetSchemaInfo().hash() != p.options.Schema.GetSchemaInfo().hash() {
-			runCallback(request.callback, nil, request.msg, fmt.Errorf("msg schema can not match with producer schema"))
-			p.log.WithError(err).Errorf("The producer %s of the topic %s is disabled the `MultiSchema`", p.producerName, p.topic)
-			return
-		}
-	}
-	var schema Schema
-	var schemaVersion []byte
-	if msg.Schema != nil {
-		schema = msg.Schema
-	} else if p.options.Schema != nil {
-		schema = p.options.Schema
-	}
-	if msg.Value != nil {
-		// payload and schema are mutually exclusive
-		// try to get payload from schema value only if payload is not set
-		if uncompressedPayload == nil && schema != nil {
-			schemaPayload, err = schema.Encode(msg.Value)
-			if err != nil {
-				runCallback(request.callback, nil, request.msg, newError(SchemaFailure, err.Error()))
-				p.log.WithError(err).Errorf("Schema encode message failed %s", msg.Value)
+			// after flushing try again to add the current payload
+			ok := addRequestToBatch(smm, p, sr.uncompressedPayload, sr, sr.msg, sr.deliverAt,
+				sr.schemaVersion, multiSchemaEnabled)
+			if !ok {
+				p.log.WithField("size", sr.uncompressedSize).
+					WithField("properties", sr.msg.Properties).
+					Error("unable to add message to batch")
+				sr.done(nil, errFailAddToBatch)
 				return
 			}
 		}
-	}
-	if uncompressedPayload == nil {
-		uncompressedPayload = schemaPayload
-	}
 
-	if schema != nil {
-		schemaVersion = p.schemaCache.Get(schema.GetSchemaInfo())
-		if schemaVersion == nil {
-			schemaVersion, err = p.getOrCreateSchema(schema.GetSchemaInfo())
-			if err != nil {
-				p.log.WithError(err).Error("get schema version fail")
-				runCallback(request.callback, nil, request.msg, fmt.Errorf("get schema version fail, err: %w", err))
-				return
-			}
-			p.schemaCache.Put(schema.GetSchemaInfo(), schemaVersion)
+		if sr.flushImmediately {
+			p.internalFlushCurrentBatch()
 		}
-	}
-
-	uncompressedSize := len(uncompressedPayload)
 
-	// try to reserve memory for uncompressedPayload
-	if !p.canReserveMem(request, int64(uncompressedSize)) {
 		return
 	}
 
-	deliverAt := msg.DeliverAt
-	if msg.DeliverAfter.Nanoseconds() > 0 {
-		deliverAt = time.Now().Add(msg.DeliverAfter)
-	}
-
-	// set default ReplicationClusters when DisableReplication
-	if msg.DisableReplication {
-		msg.ReplicationClusters = []string{"__local__"}
-	}
-
-	mm := p.genMetadata(msg, uncompressedSize, deliverAt)
-
-	sendAsBatch := !p.options.DisableBatching &&
-		msg.ReplicationClusters == nil &&
-		deliverAt.UnixNano() < 0
-
-	// Once the batching is enabled, it can close blockCh early to make block finish
-	if sendAsBatch {
-		request.stopBlock()
-	} else {
-		// update sequence id for metadata, make the size of msgMetadata more accurate
-		// batch sending will update sequence ID in the BatchBuilder
-		p.updateMetadataSeqID(mm, msg)
-	}
-
-	maxMessageSize := int(p._getConn().GetMaxMessageSize())
-
-	// compress payload if not batching
-	var compressedPayload []byte
-	var compressedSize int
-	var checkSize int
-	if !sendAsBatch {
-		compressedPayload = p.compressionProvider.Compress(nil, uncompressedPayload)
-		compressedSize = len(compressedPayload)
-		checkSize = compressedSize
-
-		// set the compress type in msgMetaData
-		compressionType := pb.CompressionType(p.options.CompressionType)
-		if compressionType != pb.CompressionType_NONE {
-			mm.Compression = &compressionType
-		}
-	} else {
-		// final check for batching message is in serializeMessage
-		// this is a double check
-		checkSize = uncompressedSize
-	}
-
-	// if msg is too large and chunking is disabled
-	if checkSize > maxMessageSize && !p.options.EnableChunking {
-		p.releaseSemaphoreAndMem(int64(uncompressedSize))
-		runCallback(request.callback, nil, request.msg, errMessageTooLarge)
-		p.log.WithError(errMessageTooLarge).
-			WithField("size", checkSize).
-			WithField("properties", msg.Properties).
-			Errorf("MaxMessageSize %d", maxMessageSize)
-		p.metrics.PublishErrorsMsgTooLarge.Inc()
+	if sr.totalChunks <= 1 {
+		p.internalSingleSend(sr.mm, sr.compressedPayload, sr, uint32(sr.maxMessageSize))
 		return
 	}
 
-	var totalChunks int
-	// max chunk payload size
-	var payloadChunkSize int
-	if sendAsBatch || !p.options.EnableChunking {
-		totalChunks = 1
-		payloadChunkSize = int(p._getConn().GetMaxMessageSize())
-	} else {
-		payloadChunkSize = int(p._getConn().GetMaxMessageSize()) - proto.Size(mm)
-		if payloadChunkSize <= 0 {
-			p.releaseSemaphoreAndMem(int64(uncompressedSize))
-			runCallback(request.callback, nil, msg, errMetaTooLarge)
-			p.log.WithError(errMetaTooLarge).
-				WithField("metadata size", proto.Size(mm)).
-				WithField("properties", msg.Properties).
-				Errorf("MaxMessageSize %d", int(p._getConn().GetMaxMessageSize()))
-			p.metrics.PublishErrorsMsgTooLarge.Inc()
-			return
-		}
-		// set ChunkMaxMessageSize
-		if p.options.ChunkMaxMessageSize != 0 {
-			payloadChunkSize = int(math.Min(float64(payloadChunkSize), float64(p.options.ChunkMaxMessageSize)))
-		}
-		totalChunks = int(math.Max(1, math.Ceil(float64(compressedSize)/float64(payloadChunkSize))))
-	}
-
-	// set total chunks to send request
-	request.totalChunks = totalChunks
-
-	if !sendAsBatch {
-		if totalChunks > 1 {
-			var lhs, rhs int
-			uuid := fmt.Sprintf("%s-%s", p.producerName, strconv.FormatUint(*mm.SequenceId, 10))
-			mm.Uuid = proto.String(uuid)
-			mm.NumChunksFromMsg = proto.Int32(int32(totalChunks))
-			mm.TotalChunkMsgSize = proto.Int32(int32(compressedSize))
-			cr := newChunkRecorder()
-			for chunkID := 0; chunkID < totalChunks; chunkID++ {
-				lhs = chunkID * payloadChunkSize
-				if rhs = lhs + payloadChunkSize; rhs > compressedSize {
-					rhs = compressedSize
-				}
-				// update chunk id
-				mm.ChunkId = proto.Int32(int32(chunkID))
-				nsr := &sendRequest{
-					ctx:              request.ctx,
-					msg:              request.msg,
-					callback:         request.callback,
-					callbackOnce:     request.callbackOnce,
-					publishTime:      request.publishTime,
-					blockCh:          request.blockCh,
-					closeBlockChOnce: request.closeBlockChOnce,
-					totalChunks:      totalChunks,
-					chunkID:          chunkID,
-					uuid:             uuid,
-					chunkRecorder:    cr,
-					transaction:      request.transaction,
-					reservedMem:      int64(rhs - lhs),
-				}
-				// the permit of first chunk has acquired
-				if chunkID != 0 && !p.canAddToQueue(nsr) {
-					p.releaseSemaphoreAndMem(int64(uncompressedSize - lhs))
-					return
-				}
-				p.internalSingleSend(mm, compressedPayload[lhs:rhs], nsr, uint32(maxMessageSize))
-			}
-			// close the blockCh when all the chunks acquired permits
-			request.stopBlock()
-		} else {
-			// close the blockCh when totalChunks is 1 (it has acquired permits)
-			request.stopBlock()
-			p.internalSingleSend(mm, compressedPayload, request, uint32(maxMessageSize))
-		}
-	} else {
-		smm := p.genSingleMessageMetadataInBatch(msg, uncompressedSize)
-		multiSchemaEnabled := !p.options.DisableMultiSchema
-		added := addRequestToBatch(smm, p, uncompressedPayload, request, msg, deliverAt, schemaVersion,
-			multiSchemaEnabled)
-		if !added {
-			// The current batch is full. flush it and retry
-
-			p.internalFlushCurrentBatch()
+	var lhs, rhs int
+	uuid := fmt.Sprintf("%s-%s", p.producerName, strconv.FormatUint(*sr.mm.SequenceId, 10))
+	sr.mm.Uuid = proto.String(uuid)
+	sr.mm.NumChunksFromMsg = proto.Int32(int32(sr.totalChunks))
+	sr.mm.TotalChunkMsgSize = proto.Int32(int32(sr.compressedSize))
+	cr := newChunkRecorder()
 
-			// after flushing try again to add the current payload
-			if ok := addRequestToBatch(smm, p, uncompressedPayload, request, msg, deliverAt, schemaVersion,
-				multiSchemaEnabled); !ok {
-				p.releaseSemaphoreAndMem(int64(uncompressedSize))
-				runCallback(request.callback, nil, request.msg, errFailAddToBatch)
-				p.log.WithField("size", uncompressedSize).
-					WithField("properties", msg.Properties).
- Error("unable to add message to batch") - return - } + for chunkID := 0; chunkID < sr.totalChunks; chunkID++ { + lhs = chunkID * sr.payloadChunkSize + rhs = lhs + sr.payloadChunkSize + if rhs > sr.compressedSize { + rhs = sr.compressedSize } - if request.flushImmediately { - - p.internalFlushCurrentBatch() + // update chunk id + sr.mm.ChunkId = proto.Int32(int32(chunkID)) + nsr := &sendRequest{ + producer: sr.producer, + ctx: sr.ctx, + msg: sr.msg, + callback: sr.callback, + callbackOnce: sr.callbackOnce, + publishTime: sr.publishTime, + flushImmediately: sr.flushImmediately, + totalChunks: sr.totalChunks, + chunkID: chunkID, + uuid: uuid, + chunkRecorder: cr, + transaction: sr.transaction, + memLimit: sr.memLimit, + semaphore: sr.semaphore, + reservedSemaphore: 1, + reservedMem: int64(rhs - lhs), + sendAsBatch: sr.sendAsBatch, + schema: sr.schema, + schemaVersion: sr.schemaVersion, + uncompressedPayload: sr.uncompressedPayload, + uncompressedSize: sr.uncompressedSize, + compressedPayload: sr.compressedPayload, + compressedSize: sr.compressedSize, + payloadChunkSize: sr.payloadChunkSize, + mm: sr.mm, + deliverAt: sr.deliverAt, + maxMessageSize: sr.maxMessageSize, } + + p.internalSingleSend(sr.mm, sr.compressedPayload[lhs:rhs], nsr, uint32(sr.maxMessageSize)) } }
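Notes on the refactor: after this change `internalSend` derives nothing itself. Schema encoding, compression, metadata generation, chunk math, and memory/semaphore reservation are all expected to happen earlier in the send path, with the results carried on the request, and the failure path collapses into `sr.done(...)`, which presumably releases the reserved memory and semaphore and fires the user callback exactly once. The sketch below lists the `sendRequest` fields this function now reads. The field names are taken from the diff above, but the types are inferred from usage; the authoritative struct definition lives elsewhere in this patch and may differ.

	// Sketch only: field names come from the diff above; types are
	// inferred from how internalSend uses them, not copied from the
	// actual definition.
	type sendRequest struct {
		producer            *partitionProducer // inferred
		ctx                 context.Context
		msg                 *ProducerMessage
		callback            func(MessageID, *ProducerMessage, error)
		callbackOnce        *sync.Once // inferred
		publishTime         time.Time
		flushImmediately    bool
		totalChunks         int
		chunkID             int
		uuid                string
		chunkRecorder       *chunkRecorder
		transaction         *transaction                   // inferred
		memLimit            internal.MemoryLimitController // inferred
		semaphore           internal.Semaphore             // inferred
		reservedSemaphore   int
		reservedMem         int64
		sendAsBatch         bool
		schema              Schema
		schemaVersion       []byte
		uncompressedPayload []byte
		uncompressedSize    int64
		compressedPayload   []byte
		compressedSize      int
		payloadChunkSize    int
		mm                  *pb.MessageMetadata
		deliverAt           time.Time
		maxMessageSize      int32
	}

	// internalSend also calls sr.done(nil, errFailAddToBatch), so a
	// done(MessageID, error) method is assumed on sendRequest.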
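The chunking branch walks the compressed payload in `payloadChunkSize` windows and clamps the final window to `compressedSize`; `totalChunks` arrives precomputed as the ceiling of `compressedSize / payloadChunkSize`. A standalone illustration of the same slicing arithmetic, using made-up sizes rather than anything from the patch:

	package main

	import "fmt"

	func main() {
		compressedSize := 10
		payloadChunkSize := 4
		// ceil(compressedSize / payloadChunkSize), as computed before internalSend runs
		totalChunks := (compressedSize + payloadChunkSize - 1) / payloadChunkSize
		for chunkID := 0; chunkID < totalChunks; chunkID++ {
			lhs := chunkID * payloadChunkSize
			rhs := lhs + payloadChunkSize
			if rhs > compressedSize {
				rhs = compressedSize
			}
			fmt.Printf("chunk %d covers payload[%d:%d]\n", chunkID, lhs, rhs)
		}
		// Prints [0:4], [4:8], [8:10]: three chunks, the last one short,
		// matching the sr.compressedPayload[lhs:rhs] slices sent above.
	}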