Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: remove long_polling #153

Merged
merged 11 commits into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,16 @@ the connector will return an error.

## Configuration Options

| name | description | required | default |
|---------------------------|--------------------------------------------------------------------------------------------------------------------------------------------|----------|---------------|
| `url` | Connection string for the Postgres database. | true | |
| `tables` | List of table names to read from, separated by comma. Example: `"employees,offices,payments"`. Using `*` will read from all public tables. | true | |
| `snapshotMode` | Whether or not the plugin will take a snapshot of the entire table before starting cdc mode (allowed values: `initial` or `never`). | false | `initial` |
| `cdcMode` | Determines the CDC mode (allowed values: `auto`, `logrepl` or `long_polling`). | false | `auto` |
| `logrepl.publicationName` | Name of the publication to listen for WAL events. | false | `conduitpub` |
| `logrepl.slotName` | Name of the slot opened for replication events. | false | `conduitslot` |
| `logrepl.autoCleanup` | Whether or not to cleanup the replication slot and pub when connector is deleted | false | `true` |
| ~~`table`~~ | List of table names to read from, separated by comma. **Deprecated: use `tables` instead.** | false | |
| name | description | required | default |
|---------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------|----------|---------------|
| `url` | Connection string for the Postgres database. | true | |
| `tables` | List of table names to read from, separated by comma. Example: `"employees,offices,payments"`. Using `*` will read from all public tables. | true | |
| `snapshotMode` | Whether or not the plugin will take a snapshot of the entire table before starting cdc mode (allowed values: `initial` or `never`). | false | `initial` |
| `cdcMode` | Determines the CDC mode (allowed values: `auto`, `logrepl`). | false | `auto` |
raulb marked this conversation as resolved.
Show resolved Hide resolved
| `logrepl.publicationName` | Name of the publication to listen for WAL events. | false | `conduitpub` |
| `logrepl.slotName` | Name of the slot opened for replication events. | false | `conduitslot` |
| `logrepl.autoCleanup` | Whether or not to cleanup the replication slot and pub when connector is deleted | false | `true` |
| ~~`table`~~ | List of table names to read from, separated by comma. **Deprecated: use `tables` instead.** | false | |

# Destination

