Skip to content

Commit

Permalink
-Adding manual assignment in the application ✨
Browse files Browse the repository at this point in the history
  • Loading branch information
fairyhunter13 committed Jun 24, 2020
1 parent 9e2d45f commit 2b9d001
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 5 deletions.
5 changes: 3 additions & 2 deletions const.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ package kafkaclient

// List of const for consumers
const (
GoEventsChannelEnable = "go.events.channel.enable"
ClientID = "client.id"
GoEventsChannelEnable = "go.events.channel.enable"
GoApplicationRebalanceEnable = "go.application.rebalance.enable"
ClientID = "client.id"
)

// List of resources type
Expand Down
8 changes: 8 additions & 0 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ func (c *Consumer) consume(args ConsumeArgs) (err error) {
switch realType := event.(type) {
case *kafka.Message:
c.handleMessage(realType, &args)
case kafka.AssignedPartitions:
c.Assign(realType.Partitions)
case kafka.RevokedPartitions:
c.Unassign()
}
}
}(c, args)
Expand Down Expand Up @@ -67,6 +71,10 @@ func (c *Consumer) consumeBatch(args ConsumeArgs) (err error) {
switch realType := event.(type) {
case *kafka.Message:
c.handleMessage(realType, &args)
case kafka.AssignedPartitions:
c.Assign(realType.Partitions)
case kafka.RevokedPartitions:
c.Unassign()
}
}
}(c, args)
Expand Down
9 changes: 6 additions & 3 deletions container_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import "github.com/confluentinc/confluent-kafka-go/kafka"

// Consume create consumers based on per thread and directly consume messages from the Kafka broker.
func (c *Container) Consume(config kafka.ConfigMap, args ConsumeArgs) (consList []*Consumer, err error) {
newConfig := c.cloneConfig(config)
newConfig[GoApplicationRebalanceEnable] = true
for numWorker := uint64(1); numWorker <= args.Workers; numWorker++ {
var cons *Consumer
cons, err = c.NewConsumer(config)
cons, err = c.NewConsumer(newConfig)
if err != nil {
return
}
Expand Down Expand Up @@ -39,10 +41,11 @@ func (c *Container) ConsumeEvent(config kafka.ConfigMap, args ConsumeArgs) (cons
// ConsumeBatch create consumers based on per thread and directly consume messages from the Kafka broker.
// ConsumeBatch is an improved version of Consume but polling in a batch manner.
func (c *Container) ConsumeBatch(config kafka.ConfigMap, args ConsumeArgs) (consList []*Consumer, err error) {
newConfig := c.cloneConfig(config)
newConfig[GoEventsChannelEnable] = true
newConfig[GoApplicationRebalanceEnable] = true
for numWorker := uint64(1); numWorker <= args.Workers; numWorker++ {
var cons *Consumer
newConfig := c.cloneConfig(config)
newConfig[GoEventsChannelEnable] = true
cons, err = c.NewConsumer(newConfig)
if err != nil {
return
Expand Down

0 comments on commit 2b9d001

Please sign in to comment.