Skip to content

Commit

Permalink
Avro schema fixes (#84)
Browse files Browse the repository at this point in the history
* add failing test for int

* use Avro long for ints

* add support for durations

* update hamba/avro

* remove commented out code

---------

Co-authored-by: Raúl Barroso <[email protected]>
  • Loading branch information
lovromazgon and raulb authored Aug 1, 2024
1 parent 5b6da75 commit 731b460
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 66 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/golangci/golangci-lint v1.59.1
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
github.com/hamba/avro/v2 v2.23.0
github.com/hamba/avro/v2 v2.23.1-0.20240731181311-3fc81b66c693
github.com/jackc/pgx/v5 v5.6.0
github.com/matryer/is v1.4.1
github.com/mitchellh/mapstructure v1.5.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -307,8 +307,8 @@ github.com/gostaticanalysis/testutil v0.4.0 h1:nhdCmubdmDF6VEatUNjgUZBJKWRqugoIS
github.com/gostaticanalysis/testutil v0.4.0/go.mod h1:bLIoPefWXrRi/ssLFWX1dx7Repi5x3CuviD3dgAZaBU=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 h1:bkypFPDjIYGfCYD5mRBvpqxfYX1YCS1PXdKYWi8FsN0=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0/go.mod h1:P+Lt/0by1T8bfcF3z737NnSbmxQAppXMRziHUxPOC8k=
github.com/hamba/avro/v2 v2.23.0 h1:DYWz6UqNCi21JflaZlcwNfW+rK+D/CwnrWWJtfmO4vw=
github.com/hamba/avro/v2 v2.23.0/go.mod h1:7vDfy/2+kYCE8WUHoj2et59GTv0ap7ptktMXu0QHePI=
github.com/hamba/avro/v2 v2.23.1-0.20240731181311-3fc81b66c693 h1:ECZbIygcX0RoDjemCoJ+h6FfHcbTNDIQZPQ7LDhTbao=
github.com/hamba/avro/v2 v2.23.1-0.20240731181311-3fc81b66c693/go.mod h1:7vDfy/2+kYCE8WUHoj2et59GTv0ap7ptktMXu0QHePI=
github.com/hashicorp/go-version v1.2.1/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA=
github.com/hashicorp/go-version v1.7.0 h1:5tqGy27NaOTB8yJKUZELlFAS/LTKJkrmONwQKeRZfjY=
github.com/hashicorp/go-version v1.7.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA=
Expand Down
11 changes: 9 additions & 2 deletions schema/avro/extractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ var (
structuredDataType = reflect.TypeFor[opencdc.StructuredData]()
byteType = reflect.TypeFor[byte]()
timeType = reflect.TypeFor[time.Time]()
durationType = reflect.TypeFor[time.Duration]()
)

// extractor exposes a way to extract an Avro schema from a Go value.
Expand Down Expand Up @@ -66,9 +67,15 @@ func (e extractor) extract(path []string, v reflect.Value, t reflect.Type) (avro
switch t.Kind() { //nolint:exhaustive // some types are not supported
case reflect.Bool:
return avro.NewPrimitiveSchema(avro.Boolean, nil), nil
case reflect.Int64, reflect.Uint32:
case reflect.Int, reflect.Int64, reflect.Uint32:
if t == durationType {
return avro.NewPrimitiveSchema(
avro.Long,
avro.NewPrimitiveLogicalSchema(avro.TimeMicros),
), nil
}
return avro.NewPrimitiveSchema(avro.Long, nil), nil
case reflect.Int, reflect.Int32, reflect.Int16, reflect.Uint16, reflect.Int8, reflect.Uint8:
case reflect.Int32, reflect.Int16, reflect.Uint16, reflect.Int8, reflect.Uint8:
return avro.NewPrimitiveSchema(avro.Int, nil), nil
case reflect.Float32:
return avro.NewPrimitiveSchema(avro.Float, nil), nil
Expand Down
128 changes: 77 additions & 51 deletions schema/avro/serde_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"

"github.com/conduitio/conduit-commons/opencdc"
"github.com/google/go-cmp/cmp"
"github.com/hamba/avro/v2"
"github.com/matryer/is"
)
Expand All @@ -32,7 +33,7 @@ func TestSerde_MarshalUnmarshal(t *testing.T) {
name string
// haveValue is the value we use to extract the schema and which gets marshaled
haveValue any
// wantValue is the expected value we get when haveValue gets marshaled and unmarshaled
// wantValue is the expected value we get when haveValue gets marshaled and unmarshalled
wantValue any
// wantSchema is the schema expected to be extracted from haveValue
wantSchema avro.Schema
Expand All @@ -54,7 +55,7 @@ func TestSerde_MarshalUnmarshal(t *testing.T) {
}, {
name: "boolean ptr (nil)",
haveValue: (*bool)(nil),
wantValue: nil, // when unmarshaling we get an untyped nil
wantValue: nil, // when unmarshalling we get an untyped nil
wantSchema: must(avro.NewUnionSchema(
[]avro.Schema{
avro.NewPrimitiveSchema(avro.Boolean, nil),
Expand All @@ -63,26 +64,26 @@ func TestSerde_MarshalUnmarshal(t *testing.T) {
)),
}, {
name: "int",
haveValue: int(1),
wantValue: int(1),
wantSchema: avro.NewPrimitiveSchema(avro.Int, nil),
haveValue: int(4525347614434344400),
wantValue: int64(4525347614434344400),
wantSchema: avro.NewPrimitiveSchema(avro.Long, nil),
}, {
name: "int ptr (0)",
haveValue: func() *int { var v int; return &v }(),
wantValue: 0, // ptr is unmarshalled into value
wantValue: int64(0), // ptr is unmarshalled into value
wantSchema: must(avro.NewUnionSchema(
[]avro.Schema{
avro.NewPrimitiveSchema(avro.Int, nil),
avro.NewPrimitiveSchema(avro.Long, nil),
avro.NewPrimitiveSchema(avro.Null, nil),
},
)),
}, {
name: "int ptr (nil)",
haveValue: (*int)(nil),
wantValue: nil, // when unmarshaling we get an untyped nil
wantValue: nil, // when unmarshalling we get an untyped nil
wantSchema: must(avro.NewUnionSchema(
[]avro.Schema{
avro.NewPrimitiveSchema(avro.Int, nil),
avro.NewPrimitiveSchema(avro.Long, nil),
avro.NewPrimitiveSchema(avro.Null, nil),
},
)),
Expand All @@ -104,7 +105,7 @@ func TestSerde_MarshalUnmarshal(t *testing.T) {
}, {
name: "int64 ptr (nil)",
haveValue: (*int64)(nil),
wantValue: nil, // when unmarshaling we get an untyped nil
wantValue: nil, // when unmarshalling we get an untyped nil
wantSchema: must(avro.NewUnionSchema(
[]avro.Schema{
avro.NewPrimitiveSchema(avro.Long, nil),
Expand All @@ -129,7 +130,7 @@ func TestSerde_MarshalUnmarshal(t *testing.T) {
}, {
name: "int32 ptr (nil)",
haveValue: (*int32)(nil),
wantValue: nil, // when unmarshaling we get an untyped nil
wantValue: nil, // when unmarshalling we get an untyped nil
wantSchema: must(avro.NewUnionSchema(
[]avro.Schema{
avro.NewPrimitiveSchema(avro.Int, nil),
Expand All @@ -154,7 +155,7 @@ func TestSerde_MarshalUnmarshal(t *testing.T) {
}, {
name: "int16 ptr (nil)",
haveValue: (*int16)(nil),
wantValue: nil, // when unmarshaling we get an untyped nil
wantValue: nil, // when unmarshalling we get an untyped nil
wantSchema: must(avro.NewUnionSchema(
[]avro.Schema{
avro.NewPrimitiveSchema(avro.Int, nil),
Expand All @@ -179,7 +180,7 @@ func TestSerde_MarshalUnmarshal(t *testing.T) {
}, {
name: "int8 ptr (nil)",
haveValue: (*int8)(nil),
wantValue: nil, // when unmarshaling we get an untyped nil
wantValue: nil, // when unmarshalling we get an untyped nil
wantSchema: must(avro.NewUnionSchema(
[]avro.Schema{
avro.NewPrimitiveSchema(avro.Int, nil),
Expand All @@ -204,7 +205,7 @@ func TestSerde_MarshalUnmarshal(t *testing.T) {
}, {
name: "uint32 ptr (nil)",
haveValue: (*uint32)(nil),
wantValue: nil, // when unmarshaling we get an untyped nil
wantValue: nil, // when unmarshalling we get an untyped nil
wantSchema: must(avro.NewUnionSchema(
[]avro.Schema{
avro.NewPrimitiveSchema(avro.Long, nil),
Expand All @@ -229,7 +230,7 @@ func TestSerde_MarshalUnmarshal(t *testing.T) {
}, {
name: "uint16 ptr (nil)",
haveValue: (*uint16)(nil),
wantValue: nil, // when unmarshaling we get an untyped nil
wantValue: nil, // when unmarshalling we get an untyped nil
wantSchema: must(avro.NewUnionSchema(
[]avro.Schema{
avro.NewPrimitiveSchema(avro.Int, nil),
Expand All @@ -254,7 +255,7 @@ func TestSerde_MarshalUnmarshal(t *testing.T) {
}, {
name: "uint8 ptr (nil)",
haveValue: (*uint8)(nil),
wantValue: nil, // when unmarshaling we get an untyped nil
wantValue: nil, // when unmarshalling we get an untyped nil
wantSchema: must(avro.NewUnionSchema(
[]avro.Schema{
avro.NewPrimitiveSchema(avro.Int, nil),
Expand All @@ -279,7 +280,7 @@ func TestSerde_MarshalUnmarshal(t *testing.T) {
}, {
name: "float64 ptr (nil)",
haveValue: (*float64)(nil),
wantValue: nil, // when unmarshaling we get an untyped nil
wantValue: nil, // when unmarshalling we get an untyped nil
wantSchema: must(avro.NewUnionSchema(
[]avro.Schema{
avro.NewPrimitiveSchema(avro.Double, nil),
Expand All @@ -304,7 +305,7 @@ func TestSerde_MarshalUnmarshal(t *testing.T) {
}, {
name: "float32 ptr (nil)",
haveValue: (*float32)(nil),
wantValue: nil, // when unmarshaling we get an untyped nil
wantValue: nil, // when unmarshalling we get an untyped nil
wantSchema: must(avro.NewUnionSchema(
[]avro.Schema{
avro.NewPrimitiveSchema(avro.Float, nil),
Expand All @@ -329,7 +330,7 @@ func TestSerde_MarshalUnmarshal(t *testing.T) {
}, {
name: "string ptr (nil)",
haveValue: (*string)(nil),
wantValue: nil, // when unmarshaling we get an untyped nil
wantValue: nil, // when unmarshalling we get an untyped nil
wantSchema: must(avro.NewUnionSchema(
[]avro.Schema{
avro.NewPrimitiveSchema(avro.String, nil),
Expand All @@ -356,19 +357,44 @@ func TestSerde_MarshalUnmarshal(t *testing.T) {
avro.NewPrimitiveSchema(avro.Null, nil),
},
)),
}, {
name: "duration",
haveValue: time.Duration(12345678999),
wantValue: time.Duration(12345678000), // duration is truncated to microseconds
wantSchema: avro.NewPrimitiveSchema(avro.Long, avro.NewPrimitiveLogicalSchema(avro.TimeMicros)),
}, {
name: "duration ptr (0)",
haveValue: func() *time.Duration { var v time.Duration; return &v }(),
wantValue: time.Duration(0), // ptr is unmarshalled into value
wantSchema: must(avro.NewUnionSchema(
[]avro.Schema{
avro.NewPrimitiveSchema(avro.Long, avro.NewPrimitiveLogicalSchema(avro.TimeMicros)),
avro.NewPrimitiveSchema(avro.Null, nil),
},
)),
}, {
name: "duration ptr (nil)",
haveValue: (*time.Duration)(nil),
wantValue: nil, // when unmarshalling we get an untyped nil
wantSchema: must(avro.NewUnionSchema(
[]avro.Schema{
avro.NewPrimitiveSchema(avro.Long, avro.NewPrimitiveLogicalSchema(avro.TimeMicros)),
avro.NewPrimitiveSchema(avro.Null, nil),
},
)),
}, {
name: "[]int",
haveValue: []int{1, 2, 3},
wantValue: []any{1, 2, 3},
wantSchema: avro.NewArraySchema(avro.NewPrimitiveSchema(avro.Int, nil)),
wantValue: []any{int64(1), int64(2), int64(3)},
wantSchema: avro.NewArraySchema(avro.NewPrimitiveSchema(avro.Long, nil)),
}, {
name: "[]any (with data)",
haveValue: []any{1, "foo"},
wantValue: []any{1, "foo"},
wantValue: []any{int64(1), "foo"},
wantSchema: avro.NewArraySchema(must(avro.NewUnionSchema(
[]avro.Schema{
avro.NewPrimitiveSchema(avro.String, nil),
avro.NewPrimitiveSchema(avro.Int, nil),
avro.NewPrimitiveSchema(avro.Long, nil),
avro.NewPrimitiveSchema(avro.Null, nil),
},
))),
Expand All @@ -385,19 +411,19 @@ func TestSerde_MarshalUnmarshal(t *testing.T) {
}, {
name: "[][]int",
haveValue: [][]int{{1}, {2, 3}},
wantValue: []any{[]any{1}, []any{2, 3}},
wantSchema: avro.NewArraySchema(avro.NewArraySchema(avro.NewPrimitiveSchema(avro.Int, nil))),
wantValue: []any{[]any{int64(1)}, []any{int64(2), int64(3)}},
wantSchema: avro.NewArraySchema(avro.NewArraySchema(avro.NewPrimitiveSchema(avro.Long, nil))),
}, {
name: "map[string]int",
haveValue: map[string]int{
"foo": 1,
"bar": 2,
},
wantValue: map[string]any{ // all maps are unmarshaled into map[string]any
"foo": 1,
"bar": 2,
wantValue: map[string]any{ // all maps are unmarshalled into map[string]any
"foo": int64(1),
"bar": int64(2),
},
wantSchema: avro.NewMapSchema(avro.NewPrimitiveSchema(avro.Int, nil)),
wantSchema: avro.NewMapSchema(avro.NewPrimitiveSchema(avro.Long, nil)),
}, {
name: "map[string]any (with primitive data)",
haveValue: map[string]any{
Expand All @@ -409,12 +435,12 @@ func TestSerde_MarshalUnmarshal(t *testing.T) {
wantValue: map[string]any{
"foo": "bar",
"foo2": "bar2",
"bar": 1,
"bar": int64(1),
"baz": true,
},
wantSchema: avro.NewMapSchema(must(avro.NewUnionSchema([]avro.Schema{
&avro.NullSchema{},
avro.NewPrimitiveSchema(avro.Int, nil),
avro.NewPrimitiveSchema(avro.Long, nil),
avro.NewPrimitiveSchema(avro.String, nil),
avro.NewPrimitiveSchema(avro.Boolean, nil),
}))),
Expand All @@ -429,14 +455,14 @@ func TestSerde_MarshalUnmarshal(t *testing.T) {
wantValue: map[string]any{
"foo": "bar",
"foo2": "bar2",
"bar": 1,
"baz": []any{1, 2, 3},
"bar": int64(1),
"baz": []any{int64(1), int64(2), int64(3)},
},
wantSchema: avro.NewMapSchema(must(avro.NewUnionSchema([]avro.Schema{
&avro.NullSchema{},
avro.NewPrimitiveSchema(avro.Int, nil),
avro.NewPrimitiveSchema(avro.Long, nil),
avro.NewPrimitiveSchema(avro.String, nil),
avro.NewArraySchema(avro.NewPrimitiveSchema(avro.Int, nil)),
avro.NewArraySchema(avro.NewPrimitiveSchema(avro.Long, nil)),
}))),
}, {
name: "map[string]any (with union array)",
Expand All @@ -450,17 +476,17 @@ func TestSerde_MarshalUnmarshal(t *testing.T) {
wantValue: map[string]any{
"foo": "bar",
"foo2": "bar2",
"bar": 1,
"baz": []any{1, 2, 3},
"bar": int64(1),
"baz": []any{int64(1), int64(2), int64(3)},
"baz2": []any{"foo", true},
},
wantSchema: avro.NewMapSchema(must(avro.NewUnionSchema([]avro.Schema{
&avro.NullSchema{},
avro.NewPrimitiveSchema(avro.Int, nil),
avro.NewPrimitiveSchema(avro.Long, nil),
avro.NewPrimitiveSchema(avro.String, nil),
avro.NewArraySchema(must(avro.NewUnionSchema([]avro.Schema{
&avro.NullSchema{},
avro.NewPrimitiveSchema(avro.Int, nil),
avro.NewPrimitiveSchema(avro.Long, nil),
avro.NewPrimitiveSchema(avro.String, nil),
avro.NewPrimitiveSchema(avro.Boolean, nil),
}))),
Expand All @@ -484,14 +510,14 @@ func TestSerde_MarshalUnmarshal(t *testing.T) {
wantValue: map[string]any{
"foo": map[string]any{
"bar": "baz",
"baz": 1,
"baz": int64(1),
},
},
wantSchema: avro.NewMapSchema(must(avro.NewUnionSchema([]avro.Schema{
&avro.NullSchema{},
avro.NewMapSchema(must(avro.NewUnionSchema([]avro.Schema{
&avro.NullSchema{},
avro.NewPrimitiveSchema(avro.Int, nil),
avro.NewPrimitiveSchema(avro.Long, nil),
avro.NewPrimitiveSchema(avro.String, nil),
}))),
}))),
Expand All @@ -503,19 +529,19 @@ func TestSerde_MarshalUnmarshal(t *testing.T) {
"baz": []int{1, 2, 3},
"tz": now,
},
wantValue: map[string]any{ // structured data is unmarshaled into a map
wantValue: map[string]any{ // structured data is unmarshalled into a map
"foo": "bar",
"bar": 1,
"baz": []any{1, 2, 3},
"bar": int64(1),
"baz": []any{int64(1), int64(2), int64(3)},
"tz": now.Truncate(time.Microsecond), // Avro does not support nanoseconds
},
wantSchema: must(avro.NewRecordSchema(
"record.foo",
"",
[]*avro.Field{
must(avro.NewField("foo", avro.NewPrimitiveSchema(avro.String, nil))),
must(avro.NewField("bar", avro.NewPrimitiveSchema(avro.Int, nil))),
must(avro.NewField("baz", avro.NewArraySchema(avro.NewPrimitiveSchema(avro.Int, nil)))),
must(avro.NewField("bar", avro.NewPrimitiveSchema(avro.Long, nil))),
must(avro.NewField("baz", avro.NewArraySchema(avro.NewPrimitiveSchema(avro.Long, nil)))),
must(avro.NewField("tz", avro.NewPrimitiveSchema(avro.Long, avro.NewPrimitiveLogicalSchema(avro.TimestampMicros)))),
},
)),
Expand All @@ -542,7 +568,7 @@ func TestSerde_MarshalUnmarshal(t *testing.T) {
)),
}
wantSerde.sort()
is.Equal(wantSerde.String(), gotSerde.String())
is.Equal("", cmp.Diff(wantSerde.String(), gotSerde.String()))

// now try to marshal the value with the schema
bytes, err := gotSerde.Marshal(haveValue)
Expand All @@ -554,7 +580,7 @@ func TestSerde_MarshalUnmarshal(t *testing.T) {
is.NoErr(err)

wantValue := newRecord(tc.wantValue)
is.Equal(wantValue, gotValue)
is.Equal("", cmp.Diff(wantValue, gotValue))
})
}
}
Expand Down Expand Up @@ -618,9 +644,9 @@ func TestSerdeForType_NestedStructuredData(t *testing.T) {
bytes, err := got.Marshal(have)
is.NoErr(err)
// only try to unmarshal to ensure there's no error, other tests assert that
// umarshaled data matches the expectations
var unmarshaled opencdc.StructuredData
err = got.Unmarshal(bytes, &unmarshaled)
// unmarshalled data matches the expectations
var unmarshalled opencdc.StructuredData
err = got.Unmarshal(bytes, &unmarshalled)
is.NoErr(err)
}

Expand Down
Loading

0 comments on commit 731b460

Please sign in to comment.