Skip to content

Commit

Permalink
Merge pull request #20 from deviceinsight/feature/consumer-group-offsets
Browse files Browse the repository at this point in the history
implement consumer group offset reset
  • Loading branch information
d-rk authored May 24, 2019
2 parents c100303 + 51226aa commit cc8a65d
Show file tree
Hide file tree
Showing 6 changed files with 277 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Add 'config view` command, to view current config
- Add `get consumer-groups` command to list available consumer groups
- Add `describe consumer-group` command to see details of consumer group
- Add `reset offset` command to reset consumer group offsets

## 1.1.0 - 2019-03-14

Expand Down
16 changes: 15 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -256,4 +256,18 @@ kafkactl describe consumer-group my-group --only-with-lag
kafkactl describe consumer-group my-group --topic my-topic
# using command alias
kafkactl describe cg my-group
```
```

### Reset consumer group offsets

in order to ensure the reset does what it is expected, per default only
the results are printed without actually executing it. Use the additional parameter `--execute` to perform the reset.

```bash
# reset offset of for all partitions to oldest offset
kafkactl reset offset my-group --topic my-topic --oldest
# reset offset of for all partitions to newest offset
kafkactl reset offset my-group --topic my-topic --newest
# reset offset for a single partition to specific offset
kafkactl reset offset my-group --topic my-topic --partition 5 --offset 100
```
28 changes: 28 additions & 0 deletions cmd/reset/reset-consumer-group-offset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package reset

import (
"github.com/deviceinsight/kafkactl/operations/consumergroupoffsets"
"github.com/spf13/cobra"
)

var offsetFlags consumergroupoffsets.ResetConsumerGroupOffsetFlags

var cmdResetOffset = &cobra.Command{
Use: "consumer-group-offset GROUP",
Aliases: []string{"cgo", "offset"},
Short: "reset a consumer group offset",
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {
(&consumergroupoffsets.ConsumerGroupOffsetOperation{}).ResetConsumerGroupOffset(offsetFlags, args[0])
},
}

func init() {
cmdResetOffset.Flags().BoolVarP(&offsetFlags.OldestOffset, "oldest", "", false, "set the offset to oldest offset (for all partitions or the specified partition)")
cmdResetOffset.Flags().BoolVarP(&offsetFlags.NewestOffset, "newest", "", false, "set the offset to newest offset (for all partitions or the specified partition)")
cmdResetOffset.Flags().Int64VarP(&offsetFlags.Offset, "offset", "", -1, "set offset to this value. offset with value -1 is ignored")
cmdResetOffset.Flags().Int32VarP(&offsetFlags.Partition, "partition", "p", -1, "partition to apply the offset. -1 stands for all partitions")
cmdResetOffset.Flags().StringVarP(&offsetFlags.Topic, "topic", "t", offsetFlags.Topic, "topic to change offset for")
cmdResetOffset.Flags().BoolVarP(&offsetFlags.Execute, "execute", "e", false, "execute the reset (as default only the results are displayed for validation)")
cmdResetOffset.Flags().StringVarP(&offsetFlags.OutputFormat, "output", "o", offsetFlags.OutputFormat, "output format. One of: json|yaml")
}
14 changes: 14 additions & 0 deletions cmd/reset/reset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package reset

import (
"github.com/spf13/cobra"
)

var CmdReset = &cobra.Command{
Use: "reset",
Short: "reset consumerGroupsOffset",
}

