Skip to content

Commit

Permalink
Merge pull request #171 from nanic/main
Browse files Browse the repository at this point in the history
Feature: Reset by time
  • Loading branch information
d-rk authored Nov 10, 2023
2 parents 4bc4019 + 6685de3 commit 321a6eb
Show file tree
Hide file tree
Showing 13 changed files with 107 additions and 52 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -720,6 +720,10 @@ kafkactl reset offset my-group --topic my-topic --partition 5 --offset 100
kafkactl reset offset my-group --all-topics --newest
# reset offset of for all partitions on multiple topics to oldest offset
kafkactl reset offset my-group --topic my-topic-a --topic my-topic-b --oldest
# reset offset to offset at a given timestamp(epoch)/datetime
kafkactl reset offset my-group --topic my-topic-a --to-datetime 2014-04-26T17:24:37.123Z
# reset offset to offset at a given timestamp(epoch)/datetime
kafkactl reset offset my-group --topic my-topic-a --to-datetime 1697726906352
```

### Delete consumer group offsets
Expand Down
3 changes: 1 addition & 2 deletions cmd/alter/alter-partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,8 @@ func newAlterPartitionCmd() *cobra.Command {
return topic.CompleteTopicNames(cmd, args, toComplete)
} else if len(args) == 1 {
return partition.CompletePartitionIds(cmd, args, toComplete)
} else {
return nil, cobra.ShellCompDirectiveNoFileComp
}
return nil, cobra.ShellCompDirectiveNoFileComp
},
}

Expand Down
1 change: 1 addition & 0 deletions cmd/reset/reset-consumer-group-offset.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func newResetOffsetCmd() *cobra.Command {
cmdResetOffset.Flags().StringArrayVarP(&offsetFlags.Topic, "topic", "t", offsetFlags.Topic, "one ore more topics to change offset for")
cmdResetOffset.Flags().BoolVarP(&offsetFlags.Execute, "execute", "e", false, "execute the reset (as default only the results are displayed for validation)")
cmdResetOffset.Flags().StringVarP(&offsetFlags.OutputFormat, "output", "o", offsetFlags.OutputFormat, "output format. One of: json|yaml")
cmdResetOffset.Flags().StringVarP(&offsetFlags.ToDatetime, "to-datetime", "", "", "set the offset to offset of given timestamp")

return cmdResetOffset
}
43 changes: 43 additions & 0 deletions cmd/reset/reset-consumer-group-offset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package reset_test
import (
"strings"
"testing"
"time"

"github.com/deviceinsight/kafkactl/testutil"
)
Expand Down Expand Up @@ -130,6 +131,48 @@ func TestResetCGOForAllTopicsInTheGroupIntegration(t *testing.T) {
testutil.VerifyTopicNotInConsumerGroup(t, group, topicOther)
}

func TestResetCGOToDatetimeIntegration(t *testing.T) {

testutil.StartIntegrationTest(t)

topicName := testutil.CreateTopic(t, "reset-cgo-datetime")

group := testutil.CreateConsumerGroup(t, "reset-cgo-datetime", topicName)

testutil.ProduceMessage(t, topicName, "test-key", "a", 0, 0)
testutil.ProduceMessage(t, topicName, "test-key", "b", 0, 1)

time.Sleep(1 * time.Millisecond) // need to have messaged produced at different milliseconds to have reproducible test

t1 := time.Now()
t1Formatted := t1.Format("2006-01-02T15:04:05.000Z")

testutil.ProduceMessage(t, topicName, "test-key", "c", 0, 2)
testutil.ProduceMessage(t, topicName, "test-key", "d", 0, 3)
testutil.ProduceMessage(t, topicName, "test-key", "e", 0, 4)
testutil.ProduceMessage(t, topicName, "test-key", "f", 0, 5)

testutil.VerifyConsumerGroupOffset(t, group, topicName, 0)

//test with --to-datetime
kafkaCtl := testutil.CreateKafkaCtlCommand()

if _, err := kafkaCtl.Execute("reset", "offset", group, "--topic", topicName, "--to-datetime", t1Formatted, "--execute"); err != nil {
t.Fatalf("failed to execute command: %v", err)
}

testutil.VerifyConsumerGroupOffset(t, group, topicName, 2)

kafkaCtl = testutil.CreateKafkaCtlCommand()

if _, err := kafkaCtl.Execute("consume", topicName, "--group", group, "--max-messages", "4"); err != nil {
t.Fatalf("failed to execute command: %v", err)
}

messages := strings.Split(strings.TrimSpace(kafkaCtl.GetStdOut()), "\n")
testutil.AssertArraysEquals(t, []string{"c", "d", "e", "f"}, messages)
}

func TestResetCGOAutoCompletionIntegration(t *testing.T) {

testutil.StartIntegrationTest(t)
Expand Down
48 changes: 24 additions & 24 deletions internal/broker/broker-operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,40 +163,40 @@ func (operation *Operation) DescribeBroker(id int32, flags DescribeBrokerFlags)
return output.PrintObject(brokerInfo, flags.OutputFormat)
} else if flags.OutputFormat != "" && flags.OutputFormat != "wide" {
return errors.Errorf("unknown outputFormat: %s", flags.OutputFormat)
} else {

tableWriter := output.CreateTableWriter()
}

// write broker info table
if err := tableWriter.WriteHeader("ID", "ADDRESS"); err != nil {
return err
}
tableWriter := output.CreateTableWriter()

if err := tableWriter.Write(fmt.Sprint(brokerInfo.ID), brokerInfo.Address); err != nil {
return err
}
// write broker info table
if err := tableWriter.WriteHeader("ID", "ADDRESS"); err != nil {
return err
}

if err := tableWriter.Flush(); err != nil {
return err
}
if err := tableWriter.Write(fmt.Sprint(brokerInfo.ID), brokerInfo.Address); err != nil {
return err
}

output.PrintStrings("")
if err := tableWriter.Flush(); err != nil {
return err
}

// first write config table
if err := tableWriter.WriteHeader("CONFIG", "VALUE"); err != nil {
return err
}
output.PrintStrings("")

for _, c := range brokerInfo.Configs {
if err := tableWriter.Write(c.Name, c.Value); err != nil {
return err
}
}
// first write config table
if err := tableWriter.WriteHeader("CONFIG", "VALUE"); err != nil {
return err
}

if err := tableWriter.Flush(); err != nil {
for _, c := range brokerInfo.Configs {
if err := tableWriter.Write(c.Name, c.Value); err != nil {
return err
}
}

if err := tableWriter.Flush(); err != nil {
return err
}

return nil
}

Expand Down
3 changes: 1 addition & 2 deletions internal/common-operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,9 +213,8 @@ func GetClientID(context *ClientContext, defaultPrefix string) string {
} else if usr, err = user.Current(); err != nil {
output.Warnf("Failed to read current user: %v", err)
return strings.TrimSuffix(defaultPrefix, "-")
} else {
return defaultPrefix + sanitizeUsername(usr.Username)
}
return defaultPrefix + sanitizeUsername(usr.Username)
}

func sanitizeUsername(u string) string {
Expand Down
3 changes: 1 addition & 2 deletions internal/consume/AvroMessageDeserializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,8 @@ func (deserializer AvroMessageDeserializer) CanDeserialize(topic string) (bool,
return true, nil
} else if util.ContainsString(subjects, topic+"-value") {
return true, nil
} else {
return false, nil
}
return false, nil
}

func (deserializer AvroMessageDeserializer) Deserialize(rawMsg *sarama.ConsumerMessage, flags Flags) error {
Expand Down
12 changes: 5 additions & 7 deletions internal/consume/PartitionConsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func getOffsetBounds(client *sarama.Client, topic string, flags Flags, currentPa

// Converts string to epoch unix timestamp
// The string might be null in that case, the flag is considered absent and the value -1 is returned
func convertToEpocUnixMillis(timestamp string) (int64, error) {
func ConvertToEpocUnixMillis(timestamp string) (int64, error) {
if timestamp == "" {
return -1, nil
}
Expand All @@ -178,7 +178,7 @@ func convertToEpocUnixMillis(timestamp string) (int64, error) {
}

func getStartOffset(client *sarama.Client, topic string, flags Flags, currentPartition int32) (int64, error) {
var fromUnixMillis, err = convertToEpocUnixMillis(flags.FromTimestamp)
var fromUnixMillis, err = ConvertToEpocUnixMillis(flags.FromTimestamp)
if err != nil {
return ErrOffset, err
}
Expand All @@ -191,13 +191,12 @@ func getStartOffset(client *sarama.Client, topic string, flags Flags, currentPar
return (*client).GetOffset(topic, currentPartition, sarama.OffsetOldest)
} else if len(flags.Offsets) > 0 {
return extractOffsetForPartition(flags, currentPartition)
} else {
return sarama.OffsetNewest, nil
}
return sarama.OffsetNewest, nil
}

func getEndOffset(client *sarama.Client, topic string, flags Flags, currentPartition int32) (int64, error) {
var toUnixMillis, err = convertToEpocUnixMillis(flags.ToTimestamp)
var toUnixMillis, err = ConvertToEpocUnixMillis(flags.ToTimestamp)
if err != nil {
return ErrOffset, err
}
Expand All @@ -210,9 +209,8 @@ func getEndOffset(client *sarama.Client, topic string, flags Flags, currentParti
return ErrOffset, err
}
return newestOffset, nil
} else {
return sarama.OffsetNewest, nil
}
return sarama.OffsetNewest, nil
}

func extractOffsetForPartition(flags Flags, currentPartition int32) (int64, error) {
Expand Down
26 changes: 18 additions & 8 deletions internal/consumergroupoffsets/OffsetResettingConsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"strconv"

"github.com/IBM/sarama"
"github.com/deviceinsight/kafkactl/internal/consume"
"github.com/deviceinsight/kafkactl/output"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -49,7 +50,7 @@ func (consumer *OffsetResettingConsumer) Setup(session sarama.ConsumerGroupSessi
offsets := make([]partitionOffsets, 0)

if flags.Partition > -1 {
offset, err := resetOffset(consumer.client, consumer.topicName, flags.Partition, flags, groupOffsets, session)
offset, err := resetOffset(&consumer.client, consumer.topicName, flags.Partition, flags, groupOffsets, session)
if err != nil {
return err
}
Expand All @@ -62,7 +63,7 @@ func (consumer *OffsetResettingConsumer) Setup(session sarama.ConsumerGroupSessi
}

for _, partition := range partitions {
offset, err := resetOffset(consumer.client, consumer.topicName, partition, flags, groupOffsets, session)
offset, err := resetOffset(&consumer.client, consumer.topicName, partition, flags, groupOffsets, session)
if err != nil {
return err
}
Expand Down Expand Up @@ -102,7 +103,7 @@ func (consumer *OffsetResettingConsumer) ConsumeClaim(sarama.ConsumerGroupSessio
return nil
}

func resetOffset(client sarama.Client, topic string, partition int32, flags ResetConsumerGroupOffsetFlags, groupOffsets *sarama.OffsetFetchResponse, session sarama.ConsumerGroupSession) (partitionOffsets, error) {
func resetOffset(client *sarama.Client, topic string, partition int32, flags ResetConsumerGroupOffsetFlags, groupOffsets *sarama.OffsetFetchResponse, session sarama.ConsumerGroupSession) (partitionOffsets, error) {
offset, err := getPartitionOffsets(client, topic, partition, flags)
if err != nil {
return offset, err
Expand All @@ -121,16 +122,26 @@ func resetOffset(client sarama.Client, topic string, partition int32, flags Rese
return offset, nil
}

func getPartitionOffsets(client sarama.Client, topic string, partition int32, flags ResetConsumerGroupOffsetFlags) (partitionOffsets, error) {
func getPartitionOffsets(client *sarama.Client, topic string, partition int32, flags ResetConsumerGroupOffsetFlags) (partitionOffsets, error) {

var err error
offsets := partitionOffsets{Partition: partition}

if offsets.OldestOffset, err = client.GetOffset(topic, partition, sarama.OffsetOldest); err != nil {
if flags.ToDatetime != "" {
milliTime, err := consume.ConvertToEpocUnixMillis(flags.ToDatetime)
if err != nil {
return offsets, err
}
if offsets.TargetOffset, err = (*client).GetOffset(topic, partition, milliTime); err == nil {
return offsets, nil
}
}

if offsets.OldestOffset, err = (*client).GetOffset(topic, partition, sarama.OffsetOldest); err != nil {
return offsets, errors.Errorf("failed to get offset for topic %s Partition %d: %v", topic, partition, err)
}

if offsets.NewestOffset, err = client.GetOffset(topic, partition, sarama.OffsetNewest); err != nil {
if offsets.NewestOffset, err = (*client).GetOffset(topic, partition, sarama.OffsetNewest); err != nil {
return offsets, errors.Errorf("failed to get offset for topic %s Partition %d: %v", topic, partition, err)
}

Expand All @@ -139,9 +150,8 @@ func getPartitionOffsets(client sarama.Client, topic string, partition int32, fl
return offsets, errors.Errorf("cannot set offset for Partition %d: offset (%d) < oldest offset (%d)", partition, flags.Offset, offsets.OldestOffset)
} else if flags.Offset > offsets.NewestOffset {
return offsets, errors.Errorf("cannot set offset for Partition %d: offset (%d) > newest offset (%d)", partition, flags.Offset, offsets.NewestOffset)
} else {
offsets.TargetOffset = flags.Offset
}
offsets.TargetOffset = flags.Offset
} else {
if flags.OldestOffset {
offsets.TargetOffset = offsets.OldestOffset
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type ResetConsumerGroupOffsetFlags struct {
Execute bool
OutputFormat string
allowedGroupState string
ToDatetime string
}

type ConsumerGroupOffsetOperation struct {
Expand Down
3 changes: 1 addition & 2 deletions internal/producer/AvroMessageSerializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,8 @@ func (serializer AvroMessageSerializer) CanSerialize(topic string) (bool, error)
return true, nil
} else if util.ContainsString(subjects, topic+"-value") {
return true, nil
} else {
return false, nil
}
return false, nil
}

func (serializer AvroMessageSerializer) Serialize(key, value []byte, flags Flags) (*sarama.ProducerMessage, error) {
Expand Down
3 changes: 1 addition & 2 deletions internal/producer/producer-operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,9 +339,8 @@ func resolveColumns(line []string) (keyColumnIdx, valueColumnIdx, columnCount in
} else if isTimestamp(line[1]) {
output.Warnf("assuming column 1 to be message timestamp. Column will be ignored")
return 0, 2, 3, nil
} else {
return -1, -1, -1, errors.Errorf("line contains unexpected amount of separators:\n%s", line)
}
return -1, -1, -1, errors.Errorf("line contains unexpected amount of separators:\n%s", line)
}

func isTimestamp(value string) bool {
Expand Down
9 changes: 6 additions & 3 deletions util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ import (
)

var dateFormats = []string{
"2006-01-02T15:04:05.123Z",
"2006-01-02T15:04:05.123",
"2006-01-02T15:04:05.00000Z",
"2006-01-02T15:04:05.000Z",
"2006-01-02T15:04:05.000",
"2006-01-02T15:04:05Z",
"2006-01-02T15:04:05",
"2006-01-02T15:04",
Expand All @@ -21,8 +22,10 @@ func ParseTimestamp(timestamp string) (time.Time, error) {
return time.UnixMilli(timeMs), nil
}

loc, _ := time.LoadLocation("Local")

for _, format := range dateFormats {
if val, e := time.Parse(format, timestamp); e == nil {
if val, e := time.ParseInLocation(format, timestamp, loc); e == nil {
return val, nil
}
}
Expand Down

0 comments on commit 321a6eb

Please sign in to comment.