Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

specgen #211

Merged
merged 12 commits into from
Dec 23, 2024
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
.idea
specgen/main
28 changes: 28 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
.PHONY: test
test:
go test $(GOTEST_FLAGS) -race ./...
echo
echo "Running integration tests..."
echo
cd specgen/specgen/tests/parse_specs/ && go test $(GOTEST_FLAGS) -race ./...
cd specgen/specgen/tests/write_and_combine/ && go test $(GOTEST_FLAGS) -race ./...

.PHONY: fmt
fmt:
Expand All @@ -19,3 +24,26 @@ install-tools:
@echo Installing tools from tools.go
@go list -e -f '{{ join .Imports "\n" }}' tools.go | xargs -I % go list -f "%@{{.Module.Version}}" % | xargs -tI % go install %
@go mod tidy

.PHONY: tidy-all
tidy-all:
@echo "Tidying up module in parse_specs directory"
@(cd specgen/specgen/tests/parse_specs && go mod tidy)
@echo "Tidying up subdirectories..."
@for dir in specgen/specgen/tests/parse_specs/*/; do \
if [ -f "$$dir/go.mod" ]; then \
echo "Processing directory: $$dir"; \
(cd "$$dir" && go mod tidy) || exit 1; \
fi \
done

@echo "Tidying up module in write_and_combine directory"
@(cd specgen/specgen/tests/write_and_combine && go mod tidy)
@echo "Tidying up subdirectories..."
@for dir in specgen/specgen/tests/write_and_combine/*/; do \
if [ -f "$$dir/go.mod" ]; then \
echo "Processing directory: $$dir"; \
(cd "$$dir" && go mod tidy) || exit 1; \
fi \
done
@echo "Module tidying complete."
93 changes: 77 additions & 16 deletions acceptance_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ func (d ConfigurableAcceptanceTestDriver) WriteToSource(t *testing.T, records []
// writing something to the destination should result in the same record
// being produced by the source
dest := d.Connector().NewDestination()
err := dest.Configure(ctx, d.DestinationConfig(t))
err := configureDestination(ctx, dest, d.DestinationConfig(t), d.Connector().NewSpecification().DestinationParams)
is.NoErr(err)

err = dest.Open(ctx)
Expand Down Expand Up @@ -412,7 +412,7 @@ func (d ConfigurableAcceptanceTestDriver) ReadFromDestination(t *testing.T, reco
// writing something to the destination should result in the same record
// being produced by the source
src := d.Connector().NewSource()
err := src.Configure(ctx, d.SourceConfig(t))
err := configureSource(ctx, src, d.SourceConfig(t), d.Connector().NewSpecification().SourceParams)
is.NoErr(err)

err = src.Open(ctx, nil)
Expand Down Expand Up @@ -547,8 +547,7 @@ func (a acceptanceTest) TestSource_Parameters_Success(t *testing.T) {
is := is.New(t)
defer a.verifyGoleaks(t)

source := a.driver.Connector().NewSource()
params := source.Parameters()
params := a.driver.Connector().NewSpecification().SourceParams

// we enforce that there is at least 1 parameter, any real source will
// require some configuration
Expand All @@ -569,7 +568,7 @@ func (a acceptanceTest) TestSource_Configure_Success(t *testing.T) {
defer a.verifyGoleaks(t)

source := a.driver.Connector().NewSource()
err := source.Configure(ctx, a.driver.SourceConfig(t))
err := a.configureSource(ctx, t, source)
is.NoErr(err)

// calling Teardown after Configure is valid and happens when connector is created
Expand All @@ -582,10 +581,10 @@ func (a acceptanceTest) TestSource_Configure_RequiredParams(t *testing.T) {
is := is.New(t)
ctx := a.context(t)

srcSpec := a.driver.Connector().NewSource()
srcParams := a.driver.Connector().NewSpecification().SourceParams
origCfg := a.driver.SourceConfig(t)

for name, p := range srcSpec.Parameters() {
for name, p := range srcParams {
isRequired := false
for _, v := range p.Validations {
if _, ok := v.(config.ValidationRequired); ok {
Expand All @@ -602,7 +601,7 @@ func (a acceptanceTest) TestSource_Configure_RequiredParams(t *testing.T) {
is.Equal(len(haveCfg)+1, len(origCfg)) // source config does not contain required parameter, please check the test setup

source := a.driver.Connector().NewSource()
err := source.Configure(ctx, haveCfg)
err := configureSource(ctx, source, haveCfg, a.driver.Connector().NewSpecification().SourceParams)
is.True(err != nil)

err = source.Teardown(ctx)
Expand Down Expand Up @@ -781,8 +780,7 @@ func (a acceptanceTest) TestDestination_Parameters_Success(t *testing.T) {
is := is.New(t)
defer a.verifyGoleaks(t)

dest := a.driver.Connector().NewDestination()
params := dest.Parameters()
params := a.driver.Connector().NewSpecification().DestinationParams

// we enforce that there is at least 1 parameter, any real destination will
// require some configuration
Expand All @@ -803,7 +801,7 @@ func (a acceptanceTest) TestDestination_Configure_Success(t *testing.T) {
defer a.verifyGoleaks(t)

dest := a.driver.Connector().NewDestination()
err := dest.Configure(ctx, a.driver.DestinationConfig(t))
err := a.configureDestination(ctx, t, dest)
is.NoErr(err)

// calling Teardown after Configure is valid and happens when connector is created
Expand All @@ -816,10 +814,10 @@ func (a acceptanceTest) TestDestination_Configure_RequiredParams(t *testing.T) {
is := is.New(t)
ctx := a.context(t)

destSpec := a.driver.Connector().NewDestination()
dstParams := a.driver.Connector().NewSpecification().DestinationParams
origCfg := a.driver.DestinationConfig(t)

for name, p := range destSpec.Parameters() {
for name, p := range dstParams {
isRequired := false
for _, v := range p.Validations {
if _, ok := v.(config.ValidationRequired); ok {
Expand All @@ -836,7 +834,12 @@ func (a acceptanceTest) TestDestination_Configure_RequiredParams(t *testing.T) {
is.Equal(len(origCfg), len(haveCfg)+1) // destination config does not contain required parameter, please check the test setup

dest := a.driver.Connector().NewDestination()
err := dest.Configure(ctx, haveCfg)
err := configureDestination(
ctx,
dest,
haveCfg,
a.driver.Connector().NewSpecification().DestinationParams,
)
is.True(err != nil) // expected error if required param is removed

err = dest.Teardown(ctx)
Expand Down Expand Up @@ -905,7 +908,7 @@ func (a acceptanceTest) openSource(ctx context.Context, t *testing.T, pos opencd
is := is.New(t)

source = a.driver.Connector().NewSource()
err := source.Configure(ctx, a.driver.SourceConfig(t))
err := a.configureSource(ctx, t, source)
is.NoErr(err)

openCtx, cancelOpenCtx := context.WithCancel(ctx)
Expand All @@ -929,7 +932,7 @@ func (a acceptanceTest) openDestination(ctx context.Context, t *testing.T) (dest
is := is.New(t)

dest = a.driver.Connector().NewDestination()
err := dest.Configure(ctx, a.driver.DestinationConfig(t))
err := a.configureDestination(ctx, t, dest)
is.NoErr(err)

openCtx, cancelOpenCtx := context.WithCancel(ctx)
Expand Down Expand Up @@ -1119,3 +1122,61 @@ func (a acceptanceTest) context(t *testing.T) context.Context {
}
return ctx
}

func (a acceptanceTest) configureSource(ctx context.Context, t *testing.T, src Source) error {
return configureSource(
ctx,
src,
a.driver.SourceConfig(t),
a.driver.Connector().NewSpecification().SourceParams,
)
}

func (a acceptanceTest) configureDestination(ctx context.Context, t *testing.T, dest Destination) error {
return configureDestination(
ctx,
dest,
a.driver.DestinationConfig(t),
a.driver.Connector().NewSpecification().DestinationParams,
)
}

func configureSource(ctx context.Context, src Source, cfgMap config.Config, params config.Parameters) error {
cfg := src.Config()
if cfg == nil {
// Connector without a config. Nothing to do.
return nil
}

err := Util.ParseConfig(ctx, cfgMap, cfg, params)
if err != nil {
return fmt.Errorf("failed to parse configuration: %w", err)
}

err = cfg.Validate(ctx)
if err != nil {
return fmt.Errorf("configuration invalid: %w", err)
}

return nil
}

func configureDestination(ctx context.Context, dest Destination, cfgMap config.Config, params config.Parameters) error {
cfg := dest.Config()
if cfg == nil {
// Connector without a config. Nothing to do.
return nil
}

err := Util.ParseConfig(ctx, cfgMap, cfg, params)
if err != nil {
return fmt.Errorf("failed to parse configuration: %w", err)
}

err = cfg.Validate(ctx)
if err != nil {
return fmt.Errorf("configuration invalid: %w", err)
}

return nil
}
9 changes: 0 additions & 9 deletions benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ type benchmarkSource struct {
config map[string]string

// measures
configure time.Duration
open time.Duration
firstRead time.Duration
allReads time.Duration
Expand All @@ -73,13 +72,6 @@ func (bm *benchmarkSource) Run(b *testing.B) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

bm.configure = bm.measure(func() {
err := bm.source.Configure(ctx, bm.config)
if err != nil {
b.Fatal(err)
}
})

bm.open = bm.measure(func() {
err := bm.source.Open(ctx, nil)
if err != nil {
Expand Down Expand Up @@ -172,7 +164,6 @@ func (*benchmarkSource) measure(f func()) time.Duration {
func (bm *benchmarkSource) reportMetrics(b *testing.B) {
b.ReportMetric(0, "ns/op") // suppress ns/op metric, it is misleading in this benchmarkSource

b.ReportMetric(bm.configure.Seconds(), "configure")
b.ReportMetric(bm.open.Seconds(), "open")
b.ReportMetric(bm.stop.Seconds(), "stop")
b.ReportMetric(bm.teardown.Seconds(), "teardown")
Expand Down
88 changes: 55 additions & 33 deletions destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,21 +37,16 @@ import (
// All implementations must embed UnimplementedDestination for forward
// compatibility.
type Destination interface {
// Parameters is a map of named Parameters that describe how to configure
// the Destination.
Parameters() config.Parameters

// Configure is the first function to be called in a connector. It provides the
// connector with the configuration that needs to be validated and stored.
// In case the configuration is not valid it should return an error.
// Testing if your connector can reach the configured data source should be
// done in Open, not in Configure.
// The connector SDK will sanitize, apply defaults and validate the
// configuration before calling this function. This means that the
// configuration will always contain all keys defined in Parameters
// (unprovided keys will have their default values) and all non-empty
// values will be of the correct type.
Configure(context.Context, config.Config) error
// Config returns the configuration that the destination expects. It should
// return a pointer to a struct that contains all the configuration keys that
// the destination expects. The struct should be annotated with the necessary
// validation tags. The value should be a pointer to allow the SDK to
// populate it using the values from the configuration.
//
// The returned DestinationConfig should contain all the configuration keys
// that the destination expects, including middleware fields (see
// [DefaultDestinationMiddleware]).
Config() DestinationConfig

// Open is called after Configure to signal the plugin it can prepare to
// start writing records. If needed, the plugin should open connections in
Expand Down Expand Up @@ -93,20 +88,35 @@ type Destination interface {
mustEmbedUnimplementedDestination()
}

// DestinationConfig represents the configuration containing all configuration
// keys that a destination expects. The type needs to implement [Validatable],
// which will be used to automatically validate the config when configuring the
// connector.
type DestinationConfig interface {
Validatable

mustEmbedUnimplementedDestinationConfig()
}

// NewDestinationPlugin takes a Destination and wraps it into an adapter that
// converts it into a pconnector.DestinationPlugin. If the parameter is nil it
// will wrap UnimplementedDestination instead.
func NewDestinationPlugin(impl Destination, cfg pconnector.PluginConfig) pconnector.DestinationPlugin {
func NewDestinationPlugin(impl Destination, cfg pconnector.PluginConfig, parameters config.Parameters) pconnector.DestinationPlugin {
if impl == nil {
// prevent nil pointers
impl = UnimplementedDestination{}
}
return &destinationPluginAdapter{impl: impl, cfg: cfg}
return &destinationPluginAdapter{
impl: impl,
cfg: cfg,
parameters: parameters,
}
}

type destinationPluginAdapter struct {
impl Destination
cfg pconnector.PluginConfig
impl Destination
cfg pconnector.PluginConfig
parameters config.Parameters

// lastPosition holds the position of the last record passed to the connector's
// Write method. It is used to determine when the connector should stop.
Expand All @@ -119,29 +129,29 @@ type destinationPluginAdapter struct {

func (a *destinationPluginAdapter) Configure(ctx context.Context, req pconnector.DestinationConfigureRequest) (pconnector.DestinationConfigureResponse, error) {
ctx = internal.Enrich(ctx, a.cfg)
ctx = (&destinationWithBatch{}).setBatchConfig(ctx, DestinationWithBatchConfig{})

err := a.impl.Configure(ctx, req.Config)
if err != nil {
return pconnector.DestinationConfigureResponse{}, err
cfg := a.impl.Config()
if cfg == nil {
// Connector without a config. Nothing to do.
return pconnector.DestinationConfigureResponse{}, nil
}

a.configureWriteStrategy(ctx)
return pconnector.DestinationConfigureResponse{}, nil
}

func (a *destinationPluginAdapter) configureWriteStrategy(ctx context.Context) {
writeSingle := &writeStrategySingle{impl: a.impl, ackFn: a.ack}
a.writeStrategy = writeSingle // by default we write single records
err := Util.ParseConfig(ctx, req.Config, cfg, a.parameters)
if err != nil {
return pconnector.DestinationConfigureResponse{}, fmt.Errorf("failed to parse configuration: %w", err)
}

batchConfig := (&destinationWithBatch{}).getBatchConfig(ctx)
if batchConfig.BatchSize > 1 || batchConfig.BatchDelay > 0 {
a.writeStrategy = newWriteStrategyBatch(writeSingle, batchConfig.BatchSize, batchConfig.BatchDelay)
err = cfg.Validate(ctx)
if err != nil {
return pconnector.DestinationConfigureResponse{}, fmt.Errorf("configuration invalid: %w", err)
}

return pconnector.DestinationConfigureResponse{}, nil
}

func (a *destinationPluginAdapter) Open(ctx context.Context, _ pconnector.DestinationOpenRequest) (pconnector.DestinationOpenResponse, error) {
ctx = internal.Enrich(ctx, a.cfg)
ctx = (&destinationWithBatch{}).setBatchConfig(ctx, DestinationWithBatch{})

a.lastPosition = new(csync.ValueWatcher[opencdc.Position])

Expand All @@ -164,9 +174,21 @@ func (a *destinationPluginAdapter) Open(ctx context.Context, _ pconnector.Destin
}()

err := a.impl.Open(ctxOpen)
a.configureWriteStrategy(ctxOpen)

return pconnector.DestinationOpenResponse{}, err
}

func (a *destinationPluginAdapter) configureWriteStrategy(ctx context.Context) {
writeSingle := &writeStrategySingle{impl: a.impl, ackFn: a.ack}
a.writeStrategy = writeSingle // by default we write single records

batchConfig := (&destinationWithBatch{}).getBatchConfig(ctx)
if batchConfig.BatchSize > 1 || batchConfig.BatchDelay > 0 {
a.writeStrategy = newWriteStrategyBatch(writeSingle, batchConfig.BatchSize, batchConfig.BatchDelay)
}
}

func (a *destinationPluginAdapter) Run(ctx context.Context, stream pconnector.DestinationRunStream) error {
ctx = internal.Enrich(ctx, a.cfg)
a.writeStrategy.SetStream(stream.Server())
Expand Down
Loading
Loading