func init() {
CmdReset.AddCommand(cmdResetOffset)
}
2 changes: 2 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/deviceinsight/kafkactl/cmd/describe"
"github.com/deviceinsight/kafkactl/cmd/get"
"github.com/deviceinsight/kafkactl/cmd/produce"
"github.com/deviceinsight/kafkactl/cmd/reset"
"github.com/deviceinsight/kafkactl/output"
"github.com/spf13/cobra"
"github.com/spf13/viper"
Expand Down Expand Up @@ -46,6 +47,7 @@ func init() {
rootCmd.AddCommand(describe.CmdDescribe)
rootCmd.AddCommand(get.CmdGet)
rootCmd.AddCommand(produce.CmdProduce)
rootCmd.AddCommand(reset.CmdReset)

// use upper-case letters for shorthand params to avoid conflicts with local flags
rootCmd.PersistentFlags().StringVarP(&cfgFile, "config-file", "C", "", fmt.Sprintf("config file. one of: %v", configPaths))
Expand Down
217 changes: 217 additions & 0 deletions operations/consumergroupoffsets/consumer-group-offset-operation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
package consumergroupoffsets

import (
"context"
"github.com/Shopify/sarama"
"github.com/deviceinsight/kafkactl/operations"
"github.com/deviceinsight/kafkactl/output"
"github.com/deviceinsight/kafkactl/util"
"strconv"
)

type ResetConsumerGroupOffsetFlags struct {
Topic string
Partition int32
Offset int64
OldestOffset bool
NewestOffset bool
Execute bool
OutputFormat string
}

type partitionOffsets struct {
Partition int32
OldestOffset int64 `json:"oldestOffset" yaml:"oldestOffset"`
NewestOffset int64 `json:"newestOffset" yaml:"newestOffset"`
CurrentOffset int64 `json:"currentOffset" yaml:"currentOffset"`
TargetOffset int64 `json:"targetOffset" yaml:"targetOffset"`
}

type ConsumerGroupOffsetOperation struct {
}

func (operation *ConsumerGroupOffsetOperation) ResetConsumerGroupOffset(flags ResetConsumerGroupOffsetFlags, groupName string) {

if flags.Topic == "" {
output.Failf("no topic specified")
}

if !flags.Execute {
output.Warnf("nothing will be changed (include --execute to perform the reset)")
}

ctx := operations.CreateClientContext()
config := operations.CreateClientConfig(&ctx)

var (
err error
client sarama.Client
)

if client, err = operations.CreateClient(&ctx); err != nil {
output.Failf("failed to create client err=%v", err)
}

if topics, err := client.Topics(); err != nil {
output.Failf("failed to list available topics: %v", err)
} else if !util.ContainsString(topics, flags.Topic) {
output.Failf("topic does not exist: %s", flags.Topic)
}

consumerGroup, err := sarama.NewConsumerGroup(ctx.Brokers, groupName, config)
if err != nil {
output.Failf("failed to create consumer group %s: %v", groupName, err)
}

backgroundCtx := context.Background()

consumer := Consumer{
client: client,
groupName: groupName,
flags: flags,
}

topics := []string{flags.Topic}

consumer.ready = make(chan bool, 0)
err = consumerGroup.Consume(backgroundCtx, topics, &consumer)
if err != nil {
panic(err)
}

<-consumer.ready

err = consumerGroup.Close()
if err != nil {
panic(err)
}
}

type Consumer struct {
ready chan bool
client sarama.Client
groupName string
flags ResetConsumerGroupOffsetFlags
}

func (consumer *Consumer) Setup(session sarama.ConsumerGroupSession) error {

flags := consumer.flags

// admin.ListConsumerGroupOffsets(group, nil) can be used to fetch the offsets when
// https://github.com/Shopify/sarama/pull/1374 is merged
coordinator, err := consumer.client.Coordinator(consumer.groupName)
if err != nil {
output.Failf("failed to get coordinator: %v", err)
}

request := &sarama.OffsetFetchRequest{
// this will only work starting from version 0.10.2.0
Version: 2,
ConsumerGroup: consumer.groupName,
}

groupOffsets, err := coordinator.FetchOffset(request)
if err != nil {
output.Failf("failed to get fetch group offsets: %v", err)
}

offsets := make([]partitionOffsets, 0)

if flags.Partition > -1 {
offset := resetOffset(consumer.client, flags.Partition, flags, groupOffsets, session)
offsets = append(offsets, offset)
} else {

partitions, err := consumer.client.Partitions(flags.Topic)
if err != nil {
output.Failf("failed to list partitions: %v", err)
}

for _, partition := range partitions {
offset := resetOffset(consumer.client, partition, flags, groupOffsets, session)
offsets = append(offsets, offset)
}
}

if flags.OutputFormat != "" {
output.PrintObject(offsets, flags.OutputFormat)
} else {
tableWriter := output.CreateTableWriter()
tableWriter.WriteHeader("PARTITION", "OLDEST_OFFSET", "NEWEST_OFFSET", "CURRENT_OFFSET", "TARGET_OFFSET")
for _, o := range offsets {
tableWriter.Write(strconv.FormatInt(int64(o.Partition), 10),
strconv.FormatInt(int64(o.OldestOffset), 10), strconv.FormatInt(int64(o.NewestOffset), 10),
strconv.FormatInt(int64(o.CurrentOffset), 10), strconv.FormatInt(int64(o.TargetOffset), 10))
}
tableWriter.Flush()
}
return nil
}

func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
close(consumer.ready)
return nil
}

func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
return nil
}

func getPartitionOffsets(client sarama.Client, partition int32, flags ResetConsumerGroupOffsetFlags) partitionOffsets {

var err error
offsets := partitionOffsets{Partition: partition}

if offsets.OldestOffset, err = client.GetOffset(flags.Topic, partition, sarama.OffsetOldest); err != nil {
output.Failf("failed to get offset for topic %s Partition %d: %v", flags.Topic, partition, err)
}

if offsets.NewestOffset, err = client.GetOffset(flags.Topic, partition, sarama.OffsetNewest); err != nil {
output.Failf("failed to get offset for topic %s Partition %d: %v", flags.Topic, partition, err)
}

if flags.Offset > -1 {
if flags.Offset < offsets.OldestOffset {
output.Failf("cannot set offset for Partition %d: offset (%d) < oldest offset (%d)", partition, flags.Offset, offsets.OldestOffset)
} else if flags.Offset > offsets.NewestOffset {
output.Failf("cannot set offset for Partition %d: offset (%d) > newest offset (%d)", partition, flags.Offset, offsets.NewestOffset)
} else {
offsets.TargetOffset = flags.Offset
}
} else {
if flags.OldestOffset {
offsets.TargetOffset = offsets.OldestOffset
} else if flags.NewestOffset {
offsets.TargetOffset = offsets.NewestOffset
} else {
output.Failf("either offset,oldest,newest parameter needs to be specified")
}
}

return offsets
}

func resetOffset(client sarama.Client, partition int32, flags ResetConsumerGroupOffsetFlags, groupOffsets *sarama.OffsetFetchResponse, session sarama.ConsumerGroupSession) partitionOffsets {
offset := getPartitionOffsets(client, partition, flags)
offset.CurrentOffset = getGroupOffset(groupOffsets, flags.Topic, partition)

if flags.Execute {
if offset.TargetOffset > offset.CurrentOffset {
session.MarkOffset(flags.Topic, partition, offset.TargetOffset, "")
} else if offset.TargetOffset < offset.CurrentOffset {
session.ResetOffset(flags.Topic, partition, offset.TargetOffset, "")
}
}

return offset
}

func getGroupOffset(offsetFetchResponse *sarama.OffsetFetchResponse, topic string, partition int32) int64 {
block := offsetFetchResponse.Blocks[topic][partition]
if block != nil {
return block.Offset
} else {
return -1
}
}

0 comments on commit cc8a65d

Please sign in to comment.