Skip to content

Commit

Permalink
specgen (#211)
Browse files Browse the repository at this point in the history
specgen

---------

Co-authored-by: Haris Osmanagic <[email protected]>

---------

Co-authored-by: Lovro Mažgon <[email protected]>
  • Loading branch information
hariso and lovromazgon authored Dec 23, 2024
1 parent fc1d8e7 commit 9d9a69d
Show file tree
Hide file tree
Showing 87 changed files with 16,420 additions and 1,840 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
.idea
specgen/main
28 changes: 28 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
.PHONY: test
test:
go test $(GOTEST_FLAGS) -race ./...
echo
echo "Running integration tests..."
echo
cd specgen/specgen/tests/parse_specs/ && go test $(GOTEST_FLAGS) -race ./...
cd specgen/specgen/tests/write_and_combine/ && go test $(GOTEST_FLAGS) -race ./...

.PHONY: fmt
fmt:
Expand All @@ -19,3 +24,26 @@ install-tools:
@echo Installing tools from tools.go
@go list -e -f '{{ join .Imports "\n" }}' tools.go | xargs -I % go list -f "%@{{.Module.Version}}" % | xargs -tI % go install %
@go mod tidy

.PHONY: tidy-all
tidy-all:
@echo "Tidying up module in parse_specs directory"
@(cd specgen/specgen/tests/parse_specs && go mod tidy)
@echo "Tidying up subdirectories..."
@for dir in specgen/specgen/tests/parse_specs/*/; do \
if [ -f "$$dir/go.mod" ]; then \
echo "Processing directory: $$dir"; \
(cd "$$dir" && go mod tidy) || exit 1; \
fi \
done

@echo "Tidying up module in write_and_combine directory"
@(cd specgen/specgen/tests/write_and_combine && go mod tidy)
@echo "Tidying up subdirectories..."
@for dir in specgen/specgen/tests/write_and_combine/*/; do \
if [ -f "$$dir/go.mod" ]; then \
echo "Processing directory: $$dir"; \
(cd "$$dir" && go mod tidy) || exit 1; \
fi \
done
@echo "Module tidying complete."
93 changes: 77 additions & 16 deletions acceptance_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ func (d ConfigurableAcceptanceTestDriver) WriteToSource(t *testing.T, records []
// writing something to the destination should result in the same record
// being produced by the source
dest := d.Connector().NewDestination()
err := dest.Configure(ctx, d.DestinationConfig(t))
err := configureDestination(ctx, dest, d.DestinationConfig(t), d.Connector().NewSpecification().DestinationParams)
is.NoErr(err)

err = dest.Open(ctx)
Expand Down Expand Up @@ -412,7 +412,7 @@ func (d ConfigurableAcceptanceTestDriver) ReadFromDestination(t *testing.T, reco
// writing something to the destination should result in the same record
// being produced by the source
src := d.Connector().NewSource()
err := src.Configure(ctx, d.SourceConfig(t))
err := configureSource(ctx, src, d.SourceConfig(t), d.Connector().NewSpecification().SourceParams)
is.NoErr(err)

err = src.Open(ctx, nil)
Expand Down Expand Up @@ -547,8 +547,7 @@ func (a acceptanceTest) TestSource_Parameters_Success(t *testing.T) {
is := is.New(t)
defer a.verifyGoleaks(t)

source := a.driver.Connector().NewSource()
params := source.Parameters()
params := a.driver.Connector().NewSpecification().SourceParams

// we enforce that there is at least 1 parameter, any real source will
// require some configuration
Expand All @@ -569,7 +568,7 @@ func (a acceptanceTest) TestSource_Configure_Success(t *testing.T) {
defer a.verifyGoleaks(t)

source := a.driver.Connector().NewSource()
err := source.Configure(ctx, a.driver.SourceConfig(t))
err := a.configureSource(ctx, t, source)
is.NoErr(err)

// calling Teardown after Configure is valid and happens when connector is created
Expand All @@ -582,10 +581,10 @@ func (a acceptanceTest) TestSource_Configure_RequiredParams(t *testing.T) {
is := is.New(t)
ctx := a.context(t)

srcSpec := a.driver.Connector().NewSource()
srcParams := a.driver.Connector().NewSpecification().SourceParams
origCfg := a.driver.SourceConfig(t)

for name, p := range srcSpec.Parameters() {
for name, p := range srcParams {
isRequired := false
for _, v := range p.Validations {
if _, ok := v.(config.ValidationRequired); ok {
Expand All @@ -602,7 +601,7 @@ func (a acceptanceTest) TestSource_Configure_RequiredParams(t *testing.T) {
is.Equal(len(haveCfg)+1, len(origCfg)) // source config does not contain required parameter, please check the test setup

source := a.driver.Connector().NewSource()
err := source.Configure(ctx, haveCfg)
err := configureSource(ctx, source, haveCfg, a.driver.Connector().NewSpecification().SourceParams)
is.True(err != nil)

err = source.Teardown(ctx)
Expand Down Expand Up @@ -781,8 +780,7 @@ func (a acceptanceTest) TestDestination_Parameters_Success(t *testing.T) {
is := is.New(t)
defer a.verifyGoleaks(t)

dest := a.driver.Connector().NewDestination()
params := dest.Parameters()
params := a.driver.Connector().NewSpecification().DestinationParams

// we enforce that there is at least 1 parameter, any real destination will
// require some configuration
Expand All @@ -803,7 +801,7 @@ func (a acceptanceTest) TestDestination_Configure_Success(t *testing.T) {
defer a.verifyGoleaks(t)

dest := a.driver.Connector().NewDestination()
err := dest.Configure(ctx, a.driver.DestinationConfig(t))
err := a.configureDestination(ctx, t, dest)
is.NoErr(err)

// calling Teardown after Configure is valid and happens when connector is created
Expand All @@ -816,10 +814,10 @@ func (a acceptanceTest) TestDestination_Configure_RequiredParams(t *testing.T) {
is := is.New(t)
ctx := a.context(t)

destSpec := a.driver.Connector().NewDestination()
dstParams := a.driver.Connector().NewSpecification().DestinationParams
origCfg := a.driver.DestinationConfig(t)

for name, p := range destSpec.Parameters() {
for name, p := range dstParams {
isRequired := false
for _, v := range p.Validations {
if _, ok := v.(config.ValidationRequired); ok {
Expand All @@ -836,7 +834,12 @@ func (a acceptanceTest) TestDestination_Configure_RequiredParams(t *testing.T) {
is.Equal(len(origCfg), len(haveCfg)+1) // destination config does not contain required parameter, please check the test setup

dest := a.driver.Connector().NewDestination()
err := dest.Configure(ctx, haveCfg)
err := configureDestination(
ctx,
dest,
haveCfg,
a.driver.Connector().NewSpecification().DestinationParams,
)
is.True(err != nil) // expected error if required param is removed

err = dest.Teardown(ctx)
Expand Down Expand Up @@ -905,7 +908,7 @@ func (a acceptanceTest) openSource(ctx context.Context, t *testing.T, pos opencd
is := is.New(t)

source = a.driver.Connector().NewSource()
err := source.Configure(ctx, a.driver.SourceConfig(t))
err := a.configureSource(ctx, t, source)
is.NoErr(err)

openCtx, cancelOpenCtx := context.WithCancel(ctx)
Expand All @@ -929,7 +932,7 @@ func (a acceptanceTest) openDestination(ctx context.Context, t *testing.T) (dest
is := is.New(t)

dest = a.driver.Connector().NewDestination()
err := dest.Configure(ctx, a.driver.DestinationConfig(t))
err := a.configureDestination(ctx, t, dest)
is.NoErr(err)

openCtx, cancelOpenCtx := context.WithCancel(ctx)
Expand Down Expand Up @@ -1119,3 +1122,61 @@ func (a acceptanceTest) context(t *testing.T) context.Context {
}
return ctx
}

func (a acceptanceTest) configureSource(ctx context.Context, t *testing.T, src Source) error {
return configureSource(
ctx,
src,
a.driver.SourceConfig(t),
a.driver.Connector().NewSpecification().SourceParams,
)
}

func (a acceptanceTest) configureDestination(ctx context.Context, t *testing.T, dest Destination) error {
return configureDestination(
ctx,
dest,
a.driver.DestinationConfig(t),
a.driver.Connector().NewSpecification().DestinationParams,
)
}

func configureSource(ctx context.Context, src Source, cfgMap config.Config, params config.Parameters) error {
cfg := src.Config()
if cfg == nil {
// Connector without a config. Nothing to do.
return nil
}

err := Util.ParseConfig(ctx, cfgMap, cfg, params)
if err != nil {
return fmt.Errorf("failed to parse configuration: %w", err)
}

err = cfg.Validate(ctx)
if err != nil {
return fmt.Errorf("configuration invalid: %w", err)
}

return nil
}

func configureDestination(ctx context.Context, dest Destination, cfgMap config.Config, params config.Parameters) error {
cfg := dest.Config()
if cfg == nil {
// Connector without a config. Nothing to do.
return nil
}

err := Util.ParseConfig(ctx, cfgMap, cfg, params)
if err != nil {
return fmt.Errorf("failed to parse configuration: %w", err)
}

err = cfg.Validate(ctx)
if err != nil {
return fmt.Errorf("configuration invalid: %w", err)
}

return nil
}
9 changes: 0 additions & 9 deletions benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ type benchmarkSource struct {
config map[string]string

// measures
configure time.Duration
open time.Duration
firstRead time.Duration
allReads time.Duration
Expand All @@ -73,13 +72,6 @@ func (bm *benchmarkSource) Run(b *testing.B) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

bm.configure = bm.measure(func() {
err := bm.source.Configure(ctx, bm.config)
if err != nil {
b.Fatal(err)
}
})

bm.open = bm.measure(func() {
err := bm.source.Open(ctx, nil)
if err != nil {
Expand Down Expand Up @@ -172,7 +164,6 @@ func (*benchmarkSource) measure(f func()) time.Duration {
func (bm *benchmarkSource) reportMetrics(b *testing.B) {
b.ReportMetric(0, "ns/op") // suppress ns/op metric, it is misleading in this benchmarkSource

b.ReportMetric(bm.configure.Seconds(), "configure")
b.ReportMetric(bm.open.Seconds(), "open")
b.ReportMetric(bm.stop.Seconds(), "stop")
b.ReportMetric(bm.teardown.Seconds(), "teardown")
Expand Down
88 changes: 55 additions & 33 deletions destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,21 +37,16 @@ import (
// All implementations must embed UnimplementedDestination for forward
// compatibility.
type Destination interface {
// Parameters is a map of named Parameters that describe how to configure
// the Destination.
Parameters() config.Parameters

// Configure is the first function to be called in a connector. It provides the
// connector with the configuration that needs to be validated and stored.
// In case the configuration is not valid it should return an error.
// Testing if your connector can reach the configured data source should be
// done in Open, not in Configure.
// The connector SDK will sanitize, apply defaults and validate the
// configuration before calling this function. This means that the
// configuration will always contain all keys defined in Parameters
// (unprovided keys will have their default values) and all non-empty
// values will be of the correct type.
Configure(context.Context, config.Config) error
// Config returns the configuration that the destination expects. It should
// return a pointer to a struct that contains all the configuration keys that
// the destination expects. The struct should be annotated with the necessary
// validation tags. The value should be a pointer to allow the SDK to
// populate it using the values from the configuration.
//
// The returned DestinationConfig should contain all the configuration keys
// that the destination expects, including middleware fields (see
// [DefaultDestinationMiddleware]).
Config() DestinationConfig

// Open is called after Configure to signal the plugin it can prepare to
// start writing records. If needed, the plugin should open connections in
Expand Down Expand Up @@ -93,20 +88,35 @@ type Destination interface {
mustEmbedUnimplementedDestination()
}

// DestinationConfig represents the configuration containing all configuration
// keys that a destination expects. The type needs to implement [Validatable],
// which will be used to automatically validate the config when configuring the
// connector.
type DestinationConfig interface {
Validatable

mustEmbedUnimplementedDestinationConfig()
}

// NewDestinationPlugin takes a Destination and wraps it into an adapter that
// converts it into a pconnector.DestinationPlugin. If the parameter is nil it
// will wrap UnimplementedDestination instead.
func NewDestinationPlugin(impl Destination, cfg pconnector.PluginConfig) pconnector.DestinationPlugin {
func NewDestinationPlugin(impl Destination, cfg pconnector.PluginConfig, parameters config.Parameters) pconnector.DestinationPlugin {
if impl == nil {
// prevent nil pointers
impl = UnimplementedDestination{}
}
return &destinationPluginAdapter{impl: impl, cfg: cfg}
return &destinationPluginAdapter{
impl: impl,
cfg: cfg,
parameters: parameters,
}
}

type destinationPluginAdapter struct {
impl Destination
cfg pconnector.PluginConfig
impl Destination
cfg pconnector.PluginConfig
parameters config.Parameters

// lastPosition holds the position of the last record passed to the connector's
// Write method. It is used to determine when the connector should stop.
Expand All @@ -119,29 +129,29 @@ type destinationPluginAdapter struct {

func (a *destinationPluginAdapter) Configure(ctx context.Context, req pconnector.DestinationConfigureRequest) (pconnector.DestinationConfigureResponse, error) {
ctx = internal.Enrich(ctx, a.cfg)
ctx = (&destinationWithBatch{}).setBatchConfig(ctx, DestinationWithBatchConfig{})

err := a.impl.Configure(ctx, req.Config)
if err != nil {
return pconnector.DestinationConfigureResponse{}, err
cfg := a.impl.Config()
if cfg == nil {
// Connector without a config. Nothing to do.
return pconnector.DestinationConfigureResponse{}, nil
}

a.configureWriteStrategy(ctx)
return pconnector.DestinationConfigureResponse{}, nil
}

func (a *destinationPluginAdapter) configureWriteStrategy(ctx context.Context) {
writeSingle := &writeStrategySingle{impl: a.impl, ackFn: a.ack}
a.writeStrategy = writeSingle // by default we write single records
err := Util.ParseConfig(ctx, req.Config, cfg, a.parameters)
if err != nil {
return pconnector.DestinationConfigureResponse{}, fmt.Errorf("failed to parse configuration: %w", err)
}

batchConfig := (&destinationWithBatch{}).getBatchConfig(ctx)
if batchConfig.BatchSize > 1 || batchConfig.BatchDelay > 0 {
a.writeStrategy = newWriteStrategyBatch(writeSingle, batchConfig.BatchSize, batchConfig.BatchDelay)
err = cfg.Validate(ctx)
if err != nil {
return pconnector.DestinationConfigureResponse{}, fmt.Errorf("configuration invalid: %w", err)
}

return pconnector.DestinationConfigureResponse{}, nil
}

func (a *destinationPluginAdapter) Open(ctx context.Context, _ pconnector.DestinationOpenRequest) (pconnector.DestinationOpenResponse, error) {
ctx = internal.Enrich(ctx, a.cfg)
ctx = (&destinationWithBatch{}).setBatchConfig(ctx, DestinationWithBatch{})

a.lastPosition = new(csync.ValueWatcher[opencdc.Position])

Expand All @@ -164,9 +174,21 @@ func (a *destinationPluginAdapter) Open(ctx context.Context, _ pconnector.Destin
}()

err := a.impl.Open(ctxOpen)
a.configureWriteStrategy(ctxOpen)

return pconnector.DestinationOpenResponse{}, err
}

func (a *destinationPluginAdapter) configureWriteStrategy(ctx context.Context) {
writeSingle := &writeStrategySingle{impl: a.impl, ackFn: a.ack}
a.writeStrategy = writeSingle // by default we write single records

batchConfig := (&destinationWithBatch{}).getBatchConfig(ctx)
if batchConfig.BatchSize > 1 || batchConfig.BatchDelay > 0 {
a.writeStrategy = newWriteStrategyBatch(writeSingle, batchConfig.BatchSize, batchConfig.BatchDelay)
}
}

func (a *destinationPluginAdapter) Run(ctx context.Context, stream pconnector.DestinationRunStream) error {
ctx = internal.Enrich(ctx, a.cfg)
a.writeStrategy.SetStream(stream.Server())
Expand Down
Loading

0 comments on commit 9d9a69d

Please sign in to comment.