structs outside of this package can't implement this interface. Bytes() []byte Clone() Data + ToProto(*opencdcv1.Data) error } type Change struct { diff --git a/opencdc/errors.go b/opencdc/errors.go index 201bff7..4b47984 100644 --- a/opencdc/errors.go +++ b/opencdc/errors.go @@ -25,4 +25,8 @@ var ( // ErrUnknownOperation is returned when trying to parse an Operation string // and encountering an unknown operation. ErrUnknownOperation = errors.New("unknown operation") + + // ErrInvalidProtoDataType is returned when trying to convert a proto data + // type to raw or structured data. + ErrInvalidProtoDataType = errors.New("invalid proto data type") ) diff --git a/opencdc/json.go b/opencdc/json.go new file mode 100644 index 0000000..2935fe6 --- /dev/null +++ b/opencdc/json.go @@ -0,0 +1,82 @@ +// Copyright © 2023 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package opencdc + +import ( + "fmt" + + "github.com/goccy/go-json" +) + +func (r *Record) UnmarshalJSON(b []byte) error { + var raw struct { + Position Position `json:"position"` + Operation Operation `json:"operation"` + Metadata Metadata `json:"metadata"` + Payload struct { + Before json.RawMessage `json:"before"` + After json.RawMessage `json:"after"` + } `json:"payload"` + Key json.RawMessage `json:"key"` + } + + err := json.Unmarshal(b, &raw) + if err != nil { + return err //nolint:wrapcheck // no additional context to add + } + + key, err := dataUnmarshalJSON(raw.Key) + if err != nil { + return err + } + + payloadBefore, err := dataUnmarshalJSON(raw.Payload.Before) + if err != nil { + return err + } + + payloadAfter, err := dataUnmarshalJSON(raw.Payload.After) + if err != nil { + return err + } + + r.Position = raw.Position + r.Operation = raw.Operation + r.Metadata = raw.Metadata + r.Key = key + r.Payload = Change{ + Before: payloadBefore, + After: payloadAfter, + } + + return nil +} + +func dataUnmarshalJSON(b []byte) (Data, error) { + if b[0] == '"' { + var data RawData + err := json.Unmarshal(b, &data) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal raw data: %w", err) + } + return data, nil + } + var data StructuredData + err := json.Unmarshal(b, &data) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal structured data: %w", err) + } + return data, nil +} diff --git a/opencdc/json_test.go b/opencdc/json_test.go new file mode 100644 index 0000000..fdbaa93 --- /dev/null +++ b/opencdc/json_test.go @@ -0,0 +1,143 @@ +// Copyright © 2023 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package opencdc + +import ( + "testing" + + "github.com/goccy/go-json" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/matryer/is" +) + +func TestRecord_UnmarshalJSON(t *testing.T) { + is := is.New(t) + have := Record{ + Position: Position("standing"), + Operation: OperationUpdate, + Metadata: Metadata{"foo": "bar"}, + Key: RawData("padlock-key"), + Payload: Change{ + Before: RawData("yellow"), + After: StructuredData{ + "bool": true, + + "int": 1, + "int32": int32(1), + "int64": int64(1), + + "float32": float32(1.2), + "float64": 1.2, + + "string": "orange", + }, + }, + } + want := Record{ + Position: Position("standing"), + Operation: OperationUpdate, + Metadata: Metadata{"foo": "bar"}, + Key: RawData("padlock-key"), + Payload: Change{ + Before: RawData("yellow"), + After: StructuredData{ + "bool": true, + + "int": 1.0, + "int32": 1.0, + "int64": 1.0, + + "float32": 1.2, + "float64": 1.2, + + "string": "orange", + }, + }, + } + + b, err := json.Marshal(have) + is.NoErr(err) + + var got Record + err = json.Unmarshal(b, &got) + is.NoErr(err) + + diff := cmp.Diff(want, got, cmpopts.IgnoreUnexported(Record{})) + is.Equal(diff, "") +} + +func BenchmarkRecord_MarshalJSON(b *testing.B) { + r := Record{ + Position: Position("standing"), + Operation: OperationUpdate, + Metadata: Metadata{"foo": "bar"}, + Key: RawData("padlock-key"), + Payload: Change{ + Before: RawData("yellow"), + After: StructuredData{ + "bool": true, + + "int": 1, + "int32": int32(1), + "int64": int64(1), + + "float32": float32(1.2), + "float64": 1.2, + + "string": "orange", + }, + }, + } + + b.ResetTimer() + var bytes []byte + for i := 0; i < b.N; i++ { + bytes, _ = json.Marshal(r) + } + _ = bytes +} + +func BenchmarkRecord_UnmarshalJSON(b *testing.B) { + r := Record{ + Position: Position("standing"), + Operation: OperationUpdate, + Metadata: Metadata{"foo": "bar"}, + Key: RawData("padlock-key"), + Payload: Change{ + Before: RawData("yellow"), + After: StructuredData{ + "bool": true, + + "int": 1, + "int32": int32(1), + "int64": int64(1), + + "float32": float32(1.2), + "float64": 1.2, + + "string": "orange", + }, + }, + } + bytes, _ := json.Marshal(r) + + b.ResetTimer() + var got Record + for i := 0; i < b.N; i++ { + _ = json.Unmarshal(bytes, &got) + } + _ = got +} diff --git a/opencdc/metadata_test.go b/opencdc/metadata_test.go new file mode 100644 index 0000000..4ba4893 --- /dev/null +++ b/opencdc/metadata_test.go @@ -0,0 +1,38 @@ +// Copyright © 2023 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package opencdc + +import ( + "testing" + + opencdcv1 "github.com/conduitio/conduit-commons/proto/opencdc/v1" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/runtime/protoimpl" +) + +func TestMetadataConstants(t *testing.T) { + wantMapping := map[string]*protoimpl.ExtensionInfo{ + OpenCDCVersion: opencdcv1.E_OpencdcVersion, + MetadataOpenCDCVersion: opencdcv1.E_MetadataVersion, + MetadataCreatedAt: opencdcv1.E_MetadataCreatedAt, + MetadataReadAt: opencdcv1.E_MetadataReadAt, + } + for goConstant, extensionInfo := range wantMapping { + protoConstant := proto.GetExtension(extensionInfo.TypeDescriptor().ParentFile().Options(), extensionInfo) + if goConstant != protoConstant { + t.Fatalf("go constant %q doesn't match proto constant %q", goConstant, protoConstant) + } + } +} diff --git a/opencdc/proto.go b/opencdc/proto.go new file mode 100644 index 0000000..cc0d174 --- /dev/null +++ b/opencdc/proto.go @@ -0,0 +1,192 @@ +// Copyright © 2023 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package opencdc + +import ( + "fmt" + + opencdcv1 "github.com/conduitio/conduit-commons/proto/opencdc/v1" + "google.golang.org/protobuf/types/known/structpb" +) + +func _() { + // An "invalid array index" compiler error signifies that the constant values have changed. + var cTypes [1]struct{} + _ = cTypes[int(OperationCreate)-int(opencdcv1.Operation_OPERATION_CREATE)] + _ = cTypes[int(OperationUpdate)-int(opencdcv1.Operation_OPERATION_UPDATE)] + _ = cTypes[int(OperationDelete)-int(opencdcv1.Operation_OPERATION_DELETE)] + _ = cTypes[int(OperationSnapshot)-int(opencdcv1.Operation_OPERATION_SNAPSHOT)] +} + +// -- From Proto To OpenCDC ---------------------------------------------------- + +// FromProto takes data from the supplied proto object and populates the +// receiver. If the proto object is nil, the receiver is set to its zero value. +// If the function returns an error, the receiver could be partially populated. +func (r *Record) FromProto(proto *opencdcv1.Record) error { + if proto == nil { + *r = Record{} + return nil + } + + var err error + r.Key, err = dataFromProto(proto.Key) + if err != nil { + return fmt.Errorf("error converting key: %w", err) + } + + if proto.Payload != nil { + err := r.Payload.FromProto(proto.Payload) + if err != nil { + return fmt.Errorf("error converting payload: %w", err) + } + } else { + r.Payload = Change{} + } + + r.Position = proto.Position + r.Metadata = proto.Metadata + r.Operation = Operation(proto.Operation) + return nil +} + +// FromProto takes data from the supplied proto object and populates the +// receiver. If the proto object is nil, the receiver is set to its zero value. +// If the function returns an error, the receiver could be partially populated. +func (c *Change) FromProto(proto *opencdcv1.Change) error { + if proto == nil { + *c = Change{} + return nil + } + + var err error + c.Before, err = dataFromProto(proto.Before) + if err != nil { + return fmt.Errorf("error converting before: %w", err) + } + + c.After, err = dataFromProto(proto.After) + if err != nil { + return fmt.Errorf("error converting after: %w", err) + } + + return nil +} + +func dataFromProto(proto *opencdcv1.Data) (Data, error) { + if proto == nil { + return nil, nil //nolint:nilnil // This is the expected behavior. + } + + switch v := proto.Data.(type) { + case *opencdcv1.Data_RawData: + return RawData(v.RawData), nil + case *opencdcv1.Data_StructuredData: + return StructuredData(v.StructuredData.AsMap()), nil + case nil: + return nil, nil //nolint:nilnil // This is the expected behavior. + default: + return nil, ErrInvalidProtoDataType + } +} + +// -- From OpenCDC To Proto ---------------------------------------------------- + +// ToProto takes data from the receiver and populates the supplied proto object. +// If the function returns an error, the proto object could be partially +// populated. +func (r Record) ToProto(proto *opencdcv1.Record) error { + if r.Key != nil { + if proto.Key == nil { + proto.Key = &opencdcv1.Data{} + } + err := r.Key.ToProto(proto.Key) + if err != nil { + return fmt.Errorf("error converting key: %w", err) + } + } else { + proto.Key = nil + } + + if proto.Payload == nil { + proto.Payload = &opencdcv1.Change{} + } + err := r.Payload.ToProto(proto.Payload) + if err != nil { + return fmt.Errorf("error converting payload: %w", err) + } + + proto.Position = r.Position + proto.Metadata = r.Metadata + proto.Operation = opencdcv1.Operation(r.Operation) + return nil +} + +// ToProto takes data from the receiver and populates the supplied proto object. +// If the function returns an error, the proto object could be partially +// populated. +func (c Change) ToProto(proto *opencdcv1.Change) error { + if c.Before != nil { + if proto.Before == nil { + proto.Before = &opencdcv1.Data{} + } + err := c.Before.ToProto(proto.Before) + if err != nil { + return fmt.Errorf("error converting before: %w", err) + } + } else { + proto.Before = nil + } + + if c.After != nil { + if proto.After == nil { + proto.After = &opencdcv1.Data{} + } + err := c.After.ToProto(proto.After) + if err != nil { + return fmt.Errorf("error converting after: %w", err) + } + } else { + proto.After = nil + } + + return nil +} + +// ToProto takes data from the receiver and populates the supplied proto object. +func (d RawData) ToProto(proto *opencdcv1.Data) error { + protoRawData, ok := proto.Data.(*opencdcv1.Data_RawData) + if !ok { + protoRawData = &opencdcv1.Data_RawData{} + proto.Data = protoRawData + } + protoRawData.RawData = d + return nil +} + +// ToProto takes data from the receiver and populates the supplied proto object. +func (d StructuredData) ToProto(proto *opencdcv1.Data) error { + protoStructuredData, ok := proto.Data.(*opencdcv1.Data_StructuredData) + if !ok { + protoStructuredData = &opencdcv1.Data_StructuredData{} + proto.Data = protoStructuredData + } + data, err := structpb.NewStruct(d) + if err != nil { + return fmt.Errorf("could not convert structured data to proto: %w", err) + } + protoStructuredData.StructuredData = data + return nil +} diff --git a/opencdc/proto_test.go b/opencdc/proto_test.go new file mode 100644 index 0000000..67c20fa --- /dev/null +++ b/opencdc/proto_test.go @@ -0,0 +1,232 @@ +// Copyright © 2023 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package opencdc + +import ( + "fmt" + "testing" + + opencdcv1 "github.com/conduitio/conduit-commons/proto/opencdc/v1" + "github.com/matryer/is" + "google.golang.org/protobuf/types/known/structpb" +) + +func TestRecord_FromProto(t *testing.T) { + is := is.New(t) + + r1 := &opencdcv1.Record{ + Position: []byte("standing"), + Operation: opencdcv1.Operation_OPERATION_UPDATE, + Metadata: map[string]string{"foo": "bar"}, + Key: &opencdcv1.Data{Data: &opencdcv1.Data_RawData{RawData: []byte("padlock-key")}}, + Payload: &opencdcv1.Change{ + Before: &opencdcv1.Data{Data: &opencdcv1.Data_RawData{RawData: []byte("yellow")}}, + After: &opencdcv1.Data{ + Data: &opencdcv1.Data_StructuredData{StructuredData: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + "bool": {Kind: &structpb.Value_BoolValue{BoolValue: true}}, + "int": {Kind: &structpb.Value_NumberValue{NumberValue: 1}}, + "int32": {Kind: &structpb.Value_NumberValue{NumberValue: 1}}, + "int64": {Kind: &structpb.Value_NumberValue{NumberValue: 1}}, + "float32": {Kind: &structpb.Value_NumberValue{NumberValue: 1.2}}, + "float64": {Kind: &structpb.Value_NumberValue{NumberValue: 1.2}}, + "string": {Kind: &structpb.Value_StringValue{StringValue: "orange"}}, + "nested": {Kind: &structpb.Value_StructValue{StructValue: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + "bool": {Kind: &structpb.Value_BoolValue{BoolValue: true}}, + "int": {Kind: &structpb.Value_NumberValue{NumberValue: 2}}, + "float32": {Kind: &structpb.Value_NumberValue{NumberValue: 2.3}}, + "string": {Kind: &structpb.Value_StringValue{StringValue: "blue"}}, + }, + }}}, + }, + }}, + }, + }, + } + + want := Record{ + Position: r1.Position, + Operation: Operation(r1.Operation), + Metadata: r1.Metadata, + Key: RawData(r1.Key.GetRawData()), + Payload: Change{ + Before: RawData(r1.Payload.Before.GetRawData()), + After: StructuredData(r1.Payload.After.GetStructuredData().AsMap()), + }, + } + + var got Record + err := got.FromProto(r1) + is.NoErr(err) + is.Equal(got, want) + + // writing another record to the same target should overwrite the previous + + want2 := Record{} + err = got.FromProto(&opencdcv1.Record{}) + is.NoErr(err) + is.Equal(got, want2) +} + +func BenchmarkRecord_FromProto_Structured(b *testing.B) { + r1 := &opencdcv1.Record{ + Position: []byte("standing"), + Operation: opencdcv1.Operation_OPERATION_UPDATE, + Metadata: map[string]string{"foo": "bar"}, + Key: &opencdcv1.Data{Data: &opencdcv1.Data_RawData{RawData: []byte("padlock-key")}}, + Payload: &opencdcv1.Change{ + Before: &opencdcv1.Data{Data: &opencdcv1.Data_RawData{RawData: []byte("yellow")}}, + After: &opencdcv1.Data{ + Data: &opencdcv1.Data_StructuredData{StructuredData: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + "bool": {Kind: &structpb.Value_BoolValue{BoolValue: true}}, + "int": {Kind: &structpb.Value_NumberValue{NumberValue: 1}}, + "int32": {Kind: &structpb.Value_NumberValue{NumberValue: 1}}, + "int64": {Kind: &structpb.Value_NumberValue{NumberValue: 1}}, + "float32": {Kind: &structpb.Value_NumberValue{NumberValue: 1.2}}, + "float64": {Kind: &structpb.Value_NumberValue{NumberValue: 1.2}}, + "string": {Kind: &structpb.Value_StringValue{StringValue: "orange"}}, + }, + }}, + }, + }, + } + + // reuse the same target record + var r2 Record + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _ = r2.FromProto(r1) + } + _ = r2 +} + +func TestRecord_ToProto(t *testing.T) { + is := is.New(t) + + r1 := Record{ + Position: Position("standing"), + Operation: OperationUpdate, + Metadata: Metadata{"foo": "bar"}, + Key: RawData("padlock-key"), + Payload: Change{ + Before: RawData("yellow"), + After: StructuredData{ + "bool": true, + + "int": 1, + "int32": int32(1), + "int64": int64(1), + + "float32": float32(1.2), + "float64": 1.2, + + "string": "orange", + + "nested": map[string]any{ + "bool": true, + "float": 2.3, + "string": "blue", + }, + }, + }, + } + + after, err := structpb.NewStruct(r1.Payload.After.(StructuredData)) + is.NoErr(err) + want := &opencdcv1.Record{ + Position: r1.Position, + Operation: opencdcv1.Operation(r1.Operation), + Metadata: r1.Metadata, + Key: &opencdcv1.Data{Data: &opencdcv1.Data_RawData{RawData: r1.Key.(RawData)}}, + Payload: &opencdcv1.Change{ + Before: &opencdcv1.Data{Data: &opencdcv1.Data_RawData{RawData: r1.Payload.Before.(RawData)}}, + After: &opencdcv1.Data{Data: &opencdcv1.Data_StructuredData{StructuredData: after}}, + }, + } + + var got opencdcv1.Record + err = r1.ToProto(&got) + is.NoErr(err) + is.Equal(&got, want) + + // writing another record to the same target should overwrite the previous + + want2 := &opencdcv1.Record{ + Payload: &opencdcv1.Change{}, // there's always a change + } + err = Record{}.ToProto(&got) + is.NoErr(err) + is.Equal(&got, want2) +} + +func BenchmarkRecord_ToProto_Structured(b *testing.B) { + r1 := Record{ + Position: Position("standing"), + Operation: OperationUpdate, + Metadata: Metadata{"foo": "bar"}, + Key: RawData("padlock-key"), + Payload: Change{ + Before: RawData("yellow"), + After: StructuredData{ + "bool": true, + + "int": 1, + "int32": int32(1), + "int64": int64(1), + + "float32": float32(1.2), + "float64": 1.2, + + "string": "orange", + }, + }, + } + + // reuse the same target record + var r2 opencdcv1.Record + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _ = r1.ToProto(&r2) + } +} + +func BenchmarkRecord_ToProto_Raw(b *testing.B) { + for _, size := range []int{1, 100, 10000, 1000000} { + payload := make([]byte, size) + r1 := Record{ + Position: Position("standing"), + Operation: OperationUpdate, + Metadata: Metadata{"foo": "bar"}, + Key: RawData("padlock-key"), + Payload: Change{ + Before: RawData("yellow"), + After: RawData(payload), + }, + } + + b.Run(fmt.Sprintf("%d", size), func(b *testing.B) { + // reuse the same target record + var r2 opencdcv1.Record + b.ResetTimer() + for i := 0; i < b.N; i++ { + _ = r1.ToProto(&r2) + } + }) + } +} diff --git a/opencdc/record.go b/opencdc/record.go index d2a40a4..cc7b8a0 100644 --- a/opencdc/record.go +++ b/opencdc/record.go @@ -23,6 +23,10 @@ import ( // Record represents a single data record produced by a source and/or consumed // by a destination connector. +// Record should be used as a value, not a pointer, except when (de)serializing +// the record. Note that methods related to (de)serializing the record mutate +// the record and are thus not thread-safe (see SetSerializer, FromProto and +// UnmarshalJSON). type Record struct { // Position uniquely represents the record. Position Position `json:"position"` @@ -46,17 +50,16 @@ type Record struct { serializer RecordSerializer } -// WithSerializer returns a new record which is serialized using the provided -// serializer when Bytes gets called. If serializer is nil, the serializing -// behavior is reset to the default (JSON). -func (r Record) WithSerializer(serializer RecordSerializer) Record { +// SetSerializer sets the serializer used to encode the record into bytes. If +// serializer is nil, the serializing behavior is reset to the default (JSON). +// This method mutates the receiver and is not thread-safe. +func (r *Record) SetSerializer(serializer RecordSerializer) { r.serializer = serializer - return r } // Bytes returns the serialized representation of the Record. By default, this // function returns a JSON representation. The serialization logic can be changed -// using WithSerializer. +// using SetSerializer. func (r Record) Bytes() []byte { if r.serializer != nil { b, err := r.serializer.Serialize(r) diff --git a/proto/buf.gen.yaml b/proto/buf.gen.yaml new file mode 100644 index 0000000..b554049 --- /dev/null +++ b/proto/buf.gen.yaml @@ -0,0 +1,10 @@ +version: v1 +managed: + enabled: true + go_package_prefix: + default: "github.com/conduitio/conduit-commons/proto" +plugins: + - plugin: buf.build/protocolbuffers/go:v1.31.0 + out: . + opt: + - paths=source_relative \ No newline at end of file diff --git a/proto/buf.lock b/proto/buf.lock new file mode 100644 index 0000000..c91b581 --- /dev/null +++ b/proto/buf.lock @@ -0,0 +1,2 @@ +# Generated by buf. The expected format is a unix timestamp in nanoseconds. +option (metadata_created_at) = "opencdc.createdAt"; +// ReadAt can contain the time when the record was read from the 3rd party +// system. The expected format is a unix timestamp in nanoseconds. +option (metadata_read_at) = "opencdc.readAt"; +// Version contains the version of the OpenCDC format (e.g. "v1"). This field +// exists to ensure the OpenCDC format version can be easily identified in case +// the record gets marshaled into a different untyped format (e.g. JSON). +option (metadata_version) = "opencdc.version"; +// OpenCDC version is a constant that should be used as the value in the +// metadata field opencdc.version. It ensures the OpenCDC format version can be +// easily identified in case the record gets marshaled into a different untyped +// format (e.g. JSON). +option (opencdc_version) = "v1"; + +// We are (ab)using custom file options to define constants. +// See https://github.com/protocolbuffers/protobuf/issues/3520#issuecomment-323613839 +extend google.protobuf.FileOptions { + string opencdc_version = 9999; + + string metadata_version = 10000; + string metadata_created_at = 10001; + string metadata_read_at = 10002; +} + +// Operation defines what triggered the creation of a record. +enum Operation { + OPERATION_UNSPECIFIED = 0; + // Records with operation create contain data of a newly created entity. + OPERATION_CREATE = 1; + // Records with operation update contain data of an updated entity. + OPERATION_UPDATE = 2; + // Records with operation delete contain data of a deleted entity. + OPERATION_DELETE = 3; + // Records with operation snapshot contain data of a previously existing + // entity, fetched as part of a snapshot. + OPERATION_SNAPSHOT = 4; +} + +// Record contains data about a single change event related to a single entity. +message Record { + // Position uniquely identifies the record. + bytes position = 1; + + // Operation defines what triggered the creation of a record. There are four + // possibilities: create, update, delete or snapshot. The first three + // operations are encountered during normal CDC operation, while "snapshot" is + // meant to represent records during an initial load. Depending on the + // operation, the record will contain either the payload before the change, + // after the change, or both (see field payload). + Operation operation = 2; + + // Metadata contains optional information related to the record. Although the + // map can contain arbitrary keys, the standard provides a set of standard + // metadata fields (see options prefixed with metadata_*). + map metadata = 3; + + // Key represents a value that should identify the entity (e.g. database row). + Data key = 4; + // Payload holds the payload change (data before and after the operation + // occurred). + Change payload = 5; +} + +// Change represents the data before and after the operation occurred. +message Change { + // Before contains the data before the operation occurred. This field is + // optional and should only be populated for operations "update" and "delete" + // (if the system supports fetching the data before the operation). + Data before = 1; + // After contains the data after the operation occurred. This field should be + // populated for all operations except "delete". + Data after = 2; +} + +// Data is used to represent the record key and payload. It can be either raw +// data (byte array) or structured data (struct). +message Data { + oneof data { + // Raw data contains unstructured data in form of a byte array. + bytes raw_data = 1; + // Structured data contains data in form of a struct with fields. + google.protobuf.Struct structured_data = 2; + } +} diff --git a/tools.go b/tools.go index 32e6cc4..80978b6 100644 --- a/tools.go +++ b/tools.go @@ -17,6 +17,7 @@ package main import ( + _ "github.com/bufbuild/buf/cmd/buf" _ "github.com/golangci/golangci-lint/cmd/golangci-lint" _ "golang.org/x/tools/cmd/stringer" _ "mvdan.cc/gofumpt"