diff --git a/opencdc/data.go b/opencdc/data.go index 3b7c8b3..f8b787f 100644 --- a/opencdc/data.go +++ b/opencdc/data.go @@ -16,6 +16,8 @@ package opencdc import ( "bytes" + "context" + "encoding/base64" "fmt" opencdcv1 "github.com/conduitio/conduit-commons/proto/opencdc/v1" @@ -82,3 +84,28 @@ func (d RawData) Bytes() []byte { func (d RawData) Clone() Data { return RawData(bytes.Clone(d)) } + +func (d RawData) MarshalJSON(ctx context.Context) ([]byte, error) { + if ctx != nil { + s := ctx.Value(jsonMarshalOptionsCtxKey{}) + //nolint:forcetypeassert // We know the type of the value. + if s != nil && s.(*JSONMarshalOptions).RawDataAsString { + // We should serialize RawData as a string. + //nolint:wrapcheck // If we didn't implement MarshalJSON this would be done by the json package. + return json.Marshal(string(d)) + } + } + + // We could use json.Marshal([]byte(d)) here, but it would be 3 times slower, + // and since this is in the hot path, we need to optimize it. + + if d == nil { + return []byte(`null`), nil + } + encodedLen := base64.StdEncoding.EncodedLen(len(d)) + out := make([]byte, encodedLen+2) + out[0] = '"' // add leading quote + base64.StdEncoding.Encode(out[1:], d) + out[encodedLen+1] = '"' // add trailing quote + return out, nil +} diff --git a/opencdc/json.go b/opencdc/json.go index 2935fe6..fd785f0 100644 --- a/opencdc/json.go +++ b/opencdc/json.go @@ -15,11 +15,30 @@ package opencdc import ( + "context" "fmt" "github.com/goccy/go-json" ) +// JSONMarshalOptions can customize how a record is serialized to JSON. It can +// be attached to a context using WithJSONMarshalOptions and supplied to +// json.MarshalContext to customize the serialization behavior. +type JSONMarshalOptions struct { + // RawDataAsString is a flag that indicates if the RawData type should be + // serialized as a string. If set to false, RawData will be serialized as a + // base64 encoded string. If set to true, RawData will be serialized as a + // string without conversion. + RawDataAsString bool +} + +type jsonMarshalOptionsCtxKey struct{} + +// WithJSONMarshalOptions attaches JSONMarshalOptions to a context. +func WithJSONMarshalOptions(ctx context.Context, options *JSONMarshalOptions) context.Context { + return context.WithValue(ctx, jsonMarshalOptionsCtxKey{}, options) +} + func (r *Record) UnmarshalJSON(b []byte) error { var raw struct { Position Position `json:"position"` diff --git a/opencdc/json_test.go b/opencdc/json_test.go index fdbaa93..ae6dd66 100644 --- a/opencdc/json_test.go +++ b/opencdc/json_test.go @@ -23,7 +23,7 @@ import ( "github.com/matryer/is" ) -func TestRecord_UnmarshalJSON(t *testing.T) { +func TestRecord_JSON(t *testing.T) { is := is.New(t) have := Record{ Position: Position("standing"), @@ -46,6 +46,7 @@ func TestRecord_UnmarshalJSON(t *testing.T) { }, }, } + wantJSON := `{"position":"c3RhbmRpbmc=","operation":"update","metadata":{"foo":"bar"},"key":"cGFkbG9jay1rZXk=","payload":{"before":"eWVsbG93","after":{"bool":true,"float32":1.2,"float64":1.2,"int":1,"int32":1,"int64":1,"string":"orange"}}}` want := Record{ Position: Position("standing"), Operation: OperationUpdate, @@ -71,6 +72,8 @@ func TestRecord_UnmarshalJSON(t *testing.T) { b, err := json.Marshal(have) is.NoErr(err) + is.Equal(cmp.Diff(string(b), wantJSON), "") + var got Record err = json.Unmarshal(b, &got) is.NoErr(err) diff --git a/opencdc/serializer.go b/opencdc/serializer.go index cc9e592..c75d64b 100644 --- a/opencdc/serializer.go +++ b/opencdc/serializer.go @@ -14,8 +14,34 @@ package opencdc +import ( + "context" + "fmt" + + "github.com/goccy/go-json" +) + // RecordSerializer is a type that can serialize a record to bytes. It's used in // destination connectors to change the output structure and format. type RecordSerializer interface { Serialize(Record) ([]byte, error) } + +// JSONSerializer is a RecordSerializer that serializes records to JSON using +// the configured options. +type JSONSerializer JSONMarshalOptions + +func (s JSONSerializer) Serialize(r Record) ([]byte, error) { + ctx := WithJSONMarshalOptions(context.Background(), (*JSONMarshalOptions)(&s)) + defer func() { + // Workaround because of https://github.com/goccy/go-json/issues/499. + // TODO: Remove this when the issue is fixed and store value in context + // instead of pointer. + s = JSONSerializer{} + }() + bytes, err := json.MarshalContext(ctx, r) + if err != nil { + return nil, fmt.Errorf("failed to serialize record to JSON: %w", err) + } + return bytes, nil +} diff --git a/opencdc/serializer_test.go b/opencdc/serializer_test.go new file mode 100644 index 0000000..f1c6e58 --- /dev/null +++ b/opencdc/serializer_test.go @@ -0,0 +1,69 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package opencdc + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/matryer/is" +) + +func TestJSONSerializer(t *testing.T) { + rec := Record{ + Position: Position("standing"), + Operation: OperationUpdate, + Metadata: Metadata{"foo": "bar"}, + Key: RawData("padlock-key"), + Payload: Change{ + Before: RawData("yellow"), + After: StructuredData{ + "bool": true, + + "int": 1, + "int32": int32(1), + "int64": int64(1), + + "float32": float32(1.2), + "float64": 1.2, + + "string": "orange", + }, + }, + } + + testCases := []struct { + name string + serializer JSONSerializer + want string + }{{ + name: "default", + serializer: JSONSerializer{}, + want: `{"position":"c3RhbmRpbmc=","operation":"update","metadata":{"foo":"bar"},"key":"cGFkbG9jay1rZXk=","payload":{"before":"eWVsbG93","after":{"bool":true,"float32":1.2,"float64":1.2,"int":1,"int32":1,"int64":1,"string":"orange"}}}`, + }, { + name: "raw data as string", + serializer: JSONSerializer{RawDataAsString: true}, + want: `{"position":"c3RhbmRpbmc=","operation":"update","metadata":{"foo":"bar"},"key":"padlock-key","payload":{"before":"yellow","after":{"bool":true,"float32":1.2,"float64":1.2,"int":1,"int32":1,"int64":1,"string":"orange"}}}`, + }} + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + is := is.New(t) + rec.SetSerializer(tc.serializer) + b := rec.Bytes() + is.Equal(cmp.Diff(string(b), tc.want), "") + }) + } +}