From c87957b7e10ac5038a57b8bb47cd42f89ba657ae Mon Sep 17 00:00:00 2001 From: Tony Holdstock-Brown Date: Mon, 16 Sep 2024 16:03:50 -0700 Subject: [PATCH] Add syscodes, move creds checking into NewInitializer --- pkg/replicator/pgreplicator/initializer.go | 22 ++++++++-- .../pgreplicator/initializer_test.go | 43 +++++++++++++++++++ pkg/replicator/pgreplicator/pg.go | 10 ----- .../pgreplicator/pgsetup/pgsetup.go | 24 +++++++++-- 4 files changed, 82 insertions(+), 17 deletions(-) create mode 100644 pkg/replicator/pgreplicator/initializer_test.go diff --git a/pkg/replicator/pgreplicator/initializer.go b/pkg/replicator/pgreplicator/initializer.go index d5566f3..eb885db 100644 --- a/pkg/replicator/pgreplicator/initializer.go +++ b/pkg/replicator/pgreplicator/initializer.go @@ -8,6 +8,14 @@ import ( "github.com/jackc/pgx/v5" ) +var ( + ErrInvalidCredentials = pgsetup.ErrInvalidCredentials + ErrCannotCommunicate = pgsetup.ErrCannotCommunicate + ErrLogicalReplicationNotSetUp = pgsetup.ErrLogicalReplicationNotSetUp + ErrReplicationSlotNotFound = pgsetup.ErrReplicationSlotNotFound + ErrReplicationAlreadyRunning = pgsetup.ErrReplicationAlreadyRunning +) + type InitializeResult = pgsetup.TestConnResult type InitializerOpts struct { @@ -19,10 +27,16 @@ type InitializerOpts struct { Password string } -func NewInitializer(ctx context.Context, opts InitializerOpts) replicator.SystemInitializer[InitializeResult] { - // TODO: Immediatey - - return initializer[pgsetup.TestConnResult]{opts: opts} +func NewInitializer(ctx context.Context, opts InitializerOpts) (replicator.SystemInitializer[InitializeResult], error) { + conn, err := pgx.ConnectConfig(ctx, &opts.AdminConfig) + if err != nil { + return nil, ErrInvalidCredentials + } + defer conn.Close(ctx) + if err := conn.Ping(ctx); err != nil { + return nil, ErrCannotCommunicate + } + return initializer[pgsetup.TestConnResult]{opts: opts}, nil } type initializer[T pgsetup.TestConnResult] struct { diff --git a/pkg/replicator/pgreplicator/initializer_test.go b/pkg/replicator/pgreplicator/initializer_test.go new file mode 100644 index 0000000..831bf9f --- /dev/null +++ b/pkg/replicator/pgreplicator/initializer_test.go @@ -0,0 +1,43 @@ +package pgreplicator + +import ( + "context" + "testing" + + "github.com/inngest/dbcap/internal/test" + "github.com/stretchr/testify/require" +) + +func TestNewInitializer(t *testing.T) { + t.Parallel() + versions := []int{10, 11, 12, 13, 14, 15, 16} + + for _, version := range versions { + v := version // loop capture + + ctx := context.Background() + c, cfg := test.StartPG(t, ctx, test.StartPGOpts{ + Version: v, + DisableLogicalReplication: true, + DisableCreateSlot: true, + }) + + t.Run("It succeeds with the wrong password", func(t *testing.T) { + _, err := NewInitializer(ctx, InitializerOpts{ + AdminConfig: cfg, + }) + require.NoError(t, err) + }) + + cfg.Password = "whatever my guy" + + t.Run("It errors with the wrong password", func(t *testing.T) { + _, err := NewInitializer(ctx, InitializerOpts{ + AdminConfig: cfg, + }) + require.Error(t, err) + }) + + _ = c.Stop(ctx, nil) + } +} diff --git a/pkg/replicator/pgreplicator/pg.go b/pkg/replicator/pgreplicator/pg.go index 3fa4972..71beff0 100644 --- a/pkg/replicator/pgreplicator/pg.go +++ b/pkg/replicator/pgreplicator/pg.go @@ -25,16 +25,6 @@ import ( var ( ReadTimeout = time.Second * 5 CommitInterval = time.Second * 5 - - ErrInvalidCredentials = fmt.Errorf("TODO") - - ErrConnectionTimeout = fmt.Errorf("TODO") - - ErrLogicalReplicationNotSetUp = fmt.Errorf("ERR_PG_001: Your database does not have logical replication configured. You must set the WAL level to 'logical' to stream events.") - - ErrReplicationSlotNotFound = fmt.Errorf("ERR_PG_002: The replication slot 'inngest_cdc' doesn't exist in your database. Please create the logical replication slot to stream events.") - - ErrReplicationAlreadyRunning = fmt.Errorf("ERR_PG_901: Replication is already streaming events") ) // PostgresReplicator is a Replicator with added postgres functionality. diff --git a/pkg/replicator/pgreplicator/pgsetup/pgsetup.go b/pkg/replicator/pgreplicator/pgsetup/pgsetup.go index d17f67d..0dea457 100644 --- a/pkg/replicator/pgreplicator/pgsetup/pgsetup.go +++ b/pkg/replicator/pgreplicator/pgsetup/pgsetup.go @@ -8,13 +8,31 @@ import ( "github.com/inngest/dbcap/pkg/consts/pgconsts" "github.com/inngest/dbcap/pkg/replicator" + "github.com/inngest/inngest/pkg/syscode" "github.com/jackc/pgx/v5" ) var ( - ErrLogicalReplicationNotSetUp = fmt.Errorf("ERR_PG_001: Your database does not have logical replication configured. You must set the WAL level to 'logical' to stream events.") - ErrReplicationSlotNotFound = fmt.Errorf("ERR_PG_002: The replication slot 'inngest_cdc' doesn't exist in your database. Please create the logical replication slot to stream events.") - ErrReplicationAlreadyRunning = fmt.Errorf("ERR_PG_901: Replication is already streaming events") + ErrInvalidCredentials = syscode.Error{ + Code: "ERR_PG_001", + Message: "Invalid credentials. We cannot connect to your database.", + } + ErrCannotCommunicate = syscode.Error{ + Code: "ERR_PG_002", + Message: "Cannot communicate with your database.", + } + ErrLogicalReplicationNotSetUp = syscode.Error{ + Code: "ERR_PG_003", + Message: "Your database does not have logical replication configured. You must set the WAL level to 'logical' to stream events.", + } + ErrReplicationSlotNotFound = syscode.Error{ + Code: "ERR_PG_004", + Message: "The replication slot 'inngest_cdc' doesn't exist in your database. Please create the logical replication slot to stream events.", + } + ErrReplicationAlreadyRunning = syscode.Error{ + Code: "ERR_PG_901", + Message: "Replication is already streaming events", + } ) type TestConnResult struct {