Skip to content

Commit

Permalink
Merge pull request #22 from jeroenrinzema/v0.4.0-rc
Browse files Browse the repository at this point in the history
V0.4.0 rc
  • Loading branch information
jeroenrinzema authored Aug 22, 2019
2 parents 3bb9cb0 + 00eace9 commit 172885a
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 22 deletions.
13 changes: 6 additions & 7 deletions dialects/kafka/consumer/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ package consumer

import (
"context"
"log"
"sync"

"github.com/Shopify/sarama"
"github.com/sirupsen/logrus"
)

// NewGroupHandle initializes a new GroupHandle
Expand All @@ -24,15 +24,14 @@ type GroupHandle struct {
consumer sarama.ConsumerGroup
group string
ready chan bool
config *sarama.Config
consumptions sync.WaitGroup
closing bool
}

// Connect initializes a new Sarama consumer group and awaits till the consumer
// group is set up and ready to consume messages.
func (handle *GroupHandle) Connect(brokers []string, topics []string, group string, config *sarama.Config) error {
consumer, err := sarama.NewConsumerGroup(brokers, group, config)
func (handle *GroupHandle) Connect(conn sarama.Client, topics []string, group string) error {
consumer, err := sarama.NewConsumerGroupFromClient(group, conn)
if err != nil {
return err
}
Expand All @@ -45,8 +44,9 @@ func (handle *GroupHandle) Connect(brokers []string, topics []string, group stri

ctx := context.Background()
err := consumer.Consume(ctx, topics, handle)

log.Println(err)
if err != nil {
logrus.Error(err)
}
}
}()

Expand All @@ -58,7 +58,6 @@ func (handle *GroupHandle) Connect(brokers []string, topics []string, group stri

handle.consumer = consumer
handle.group = group
handle.config = config

return nil
}
Expand Down
23 changes: 20 additions & 3 deletions dialects/kafka/consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,19 +68,36 @@ type Client struct {
brokers []string
topics map[string]*Topic
ready chan bool
conn sarama.Client
group string
}

// Healthy checks the health of the Kafka client
func (client *Client) Healthy() bool {
if len(client.conn.Brokers()) == 0 {
return false
}

return true
}

// Connect opens a new Kafka consumer
func (client *Client) Connect(initialOffset int64, config *sarama.Config, ts ...types.Topic) error {
func (client *Client) Connect(brokers []string, config *sarama.Config, initialOffset int64, ts ...types.Topic) error {
conn, err := sarama.NewClient(brokers, config)
if err != nil {
return err
}

topics := []string{}
for _, topic := range ts {
topics = append(topics, topic.Name)
}

client.conn = conn

if client.group != "" {
handle := NewGroupHandle(client)
err := handle.Connect(client.brokers, topics, client.group, config)
err := handle.Connect(conn, topics, client.group)
if err != nil {
return err
}
Expand All @@ -90,7 +107,7 @@ func (client *Client) Connect(initialOffset int64, config *sarama.Config, ts ...
}

handle := NewPartitionHandle(client)
err := handle.Connect(client.brokers, topics, initialOffset, config)
err = handle.Connect(conn, topics, initialOffset)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions dialects/kafka/consumer/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,8 @@ func (handle *PartitionHandle) Rebalance() error {

// Connect initializes a new Sarama partition consumer and awaits till the consumer
// group is set up and ready to consume messages.
func (handle *PartitionHandle) Connect(brokers []string, topics []string, initialOffset int64, config *sarama.Config) error {
consumer, err := sarama.NewConsumer(brokers, config)
func (handle *PartitionHandle) Connect(conn sarama.Client, topics []string, initialOffset int64) error {
consumer, err := sarama.NewConsumerFromClient(conn)
if err != nil {
return err
}
Expand Down
14 changes: 13 additions & 1 deletion dialects/kafka/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (dialect *Dialect) Assigned(topic types.Topic) {

// Open opens a kafka consumer and producer
func (dialect *Dialect) Open() (err error) {
err = dialect.consumer.Connect(dialect.Connection.InitialOffset, dialect.Config, dialect.Topics...)
err = dialect.consumer.Connect(dialect.Connection.Brokers, dialect.Config, dialect.Connection.InitialOffset, dialect.Topics...)
if err != nil {
return err
}
Expand Down Expand Up @@ -92,5 +92,17 @@ func (dialect *Dialect) Close() error {

// Healthy returns a boolean that reprisents if the dialect is healthy
func (dialect *Dialect) Healthy() bool {
if dialect.consumer == nil && dialect.producer == nil {
return false
}

if dialect.consumer != nil && !dialect.consumer.Healthy() {
return false
}

if dialect.producer != nil && !dialect.producer.Healthy() {
return false
}

return true
}
22 changes: 17 additions & 5 deletions dialects/kafka/producer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,33 @@ func NewClient() *Client {
// Client produces kafka messages
type Client struct {
producer sarama.SyncProducer
brokers []string
config *sarama.Config
conn sarama.Client
production sync.WaitGroup
}

// Healthy checks the health of the Kafka client
func (client *Client) Healthy() bool {
if len(client.conn.Brokers()) == 0 {
return false
}

return true
}

// Connect initializes and opens a new Sarama producer group.
func (client *Client) Connect(brokers []string, config *sarama.Config) error {
producer, err := sarama.NewSyncProducer(brokers, config)
conn, err := sarama.NewClient(brokers, config)
if err != nil {
return err
}

producer, err := sarama.NewSyncProducerFromClient(conn)
if err != nil {
return err
}

client.producer = producer
client.brokers = brokers
client.config = config
client.conn = conn

return nil
}
Expand Down
9 changes: 5 additions & 4 deletions dialects/mock/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,20 @@ type Consumer struct {
func (consumer *Consumer) Emit(message *types.Message) {
consumer.logger.Debug("emitting message!")

consumer.mutex.RLock()
consumer.mutex.Lock()
consumer.consumptions.Add(1)

defer consumer.consumptions.Done()

collection, has := consumer.subscriptions[message.Topic.Name]
if !has {
consumer.mutex.RUnlock()
consumer.mutex.Unlock()
return
}

length := len(collection.list)
if length == 0 {
consumer.mutex.RUnlock()
consumer.mutex.Unlock()
return
}

Expand All @@ -49,7 +50,7 @@ func (consumer *Consumer) Emit(message *types.Message) {
close(resolved)
}(collection, message)

consumer.mutex.RUnlock()
consumer.mutex.Unlock()
<-resolved
}

Expand Down

0 comments on commit 172885a

Please sign in to comment.