Skip to content

Commit

Permalink
Add syscodes, move creds checking into NewInitializer
Browse files Browse the repository at this point in the history
  • Loading branch information
tonyhb committed Sep 16, 2024
1 parent d9caa3c commit c87957b
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 17 deletions.
22 changes: 18 additions & 4 deletions pkg/replicator/pgreplicator/initializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,14 @@ import (
"github.com/jackc/pgx/v5"
)

var (
ErrInvalidCredentials = pgsetup.ErrInvalidCredentials
ErrCannotCommunicate = pgsetup.ErrCannotCommunicate
ErrLogicalReplicationNotSetUp = pgsetup.ErrLogicalReplicationNotSetUp
ErrReplicationSlotNotFound = pgsetup.ErrReplicationSlotNotFound
ErrReplicationAlreadyRunning = pgsetup.ErrReplicationAlreadyRunning
)

type InitializeResult = pgsetup.TestConnResult

type InitializerOpts struct {
Expand All @@ -19,10 +27,16 @@ type InitializerOpts struct {
Password string
}

func NewInitializer(ctx context.Context, opts InitializerOpts) replicator.SystemInitializer[InitializeResult] {
// TODO: Immediatey

return initializer[pgsetup.TestConnResult]{opts: opts}
func NewInitializer(ctx context.Context, opts InitializerOpts) (replicator.SystemInitializer[InitializeResult], error) {
conn, err := pgx.ConnectConfig(ctx, &opts.AdminConfig)
if err != nil {
return nil, ErrInvalidCredentials
}
defer conn.Close(ctx)
if err := conn.Ping(ctx); err != nil {
return nil, ErrCannotCommunicate
}
return initializer[pgsetup.TestConnResult]{opts: opts}, nil
}

type initializer[T pgsetup.TestConnResult] struct {
Expand Down
43 changes: 43 additions & 0 deletions pkg/replicator/pgreplicator/initializer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package pgreplicator

import (
"context"
"testing"

"github.com/inngest/dbcap/internal/test"
"github.com/stretchr/testify/require"
)

func TestNewInitializer(t *testing.T) {
t.Parallel()
versions := []int{10, 11, 12, 13, 14, 15, 16}

for _, version := range versions {
v := version // loop capture

ctx := context.Background()
c, cfg := test.StartPG(t, ctx, test.StartPGOpts{
Version: v,
DisableLogicalReplication: true,
DisableCreateSlot: true,
})

t.Run("It succeeds with the wrong password", func(t *testing.T) {
_, err := NewInitializer(ctx, InitializerOpts{
AdminConfig: cfg,
})
require.NoError(t, err)
})

cfg.Password = "whatever my guy"

t.Run("It errors with the wrong password", func(t *testing.T) {
_, err := NewInitializer(ctx, InitializerOpts{
AdminConfig: cfg,
})
require.Error(t, err)
})

_ = c.Stop(ctx, nil)
}
}
10 changes: 0 additions & 10 deletions pkg/replicator/pgreplicator/pg.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,6 @@ import (
var (
ReadTimeout = time.Second * 5
CommitInterval = time.Second * 5

ErrInvalidCredentials = fmt.Errorf("TODO")

ErrConnectionTimeout = fmt.Errorf("TODO")

ErrLogicalReplicationNotSetUp = fmt.Errorf("ERR_PG_001: Your database does not have logical replication configured. You must set the WAL level to 'logical' to stream events.")

ErrReplicationSlotNotFound = fmt.Errorf("ERR_PG_002: The replication slot 'inngest_cdc' doesn't exist in your database. Please create the logical replication slot to stream events.")

ErrReplicationAlreadyRunning = fmt.Errorf("ERR_PG_901: Replication is already streaming events")
)

// PostgresReplicator is a Replicator with added postgres functionality.
Expand Down
24 changes: 21 additions & 3 deletions pkg/replicator/pgreplicator/pgsetup/pgsetup.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,31 @@ import (

"github.com/inngest/dbcap/pkg/consts/pgconsts"
"github.com/inngest/dbcap/pkg/replicator"
"github.com/inngest/inngest/pkg/syscode"
"github.com/jackc/pgx/v5"
)

var (
ErrLogicalReplicationNotSetUp = fmt.Errorf("ERR_PG_001: Your database does not have logical replication configured. You must set the WAL level to 'logical' to stream events.")
ErrReplicationSlotNotFound = fmt.Errorf("ERR_PG_002: The replication slot 'inngest_cdc' doesn't exist in your database. Please create the logical replication slot to stream events.")
ErrReplicationAlreadyRunning = fmt.Errorf("ERR_PG_901: Replication is already streaming events")
ErrInvalidCredentials = syscode.Error{
Code: "ERR_PG_001",
Message: "Invalid credentials. We cannot connect to your database.",
}
ErrCannotCommunicate = syscode.Error{
Code: "ERR_PG_002",
Message: "Cannot communicate with your database.",
}
ErrLogicalReplicationNotSetUp = syscode.Error{
Code: "ERR_PG_003",
Message: "Your database does not have logical replication configured. You must set the WAL level to 'logical' to stream events.",
}
ErrReplicationSlotNotFound = syscode.Error{
Code: "ERR_PG_004",
Message: "The replication slot 'inngest_cdc' doesn't exist in your database. Please create the logical replication slot to stream events.",
}
ErrReplicationAlreadyRunning = syscode.Error{
Code: "ERR_PG_901",
Message: "Replication is already streaming events",
}
)

type TestConnResult struct {
Expand Down

0 comments on commit c87957b

Please sign in to comment.