feat(sink): kafka cooperative consumer (#568)
hekike authored Jan 25, 2024
1 parent 5a27788 commit 64857d3
Showing 3 changed files with 63 additions and 5 deletions.
4 changes: 4 additions & 0 deletions cmd/sink-worker/main.go
@@ -258,6 +258,10 @@ func initSink(config config.Configuration, logger *slog.Logger, metricMeter metr
 	_ = consumerKafkaConfig.SetKey("enable.auto.commit", false)
 	_ = consumerKafkaConfig.SetKey("enable.auto.offset.store", false)
 	_ = consumerKafkaConfig.SetKey("go.application.rebalance.enable", true)
+	// Used when offset retention resets the offset. In this case we want to consume from the latest offset, as everything before should already be processed.
+	_ = consumerKafkaConfig.SetKey("auto.offset.reset", "latest")
+	// Guarantees an assignment that is maximally balanced while preserving as many existing partition assignments as possible.
+	_ = consumerKafkaConfig.SetKey("partition.assignment.strategy", "cooperative-sticky")
 
 	consumer, err := kafka.NewConsumer(&consumerKafkaConfig)
 	if err != nil {
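For reference, a minimal sketch of how a confluent-kafka-go consumer is typically constructed with these settings. The broker address, group ID, and topic names below are illustrative placeholders, not values from this commit:

package main

import (
	"log"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {
	cfg := kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092", // illustrative broker address
		"group.id":          "sink-worker",    // illustrative consumer group
		// Offsets are stored and committed manually after messages are processed.
		"enable.auto.commit":       false,
		"enable.auto.offset.store": false,
		// Deliver rebalance events to the application instead of handling them internally.
		"go.application.rebalance.enable": true,
		// Start from the latest offset when no committed offset exists (e.g. after offset retention).
		"auto.offset.reset": "latest",
		// Incremental (cooperative) rebalancing instead of stop-the-world eager rebalancing.
		"partition.assignment.strategy": "cooperative-sticky",
	}

	consumer, err := kafka.NewConsumer(&cfg)
	if err != nil {
		log.Fatalf("failed to create consumer: %v", err)
	}
	defer consumer.Close()
}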
26 changes: 26 additions & 0 deletions internal/sink/partition.go
@@ -0,0 +1,26 @@
+package sink
+
+import (
+	"fmt"
+	"sort"
+
+	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
+)
+
+func prettyPartitions(partitions []kafka.TopicPartition) []string {
+	out := make([]string, 0, len(partitions))
+
+	for _, partition := range partitions {
+		var topicName string
+
+		if partition.Topic != nil {
+			topicName = *partition.Topic
+		}
+
+		out = append(out, fmt.Sprintf("%s-%d", topicName, partition.Partition))
+	}
+
+	sort.Strings(out)
+
+	return out
+}
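For illustration, a sketch of how prettyPartitions could be exercised from a test file in the same package; this test and the topic name are hypothetical, not part of the commit:

package sink

import (
	"fmt"
	"testing"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func TestPrettyPartitions(t *testing.T) {
	topic := "om_sink_events" // illustrative topic name
	partitions := []kafka.TopicPartition{
		{Topic: &topic, Partition: 2},
		{Topic: &topic, Partition: 0},
	}

	// The output is sorted and human-readable, which keeps log lines stable.
	got := prettyPartitions(partitions)
	want := []string{"om_sink_events-0", "om_sink_events-2"}

	if fmt.Sprint(got) != fmt.Sprint(want) {
		t.Errorf("prettyPartitions() = %v, want %v", got, want)
	}
}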
38 changes: 33 additions & 5 deletions internal/sink/sink.go
@@ -517,13 +517,31 @@ func (s *Sink) rebalance(c *kafka.Consumer, event kafka.Event) error {

 	switch e := event.(type) {
 	case kafka.AssignedPartitions:
-		logger.Info("kafka assigned partitions", "partitions", e.Partitions)
-		err := s.config.Consumer.Assign(e.Partitions)
+		logger.Info("kafka assigned partitions", "partitions", prettyPartitions(e.Partitions))
+
+		// Instruct the consumer to use the committed offset as the start position,
+		// with a fallback to `auto.offset.reset` if there is no committed offset.
+		// Auto offset reset should typically be set to latest, so we only consume new messages;
+		// old messages are already processed and stored in ClickHouse.
+		for i := range e.Partitions {
+			e.Partitions[i].Offset = kafka.OffsetStored
+		}
+
+		// IncrementalAssign adds the specified partitions to the current set of partitions to consume.
+		err := s.config.Consumer.IncrementalAssign(e.Partitions)
 		if err != nil {
 			return fmt.Errorf("failed to assign partitions: %w", err)
 		}
+
+		// TODO: this may be removable if IncrementalAssign overwrites a previous pause
+		// This is here to ensure we resume previously paused partitions
+		err = c.Resume(e.Partitions)
+		if err != nil {
+			return fmt.Errorf("failed to resume partitions: %w", err)
+		}
+
 	case kafka.RevokedPartitions:
-		logger.Info("kafka revoked partitions", "partitions", e.Partitions)
+		logger.Info("kafka revoked partitions", "partitions", prettyPartitions(e.Partitions))
 
 		// Usually, the rebalance callback for `RevokedPartitions` is called
 		// just before the partitions are revoked. We can be certain that a
@@ -540,13 +558,23 @@ func (s *Sink) rebalance(c *kafka.Consumer, event kafka.Event) error {
 			logger.Warn("assignment lost involuntarily, commit may fail")
 		}
 
-		err := s.flush()
+		// Pause the consumer to stop processing messages from revoked partitions.
+		// This is important to avoid offset commit failures.
+		err := c.Pause(e.Partitions)
+		if err != nil {
+			return fmt.Errorf("failed to pause partitions: %w", err)
+		}
+
+		// Flush the buffer to commit offsets for revoked partitions while we still can.
+		// After the partitions are unassigned we can no longer commit offsets for them.
+		err = s.flush()
 		if err != nil {
 			// Stop processing, non-recoverable error
 			return fmt.Errorf("failed to flush: %w", err)
 		}
 
-		err = s.config.Consumer.Unassign()
+		// IncrementalUnassign removes the specified partitions from the current set of partitions to consume.
+		err = s.config.Consumer.IncrementalUnassign(e.Partitions)
 		if err != nil {
 			return fmt.Errorf("failed to unassign partitions: %w", err)
 		}
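For context, a minimal sketch of how a rebalance callback like the one above is typically wired up with confluent-kafka-go: the callback is passed to SubscribeTopics, and with cooperative-sticky assignment the AssignedPartitions / RevokedPartitions events carry only the incremental partition changes. The topic, group ID, and simplified handler bodies are illustrative assumptions, not the repository's actual wiring:

package main

import (
	"fmt"
	"log"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {
	consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":               "localhost:9092", // illustrative
		"group.id":                        "sink-worker",    // illustrative
		"go.application.rebalance.enable": true,
		"partition.assignment.strategy":   "cooperative-sticky",
	})
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	// With cooperative rebalancing, the callback receives only the partitions
	// being added or removed, not the full assignment.
	rebalanceCb := func(c *kafka.Consumer, event kafka.Event) error {
		switch e := event.(type) {
		case kafka.AssignedPartitions:
			// Simplified: a real handler would also set start offsets and resume partitions.
			return c.IncrementalAssign(e.Partitions)
		case kafka.RevokedPartitions:
			// Simplified: a real handler would pause, flush, and commit offsets first.
			return c.IncrementalUnassign(e.Partitions)
		}
		return nil
	}

	if err := consumer.SubscribeTopics([]string{"om_sink_events"}, rebalanceCb); err != nil { // illustrative topic
		log.Fatal(err)
	}

	// Poll loop; rebalance events are dispatched to the callback during Poll.
	for {
		ev := consumer.Poll(100)
		if msg, ok := ev.(*kafka.Message); ok {
			fmt.Printf("message on %s\n", msg.TopicPartition)
		}
	}
}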
