Skip to content

Commit

Permalink
Author: Haris Osmanagic <[email protected]>
Browse files Browse the repository at this point in the history
Date:   Thu Nov 28 12:44:48 2024 +0100

    add open

commit 2705320
Author: Haris Osmanagic <[email protected]>
Date:   Thu Nov 28 11:21:36 2024 +0100

    destination_middleware_test.go

commit b3f8172
Author: Haris Osmanagic <[email protected]>
Date:   Thu Nov 28 10:08:22 2024 +0100

    fix source_test

commit 6a53d24
Author: Haris Osmanagic <[email protected]>
Date:   Thu Nov 28 10:02:23 2024 +0100

    fix source middleware

commit 6cf5795
Author: Haris Osmanagic <[email protected]>
Date:   Wed Nov 27 15:39:46 2024 +0100

    add test

commit be36d59
Author: Haris Osmanagic <[email protected]>
Date:   Wed Nov 20 16:01:29 2024 +0100

    fix source_middleware_test.go

commit abdb4b7
Author: Haris Osmanagic <[email protected]>
Date:   Wed Nov 20 12:54:48 2024 +0100

    fix DefaultDestinationMiddleware

commit b07a4b5
Author: Haris Osmanagic <[email protected]>
Date:   Wed Nov 20 12:50:00 2024 +0100

    linter

commit 727489f
Author: Haris Osmanagic <[email protected]>
Date:   Wed Nov 20 12:49:46 2024 +0100

    comments

commit 768c935
Author: Haris Osmanagic <[email protected]>
Date:   Wed Nov 20 12:46:33 2024 +0100

    add comment

commit d9f7f64
Author: Haris Osmanagic <[email protected]>
Date:   Wed Nov 20 12:42:57 2024 +0100

    fix test expectation

commit 9f5c6c2
Author: Haris Osmanagic <[email protected]>
Date:   Wed Nov 20 12:35:06 2024 +0100

    handle schema type

commit ddd70f5
Author: Haris Osmanagic <[email protected]>
Date:   Wed Nov 20 11:03:34 2024 +0100

    workaround for schema type, optional dest mw param, fix validation

commit fe751ca
Author: Haris Osmanagic <[email protected]>
Date:   Tue Nov 19 15:17:30 2024 +0100

    restructure

commit 7318263
Author: Haris Osmanagic <[email protected]>
Date:   Tue Nov 19 14:59:25 2024 +0100

    fix tests

commit 04e9fd4
Author: Haris Osmanagic <[email protected]>
Date:   Tue Nov 19 14:06:01 2024 +0100

    re-enable acceptance tests

commit c5174a6
Author: Haris Osmanagic <[email protected]>
Date:   Tue Nov 19 13:08:09 2024 +0100

    partial specs test

commit dcc08ed
Author: Haris Osmanagic <[email protected]>
Date:   Tue Nov 19 12:48:02 2024 +0100

    write yaml, overwrite existiang source and destination

commit 8af981f
Author: Haris Osmanagic <[email protected]>
Date:   Tue Nov 19 12:45:18 2024 +0100

    more tests, restructure

commit 1755bac
Author: Haris Osmanagic <[email protected]>
Date:   Tue Nov 19 12:37:09 2024 +0100

    revert changes to input file

commit dabaf2e
Author: Haris Osmanagic <[email protected]>
Date:   Mon Nov 18 19:12:42 2024 +0100

    update test

commit 2b4394a
Author: Haris Osmanagic <[email protected]>
Date:   Mon Nov 18 19:09:57 2024 +0100

    simplify

commit ab77011
Author: Haris Osmanagic <[email protected]>
Date:   Mon Nov 18 19:08:12 2024 +0100

    TestWriteAndCombine

