diff --git a/kafka/encoder.go b/kafka/encoder.go index 420ac37..ed1cc49 100644 --- a/kafka/encoder.go +++ b/kafka/encoder.go @@ -6,12 +6,28 @@ type Decoder interface { Decode([]byte) (interface{}, error) } +// DecoderFunc is an adapter allowing to use a function as a decoder. +type DecoderFunc func(value []byte) (interface{}, error) + +// Decode transforms byte data to the desired type. +func (f DecoderFunc) Decode(value []byte) (interface{}, error) { + return f(value) +} + // Encoder represents a Kafka data encoder. type Encoder interface { // Encode transforms the typed data to bytes. Encode(interface{}) ([]byte, error) } +// EncoderFunc is an adapter allowing to use a function as an encoder. +type EncoderFunc func(interface{}) ([]byte, error) + +// Encode transforms the typed data to bytes. +func (f EncoderFunc) Encode(value interface{}) ([]byte, error) { + return f(value) +} + // ByteDecoder represents a byte decoder. type ByteDecoder struct{} diff --git a/kafka/encoder_test.go b/kafka/encoder_test.go index 5b3ff28..c93f467 100644 --- a/kafka/encoder_test.go +++ b/kafka/encoder_test.go @@ -1,6 +1,7 @@ package kafka_test import ( + "errors" "testing" "github.com/msales/streams/v2/kafka" @@ -76,3 +77,39 @@ func TestStringEncoder_Encode(t *testing.T) { assert.Equal(t, tt.want, got) } } + +func TestDecoderFunc_Decode(t *testing.T) { + b := []byte("payload") + e := errors.New("test") + i := interface{}("entity") + + f := func(value []byte) (interface{}, error) { + assert.Equal(t, b, value) + + return i, e + } + + decoder := kafka.DecoderFunc(f) + result, err := decoder.Decode(b) + + assert.True(t, i == result, "Received object is not exactly the same object that was returned by the function.") + assert.True(t, e == err, "Received error is not exactly the same object that was returned by the function.") +} + +func TestEncoderFunc_Encode(t *testing.T) { + b := []byte("payload") + e := errors.New("test") + i := interface{}("entity") + + f := func(object interface{}) ([]byte, error) { + assert.True(t, i == object, "Received object is not exactly the same as one that was that passed to the encoder.") + + return b, e + } + + encoder := kafka.EncoderFunc(f) + result, err := encoder.Encode(i) + + assert.Equal(t, b, result) + assert.True(t, e == err, "Received error is not exactly the same object that was returned by the function.") +}