From 64857d3651bfaef900815090d89e3ac44914d499 Mon Sep 17 00:00:00 2001
From: Peter Marton
Date: Thu, 25 Jan 2024 07:56:28 -0800
Subject: [PATCH] feat(sink): kafka cooperative consumer (#568)

---
 cmd/sink-worker/main.go    |  4 ++++
 internal/sink/partition.go | 26 ++++++++++++++++++++++++++
 internal/sink/sink.go      | 38 +++++++++++++++++++++++++++++++++-----
 3 files changed, 63 insertions(+), 5 deletions(-)
 create mode 100644 internal/sink/partition.go

diff --git a/cmd/sink-worker/main.go b/cmd/sink-worker/main.go
index 198591eaf..da1c6eac6 100644
--- a/cmd/sink-worker/main.go
+++ b/cmd/sink-worker/main.go
@@ -258,6 +258,10 @@ func initSink(config config.Configuration, logger *slog.Logger, metricMeter metr
 	_ = consumerKafkaConfig.SetKey("enable.auto.commit", false)
 	_ = consumerKafkaConfig.SetKey("enable.auto.offset.store", false)
 	_ = consumerKafkaConfig.SetKey("go.application.rebalance.enable", true)
+	// Used when offset retention resets the offset. In this case we want to consume from the latest offset, as everything before it should already be processed.
+	_ = consumerKafkaConfig.SetKey("auto.offset.reset", "latest")
+	// Guarantees an assignment that is maximally balanced while preserving as many existing partition assignments as possible.
+	_ = consumerKafkaConfig.SetKey("partition.assignment.strategy", "cooperative-sticky")
 
 	consumer, err := kafka.NewConsumer(&consumerKafkaConfig)
 	if err != nil {
diff --git a/internal/sink/partition.go b/internal/sink/partition.go
new file mode 100644
index 000000000..0711befbb
--- /dev/null
+++ b/internal/sink/partition.go
@@ -0,0 +1,26 @@
+package sink
+
+import (
+	"fmt"
+	"sort"
+
+	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
+)
+
+func prettyPartitions(partitions []kafka.TopicPartition) []string {
+	out := make([]string, 0, len(partitions))
+
+	for _, partition := range partitions {
+		var topicName string
+
+		if partition.Topic != nil {
+			topicName = *partition.Topic
+		}
+
+		out = append(out, fmt.Sprintf("%s-%d", topicName, partition.Partition))
+	}
+
+	sort.Strings(out)
+
+	return out
+}
diff --git a/internal/sink/sink.go b/internal/sink/sink.go
index e99bd9775..ae74cade9 100644
--- a/internal/sink/sink.go
+++ b/internal/sink/sink.go
@@ -517,13 +517,31 @@ func (s *Sink) rebalance(c *kafka.Consumer, event kafka.Event) error {
 
 	switch e := event.(type) {
 	case kafka.AssignedPartitions:
-		logger.Info("kafka assigned partitions", "partitions", e.Partitions)
-		err := s.config.Consumer.Assign(e.Partitions)
+		logger.Info("kafka assigned partitions", "partitions", prettyPartitions(e.Partitions))
+
+		// Have the consumer use the committed offset as the start position,
+		// with a fallback to `auto.offset.reset` if there is no committed offset.
+		// Auto offset reset should typically be set to latest, so we only consume new messages,
+		// since old messages are already processed and stored in ClickHouse.
+		for i := range e.Partitions {
+			e.Partitions[i].Offset = kafka.OffsetStored
+		}
+
+		// IncrementalAssign adds the specified partitions to the current set of partitions to consume.
+		err := s.config.Consumer.IncrementalAssign(e.Partitions)
 		if err != nil {
 			return fmt.Errorf("failed to assign partitions: %w", err)
 		}
+
+		// TODO: this may be removable if IncrementalAssign overrides a previous pause.
+		// This is here to ensure we resume previously paused partitions.
+		err = c.Resume(e.Partitions)
+		if err != nil {
+			return fmt.Errorf("failed to resume partitions: %w", err)
+		}
+
 	case kafka.RevokedPartitions:
-		logger.Info("kafka revoked partitions", "partitions", e.Partitions)
+		logger.Info("kafka revoked partitions", "partitions", prettyPartitions(e.Partitions))
 
 		// Usually, the rebalance callback for `RevokedPartitions` is called
 		// just before the partitions are revoked. We can be certain that a
@@ -540,13 +558,23 @@ func (s *Sink) rebalance(c *kafka.Consumer, event kafka.Event) error {
 			logger.Warn("assignment lost involuntarily, commit may fail")
 		}
 
-		err := s.flush()
+		// Pause the consumer to stop processing messages from revoked partitions.
+		// This is important to avoid offset commit failures.
+		err := c.Pause(e.Partitions)
+		if err != nil {
+			return fmt.Errorf("failed to pause partitions: %w", err)
+		}
+
+		// Flush the buffer to commit offsets for the revoked partitions while we still can.
+		// After unassigning the partitions we will no longer be able to commit offsets for them.
+		err = s.flush()
 		if err != nil {
 			// Stop processing, non-recoverable error
 			return fmt.Errorf("failed to flush: %w", err)
 		}
 
-		err = s.config.Consumer.Unassign()
+		// IncrementalUnassign removes the specified partitions from the current set of partitions to consume.
+		err = s.config.Consumer.IncrementalUnassign(e.Partitions)
 		if err != nil {
 			return fmt.Errorf("failed to unassign partitions: %w", err)
 		}
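
Note (not part of the patch): for reference, below is a minimal, standalone sketch of the cooperative-sticky rebalance pattern this change adopts, using the confluent-kafka-go v2 API. The broker address, group ID, and topic name are placeholders, and the flush/commit logic from the sink is omitted; this is an illustration of the configuration and the IncrementalAssign/IncrementalUnassign callbacks, not a drop-in replacement for the sink worker.

package main

import (
	"fmt"
	"log"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {
	consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
		// Placeholder broker and group; adjust for your environment.
		"bootstrap.servers": "localhost:9092",
		"group.id":          "sink-worker-example",
		// Same knobs as the patch: manual commits and offset store,
		// rebalance events handled by the application, cooperative protocol.
		"enable.auto.commit":              false,
		"enable.auto.offset.store":        false,
		"go.application.rebalance.enable": true,
		"auto.offset.reset":               "latest",
		"partition.assignment.strategy":   "cooperative-sticky",
	})
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	rebalance := func(c *kafka.Consumer, event kafka.Event) error {
		switch e := event.(type) {
		case kafka.AssignedPartitions:
			// Start from the committed offset, falling back to auto.offset.reset.
			for i := range e.Partitions {
				e.Partitions[i].Offset = kafka.OffsetStored
			}
			// Cooperative protocol: add the new partitions to the current assignment.
			return c.IncrementalAssign(e.Partitions)
		case kafka.RevokedPartitions:
			if c.AssignmentLost() {
				log.Println("assignment lost involuntarily, commit may fail")
			}
			// Flush and commit any in-flight work for the revoked partitions here,
			// then drop only those partitions from the assignment.
			return c.IncrementalUnassign(e.Partitions)
		}
		return nil
	}

	// Placeholder topic name.
	if err := consumer.SubscribeTopics([]string{"events"}, rebalance); err != nil {
		log.Fatal(err)
	}

	for {
		msg, err := consumer.ReadMessage(-1)
		if err != nil {
			continue
		}
		fmt.Printf("message on %s\n", msg.TopicPartition)
	}
}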