Skip to content

Commit

Permalink
feat: make waiting on consumers optional
Browse files Browse the repository at this point in the history
  • Loading branch information
Johny Jose committed Jul 22, 2020
1 parent c0fd28a commit 21e1336
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 5 deletions.
4 changes: 2 additions & 2 deletions domain/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (c *connection) close() {
}
}

func (b *Broker) Start(addr string, expectedConsumerGroupMembers map[string]int) error {
func (b *Broker) Start(addr string, expectedConsumerGroupMembers map[string]int, waitForConsumers bool) error {
var err error

b.listener, err = net.Listen("tcp", addr)
Expand All @@ -107,7 +107,7 @@ func (b *Broker) Start(addr string, expectedConsumerGroupMembers map[string]int)

log.Debug().Str("cluster_id", b.Cluster.ID).Int32("broker_id", b.ID).Msg("Broker has started accepting connections")

if expectedConsumerGroupMembers != nil {
if expectedConsumerGroupMembers != nil && waitForConsumers {
log.Info().Str("cluster_id", b.Cluster.ID).Int32("broker_id", b.ID).Msg("Waiting for consumers to join group")
if !b.waitForConsumerGroups() {
err := fmt.Errorf("Kafka consumer failed to join group")
Expand Down
4 changes: 2 additions & 2 deletions domain/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ type Cluster struct {
sync.RWMutex
}

func (c *Cluster) StartBroker(addr string, expectedMembers map[string]int) error {
func (c *Cluster) StartBroker(addr string, expectedMembers map[string]int, waitForConsumers bool) error {
broker := NewBroker(c)
c.brokers = append(c.brokers, broker)
return broker.Start(addr, expectedMembers)
return broker.Start(addr, expectedMembers, waitForConsumers)
}

func (c *Cluster) GetBrokers() []*Broker {
Expand Down
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func init() {

func main() {
cluster := domain.NewCluster("kafka")
err := cluster.StartBroker(":9092", nil)
err := cluster.StartBroker(":9092", nil, false)
if err != nil {
panic(err)
}
Expand Down

0 comments on commit 21e1336

Please sign in to comment.