refactor input parsing for produce command

d-rk committed Nov 10, 2023
1 parent baf603c commit c37c44c
Showing 12 changed files with 201 additions and 76 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -6,6 +6,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added
- [#171](https://github.com/deviceinsight/kafkactl/issues/171) Support in the reset command for resetting offsets to the offset at a provided datetime
- [#172](https://github.com/deviceinsight/kafkactl/issues/172) Support for json input format in the produce command

## 3.4.0 - 2023-09-25

### Fixed
8 changes: 8 additions & 0 deletions README.md
@@ -432,6 +432,14 @@ kafkactl produce my-topic --separator=# --lineSeparator=|| --file=myfile
**NOTE:** if the file was generated with `kafkactl consume --print-keys --print-timestamps my-topic`, the produce command is able to detect the message timestamp in the input and will ignore it.

It is also possible to produce messages in json format:

```bash
# each line in myfile.json is expected to contain a json object with fields key, value
kafkactl produce my-topic --file=myfile.json --input-format=json
cat myfile.json | kafkactl produce my-topic --input-format=json
```
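
Given the `Message` struct introduced in `internal/producer/input/parser.go` below, a single line of such a file looks like this (illustrative sample data):

```json
{"key": "1", "value": "a"}
```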

The number of messages produced per second can be controlled with the `--rate` parameter:

```bash
# illustrative example: produce at most 200 messages per second
kafkactl produce my-topic --file=myfile --rate=200
```
2 changes: 1 addition & 1 deletion cmd/produce/produce.go
@@ -36,7 +36,7 @@ func NewProduceCmd() *cobra.Command {
	cmdProduce.Flags().StringVarP(&flags.Value, "value", "v", "", "value to produce")
	cmdProduce.Flags().BoolVarP(&flags.NullValue, "null-value", "", false, "produce a null value (can be used instead of providing a value with --value)")
	cmdProduce.Flags().StringVarP(&flags.File, "file", "f", "", "file to read input from")
-	cmdProduce.Flags().StringVarP(&flags.FileType, "file-type", "", "", "file type to read input from")
+	cmdProduce.Flags().StringVarP(&flags.InputFormat, "input-format", "", "", "input format. One of: csv,json (default is csv)")
	cmdProduce.Flags().StringArrayVarP(&flags.Headers, "header", "H", flags.Headers, "headers in format `key:value`")
	cmdProduce.Flags().StringVarP(&flags.Separator, "separator", "S", "", "separator to split key and value from stdin or file")
	cmdProduce.Flags().StringVarP(&flags.LineSeparator, "lineSeparator", "L", "\n", "separator to split multiple messages from stdin or file")
58 changes: 50 additions & 8 deletions cmd/produce/produce_test.go
@@ -375,19 +375,62 @@ func TestProduceWithCSVFileIntegration(t *testing.T) {

	dataFilePath := filepath.Join(testutil.RootDir, "testutil", "testdata")

-	if _, err := kafkaCtl.Execute("produce", topic,
+	if _, err := kafkaCtl.Execute("produce", topic, "--separator", ",",
		"--file", filepath.Join(dataFilePath, "msg.csv")); err != nil {
		t.Fatalf("failed to execute command: %v", err)
	}

	testutil.AssertEquals(t, "3 messages produced", kafkaCtl.GetStdOut())

-	if _, err := kafkaCtl.Execute("consume", topic, "--from-beginning", "--exit"); err != nil {
+	if _, err := kafkaCtl.Execute("consume", topic, "--from-beginning", "--print-keys", "--exit"); err != nil {
		t.Fatalf("failed to execute command: %v", err)
	}

	testutil.AssertEquals(t, "1#a\n2#b\n3#c", kafkaCtl.GetStdOut())
}

func TestProduceWithCSVFileWithTimestampsFirstColumnIntegration(t *testing.T) {

	testutil.StartIntegrationTest(t)
	topic := testutil.CreateTopic(t, "produce-topic-csv")
	kafkaCtl := testutil.CreateKafkaCtlCommand()

	dataFilePath := filepath.Join(testutil.RootDir, "testutil", "testdata")

	if _, err := kafkaCtl.Execute("produce", topic, "--separator", ",",
		"--file", filepath.Join(dataFilePath, "msg-ts1.csv")); err != nil {
		t.Fatalf("failed to execute command: %v", err)
	}

	testutil.AssertEquals(t, "3 messages produced", kafkaCtl.GetStdOut())

	if _, err := kafkaCtl.Execute("consume", topic, "--from-beginning", "--print-keys", "--exit"); err != nil {
		t.Fatalf("failed to execute command: %v", err)
	}

	testutil.AssertEquals(t, "1#a\n2#b\n3#c", kafkaCtl.GetStdOut())
}

func TestProduceWithCSVFileWithTimestampsSecondColumnIntegration(t *testing.T) {

	testutil.StartIntegrationTest(t)
	topic := testutil.CreateTopic(t, "produce-topic-csv")
	kafkaCtl := testutil.CreateKafkaCtlCommand()

	dataFilePath := filepath.Join(testutil.RootDir, "testutil", "testdata")

	if _, err := kafkaCtl.Execute("produce", topic, "--separator", ",",
		"--file", filepath.Join(dataFilePath, "msg-ts2.csv")); err != nil {
		t.Fatalf("failed to execute command: %v", err)
	}

	testutil.AssertEquals(t, "3 messages produced", kafkaCtl.GetStdOut())

	if _, err := kafkaCtl.Execute("consume", topic, "--from-beginning", "--print-keys", "--exit"); err != nil {
		t.Fatalf("failed to execute command: %v", err)
	}

	testutil.AssertEquals(t, "1#a\n2#b\n3#c", kafkaCtl.GetStdOut())
}
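
The `msg-ts1.csv` and `msg-ts2.csv` fixtures are added by this commit in files not shown in this excerpt; based on the assertions above, they would carry the timestamp in the first and second column respectively (hypothetical contents for `msg-ts1.csv`):

```csv
2023-11-10T12:00:00Z,1,a
2023-11-10T12:00:01Z,2,b
2023-11-10T12:00:02Z,3,c
```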

func TestProduceWithJSONFileIntegration(t *testing.T) {
@@ -399,19 +442,18 @@ func TestProduceWithJSONFileIntegration(t *testing.T) {
	dataFilePath := filepath.Join(testutil.RootDir, "testutil", "testdata")

	if _, err := kafkaCtl.Execute("produce", topic,
-		"--file", filepath.Join(dataFilePath, "msg.txt"),
-		"--file-type", "json"); err != nil {
+		"--file", filepath.Join(dataFilePath, "msg.json"),
+		"--input-format", "json"); err != nil {
		t.Fatalf("failed to execute command: %v", err)
	}

	testutil.AssertEquals(t, "3 messages produced", kafkaCtl.GetStdOut())

-	if _, err := kafkaCtl.Execute("consume", topic, "--from-beginning", "--exit"); err != nil {
+	if _, err := kafkaCtl.Execute("consume", topic, "--from-beginning", "--print-keys", "--exit"); err != nil {
		t.Fatalf("failed to execute command: %v", err)
	}

-	testutil.AssertEquals(t, "a\nb\nc", kafkaCtl.GetStdOut())
+	testutil.AssertEquals(t, "1#a\n2#b\n3#c", kafkaCtl.GetStdOut())
}

func TestProduceProtoFileWithOnlyKeyEncodedIntegration(t *testing.T) {
72 changes: 72 additions & 0 deletions internal/producer/input/csv_parser.go
@@ -0,0 +1,72 @@
package input

import (
	"fmt"
	"strings"
	"time"

	"github.com/deviceinsight/kafkactl/output"
	"github.com/deviceinsight/kafkactl/util"
	"github.com/pkg/errors"
)

const defaultKeyColumnIdx = 0
const defaultValueColumnIdx = 1
const defaultColumnCount = 2

type csvParser struct {
	key            string
	separator      string
	keyColumnIdx   int
	valueColumnIdx int
	columnCount    int
}

func NewCsvParser(key string, separator string) Parser {
	return &csvParser{
		key:            key,
		separator:      separator,
		keyColumnIdx:   defaultKeyColumnIdx,
		valueColumnIdx: defaultValueColumnIdx,
		columnCount:    defaultColumnCount,
	}
}

func (p *csvParser) ParseLine(line string) (Message, error) {

	// without a separator, the whole line is the message value
	if p.separator == "" {
		return Message{Key: p.key, Value: line}, nil
	}

	input := strings.Split(line, util.ConvertControlChars(p.separator))
	if len(input) < 2 {
		return Message{}, fmt.Errorf("the provided input does not contain the separator %s", p.separator)
	} else if len(input) == 3 && p.columnCount == defaultColumnCount {
		// lazy resolving of column indices: a third column is assumed to be a timestamp
		var err error
		p.keyColumnIdx, p.valueColumnIdx, p.columnCount, err = resolveColumns(input)
		if err != nil {
			return Message{}, err
		}
	} else if len(input) != p.columnCount {
		return Message{}, fmt.Errorf("line contains unexpected amount of separators:\n%s", line)
	}

	return Message{input[p.keyColumnIdx], input[p.valueColumnIdx]}, nil
}

func resolveColumns(line []string) (keyColumnIdx, valueColumnIdx, columnCount int, err error) {
	if isTimestamp(line[0]) {
		output.Warnf("assuming column 0 to be message timestamp. Column will be ignored")
		return 1, 2, 3, nil
	} else if isTimestamp(line[1]) {
		output.Warnf("assuming column 1 to be message timestamp. Column will be ignored")
		return 0, 2, 3, nil
	}
	return -1, -1, -1, errors.Errorf("line contains unexpected amount of separators:\n%s", line)
}

// isTimestamp reports whether value parses as an RFC3339 timestamp
func isTimestamp(value string) bool {
	_, e := time.Parse(time.RFC3339, value)
	return e == nil
}
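
A quick illustration of the lazy column resolution (a test sketch, not part of this commit):

```go
package input

import "testing"

// sketch: a three-column line whose first column parses as an RFC3339
// timestamp resolves key and value to columns 1 and 2.
func TestCsvParserSkipsTimestampColumn(t *testing.T) {
	parser := NewCsvParser("", ",")
	msg, err := parser.ParseLine("2023-11-10T12:00:00Z,1,a")
	if err != nil {
		t.Fatal(err)
	}
	if msg.Key != "1" || msg.Value != "a" {
		t.Fatalf("unexpected message: %+v", msg)
	}
}
```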
24 changes: 24 additions & 0 deletions internal/producer/input/json_parser.go
@@ -0,0 +1,24 @@
package input

import (
	"encoding/json"
	"fmt"
)

type jsonParser struct {
}

func NewJSONParser() Parser {
	return &jsonParser{}
}

func (p *jsonParser) ParseLine(line string) (Message, error) {

	var message Message

	if err := json.Unmarshal([]byte(line), &message); err != nil {
		return message, fmt.Errorf("can't unmarshal line: %w", err)
	}

	return message, nil
}
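
The JSON parser expects one JSON object per input line; an illustrative check of its behavior (a sketch, not part of this commit):

```go
package input

import "testing"

// sketch: each input line is one JSON object with key and value fields
func TestJSONParserParsesKeyAndValue(t *testing.T) {
	msg, err := NewJSONParser().ParseLine(`{"key": "1", "value": "a"}`)
	if err != nil {
		t.Fatal(err)
	}
	if msg.Key != "1" || msg.Value != "a" {
		t.Fatalf("unexpected message: %+v", msg)
	}
}
```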
10 changes: 10 additions & 0 deletions internal/producer/input/parser.go
@@ -0,0 +1,10 @@
package input

type Message struct {
	Key   string `json:"key"`
	Value string `json:"value"`
}

type Parser interface {
	ParseLine(line string) (Message, error)
}
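
The wiring that maps the `--input-format` flag to one of these parsers lives in the changed files not shown in this excerpt; a minimal sketch of how such a factory could look (hypothetical, assuming the flag semantics documented above):

```go
package input

import "fmt"

// hypothetical factory: map the --input-format flag to a Parser;
// csv is the documented default when the flag is empty.
func NewParser(inputFormat, key, separator string) (Parser, error) {
	switch inputFormat {
	case "", "csv":
		return NewCsvParser(key, separator), nil
	case "json":
		return NewJSONParser(), nil
	default:
		return nil, fmt.Errorf("unknown input format: %s", inputFormat)
	}
}
```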