commit f51b771
Author: Haris Osmanagić <[email protected]>
Date:   Mon Nov 18 18:22:24 2024 +0100

    specgen: add more tests, rename connector to specification (#206)

commit 07483f5
Author: Lovro Mažgon <[email protected]>
Date:   Fri Nov 15 15:06:29 2024 +0100

    Specgen (#198)

    * specgen spike

    * start nicer rewrite of specgen

    * support for parsing builtin params

    * support for time.Duration

    * support types in different packages

    * support underlying type

    * better tests

    * specgen specification parsing and default overwriting

    * add support for destinations

    * fix test

    * add parsing and validation of config

    * adjust most destination tests

    ---------

    Co-authored-by: Haris Osmanagic <[email protected]>
  • Loading branch information
hariso committed Nov 28, 2024
1 parent de0a7fe commit bafb76f
Show file tree
Hide file tree
Showing 86 changed files with 16,307 additions and 1,711 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
.idea
specgen/main
6 changes: 6 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
.PHONY: test
test:
go test $(GOTEST_FLAGS) -race ./...
echo
echo "Running integration tests..."
echo
cd specgen/specgen/tests/parse_specs/ && go test $(GOTEST_FLAGS) -race ./...
cd specgen/specgen/tests/write_and_combine/ && go test $(GOTEST_FLAGS) -race ./...


.PHONY: fmt
fmt:
Expand Down
93 changes: 77 additions & 16 deletions acceptance_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ func (d ConfigurableAcceptanceTestDriver) WriteToSource(t *testing.T, records []
// writing something to the destination should result in the same record
// being produced by the source
dest := d.Connector().NewDestination()
err := dest.Configure(ctx, d.DestinationConfig(t))
err := configureDestination(ctx, dest, d.DestinationConfig(t), d.Connector().NewSpecification().DestinationParams)
is.NoErr(err)

err = dest.Open(ctx)
Expand Down Expand Up @@ -412,7 +412,7 @@ func (d ConfigurableAcceptanceTestDriver) ReadFromDestination(t *testing.T, reco
// writing something to the destination should result in the same record
// being produced by the source
src := d.Connector().NewSource()
err := src.Configure(ctx, d.SourceConfig(t))
err := configureSource(ctx, src, d.SourceConfig(t), d.Connector().NewSpecification().SourceParams)
is.NoErr(err)

err = src.Open(ctx, nil)
Expand Down Expand Up @@ -547,8 +547,7 @@ func (a acceptanceTest) TestSource_Parameters_Success(t *testing.T) {
is := is.New(t)
defer a.verifyGoleaks(t)

source := a.driver.Connector().NewSource()
params := source.Parameters()
params := a.driver.Connector().NewSpecification().SourceParams

// we enforce that there is at least 1 parameter, any real source will
// require some configuration
Expand All @@ -569,7 +568,7 @@ func (a acceptanceTest) TestSource_Configure_Success(t *testing.T) {
defer a.verifyGoleaks(t)

source := a.driver.Connector().NewSource()
err := source.Configure(ctx, a.driver.SourceConfig(t))
err := a.configureSource(ctx, t, source)
is.NoErr(err)

// calling Teardown after Configure is valid and happens when connector is created
Expand All @@ -582,10 +581,10 @@ func (a acceptanceTest) TestSource_Configure_RequiredParams(t *testing.T) {
is := is.New(t)
ctx := a.context(t)

srcSpec := a.driver.Connector().NewSource()
srcParams := a.driver.Connector().NewSpecification().SourceParams
origCfg := a.driver.SourceConfig(t)

for name, p := range srcSpec.Parameters() {
for name, p := range srcParams {
isRequired := false
for _, v := range p.Validations {
if _, ok := v.(config.ValidationRequired); ok {
Expand All @@ -602,7 +601,7 @@ func (a acceptanceTest) TestSource_Configure_RequiredParams(t *testing.T) {
is.Equal(len(haveCfg)+1, len(origCfg)) // source config does not contain required parameter, please check the test setup

source := a.driver.Connector().NewSource()
err := source.Configure(ctx, haveCfg)
err := configureSource(ctx, source, haveCfg, a.driver.Connector().NewSpecification().SourceParams)
is.True(err != nil)

err = source.Teardown(ctx)
Expand Down Expand Up @@ -781,8 +780,7 @@ func (a acceptanceTest) TestDestination_Parameters_Success(t *testing.T) {
is := is.New(t)
defer a.verifyGoleaks(t)

dest := a.driver.Connector().NewDestination()
params := dest.Parameters()
params := a.driver.Connector().NewSpecification().DestinationParams

// we enforce that there is at least 1 parameter, any real destination will
// require some configuration
Expand All @@ -803,7 +801,7 @@ func (a acceptanceTest) TestDestination_Configure_Success(t *testing.T) {
defer a.verifyGoleaks(t)

dest := a.driver.Connector().NewDestination()
err := dest.Configure(ctx, a.driver.DestinationConfig(t))
err := a.configureDestination(ctx, t, dest)
is.NoErr(err)

// calling Teardown after Configure is valid and happens when connector is created
Expand All @@ -816,10 +814,10 @@ func (a acceptanceTest) TestDestination_Configure_RequiredParams(t *testing.T) {
is := is.New(t)
ctx := a.context(t)

destSpec := a.driver.Connector().NewDestination()
dstParams := a.driver.Connector().NewSpecification().DestinationParams
origCfg := a.driver.DestinationConfig(t)

for name, p := range destSpec.Parameters() {
for name, p := range dstParams {
isRequired := false
for _, v := range p.Validations {
if _, ok := v.(config.ValidationRequired); ok {
Expand All @@ -836,7 +834,12 @@ func (a acceptanceTest) TestDestination_Configure_RequiredParams(t *testing.T) {
is.Equal(len(origCfg), len(haveCfg)+1) // destination config does not contain required parameter, please check the test setup

dest := a.driver.Connector().NewDestination()
err := dest.Configure(ctx, haveCfg)
err := configureDestination(
ctx,
dest,
haveCfg,
a.driver.Connector().NewSpecification().DestinationParams,
)
is.True(err != nil) // expected error if required param is removed

err = dest.Teardown(ctx)
Expand Down Expand Up @@ -905,7 +908,7 @@ func (a acceptanceTest) openSource(ctx context.Context, t *testing.T, pos opencd
is := is.New(t)

source = a.driver.Connector().NewSource()
err := source.Configure(ctx, a.driver.SourceConfig(t))
err := a.configureSource(ctx, t, source)
is.NoErr(err)

openCtx, cancelOpenCtx := context.WithCancel(ctx)
Expand All @@ -929,7 +932,7 @@ func (a acceptanceTest) openDestination(ctx context.Context, t *testing.T) (dest
is := is.New(t)

dest = a.driver.Connector().NewDestination()
err := dest.Configure(ctx, a.driver.DestinationConfig(t))
err := a.configureDestination(ctx, t, dest)
is.NoErr(err)

openCtx, cancelOpenCtx := context.WithCancel(ctx)
Expand Down Expand Up @@ -1119,3 +1122,61 @@ func (a acceptanceTest) context(t *testing.T) context.Context {
}
return ctx
}

func (a acceptanceTest) configureSource(ctx context.Context, t *testing.T, src Source) error {
return configureSource(
ctx,
src,
a.driver.SourceConfig(t),
a.driver.Connector().NewSpecification().SourceParams,
)
}

func (a acceptanceTest) configureDestination(ctx context.Context, t *testing.T, dest Destination) error {
return configureDestination(
ctx,
dest,
a.driver.DestinationConfig(t),
a.driver.Connector().NewSpecification().DestinationParams,
)
}

func configureSource(ctx context.Context, src Source, cfgMap config.Config, params config.Parameters) error {
cfg := src.Config()
if cfg == nil {
// Connector without a config. Nothing to do.
return nil
}

err := Util.ParseConfig(ctx, cfgMap, cfg, params)
if err != nil {
return fmt.Errorf("failed to parse configuration: %w", err)
}

err = cfg.Validate(ctx)
if err != nil {
return fmt.Errorf("configuration invalid: %w", err)
}

return nil
}

func configureDestination(ctx context.Context, dest Destination, cfgMap config.Config, params config.Parameters) error {
cfg := dest.Config()
if cfg == nil {
// Connector without a config. Nothing to do.
return nil
}

err := Util.ParseConfig(ctx, cfgMap, cfg, params)
if err != nil {
return fmt.Errorf("failed to parse configuration: %w", err)
}

err = cfg.Validate(ctx)
if err != nil {
return fmt.Errorf("configuration invalid: %w", err)
}

return nil
}
5 changes: 5 additions & 0 deletions benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

package sdk

/*
import (
"context"
"fmt"
Expand Down Expand Up @@ -183,3 +185,6 @@ func (bm *benchmarkSource) reportMetrics(b *testing.B) {
b.ReportMetric(bm.firstAck.Seconds(), "firstAck")
b.ReportMetric(float64(b.N-1)/bm.allAcks.Seconds(), "acks/s")
}
*/
88 changes: 55 additions & 33 deletions destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,21 +37,16 @@ import (
// All implementations must embed UnimplementedDestination for forward
// compatibility.
type Destination interface {
// Parameters is a map of named Parameters that describe how to configure
// the Destination.
Parameters() config.Parameters

// Configure is the first function to be called in a connector. It provides the
// connector with the configuration that needs to be validated and stored.
// In case the configuration is not valid it should return an error.
// Testing if your connector can reach the configured data source should be
// done in Open, not in Configure.
// The connector SDK will sanitize, apply defaults and validate the
// configuration before calling this function. This means that the
// configuration will always contain all keys defined in Parameters
// (unprovided keys will have their default values) and all non-empty
// values will be of the correct type.
Configure(context.Context, config.Config) error
// Config returns the configuration that the destination expects. It should
// return a pointer to a struct that contains all the configuration keys that
// the destination expects. The struct should be annotated with the necessary
// validation tags. The value should be a pointer to allow the SDK to
// populate it using the values from the configuration.
//
// The returned DestinationConfig should contain all the configuration keys
// that the destination expects, including middleware fields (see
// [DefaultDestinationMiddleware]).
Config() DestinationConfig

// Open is called after Configure to signal the plugin it can prepare to
// start writing records. If needed, the plugin should open connections in
Expand Down Expand Up @@ -93,20 +88,35 @@ type Destination interface {
mustEmbedUnimplementedDestination()
}

// DestinationConfig represents the configuration containing all configuration
// keys that a destination expects. The type needs to implement [Validatable],
// which will be used to automatically validate the config when configuring the
// connector.
type DestinationConfig interface {
Validatable

mustEmbedUnimplementedDestinationConfig()
}

// NewDestinationPlugin takes a Destination and wraps it into an adapter that
// converts it into a pconnector.DestinationPlugin. If the parameter is nil it
// will wrap UnimplementedDestination instead.
func NewDestinationPlugin(impl Destination, cfg pconnector.PluginConfig) pconnector.DestinationPlugin {
func NewDestinationPlugin(impl Destination, cfg pconnector.PluginConfig, parameters config.Parameters) pconnector.DestinationPlugin {
if impl == nil {
// prevent nil pointers
impl = UnimplementedDestination{}
}
return &destinationPluginAdapter{impl: impl, cfg: cfg}
return &destinationPluginAdapter{
impl: impl,
cfg: cfg,
parameters: parameters,
}
}

type destinationPluginAdapter struct {
impl Destination
cfg pconnector.PluginConfig
impl Destination
cfg pconnector.PluginConfig
parameters config.Parameters

// lastPosition holds the position of the last record passed to the connector's
// Write method. It is used to determine when the connector should stop.
Expand All @@ -119,29 +129,29 @@ type destinationPluginAdapter struct {

func (a *destinationPluginAdapter) Configure(ctx context.Context, req pconnector.DestinationConfigureRequest) (pconnector.DestinationConfigureResponse, error) {
ctx = internal.Enrich(ctx, a.cfg)
ctx = (&destinationWithBatch{}).setBatchConfig(ctx, DestinationWithBatchConfig{})

err := a.impl.Configure(ctx, req.Config)
if err != nil {
return pconnector.DestinationConfigureResponse{}, err
cfg := a.impl.Config()
if cfg == nil {
// Connector without a config. Nothing to do.
return pconnector.DestinationConfigureResponse{}, nil
}

a.configureWriteStrategy(ctx)
return pconnector.DestinationConfigureResponse{}, nil
}

func (a *destinationPluginAdapter) configureWriteStrategy(ctx context.Context) {
writeSingle := &writeStrategySingle{impl: a.impl, ackFn: a.ack}
a.writeStrategy = writeSingle // by default we write single records
err := Util.ParseConfig(ctx, req.Config, cfg, a.parameters)
if err != nil {
return pconnector.DestinationConfigureResponse{}, fmt.Errorf("failed to parse configuration: %w", err)
}

batchConfig := (&destinationWithBatch{}).getBatchConfig(ctx)
if batchConfig.BatchSize > 1 || batchConfig.BatchDelay > 0 {
a.writeStrategy = newWriteStrategyBatch(writeSingle, batchConfig.BatchSize, batchConfig.BatchDelay)
err = cfg.Validate(ctx)
if err != nil {
return pconnector.DestinationConfigureResponse{}, fmt.Errorf("configuration invalid: %w", err)
}

return pconnector.DestinationConfigureResponse{}, nil
}

func (a *destinationPluginAdapter) Open(ctx context.Context, _ pconnector.DestinationOpenRequest) (pconnector.DestinationOpenResponse, error) {
ctx = internal.Enrich(ctx, a.cfg)
ctx = (&destinationWithBatch{}).setBatchConfig(ctx, DestinationWithBatch{})

a.lastPosition = new(csync.ValueWatcher[opencdc.Position])

Expand All @@ -164,9 +174,21 @@ func (a *destinationPluginAdapter) Open(ctx context.Context, _ pconnector.Destin
}()

err := a.impl.Open(ctxOpen)
a.configureWriteStrategy(ctxOpen)

return pconnector.DestinationOpenResponse{}, err
}

func (a *destinationPluginAdapter) configureWriteStrategy(ctx context.Context) {
writeSingle := &writeStrategySingle{impl: a.impl, ackFn: a.ack}
a.writeStrategy = writeSingle // by default we write single records

batchConfig := (&destinationWithBatch{}).getBatchConfig(ctx)
if batchConfig.BatchSize > 1 || batchConfig.BatchDelay > 0 {
a.writeStrategy = newWriteStrategyBatch(writeSingle, batchConfig.BatchSize, batchConfig.BatchDelay)
}
}

func (a *destinationPluginAdapter) Run(ctx context.Context, stream pconnector.DestinationRunStream) error {
ctx = internal.Enrich(ctx, a.cfg)
a.writeStrategy.SetStream(stream.Server())
Expand Down
Loading

0 comments on commit bafb76f

Please sign in to comment.