Skip to content

Commit

Permalink
consume: basic test for defaulting to all partitions.
Browse files Browse the repository at this point in the history
  • Loading branch information
fgeller committed Apr 3, 2016
1 parent c4254be commit a752c6d
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 11 deletions.
9 changes: 5 additions & 4 deletions consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,25 +244,26 @@ func consumeCommand() command {
os.Exit(1)
}

consume(closer, consumer, partitions)
consume(config.consume, closer, consumer, partitions)
},
}
}

func consume(
config consumeConfig,
closer chan struct{},
consumer sarama.Consumer,
partitions []int32,
) {
var wg sync.WaitGroup
consuming:
for _, partition := range partitions {
offsets, ok := config.consume.offsets[partition]
offsets, ok := config.offsets[partition]
if !ok {
offsets, ok = config.consume.offsets[-1]
offsets, ok = config.offsets[-1]
}
partitionConsumer, err := consumer.ConsumePartition(
config.consume.topic,
config.topic,
partition,
offsets.start,
)
Expand Down
63 changes: 56 additions & 7 deletions consume_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"reflect"
"testing"
"time"

"github.com/Shopify/sarama"
)
Expand Down Expand Up @@ -180,19 +181,65 @@ Input: config=%+v
}
}

func TestConsume(t *testing.T) {
closer := make(chan struct{})
config := consumeConfig{
topic: "hans",
offsets: map[int32]interval{-1: {1, 5}},
}
messageChan := make(<-chan *sarama.ConsumerMessage)
calls := make(chan tConsumePartition)
consumer := tConsumer{
consumePartition: map[tConsumePartition]tPartitionConsumer{
tConsumePartition{"hans", 1, 1}: tPartitionConsumer{messages: messageChan},
tConsumePartition{"hans", 2, 1}: tPartitionConsumer{messages: messageChan},
},
calls: calls,
}
partitions := []int32{1, 2}

go consume(config, closer, consumer, partitions)
defer close(closer)

end := make(chan struct{})
go func(c chan tConsumePartition, e chan struct{}) {
for {
actual := []tConsumePartition{}
expected := []tConsumePartition{
tConsumePartition{"hans", 1, 1},
tConsumePartition{"hans", 2, 1},
}
for {
select {
case call := <-c:
actual = append(actual, call)
if reflect.DeepEqual(actual, expected) {
e <- struct{}{}
return
}
case _, ok := <-e:
if !ok {
return
}
}
}
}
}(calls, end)

select {
case <-end:
case <-time.After(1 * time.Second):
t.Errorf("Did not receive calls to consume partitions before timeout.")
close(end)
}
}

type tConsumePartition struct {
topic string
partition int32
offset int64
}

type tConsumerMessage struct {
Key, Value []byte
Topic string
Partition int32
Offset int64
}

type tConsumerError struct {
Topic string
Partition int32
Expand Down Expand Up @@ -228,6 +275,7 @@ type tConsumer struct {
consumePartition map[tConsumePartition]tPartitionConsumer
consumePartitionErr map[tConsumePartition]error
closeErr error
calls chan tConsumePartition
}

func (c tConsumer) Topics() ([]string, error) {
Expand All @@ -240,6 +288,7 @@ func (c tConsumer) Partitions(topic string) ([]int32, error) {

func (c tConsumer) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error) {
cp := tConsumePartition{topic, partition, offset}
c.calls <- cp
return c.consumePartition[cp], c.consumePartitionErr[cp]
}

Expand Down

0 comments on commit a752c6d

Please sign in to comment.