From 731b460a2c5855f75584cff835d5f713ebe04eee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lovro=20Ma=C5=BEgon?= Date: Thu, 1 Aug 2024 13:32:02 +0200 Subject: [PATCH] Avro schema fixes (#84) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * add failing test for int * use Avro long for ints * add support for durations * update hamba/avro * remove commented out code --------- Co-authored-by: Raúl Barroso --- go.mod | 2 +- go.sum | 4 +- schema/avro/extractor.go | 11 +++- schema/avro/serde_test.go | 128 +++++++++++++++++++++++--------------- schema/avro/union_test.go | 25 +++++--- 5 files changed, 104 insertions(+), 66 deletions(-) diff --git a/go.mod b/go.mod index 7a83ca8..dc865f3 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( github.com/golangci/golangci-lint v1.59.1 github.com/google/go-cmp v0.6.0 github.com/google/uuid v1.6.0 - github.com/hamba/avro/v2 v2.23.0 + github.com/hamba/avro/v2 v2.23.1-0.20240731181311-3fc81b66c693 github.com/jackc/pgx/v5 v5.6.0 github.com/matryer/is v1.4.1 github.com/mitchellh/mapstructure v1.5.0 diff --git a/go.sum b/go.sum index 9606094..12f4d4f 100644 --- a/go.sum +++ b/go.sum @@ -307,8 +307,8 @@ github.com/gostaticanalysis/testutil v0.4.0 h1:nhdCmubdmDF6VEatUNjgUZBJKWRqugoIS github.com/gostaticanalysis/testutil v0.4.0/go.mod h1:bLIoPefWXrRi/ssLFWX1dx7Repi5x3CuviD3dgAZaBU= github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 h1:bkypFPDjIYGfCYD5mRBvpqxfYX1YCS1PXdKYWi8FsN0= github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0/go.mod h1:P+Lt/0by1T8bfcF3z737NnSbmxQAppXMRziHUxPOC8k= -github.com/hamba/avro/v2 v2.23.0 h1:DYWz6UqNCi21JflaZlcwNfW+rK+D/CwnrWWJtfmO4vw= -github.com/hamba/avro/v2 v2.23.0/go.mod h1:7vDfy/2+kYCE8WUHoj2et59GTv0ap7ptktMXu0QHePI= +github.com/hamba/avro/v2 v2.23.1-0.20240731181311-3fc81b66c693 h1:ECZbIygcX0RoDjemCoJ+h6FfHcbTNDIQZPQ7LDhTbao= +github.com/hamba/avro/v2 v2.23.1-0.20240731181311-3fc81b66c693/go.mod h1:7vDfy/2+kYCE8WUHoj2et59GTv0ap7ptktMXu0QHePI= 
github.com/hashicorp/go-version v1.2.1/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/hashicorp/go-version v1.7.0 h1:5tqGy27NaOTB8yJKUZELlFAS/LTKJkrmONwQKeRZfjY= github.com/hashicorp/go-version v1.7.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= diff --git a/schema/avro/extractor.go b/schema/avro/extractor.go index 3d5e2fc..3b9c9d5 100644 --- a/schema/avro/extractor.go +++ b/schema/avro/extractor.go @@ -28,6 +28,7 @@ var ( structuredDataType = reflect.TypeFor[opencdc.StructuredData]() byteType = reflect.TypeFor[byte]() timeType = reflect.TypeFor[time.Time]() + durationType = reflect.TypeFor[time.Duration]() ) // extractor exposes a way to extract an Avro schema from a Go value. @@ -66,9 +67,15 @@ func (e extractor) extract(path []string, v reflect.Value, t reflect.Type) (avro switch t.Kind() { //nolint:exhaustive // some types are not supported case reflect.Bool: return avro.NewPrimitiveSchema(avro.Boolean, nil), nil - case reflect.Int64, reflect.Uint32: + case reflect.Int, reflect.Int64, reflect.Uint32: + if t == durationType { + return avro.NewPrimitiveSchema( + avro.Long, + avro.NewPrimitiveLogicalSchema(avro.TimeMicros), + ), nil + } return avro.NewPrimitiveSchema(avro.Long, nil), nil - case reflect.Int, reflect.Int32, reflect.Int16, reflect.Uint16, reflect.Int8, reflect.Uint8: + case reflect.Int32, reflect.Int16, reflect.Uint16, reflect.Int8, reflect.Uint8: return avro.NewPrimitiveSchema(avro.Int, nil), nil case reflect.Float32: return avro.NewPrimitiveSchema(avro.Float, nil), nil diff --git a/schema/avro/serde_test.go b/schema/avro/serde_test.go index 74b120f..f05ac53 100644 --- a/schema/avro/serde_test.go +++ b/schema/avro/serde_test.go @@ -21,6 +21,7 @@ import ( "time" "github.com/conduitio/conduit-commons/opencdc" + "github.com/google/go-cmp/cmp" "github.com/hamba/avro/v2" "github.com/matryer/is" ) @@ -32,7 +33,7 @@ func TestSerde_MarshalUnmarshal(t *testing.T) { name string // haveValue is the value we use to extract the 
schema and which gets marshaled haveValue any - // wantValue is the expected value we get when haveValue gets marshaled and unmarshaled + // wantValue is the expected value we get when haveValue gets marshaled and unmarshalled wantValue any // wantSchema is the schema expected to be extracted from haveValue wantSchema avro.Schema @@ -54,7 +55,7 @@ func TestSerde_MarshalUnmarshal(t *testing.T) { }, { name: "boolean ptr (nil)", haveValue: (*bool)(nil), - wantValue: nil, // when unmarshaling we get an untyped nil + wantValue: nil, // when unmarshalling we get an untyped nil wantSchema: must(avro.NewUnionSchema( []avro.Schema{ avro.NewPrimitiveSchema(avro.Boolean, nil), @@ -63,26 +64,26 @@ func TestSerde_MarshalUnmarshal(t *testing.T) { )), }, { name: "int", - haveValue: int(1), - wantValue: int(1), - wantSchema: avro.NewPrimitiveSchema(avro.Int, nil), + haveValue: int(4525347614434344400), + wantValue: int64(4525347614434344400), + wantSchema: avro.NewPrimitiveSchema(avro.Long, nil), }, { name: "int ptr (0)", haveValue: func() *int { var v int; return &v }(), - wantValue: 0, // ptr is unmarshalled into value + wantValue: int64(0), // ptr is unmarshalled into value wantSchema: must(avro.NewUnionSchema( []avro.Schema{ - avro.NewPrimitiveSchema(avro.Int, nil), + avro.NewPrimitiveSchema(avro.Long, nil), avro.NewPrimitiveSchema(avro.Null, nil), }, )), }, { name: "int ptr (nil)", haveValue: (*int)(nil), - wantValue: nil, // when unmarshaling we get an untyped nil + wantValue: nil, // when unmarshalling we get an untyped nil wantSchema: must(avro.NewUnionSchema( []avro.Schema{ - avro.NewPrimitiveSchema(avro.Int, nil), + avro.NewPrimitiveSchema(avro.Long, nil), avro.NewPrimitiveSchema(avro.Null, nil), }, )), @@ -104,7 +105,7 @@ func TestSerde_MarshalUnmarshal(t *testing.T) { }, { name: "int64 ptr (nil)", haveValue: (*int64)(nil), - wantValue: nil, // when unmarshaling we get an untyped nil + wantValue: nil, // when unmarshalling we get an untyped nil wantSchema: 
must(avro.NewUnionSchema( []avro.Schema{ avro.NewPrimitiveSchema(avro.Long, nil), @@ -129,7 +130,7 @@ func TestSerde_MarshalUnmarshal(t *testing.T) { }, { name: "int32 ptr (nil)", haveValue: (*int32)(nil), - wantValue: nil, // when unmarshaling we get an untyped nil + wantValue: nil, // when unmarshalling we get an untyped nil wantSchema: must(avro.NewUnionSchema( []avro.Schema{ avro.NewPrimitiveSchema(avro.Int, nil), @@ -154,7 +155,7 @@ func TestSerde_MarshalUnmarshal(t *testing.T) { }, { name: "int16 ptr (nil)", haveValue: (*int16)(nil), - wantValue: nil, // when unmarshaling we get an untyped nil + wantValue: nil, // when unmarshalling we get an untyped nil wantSchema: must(avro.NewUnionSchema( []avro.Schema{ avro.NewPrimitiveSchema(avro.Int, nil), @@ -179,7 +180,7 @@ func TestSerde_MarshalUnmarshal(t *testing.T) { }, { name: "int8 ptr (nil)", haveValue: (*int8)(nil), - wantValue: nil, // when unmarshaling we get an untyped nil + wantValue: nil, // when unmarshalling we get an untyped nil wantSchema: must(avro.NewUnionSchema( []avro.Schema{ avro.NewPrimitiveSchema(avro.Int, nil), @@ -204,7 +205,7 @@ func TestSerde_MarshalUnmarshal(t *testing.T) { }, { name: "uint32 ptr (nil)", haveValue: (*uint32)(nil), - wantValue: nil, // when unmarshaling we get an untyped nil + wantValue: nil, // when unmarshalling we get an untyped nil wantSchema: must(avro.NewUnionSchema( []avro.Schema{ avro.NewPrimitiveSchema(avro.Long, nil), @@ -229,7 +230,7 @@ func TestSerde_MarshalUnmarshal(t *testing.T) { }, { name: "uint16 ptr (nil)", haveValue: (*uint16)(nil), - wantValue: nil, // when unmarshaling we get an untyped nil + wantValue: nil, // when unmarshalling we get an untyped nil wantSchema: must(avro.NewUnionSchema( []avro.Schema{ avro.NewPrimitiveSchema(avro.Int, nil), @@ -254,7 +255,7 @@ func TestSerde_MarshalUnmarshal(t *testing.T) { }, { name: "uint8 ptr (nil)", haveValue: (*uint8)(nil), - wantValue: nil, // when unmarshaling we get an untyped nil + wantValue: nil, // when 
unmarshalling we get an untyped nil wantSchema: must(avro.NewUnionSchema( []avro.Schema{ avro.NewPrimitiveSchema(avro.Int, nil), @@ -279,7 +280,7 @@ func TestSerde_MarshalUnmarshal(t *testing.T) { }, { name: "float64 ptr (nil)", haveValue: (*float64)(nil), - wantValue: nil, // when unmarshaling we get an untyped nil + wantValue: nil, // when unmarshalling we get an untyped nil wantSchema: must(avro.NewUnionSchema( []avro.Schema{ avro.NewPrimitiveSchema(avro.Double, nil), @@ -304,7 +305,7 @@ func TestSerde_MarshalUnmarshal(t *testing.T) { }, { name: "float32 ptr (nil)", haveValue: (*float32)(nil), - wantValue: nil, // when unmarshaling we get an untyped nil + wantValue: nil, // when unmarshalling we get an untyped nil wantSchema: must(avro.NewUnionSchema( []avro.Schema{ avro.NewPrimitiveSchema(avro.Float, nil), @@ -329,7 +330,7 @@ func TestSerde_MarshalUnmarshal(t *testing.T) { }, { name: "string ptr (nil)", haveValue: (*string)(nil), - wantValue: nil, // when unmarshaling we get an untyped nil + wantValue: nil, // when unmarshalling we get an untyped nil wantSchema: must(avro.NewUnionSchema( []avro.Schema{ avro.NewPrimitiveSchema(avro.String, nil), @@ -356,19 +357,44 @@ func TestSerde_MarshalUnmarshal(t *testing.T) { avro.NewPrimitiveSchema(avro.Null, nil), }, )), + }, { + name: "duration", + haveValue: time.Duration(12345678999), + wantValue: time.Duration(12345678000), // duration is truncated to microseconds + wantSchema: avro.NewPrimitiveSchema(avro.Long, avro.NewPrimitiveLogicalSchema(avro.TimeMicros)), + }, { + name: "duration ptr (0)", + haveValue: func() *time.Duration { var v time.Duration; return &v }(), + wantValue: time.Duration(0), // ptr is unmarshalled into value + wantSchema: must(avro.NewUnionSchema( + []avro.Schema{ + avro.NewPrimitiveSchema(avro.Long, avro.NewPrimitiveLogicalSchema(avro.TimeMicros)), + avro.NewPrimitiveSchema(avro.Null, nil), + }, + )), + }, { + name: "duration ptr (nil)", + haveValue: (*time.Duration)(nil), + wantValue: nil, // 
when unmarshalling we get an untyped nil + wantSchema: must(avro.NewUnionSchema( + []avro.Schema{ + avro.NewPrimitiveSchema(avro.Long, avro.NewPrimitiveLogicalSchema(avro.TimeMicros)), + avro.NewPrimitiveSchema(avro.Null, nil), + }, + )), }, { name: "[]int", haveValue: []int{1, 2, 3}, - wantValue: []any{1, 2, 3}, - wantSchema: avro.NewArraySchema(avro.NewPrimitiveSchema(avro.Int, nil)), + wantValue: []any{int64(1), int64(2), int64(3)}, + wantSchema: avro.NewArraySchema(avro.NewPrimitiveSchema(avro.Long, nil)), }, { name: "[]any (with data)", haveValue: []any{1, "foo"}, - wantValue: []any{1, "foo"}, + wantValue: []any{int64(1), "foo"}, wantSchema: avro.NewArraySchema(must(avro.NewUnionSchema( []avro.Schema{ avro.NewPrimitiveSchema(avro.String, nil), - avro.NewPrimitiveSchema(avro.Int, nil), + avro.NewPrimitiveSchema(avro.Long, nil), avro.NewPrimitiveSchema(avro.Null, nil), }, ))), @@ -385,19 +411,19 @@ func TestSerde_MarshalUnmarshal(t *testing.T) { }, { name: "[][]int", haveValue: [][]int{{1}, {2, 3}}, - wantValue: []any{[]any{1}, []any{2, 3}}, - wantSchema: avro.NewArraySchema(avro.NewArraySchema(avro.NewPrimitiveSchema(avro.Int, nil))), + wantValue: []any{[]any{int64(1)}, []any{int64(2), int64(3)}}, + wantSchema: avro.NewArraySchema(avro.NewArraySchema(avro.NewPrimitiveSchema(avro.Long, nil))), }, { name: "map[string]int", haveValue: map[string]int{ "foo": 1, "bar": 2, }, - wantValue: map[string]any{ // all maps are unmarshaled into map[string]any - "foo": 1, - "bar": 2, + wantValue: map[string]any{ // all maps are unmarshalled into map[string]any + "foo": int64(1), + "bar": int64(2), }, - wantSchema: avro.NewMapSchema(avro.NewPrimitiveSchema(avro.Int, nil)), + wantSchema: avro.NewMapSchema(avro.NewPrimitiveSchema(avro.Long, nil)), }, { name: "map[string]any (with primitive data)", haveValue: map[string]any{ @@ -409,12 +435,12 @@ func TestSerde_MarshalUnmarshal(t *testing.T) { wantValue: map[string]any{ "foo": "bar", "foo2": "bar2", - "bar": 1, + "bar": int64(1), 
"baz": true, }, wantSchema: avro.NewMapSchema(must(avro.NewUnionSchema([]avro.Schema{ &avro.NullSchema{}, - avro.NewPrimitiveSchema(avro.Int, nil), + avro.NewPrimitiveSchema(avro.Long, nil), avro.NewPrimitiveSchema(avro.String, nil), avro.NewPrimitiveSchema(avro.Boolean, nil), }))), @@ -429,14 +455,14 @@ func TestSerde_MarshalUnmarshal(t *testing.T) { wantValue: map[string]any{ "foo": "bar", "foo2": "bar2", - "bar": 1, - "baz": []any{1, 2, 3}, + "bar": int64(1), + "baz": []any{int64(1), int64(2), int64(3)}, }, wantSchema: avro.NewMapSchema(must(avro.NewUnionSchema([]avro.Schema{ &avro.NullSchema{}, - avro.NewPrimitiveSchema(avro.Int, nil), + avro.NewPrimitiveSchema(avro.Long, nil), avro.NewPrimitiveSchema(avro.String, nil), - avro.NewArraySchema(avro.NewPrimitiveSchema(avro.Int, nil)), + avro.NewArraySchema(avro.NewPrimitiveSchema(avro.Long, nil)), }))), }, { name: "map[string]any (with union array)", @@ -450,17 +476,17 @@ func TestSerde_MarshalUnmarshal(t *testing.T) { wantValue: map[string]any{ "foo": "bar", "foo2": "bar2", - "bar": 1, - "baz": []any{1, 2, 3}, + "bar": int64(1), + "baz": []any{int64(1), int64(2), int64(3)}, "baz2": []any{"foo", true}, }, wantSchema: avro.NewMapSchema(must(avro.NewUnionSchema([]avro.Schema{ &avro.NullSchema{}, - avro.NewPrimitiveSchema(avro.Int, nil), + avro.NewPrimitiveSchema(avro.Long, nil), avro.NewPrimitiveSchema(avro.String, nil), avro.NewArraySchema(must(avro.NewUnionSchema([]avro.Schema{ &avro.NullSchema{}, - avro.NewPrimitiveSchema(avro.Int, nil), + avro.NewPrimitiveSchema(avro.Long, nil), avro.NewPrimitiveSchema(avro.String, nil), avro.NewPrimitiveSchema(avro.Boolean, nil), }))), @@ -484,14 +510,14 @@ func TestSerde_MarshalUnmarshal(t *testing.T) { wantValue: map[string]any{ "foo": map[string]any{ "bar": "baz", - "baz": 1, + "baz": int64(1), }, }, wantSchema: avro.NewMapSchema(must(avro.NewUnionSchema([]avro.Schema{ &avro.NullSchema{}, avro.NewMapSchema(must(avro.NewUnionSchema([]avro.Schema{ &avro.NullSchema{}, - 
avro.NewPrimitiveSchema(avro.Int, nil), + avro.NewPrimitiveSchema(avro.Long, nil), avro.NewPrimitiveSchema(avro.String, nil), }))), }))), @@ -503,10 +529,10 @@ func TestSerde_MarshalUnmarshal(t *testing.T) { "baz": []int{1, 2, 3}, "tz": now, }, - wantValue: map[string]any{ // structured data is unmarshaled into a map + wantValue: map[string]any{ // structured data is unmarshalled into a map "foo": "bar", - "bar": 1, - "baz": []any{1, 2, 3}, + "bar": int64(1), + "baz": []any{int64(1), int64(2), int64(3)}, "tz": now.Truncate(time.Microsecond), // Avro cannot does not support nanoseconds }, wantSchema: must(avro.NewRecordSchema( @@ -514,8 +540,8 @@ func TestSerde_MarshalUnmarshal(t *testing.T) { "", []*avro.Field{ must(avro.NewField("foo", avro.NewPrimitiveSchema(avro.String, nil))), - must(avro.NewField("bar", avro.NewPrimitiveSchema(avro.Int, nil))), - must(avro.NewField("baz", avro.NewArraySchema(avro.NewPrimitiveSchema(avro.Int, nil)))), + must(avro.NewField("bar", avro.NewPrimitiveSchema(avro.Long, nil))), + must(avro.NewField("baz", avro.NewArraySchema(avro.NewPrimitiveSchema(avro.Long, nil)))), must(avro.NewField("tz", avro.NewPrimitiveSchema(avro.Long, avro.NewPrimitiveLogicalSchema(avro.TimestampMicros)))), }, )), @@ -542,7 +568,7 @@ func TestSerde_MarshalUnmarshal(t *testing.T) { )), } wantSerde.sort() - is.Equal(wantSerde.String(), gotSerde.String()) + is.Equal("", cmp.Diff(wantSerde.String(), gotSerde.String())) // now try to marshal the value with the schema bytes, err := gotSerde.Marshal(haveValue) @@ -554,7 +580,7 @@ func TestSerde_MarshalUnmarshal(t *testing.T) { is.NoErr(err) wantValue := newRecord(tc.wantValue) - is.Equal(wantValue, gotValue) + is.Equal("", cmp.Diff(wantValue, gotValue)) }) } } @@ -618,9 +644,9 @@ func TestSerdeForType_NestedStructuredData(t *testing.T) { bytes, err := got.Marshal(have) is.NoErr(err) // only try to unmarshal to ensure there's no error, other tests assert that - // umarshaled data matches the expectations - var 
unmarshaled opencdc.StructuredData - err = got.Unmarshal(bytes, &unmarshaled) + // unmarshalled data matches the expectations + var unmarshalled opencdc.StructuredData + err = got.Unmarshal(bytes, &unmarshalled) is.NoErr(err) } diff --git a/schema/avro/union_test.go b/schema/avro/union_test.go index 50c9aa0..71e41ed 100644 --- a/schema/avro/union_test.go +++ b/schema/avro/union_test.go @@ -19,6 +19,7 @@ import ( "testing" "github.com/conduitio/conduit-commons/opencdc" + "github.com/google/go-cmp/cmp" "github.com/matryer/is" ) @@ -36,25 +37,29 @@ func TestUnionResolver(t *testing.T) { }, { name: "int", have: 123, - want: map[string]any{"int": 123}, + want: map[string]any{"long": 123}, }, { - name: "boolean", + name: "int32", + have: int32(123), + want: map[string]any{"int": int32(123)}, + }, { + name: "bool", have: true, want: map[string]any{"boolean": true}, }, { - name: "double", + name: "float64", have: 1.23, want: map[string]any{"double": 1.23}, }, { - name: "float", + name: "float32", have: float32(1.23), want: map[string]any{"float": float32(1.23)}, }, { - name: "long", + name: "int64", have: int64(321), want: map[string]any{"long": int64(321)}, }, { - name: "bytes", + name: "[]byte", have: []byte{1, 2, 3, 4}, want: map[string]any{"bytes": []byte{1, 2, 3, 4}}, }, { @@ -62,11 +67,11 @@ func TestUnionResolver(t *testing.T) { have: nil, want: nil, }, { - name: "int array", + name: "[]int", have: []int{1, 2, 3, 4}, want: map[string]any{"array": []int{1, 2, 3, 4}}, }, { - name: "nil bool array", + name: "nil []bool", have: []bool(nil), want: map[string]any{"array": []bool(nil)}, }} @@ -144,12 +149,12 @@ func TestUnionResolver(t *testing.T) { // before marshal we should change the nested map err = mur.BeforeMarshal(have) is.NoErr(err) - is.Equal(want, have) + is.Equal("", cmp.Diff(want, have)) // after unmarshal we should have the same record as at the start err = mur.AfterUnmarshal(have) is.NoErr(err) - is.Equal(newRecord(), have) + is.Equal("", 
cmp.Diff(newRecord(), have)) }) } }