From 1efc5e510e58ee7bf4de7d9366c78ad907bfd6b9 Mon Sep 17 00:00:00 2001 From: nanic Date: Thu, 19 Oct 2023 10:56:43 -0400 Subject: [PATCH 1/3] Feature: Reset by time --- README.md | 4 ++ cmd/reset/reset-consumer-group-offset.go | 1 + cmd/reset/reset-consumer-group-offset_test.go | 43 +++++++++++++++++++ internal/consume/PartitionConsumer.go | 6 +-- .../OffsetResettingConsumer.go | 23 +++++++--- .../consumer-group-offset-operation.go | 1 + util/util.go | 9 ++-- 7 files changed, 75 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index 38bf25b..9b0739b 100644 --- a/README.md +++ b/README.md @@ -720,6 +720,10 @@ kafkactl reset offset my-group --topic my-topic --partition 5 --offset 100 kafkactl reset offset my-group --all-topics --newest # reset offset of for all partitions on multiple topics to oldest offset kafkactl reset offset my-group --topic my-topic-a --topic my-topic-b --oldest +# reset offset to offset at a given timestamp(epoch)/datetime +kafkactl reset offset my-group --topic my-topic-a --to-datetime 2014-04-26T17:24:37.123Z +# reset offset to offset at a given timestamp(epoch)/datetime +kafkactl reset offset my-group --topic my-topic-a --to-datetime 1697726906352 ``` ### Delete consumer group offsets diff --git a/cmd/reset/reset-consumer-group-offset.go b/cmd/reset/reset-consumer-group-offset.go index 953ba73..2781559 100644 --- a/cmd/reset/reset-consumer-group-offset.go +++ b/cmd/reset/reset-consumer-group-offset.go @@ -35,6 +35,7 @@ func newResetOffsetCmd() *cobra.Command { cmdResetOffset.Flags().StringArrayVarP(&offsetFlags.Topic, "topic", "t", offsetFlags.Topic, "one ore more topics to change offset for") cmdResetOffset.Flags().BoolVarP(&offsetFlags.Execute, "execute", "e", false, "execute the reset (as default only the results are displayed for validation)") cmdResetOffset.Flags().StringVarP(&offsetFlags.OutputFormat, "output", "o", offsetFlags.OutputFormat, "output format. One of: json|yaml") + cmdResetOffset.Flags().StringVarP(&offsetFlags.ToDatetime, "to-datetime", "", "", "set the offset to offset of given timestamp") return cmdResetOffset } diff --git a/cmd/reset/reset-consumer-group-offset_test.go b/cmd/reset/reset-consumer-group-offset_test.go index 7e78233..7cef0fc 100644 --- a/cmd/reset/reset-consumer-group-offset_test.go +++ b/cmd/reset/reset-consumer-group-offset_test.go @@ -3,6 +3,7 @@ package reset_test import ( "strings" "testing" + "time" "github.com/deviceinsight/kafkactl/testutil" ) @@ -130,6 +131,48 @@ func TestResetCGOForAllTopicsInTheGroupIntegration(t *testing.T) { testutil.VerifyTopicNotInConsumerGroup(t, group, topicOther) } +func TestResetCGOToDatetimeIntegration(t *testing.T) { + + testutil.StartIntegrationTest(t) + + topicName := testutil.CreateTopic(t, "reset-cgo-datetime") + + group := testutil.CreateConsumerGroup(t, "reset-cgo-datetime", topicName) + + testutil.ProduceMessage(t, topicName, "test-key", "a", 0, 0) + testutil.ProduceMessage(t, topicName, "test-key", "b", 0, 1) + + time.Sleep(1 * time.Millisecond) // need to have messaged produced at different milliseconds to have reproducible test + + t1 := time.Now() + t2 := t1.Format("2006-01-02T15:04:05.000Z") + + testutil.ProduceMessage(t, topicName, "test-key", "c", 0, 2) + testutil.ProduceMessage(t, topicName, "test-key", "d", 0, 3) + testutil.ProduceMessage(t, topicName, "test-key", "e", 0, 4) + testutil.ProduceMessage(t, topicName, "test-key", "f", 0, 5) + + testutil.VerifyConsumerGroupOffset(t, group, topicName, 0) + + //test with --to-datetime + kafkaCtl := testutil.CreateKafkaCtlCommand() + + if _, err := kafkaCtl.Execute("reset", "offset", group, "--topic", topicName, "--to-datetime", t2, "--execute"); err != nil { + t.Fatalf("failed to execute command: %v", err) + } + + testutil.VerifyConsumerGroupOffset(t, group, topicName, 2) + + kafkaCtl = testutil.CreateKafkaCtlCommand() + + if _, err := kafkaCtl.Execute("consume", topicName, "--group", group, "--max-messages", "4"); err != nil { + t.Fatalf("failed to execute command: %v", err) + } + + messages := strings.Split(strings.TrimSpace(kafkaCtl.GetStdOut()), "\n") + testutil.AssertArraysEquals(t, []string{"c", "d", "e", "f"}, messages) +} + func TestResetCGOAutoCompletionIntegration(t *testing.T) { testutil.StartIntegrationTest(t) diff --git a/internal/consume/PartitionConsumer.go b/internal/consume/PartitionConsumer.go index 9cb3ac0..3904168 100644 --- a/internal/consume/PartitionConsumer.go +++ b/internal/consume/PartitionConsumer.go @@ -166,7 +166,7 @@ func getOffsetBounds(client *sarama.Client, topic string, flags Flags, currentPa // Converts string to epoch unix timestamp // The string might be null in that case, the flag is considered absent and the value -1 is returned -func convertToEpocUnixMillis(timestamp string) (int64, error) { +func ConvertToEpocUnixMillis(timestamp string) (int64, error) { if timestamp == "" { return -1, nil } @@ -178,7 +178,7 @@ func convertToEpocUnixMillis(timestamp string) (int64, error) { } func getStartOffset(client *sarama.Client, topic string, flags Flags, currentPartition int32) (int64, error) { - var fromUnixMillis, err = convertToEpocUnixMillis(flags.FromTimestamp) + var fromUnixMillis, err = ConvertToEpocUnixMillis(flags.FromTimestamp) if err != nil { return ErrOffset, err } @@ -197,7 +197,7 @@ func getStartOffset(client *sarama.Client, topic string, flags Flags, currentPar } func getEndOffset(client *sarama.Client, topic string, flags Flags, currentPartition int32) (int64, error) { - var toUnixMillis, err = convertToEpocUnixMillis(flags.ToTimestamp) + var toUnixMillis, err = ConvertToEpocUnixMillis(flags.ToTimestamp) if err != nil { return ErrOffset, err } diff --git a/internal/consumergroupoffsets/OffsetResettingConsumer.go b/internal/consumergroupoffsets/OffsetResettingConsumer.go index 6f0bac7..472d874 100644 --- a/internal/consumergroupoffsets/OffsetResettingConsumer.go +++ b/internal/consumergroupoffsets/OffsetResettingConsumer.go @@ -4,6 +4,7 @@ import ( "strconv" "github.com/IBM/sarama" + "github.com/deviceinsight/kafkactl/internal/consume" "github.com/deviceinsight/kafkactl/output" "github.com/pkg/errors" ) @@ -49,7 +50,7 @@ func (consumer *OffsetResettingConsumer) Setup(session sarama.ConsumerGroupSessi offsets := make([]partitionOffsets, 0) if flags.Partition > -1 { - offset, err := resetOffset(consumer.client, consumer.topicName, flags.Partition, flags, groupOffsets, session) + offset, err := resetOffset(&consumer.client, consumer.topicName, flags.Partition, flags, groupOffsets, session) if err != nil { return err } @@ -62,7 +63,7 @@ func (consumer *OffsetResettingConsumer) Setup(session sarama.ConsumerGroupSessi } for _, partition := range partitions { - offset, err := resetOffset(consumer.client, consumer.topicName, partition, flags, groupOffsets, session) + offset, err := resetOffset(&consumer.client, consumer.topicName, partition, flags, groupOffsets, session) if err != nil { return err } @@ -102,7 +103,7 @@ func (consumer *OffsetResettingConsumer) ConsumeClaim(sarama.ConsumerGroupSessio return nil } -func resetOffset(client sarama.Client, topic string, partition int32, flags ResetConsumerGroupOffsetFlags, groupOffsets *sarama.OffsetFetchResponse, session sarama.ConsumerGroupSession) (partitionOffsets, error) { +func resetOffset(client *sarama.Client, topic string, partition int32, flags ResetConsumerGroupOffsetFlags, groupOffsets *sarama.OffsetFetchResponse, session sarama.ConsumerGroupSession) (partitionOffsets, error) { offset, err := getPartitionOffsets(client, topic, partition, flags) if err != nil { return offset, err @@ -121,16 +122,26 @@ func resetOffset(client sarama.Client, topic string, partition int32, flags Rese return offset, nil } -func getPartitionOffsets(client sarama.Client, topic string, partition int32, flags ResetConsumerGroupOffsetFlags) (partitionOffsets, error) { +func getPartitionOffsets(client *sarama.Client, topic string, partition int32, flags ResetConsumerGroupOffsetFlags) (partitionOffsets, error) { var err error offsets := partitionOffsets{Partition: partition} - if offsets.OldestOffset, err = client.GetOffset(topic, partition, sarama.OffsetOldest); err != nil { + if flags.ToDatetime != "" { + milliTime, err := consume.ConvertToEpocUnixMillis(flags.ToDatetime) + if err != nil { + return offsets, err + } + if offsets.TargetOffset, err = (*client).GetOffset(topic, partition, milliTime); err == nil { + return offsets, nil + } + } + + if offsets.OldestOffset, err = (*client).GetOffset(topic, partition, sarama.OffsetOldest); err != nil { return offsets, errors.Errorf("failed to get offset for topic %s Partition %d: %v", topic, partition, err) } - if offsets.NewestOffset, err = client.GetOffset(topic, partition, sarama.OffsetNewest); err != nil { + if offsets.NewestOffset, err = (*client).GetOffset(topic, partition, sarama.OffsetNewest); err != nil { return offsets, errors.Errorf("failed to get offset for topic %s Partition %d: %v", topic, partition, err) } diff --git a/internal/consumergroupoffsets/consumer-group-offset-operation.go b/internal/consumergroupoffsets/consumer-group-offset-operation.go index c78f84b..a9c2161 100644 --- a/internal/consumergroupoffsets/consumer-group-offset-operation.go +++ b/internal/consumergroupoffsets/consumer-group-offset-operation.go @@ -21,6 +21,7 @@ type ResetConsumerGroupOffsetFlags struct { Execute bool OutputFormat string allowedGroupState string + ToDatetime string } type ConsumerGroupOffsetOperation struct { diff --git a/util/util.go b/util/util.go index fea972d..6c35e3c 100644 --- a/util/util.go +++ b/util/util.go @@ -8,8 +8,9 @@ import ( ) var dateFormats = []string{ - "2006-01-02T15:04:05.123Z", - "2006-01-02T15:04:05.123", + "2006-01-02T15:04:05.00000Z", + "2006-01-02T15:04:05.000Z", + "2006-01-02T15:04:05.000", "2006-01-02T15:04:05Z", "2006-01-02T15:04:05", "2006-01-02T15:04", @@ -21,8 +22,10 @@ func ParseTimestamp(timestamp string) (time.Time, error) { return time.UnixMilli(timeMs), nil } + loc, _ := time.LoadLocation("Local") + for _, format := range dateFormats { - if val, e := time.Parse(format, timestamp); e == nil { + if val, e := time.ParseInLocation(format, timestamp, loc); e == nil { return val, nil } } From 4ad9f416d4279e440d1bd9e248d8ffb17cb55268 Mon Sep 17 00:00:00 2001 From: nanic Date: Mon, 23 Oct 2023 16:36:45 -0400 Subject: [PATCH 2/3] lint fixes --- cmd/alter/alter-partition.go | 3 +- internal/broker/broker-operation.go | 48 +++++++++---------- internal/common-operation.go | 3 +- internal/consume/AvroMessageDeserializer.go | 3 +- internal/consume/PartitionConsumer.go | 6 +-- .../OffsetResettingConsumer.go | 3 +- internal/producer/AvroMessageSerializer.go | 3 +- internal/producer/producer-operation.go | 3 +- 8 files changed, 32 insertions(+), 40 deletions(-) diff --git a/cmd/alter/alter-partition.go b/cmd/alter/alter-partition.go index 95a9f4c..9fe60fb 100644 --- a/cmd/alter/alter-partition.go +++ b/cmd/alter/alter-partition.go @@ -44,9 +44,8 @@ func newAlterPartitionCmd() *cobra.Command { return topic.CompleteTopicNames(cmd, args, toComplete) } else if len(args) == 1 { return partition.CompletePartitionIds(cmd, args, toComplete) - } else { - return nil, cobra.ShellCompDirectiveNoFileComp } + return nil, cobra.ShellCompDirectiveNoFileComp }, } diff --git a/internal/broker/broker-operation.go b/internal/broker/broker-operation.go index 2109f04..3444696 100644 --- a/internal/broker/broker-operation.go +++ b/internal/broker/broker-operation.go @@ -163,40 +163,40 @@ func (operation *Operation) DescribeBroker(id int32, flags DescribeBrokerFlags) return output.PrintObject(brokerInfo, flags.OutputFormat) } else if flags.OutputFormat != "" && flags.OutputFormat != "wide" { return errors.Errorf("unknown outputFormat: %s", flags.OutputFormat) - } else { - - tableWriter := output.CreateTableWriter() + } - // write broker info table - if err := tableWriter.WriteHeader("ID", "ADDRESS"); err != nil { - return err - } + tableWriter := output.CreateTableWriter() - if err := tableWriter.Write(fmt.Sprint(brokerInfo.ID), brokerInfo.Address); err != nil { - return err - } + // write broker info table + if err := tableWriter.WriteHeader("ID", "ADDRESS"); err != nil { + return err + } - if err := tableWriter.Flush(); err != nil { - return err - } + if err := tableWriter.Write(fmt.Sprint(brokerInfo.ID), brokerInfo.Address); err != nil { + return err + } - output.PrintStrings("") + if err := tableWriter.Flush(); err != nil { + return err + } - // first write config table - if err := tableWriter.WriteHeader("CONFIG", "VALUE"); err != nil { - return err - } + output.PrintStrings("") - for _, c := range brokerInfo.Configs { - if err := tableWriter.Write(c.Name, c.Value); err != nil { - return err - } - } + // first write config table + if err := tableWriter.WriteHeader("CONFIG", "VALUE"); err != nil { + return err + } - if err := tableWriter.Flush(); err != nil { + for _, c := range brokerInfo.Configs { + if err := tableWriter.Write(c.Name, c.Value); err != nil { return err } } + + if err := tableWriter.Flush(); err != nil { + return err + } + return nil } diff --git a/internal/common-operation.go b/internal/common-operation.go index 6531f09..53044ec 100644 --- a/internal/common-operation.go +++ b/internal/common-operation.go @@ -213,9 +213,8 @@ func GetClientID(context *ClientContext, defaultPrefix string) string { } else if usr, err = user.Current(); err != nil { output.Warnf("Failed to read current user: %v", err) return strings.TrimSuffix(defaultPrefix, "-") - } else { - return defaultPrefix + sanitizeUsername(usr.Username) } + return defaultPrefix + sanitizeUsername(usr.Username) } func sanitizeUsername(u string) string { diff --git a/internal/consume/AvroMessageDeserializer.go b/internal/consume/AvroMessageDeserializer.go index 079784d..33e6b27 100644 --- a/internal/consume/AvroMessageDeserializer.go +++ b/internal/consume/AvroMessageDeserializer.go @@ -171,9 +171,8 @@ func (deserializer AvroMessageDeserializer) CanDeserialize(topic string) (bool, return true, nil } else if util.ContainsString(subjects, topic+"-value") { return true, nil - } else { - return false, nil } + return false, nil } func (deserializer AvroMessageDeserializer) Deserialize(rawMsg *sarama.ConsumerMessage, flags Flags) error { diff --git a/internal/consume/PartitionConsumer.go b/internal/consume/PartitionConsumer.go index 3904168..ae17ae0 100644 --- a/internal/consume/PartitionConsumer.go +++ b/internal/consume/PartitionConsumer.go @@ -191,9 +191,8 @@ func getStartOffset(client *sarama.Client, topic string, flags Flags, currentPar return (*client).GetOffset(topic, currentPartition, sarama.OffsetOldest) } else if len(flags.Offsets) > 0 { return extractOffsetForPartition(flags, currentPartition) - } else { - return sarama.OffsetNewest, nil } + return sarama.OffsetNewest, nil } func getEndOffset(client *sarama.Client, topic string, flags Flags, currentPartition int32) (int64, error) { @@ -210,9 +209,8 @@ func getEndOffset(client *sarama.Client, topic string, flags Flags, currentParti return ErrOffset, err } return newestOffset, nil - } else { - return sarama.OffsetNewest, nil } + return sarama.OffsetNewest, nil } func extractOffsetForPartition(flags Flags, currentPartition int32) (int64, error) { diff --git a/internal/consumergroupoffsets/OffsetResettingConsumer.go b/internal/consumergroupoffsets/OffsetResettingConsumer.go index 472d874..325428b 100644 --- a/internal/consumergroupoffsets/OffsetResettingConsumer.go +++ b/internal/consumergroupoffsets/OffsetResettingConsumer.go @@ -150,9 +150,8 @@ func getPartitionOffsets(client *sarama.Client, topic string, partition int32, f return offsets, errors.Errorf("cannot set offset for Partition %d: offset (%d) < oldest offset (%d)", partition, flags.Offset, offsets.OldestOffset) } else if flags.Offset > offsets.NewestOffset { return offsets, errors.Errorf("cannot set offset for Partition %d: offset (%d) > newest offset (%d)", partition, flags.Offset, offsets.NewestOffset) - } else { - offsets.TargetOffset = flags.Offset } + offsets.TargetOffset = flags.Offset } else { if flags.OldestOffset { offsets.TargetOffset = offsets.OldestOffset diff --git a/internal/producer/AvroMessageSerializer.go b/internal/producer/AvroMessageSerializer.go index 4abda6a..d2af98f 100644 --- a/internal/producer/AvroMessageSerializer.go +++ b/internal/producer/AvroMessageSerializer.go @@ -106,9 +106,8 @@ func (serializer AvroMessageSerializer) CanSerialize(topic string) (bool, error) return true, nil } else if util.ContainsString(subjects, topic+"-value") { return true, nil - } else { - return false, nil } + return false, nil } func (serializer AvroMessageSerializer) Serialize(key, value []byte, flags Flags) (*sarama.ProducerMessage, error) { diff --git a/internal/producer/producer-operation.go b/internal/producer/producer-operation.go index c2a57a7..350ac21 100644 --- a/internal/producer/producer-operation.go +++ b/internal/producer/producer-operation.go @@ -339,9 +339,8 @@ func resolveColumns(line []string) (keyColumnIdx, valueColumnIdx, columnCount in } else if isTimestamp(line[1]) { output.Warnf("assuming column 1 to be message timestamp. Column will be ignored") return 0, 2, 3, nil - } else { - return -1, -1, -1, errors.Errorf("line contains unexpected amount of separators:\n%s", line) } + return -1, -1, -1, errors.Errorf("line contains unexpected amount of separators:\n%s", line) } func isTimestamp(value string) bool { From 6685de374a7aac8a44b5b0a40571535dd431de1e Mon Sep 17 00:00:00 2001 From: Dirk Wilden Date: Fri, 10 Nov 2023 08:33:25 +0100 Subject: [PATCH 3/3] rename var --- cmd/reset/reset-consumer-group-offset_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/reset/reset-consumer-group-offset_test.go b/cmd/reset/reset-consumer-group-offset_test.go index 7cef0fc..6be7214 100644 --- a/cmd/reset/reset-consumer-group-offset_test.go +++ b/cmd/reset/reset-consumer-group-offset_test.go @@ -145,7 +145,7 @@ func TestResetCGOToDatetimeIntegration(t *testing.T) { time.Sleep(1 * time.Millisecond) // need to have messaged produced at different milliseconds to have reproducible test t1 := time.Now() - t2 := t1.Format("2006-01-02T15:04:05.000Z") + t1Formatted := t1.Format("2006-01-02T15:04:05.000Z") testutil.ProduceMessage(t, topicName, "test-key", "c", 0, 2) testutil.ProduceMessage(t, topicName, "test-key", "d", 0, 3) @@ -157,7 +157,7 @@ func TestResetCGOToDatetimeIntegration(t *testing.T) { //test with --to-datetime kafkaCtl := testutil.CreateKafkaCtlCommand() - if _, err := kafkaCtl.Execute("reset", "offset", group, "--topic", topicName, "--to-datetime", t2, "--execute"); err != nil { + if _, err := kafkaCtl.Execute("reset", "offset", group, "--topic", topicName, "--to-datetime", t1Formatted, "--execute"); err != nil { t.Fatalf("failed to execute command: %v", err) }