Expand Down
29 changes: 5 additions & 24 deletions source.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/conduitio/conduit-commons/csync"
"github.com/conduitio/conduit-connector-postgres/source"
"github.com/conduitio/conduit-connector-postgres/source/logrepl"
"github.com/conduitio/conduit-connector-postgres/source/snapshot"
sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
Expand Down Expand Up @@ -85,16 +84,15 @@ func (s *Source) Open(ctx context.Context, pos sdk.Position) error {

// ensure we have keys for all tables
for _, tableName := range s.config.Tables {
s.tableKeys[tableName], err = s.getTableKeys(ctx, tableName)
s.tableKeys[tableName], err = s.getPrimaryKey(ctx, tableName)
if err != nil {
return fmt.Errorf("failed to find key for table %s (try specifying it manually): %w", tableName, err)
return fmt.Errorf("failed to find primary key for table %s: %w", tableName, err)
}
}

switch s.config.CDCMode {
case source.CDCModeAuto:
// TODO add logic that checks if the DB supports logical replication and
// switches to long polling if it's not. For now use logical replication
// TODO add logic that checks if the DB supports logical replication (since that's the only thing we support at the moment)
fallthrough
case source.CDCModeLogrepl:
i, err := logrepl.NewCombinedIterator(ctx, s.pool, logrepl.Config{
Expand All @@ -109,23 +107,6 @@ func (s *Source) Open(ctx context.Context, pos sdk.Position) error {
return fmt.Errorf("failed to create logical replication iterator: %w", err)
}
s.iterator = i
case source.CDCModeLongPolling:
logger.Warn().Msg("Long polling not supported yet, only snapshot is supported")
if s.config.SnapshotMode != source.SnapshotModeInitial {
// TODO create long polling iterator and pass snapshot mode in the config
logger.Warn().Msg("snapshot disabled, can't do anything right now")
return sdk.ErrUnimplemented
}

snap, err := snapshot.NewIterator(ctx, pool, snapshot.Config{
Tables: s.config.Tables,
TableKeys: s.tableKeys,
})
if err != nil {
return fmt.Errorf("failed to create long polling iterator: %w", err)
}

s.iterator = snap
default:
// shouldn't happen, config was validated
return fmt.Errorf("unsupported CDC mode %q", s.config.CDCMode)
Expand Down Expand Up @@ -209,9 +190,9 @@ func (s *Source) getAllTables(ctx context.Context) ([]string, error) {
return tables, nil
}

// getTableKeys queries the db for the name of the primary key column for a
// getPrimaryKey queries the db for the name of the primary key column for a
// table if one exists and returns it.
func (s *Source) getTableKeys(ctx context.Context, tableName string) (string, error) {
func (s *Source) getPrimaryKey(ctx context.Context, tableName string) (string, error) {
query := `SELECT c.column_name
FROM information_schema.table_constraints tc
JOIN information_schema.constraint_column_usage AS ccu USING (constraint_schema, constraint_name)
Expand Down
9 changes: 1 addition & 8 deletions source/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ const (
CDCModeAuto CDCMode = "auto"
// CDCModeLogrepl uses logical replication to listen to changes.
CDCModeLogrepl CDCMode = "logrepl"
// CDCModeLongPolling uses long polling to listen to changes.
CDCModeLongPolling CDCMode = "long_polling"

// AllTablesWildcard can be used if you'd like to listen to all tables.
AllTablesWildcard = "*"
Expand All @@ -60,7 +58,7 @@ type Config struct {
// SnapshotMode is whether the plugin will take a snapshot of the entire table before starting cdc mode.
SnapshotMode SnapshotMode `json:"snapshotMode" validate:"inclusion=initial|never" default:"initial"`
// CDCMode determines how the connector should listen to changes.
CDCMode CDCMode `json:"cdcMode" validate:"inclusion=auto|logrepl|long_polling" default:"auto"`
CDCMode CDCMode `json:"cdcMode" validate:"inclusion=auto|logrepl" default:"auto"`

// LogreplPublicationName determines the publication name in case the
// connector uses logical replication to listen to changes (see CDCMode).
Expand Down Expand Up @@ -91,11 +89,6 @@ func (c Config) Validate() error {
if len(c.Tables) == 0 {
errs = append(errs, fmt.Errorf(`error validating "tables": %w`, config.ErrRequiredParameterMissing))
}

// TODO: when cdcMode "auto" is implemented, change this check
if len(c.Tables) != 1 && c.CDCMode == CDCModeLongPolling {
errs = append(errs, fmt.Errorf("multi-tables are only supported for logrepl CDCMode, please provide only one table"))
}
return errors.Join(errs...)
}

Expand Down
8 changes: 0 additions & 8 deletions source/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,6 @@ func TestConfig_Validate(t *testing.T) {
CDCMode: CDCModeLogrepl,
},
wantErr: true,
}, {
name: "invalid multiple tables for long polling",
cfg: Config{
URL: "postgresql://meroxauser:[email protected]:5432/meroxadb",
Tables: []string{"table1", "table2"},
CDCMode: CDCModeLongPolling,
},
wantErr: true,
},
}
for _, tc := range testCases {
Expand Down
2 changes: 0 additions & 2 deletions source/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"

"github.com/conduitio/conduit-connector-postgres/source/logrepl"
"github.com/conduitio/conduit-connector-postgres/source/longpoll"
sdk "github.com/conduitio/conduit-connector-sdk"
)

Expand All @@ -36,5 +35,4 @@ type Iterator interface {

var (
_ Iterator = (*logrepl.CDCIterator)(nil)
_ Iterator = (*longpoll.SnapshotIterator)(nil)
)
2 changes: 1 addition & 1 deletion source/logrepl/combined.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func NewCombinedIterator(ctx context.Context, pool *pgxpool.Pool, conf Config) (
}

if err := conf.Validate(); err != nil {
return nil, fmt.Errorf("failed to validate longrepl config: %w", err)
return nil, fmt.Errorf("failed to validate logrepl config: %w", err)
}

c := &CombinedIterator{
Expand Down
200 changes: 0 additions & 200 deletions source/longpoll/snapshot.go

This file was deleted.

Loading