Skip to content

Commit

Permalink
sourceWithEncoding middleware works impropertly in combination with s…
Browse files Browse the repository at this point in the history
…ourceWithBatch (#248)

* ReadN method implementation added to sourceWithEncoding middleware for correct integration with sourceWithBatch middleware

* gofumpt -w source_middleware_test.go

---------

Co-authored-by: Pavel Berezhnoy <[email protected]>
Co-authored-by: Lovro Mažgon <[email protected]>
  • Loading branch information
3 people authored Jan 30, 2025
1 parent 3575aa8 commit 654967d
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 4 deletions.
31 changes: 27 additions & 4 deletions source_middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,16 +433,39 @@ func (s *sourceWithEncoding) Read(ctx context.Context) (opencdc.Record, error) {
return rec, err
}

if err := s.encodeKey(ctx, &rec); err != nil {
return rec, err
}
if err := s.encodePayload(ctx, &rec); err != nil {
if err := s.encode(ctx, &rec); err != nil {
return rec, err
}

return rec, nil
}

func (s *sourceWithEncoding) ReadN(ctx context.Context, n int) ([]opencdc.Record, error) {
recs, err := s.Source.ReadN(ctx, n)
if err != nil {
return recs, err
}

for i := range recs {
if err := s.encode(ctx, &recs[i]); err != nil {
return recs, fmt.Errorf("unable to encode record %d: %w", i, err)
}
}

return recs, nil
}

func (s *sourceWithEncoding) encode(ctx context.Context, rec *opencdc.Record) error {
if err := s.encodeKey(ctx, rec); err != nil {
return err
}
if err := s.encodePayload(ctx, rec); err != nil {
return err
}

return nil
}

func (s *sourceWithEncoding) encodeKey(ctx context.Context, rec *opencdc.Record) error {
if _, ok := rec.Key.(opencdc.StructuredData); !ok {
// log warning once, to avoid spamming the logs
Expand Down
124 changes: 124 additions & 0 deletions source_middleware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,130 @@ func TestSourceWithEncoding_Read(t *testing.T) {
}
}

func TestSourceWithEncoding_ReadN(t *testing.T) {
is := is.New(t)
ctx := context.Background()

testDataStruct := opencdc.StructuredData{
"foo": "bar",
"long": int64(1),
"float": 2.34,
"time": time.Now().UTC().Truncate(time.Microsecond), // avro precision is microseconds
}
wantSchema := `{"name":"record","type":"record","fields":[{"name":"float","type":"double"},{"name":"foo","type":"string"},{"name":"long","type":"long"},{"name":"time","type":{"type":"long","logicalType":"timestamp-micros"}}]}`

customTestSchema, err := schema.Create(ctx, schema.TypeAvro, "custom-test-schema", []byte(wantSchema))
is.NoErr(err)

bytes, err := customTestSchema.Marshal(testDataStruct)
is.NoErr(err)
testDataRaw := opencdc.RawData(bytes)

testCases := []struct {
name string
inputRecs []opencdc.Record
wantRecs []opencdc.Record
}{{
name: "no records returned",
}, {
name: "single record returned",
inputRecs: []opencdc.Record{
{
Key: testDataStruct.Clone(),
Payload: opencdc.Change{
Before: testDataStruct.Clone(),
After: testDataStruct.Clone(),
},
},
},
wantRecs: []opencdc.Record{
{
Key: testDataRaw,
Payload: opencdc.Change{
Before: testDataRaw,
After: testDataRaw,
},
},
},
}, {
name: "multiple records returned",
inputRecs: []opencdc.Record{{
Key: testDataStruct.Clone(),
Payload: opencdc.Change{
Before: testDataStruct.Clone(),
After: testDataStruct.Clone(),
},
}, {
Key: testDataStruct.Clone(),
Payload: opencdc.Change{
Before: testDataStruct.Clone(),
After: testDataStruct.Clone(),
},
}, {
Key: testDataStruct.Clone(),
Payload: opencdc.Change{
Before: testDataStruct.Clone(),
After: testDataStruct.Clone(),
},
}},
wantRecs: []opencdc.Record{{
Key: testDataRaw,
Payload: opencdc.Change{
Before: testDataRaw,
After: testDataRaw,
},
}, {
Key: testDataRaw,
Payload: opencdc.Change{
Before: testDataRaw,
After: testDataRaw,
},
}, {
Key: testDataRaw,
Payload: opencdc.Change{
Before: testDataRaw,
After: testDataRaw,
},
}},
}}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
is := is.New(t)
src := NewMockSource(gomock.NewController(t))

underTest := (&SourceWithEncoding{}).Wrap(src)

for i := range tc.inputRecs {
tc.inputRecs[i].Metadata = map[string]string{
opencdc.MetadataCollection: "foo",
opencdc.MetadataKeySchemaSubject: customTestSchema.Subject,
opencdc.MetadataKeySchemaVersion: strconv.Itoa(customTestSchema.Version),
opencdc.MetadataPayloadSchemaSubject: customTestSchema.Subject,
opencdc.MetadataPayloadSchemaVersion: strconv.Itoa(customTestSchema.Version),
}
}

src.EXPECT().ReadN(ctx, 100).Return(tc.inputRecs, nil)

got, err := underTest.ReadN(ctx, 100)
is.NoErr(err)

is.Equal(len(got), len(tc.wantRecs))

for i := range got {
gotKey := got[i].Key
gotPayloadBefore := got[i].Payload.Before
gotPayloadAfter := got[i].Payload.After

is.Equal("", cmp.Diff(tc.wantRecs[i].Key, gotKey))
is.Equal("", cmp.Diff(tc.wantRecs[i].Payload.Before, gotPayloadBefore))
is.Equal("", cmp.Diff(tc.wantRecs[i].Payload.After, gotPayloadAfter))
}
})
}
}

// -- SourceWithBatch --------------------------------------------------

func TestSourceWithBatch_ReadN(t *testing.T) {
Expand Down

0 comments on commit 654967d

Please sign in to comment.