diff --git a/CHANGELOG.md b/CHANGELOG.md index 1579b50..4076241 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] +### Fixed +- [#227](https://github.com/deviceinsight/kafkactl/issues/227) Incorrect handling of Base64-encoded values when producing from JSON ## 5.4.0 - 2024-11-28 ### Added diff --git a/cmd/produce/produce_test.go b/cmd/produce/produce_test.go index b19de1a..4e4b8e7 100644 --- a/cmd/produce/produce_test.go +++ b/cmd/produce/produce_test.go @@ -456,6 +456,30 @@ func TestProduceWithJSONFileIntegration(t *testing.T) { testutil.AssertEquals(t, "1#a\n2#b\n3#c", kafkaCtl.GetStdOut()) } +func TestProduceWithJSONFileBase64ValuesIntegration(t *testing.T) { + + testutil.StartIntegrationTest(t) + topic := testutil.CreateTopic(t, "produce-topic-json-base64-values") + kafkaCtl := testutil.CreateKafkaCtlCommand() + + dataFilePath := filepath.Join(testutil.RootDir, "internal", "testutil", "testdata") + + if _, err := kafkaCtl.Execute("produce", topic, + "--file", filepath.Join(dataFilePath, "msg-base64.json"), + "--value-encoding", "base64", + "--input-format", "json"); err != nil { + t.Fatalf("failed to execute command: %v", err) + } + + testutil.AssertEquals(t, "3 messages produced", kafkaCtl.GetStdOut()) + + if _, err := kafkaCtl.Execute("consume", topic, "--from-beginning", "--print-keys", "--value-encoding", "hex", "--exit"); err != nil { + t.Fatalf("failed to execute command: %v", err) + } + + testutil.AssertEquals(t, "1#000000000001\n2#68656c6c6f\n3#6b61666b61", kafkaCtl.GetStdOut()) +} + func TestProduceProtoFileWithOnlyKeyEncodedIntegration(t *testing.T) { testutil.StartIntegrationTest(t) diff --git a/internal/producer/MessageSerializer.go b/internal/producer/MessageSerializer.go index 51ff436..3051a92 100644 --- a/internal/producer/MessageSerializer.go +++ b/internal/producer/MessageSerializer.go @@ -63,27 +63,19 @@ func decodeBytes(data []byte, encoding string) ([]byte, error) { switch encoding { case HEX: out = make([]byte, hex.DecodedLen(len(data))) - if _, err := hex.Decode(out, data); err != nil { + bytelen, err := hex.Decode(out, data) + if err != nil { return nil, err } - return out, nil + return out[:bytelen], nil case BASE64: out = make([]byte, base64.StdEncoding.DecodedLen(len(data))) - if _, err := base64.StdEncoding.Decode(out, data); err != nil { + bytelen, err := base64.StdEncoding.Decode(out, data) + if err != nil { return nil, err } - return out[:clen(out)], nil + return out[:bytelen], nil default: return data, nil } } - -// https://stackoverflow.com/a/27834860/12143351 -func clen(n []byte) int { - for i := 0; i < len(n); i++ { - if n[i] == 0 { - return i - } - } - return len(n) -} diff --git a/internal/producer/MessageSerializer_test.go b/internal/producer/MessageSerializer_test.go new file mode 100644 index 0000000..a273be7 --- /dev/null +++ b/internal/producer/MessageSerializer_test.go @@ -0,0 +1,65 @@ +package producer + +import ( + "reflect" + "testing" +) + +func TestDecodeBytesBase64WhenInputIsNil(t *testing.T) { + got, err := decodeBytes(nil, "base64") + if got != nil { + t.Errorf("Expected nil, got %v", got) + } + if err != nil { + t.Errorf("Expected nil, got %v", err) + } +} + +func TestDecodeBytesBase64WhenInputIsInvalid(t *testing.T) { + got, err := decodeBytes([]byte("..this..is..not..base64.."), "base64") + if got != nil { + t.Errorf("Expected nil, got %v", got) + } + if err == nil { + t.Errorf("Expected error, got nil") + } +} + +func TestDecodeBytesBase64WithRegularInput(t *testing.T) { + // length 4 + got, err := decodeBytes([]byte("dGVzdA=="), "base64") + if string(got) != "test" { + t.Errorf("Expected hello, got %v", string(got)) + } + if err != nil { + t.Errorf("Expected nil, got %v", err) + } + // length 5 + got, err = decodeBytes([]byte("aGVsbG8="), "base64") + if string(got) != "hello" { + t.Errorf("Expected hello, got %v", string(got)) + } + if err != nil { + t.Errorf("Expected nil, got %v", err) + } + + // length 6 + got, err = decodeBytes([]byte("aGVsbG8h"), "base64") + if string(got) != "hello!" { + t.Errorf("Expected hello, got %v", string(got)) + } + if err != nil { + t.Errorf("Expected nil, got %v", err) + } +} + +func TestDecodeBytesBase64WithZeroPaddedInput(t *testing.T) { + got, err := decodeBytes([]byte("AAAAAAAD"), "base64") + expected := []byte{0, 0, 0, 0, 0, 3} + if !reflect.DeepEqual(got, expected) { + t.Errorf("Expected hello, got %v", got) + } + if err != nil { + t.Errorf("Expected nil, got %v", err) + } +} diff --git a/internal/testutil/testdata/msg-base64.json b/internal/testutil/testdata/msg-base64.json new file mode 100644 index 0000000..2535c96 --- /dev/null +++ b/internal/testutil/testdata/msg-base64.json @@ -0,0 +1,3 @@ +{"key": "1", "value": "AAAAAAAB"} +{"key": "2", "value": "aGVsbG8="} +{"key": "3", "value": "a2Fma2E="}