Skip to content

Commit

Permalink
tests
Browse files Browse the repository at this point in the history
  • Loading branch information
hariso committed Nov 1, 2024
1 parent 36a07dc commit 90e1733
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 111 deletions.
3 changes: 0 additions & 3 deletions pkg/provisioning/config/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,6 @@ var (

ConnectorImmutableFields = []string{"Type"}
ConnectorMutableFields = []string{"Name", "Settings", "Processors", "Plugin"}

ProcessorImmutableFields = []string{"Plugin"}
ProcessorMutableFields = []string{"Settings", "Workers", "Condition"}
)

// Parser reads data from reader and parses all pipelines defined in the
Expand Down
14 changes: 0 additions & 14 deletions pkg/provisioning/config/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,20 +50,6 @@ func TestConnectorFields(t *testing.T) {
is.Equal(wantFields, haveFields) // fields don't match, if you added a field to Connector please add it to ConnectorImmutableFields or ConnectorMutableFields
}

func TestProcessorFields(t *testing.T) {
is := is.New(t)

wantFields := extractFieldNames(reflect.TypeOf(Processor{}))
haveFields := []string{"ID"} // ID is special, it's the identifier
haveFields = append(haveFields, ProcessorImmutableFields...)
haveFields = append(haveFields, ProcessorMutableFields...)

sort.Strings(wantFields)
sort.Strings(haveFields)

is.Equal(wantFields, haveFields) // fields don't match, if you added a field to Processor please add it to ProcessorImmutableFields or ProcessorMutableFields
}

func extractFieldNames(t reflect.Type) []string {
fields := make([]string, t.NumField())
for i := 0; i < t.NumField(); i++ {
Expand Down
37 changes: 7 additions & 30 deletions pkg/provisioning/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,40 +305,17 @@ func (ab actionsBuilder) prepareProcessorActions(oldConfig, newConfig config.Pro
}}
}

// first compare whole configs
// configs match, no need to do anything
if cmp.Equal(oldConfig, newConfig) {
// configs match, no need to do anything
return nil
}

// compare them again but ignore mutable fields, if configs are still
// different an update is not possible, we have to entirely recreate the
// processor
opts := []cmp.Option{
cmpopts.IgnoreFields(config.Processor{}, config.ProcessorMutableFields...),
}
if cmp.Equal(oldConfig, newConfig, opts...) {
// only updatable fields don't match, we can update the processor
return []action{updateProcessorAction{
oldConfig: oldConfig,
newConfig: newConfig,
processorService: ab.processorService,
}}
}

// we have to delete the old processor and create a new one
return []action{
deleteProcessorAction{
cfg: oldConfig,
parent: parent,
processorService: ab.processorService,
},
createProcessorAction{
cfg: newConfig,
parent: parent,
processorService: ab.processorService,
},
}
// the processor changed, and all parts of a processor are updateable
return []action{updateProcessorAction{
oldConfig: oldConfig,
newConfig: newConfig,
processorService: ab.processorService,
}}
}

