diff --git a/destination_middleware.go b/destination_middleware.go index e5eccd19..60975bf8 100644 --- a/destination_middleware.go +++ b/destination_middleware.go @@ -164,8 +164,8 @@ func (d *destinationWithBatch) Open(ctx context.Context) error { return nil } -// setBatchEnabled stores the boolean in the context. If the context already -// contains the key it will update the boolean under that key and return the +// setBatchConfig stores a DestinationWithBatch instance in the context. If the context already +// contains the key it will update the DestinationWithBatch under that key and return the // same context, otherwise it will return a new context with the stored value. // This is used to signal to destinationPluginAdapter if the Destination is // wrapped into DestinationWithBatchConfig middleware. @@ -174,7 +174,7 @@ func (*destinationWithBatch) setBatchConfig(ctx context.Context, cfg Destination if ok { *ctxCfg = cfg } else { - ctx = context.WithValue(ctx, ctxKeyBatchConfig{}, cfg) + ctx = context.WithValue(ctx, ctxKeyBatchConfig{}, &cfg) } return ctx } @@ -425,15 +425,12 @@ type destinationWithSchemaExtraction struct { Destination config *DestinationWithSchemaExtraction - payloadEnabled bool - keyEnabled bool - payloadWarnOnce sync.Once keyWarnOnce sync.Once } func (d *destinationWithSchemaExtraction) Write(ctx context.Context, records []opencdc.Record) (int, error) { - if d.keyEnabled { + if *d.config.KeyEnabled { for i := range records { if err := d.decodeKey(ctx, &records[i]); err != nil { if len(records) > 0 { @@ -443,7 +440,7 @@ func (d *destinationWithSchemaExtraction) Write(ctx context.Context, records []o } } } - if d.payloadEnabled { + if *d.config.PayloadEnabled { for i := range records { if err := d.decodePayload(ctx, &records[i]); err != nil { if len(records) > 0 { diff --git a/destination_middleware_test.go b/destination_middleware_test.go index 4a31aba3..f9b23168 100644 --- a/destination_middleware_test.go +++ b/destination_middleware_test.go @@ -14,184 +14,91 @@ package sdk -/* +import ( + "bytes" + "context" + "errors" + "strconv" + "testing" + "time" + + "github.com/conduitio/conduit-commons/config" + "github.com/conduitio/conduit-commons/lang" + "github.com/conduitio/conduit-commons/opencdc" + "github.com/conduitio/conduit-commons/schema/avro" + "github.com/conduitio/conduit-connector-sdk/schema" + "github.com/matryer/is" + "go.uber.org/mock/gomock" + "golang.org/x/time/rate" +) // -- DestinationWithBatch ----------------------------------------------------- -func TestDestinationWithBatch_Parameters(t *testing.T) { +func TestDestinationWithBatch_Configure(t *testing.T) { is := is.New(t) - ctrl := gomock.NewController(t) - dst := NewMockDestination(ctrl) - d := (&DestinationWithBatch{}).Wrap(dst) + dst := NewMockDestination(gomock.NewController(t)) + dst.EXPECT().Open(gomock.Any()).Return(nil) - want := config.Parameters{ - "foo": { - Default: "bar", - Description: "baz", - }, + mw := DestinationWithBatch{ + BatchSize: 10, + BatchDelay: 123 * time.Second, } + d := mw.Wrap(dst) - dst.EXPECT().Parameters().Return(want) - got := d.Parameters() - - is.Equal(got["foo"], want["foo"]) - is.Equal(len(got), 3) // expected middleware to inject 2 parameters -} - -func TestDestinationWithBatch_Configure(t *testing.T) { - ctrl := gomock.NewController(t) - dst := NewMockDestination(ctrl) - - testCases := []struct { - name string - middleware DestinationWithBatch - have config.Config - want DestinationWithBatchConfig - }{{ - name: "empty config", - middleware: DestinationWithBatch{}, - have: config.Config{}, - want: DestinationWithBatchConfig{ - BatchSize: 0, - BatchDelay: 0, - }, - }, { - name: "empty config, custom defaults", - middleware: DestinationWithBatch{ - Config: DestinationWithBatchConfig{ - BatchSize: 5, - BatchDelay: time.Second, - }, - }, - have: config.Config{}, - want: DestinationWithBatchConfig{ - BatchSize: 5, - BatchDelay: time.Second * 1, - }, - }, { - name: "config with values", - middleware: DestinationWithBatch{ - Config: DestinationWithBatchConfig{ - BatchSize: 5, - BatchDelay: time.Second, - }, - }, - have: config.Config{ - configDestinationBatchSize: "12", - configDestinationBatchDelay: "2s", - }, - want: DestinationWithBatchConfig{ - BatchSize: 12, - BatchDelay: time.Second * 2, - }, - }} - - for _, tt := range testCases { - t.Run(tt.name, func(t *testing.T) { - is := is.New(t) - d := tt.middleware.Wrap(dst) - - ctx := (&destinationWithBatch{}).setBatchConfig(context.Background(), DestinationWithBatchConfig{}) - dst.EXPECT().Configure(ctx, gomock.AssignableToTypeOf(config.Config{})).Return(nil) - - err := d.Configure(ctx, tt.have) + ctx := (&destinationWithBatch{}).setBatchConfig(context.Background(), DestinationWithBatch{}) + err := d.Open(ctx) + is.NoErr(err) - is.NoErr(err) - is.Equal(tt.want, (&destinationWithBatch{}).getBatchConfig(ctx)) - }) - } + is.NoErr(err) + is.Equal(mw, (&destinationWithBatch{}).getBatchConfig(ctx)) } // -- DestinationWithRateLimit ------------------------------------------------- -func TestDestinationWithRateLimit_Parameters(t *testing.T) { - is := is.New(t) - ctrl := gomock.NewController(t) - dst := NewMockDestination(ctrl) - - d := (&DestinationWithRateLimit{}).Wrap(dst) - - want := config.Parameters{ - "foo": { - Default: "bar", - Description: "baz", - }, - } - - dst.EXPECT().Parameters().Return(want) - got := d.Parameters() - - is.Equal(got["foo"], want["foo"]) - is.Equal(len(got), 3) // expected middleware to inject 2 parameters -} - func TestDestinationWithRateLimit_Configure(t *testing.T) { - ctrl := gomock.NewController(t) - dst := NewMockDestination(ctrl) - ctx := context.Background() - testCases := []struct { name string middleware DestinationWithRateLimit - have config.Config wantLimiter bool wantLimit rate.Limit wantBurst int }{{ name: "empty config", middleware: DestinationWithRateLimit{}, - have: config.Config{}, wantLimiter: false, }, { - name: "empty config, custom defaults", + name: "custom defaults", middleware: DestinationWithRateLimit{ - Config: DestinationWithRateLimitConfig{ - RatePerSecond: 1.23, - Burst: 4, - }, + RatePerSecond: 1.23, + Burst: 4, }, - have: config.Config{}, wantLimiter: true, wantLimit: rate.Limit(1.23), wantBurst: 4, }, { name: "negative burst default", middleware: DestinationWithRateLimit{ - Config: DestinationWithRateLimitConfig{ - RatePerSecond: 1.23, - Burst: -2, - }, + RatePerSecond: 1.23, + Burst: -2, }, - have: config.Config{}, wantLimiter: true, wantLimit: rate.Limit(1.23), wantBurst: 2, // burst will be set to ceil of rate per second }, { name: "config with values", middleware: DestinationWithRateLimit{ - Config: DestinationWithRateLimitConfig{ - RatePerSecond: 1.23, - Burst: 4, - }, - }, - have: config.Config{ - configDestinationRatePerSecond: "12.34", - configDestinationRateBurst: "5", + RatePerSecond: 1.23, + Burst: 4, }, wantLimiter: true, - wantLimit: rate.Limit(12.34), - wantBurst: 5, + wantLimit: rate.Limit(1.23), + wantBurst: 4, }, { name: "config with zero burst", middleware: DestinationWithRateLimit{ - Config: DestinationWithRateLimitConfig{ - RatePerSecond: 1.23, - Burst: 4, - }, - }, - have: config.Config{ - configDestinationRateBurst: "0", + RatePerSecond: 1.23, + Burst: 0, }, wantLimiter: true, wantLimit: rate.Limit(1.23), @@ -201,11 +108,13 @@ func TestDestinationWithRateLimit_Configure(t *testing.T) { for _, tt := range testCases { t.Run(tt.name, func(t *testing.T) { is := is.New(t) - d := tt.middleware.Wrap(dst).(*destinationWithRateLimit) - dst.EXPECT().Configure(ctx, tt.have).Return(nil) + dst := NewMockDestination(gomock.NewController(t)) + dst.EXPECT().Open(gomock.Any()).Return(nil) + + d := tt.middleware.Wrap(dst).(*destinationWithRateLimit) - err := d.Configure(ctx, tt.have) + err := d.Open(context.Background()) is.NoErr(err) if !tt.wantLimiter { @@ -220,18 +129,16 @@ func TestDestinationWithRateLimit_Configure(t *testing.T) { func TestDestinationWithRateLimit_Write(t *testing.T) { is := is.New(t) - ctrl := gomock.NewController(t) - dst := NewMockDestination(ctrl) ctx := context.Background() - d := (&DestinationWithRateLimit{}).Wrap(dst) - - dst.EXPECT().Configure(ctx, gomock.Any()).Return(nil) + dst := NewMockDestination(gomock.NewController(t)) + dst.EXPECT().Open(gomock.Any()).Return(nil) - err := d.Configure(ctx, config.Config{ - configDestinationRatePerSecond: "8", - configDestinationRateBurst: "2", - }) + d := (&DestinationWithRateLimit{ + RatePerSecond: 8, + Burst: 2, + }).Wrap(dst) + err := d.Open(ctx) is.NoErr(err) recs := []opencdc.Record{{}, {}, {}, {}} @@ -271,46 +178,40 @@ func TestDestinationWithRateLimit_Write(t *testing.T) { func TestDestinationWithRateLimit_Write_CancelledContext(t *testing.T) { is := is.New(t) - ctrl := gomock.NewController(t) - dst := NewMockDestination(ctrl) + ctx := context.Background() - d := (&DestinationWithRateLimit{}).Wrap(dst) + dst := NewMockDestination(gomock.NewController(t)) + dst.EXPECT().Open(gomock.Any()).Return(nil) - ctx, cancel := context.WithCancel(context.Background()) + underTest := (&DestinationWithRateLimit{ + RatePerSecond: 10, + }).Wrap(dst) - dst.EXPECT().Configure(ctx, gomock.Any()).Return(nil) - err := d.Configure(ctx, config.Config{ - configDestinationRatePerSecond: "10", - }) + err := underTest.Open(ctx) is.NoErr(err) + ctx, cancel := context.WithCancel(ctx) cancel() - _, err = d.Write(ctx, []opencdc.Record{{}}) + + _, err = underTest.Write(ctx, []opencdc.Record{{}}) is.True(errors.Is(err, ctx.Err())) } // -- DestinationWithRecordFormat ---------------------------------------------- func TestDestinationWithRecordFormat_Configure(t *testing.T) { - ctrl := gomock.NewController(t) - dst := NewMockDestination(ctrl) - ctx := context.Background() - testCases := []struct { name string middleware DestinationWithRecordFormat - have config.Config wantSerializer RecordSerializer }{{ name: "empty config", middleware: DestinationWithRecordFormat{}, - have: config.Config{}, wantSerializer: defaultSerializer, }, { - name: "valid config", - middleware: DestinationWithRecordFormat{}, - have: config.Config{ - configDestinationRecordFormat: "debezium/json", + name: "valid config", + middleware: DestinationWithRecordFormat{ + RecordFormat: lang.Ptr("debezium/json"), }, wantSerializer: GenericRecordSerializer{ Converter: DebeziumConverter{ @@ -323,11 +224,12 @@ func TestDestinationWithRecordFormat_Configure(t *testing.T) { for _, tt := range testCases { t.Run(tt.name, func(t *testing.T) { is := is.New(t) - d := tt.middleware.Wrap(dst).(*destinationWithRecordFormat) - dst.EXPECT().Configure(ctx, tt.have).Return(nil) + dst := NewMockDestination(gomock.NewController(t)) + dst.EXPECT().Open(gomock.Any()).Return(nil) - err := d.Configure(ctx, tt.have) + d := tt.middleware.Wrap(dst).(*destinationWithRecordFormat) + err := d.Open(context.Background()) is.NoErr(err) is.Equal(d.serializer, tt.wantSerializer) @@ -337,49 +239,7 @@ func TestDestinationWithRecordFormat_Configure(t *testing.T) { // -- DestinationWithSchemaExtraction ------------------------------------------ -func TestDestinationWithSchemaExtractionConfig_Apply(t *testing.T) { - is := is.New(t) - - wantCfg := DestinationWithSchemaExtractionConfig{ - PayloadEnabled: lang.Ptr(true), - KeyEnabled: lang.Ptr(true), - } - - have := &DestinationWithSchemaExtraction{} - wantCfg.Apply(have) - - is.Equal(have.Config, wantCfg) -} - -func TestDestinationWithSchemaExtraction_Parameters(t *testing.T) { - is := is.New(t) - ctrl := gomock.NewController(t) - dst := NewMockDestination(ctrl) - - s := (&DestinationWithSchemaExtraction{}).Wrap(dst) - - want := config.Parameters{ - "foo": { - Default: "bar", - Description: "baz", - }, - } - - dst.EXPECT().Parameters().Return(want) - got := s.Parameters() - - is.Equal(got["foo"], want["foo"]) - is.Equal(len(got), 3) // expected middleware to inject 2 parameters -} - func TestDestinationWithSchemaExtraction_Configure(t *testing.T) { - ctrl := gomock.NewController(t) - dst := NewMockDestination(ctrl) - ctx := context.Background() - - connectorID := uuid.NewString() - ctx = internal.Enrich(ctx, pconnector.PluginConfig{ConnectorID: connectorID}) - testCases := []struct { name string middleware DestinationWithSchemaExtraction @@ -388,54 +248,40 @@ func TestDestinationWithSchemaExtraction_Configure(t *testing.T) { wantErr error wantPayloadEnabled bool wantKeyEnabled bool - }{{ - name: "empty config", - middleware: DestinationWithSchemaExtraction{}, - have: config.Config{}, - - wantPayloadEnabled: true, - wantKeyEnabled: true, - }, { - name: "disabled by default", - middleware: DestinationWithSchemaExtraction{ - Config: DestinationWithSchemaExtractionConfig{ + }{ + { + name: "both disabled", + middleware: DestinationWithSchemaExtraction{ PayloadEnabled: lang.Ptr(false), KeyEnabled: lang.Ptr(false), }, + wantPayloadEnabled: false, + wantKeyEnabled: false, }, - have: config.Config{}, - - wantPayloadEnabled: false, - wantKeyEnabled: false, - }, { - name: "disabled by config", - middleware: DestinationWithSchemaExtraction{}, - have: config.Config{ - configDestinationWithSchemaExtractionPayloadEnabled: "false", - configDestinationWithSchemaExtractionKeyEnabled: "false", + { + name: "payload enabled, key disabled", + middleware: DestinationWithSchemaExtraction{ + PayloadEnabled: lang.Ptr(true), + KeyEnabled: lang.Ptr(false), + }, + wantPayloadEnabled: true, + wantKeyEnabled: false, }, - - wantPayloadEnabled: false, - wantKeyEnabled: false, - }} + } for _, tt := range testCases { t.Run(tt.name, func(t *testing.T) { is := is.New(t) - s := tt.middleware.Wrap(dst).(*destinationWithSchemaExtraction) - dst.EXPECT().Configure(ctx, tt.have).Return(nil) - - err := s.Configure(ctx, tt.have) - if tt.wantErr != nil { - is.True(errors.Is(err, tt.wantErr)) - return - } + dst := NewMockDestination(gomock.NewController(t)) + dst.EXPECT().Open(gomock.Any()).Return(nil) + s := tt.middleware.Wrap(dst).(*destinationWithSchemaExtraction) + err := s.Open(context.Background()) is.NoErr(err) - is.Equal(s.payloadEnabled, tt.wantPayloadEnabled) - is.Equal(s.keyEnabled, tt.wantKeyEnabled) + is.Equal(*s.config.PayloadEnabled, tt.wantPayloadEnabled) + is.Equal(*s.config.KeyEnabled, tt.wantKeyEnabled) }) } } @@ -446,11 +292,10 @@ func TestDestinationWithSchemaExtraction_Write(t *testing.T) { dst := NewMockDestination(ctrl) ctx := context.Background() - d := (&DestinationWithSchemaExtraction{}).Wrap(dst) - - dst.EXPECT().Configure(ctx, gomock.Any()).Return(nil) - err := d.Configure(ctx, config.Config{}) - is.NoErr(err) + d := (&DestinationWithSchemaExtraction{ + PayloadEnabled: lang.Ptr(true), + KeyEnabled: lang.Ptr(true), + }).Wrap(dst) testStructuredData := opencdc.StructuredData{ "foo": "bar", @@ -660,5 +505,3 @@ func TestDestinationWithSchemaExtraction_Write(t *testing.T) { }) } } - -*/ diff --git a/source.go b/source.go index 35f52a78..e98b9c1c 100644 --- a/source.go +++ b/source.go @@ -133,7 +133,7 @@ type SourceConfig interface { } // NewSourcePlugin takes a Source and wraps it into an adapter that converts it -// into a pconnector.SourcePlugin. If the parameter is nil it will wrap +// into a pconnector.SourcePlugin. If the Source is nil it will wrap // UnimplementedSource instead. func NewSourcePlugin(impl Source, cfg pconnector.PluginConfig, parameters config.Parameters) pconnector.SourcePlugin { if impl == nil { diff --git a/source_middleware.go b/source_middleware.go index 67fd35a4..6b748128 100644 --- a/source_middleware.go +++ b/source_middleware.go @@ -139,6 +139,10 @@ type SourceWithSchemaExtraction struct { // todo: use https://github.com/ConduitIO/conduit-commons/issues/142 to parse // the string value into schema.Type directly. func (c *SourceWithSchemaExtraction) SchemaType() schema.Type { + if c.SchemaTypeStr == "" { + return schema.TypeAvro + } + t := lang.Ptr(schema.Type(0)) err := t.UnmarshalText([]byte(c.SchemaTypeStr)) if err != nil { @@ -616,9 +620,6 @@ type sourceWithBatch struct { stop chan struct{} collectFn func(context.Context, int) ([]opencdc.Record, error) - - batchSize int - batchDelay time.Duration } type readNResponse struct { @@ -641,7 +642,7 @@ func (s *sourceWithBatch) Open(ctx context.Context, pos opencdc.Position) error s.readCh = make(chan readResponse, *s.config.BatchSize) s.readNCh = make(chan readNResponse, 1) - if s.batchSize > 0 || s.batchDelay > 0 { + if *s.config.BatchSize > 0 || *s.config.BatchDelay > 0 { s.collectFn = s.collectWithReadN go s.runReadN(ctx) } else { @@ -662,7 +663,7 @@ func (s *sourceWithBatch) runReadN(ctx context.Context) { } for { - recs, err := s.Source.ReadN(ctx, s.batchSize) + recs, err := s.Source.ReadN(ctx, *s.config.BatchSize) if err != nil { switch { case errors.Is(err, ErrBackoffRetry): @@ -752,7 +753,7 @@ func (s *sourceWithBatch) ReadN(ctx context.Context, n int) ([]opencdc.Record, e } func (s *sourceWithBatch) collectWithRead(ctx context.Context, _ int) ([]opencdc.Record, error) { - batch := make([]opencdc.Record, 0, s.batchSize) + batch := make([]opencdc.Record, 0, *s.config.BatchSize) var delay <-chan time.Time for { @@ -766,14 +767,14 @@ func (s *sourceWithBatch) collectWithRead(ctx context.Context, _ int) ([]opencdc } batch = append(batch, resp.Record) - if s.batchSize > 0 && len(batch) >= s.batchSize { + if *s.config.BatchSize > 0 && len(batch) >= *s.config.BatchSize { // batch is full, flush it return batch, nil } - if s.batchDelay > 0 && delay == nil { + if *s.config.BatchDelay > 0 && delay == nil { // start the delay timer after we have received the first batch - delay = time.After(s.batchDelay) + delay = time.After(*s.config.BatchDelay) } // continue reading until the batch is full or the delay timer has expired @@ -787,7 +788,7 @@ func (s *sourceWithBatch) collectWithRead(ctx context.Context, _ int) ([]opencdc } func (s *sourceWithBatch) collectWithReadN(ctx context.Context, n int) ([]opencdc.Record, error) { - batch := make([]opencdc.Record, 0, s.batchSize) + batch := make([]opencdc.Record, 0, *s.config.BatchSize) var delay <-chan time.Time for { @@ -805,20 +806,20 @@ func (s *sourceWithBatch) collectWithReadN(ctx context.Context, n int) ([]opencd return nil, resp.Err } - if len(batch) == 0 && len(resp.Records) >= s.batchSize { + if len(batch) == 0 && len(resp.Records) >= *s.config.BatchSize { // source returned a batch that is already full, flush it return resp.Records, nil } batch = append(batch, resp.Records...) - if s.batchSize > 0 && len(batch) >= s.batchSize { + if *s.config.BatchSize > 0 && len(batch) >= *s.config.BatchSize { // batch is full, flush it return batch, nil } - if s.batchDelay > 0 && delay == nil { + if *s.config.BatchDelay > 0 && delay == nil { // start the delay timer after we have received the first batch - delay = time.After(s.batchDelay) + delay = time.After(*s.config.BatchDelay) } // continue reading until the batch is full or the delay timer has expired diff --git a/source_middleware_test.go b/source_middleware_test.go index 39001415..b4c19080 100644 --- a/source_middleware_test.go +++ b/source_middleware_test.go @@ -14,163 +14,92 @@ package sdk -/* +import ( + "context" + "errors" + "strconv" + "testing" + "time" + + "github.com/conduitio/conduit-commons/config" + "github.com/conduitio/conduit-commons/csync" + "github.com/conduitio/conduit-commons/lang" + "github.com/conduitio/conduit-commons/opencdc" + "github.com/conduitio/conduit-connector-sdk/internal" + "github.com/conduitio/conduit-connector-sdk/schema" + "github.com/google/go-cmp/cmp" + "github.com/matryer/is" + "go.uber.org/mock/gomock" +) // -- SourceWithSchemaExtraction ----------------------------------------------- -func TestSourceWithSchemaExtractionConfig_Apply(t *testing.T) { +func TestSourceWithSchemaExtraction_SchemaType(t *testing.T) { is := is.New(t) - wantCfg := SourceWithSchemaExtractionConfig{ - PayloadEnabled: lang.Ptr(true), - KeyEnabled: lang.Ptr(true), - PayloadSubject: lang.Ptr("foo"), - KeySubject: lang.Ptr("bar"), - } - - have := &SourceWithSchemaExtraction{} - wantCfg.Apply(have) - - is.Equal(have.Config, wantCfg) -} - -func TestSourceWithSchemaExtraction_Parameters(t *testing.T) { - is := is.New(t) - ctrl := gomock.NewController(t) - src := NewMockSource(ctrl) - - s := (&SourceWithSchemaExtraction{}).Wrap(src) - - want := config.Parameters{ - "foo": { - Default: "bar", - Description: "baz", - }, - } - - src.EXPECT().Parameters().Return(want) - got := s.Parameters() - - is.Equal(got["foo"], want["foo"]) - is.Equal(len(got), 6) // expected middleware to inject 5 parameters -} - -func TestSourceWithSchemaExtraction_Configure(t *testing.T) { - ctrl := gomock.NewController(t) - src := NewMockSource(ctrl) - ctx := context.Background() - - connectorID := uuid.NewString() - ctx = internal.Enrich(ctx, pconnector.PluginConfig{ConnectorID: connectorID}) - testCases := []struct { - name string - middleware SourceWithSchemaExtraction - have config.Config - - wantErr error - wantSchemaType schema.Type - wantPayloadSubject string - wantKeySubject string - }{{ - name: "empty config", - middleware: SourceWithSchemaExtraction{}, - have: config.Config{}, - - wantSchemaType: schema.TypeAvro, - wantPayloadSubject: "payload", - wantKeySubject: "key", - }, { - name: "invalid schema type", - middleware: SourceWithSchemaExtraction{}, - have: config.Config{ - configSourceSchemaExtractionType: "foo", + name string + schemaTypeStr string + want schema.Type + wantPanicErr error + }{ + { + name: "valid avro", + schemaTypeStr: "avro", + want: schema.TypeAvro, }, - wantErr: schema.ErrUnsupportedType, - }, { - name: "disabled by default", - middleware: SourceWithSchemaExtraction{ - Config: SourceWithSchemaExtractionConfig{ - PayloadEnabled: lang.Ptr(false), - KeyEnabled: lang.Ptr(false), - }, + { + name: "invalid: unsupported schema type", + schemaTypeStr: "foo", + want: schema.TypeAvro, + wantPanicErr: schema.ErrUnsupportedType, }, - have: config.Config{}, - - wantSchemaType: schema.TypeAvro, - wantPayloadSubject: "", - wantKeySubject: "", - }, { - name: "disabled by config", - middleware: SourceWithSchemaExtraction{}, - have: config.Config{ - configSourceSchemaExtractionPayloadEnabled: "false", - configSourceSchemaExtractionKeyEnabled: "false", + { + name: "invalid: uppercase", + schemaTypeStr: "AVRO", + want: schema.TypeAvro, + wantPanicErr: schema.ErrUnsupportedType, }, - - wantSchemaType: schema.TypeAvro, - wantPayloadSubject: "", - wantKeySubject: "", - }, { - name: "static default payload subject", - middleware: SourceWithSchemaExtraction{ - Config: SourceWithSchemaExtractionConfig{ - PayloadSubject: lang.Ptr("foo"), - KeySubject: lang.Ptr("bar"), - }, + { + name: "valid: empty string defaults to avro", + schemaTypeStr: "", + want: schema.TypeAvro, }, - have: config.Config{}, + } - wantSchemaType: schema.TypeAvro, - wantPayloadSubject: "foo", - wantKeySubject: "bar", - }, { - name: "payload subject by config", - middleware: SourceWithSchemaExtraction{}, - have: config.Config{ - configSourceSchemaExtractionPayloadSubject: "foo", - configSourceSchemaExtractionKeySubject: "bar", - }, + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + is := is.New(t) - wantSchemaType: schema.TypeAvro, - wantPayloadSubject: "foo", - wantKeySubject: "bar", - }} + underTest := &SourceWithSchemaExtraction{SchemaTypeStr: tc.schemaTypeStr} - for _, tt := range testCases { - t.Run(tt.name, func(t *testing.T) { - is := is.New(t) - s := tt.middleware.Wrap(src).(*sourceWithSchemaExtraction) + if tc.wantPanicErr != nil { + defer func() { + r := recover() + is.True(r != nil) // expected a panic, but none occurred - src.EXPECT().Configure(ctx, tt.have).Return(nil) + panicErr, ok := r.(error) + is.True(ok) // expected panic value to be an error - err := s.Configure(ctx, tt.have) - if tt.wantErr != nil { - is.True(errors.Is(err, tt.wantErr)) - return + if tc.wantPanicErr != nil { + is.True(errors.Is(panicErr, tc.wantPanicErr)) // Unexpected panic error + } + }() } - is.NoErr(err) + result := underTest.SchemaType() - is.Equal(s.schemaType, tt.wantSchemaType) - is.Equal(s.payloadSubject, tt.wantPayloadSubject) - is.Equal(s.keySubject, tt.wantKeySubject) + if tc.wantPanicErr == nil { + is.Equal(tc.want, result) + } }) } } func TestSourceWithSchemaExtraction_Read(t *testing.T) { is := is.New(t) - ctrl := gomock.NewController(t) - src := NewMockSource(ctrl) ctx := context.Background() - s := (&SourceWithSchemaExtraction{}).Wrap(src) - - src.EXPECT().Configure(ctx, gomock.Any()).Return(nil) - err := s.Configure(ctx, config.Config{}) - is.NoErr(err) - testStructuredData := opencdc.StructuredData{ "foo": "bar", "long": int64(1), @@ -343,6 +272,17 @@ func TestSourceWithSchemaExtraction_Read(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + src := NewMockSource(ctrl) + + s := (&SourceWithSchemaExtraction{ + SchemaTypeStr: "avro", + PayloadEnabled: lang.Ptr(true), + PayloadSubject: lang.Ptr("payload"), + KeyEnabled: lang.Ptr(true), + KeySubject: lang.Ptr("key"), + }).Wrap(src) + src.EXPECT().ReadN(ctx, 1).Return([]opencdc.Record{tc.record}, nil) var wantKey, wantPayloadBefore, wantPayloadAfter opencdc.Data @@ -416,101 +356,24 @@ func TestSourceWithSchemaExtraction_Read(t *testing.T) { // -- SourceWithSchemaContext -------------------------------------------------- -func TestSourceWithSchemaContext_Parameters(t *testing.T) { - testCases := []struct { - name string - mwCfg SourceWithSchemaContextConfig - wantParams config.Parameters - }{ - { - name: "default middleware config", - mwCfg: SourceWithSchemaContextConfig{}, - wantParams: config.Parameters{ - "sdk.schema.context.enabled": { - Default: "true", - Description: "Specifies whether to use a schema context name. If set to false, no schema context name " + - "will be used, and schemas will be saved with the subject name specified in the connector " + - "(not safe because of name conflicts).", - Type: config.ParameterTypeBool, - }, - "sdk.schema.context.name": { - Default: "", - Description: "Schema context name to be used. Used as a prefix for all schema subject names. " + - "Defaults to the connector ID.", - Type: config.ParameterTypeString, - }, - }, - }, - { - name: "custom middleware config", - mwCfg: SourceWithSchemaContextConfig{ - Enabled: lang.Ptr(false), - Name: lang.Ptr("foobar"), - }, - wantParams: config.Parameters{ - "sdk.schema.context.enabled": { - Default: "false", - Description: "Specifies whether to use a schema context name. If set to false, no schema context name " + - "will be used, and schemas will be saved with the subject name specified in the connector " + - "(not safe because of name conflicts).", - Type: config.ParameterTypeBool, - }, - "sdk.schema.context.name": { - Default: "foobar", - Description: "Schema context name to be used. Used as a prefix for all schema subject names.", - Type: config.ParameterTypeString, - }, - }, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - is := is.New(t) - ctrl := gomock.NewController(t) - src := NewMockSource(ctrl) - - s := (&SourceWithSchemaContext{ - Config: tc.mwCfg, - }).Wrap(src) - - connectorParams := config.Parameters{ - "foo": { - Default: "bar", - Description: "baz", - }, - } - - src.EXPECT().Parameters().Return(connectorParams) - got := s.Parameters() - - want := config.Parameters{} - maps.Copy(want, connectorParams) - maps.Copy(want, tc.wantParams) - - is.Equal("", cmp.Diff(want, got)) - }) - } -} - func TestSourceWithSchemaContext_Configure(t *testing.T) { connID := "test-connector-id" testCases := []struct { name string - middlewareCfg SourceWithSchemaContextConfig + middlewareCfg SourceWithSchemaContext connectorCfg config.Config wantContextName string }{ { name: "default middleware config, no user config", - middlewareCfg: SourceWithSchemaContextConfig{}, + middlewareCfg: SourceWithSchemaContext{}, connectorCfg: config.Config{}, wantContextName: connID, }, { name: "custom context in middleware, no user config", - middlewareCfg: SourceWithSchemaContextConfig{ + middlewareCfg: SourceWithSchemaContext{ Enabled: lang.Ptr(true), Name: lang.Ptr("foobar"), }, @@ -519,7 +382,7 @@ func TestSourceWithSchemaContext_Configure(t *testing.T) { }, { name: "middleware config: use context false, no user config", - middlewareCfg: SourceWithSchemaContextConfig{ + middlewareCfg: SourceWithSchemaContext{ Enabled: lang.Ptr(false), Name: lang.Ptr("foobar"), }, @@ -528,7 +391,7 @@ func TestSourceWithSchemaContext_Configure(t *testing.T) { }, { name: "user config overrides use context", - middlewareCfg: SourceWithSchemaContextConfig{ + middlewareCfg: SourceWithSchemaContext{ Enabled: lang.Ptr(false), Name: lang.Ptr("foobar"), }, @@ -539,7 +402,7 @@ func TestSourceWithSchemaContext_Configure(t *testing.T) { }, { name: "user config overrides context name, non-empty", - middlewareCfg: SourceWithSchemaContextConfig{ + middlewareCfg: SourceWithSchemaContext{ Enabled: lang.Ptr(true), Name: lang.Ptr("foobar"), }, @@ -551,7 +414,7 @@ func TestSourceWithSchemaContext_Configure(t *testing.T) { }, { name: "user config overrides context name, empty", - middlewareCfg: SourceWithSchemaContextConfig{ + middlewareCfg: SourceWithSchemaContext{ Enabled: lang.Ptr(true), Name: lang.Ptr("foobar"), }, @@ -566,61 +429,28 @@ func TestSourceWithSchemaContext_Configure(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { is := is.New(t) + wantContextName := "user-context-name" ctx := internal.ContextWithConnectorID(context.Background(), connID) s := NewMockSource(gomock.NewController(t)) - mw := &SourceWithSchemaContext{} - - tc.middlewareCfg.Apply(mw) - underTest := mw.Wrap(s) + underTest := (&SourceWithSchemaContext{ + Enabled: lang.Ptr(true), + Name: &wantContextName, + }).Wrap(s) s.EXPECT(). - Configure(gomock.Any(), tc.connectorCfg). - DoAndReturn(func(ctx context.Context, c config.Config) error { - gotContextName := schema.GetSchemaContextName(ctx) - is.Equal(tc.wantContextName, gotContextName) + Open(gomock.Any(), opencdc.Position{}). + DoAndReturn(func(ctx context.Context, _ opencdc.Position) error { + is.Equal(wantContextName, schema.GetSchemaContextName(ctx)) return nil }) - err := underTest.Configure(ctx, tc.connectorCfg) + err := underTest.Open(ctx, opencdc.Position{}) is.NoErr(err) }) } } -func TestSourceWithSchemaContext_ContextValue(t *testing.T) { - is := is.New(t) - connID := "test-connector-id" - connectorCfg := config.Config{ - "sdk.schema.context.use": "true", - "sdk.schema.context.name": "user-context-name", - } - wantContextName := "user-context-name" - ctx := internal.ContextWithConnectorID(context.Background(), connID) - - s := NewMockSource(gomock.NewController(t)) - underTest := (&SourceWithSchemaContext{}).Wrap(s) - - s.EXPECT(). - Configure(gomock.Any(), connectorCfg). - DoAndReturn(func(ctx context.Context, _ config.Config) error { - is.Equal(wantContextName, schema.GetSchemaContextName(ctx)) - return nil - }) - s.EXPECT(). - Open(gomock.Any(), opencdc.Position{}). - DoAndReturn(func(ctx context.Context, _ opencdc.Position) error { - is.Equal(wantContextName, schema.GetSchemaContextName(ctx)) - return nil - }) - - err := underTest.Configure(ctx, connectorCfg) - is.NoErr(err) - - err = underTest.Open(ctx, opencdc.Position{}) - is.NoErr(err) -} - // -- SourceWithEncoding ------------------------------------------------------ func TestSourceWithEncoding_Read(t *testing.T) { @@ -631,10 +461,6 @@ func TestSourceWithEncoding_Read(t *testing.T) { s := (&SourceWithEncoding{}).Wrap(src) - src.EXPECT().Configure(ctx, gomock.Any()).Return(nil) - err := s.Configure(ctx, config.Config{}) - is.NoErr(err) - testDataStruct := opencdc.StructuredData{ "foo": "bar", "long": int64(1), @@ -884,13 +710,10 @@ func TestSourceWithBatch_ReadN(t *testing.T) { is := is.New(t) ctx := context.Background() - connectorCfg := config.Config{ - configSourceBatchSize: "5", - configSourceBatchDelay: "", - } - s := NewMockSource(gomock.NewController(t)) - underTest := (&SourceWithBatch{}).Wrap(s) + underTest := (&SourceWithBatch{ + BatchSize: lang.Ptr(5), + }).Wrap(s) want := []opencdc.Record{ {Position: []byte("1")}, @@ -900,7 +723,6 @@ func TestSourceWithBatch_ReadN(t *testing.T) { {Position: []byte("5")}, } - s.EXPECT().Configure(gomock.Any(), connectorCfg).Return(nil) s.EXPECT().Open(gomock.Any(), opencdc.Position{}).Return(nil) // First batch returns 5 records @@ -914,11 +736,8 @@ func TestSourceWithBatch_ReadN(t *testing.T) { return nil, ctx.Err() }).After(call.Call) - err := underTest.Configure(ctx, connectorCfg) - is.NoErr(err) - openCtx, openCancel := context.WithCancel(ctx) - err = underTest.Open(openCtx, opencdc.Position{}) + err := underTest.Open(openCtx, opencdc.Position{}) is.NoErr(err) got, err := underTest.ReadN(ctx, 1) // 1 record per batch is the default, but the middleware should overwrite it @@ -937,5 +756,3 @@ func TestSourceWithBatch_ReadN(t *testing.T) { _, err = underTest.ReadN(ctx, 1) is.True(errors.Is(err, context.Canceled)) } - -*/ diff --git a/source_test.go b/source_test.go index b9664b8b..bec7172f 100644 --- a/source_test.go +++ b/source_test.go @@ -14,14 +14,29 @@ package sdk -/* +import ( + "context" + "errors" + "io" + "testing" + "time" + + "github.com/conduitio/conduit-commons/cchan" + "github.com/conduitio/conduit-commons/config" + "github.com/conduitio/conduit-commons/opencdc" + "github.com/conduitio/conduit-connector-protocol/pconnector" + "github.com/conduitio/conduit-connector-sdk/internal" + "github.com/matryer/is" + "github.com/rs/zerolog" + "go.uber.org/mock/gomock" +) func TestSourcePluginAdapter_Start_OpenContext(t *testing.T) { is := is.New(t) ctrl := gomock.NewController(t) src := NewMockSource(ctrl) - srcPlugin := NewSourcePlugin(src, pconnector.PluginConfig{}).(*sourcePluginAdapter) + srcPlugin := NewSourcePlugin(src, pconnector.PluginConfig{}, config.Parameters{}).(*sourcePluginAdapter) srcPlugin.state.Set(internal.StateConfigured) // Open expects state Configured var gotCtx context.Context @@ -46,7 +61,7 @@ func TestSourcePluginAdapter_Open_ClosedContext(t *testing.T) { ctrl := gomock.NewController(t) src := NewMockSource(ctrl) - srcPlugin := NewSourcePlugin(src, pconnector.PluginConfig{}).(*sourcePluginAdapter) + srcPlugin := NewSourcePlugin(src, pconnector.PluginConfig{}, config.Parameters{}).(*sourcePluginAdapter) srcPlugin.state.Set(internal.StateConfigured) // Open expects state Configured var gotCtx context.Context @@ -75,7 +90,7 @@ func TestSourcePluginAdapter_Open_Logger(t *testing.T) { ctrl := gomock.NewController(t) src := NewMockSource(ctrl) - srcPlugin := NewSourcePlugin(src, pconnector.PluginConfig{}).(*sourcePluginAdapter) + srcPlugin := NewSourcePlugin(src, pconnector.PluginConfig{}, config.Parameters{}).(*sourcePluginAdapter) srcPlugin.state.Set(internal.StateConfigured) // Open expects state Configured wantLogger := zerolog.New(zerolog.NewTestWriter(t)) @@ -98,7 +113,7 @@ func TestSourcePluginAdapter_Run(t *testing.T) { ctrl := gomock.NewController(t) src := NewMockSource(ctrl) - srcPlugin := NewSourcePlugin(src, pconnector.PluginConfig{}).(*sourcePluginAdapter) + srcPlugin := NewSourcePlugin(src, pconnector.PluginConfig{}, config.Parameters{}).(*sourcePluginAdapter) srcPlugin.state.Set(internal.StateConfigured) // Open expects state Configured want := opencdc.Record{ @@ -185,7 +200,7 @@ func TestSourcePluginAdapter_Run_Stuck(t *testing.T) { stopTimeout = time.Minute }() - srcPlugin := NewSourcePlugin(src, pconnector.PluginConfig{}).(*sourcePluginAdapter) + srcPlugin := NewSourcePlugin(src, pconnector.PluginConfig{}, config.Parameters{}).(*sourcePluginAdapter) srcPlugin.state.Set(internal.StateConfigured) // Open expects state Configured want := opencdc.Record{ @@ -244,7 +259,7 @@ func TestSourcePluginAdapter_Stop_WaitsForRun(t *testing.T) { stopTimeout = time.Minute // reset }() - srcPlugin := NewSourcePlugin(src, pconnector.PluginConfig{}).(*sourcePluginAdapter) + srcPlugin := NewSourcePlugin(src, pconnector.PluginConfig{}, config.Parameters{}).(*sourcePluginAdapter) srcPlugin.state.Set(internal.StateConfigured) // Open expects state Configured want := opencdc.Record{ @@ -306,7 +321,7 @@ func TestSourcePluginAdapter_Teardown(t *testing.T) { ctrl := gomock.NewController(t) src := NewMockSource(ctrl) - srcPlugin := NewSourcePlugin(src, pconnector.PluginConfig{}).(*sourcePluginAdapter) + srcPlugin := NewSourcePlugin(src, pconnector.PluginConfig{}, config.Parameters{}).(*sourcePluginAdapter) srcPlugin.state.Set(internal.StateConfigured) // Open expects state Configured src.EXPECT().Open(gomock.Any(), nil).Return(nil) @@ -366,7 +381,7 @@ func TestSourcePluginAdapter_LifecycleOnCreated(t *testing.T) { ctrl := gomock.NewController(t) src := NewMockSource(ctrl) - srcPlugin := NewSourcePlugin(src, pconnector.PluginConfig{}).(*sourcePluginAdapter) + srcPlugin := NewSourcePlugin(src, pconnector.PluginConfig{}, config.Parameters{}).(*sourcePluginAdapter) wantCtx := internal.Enrich(ctx, pconnector.PluginConfig{}) want := config.Config{"foo": "bar"} @@ -383,7 +398,7 @@ func TestSourcePluginAdapter_LifecycleOnUpdated(t *testing.T) { ctrl := gomock.NewController(t) src := NewMockSource(ctrl) - srcPlugin := NewSourcePlugin(src, pconnector.PluginConfig{}).(*sourcePluginAdapter) + srcPlugin := NewSourcePlugin(src, pconnector.PluginConfig{}, config.Parameters{}).(*sourcePluginAdapter) wantCtx := internal.Enrich(ctx, pconnector.PluginConfig{}) wantBefore := config.Config{"foo": "bar"} @@ -404,7 +419,7 @@ func TestSourcePluginAdapter_LifecycleOnDeleted(t *testing.T) { ctrl := gomock.NewController(t) src := NewMockSource(ctrl) - srcPlugin := NewSourcePlugin(src, pconnector.PluginConfig{}).(*sourcePluginAdapter) + srcPlugin := NewSourcePlugin(src, pconnector.PluginConfig{}, config.Parameters{}).(*sourcePluginAdapter) wantCtx := internal.Enrich(ctx, pconnector.PluginConfig{}) want := config.Config{"foo": "bar"} @@ -414,5 +429,3 @@ func TestSourcePluginAdapter_LifecycleOnDeleted(t *testing.T) { _, err := srcPlugin.LifecycleOnDeleted(ctx, req) is.NoErr(err) } - -*/ diff --git a/specgen/specgen/tests/parse_specs/field_names/specs.go b/specgen/specgen/tests/parse_specs/field_names/specs.go index e66f4b4e..7009fb93 100644 --- a/specgen/specgen/tests/parse_specs/field_names/specs.go +++ b/specgen/specgen/tests/parse_specs/field_names/specs.go @@ -15,8 +15,9 @@ package field_names import ( - sdk "github.com/conduitio/conduit-connector-sdk" "time" + + sdk "github.com/conduitio/conduit-connector-sdk" ) type EmbeddedConfig struct { diff --git a/specgen/specgen/tests/parse_specs/type_aliases/specs.go b/specgen/specgen/tests/parse_specs/type_aliases/specs.go index b036a0e6..37ecc2af 100644 --- a/specgen/specgen/tests/parse_specs/type_aliases/specs.go +++ b/specgen/specgen/tests/parse_specs/type_aliases/specs.go @@ -28,8 +28,10 @@ type SourceConfig struct { CustomDurationTwoField CustomDuration2 } -type CustomDuration CustomDuration2 -type CustomDuration2 time.Duration +type ( + CustomDuration CustomDuration2 + CustomDuration2 time.Duration +) var Connector = sdk.Connector{ NewSpecification: nil,