Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add JSONSerializer for records #22

Merged
merged 4 commits into from
Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions opencdc/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package opencdc

import (
"bytes"
"context"
"encoding/base64"
"fmt"

opencdcv1 "github.com/conduitio/conduit-commons/proto/opencdc/v1"
Expand Down Expand Up @@ -82,3 +84,28 @@ func (d RawData) Bytes() []byte {
func (d RawData) Clone() Data {
return RawData(bytes.Clone(d))
}

func (d RawData) MarshalJSON(ctx context.Context) ([]byte, error) {
if ctx != nil {
s := ctx.Value(jsonMarshalOptionsCtxKey{})
//nolint:forcetypeassert // We know the type of the value.
if s != nil && s.(*JSONMarshalOptions).RawDataAsString {
// We should serialize RawData as a string.
//nolint:wrapcheck // If we didn't implement MarshalJSON this would be done by the json package.
return json.Marshal(string(d))
}
}

// We could use json.Marshal([]byte(d)) here, but it would be 3 times slower,
// and since this is in the hot path, we need to optimize it.

if d == nil {
return []byte(`null`), nil
}
encodedLen := base64.StdEncoding.EncodedLen(len(d))
out := make([]byte, encodedLen+2)
out[0] = '"' // add leading quote
base64.StdEncoding.Encode(out[1:], d)
out[encodedLen+1] = '"' // add trailing quote
return out, nil
}
19 changes: 19 additions & 0 deletions opencdc/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,30 @@
package opencdc

import (
"context"
"fmt"

"github.com/goccy/go-json"
)

// JSONMarshalOptions can customize how a record is serialized to JSON. It can
// be attached to a context using WithJSONMarshalOptions and supplied to
// json.MarshalContext to customize the serialization behavior.
type JSONMarshalOptions struct {
// RawDataAsString is a flag that indicates if the RawData type should be
// serialized as a string. If set to false, RawData will be serialized as a
// base64 encoded string. If set to true, RawData will be serialized as a
// string without conversion.
RawDataAsString bool
}

type jsonMarshalOptionsCtxKey struct{}

// WithJSONMarshalOptions attaches JSONMarshalOptions to a context.
func WithJSONMarshalOptions(ctx context.Context, options *JSONMarshalOptions) context.Context {
return context.WithValue(ctx, jsonMarshalOptionsCtxKey{}, options)
}

func (r *Record) UnmarshalJSON(b []byte) error {
var raw struct {
Position Position `json:"position"`
Expand Down
5 changes: 4 additions & 1 deletion opencdc/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"github.com/matryer/is"
)

func TestRecord_UnmarshalJSON(t *testing.T) {
func TestRecord_JSON(t *testing.T) {
is := is.New(t)
have := Record{
Position: Position("standing"),
Expand All @@ -46,6 +46,7 @@ func TestRecord_UnmarshalJSON(t *testing.T) {
},
},
}
wantJSON := `{"position":"c3RhbmRpbmc=","operation":"update","metadata":{"foo":"bar"},"key":"cGFkbG9jay1rZXk=","payload":{"before":"eWVsbG93","after":{"bool":true,"float32":1.2,"float64":1.2,"int":1,"int32":1,"int64":1,"string":"orange"}}}`
want := Record{
Position: Position("standing"),
Operation: OperationUpdate,
Expand All @@ -71,6 +72,8 @@ func TestRecord_UnmarshalJSON(t *testing.T) {
b, err := json.Marshal(have)
is.NoErr(err)

is.Equal(cmp.Diff(string(b), wantJSON), "")

var got Record
err = json.Unmarshal(b, &got)
is.NoErr(err)
Expand Down
26 changes: 26 additions & 0 deletions opencdc/serializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,34 @@

package opencdc

import (
"context"
"fmt"

"github.com/goccy/go-json"
)

// RecordSerializer is a type that can serialize a record to bytes. It's used in
// destination connectors to change the output structure and format.
type RecordSerializer interface {
Serialize(Record) ([]byte, error)
}

// JSONSerializer is a RecordSerializer that serializes records to JSON using
// the configured options.
type JSONSerializer JSONMarshalOptions

func (s JSONSerializer) Serialize(r Record) ([]byte, error) {
ctx := WithJSONMarshalOptions(context.Background(), (*JSONMarshalOptions)(&s))
defer func() {
// Workaround because of https://github.com/goccy/go-json/issues/499.
// TODO: Remove this when the issue is fixed and store value in context
// instead of pointer.
s = JSONSerializer{}
}()
bytes, err := json.MarshalContext(ctx, r)
if err != nil {
return nil, fmt.Errorf("failed to serialize record to JSON: %w", err)
}
return bytes, nil
}
69 changes: 69 additions & 0 deletions opencdc/serializer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright © 2024 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package opencdc

import (
"testing"

"github.com/google/go-cmp/cmp"
"github.com/matryer/is"
)

func TestJSONSerializer(t *testing.T) {
rec := Record{
Position: Position("standing"),
Operation: OperationUpdate,
Metadata: Metadata{"foo": "bar"},
Key: RawData("padlock-key"),
Payload: Change{
Before: RawData("yellow"),
After: StructuredData{
"bool": true,

"int": 1,
"int32": int32(1),
"int64": int64(1),

"float32": float32(1.2),
"float64": 1.2,

"string": "orange",
},
},
}

testCases := []struct {
name string
serializer JSONSerializer
want string
}{{
name: "default",
serializer: JSONSerializer{},
want: `{"position":"c3RhbmRpbmc=","operation":"update","metadata":{"foo":"bar"},"key":"cGFkbG9jay1rZXk=","payload":{"before":"eWVsbG93","after":{"bool":true,"float32":1.2,"float64":1.2,"int":1,"int32":1,"int64":1,"string":"orange"}}}`,
}, {
name: "raw data as string",
serializer: JSONSerializer{RawDataAsString: true},
want: `{"position":"c3RhbmRpbmc=","operation":"update","metadata":{"foo":"bar"},"key":"padlock-key","payload":{"before":"yellow","after":{"bool":true,"float32":1.2,"float64":1.2,"int":1,"int32":1,"int64":1,"string":"orange"}}}`,
}}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
is := is.New(t)
rec.SetSerializer(tc.serializer)
b := rec.Bytes()
is.Equal(cmp.Diff(string(b), tc.want), "")
})
}
}
Loading