func reverseActions(actions []action) {
Expand Down
21 changes: 11 additions & 10 deletions pkg/provisioning/import_actions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"testing"

"github.com/conduitio/conduit-commons/lang"
"github.com/conduitio/conduit/pkg/connector"
"github.com/conduitio/conduit/pkg/pipeline"
"github.com/conduitio/conduit/pkg/processor"
Expand All @@ -42,8 +43,8 @@ func TestCreatePipelineAction_Do(t *testing.T) {
DLQ: config.DLQ{
Plugin: "dlq-plugin",
Settings: map[string]string{"foo": "bar"},
WindowSize: intPtr(1),
WindowNackThreshold: intPtr(2),
WindowSize: lang.Ptr(1),
WindowNackThreshold: lang.Ptr(2),
},
}
wantCfg := pipeline.Config{
Expand Down Expand Up @@ -87,8 +88,8 @@ func TestCreatePipelineAction_Rollback(t *testing.T) {
DLQ: config.DLQ{
Plugin: "dlq-plugin",
Settings: map[string]string{"foo": "bar"},
WindowSize: intPtr(1),
WindowNackThreshold: intPtr(2),
WindowSize: lang.Ptr(1),
WindowNackThreshold: lang.Ptr(2),
},
}

Expand All @@ -113,8 +114,8 @@ func TestUpdatePipelineAction(t *testing.T) {
DLQ: config.DLQ{
Plugin: "dlq-plugin",
Settings: map[string]string{"foo": "bar"},
WindowSize: intPtr(1),
WindowNackThreshold: intPtr(2),
WindowSize: lang.Ptr(1),
WindowNackThreshold: lang.Ptr(2),
},
}

Expand Down Expand Up @@ -198,8 +199,8 @@ func TestDeletePipelineAction_Do(t *testing.T) {
DLQ: config.DLQ{
Plugin: "dlq-plugin",
Settings: map[string]string{"foo": "bar"},
WindowSize: intPtr(1),
WindowNackThreshold: intPtr(2),
WindowSize: lang.Ptr(1),
WindowNackThreshold: lang.Ptr(2),
},
}

Expand Down Expand Up @@ -228,8 +229,8 @@ func TestDeletePipelineAction_Rollback(t *testing.T) {
DLQ: config.DLQ{
Plugin: "dlq-plugin",
Settings: map[string]string{"foo": "bar"},
WindowSize: intPtr(1),
WindowNackThreshold: intPtr(2),
WindowSize: lang.Ptr(1),
WindowNackThreshold: lang.Ptr(2),
},
}
wantCfg := pipeline.Config{
Expand Down
57 changes: 3 additions & 54 deletions pkg/provisioning/import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,20 +284,9 @@ func TestActionBuilder_Build(t *testing.T) {
},
processorService: procSrv,
},
deleteProcessorAction{
cfg: oldConfig.Processors[1],
parent: processor.Parent{
ID: oldConfig.ID,
Type: processor.ParentTypePipeline,
},
processorService: procSrv,
},
createProcessorAction{
cfg: newConfig.Processors[1],
parent: processor.Parent{
ID: newConfig.ID,
Type: processor.ParentTypePipeline,
},
updateProcessorAction{
oldConfig: oldConfig.Processors[1],
newConfig: newConfig.Processors[1],
processorService: procSrv,
},
updateProcessorAction{
Expand Down Expand Up @@ -757,50 +746,10 @@ func TestActionsBuilder_PrepareProcessorActions_Update(t *testing.T) {
}
}

func TestActionsBuilder_PrepareProcessorActions_Recreate(t *testing.T) {
logger := log.Nop()
ctrl := gomock.NewController(t)

srv, _, _, procSrv, _, _ := newTestService(ctrl, logger)
parent := processor.Parent{
ID: uuid.NewString(),
Type: processor.ParentTypePipeline,
}

testCases := []struct {
name string
oldConfig config.Processor
newConfig config.Processor
}{{
name: "different Type",
oldConfig: config.Processor{ID: "config-id", Plugin: "old-type"},
newConfig: config.Processor{ID: "config-id", Plugin: "new-type"},
}}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
is := is.New(t)
want := []action{deleteProcessorAction{
cfg: tc.oldConfig,
parent: parent,
processorService: procSrv,
}, createProcessorAction{
cfg: tc.newConfig,
parent: parent,
processorService: procSrv,
}}
got := srv.newActionsBuilder().prepareProcessorActions(tc.oldConfig, tc.newConfig, parent)
is.Equal(got, want)
})
}
}

// -------------
// -- HELPERS --
// -------------

func intPtr(i int) *int { return &i }

func newTestService(ctrl *gomock.Controller, logger log.CtxLogger) (*Service, *mock.PipelineService, *mock.ConnectorService, *mock.ProcessorService, *mock.ConnectorPluginService, *mock.LifecycleService) {
db := &inmemory.DB{}
pipSrv := mock.NewPipelineService(ctrl)
Expand Down

0 comments on commit 90e1733

Please sign in to comment.