From a752c6de08ad23c31cea0b1e1bde48c93f84a879 Mon Sep 17 00:00:00 2001 From: Felix Geller Date: Sun, 3 Apr 2016 16:06:38 +1200 Subject: [PATCH] consume: basic test for defaulting to all partitions. --- consume.go | 9 +++---- consume_test.go | 63 +++++++++++++++++++++++++++++++++++++++++++------ 2 files changed, 61 insertions(+), 11 deletions(-) diff --git a/consume.go b/consume.go index 143d4e5..aa0260c 100644 --- a/consume.go +++ b/consume.go @@ -244,12 +244,13 @@ func consumeCommand() command { os.Exit(1) } - consume(closer, consumer, partitions) + consume(config.consume, closer, consumer, partitions) }, } } func consume( + config consumeConfig, closer chan struct{}, consumer sarama.Consumer, partitions []int32, @@ -257,12 +258,12 @@ func consume( var wg sync.WaitGroup consuming: for _, partition := range partitions { - offsets, ok := config.consume.offsets[partition] + offsets, ok := config.offsets[partition] if !ok { - offsets, ok = config.consume.offsets[-1] + offsets, ok = config.offsets[-1] } partitionConsumer, err := consumer.ConsumePartition( - config.consume.topic, + config.topic, partition, offsets.start, ) diff --git a/consume_test.go b/consume_test.go index b27c8d9..108292a 100644 --- a/consume_test.go +++ b/consume_test.go @@ -3,6 +3,7 @@ package main import ( "reflect" "testing" + "time" "github.com/Shopify/sarama" ) @@ -180,19 +181,65 @@ Input: config=%+v } } +func TestConsume(t *testing.T) { + closer := make(chan struct{}) + config := consumeConfig{ + topic: "hans", + offsets: map[int32]interval{-1: {1, 5}}, + } + messageChan := make(<-chan *sarama.ConsumerMessage) + calls := make(chan tConsumePartition) + consumer := tConsumer{ + consumePartition: map[tConsumePartition]tPartitionConsumer{ + tConsumePartition{"hans", 1, 1}: tPartitionConsumer{messages: messageChan}, + tConsumePartition{"hans", 2, 1}: tPartitionConsumer{messages: messageChan}, + }, + calls: calls, + } + partitions := []int32{1, 2} + + go consume(config, closer, consumer, partitions) + defer close(closer) + + end := make(chan struct{}) + go func(c chan tConsumePartition, e chan struct{}) { + for { + actual := []tConsumePartition{} + expected := []tConsumePartition{ + tConsumePartition{"hans", 1, 1}, + tConsumePartition{"hans", 2, 1}, + } + for { + select { + case call := <-c: + actual = append(actual, call) + if reflect.DeepEqual(actual, expected) { + e <- struct{}{} + return + } + case _, ok := <-e: + if !ok { + return + } + } + } + } + }(calls, end) + + select { + case <-end: + case <-time.After(1 * time.Second): + t.Errorf("Did not receive calls to consume partitions before timeout.") + close(end) + } +} + type tConsumePartition struct { topic string partition int32 offset int64 } -type tConsumerMessage struct { - Key, Value []byte - Topic string - Partition int32 - Offset int64 -} - type tConsumerError struct { Topic string Partition int32 @@ -228,6 +275,7 @@ type tConsumer struct { consumePartition map[tConsumePartition]tPartitionConsumer consumePartitionErr map[tConsumePartition]error closeErr error + calls chan tConsumePartition } func (c tConsumer) Topics() ([]string, error) { @@ -240,6 +288,7 @@ func (c tConsumer) Partitions(topic string) ([]int32, error) { func (c tConsumer) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error) { cp := tConsumePartition{topic, partition, offset} + c.calls <- cp return c.consumePartition[cp], c.consumePartitionErr[cp] }