From 573448c195c4969d98ba93763310533c2f82243a Mon Sep 17 00:00:00 2001 From: tison Date: Wed, 25 Oct 2023 00:12:44 +0800 Subject: [PATCH 01/12] refactor: prepare sendrequest and move to internalSendAsync Signed-off-by: tison --- pulsar/message_chunking_test.go | 26 +- pulsar/producer_partition.go | 466 ++++++++++++++++++-------------- 2 files changed, 285 insertions(+), 207 deletions(-) diff --git a/pulsar/message_chunking_test.go b/pulsar/message_chunking_test.go index ee3ab17760..cb7190d6f4 100644 --- a/pulsar/message_chunking_test.go +++ b/pulsar/message_chunking_test.go @@ -552,11 +552,12 @@ func sendSingleChunk(p Producer, uuid string, chunkID int, totalChunks int) { msg := &ProducerMessage{ Payload: []byte(fmt.Sprintf("chunk-%s-%d|", uuid, chunkID)), } + wholePayload := msg.Payload producerImpl := p.(*producer).producers[0].(*partitionProducer) - mm := producerImpl.genMetadata(msg, len(msg.Payload), time.Now()) + mm := producerImpl.genMetadata(msg, len(wholePayload), time.Now()) mm.Uuid = proto.String(uuid) mm.NumChunksFromMsg = proto.Int32(int32(totalChunks)) - mm.TotalChunkMsgSize = proto.Int32(int32(len(msg.Payload))) + mm.TotalChunkMsgSize = proto.Int32(int32(len(wholePayload))) mm.ChunkId = proto.Int32(int32(chunkID)) producerImpl.updateMetadataSeqID(mm, msg) @@ -568,7 +569,26 @@ func sendSingleChunk(p Producer, uuid string, chunkID int, totalChunks int) { callback: func(id MessageID, producerMessage *ProducerMessage, err error) { close(doneCh) }, - msg: msg, + ctx: context.Background(), + msg: msg, + flushImmediately: true, + totalChunks: totalChunks, + chunkID: chunkID, + uuid: uuid, + chunkRecorder: newChunkRecorder(), + transaction: nil, + reservedMem: 0, + sendAsBatch: false, + schema: nil, + schemaVersion: nil, + uncompressedPayload: wholePayload, + uncompressedSize: int64(len(wholePayload)), + compressedPayload: wholePayload, + compressedSize: len(wholePayload), + payloadChunkSize: internal.MaxMessageSize - proto.Size(mm), + mm: mm, + deliverAt: time.Now(), + maxMessageSize: internal.MaxMessageSize, }, uint32(internal.MaxMessageSize), ) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index ac07341d3f..7f4a910481 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -478,224 +478,113 @@ func runCallback(cb func(MessageID, *ProducerMessage, error), id MessageID, msg cb(id, msg, err) } -func (p *partitionProducer) internalSend(request *sendRequest) { - p.log.Debug("Received send request: ", *request.msg) +func (p *partitionProducer) internalSend(sr *sendRequest) { + p.log.Debug("Received send request: ", *sr.msg) - msg := request.msg + msg := sr.msg // read payload from message uncompressedPayload := msg.Payload - var schemaPayload []byte - var err error - // The block chan must be closed when returned with exception - defer request.stopBlock() - if !p.canAddToQueue(request) { + defer sr.stopBlock() + if !p.canAddToQueue(sr) { return } - var schema Schema - var schemaVersion []byte - if msg.Schema != nil { - schema = msg.Schema - } else if p.options.Schema != nil { - schema = p.options.Schema - } - if msg.Value != nil { - // payload and schema are mutually exclusive - // try to get payload from schema value only if payload is not set - if uncompressedPayload == nil && schema != nil { - schemaPayload, err = schema.Encode(msg.Value) - if err != nil { - runCallback(request.callback, nil, request.msg, newError(SchemaFailure, err.Error())) - p.log.WithError(err).Errorf("Schema encode message failed %s", msg.Value) - return - } - 
} - } - if uncompressedPayload == nil { - uncompressedPayload = schemaPayload - } - - if schema != nil { - schemaVersion = p.schemaCache.Get(schema.GetSchemaInfo()) - if schemaVersion == nil { - schemaVersion, err = p.getOrCreateSchema(schema.GetSchemaInfo()) - if err != nil { - p.log.WithError(err).Error("get schema version fail") - runCallback(request.callback, nil, request.msg, fmt.Errorf("get schema version fail, err: %w", err)) - return - } - p.schemaCache.Put(schema.GetSchemaInfo(), schemaVersion) - } - } - - uncompressedSize := len(uncompressedPayload) + uncompressedSize := sr.uncompressedSize // try to reserve memory for uncompressedPayload - if !p.canReserveMem(request, int64(uncompressedSize)) { - return - } - - deliverAt := msg.DeliverAt - if msg.DeliverAfter.Nanoseconds() > 0 { - deliverAt = time.Now().Add(msg.DeliverAfter) - } - - // set default ReplicationClusters when DisableReplication - if msg.DisableReplication { - msg.ReplicationClusters = []string{"__local__"} - } - - mm := p.genMetadata(msg, uncompressedSize, deliverAt) - - sendAsBatch := !p.options.DisableBatching && - msg.ReplicationClusters == nil && - deliverAt.UnixNano() < 0 - - // Once the batching is enabled, it can close blockCh early to make block finish - if sendAsBatch { - request.stopBlock() - } else { - // update sequence id for metadata, make the size of msgMetadata more accurate - // batch sending will update sequence ID in the BatchBuilder - p.updateMetadataSeqID(mm, msg) - } - - maxMessageSize := int(p._getConn().GetMaxMessageSize()) - - // compress payload if not batching - var compressedPayload []byte - var compressedSize int - var checkSize int - if !sendAsBatch { - compressedPayload = p.compressionProvider.Compress(nil, uncompressedPayload) - compressedSize = len(compressedPayload) - checkSize = compressedSize - - // set the compress type in msgMetaData - compressionType := pb.CompressionType(p.options.CompressionType) - if compressionType != pb.CompressionType_NONE { - mm.Compression = &compressionType - } - } else { - // final check for batching message is in serializeMessage - // this is a double check - checkSize = uncompressedSize - } - - // if msg is too large and chunking is disabled - if checkSize > maxMessageSize && !p.options.EnableChunking { - p.releaseSemaphoreAndMem(int64(uncompressedSize)) - runCallback(request.callback, nil, request.msg, errMessageTooLarge) - p.log.WithError(errMessageTooLarge). - WithField("size", checkSize). - WithField("properties", msg.Properties). - Errorf("MaxMessageSize %d", maxMessageSize) - p.metrics.PublishErrorsMsgTooLarge.Inc() + if !p.canReserveMem(sr, uncompressedSize) { return } - var totalChunks int - // max chunk payload size - var payloadChunkSize int - if sendAsBatch || !p.options.EnableChunking { - totalChunks = 1 - payloadChunkSize = int(p._getConn().GetMaxMessageSize()) - } else { - payloadChunkSize = int(p._getConn().GetMaxMessageSize()) - proto.Size(mm) - if payloadChunkSize <= 0 { - p.releaseSemaphoreAndMem(int64(uncompressedSize)) - runCallback(request.callback, nil, msg, errMetaTooLarge) - p.log.WithError(errMetaTooLarge). - WithField("metadata size", proto.Size(mm)). - WithField("properties", msg.Properties). 
- Errorf("MaxMessageSize %d", int(p._getConn().GetMaxMessageSize())) - p.metrics.PublishErrorsMsgTooLarge.Inc() - return - } - // set ChunkMaxMessageSize - if p.options.ChunkMaxMessageSize != 0 { - payloadChunkSize = int(math.Min(float64(payloadChunkSize), float64(p.options.ChunkMaxMessageSize))) - } - totalChunks = int(math.Max(1, math.Ceil(float64(compressedSize)/float64(payloadChunkSize)))) - } - - // set total chunks to send request - request.totalChunks = totalChunks - - if !sendAsBatch { - if totalChunks > 1 { - var lhs, rhs int - uuid := fmt.Sprintf("%s-%s", p.producerName, strconv.FormatUint(*mm.SequenceId, 10)) - mm.Uuid = proto.String(uuid) - mm.NumChunksFromMsg = proto.Int32(int32(totalChunks)) - mm.TotalChunkMsgSize = proto.Int32(int32(compressedSize)) - cr := newChunkRecorder() - for chunkID := 0; chunkID < totalChunks; chunkID++ { - lhs = chunkID * payloadChunkSize - if rhs = lhs + payloadChunkSize; rhs > compressedSize { - rhs = compressedSize - } - // update chunk id - mm.ChunkId = proto.Int32(int32(chunkID)) - nsr := &sendRequest{ - ctx: request.ctx, - msg: request.msg, - callback: request.callback, - callbackOnce: request.callbackOnce, - publishTime: request.publishTime, - blockCh: request.blockCh, - closeBlockChOnce: request.closeBlockChOnce, - totalChunks: totalChunks, - chunkID: chunkID, - uuid: uuid, - chunkRecorder: cr, - transaction: request.transaction, - reservedMem: int64(rhs - lhs), - } - // the permit of first chunk has acquired - if chunkID != 0 && !p.canAddToQueue(nsr) { - p.releaseSemaphoreAndMem(int64(uncompressedSize - lhs)) - return - } - p.internalSingleSend(mm, compressedPayload[lhs:rhs], nsr, uint32(maxMessageSize)) - } - // close the blockCh when all the chunks acquired permits - request.stopBlock() - } else { - // close the blockCh when totalChunks is 1 (it has acquired permits) - request.stopBlock() - p.internalSingleSend(mm, compressedPayload, request, uint32(maxMessageSize)) - } - } else { - smm := p.genSingleMessageMetadataInBatch(msg, uncompressedSize) + if sr.sendAsBatch { + smm := p.genSingleMessageMetadataInBatch(msg, int(uncompressedSize)) multiSchemaEnabled := !p.options.DisableMultiSchema - added := addRequestToBatch(smm, p, uncompressedPayload, request, msg, deliverAt, schemaVersion, - multiSchemaEnabled) + added := addRequestToBatch( + smm, p, uncompressedPayload, sr, msg, sr.deliverAt, sr.schemaVersion, multiSchemaEnabled) if !added { // The current batch is full. flush it and retry - p.internalFlushCurrentBatch() // after flushing try again to add the current payload - if ok := addRequestToBatch(smm, p, uncompressedPayload, request, msg, deliverAt, schemaVersion, - multiSchemaEnabled); !ok { - p.releaseSemaphoreAndMem(int64(uncompressedSize)) - runCallback(request.callback, nil, request.msg, errFailAddToBatch) + if ok := addRequestToBatch(smm, p, uncompressedPayload, sr, msg, + sr.deliverAt, sr.schemaVersion, multiSchemaEnabled); !ok { + p.releaseSemaphoreAndMem(uncompressedSize) + runCallback(sr.callback, nil, sr.msg, errFailAddToBatch) p.log.WithField("size", uncompressedSize). WithField("properties", msg.Properties). 
Error("unable to add message to batch") return } } - if request.flushImmediately { - + if sr.flushImmediately { p.internalFlushCurrentBatch() + } + return + } + if sr.totalChunks <= 1 { + // close the blockCh when totalChunks is 1 (it has acquired permits) + // TODO - drop this method + sr.stopBlock() + p.internalSingleSend(sr.mm, sr.compressedPayload, sr, uint32(sr.maxMessageSize)) + return + } + + var lhs, rhs int + uuid := fmt.Sprintf("%s-%s", p.producerName, strconv.FormatUint(*sr.mm.SequenceId, 10)) + sr.mm.Uuid = proto.String(uuid) + sr.mm.NumChunksFromMsg = proto.Int32(int32(sr.totalChunks)) + sr.mm.TotalChunkMsgSize = proto.Int32(int32(sr.compressedSize)) + cr := newChunkRecorder() + for chunkID := 0; chunkID < sr.totalChunks; chunkID++ { + lhs = chunkID * sr.payloadChunkSize + rhs = lhs + sr.payloadChunkSize + if rhs > sr.compressedSize { + rhs = sr.compressedSize + } + // update chunk id + sr.mm.ChunkId = proto.Int32(int32(chunkID)) + nsr := &sendRequest{ + ctx: sr.ctx, + msg: sr.msg, + callback: sr.callback, + callbackOnce: sr.callbackOnce, + publishTime: sr.publishTime, + blockCh: sr.blockCh, + closeBlockChOnce: sr.closeBlockChOnce, + totalChunks: sr.totalChunks, + chunkID: chunkID, + uuid: uuid, + chunkRecorder: cr, + transaction: sr.transaction, + reservedMem: int64(rhs - lhs), + sendAsBatch: sr.sendAsBatch, + schema: sr.schema, + schemaVersion: sr.schemaVersion, + uncompressedPayload: sr.uncompressedPayload, + uncompressedSize: sr.uncompressedSize, + compressedPayload: sr.compressedPayload, + compressedSize: sr.compressedSize, + payloadChunkSize: sr.payloadChunkSize, + mm: sr.mm, + deliverAt: sr.deliverAt, + maxMessageSize: sr.maxMessageSize, } + // the permit of first chunk has acquired + if chunkID != 0 && !p.canAddToQueue(nsr) { + p.releaseSemaphoreAndMem(uncompressedSize - int64(lhs)) + return + } + + p.internalSingleSend(sr.mm, sr.compressedPayload[lhs:rhs], nsr, uint32(sr.maxMessageSize)) } + + // close the blockCh when all the chunks acquired permits + // TODO - drop this method + sr.stopBlock() } func addRequestToBatch(smm *pb.SingleMessageMetadata, p *partitionProducer, @@ -766,8 +655,10 @@ func (p *partitionProducer) updateSingleMessageMetadataSeqID(smm *pb.SingleMessa } } -func (p *partitionProducer) genSingleMessageMetadataInBatch(msg *ProducerMessage, - uncompressedSize int) (smm *pb.SingleMessageMetadata) { +func (p *partitionProducer) genSingleMessageMetadataInBatch( + msg *ProducerMessage, + uncompressedSize int, +) (smm *pb.SingleMessageMetadata) { smm = &pb.SingleMessageMetadata{ PayloadSize: proto.Int32(int32(uncompressedSize)), } @@ -1163,6 +1054,142 @@ func (p *partitionProducer) prepareTransaction(sr *sendRequest) error { return nil } +func (p *partitionProducer) updateSchema(sr *sendRequest) error { + var schema Schema + var schemaVersion []byte + var err error + + if sr.msg.Schema != nil { + schema = sr.msg.Schema + } else if p.options.Schema != nil { + schema = p.options.Schema + } + + if schema == nil { + return nil + } + + schemaVersion = p.schemaCache.Get(schema.GetSchemaInfo()) + if schemaVersion == nil { + schemaVersion, err = p.getOrCreateSchema(schema.GetSchemaInfo()) + if err != nil { + return fmt.Errorf("get schema version fail, err: %w", err) + } + p.schemaCache.Put(schema.GetSchemaInfo(), schemaVersion) + } + + sr.schema = schema + sr.schemaVersion = schemaVersion + return nil +} + +func (p *partitionProducer) updateUncompressedPayload(sr *sendRequest) error { + // read payload from message + sr.uncompressedPayload = sr.msg.Payload + + if 
sr.msg.Value != nil { + if sr.schema == nil { + p.log.Errorf("Schema encode message failed %s", sr.msg.Value) + return newError(SchemaFailure, "set schema value without setting schema") + } + + // payload and schema are mutually exclusive + // try to get payload from schema value only if payload is not set + schemaPayload, err := sr.schema.Encode(sr.msg.Value) + if err != nil { + p.log.WithError(err).Errorf("Schema encode message failed %s", sr.msg.Value) + return newError(SchemaFailure, err.Error()) + } + + sr.uncompressedPayload = schemaPayload + } + + sr.uncompressedSize = int64(len(sr.uncompressedPayload)) + return nil +} + +func (p *partitionProducer) updateMetaData(sr *sendRequest) { + deliverAt := sr.msg.DeliverAt + if sr.msg.DeliverAfter.Nanoseconds() > 0 { + deliverAt = time.Now().Add(sr.msg.DeliverAfter) + } + + // set default ReplicationClusters when DisableReplication + if sr.msg.DisableReplication { + sr.msg.ReplicationClusters = []string{"__local__"} + } + + sr.mm = p.genMetadata(sr.msg, int(sr.uncompressedSize), deliverAt) + + sr.sendAsBatch = !p.options.DisableBatching && + sr.msg.ReplicationClusters == nil && + deliverAt.UnixNano() < 0 + + // Once the batching is enabled, it can close blockCh early to make block finish + if sr.sendAsBatch { + // TODO - drop this method + sr.stopBlock() + } else { + // update sequence id for metadata, make the size of msgMetadata more accurate + // batch sending will update sequence ID in the BatchBuilder + p.updateMetadataSeqID(sr.mm, sr.msg) + } + + sr.deliverAt = deliverAt +} + +func (p *partitionProducer) updateChunkInfo(sr *sendRequest) error { + checkSize := sr.uncompressedSize + if !sr.sendAsBatch { + sr.compressedPayload = p.compressionProvider.Compress(nil, sr.uncompressedPayload) + sr.compressedSize = len(sr.compressedPayload) + + // set the compress type in msgMetaData + compressionType := pb.CompressionType(p.options.CompressionType) + if compressionType != pb.CompressionType_NONE { + sr.mm.Compression = &compressionType + } + + checkSize = int64(sr.compressedSize) + } + + sr.maxMessageSize = int32(int64(p._getConn().GetMaxMessageSize())) + + // if msg is too large and chunking is disabled + if checkSize > int64(sr.maxMessageSize) && !p.options.EnableChunking { + p.log.WithError(errMessageTooLarge). + WithField("size", checkSize). + WithField("properties", sr.msg.Properties). + Errorf("MaxMessageSize %d", sr.maxMessageSize) + + return errMessageTooLarge + } + + if sr.sendAsBatch || !p.options.EnableChunking { + sr.totalChunks = 1 + sr.payloadChunkSize = int(sr.maxMessageSize) + return nil + } + + sr.payloadChunkSize = int(sr.maxMessageSize) - proto.Size(sr.mm) + if sr.payloadChunkSize <= 0 { + p.log.WithError(errMetaTooLarge). + WithField("metadata size", proto.Size(sr.mm)). + WithField("properties", sr.msg.Properties). 
+ Errorf("MaxMessageSize %d", sr.maxMessageSize) + + return errMetaTooLarge + } + + // set ChunkMaxMessageSize + if p.options.ChunkMaxMessageSize != 0 { + sr.payloadChunkSize = int(math.Min(float64(sr.payloadChunkSize), float64(p.options.ChunkMaxMessageSize))) + } + + sr.totalChunks = int(math.Max(1, math.Ceil(float64(sr.compressedSize)/float64(sr.payloadChunkSize)))) + return nil +} + func (p *partitionProducer) internalSendAsync( ctx context.Context, msg *ProducerMessage, @@ -1202,6 +1229,26 @@ func (p *partitionProducer) internalSendAsync( p.options.Interceptors.BeforeSend(p, msg) + if err := p.updateSchema(sr); err != nil { + p.log.Error(err) + runCallback(sr.callback, nil, msg, err) + return + } + + if err := p.updateUncompressedPayload(sr); err != nil { + p.log.Error(err) + runCallback(sr.callback, nil, msg, err) + return + } + + p.updateMetaData(sr) + + if err := p.updateChunkInfo(sr); err != nil { + p.log.Error(err) + runCallback(sr.callback, nil, msg, err) + return + } + p.dataChan <- sr if !p.options.DisableBlockIfQueueFull { @@ -1435,20 +1482,31 @@ func (p *partitionProducer) Close() { } type sendRequest struct { - ctx context.Context - msg *ProducerMessage - callback func(MessageID, *ProducerMessage, error) - callbackOnce *sync.Once - publishTime time.Time - flushImmediately bool - blockCh chan struct{} - closeBlockChOnce *sync.Once - totalChunks int - chunkID int - uuid string - chunkRecorder *chunkRecorder - transaction *transaction - reservedMem int64 + ctx context.Context + msg *ProducerMessage + callback func(MessageID, *ProducerMessage, error) + callbackOnce *sync.Once + publishTime time.Time + flushImmediately bool + blockCh chan struct{} + closeBlockChOnce *sync.Once + totalChunks int + chunkID int + uuid string + chunkRecorder *chunkRecorder + transaction *transaction + reservedMem int64 + sendAsBatch bool + schema Schema + schemaVersion []byte + uncompressedPayload []byte + uncompressedSize int64 + compressedPayload []byte + compressedSize int + payloadChunkSize int + mm *pb.MessageMetadata + deliverAt time.Time + maxMessageSize int32 } // stopBlock can be invoked multiple times safety From 58106540c7b743ca6a319a1dcf83facc8559d0f9 Mon Sep 17 00:00:00 2001 From: tison Date: Wed, 25 Oct 2023 00:16:55 +0800 Subject: [PATCH 02/12] drop stopBlock Signed-off-by: tison --- pulsar/message_chunking_test.go | 5 ---- pulsar/producer_partition.go | 41 +++------------------------------ 2 files changed, 3 insertions(+), 43 deletions(-) diff --git a/pulsar/message_chunking_test.go b/pulsar/message_chunking_test.go index cb7190d6f4..fbdcaa0ceb 100644 --- a/pulsar/message_chunking_test.go +++ b/pulsar/message_chunking_test.go @@ -560,14 +560,11 @@ func sendSingleChunk(p Producer, uuid string, chunkID int, totalChunks int) { mm.TotalChunkMsgSize = proto.Int32(int32(len(wholePayload))) mm.ChunkId = proto.Int32(int32(chunkID)) producerImpl.updateMetadataSeqID(mm, msg) - - doneCh := make(chan struct{}) producerImpl.internalSingleSend( mm, msg.Payload, &sendRequest{ callback: func(id MessageID, producerMessage *ProducerMessage, err error) { - close(doneCh) }, ctx: context.Background(), msg: msg, @@ -592,6 +589,4 @@ func sendSingleChunk(p Producer, uuid string, chunkID int, totalChunks int) { }, uint32(internal.MaxMessageSize), ) - - <-doneCh } diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 7f4a910481..e19431401c 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -486,8 +486,6 @@ func (p *partitionProducer) internalSend(sr 
*sendRequest) { // read payload from message uncompressedPayload := msg.Payload - // The block chan must be closed when returned with exception - defer sr.stopBlock() if !p.canAddToQueue(sr) { return } @@ -526,9 +524,6 @@ func (p *partitionProducer) internalSend(sr *sendRequest) { } if sr.totalChunks <= 1 { - // close the blockCh when totalChunks is 1 (it has acquired permits) - // TODO - drop this method - sr.stopBlock() p.internalSingleSend(sr.mm, sr.compressedPayload, sr, uint32(sr.maxMessageSize)) return } @@ -553,8 +548,6 @@ func (p *partitionProducer) internalSend(sr *sendRequest) { callback: sr.callback, callbackOnce: sr.callbackOnce, publishTime: sr.publishTime, - blockCh: sr.blockCh, - closeBlockChOnce: sr.closeBlockChOnce, totalChunks: sr.totalChunks, chunkID: chunkID, uuid: uuid, @@ -581,10 +574,6 @@ func (p *partitionProducer) internalSend(sr *sendRequest) { p.internalSingleSend(sr.mm, sr.compressedPayload[lhs:rhs], nsr, uint32(sr.maxMessageSize)) } - - // close the blockCh when all the chunks acquired permits - // TODO - drop this method - sr.stopBlock() } func addRequestToBatch(smm *pb.SingleMessageMetadata, p *partitionProducer, @@ -1125,11 +1114,7 @@ func (p *partitionProducer) updateMetaData(sr *sendRequest) { sr.msg.ReplicationClusters == nil && deliverAt.UnixNano() < 0 - // Once the batching is enabled, it can close blockCh early to make block finish - if sr.sendAsBatch { - // TODO - drop this method - sr.stopBlock() - } else { + if !sr.sendAsBatch { // update sequence id for metadata, make the size of msgMetadata more accurate // batch sending will update sequence ID in the BatchBuilder p.updateMetadataSeqID(sr.mm, sr.msg) @@ -1153,7 +1138,7 @@ func (p *partitionProducer) updateChunkInfo(sr *sendRequest) error { checkSize = int64(sr.compressedSize) } - sr.maxMessageSize = int32(int64(p._getConn().GetMaxMessageSize())) + sr.maxMessageSize = p._getConn().GetMaxMessageSize() // if msg is too large and chunking is disabled if checkSize > int64(sr.maxMessageSize) && !p.options.EnableChunking { @@ -1202,19 +1187,13 @@ func (p *partitionProducer) internalSendAsync( return } - // bc only works when DisableBlockIfQueueFull is false - bc := make(chan struct{}) - // callbackOnce make sure the callback is only invoked once in chunking - callbackOnce := &sync.Once{} sr := &sendRequest{ ctx: ctx, msg: msg, callback: callback, - callbackOnce: callbackOnce, + callbackOnce: &sync.Once{}, flushImmediately: flushImmediately, publishTime: time.Now(), - blockCh: bc, - closeBlockChOnce: &sync.Once{}, } if err := p.prepareTransaction(sr); err != nil { runCallback(sr.callback, nil, msg, err) @@ -1250,11 +1229,6 @@ func (p *partitionProducer) internalSendAsync( } p.dataChan <- sr - - if !p.options.DisableBlockIfQueueFull { - // block if queue full - <-bc - } } func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) { @@ -1488,8 +1462,6 @@ type sendRequest struct { callbackOnce *sync.Once publishTime time.Time flushImmediately bool - blockCh chan struct{} - closeBlockChOnce *sync.Once totalChunks int chunkID int uuid string @@ -1509,13 +1481,6 @@ type sendRequest struct { maxMessageSize int32 } -// stopBlock can be invoked multiple times safety -func (sr *sendRequest) stopBlock() { - sr.closeBlockChOnce.Do(func() { - close(sr.blockCh) - }) -} - type closeProducer struct { doneCh chan struct{} } From 15135bfb6fb1c04ec9b650b6888898491ccfab99 Mon Sep 17 00:00:00 2001 From: tison Date: Wed, 25 Oct 2023 02:14:48 +0800 Subject: [PATCH 03/12] locate bug Signed-off-by: tison --- 
pulsar/producer_partition.go | 74 ++++++++++++++++-------------------- 1 file changed, 33 insertions(+), 41 deletions(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index e19431401c..b00ed6b53e 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -483,35 +483,38 @@ func (p *partitionProducer) internalSend(sr *sendRequest) { msg := sr.msg - // read payload from message - uncompressedPayload := msg.Payload - if !p.canAddToQueue(sr) { return } - uncompressedSize := sr.uncompressedSize - // try to reserve memory for uncompressedPayload - if !p.canReserveMem(sr, uncompressedSize) { + if !p.canReserveMem(sr, sr.uncompressedSize) { + return + } + + if err := p.updateChunkInfo(sr); err != nil { + p.releaseSemaphoreAndMem(sr.uncompressedSize) + runCallback(sr.callback, nil, sr.msg, err) + p.metrics.PublishErrorsMsgTooLarge.Inc() return } if sr.sendAsBatch { - smm := p.genSingleMessageMetadataInBatch(msg, int(uncompressedSize)) + smm := p.genSingleMessageMetadataInBatch(msg, int(sr.uncompressedSize)) multiSchemaEnabled := !p.options.DisableMultiSchema - added := addRequestToBatch( - smm, p, uncompressedPayload, sr, msg, sr.deliverAt, sr.schemaVersion, multiSchemaEnabled) + added := addRequestToBatch(smm, p, sr.uncompressedPayload, sr, msg, sr.deliverAt, sr.schemaVersion, + multiSchemaEnabled) if !added { // The current batch is full. flush it and retry + p.internalFlushCurrentBatch() // after flushing try again to add the current payload - if ok := addRequestToBatch(smm, p, uncompressedPayload, sr, msg, - sr.deliverAt, sr.schemaVersion, multiSchemaEnabled); !ok { - p.releaseSemaphoreAndMem(uncompressedSize) + if ok := addRequestToBatch(smm, p, sr.uncompressedPayload, sr, msg, sr.deliverAt, sr.schemaVersion, + multiSchemaEnabled); !ok { + p.releaseSemaphoreAndMem(sr.uncompressedSize) runCallback(sr.callback, nil, sr.msg, errFailAddToBatch) - p.log.WithField("size", uncompressedSize). + p.log.WithField("size", sr.uncompressedSize). WithField("properties", msg.Properties). Error("unable to add message to batch") return @@ -536,8 +539,7 @@ func (p *partitionProducer) internalSend(sr *sendRequest) { cr := newChunkRecorder() for chunkID := 0; chunkID < sr.totalChunks; chunkID++ { lhs = chunkID * sr.payloadChunkSize - rhs = lhs + sr.payloadChunkSize - if rhs > sr.compressedSize { + if rhs = lhs + sr.payloadChunkSize; rhs > sr.compressedSize { rhs = sr.compressedSize } // update chunk id @@ -548,6 +550,7 @@ func (p *partitionProducer) internalSend(sr *sendRequest) { callback: sr.callback, callbackOnce: sr.callbackOnce, publishTime: sr.publishTime, + flushImmediately: sr.flushImmediately, totalChunks: sr.totalChunks, chunkID: chunkID, uuid: uuid, @@ -568,10 +571,9 @@ func (p *partitionProducer) internalSend(sr *sendRequest) { } // the permit of first chunk has acquired if chunkID != 0 && !p.canAddToQueue(nsr) { - p.releaseSemaphoreAndMem(uncompressedSize - int64(lhs)) + p.releaseSemaphoreAndMem(sr.uncompressedSize - int64(lhs)) return } - p.internalSingleSend(sr.mm, sr.compressedPayload[lhs:rhs], nsr, uint32(sr.maxMessageSize)) } } @@ -1146,32 +1148,28 @@ func (p *partitionProducer) updateChunkInfo(sr *sendRequest) error { WithField("size", checkSize). WithField("properties", sr.msg.Properties). 
Errorf("MaxMessageSize %d", sr.maxMessageSize) - return errMessageTooLarge } if sr.sendAsBatch || !p.options.EnableChunking { sr.totalChunks = 1 sr.payloadChunkSize = int(sr.maxMessageSize) - return nil - } - - sr.payloadChunkSize = int(sr.maxMessageSize) - proto.Size(sr.mm) - if sr.payloadChunkSize <= 0 { - p.log.WithError(errMetaTooLarge). - WithField("metadata size", proto.Size(sr.mm)). - WithField("properties", sr.msg.Properties). - Errorf("MaxMessageSize %d", sr.maxMessageSize) - - return errMetaTooLarge - } - - // set ChunkMaxMessageSize - if p.options.ChunkMaxMessageSize != 0 { - sr.payloadChunkSize = int(math.Min(float64(sr.payloadChunkSize), float64(p.options.ChunkMaxMessageSize))) + } else { + sr.payloadChunkSize = int(sr.maxMessageSize) - proto.Size(sr.mm) + if sr.payloadChunkSize <= 0 { + p.log.WithError(errMetaTooLarge). + WithField("metadata size", proto.Size(sr.mm)). + WithField("properties", sr.msg.Properties). + Errorf("MaxMessageSize %d", int(p._getConn().GetMaxMessageSize())) + return errMetaTooLarge + } + // set ChunkMaxMessageSize + if p.options.ChunkMaxMessageSize != 0 { + sr.payloadChunkSize = int(math.Min(float64(sr.payloadChunkSize), float64(p.options.ChunkMaxMessageSize))) + } + sr.totalChunks = int(math.Max(1, math.Ceil(float64(sr.compressedSize)/float64(sr.payloadChunkSize)))) } - sr.totalChunks = int(math.Max(1, math.Ceil(float64(sr.compressedSize)/float64(sr.payloadChunkSize)))) return nil } @@ -1222,12 +1220,6 @@ func (p *partitionProducer) internalSendAsync( p.updateMetaData(sr) - if err := p.updateChunkInfo(sr); err != nil { - p.log.Error(err) - runCallback(sr.callback, nil, msg, err) - return - } - p.dataChan <- sr } From 7c353b8652b8ab4adc94867e7b235ec9ba0038dd Mon Sep 17 00:00:00 2001 From: tison Date: Wed, 25 Oct 2023 03:14:36 +0800 Subject: [PATCH 04/12] introduce new resource management methods Signed-off-by: tison --- pulsar/producer_partition.go | 188 +++++++++++++++++++++++++---------- 1 file changed, 133 insertions(+), 55 deletions(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index b00ed6b53e..a24a9a7c03 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -251,7 +251,7 @@ func (p *partitionProducer) grabCnx() error { res, err := p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, id, pb.BaseCommand_PRODUCER, cmdProducer) if err != nil { p.log.WithError(err).Error("Failed to create producer at send PRODUCER request") - if err == internal.ErrRequestTimeOut { + if errors.Is(err, internal.ErrRequestTimeOut) { id := p.client.rpcClient.NewRequestID() _, _ = p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, id, pb.BaseCommand_CLOSE_PRODUCER, &pb.CommandCloseProducer{ @@ -1448,19 +1448,29 @@ func (p *partitionProducer) Close() { } type sendRequest struct { - ctx context.Context - msg *ProducerMessage - callback func(MessageID, *ProducerMessage, error) - callbackOnce *sync.Once - publishTime time.Time - flushImmediately bool - totalChunks int - chunkID int - uuid string - chunkRecorder *chunkRecorder - transaction *transaction - reservedMem int64 + ctx context.Context + msg *ProducerMessage + producer *partitionProducer + callback func(MessageID, *ProducerMessage, error) + callbackOnce *sync.Once + publishTime time.Time + flushImmediately bool + totalChunks int + chunkID int + uuid string + chunkRecorder *chunkRecorder + + /// resource management + + memLimit internal.MemoryLimitController + reservedMem int64 + semaphore internal.Semaphore + reservedSemaphore int + + /// convey 
settable state + sendAsBatch bool + transaction *transaction schema Schema schemaVersion []byte uncompressedPayload []byte @@ -1473,6 +1483,115 @@ type sendRequest struct { maxMessageSize int32 } +func (sr *sendRequest) done(msgID MessageID, err error) { + if err == nil { + sr.producer.metrics.PublishLatency.Observe(float64(time.Now().UnixNano()-sr.publishTime.UnixNano()) / 1.0e9) + sr.producer.metrics.MessagesPublished.Inc() + sr.producer.metrics.BytesPublished.Add(float64(sr.reservedMem)) + } + + if err != nil { + sr.producer.log.WithError(err). + WithField("size", sr.reservedMem). + WithField("properties", sr.msg.Properties) + } + + if errors.Is(err, errSendTimeout) { + sr.producer.metrics.PublishErrorsTimeout.Inc() + } + + if errors.Is(err, errMessageTooLarge) { + sr.producer.metrics.PublishErrorsMsgTooLarge.Inc() + } + + if sr.semaphore != nil { + for i := 0; i < sr.reservedSemaphore; i++ { + sr.semaphore.Release() + } + sr.producer.metrics.MessagesPending.Sub(float64(sr.reservedSemaphore)) + } + + if sr.memLimit != nil { + sr.memLimit.ReleaseMemory(sr.reservedMem) + sr.producer.metrics.BytesPending.Sub(float64(sr.reservedMem)) + } + + if sr.totalChunks <= 1 || sr.chunkID == sr.totalChunks-1 { + sr.callbackOnce.Do(func() { + runCallback(sr.callback, msgID, sr.msg, err) + }) + + if sr.transaction != nil { + sr.transaction.endSendOrAckOp(err) + } + + if sr.producer.options.Interceptors != nil { + sr.producer.options.Interceptors.OnSendAcknowledgement(sr.producer, sr.msg, msgID) + } + } +} + +func (p *partitionProducer) reserveSemaphore(sr *sendRequest) error { + for i := 0; i < sr.totalChunks; i++ { + if p.options.DisableBlockIfQueueFull { + if !p.publishSemaphore.TryAcquire() { + return errSendQueueIsFull + } + + // update sr.semaphore and sr.reservedSemaphore here so that we can release semaphore in the case + // of that only a part of the chunks acquire succeed + sr.semaphore = p.publishSemaphore + sr.reservedSemaphore++ + p.metrics.MessagesPending.Inc() + } else { + if !p.publishSemaphore.Acquire(sr.ctx) { + return errContextExpired + } + + // update sr.semaphore and sr.reservedSemaphore here so that we can release semaphore in the case + // of that only a part of the chunks acquire succeed + sr.semaphore = p.publishSemaphore + sr.reservedSemaphore++ + p.metrics.MessagesPending.Inc() + } + } + + return nil +} + +func (p *partitionProducer) reserveMem(sr *sendRequest) error { + requiredMem := sr.uncompressedSize + if !sr.sendAsBatch { + requiredMem = int64(sr.compressedSize) + } + + if p.options.DisableBlockIfQueueFull { + if !p.client.memLimit.TryReserveMemory(requiredMem) { + return errMemoryBufferIsFull + } + + } else { + if !p.client.memLimit.ReserveMemory(sr.ctx, requiredMem) { + return errContextExpired + } + } + + sr.memLimit = p.client.memLimit + sr.reservedMem += requiredMem + p.metrics.BytesPending.Add(float64(requiredMem)) + return nil +} + +func (p *partitionProducer) reserveResources(sr *sendRequest) error { + if err := p.reserveSemaphore(sr); err != nil { + return err + } + if err := p.reserveMem(sr); err != nil { + return err + } + return nil +} + type closeProducer struct { doneCh chan struct{} } @@ -1502,53 +1621,12 @@ func (p *partitionProducer) _setConn(conn internal.Connection) { // _getConn returns internal connection field of this partition producer atomically. 
// Note: should only be called by this partition producer before attempting to use the connection func (p *partitionProducer) _getConn() internal.Connection { - // Invariant: The conn must be non-nil for the lifetime of the partitionProducer. + // Invariant: p.conn must be non-nil for the lifetime of the partitionProducer. // For this reason we leave this cast unchecked and panic() if the // invariant is broken return p.conn.Load().(internal.Connection) } -func (p *partitionProducer) releaseSemaphoreAndMem(size int64) { - p.publishSemaphore.Release() - p.client.memLimit.ReleaseMemory(size) -} - -func (p *partitionProducer) canAddToQueue(sr *sendRequest) bool { - if p.options.DisableBlockIfQueueFull { - if !p.publishSemaphore.TryAcquire() { - runCallback(sr.callback, nil, sr.msg, errSendQueueIsFull) - return false - } - } else { - if !p.publishSemaphore.Acquire(sr.ctx) { - runCallback(sr.callback, nil, sr.msg, errContextExpired) - return false - } - } - p.metrics.MessagesPending.Inc() - return true -} - -func (p *partitionProducer) canReserveMem(sr *sendRequest, size int64) bool { - if p.options.DisableBlockIfQueueFull { - if !p.client.memLimit.TryReserveMemory(size) { - p.publishSemaphore.Release() - runCallback(sr.callback, nil, sr.msg, errMemoryBufferIsFull) - return false - } - - } else { - if !p.client.memLimit.ReserveMemory(sr.ctx, size) { - p.publishSemaphore.Release() - runCallback(sr.callback, nil, sr.msg, errContextExpired) - return false - } - } - sr.reservedMem += size - p.metrics.BytesPending.Add(float64(size)) - return true -} - type chunkRecorder struct { chunkedMsgID chunkMessageID } From 166ec81060a2989b92a57ca3e03e9bb7e62e2624 Mon Sep 17 00:00:00 2001 From: tison Date: Wed, 25 Oct 2023 03:32:39 +0800 Subject: [PATCH 05/12] use sr.done everywhere Signed-off-by: tison --- pulsar/message_chunking_test.go | 6 +- pulsar/producer_partition.go | 228 ++++++++++++-------------------- 2 files changed, 85 insertions(+), 149 deletions(-) diff --git a/pulsar/message_chunking_test.go b/pulsar/message_chunking_test.go index fbdcaa0ceb..653fa81961 100644 --- a/pulsar/message_chunking_test.go +++ b/pulsar/message_chunking_test.go @@ -568,16 +568,12 @@ func sendSingleChunk(p Producer, uuid string, chunkID int, totalChunks int) { }, ctx: context.Background(), msg: msg, + producer: producerImpl, flushImmediately: true, totalChunks: totalChunks, chunkID: chunkID, uuid: uuid, chunkRecorder: newChunkRecorder(), - transaction: nil, - reservedMem: 0, - sendAsBatch: false, - schema: nil, - schemaVersion: nil, uncompressedPayload: wholePayload, uncompressedSize: int64(len(wholePayload)), compressedPayload: wholePayload, diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index a24a9a7c03..c36210b3ac 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -481,45 +481,28 @@ func runCallback(cb func(MessageID, *ProducerMessage, error), id MessageID, msg func (p *partitionProducer) internalSend(sr *sendRequest) { p.log.Debug("Received send request: ", *sr.msg) - msg := sr.msg - - if !p.canAddToQueue(sr) { - return - } - - // try to reserve memory for uncompressedPayload - if !p.canReserveMem(sr, sr.uncompressedSize) { - return - } - - if err := p.updateChunkInfo(sr); err != nil { - p.releaseSemaphoreAndMem(sr.uncompressedSize) - runCallback(sr.callback, nil, sr.msg, err) - p.metrics.PublishErrorsMsgTooLarge.Inc() - return - } - if sr.sendAsBatch { - smm := p.genSingleMessageMetadataInBatch(msg, int(sr.uncompressedSize)) + smm := 
p.genSingleMessageMetadataInBatch(sr.msg, int(sr.uncompressedSize)) multiSchemaEnabled := !p.options.DisableMultiSchema - added := addRequestToBatch(smm, p, sr.uncompressedPayload, sr, msg, sr.deliverAt, sr.schemaVersion, - multiSchemaEnabled) + + added := addRequestToBatch( + smm, p, sr.uncompressedPayload, sr, sr.msg, sr.deliverAt, sr.schemaVersion, multiSchemaEnabled) if !added { // The current batch is full. flush it and retry - p.internalFlushCurrentBatch() // after flushing try again to add the current payload - if ok := addRequestToBatch(smm, p, sr.uncompressedPayload, sr, msg, sr.deliverAt, sr.schemaVersion, - multiSchemaEnabled); !ok { - p.releaseSemaphoreAndMem(sr.uncompressedSize) - runCallback(sr.callback, nil, sr.msg, errFailAddToBatch) + ok := addRequestToBatch( + smm, p, sr.uncompressedPayload, sr, sr.msg, sr.deliverAt, sr.schemaVersion, multiSchemaEnabled) + if !ok { p.log.WithField("size", sr.uncompressedSize). - WithField("properties", msg.Properties). + WithField("properties", sr.msg.Properties). Error("unable to add message to batch") + sr.done(nil, errFailAddToBatch) return } } + if sr.flushImmediately { p.internalFlushCurrentBatch() } @@ -547,6 +530,7 @@ func (p *partitionProducer) internalSend(sr *sendRequest) { nsr := &sendRequest{ ctx: sr.ctx, msg: sr.msg, + producer: sr.producer, callback: sr.callback, callbackOnce: sr.callbackOnce, publishTime: sr.publishTime, @@ -556,6 +540,9 @@ func (p *partitionProducer) internalSend(sr *sendRequest) { uuid: uuid, chunkRecorder: cr, transaction: sr.transaction, + memLimit: sr.memLimit, + semaphore: sr.semaphore, + reservedSemaphore: 1, reservedMem: int64(rhs - lhs), sendAsBatch: sr.sendAsBatch, schema: sr.schema, @@ -569,11 +556,6 @@ func (p *partitionProducer) internalSend(sr *sendRequest) { deliverAt: sr.deliverAt, maxMessageSize: sr.maxMessageSize, } - // the permit of first chunk has acquired - if chunkID != 0 && !p.canAddToQueue(nsr) { - p.releaseSemaphoreAndMem(sr.uncompressedSize - int64(lhs)) - return - } p.internalSingleSend(sr.mm, sr.compressedPayload[lhs:rhs], nsr, uint32(sr.maxMessageSize)) } } @@ -675,11 +657,13 @@ func (p *partitionProducer) genSingleMessageMetadataInBatch( return } -func (p *partitionProducer) internalSingleSend(mm *pb.MessageMetadata, +func (p *partitionProducer) internalSingleSend( + mm *pb.MessageMetadata, compressedPayload []byte, - request *sendRequest, - maxMessageSize uint32) { - msg := request.msg + sr *sendRequest, + maxMessageSize uint32, +) { + msg := sr.msg payloadBuf := internal.NewBuffer(len(compressedPayload)) payloadBuf.Write(compressedPayload) @@ -694,8 +678,8 @@ func (p *partitionProducer) internalSingleSend(mm *pb.MessageMetadata, var mostSigBits uint64 var leastSigBits uint64 - if request.transaction != nil { - txnID := request.transaction.GetTxnID() + if sr.transaction != nil { + txnID := sr.transaction.GetTxnID() useTxn = true mostSigBits = txnID.MostSigBits leastSigBits = txnID.LeastSigBits @@ -715,8 +699,7 @@ func (p *partitionProducer) internalSingleSend(mm *pb.MessageMetadata, ) if err != nil { - runCallback(request.callback, nil, request.msg, err) - p.releaseSemaphoreAndMem(request.reservedMem) + sr.done(nil, err) p.log.WithError(err).Errorf("Single message serialize failed %s", msg.Value) return } @@ -725,7 +708,7 @@ func (p *partitionProducer) internalSingleSend(mm *pb.MessageMetadata, sentAt: time.Now(), buffer: buffer, sequenceID: sid, - sendRequests: []interface{}{request}, + sendRequests: []interface{}{sr}, }) p._getConn().WriteData(buffer) } @@ -756,15 +739,14 
@@ func (p *partitionProducer) internalFlushCurrentBatch() { if err != nil { for _, cb := range callbacks { if sr, ok := cb.(*sendRequest); ok { - runCallback(sr.callback, nil, sr.msg, err) + sr.done(nil, err) } } + if errors.Is(err, internal.ErrExceedMaxMessageSize) { - p.log.WithError(errMessageTooLarge). - Errorf("internal err: %s", err) - p.metrics.PublishErrorsMsgTooLarge.Inc() - return + p.log.WithError(errMessageTooLarge).Errorf("internal err: %s", err) } + return } @@ -853,25 +835,7 @@ func (p *partitionProducer) failTimeoutMessages() { for _, i := range pi.sendRequests { sr := i.(*sendRequest) - if sr.msg != nil { - size := len(sr.msg.Payload) - p.releaseSemaphoreAndMem(sr.reservedMem) - p.metrics.MessagesPending.Dec() - p.metrics.BytesPending.Sub(float64(size)) - p.metrics.PublishErrorsTimeout.Inc() - p.log.WithError(errSendTimeout). - WithField("size", size). - WithField("properties", sr.msg.Properties) - } - - if sr.callback != nil { - sr.callbackOnce.Do(func() { - runCallback(sr.callback, nil, sr.msg, errSendTimeout) - }) - } - if sr.transaction != nil { - sr.transaction.endSendOrAckOp(nil) - } + sr.done(nil, errSendTimeout) } // flag the sending has completed with error, flush make no effect @@ -899,15 +863,15 @@ func (p *partitionProducer) internalFlushCurrentBatches() { if errs[i] != nil { for _, cb := range callbacks[i] { if sr, ok := cb.(*sendRequest); ok { - runCallback(sr.callback, nil, sr.msg, errs[i]) + sr.done(nil, errs[i]) } } + if errors.Is(errs[i], internal.ErrExceedMaxMessageSize) { - p.log.WithError(errMessageTooLarge). - Errorf("internal err: %s", errs[i]) - p.metrics.PublishErrorsMsgTooLarge.Inc() + p.log.WithError(errMessageTooLarge).Errorf("internal err: %s", errs[i]) return } + continue } if batchesData[i] == nil { @@ -1036,12 +1000,6 @@ func (p *partitionProducer) prepareTransaction(sr *sendRequest) error { } sr.transaction = txn - callback := sr.callback - sr.callback = func(id MessageID, producerMessage *ProducerMessage, err error) { - runCallback(callback, id, producerMessage, err) - txn.endSendOrAckOp(err) - } - return nil } @@ -1188,19 +1146,20 @@ func (p *partitionProducer) internalSendAsync( sr := &sendRequest{ ctx: ctx, msg: msg, + producer: p, callback: callback, callbackOnce: &sync.Once{}, flushImmediately: flushImmediately, publishTime: time.Now(), } + if err := p.prepareTransaction(sr); err != nil { - runCallback(sr.callback, nil, msg, err) + sr.done(nil, err) return } if p.getProducerState() != producerReady { - // Producer is closing - runCallback(sr.callback, nil, msg, errProducerClosed) + sr.done(nil, errProducerClosed) return } @@ -1208,18 +1167,31 @@ func (p *partitionProducer) internalSendAsync( if err := p.updateSchema(sr); err != nil { p.log.Error(err) - runCallback(sr.callback, nil, msg, err) + sr.done(nil, err) return } if err := p.updateUncompressedPayload(sr); err != nil { p.log.Error(err) - runCallback(sr.callback, nil, msg, err) + sr.done(nil, err) return } p.updateMetaData(sr) + if err := p.updateChunkInfo(sr); err != nil { + p.log.Error(err) + sr.done(nil, err) + return + } + + // everything is OK, reserve required semaphore and memory + if err := p.reserveResources(sr); err != nil { + p.log.Error(err) + sr.done(nil, err) + return + } + p.dataChan <- sr } @@ -1257,55 +1229,40 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) for idx, i := range pi.sendRequests { sr := i.(*sendRequest) atomic.StoreInt64(&p.lastSequenceID, int64(pi.sequenceID)) - p.releaseSemaphoreAndMem(sr.reservedMem) - 
p.metrics.PublishLatency.Observe(float64(now-sr.publishTime.UnixNano()) / 1.0e9) - p.metrics.MessagesPublished.Inc() - p.metrics.MessagesPending.Dec() - payloadSize := float64(len(sr.msg.Payload)) - p.metrics.BytesPublished.Add(payloadSize) - p.metrics.BytesPending.Sub(payloadSize) - - if sr.callback != nil || len(p.options.Interceptors) > 0 { - msgID := newMessageID( - int64(response.MessageId.GetLedgerId()), - int64(response.MessageId.GetEntryId()), - int32(idx), - p.partitionIdx, - batchSize, - ) - - if sr.totalChunks > 1 { - if sr.chunkID == 0 { - sr.chunkRecorder.setFirstChunkID( - &messageID{ - int64(response.MessageId.GetLedgerId()), - int64(response.MessageId.GetEntryId()), - -1, - p.partitionIdx, - 0, - }) - } else if sr.chunkID == sr.totalChunks-1 { - sr.chunkRecorder.setLastChunkID( - &messageID{ - int64(response.MessageId.GetLedgerId()), - int64(response.MessageId.GetEntryId()), - -1, - p.partitionIdx, - 0, - }) - // use chunkMsgID to set msgID - msgID = &sr.chunkRecorder.chunkedMsgID - } - } - if sr.totalChunks <= 1 || sr.chunkID == sr.totalChunks-1 { - runCallback(sr.callback, msgID, sr.msg, nil) - p.options.Interceptors.OnSendAcknowledgement(p, sr.msg, msgID) + msgID := newMessageID( + int64(response.MessageId.GetLedgerId()), + int64(response.MessageId.GetEntryId()), + int32(idx), + p.partitionIdx, + batchSize, + ) + + if sr.totalChunks > 1 { + if sr.chunkID == 0 { + sr.chunkRecorder.setFirstChunkID( + &messageID{ + int64(response.MessageId.GetLedgerId()), + int64(response.MessageId.GetEntryId()), + -1, + p.partitionIdx, + 0, + }) + } else if sr.chunkID == sr.totalChunks-1 { + sr.chunkRecorder.setLastChunkID( + &messageID{ + int64(response.MessageId.GetLedgerId()), + int64(response.MessageId.GetEntryId()), + -1, + p.partitionIdx, + 0, + }) + // use chunkMsgID to set msgID + msgID = &sr.chunkRecorder.chunkedMsgID } } - if sr.transaction != nil { - sr.transaction.endSendOrAckOp(nil) - } + + sr.done(msgID, nil) } // Mark this pending item as done @@ -1372,24 +1329,7 @@ func (p *partitionProducer) failPendingMessages() { for _, i := range pi.sendRequests { sr := i.(*sendRequest) - if sr.msg != nil { - size := len(sr.msg.Payload) - p.releaseSemaphoreAndMem(sr.reservedMem) - p.metrics.MessagesPending.Dec() - p.metrics.BytesPending.Sub(float64(size)) - p.log.WithError(errProducerClosed). - WithField("size", size). 
- WithField("properties", sr.msg.Properties) - } - - if sr.callback != nil { - sr.callbackOnce.Do(func() { - runCallback(sr.callback, nil, sr.msg, errProducerClosed) - }) - } - if sr.transaction != nil { - sr.transaction.endSendOrAckOp(nil) - } + sr.done(nil, errProducerClosed) } // flag the sending has completed with error, flush make no effect From 2e74e62f2e34f2ea00ad276db329f1fb1b775f19 Mon Sep 17 00:00:00 2001 From: tison Date: Wed, 25 Oct 2023 03:49:30 +0800 Subject: [PATCH 06/12] fixup nil Signed-off-by: tison --- pulsar/message_chunking_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pulsar/message_chunking_test.go b/pulsar/message_chunking_test.go index 653fa81961..6bbb830108 100644 --- a/pulsar/message_chunking_test.go +++ b/pulsar/message_chunking_test.go @@ -24,6 +24,7 @@ import ( "math/rand" "net/http" "strings" + "sync" "testing" "time" @@ -566,6 +567,7 @@ func sendSingleChunk(p Producer, uuid string, chunkID int, totalChunks int) { &sendRequest{ callback: func(id MessageID, producerMessage *ProducerMessage, err error) { }, + callbackOnce: &sync.Once{}, ctx: context.Background(), msg: msg, producer: producerImpl, From 322335f9b502182d98340e1f05f0c865e8b954c7 Mon Sep 17 00:00:00 2001 From: tison Date: Wed, 25 Oct 2023 04:27:11 +0800 Subject: [PATCH 07/12] fix unprepared chunked message Signed-off-by: tison --- pulsar/producer_partition.go | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index c36210b3ac..a7aaba8433 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -1151,6 +1151,7 @@ func (p *partitionProducer) internalSendAsync( callbackOnce: &sync.Once{}, flushImmediately: flushImmediately, publishTime: time.Now(), + chunkID: -1, } if err := p.prepareTransaction(sr); err != nil { @@ -1428,6 +1429,12 @@ func (sr *sendRequest) done(msgID MessageID, err error) { sr.producer.metrics.PublishLatency.Observe(float64(time.Now().UnixNano()-sr.publishTime.UnixNano()) / 1.0e9) sr.producer.metrics.MessagesPublished.Inc() sr.producer.metrics.BytesPublished.Add(float64(sr.reservedMem)) + + if sr.totalChunks <= 1 || sr.chunkID == sr.totalChunks-1 { + if sr.producer.options.Interceptors != nil { + sr.producer.options.Interceptors.OnSendAcknowledgement(sr.producer, sr.msg, msgID) + } + } } if err != nil { @@ -1456,7 +1463,8 @@ func (sr *sendRequest) done(msgID MessageID, err error) { sr.producer.metrics.BytesPending.Sub(float64(sr.reservedMem)) } - if sr.totalChunks <= 1 || sr.chunkID == sr.totalChunks-1 { + // sr.chunkID == -1 means a chunked message is not yet prepared, so that we should fail it immediately + if sr.totalChunks <= 1 || sr.chunkID == -1 || sr.chunkID == sr.totalChunks-1 { sr.callbackOnce.Do(func() { runCallback(sr.callback, msgID, sr.msg, err) }) @@ -1464,10 +1472,6 @@ func (sr *sendRequest) done(msgID MessageID, err error) { if sr.transaction != nil { sr.transaction.endSendOrAckOp(err) } - - if sr.producer.options.Interceptors != nil { - sr.producer.options.Interceptors.OnSendAcknowledgement(sr.producer, sr.msg, msgID) - } } } From cc63851a0e3e2e85c23b5e6b346e4f8453b598b5 Mon Sep 17 00:00:00 2001 From: tison Date: Wed, 25 Oct 2023 05:08:04 +0800 Subject: [PATCH 08/12] skip TestChunkBlockIfQueueFull Signed-off-by: tison --- pulsar/message_chunking_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pulsar/message_chunking_test.go b/pulsar/message_chunking_test.go index 6bbb830108..9b6a2be794 100644 --- a/pulsar/message_chunking_test.go 
+++ b/pulsar/message_chunking_test.go @@ -511,6 +511,8 @@ func TestChunkMultiTopicConsumerReceive(t *testing.T) { } func TestChunkBlockIfQueueFull(t *testing.T) { + t.Skip("Currently we require preserve full resources for chunked messages.") + client, err := NewClient(ClientOptions{ URL: lookupURL, }) From 65ecbb39c981790b3ea483e820aa24e9b675b1be Mon Sep 17 00:00:00 2001 From: tison Date: Wed, 25 Oct 2023 05:15:21 +0800 Subject: [PATCH 09/12] fix reserveSemaphore need not to be at once Signed-off-by: tison --- pulsar/message_chunking_test.go | 2 - pulsar/producer_partition.go | 65 +++++++++++++++++---------------- 2 files changed, 33 insertions(+), 34 deletions(-) diff --git a/pulsar/message_chunking_test.go b/pulsar/message_chunking_test.go index 9b6a2be794..6bbb830108 100644 --- a/pulsar/message_chunking_test.go +++ b/pulsar/message_chunking_test.go @@ -511,8 +511,6 @@ func TestChunkMultiTopicConsumerReceive(t *testing.T) { } func TestChunkBlockIfQueueFull(t *testing.T) { - t.Skip("Currently we require preserve full resources for chunked messages.") - client, err := NewClient(ClientOptions{ URL: lookupURL, }) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index a7aaba8433..f6c36fb12e 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -481,6 +481,11 @@ func runCallback(cb func(MessageID, *ProducerMessage, error), id MessageID, msg func (p *partitionProducer) internalSend(sr *sendRequest) { p.log.Debug("Received send request: ", *sr.msg) + if err := p.reserveSemaphore(sr); err != nil { + sr.done(nil, err) + return + } + if sr.sendAsBatch { smm := p.genSingleMessageMetadataInBatch(sr.msg, int(sr.uncompressedSize)) multiSchemaEnabled := !p.options.DisableMultiSchema @@ -556,6 +561,15 @@ func (p *partitionProducer) internalSend(sr *sendRequest) { deliverAt: sr.deliverAt, maxMessageSize: sr.maxMessageSize, } + + // the permit of first chunk has acquired + if chunkID != 0 { + if err := p.reserveSemaphore(sr); err != nil { + sr.done(nil, err) + return + } + } + p.internalSingleSend(sr.mm, sr.compressedPayload[lhs:rhs], nsr, uint32(sr.maxMessageSize)) } } @@ -1186,8 +1200,7 @@ func (p *partitionProducer) internalSendAsync( return } - // everything is OK, reserve required semaphore and memory - if err := p.reserveResources(sr); err != nil { + if err := p.reserveMem(sr); err != nil { p.log.Error(err) sr.done(nil, err) return @@ -1476,28 +1489,26 @@ func (sr *sendRequest) done(msgID MessageID, err error) { } func (p *partitionProducer) reserveSemaphore(sr *sendRequest) error { - for i := 0; i < sr.totalChunks; i++ { - if p.options.DisableBlockIfQueueFull { - if !p.publishSemaphore.TryAcquire() { - return errSendQueueIsFull - } - - // update sr.semaphore and sr.reservedSemaphore here so that we can release semaphore in the case - // of that only a part of the chunks acquire succeed - sr.semaphore = p.publishSemaphore - sr.reservedSemaphore++ - p.metrics.MessagesPending.Inc() - } else { - if !p.publishSemaphore.Acquire(sr.ctx) { - return errContextExpired - } + if p.options.DisableBlockIfQueueFull { + if !p.publishSemaphore.TryAcquire() { + return errSendQueueIsFull + } - // update sr.semaphore and sr.reservedSemaphore here so that we can release semaphore in the case - // of that only a part of the chunks acquire succeed - sr.semaphore = p.publishSemaphore - sr.reservedSemaphore++ - p.metrics.MessagesPending.Inc() + // update sr.semaphore and sr.reservedSemaphore here so that we can release semaphore in the case + // of that only a part of the 
chunks acquire succeed + sr.semaphore = p.publishSemaphore + sr.reservedSemaphore++ + p.metrics.MessagesPending.Inc() + } else { + if !p.publishSemaphore.Acquire(sr.ctx) { + return errContextExpired } + + // update sr.semaphore and sr.reservedSemaphore here so that we can release semaphore in the case + // of that only a part of the chunks acquire succeed + sr.semaphore = p.publishSemaphore + sr.reservedSemaphore++ + p.metrics.MessagesPending.Inc() } return nil @@ -1526,16 +1537,6 @@ func (p *partitionProducer) reserveMem(sr *sendRequest) error { return nil } -func (p *partitionProducer) reserveResources(sr *sendRequest) error { - if err := p.reserveSemaphore(sr); err != nil { - return err - } - if err := p.reserveMem(sr); err != nil { - return err - } - return nil -} - type closeProducer struct { doneCh chan struct{} } From 0f3f849b3842774ec1fbb451db05e463d212f235 Mon Sep 17 00:00:00 2001 From: tison Date: Wed, 25 Oct 2023 05:35:43 +0800 Subject: [PATCH 10/12] release exact memory Signed-off-by: tison --- pulsar/producer_partition.go | 6 ++++-- pulsar/producer_test.go | 1 - 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index f6c36fb12e..8062e1c280 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -565,12 +565,14 @@ func (p *partitionProducer) internalSend(sr *sendRequest) { // the permit of first chunk has acquired if chunkID != 0 { if err := p.reserveSemaphore(sr); err != nil { - sr.done(nil, err) + // force run callback and txn close + nsr.chunkID = -1 + nsr.done(nil, err) return } } - p.internalSingleSend(sr.mm, sr.compressedPayload[lhs:rhs], nsr, uint32(sr.maxMessageSize)) + p.internalSingleSend(nsr.mm, nsr.compressedPayload[lhs:rhs], nsr, uint32(nsr.maxMessageSize)) } } diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index 29ffa7805a..929cfe0d55 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -2079,7 +2079,6 @@ func TestMemLimitRejectProducerMessagesWithSchema(t *testing.T) { } func TestMemLimitRejectProducerMessagesWithChunking(t *testing.T) { - c, err := NewClient(ClientOptions{ URL: serviceURL, MemoryLimitBytes: 5 * 1024, From bdcaf528d8de18a1293f33749c38cbd9666605c9 Mon Sep 17 00:00:00 2001 From: tison Date: Wed, 25 Oct 2023 06:12:08 +0800 Subject: [PATCH 11/12] finish acquire message permis one by one Signed-off-by: tison --- pulsar/producer_partition.go | 37 +++++++++--------------------------- pulsar/transaction_test.go | 1 - 2 files changed, 9 insertions(+), 29 deletions(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 8062e1c280..0f1a9d7d82 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -481,11 +481,6 @@ func runCallback(cb func(MessageID, *ProducerMessage, error), id MessageID, msg func (p *partitionProducer) internalSend(sr *sendRequest) { p.log.Debug("Received send request: ", *sr.msg) - if err := p.reserveSemaphore(sr); err != nil { - sr.done(nil, err) - return - } - if sr.sendAsBatch { smm := p.genSingleMessageMetadataInBatch(sr.msg, int(sr.uncompressedSize)) multiSchemaEnabled := !p.options.DisableMultiSchema @@ -547,7 +542,6 @@ func (p *partitionProducer) internalSend(sr *sendRequest) { transaction: sr.transaction, memLimit: sr.memLimit, semaphore: sr.semaphore, - reservedSemaphore: 1, reservedMem: int64(rhs - lhs), sendAsBatch: sr.sendAsBatch, schema: sr.schema, @@ -562,14 +556,10 @@ func (p *partitionProducer) internalSend(sr *sendRequest) { maxMessageSize: 
sr.maxMessageSize, } - // the permit of first chunk has acquired - if chunkID != 0 { - if err := p.reserveSemaphore(sr); err != nil { - // force run callback and txn close - nsr.chunkID = -1 - nsr.done(nil, err) - return - } + if err := p.reserveSemaphore(nsr); err != nil { + nsr.chunkID = -1 // force run callback and close txn + nsr.done(nil, err) + return } p.internalSingleSend(nsr.mm, nsr.compressedPayload[lhs:rhs], nsr, uint32(nsr.maxMessageSize)) @@ -1418,10 +1408,9 @@ type sendRequest struct { /// resource management - memLimit internal.MemoryLimitController - reservedMem int64 - semaphore internal.Semaphore - reservedSemaphore int + memLimit internal.MemoryLimitController + reservedMem int64 + semaphore internal.Semaphore /// convey settable state @@ -1467,10 +1456,8 @@ func (sr *sendRequest) done(msgID MessageID, err error) { } if sr.semaphore != nil { - for i := 0; i < sr.reservedSemaphore; i++ { - sr.semaphore.Release() - } - sr.producer.metrics.MessagesPending.Sub(float64(sr.reservedSemaphore)) + sr.semaphore.Release() + sr.producer.metrics.MessagesPending.Dec() } if sr.memLimit != nil { @@ -1496,20 +1483,14 @@ func (p *partitionProducer) reserveSemaphore(sr *sendRequest) error { return errSendQueueIsFull } - // update sr.semaphore and sr.reservedSemaphore here so that we can release semaphore in the case - // of that only a part of the chunks acquire succeed sr.semaphore = p.publishSemaphore - sr.reservedSemaphore++ p.metrics.MessagesPending.Inc() } else { if !p.publishSemaphore.Acquire(sr.ctx) { return errContextExpired } - // update sr.semaphore and sr.reservedSemaphore here so that we can release semaphore in the case - // of that only a part of the chunks acquire succeed sr.semaphore = p.publishSemaphore - sr.reservedSemaphore++ p.metrics.MessagesPending.Inc() } diff --git a/pulsar/transaction_test.go b/pulsar/transaction_test.go index 385b197e00..74e8dd0c99 100644 --- a/pulsar/transaction_test.go +++ b/pulsar/transaction_test.go @@ -458,7 +458,6 @@ func TestAckChunkMessage(t *testing.T) { // Create transaction and register the send operation. txn, err := client.NewTransaction(time.Hour) require.Nil(t, err) - txn.(*transaction).registerSendOrAckOp() // Create a producer with chunking enabled to send a large message that will be split into chunks. 
producer, err := client.CreateProducer(ProducerOptions{ From 0becdc346985b5fcd252ed54193532fa27a6ef4e Mon Sep 17 00:00:00 2001 From: tison Date: Wed, 25 Oct 2023 16:16:04 +0800 Subject: [PATCH 12/12] block reserving resource in front Signed-off-by: tison --- pulsar/message_chunking_test.go | 7 ++-- pulsar/producer_partition.go | 57 ++++++++++++++++++++------------- pulsar/producer_test.go | 5 ++- 3 files changed, 41 insertions(+), 28 deletions(-) diff --git a/pulsar/message_chunking_test.go b/pulsar/message_chunking_test.go index 6bbb830108..59fdb5ec61 100644 --- a/pulsar/message_chunking_test.go +++ b/pulsar/message_chunking_test.go @@ -532,12 +532,13 @@ func TestChunkBlockIfQueueFull(t *testing.T) { assert.NotNil(t, producer) defer producer.Close() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() // Large messages will be split into 11 chunks, exceeding the length of pending queue - ID, err := producer.Send(context.Background(), &ProducerMessage{ + _, err = producer.Send(ctx, &ProducerMessage{ Payload: createTestMessagePayload(100), }) - assert.NoError(t, err) - assert.NotNil(t, ID) + assert.Error(t, err) } func createTestMessagePayload(size int) []byte { diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 0f1a9d7d82..f606fe0575 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -556,12 +556,6 @@ func (p *partitionProducer) internalSend(sr *sendRequest) { maxMessageSize: sr.maxMessageSize, } - if err := p.reserveSemaphore(nsr); err != nil { - nsr.chunkID = -1 // force run callback and close txn - nsr.done(nil, err) - return - } - p.internalSingleSend(nsr.mm, nsr.compressedPayload[lhs:rhs], nsr, uint32(nsr.maxMessageSize)) } } @@ -1192,7 +1186,7 @@ func (p *partitionProducer) internalSendAsync( return } - if err := p.reserveMem(sr); err != nil { + if err := p.reserveResources(sr); err != nil { p.log.Error(err) sr.done(nil, err) return @@ -1408,9 +1402,10 @@ type sendRequest struct { /// resource management - memLimit internal.MemoryLimitController - reservedMem int64 - semaphore internal.Semaphore + memLimit internal.MemoryLimitController + reservedMem int64 + semaphore internal.Semaphore + reservedSemaphore int /// convey settable state @@ -1478,20 +1473,28 @@ func (sr *sendRequest) done(msgID MessageID, err error) { } func (p *partitionProducer) reserveSemaphore(sr *sendRequest) error { - if p.options.DisableBlockIfQueueFull { - if !p.publishSemaphore.TryAcquire() { - return errSendQueueIsFull - } + for i := 0; i < sr.totalChunks; i++ { + if p.options.DisableBlockIfQueueFull { + if !p.publishSemaphore.TryAcquire() { + return errSendQueueIsFull + } - sr.semaphore = p.publishSemaphore - p.metrics.MessagesPending.Inc() - } else { - if !p.publishSemaphore.Acquire(sr.ctx) { - return errContextExpired - } + // update sr.semaphore and sr.reservedSemaphore here so that we can release semaphore in the case + // of that only a part of the chunks acquire succeed + sr.semaphore = p.publishSemaphore + sr.reservedSemaphore++ + p.metrics.MessagesPending.Inc() + } else { + if !p.publishSemaphore.Acquire(sr.ctx) { + return errContextExpired + } - sr.semaphore = p.publishSemaphore - p.metrics.MessagesPending.Inc() + // update sr.semaphore and sr.reservedSemaphore here so that we can release semaphore in the case + // of that only a part of the chunks acquire succeed + sr.semaphore = p.publishSemaphore + sr.reservedSemaphore++ + p.metrics.MessagesPending.Inc() + } } return nil @@ -1520,6 +1523,16 @@ func (p 
*partitionProducer) reserveMem(sr *sendRequest) error { return nil } +func (p *partitionProducer) reserveResources(sr *sendRequest) error { + if err := p.reserveSemaphore(sr); err != nil { + return err + } + if err := p.reserveMem(sr); err != nil { + return err + } + return nil +} + type closeProducer struct { doneCh chan struct{} } diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index 929cfe0d55..49e225f3e1 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -2135,12 +2135,11 @@ func TestMemLimitRejectProducerMessagesWithChunking(t *testing.T) { SendTimeout: 2 * time.Second, }) - // producer2 will reserve 2*1024 bytes and then release 1024 byte (release the second chunk) - // because it reaches MaxPendingMessages in chunking + // producer3 cannot reserve 2*1024 bytes because it reaches MaxPendingMessages in chunking _, _ = producer3.Send(context.Background(), &ProducerMessage{ Payload: make([]byte, 2*1024), }) - assert.Equal(t, int64(1024), c.(*client).memLimit.CurrentUsage()) + assert.Zero(t, c.(*client).memLimit.CurrentUsage()) } func TestMemLimitContextCancel(t *testing.T) {
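
Taken together, patches 09 through 12 settle on reserving resources for a chunked message up front: internalSendAsync calls reserveResources, reserveSemaphore loops over sr.totalChunks taking one queue permit per chunk (TryAcquire when DisableBlockIfQueueFull is set, a context-aware Acquire otherwise), and the permits actually obtained are counted in sr.reservedSemaphore so that a partially successful reservation can be released rather than leaked. The Go sketch below is only an illustration of that pattern under simplified assumptions: chunkSemaphore, pendingRequest, newChunkSemaphore, and reservePermits are made-up names backed by a toy buffered-channel semaphore, not the client's internal.Semaphore or sendRequest types, and the sketch is not code from this patch series.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

var errQueueFull = errors.New("producer send queue is full")

// chunkSemaphore is a toy permit pool backed by a buffered channel; it
// stands in for the producer's publish semaphore in this sketch.
type chunkSemaphore chan struct{}

func newChunkSemaphore(maxPending int) chunkSemaphore {
	return make(chunkSemaphore, maxPending)
}

// tryAcquire takes a permit without blocking and reports whether it got one.
func (s chunkSemaphore) tryAcquire() bool {
	select {
	case s <- struct{}{}:
		return true
	default:
		return false
	}
}

// acquire blocks until a permit is free or the context is done.
func (s chunkSemaphore) acquire(ctx context.Context) bool {
	select {
	case s <- struct{}{}:
		return true
	case <-ctx.Done():
		return false
	}
}

// release returns one permit to the pool.
func (s chunkSemaphore) release() { <-s }

// pendingRequest records how many permits a chunked send has actually
// reserved, so a partial reservation can be rolled back.
type pendingRequest struct {
	totalChunks     int
	reservedPermits int
}

// reservePermits acquires one permit per chunk before anything is sent.
// On failure it releases whatever it already took and returns an error,
// so the queue is never left partially charged by a doomed message.
func reservePermits(ctx context.Context, sem chunkSemaphore, req *pendingRequest, nonBlocking bool) error {
	for i := 0; i < req.totalChunks; i++ {
		var ok bool
		if nonBlocking {
			ok = sem.tryAcquire()
		} else {
			ok = sem.acquire(ctx)
		}
		if !ok {
			// roll back the partial reservation before reporting failure
			for ; req.reservedPermits > 0; req.reservedPermits-- {
				sem.release()
			}
			if nonBlocking {
				return errQueueFull
			}
			return ctx.Err()
		}
		req.reservedPermits++
	}
	return nil
}

func main() {
	sem := newChunkSemaphore(10) // pretend the pending queue holds 10 messages

	// An 11-chunk message cannot fit; with a bounded context the blocking
	// path gives up at the deadline instead of hanging forever.
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	req := &pendingRequest{totalChunks: 11}
	if err := reservePermits(ctx, sem, req, false); err != nil {
		fmt.Printf("reservation failed: %v (permits still held: %d)\n", err, len(sem))
	}
}

Reserving everything before the first chunk is handed to internalSingleSend is what the updated tests exercise: TestChunkBlockIfQueueFull now expects Send to fail once its bounded context expires, because a message split into 11 chunks exceeds the pending queue, and TestMemLimitRejectProducerMessagesWithChunking expects the client's memory usage to be zero after the rejected send, since no half-reserved permits or bytes are left behind.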