bulker: optimize retry consumer
absorbb committed Jan 15, 2024 · 1 parent f027674 · commit 8e6e92d
Showing 4 changed files with 112 additions and 57 deletions.
102 changes: 51 additions & 51 deletions bulkerapp/app/abstract_batch_consumer.go
@@ -1,7 +1,6 @@
package app

import (
"context"
"fmt"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/jitsucom/bulker/bulkerapp/metrics"
@@ -25,6 +24,7 @@ const errorHeader = "error"
const pauseHeartBeatInterval = 120 * time.Second

type BatchFunction func(destination *Destination, batchNum, batchSize, retryBatchSize int, highOffset int64) (counters BatchCounters, nextBatch bool, err error)
type ShouldConsumeFunction func(committedOffset, highOffset int64) bool

type BatchConsumer interface {
RunJob()
@@ -38,16 +38,15 @@ type BatchConsumer interface {
type AbstractBatchConsumer struct {
sync.Mutex
*AbstractConsumer
repository *Repository
destinationId string
batchPeriodSec int
consumerConfig kafka.ConfigMap
consumer atomic.Pointer[kafka.Consumer]
producerConfig kafka.ConfigMap
transactionalProducer atomic.Pointer[kafka.Producer]
mode string
tableName string
waitForMessages time.Duration
repository *Repository
destinationId string
batchPeriodSec int
consumerConfig kafka.ConfigMap
consumer atomic.Pointer[kafka.Consumer]
producerConfig kafka.ConfigMap
mode string
tableName string
waitForMessages time.Duration

closed chan struct{}

@@ -61,7 +60,8 @@ type AbstractBatchConsumer struct {
paused atomic.Bool
resumeChannel chan struct{}

batchFunc BatchFunction
batchFunc BatchFunction
shouldConsumeFunc ShouldConsumeFunction
}

func NewAbstractBatchConsumer(repository *Repository, destinationId string, batchPeriodSec int, topicId, mode string, config *Config, kafkaConfig *kafka.ConfigMap, bulkerProducer *Producer) (*AbstractBatchConsumer, error) {
@@ -122,14 +122,24 @@ func NewAbstractBatchConsumer(repository *Repository, destinationId string, batc
return nil, abstract.NewError("Failed to subscribe to topic: %v", err)
}

return bc, nil
}

func (bc *AbstractBatchConsumer) initTransactionalProducer() (*kafka.Producer, error) {
//start := time.Now()
producer, err := kafka.NewProducer(&bc.producerConfig)
if err != nil {
metrics.ConsumerErrors(bc.topicId, bc.mode, bc.destinationId, bc.tableName, metrics.KafkaErrorCode(err)).Inc()
return nil, fmt.Errorf("error creating kafka producer: %v", err)
}
err = producer.InitTransactions(nil)
if err != nil {
metrics.ConsumerErrors(bc.topicId, bc.mode, bc.destinationId, bc.tableName, metrics.KafkaErrorCode(err)).Inc()
return nil, fmt.Errorf("error initializing kafka producer transactions: %v", err)
}
// Delivery reports channel for 'failed' producer messages
safego.RunWithRestart(func() {
for {
producer := bc.transactionalProducer.Load()
if producer == nil {
time.Sleep(time.Second * 10)
continue
}
select {
case <-bc.closed:
bc.Infof("Closing producer.")
@@ -146,43 +156,17 @@ func NewAbstractBatchConsumer(repository *Repository, destinationId string, batc
kafkabase.ProducerMessages(ProducerMessageLabels(*ev.TopicPartition.Topic, "delivered", "")).Inc()
bc.Debugf("Message ID: %s delivered to topic %s [%d] at offset %v", messageId, *ev.TopicPartition.Topic, ev.TopicPartition.Partition, ev.TopicPartition.Offset)
}
//case kafka.Error:
// bc.Errorf("Producer error: %v", ev)
case *kafka.Error, kafka.Error:
bc.Errorf("Producer error: %v", ev)
case nil:
bc.Debugf("Producer closed")
return
}
}
}
})
return bc, nil
}

func (bc *AbstractBatchConsumer) initTransactionalProducer() *kafka.Producer {
producer := bc.transactionalProducer.Load()
if producer != nil {
return producer
}
bc.Infof("Setting up transactional producer.")
producer, err := kafka.NewProducer(&bc.producerConfig)
if err != nil {
metrics.ConsumerErrors(bc.topicId, bc.mode, bc.destinationId, bc.tableName, metrics.KafkaErrorCode(err)).Inc()
panic(bc.NewError("error creating kafka producer: %v", err))
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
//enable transactions support for producer
err = producer.InitTransactions(ctx)
if err != nil {
metrics.ConsumerErrors(bc.topicId, bc.mode, bc.destinationId, bc.tableName, metrics.KafkaErrorCode(err)).Inc()
panic(bc.NewError("error initializing kafka producer transactions for 'failed' producer: %v", err))
}
bc.transactionalProducer.Store(producer)
return producer
}

func (bc *AbstractBatchConsumer) closeTransactionalProducer() {
producer := bc.transactionalProducer.Swap(nil)
if producer != nil {
producer.Close()
}
//bc.Infof("Producer initialized in %s", time.Since(start))
return producer, nil
}

func (bc *AbstractBatchConsumer) BatchPeriodSec() int {
@@ -218,7 +202,8 @@ func (bc *AbstractBatchConsumer) ConsumeAll() (counters BatchCounters, err error
counters.firstOffset = int64(kafka.OffsetBeginning)
bc.Debugf("Starting consuming messages from topic")
bc.idle.Store(false)
var lowOffset, highOffset int64
lowOffset := int64(kafka.OffsetBeginning)
var highOffset int64
defer func() {
bc.idle.Store(true)
bc.pause()
@@ -276,6 +261,10 @@ func (bc *AbstractBatchConsumer) ConsumeAll() (counters BatchCounters, err error
bc.errorMetric("query_watermark_failed")
return BatchCounters{}, bc.NewError("Failed to query watermark offsets: %v", err)
}
if !bc.shouldConsume(lowOffset, highOffset) {
bc.Debugf("Consumer should not consume. offsets: %d-%d", lowOffset, highOffset)
return BatchCounters{}, nil
}
batchNumber := 1
for {
if bc.retired.Load() {
@@ -310,6 +299,17 @@ func (bc *AbstractBatchConsumer) processBatch(destination *Destination, batchNum
return bc.batchFunc(destination, batchNum, batchSize, retryBatchSize, highOffset)
}

func (bc *AbstractBatchConsumer) shouldConsume(committedOffset, highOffset int64) bool {
if highOffset == 0 || committedOffset == highOffset {
return false
}
if bc.shouldConsumeFunc != nil {
bc.resume()
return bc.shouldConsumeFunc(committedOffset, highOffset)
}
return true
}

// pause consumer.
func (bc *AbstractBatchConsumer) pause() {
if bc.idle.Load() && bc.retired.Load() {
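The refactor above drops the cached transactionalProducer field: initTransactionalProducer now builds a producer on demand and returns an error instead of panicking, and the call site owns Close. A minimal sketch of the resulting call-site flow, stitched together from this commit's hunks (the surrounding function and error handling details are assumptions):

	producer, err := bc.initTransactionalProducer()
	if err != nil {
		return err // creation and InitTransactions failures now surface as errors
	}
	defer producer.Close() // the caller, not an atomic.Pointer cache, owns the lifecycle
	if err = producer.BeginTransaction(); err != nil {
		return fmt.Errorf("failed to begin kafka transaction: %v", err)
	}
	// ... produce the batch ...
	if err = producer.CommitTransaction(context.Background()); err != nil {
		_ = producer.AbortTransaction(context.Background())
		return err
	}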
10 changes: 8 additions & 2 deletions bulkerapp/app/batch_consumer.go
@@ -185,6 +185,7 @@ func (bc *BatchConsumerImpl) processBatchImpl(destination *Destination, batchNum

// processFailed consumes the latest failed batch of messages and sends them to the 'failed' topic
func (bc *BatchConsumerImpl) processFailed(firstPosition *kafka.TopicPartition, failedPosition *kafka.TopicPartition, originalErr error) (counters BatchCounters, err error) {
var producer *kafka.Producer
defer func() {
//recover
if r := recover(); r != nil {
@@ -193,10 +194,15 @@ func (bc *BatchConsumerImpl) processFailed(firstPosition *kafka.TopicPartition,
}
if err != nil {
err = bc.NewError("Failed to put unsuccessful batch to 'failed' producer: %v", err)
bc.closeTransactionalProducer()
}
if producer != nil {
producer.Close()
}
}()
producer := bc.initTransactionalProducer()
producer, err = bc.initTransactionalProducer()
if err != nil {
return
}

bc.resume()

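Note the shape of the change in processFailed: producer is declared before the deferred cleanup because a deferred closure captures the variable, not its value at defer time, so it closes whatever initTransactionalProducer later assigned, even on an early return or a recovered panic. A standalone illustration of the pattern (newProducer is a hypothetical stand-in):

	func example() (err error) {
		var producer *kafka.Producer
		defer func() {
			if producer != nil {
				producer.Close() // runs on early returns and recovered panics alike
			}
		}()
		producer, err = newProducer() // hypothetical stand-in for initTransactionalProducer
		if err != nil {
			return err
		}
		// ... produce within a transaction ...
		return nil
	}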
53 changes: 51 additions & 2 deletions bulkerapp/app/retry_consumer.go
@@ -22,10 +22,54 @@ func NewRetryConsumer(repository *Repository, destinationId string, batchPeriodS
AbstractBatchConsumer: base,
}
rc.batchFunc = rc.processBatchImpl
rc.shouldConsumeFunc = rc.shouldConsumeFuncImpl
rc.pause()
return &rc, nil
}

func (rc *RetryConsumer) shouldConsumeFuncImpl(committedOffset, highOffset int64) bool {
var firstPosition *kafka.TopicPartition
defer func() {
//recover
if r := recover(); r != nil {
rc.SystemErrorf("Recovered from panic: %v", r)
}
if firstPosition != nil {
_, err := rc.consumer.Load().SeekPartitions([]kafka.TopicPartition{*firstPosition})
if err != nil {
rc.SystemErrorf("Failed to seek to first position: %v", err)
//rc.restartConsumer()
}
}
}()
currentOffset := committedOffset
for currentOffset < highOffset {
message, err := rc.consumer.Load().ReadMessage(rc.waitForMessages)
if err != nil {
kafkaErr := err.(kafka.Error)
if kafkaErr.Code() == kafka.ErrTimedOut && currentOffset == highOffset-1 {
rc.Debugf("Timeout. No messages to retry. %d-%d", committedOffset, highOffset)
return false
}
rc.Infof("Failed to check shouldConsume. %d-%d. Error: %v", committedOffset, highOffset, err)
// we don't handle errors here. allow consuming to handle error properly
return true
}
if firstPosition == nil {
firstPosition = &message.TopicPartition
}
currentOffset = int64(message.TopicPartition.Offset)
if rc.isTimeToRetry(message) {
rc.Debugf("Found message to retry: %d of %d-%d", currentOffset, committedOffset, highOffset)
//at least one message is ready to retry. we should consume
return true
}

}
rc.Debugf("No messages to retry. %d-%d", committedOffset, highOffset)
return false
}

func (rc *RetryConsumer) processBatchImpl(_ *Destination, _, _, retryBatchSize int, highOffset int64) (counters BatchCounters, nextBatch bool, err error) {
counters.firstOffset = int64(kafka.OffsetBeginning)

@@ -55,9 +99,11 @@ func (rc *RetryConsumer) processBatchImpl(_ *Destination, _, _, retryBatchSize i
if txOpened {
_ = producer.AbortTransaction(context.Background())
}
rc.closeTransactionalProducer()
nextBatch = false
}
if producer != nil {
producer.Close()
}
}()

nextBatch = true
@@ -86,7 +132,10 @@ func (rc *RetryConsumer) processBatchImpl(_ *Destination, _, _, retryBatchSize i
if counters.consumed == 1 {
counters.firstOffset = int64(message.TopicPartition.Offset)
firstPosition = &message.TopicPartition
producer = rc.initTransactionalProducer()
producer, err = rc.initTransactionalProducer()
if err != nil {
return counters, false, err
}
err = producer.BeginTransaction()
if err != nil {
return counters, false, fmt.Errorf("failed to begin kafka transaction: %v", err)
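The new shouldConsumeFuncImpl peeks at pending messages with ReadMessage, records the first position so the deferred SeekPartitions can rewind the consumer, and only lets a batch run if isTimeToRetry fires for at least one message. isTimeToRetry itself is defined elsewhere in the repository; a purely hypothetical sketch of such a check, assuming a retry-timestamp header on each retried message:

	// Hypothetical sketch only; the real isTimeToRetry is not part of this diff.
	func isTimeToRetry(message *kafka.Message) bool {
		for _, h := range message.Headers {
			if h.Key == "retry_time" { // assumed header name
				t, err := time.Parse(time.RFC3339, string(h.Value))
				return err == nil && time.Now().After(t)
			}
		}
		return true // no marker header: eligible for retry immediately
	}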
4 changes: 2 additions & 2 deletions kafkabase/producer.go
@@ -61,8 +61,8 @@ func (p *Producer) Start() {
ProducerMessages(p.metricsLabelFunc(*ev.TopicPartition.Topic, "delivered", "")).Inc()
p.Debugf("Message ID: %s delivered to topic %s [%d] at offset %v", messageId, *ev.TopicPartition.Topic, ev.TopicPartition.Partition, ev.TopicPartition.Offset)
}
//case kafka.Error:
// p.Debugf("Producer error: %v", ev)
case *kafka.Error, kafka.Error:
p.Errorf("Producer error: %v", ev)
}
}
p.Infof("Producer closed")
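Both event loops now match kafka.Error by value and by pointer: a Go type switch matches only the exact dynamic type, so listing both forms avoids silently dropping producer errors that the client may surface either way. A minimal sketch of the pattern (log is an assumed stand-in for the app's logger):

	for e := range producer.Events() {
		switch ev := e.(type) {
		case *kafka.Message:
			if ev.TopicPartition.Error != nil {
				log.Printf("delivery failed: %v", ev.TopicPartition.Error)
			}
		case *kafka.Error, kafka.Error:
			// with multiple types in one case, ev keeps the interface type (kafka.Event)
			log.Printf("producer error: %v", ev)
		}
	}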
