Skip to content

Commit

Permalink
Add JSONSerializer for records (#22)
Browse files Browse the repository at this point in the history
* add string serializer for raw data

* document code

* expose json marshal options context func

* fix linter errors
  • Loading branch information
lovromazgon authored Feb 29, 2024
1 parent 58737e4 commit 1221866
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 1 deletion.
27 changes: 27 additions & 0 deletions opencdc/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package opencdc

import (
"bytes"
"context"
"encoding/base64"
"fmt"

opencdcv1 "github.com/conduitio/conduit-commons/proto/opencdc/v1"
Expand Down Expand Up @@ -82,3 +84,28 @@ func (d RawData) Bytes() []byte {
// Clone returns a copy of the raw data that is backed by its own byte slice,
// so modifying the copy does not affect the original.
func (d RawData) Clone() Data {
	dup := bytes.Clone(d)
	return RawData(dup)
}

// MarshalJSON serializes the raw data to JSON, taking the JSONMarshalOptions
// stored in ctx (see WithJSONMarshalOptions) into account.
//
// If ctx carries options with RawDataAsString set to true, the data is
// converted to a string and marshaled as a JSON string. Otherwise the data is
// emitted as a base64 (standard encoding) JSON string, or as null if d is nil.
//
// A nil ctx, missing options, options of an unexpected type, or a nil options
// pointer all result in the default base64 encoding.
func (d RawData) MarshalJSON(ctx context.Context) ([]byte, error) {
	if ctx != nil {
		// Use the comma-ok form together with a nil check: the context may
		// hold a typed nil pointer (e.g. WithJSONMarshalOptions(ctx, nil)),
		// which a forced type assertion followed by a field access would
		// turn into a panic.
		opts, ok := ctx.Value(jsonMarshalOptionsCtxKey{}).(*JSONMarshalOptions)
		if ok && opts != nil && opts.RawDataAsString {
			// We should serialize RawData as a string.
			//nolint:wrapcheck // If we didn't implement MarshalJSON this would be done by the json package.
			return json.Marshal(string(d))
		}
	}

	// We could use json.Marshal([]byte(d)) here, but it would be 3 times slower,
	// and since this is in the hot path, we need to optimize it.

	if d == nil {
		return []byte(`null`), nil
	}
	encodedLen := base64.StdEncoding.EncodedLen(len(d))
	out := make([]byte, encodedLen+2) // +2 for the surrounding quotes
	out[0] = '"'                      // add leading quote
	base64.StdEncoding.Encode(out[1:], d)
	out[encodedLen+1] = '"' // add trailing quote
	return out, nil
}
19 changes: 19 additions & 0 deletions opencdc/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,30 @@
package opencdc

import (
"context"
"fmt"

"github.com/goccy/go-json"
)

// JSONMarshalOptions customizes how a record is serialized to JSON. Attach it
// to a context using WithJSONMarshalOptions and supply that context to
// json.MarshalContext to apply the options during serialization.
type JSONMarshalOptions struct {
// RawDataAsString controls how values of type RawData are serialized.
// When false (the zero value), RawData is serialized as a base64 encoded
// string. When true, RawData is serialized as a plain string without
// base64 conversion.
RawDataAsString bool
}

// jsonMarshalOptionsCtxKey is the context key under which
// WithJSONMarshalOptions stores the *JSONMarshalOptions value.
type jsonMarshalOptionsCtxKey struct{}

// WithJSONMarshalOptions returns a context derived from ctx that carries the
// supplied options. The options pointer is stored as-is under a key private to
// this package.
func WithJSONMarshalOptions(ctx context.Context, options *JSONMarshalOptions) context.Context {
	var key jsonMarshalOptionsCtxKey
	return context.WithValue(ctx, key, options)
}

func (r *Record) UnmarshalJSON(b []byte) error {
var raw struct {
Position Position `json:"position"`
Expand Down
5 changes: 4 additions & 1 deletion opencdc/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"github.com/matryer/is"
)

func TestRecord_UnmarshalJSON(t *testing.T) {
func TestRecord_JSON(t *testing.T) {
is := is.New(t)
have := Record{
Position: Position("standing"),
Expand All @@ -46,6 +46,7 @@ func TestRecord_UnmarshalJSON(t *testing.T) {
},
},
}
wantJSON := `{"position":"c3RhbmRpbmc=","operation":"update","metadata":{"foo":"bar"},"key":"cGFkbG9jay1rZXk=","payload":{"before":"eWVsbG93","after":{"bool":true,"float32":1.2,"float64":1.2,"int":1,"int32":1,"int64":1,"string":"orange"}}}`
want := Record{
Position: Position("standing"),
Operation: OperationUpdate,
Expand All @@ -71,6 +72,8 @@ func TestRecord_UnmarshalJSON(t *testing.T) {
b, err := json.Marshal(have)
is.NoErr(err)

is.Equal(cmp.Diff(string(b), wantJSON), "")

var got Record
err = json.Unmarshal(b, &got)
is.NoErr(err)
Expand Down
26 changes: 26 additions & 0 deletions opencdc/serializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,34 @@

package opencdc

import (
"context"
"fmt"

"github.com/goccy/go-json"
)

// RecordSerializer is a type that can serialize a record to bytes. It's used in
// destination connectors to change the output structure and format.
type RecordSerializer interface {
// Serialize converts the given record to its serialized byte form, or
// returns an error if the record can not be serialized.
Serialize(Record) ([]byte, error)
}

// JSONSerializer is a RecordSerializer that serializes records to JSON using
// the configured options. Its underlying type is JSONMarshalOptions, so it is
// configured through the same fields (e.g. RawDataAsString).
type JSONSerializer JSONMarshalOptions

// Serialize encodes the record as JSON. The serializer's options are attached
// to a background context (as a pointer to the receiver copy) and handed to
// json.MarshalContext. Marshaling errors are wrapped and returned.
func (s JSONSerializer) Serialize(r Record) ([]byte, error) {
	defer func() {
		// Workaround because of https://github.com/goccy/go-json/issues/499.
		// TODO: Remove this when the issue is fixed and store value in context
		// instead of pointer.
		s = JSONSerializer{}
	}()

	opts := (*JSONMarshalOptions)(&s)
	marshalCtx := WithJSONMarshalOptions(context.Background(), opts)

	encoded, err := json.MarshalContext(marshalCtx, r)
	if err != nil {
		return nil, fmt.Errorf("failed to serialize record to JSON: %w", err)
	}
	return encoded, nil
}
69 changes: 69 additions & 0 deletions opencdc/serializer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright © 2024 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package opencdc

import (
"testing"

"github.com/google/go-cmp/cmp"
"github.com/matryer/is"
)

func TestJSONSerializer(t *testing.T) {
rec := Record{
Position: Position("standing"),
Operation: OperationUpdate,
Metadata: Metadata{"foo": "bar"},
Key: RawData("padlock-key"),
Payload: Change{
Before: RawData("yellow"),
After: StructuredData{
"bool": true,

"int": 1,
"int32": int32(1),
"int64": int64(1),

"float32": float32(1.2),
"float64": 1.2,

"string": "orange",
},
},
}

testCases := []struct {
name string
serializer JSONSerializer
want string
}{{
name: "default",
serializer: JSONSerializer{},
want: `{"position":"c3RhbmRpbmc=","operation":"update","metadata":{"foo":"bar"},"key":"cGFkbG9jay1rZXk=","payload":{"before":"eWVsbG93","after":{"bool":true,"float32":1.2,"float64":1.2,"int":1,"int32":1,"int64":1,"string":"orange"}}}`,
}, {
name: "raw data as string",
serializer: JSONSerializer{RawDataAsString: true},
want: `{"position":"c3RhbmRpbmc=","operation":"update","metadata":{"foo":"bar"},"key":"padlock-key","payload":{"before":"yellow","after":{"bool":true,"float32":1.2,"float64":1.2,"int":1,"int32":1,"int64":1,"string":"orange"}}}`,
}}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
is := is.New(t)
rec.SetSerializer(tc.serializer)
b := rec.Bytes()
is.Equal(cmp.Diff(string(b), tc.want), "")
})
}
}

0 comments on commit 1221866

Please sign in to comment.