Skip to content

Commit

Permalink
Feature: Add JSON file produce support
Browse files Browse the repository at this point in the history
  • Loading branch information
nanic committed Oct 19, 2023
1 parent 1efc5e5 commit 58cabdb
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 0 deletions.
1 change: 1 addition & 0 deletions cmd/produce/produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func NewProduceCmd() *cobra.Command {
cmdProduce.Flags().StringVarP(&flags.Value, "value", "v", "", "value to produce")
cmdProduce.Flags().BoolVarP(&flags.NullValue, "null-value", "", false, "produce a null value (can be used instead of providing a value with --value)")
cmdProduce.Flags().StringVarP(&flags.File, "file", "f", "", "file to read input from")
cmdProduce.Flags().StringVarP(&flags.FileType, "file-type", "", "", "file type to read input from")
cmdProduce.Flags().StringArrayVarP(&flags.Headers, "header", "H", flags.Headers, "headers in format `key:value`")
cmdProduce.Flags().StringVarP(&flags.Separator, "separator", "S", "", "separator to split key and value from stdin or file")
cmdProduce.Flags().StringVarP(&flags.LineSeparator, "lineSeparator", "L", "\n", "separator to split multiple messages from stdin or file")
Expand Down
47 changes: 47 additions & 0 deletions cmd/produce/produce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,53 @@ func TestProduceProtoFileIntegration(t *testing.T) {
testutil.AssertEquals(t, value, string(actualValue))
}

func TestProduceWithCSVFileIntegration(t *testing.T) {

testutil.StartIntegrationTest(t)
topic := testutil.CreateTopic(t, "produce-topic-csv")
kafkaCtl := testutil.CreateKafkaCtlCommand()

dataFilePath := filepath.Join(testutil.RootDir, "testutil", "testdata")

if _, err := kafkaCtl.Execute("produce", topic,
"--file", filepath.Join(dataFilePath, "msg.csv")); err != nil {
t.Fatalf("failed to execute command: %v", err)
}

testutil.AssertEquals(t, "3 messages produced", kafkaCtl.GetStdOut())

if _, err := kafkaCtl.Execute("consume", topic, "--from-beginning", "--exit"); err != nil {
t.Fatalf("failed to execute command: %v", err)
}

testutil.AssertEquals(t, "1,a\n2,b\n3,c", kafkaCtl.GetStdOut())

}

func TestProduceWithJSONFileIntegration(t *testing.T) {

testutil.StartIntegrationTest(t)
topic := testutil.CreateTopic(t, "produce-topic-json")
kafkaCtl := testutil.CreateKafkaCtlCommand()

dataFilePath := filepath.Join(testutil.RootDir, "testutil", "testdata")

if _, err := kafkaCtl.Execute("produce", topic,
"--file", filepath.Join(dataFilePath, "msg.txt"),
"--file-type", "json"); err != nil {
t.Fatalf("failed to execute command: %v", err)
}

testutil.AssertEquals(t, "3 messages produced", kafkaCtl.GetStdOut())

if _, err := kafkaCtl.Execute("consume", topic, "--from-beginning", "--exit"); err != nil {
t.Fatalf("failed to execute command: %v", err)
}

testutil.AssertEquals(t, "a\nb\nc", kafkaCtl.GetStdOut())

}

func TestProduceProtoFileWithOnlyKeyEncodedIntegration(t *testing.T) {
testutil.StartIntegrationTest(t)

Expand Down
16 changes: 16 additions & 0 deletions internal/producer/producer-operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package producer

import (
"bufio"
"encoding/json"
"io"
"os"
"os/signal"
Expand All @@ -24,6 +25,7 @@ type Flags struct {
Separator string
LineSeparator string
File string
FileType string
Key string
Value string
NullValue bool
Expand All @@ -46,6 +48,11 @@ const DefaultMaxMessagesBytes = 1000000
type Operation struct {
}

type KV struct {
Key string `json:"key"`
Value string `json:"value"`
}

func (operation *Operation) Produce(topic string, flags Flags) error {

var (
Expand Down Expand Up @@ -113,6 +120,8 @@ func (operation *Operation) Produce(topic string, flags Flags) error {
}
}()

var kv KV

var key string
var value string

Expand Down Expand Up @@ -221,6 +230,13 @@ func (operation *Operation) Produce(topic string, flags Flags) error {
}
key = input[keyColumnIdx]
value = input[valueColumnIdx]
} else if flags.FileType == "json" {
if err = json.Unmarshal([]byte(line), &kv); err != nil {
return errors.Errorf("Can't unmarshal line at %d", messageCount)
}

key = kv.Key
value = kv.Value
} else {
value = line
}
Expand Down
3 changes: 3 additions & 0 deletions testutil/testdata/msg.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
1,a
2,b
3,c
3 changes: 3 additions & 0 deletions testutil/testdata/msg.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{"key": "1", "value": "a"}
{"key": "2", "value": "b"}
{"key": "3", "value": "c"}

0 comments on commit 58cabdb

Please sign in to comment.