Skip to content

Commit

Permalink
Merge pull request #44 from heetch/sixstone-001-fix-empty-msg-produced
Browse files Browse the repository at this point in the history
Make null value to fail to produce
  • Loading branch information
sixstone-qq authored Apr 6, 2022
2 parents e24b378 + d5c8047 commit e29b183
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 6 deletions.
16 changes: 10 additions & 6 deletions produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,13 +200,17 @@ func (cmd *produceCmd) makeSaramaMessage(msg producerMessage) (*sarama.ProducerM
}
sm.Key = sarama.ByteEncoder(key)
}
if msg.Value != nil {
value, err := cmd.decodeValue(msg.Value)
if err != nil {
return nil, fmt.Errorf("cannot decode value: %v", err)
}
sm.Value = sarama.ByteEncoder(value)

if msg.Value == nil {
return nil, fmt.Errorf(`empty "value" JSON key to produce: %+v`, msg)
}

value, err := cmd.decodeValue(msg.Value)
if err != nil {
return nil, fmt.Errorf("cannot decode value: %v", err)
}
sm.Value = sarama.ByteEncoder(value)

if msg.Timestamp != nil {
sm.Timestamp = *msg.Timestamp
}
Expand Down
14 changes: 14 additions & 0 deletions produce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"testing"
"time"

"github.com/Shopify/sarama"
qt "github.com/frankban/quicktest"
Expand Down Expand Up @@ -128,6 +129,19 @@ func TestMakeSaramaMessage(t *testing.T) {
c.Assert(err, qt.IsNil)
c.Assert(gotValue, qt.ContentEquals, expectedBlob)
})

c.Run("Null value should fail", func(c *qt.C) {
cmd := new(produceCmd)
cmd.decodeKey, err = cmd.decoderForType("key", "string")
c.Assert(err, qt.IsNil)
pn := int32(1)
for _, pmsg := range []producerMessage{{}, {Key: json.RawMessage(`"k"`)}, {Partition: &pn}, {Timestamp: &time.Time{}}} {
msg, err := cmd.makeSaramaMessage(pmsg)
c.Assert(err, qt.IsNotNil)
c.Assert(err, qt.ErrorMatches, `empty "value" JSON key to produce:.*`)
c.Assert(msg, qt.IsNil)
}
})
}

func TestDeserializeLines(t *testing.T) {
Expand Down

0 comments on commit e29b183

Please sign in to comment.