From 58cabdb13ac670d327991d976209fe0e317f2cf3 Mon Sep 17 00:00:00 2001 From: nanic Date: Thu, 19 Oct 2023 14:31:21 -0400 Subject: [PATCH] Feature: Add JSON file produce support --- cmd/produce/produce.go | 1 + cmd/produce/produce_test.go | 47 +++++++++++++++++++++++++ internal/producer/producer-operation.go | 16 +++++++++ testutil/testdata/msg.csv | 3 ++ testutil/testdata/msg.txt | 3 ++ 5 files changed, 70 insertions(+) create mode 100644 testutil/testdata/msg.csv create mode 100644 testutil/testdata/msg.txt diff --git a/cmd/produce/produce.go b/cmd/produce/produce.go index b41cfe4..b1efada 100644 --- a/cmd/produce/produce.go +++ b/cmd/produce/produce.go @@ -36,6 +36,7 @@ func NewProduceCmd() *cobra.Command { cmdProduce.Flags().StringVarP(&flags.Value, "value", "v", "", "value to produce") cmdProduce.Flags().BoolVarP(&flags.NullValue, "null-value", "", false, "produce a null value (can be used instead of providing a value with --value)") cmdProduce.Flags().StringVarP(&flags.File, "file", "f", "", "file to read input from") + cmdProduce.Flags().StringVarP(&flags.FileType, "file-type", "", "", "file type to read input from") cmdProduce.Flags().StringArrayVarP(&flags.Headers, "header", "H", flags.Headers, "headers in format `key:value`") cmdProduce.Flags().StringVarP(&flags.Separator, "separator", "S", "", "separator to split key and value from stdin or file") cmdProduce.Flags().StringVarP(&flags.LineSeparator, "lineSeparator", "L", "\n", "separator to split multiple messages from stdin or file") diff --git a/cmd/produce/produce_test.go b/cmd/produce/produce_test.go index 50c7c6a..ee3c186 100644 --- a/cmd/produce/produce_test.go +++ b/cmd/produce/produce_test.go @@ -367,6 +367,53 @@ func TestProduceProtoFileIntegration(t *testing.T) { testutil.AssertEquals(t, value, string(actualValue)) } +func TestProduceWithCSVFileIntegration(t *testing.T) { + + testutil.StartIntegrationTest(t) + topic := testutil.CreateTopic(t, "produce-topic-csv") + kafkaCtl := testutil.CreateKafkaCtlCommand() + + dataFilePath := filepath.Join(testutil.RootDir, "testutil", "testdata") + + if _, err := kafkaCtl.Execute("produce", topic, + "--file", filepath.Join(dataFilePath, "msg.csv")); err != nil { + t.Fatalf("failed to execute command: %v", err) + } + + testutil.AssertEquals(t, "3 messages produced", kafkaCtl.GetStdOut()) + + if _, err := kafkaCtl.Execute("consume", topic, "--from-beginning", "--exit"); err != nil { + t.Fatalf("failed to execute command: %v", err) + } + + testutil.AssertEquals(t, "1,a\n2,b\n3,c", kafkaCtl.GetStdOut()) + +} + +func TestProduceWithJSONFileIntegration(t *testing.T) { + + testutil.StartIntegrationTest(t) + topic := testutil.CreateTopic(t, "produce-topic-json") + kafkaCtl := testutil.CreateKafkaCtlCommand() + + dataFilePath := filepath.Join(testutil.RootDir, "testutil", "testdata") + + if _, err := kafkaCtl.Execute("produce", topic, + "--file", filepath.Join(dataFilePath, "msg.txt"), + "--file-type", "json"); err != nil { + t.Fatalf("failed to execute command: %v", err) + } + + testutil.AssertEquals(t, "3 messages produced", kafkaCtl.GetStdOut()) + + if _, err := kafkaCtl.Execute("consume", topic, "--from-beginning", "--exit"); err != nil { + t.Fatalf("failed to execute command: %v", err) + } + + testutil.AssertEquals(t, "a\nb\nc", kafkaCtl.GetStdOut()) + +} + func TestProduceProtoFileWithOnlyKeyEncodedIntegration(t *testing.T) { testutil.StartIntegrationTest(t) diff --git a/internal/producer/producer-operation.go b/internal/producer/producer-operation.go index c2a57a7..c6a7f6a 100644 --- a/internal/producer/producer-operation.go +++ b/internal/producer/producer-operation.go @@ -2,6 +2,7 @@ package producer import ( "bufio" + "encoding/json" "io" "os" "os/signal" @@ -24,6 +25,7 @@ type Flags struct { Separator string LineSeparator string File string + FileType string Key string Value string NullValue bool @@ -46,6 +48,11 @@ const DefaultMaxMessagesBytes = 1000000 type Operation struct { } +type KV struct { + Key string `json:"key"` + Value string `json:"value"` +} + func (operation *Operation) Produce(topic string, flags Flags) error { var ( @@ -113,6 +120,8 @@ func (operation *Operation) Produce(topic string, flags Flags) error { } }() + var kv KV + var key string var value string @@ -221,6 +230,13 @@ func (operation *Operation) Produce(topic string, flags Flags) error { } key = input[keyColumnIdx] value = input[valueColumnIdx] + } else if flags.FileType == "json" { + if err = json.Unmarshal([]byte(line), &kv); err != nil { + return errors.Errorf("Can't unmarshal line at %d", messageCount) + } + + key = kv.Key + value = kv.Value } else { value = line } diff --git a/testutil/testdata/msg.csv b/testutil/testdata/msg.csv new file mode 100644 index 0000000..864a928 --- /dev/null +++ b/testutil/testdata/msg.csv @@ -0,0 +1,3 @@ +1,a +2,b +3,c \ No newline at end of file diff --git a/testutil/testdata/msg.txt b/testutil/testdata/msg.txt new file mode 100644 index 0000000..3023ac7 --- /dev/null +++ b/testutil/testdata/msg.txt @@ -0,0 +1,3 @@ +{"key": "1", "value": "a"} +{"key": "2", "value": "b"} +{"key": "3", "value": "c"} \ No newline at end of file