Skip to content

Commit

Permalink
Reconnect
Browse files Browse the repository at this point in the history
  • Loading branch information
tonyhb committed Oct 15, 2024
1 parent 0dcc4eb commit 279c5cb
Showing 1 changed file with 47 additions and 20 deletions.
67 changes: 47 additions & 20 deletions pkg/replicator/pgreplicator/pg.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,6 @@ func New(ctx context.Context, opts Opts) (PostgresReplicator, error) {
replConfig := cfg.Copy()
replConfig.RuntimeParams["replication"] = "database"
// And for schema inspection, ensure this is never set.
schemaConfig := opts.Config.Copy()
delete(schemaConfig.RuntimeParams, "replication")

// Connect using pgconn for replication. This is a prerequisite, as
// replication uses different client connection parameters to enable specific
Expand All @@ -87,34 +85,33 @@ func New(ctx context.Context, opts Opts) (PostgresReplicator, error) {
return nil, fmt.Errorf("error connecting to postgres host for replication: %w", err)
}

pgxc, err := pgx.ConnectConfig(ctx, schemaConfig)
if err != nil {
return nil, fmt.Errorf("error connecting to postgres host for schemas: %w", err)
p := &pg{
opts: opts,
conn: replConn,
queryLock: &sync.Mutex{},
log: opts.Log,
heartbeatTime: DefaultHeartbeatTime,
}

if err := p.createQueryConn(ctx); err != nil {
return nil, err
}

// Query for current postgres version.
var version int
row := pgxc.QueryRow(ctx, "SELECT current_setting('server_version_num')::int / 10000;")
if err := row.Scan(&version); err != nil {
row := p.queryConn.QueryRow(ctx, "SELECT current_setting('server_version_num')::int / 10000;")
if err := row.Scan(&p.version); err != nil {
opts.Log.Warn("error querying for postgres version", "error", err)
}

sl := schema.NewPGXSchemaLoader(pgxc)
sl := schema.NewPGXSchemaLoader(p.queryConn)
// Refresh all schemas to begin with
if err := sl.Refresh(); err != nil {
return nil, err
}

return &pg{
opts: opts,
conn: replConn,
queryConn: pgxc,
queryLock: &sync.Mutex{},
decoder: decoder.NewV1LogicalDecoder(sl, opts.Log, version >= pgconsts.MessagesVersion),
log: opts.Log,
version: version,
heartbeatTime: DefaultHeartbeatTime,
}, nil
p.decoder = decoder.NewV1LogicalDecoder(sl, opts.Log, p.version >= pgconsts.MessagesVersion)

return p, nil
}

type pg struct {
Expand Down Expand Up @@ -149,12 +146,35 @@ type pg struct {
stopped int32
}

func (p *pg) createQueryConn(ctx context.Context) error {
p.queryLock.Lock()
defer p.queryLock.Unlock()

if p.queryConn != nil && !p.queryConn.IsClosed() {
_ = p.queryConn.Close(ctx)
}

// And for schema inspection, ensure this is never set.
schemaConfig := p.opts.Config.Copy()
delete(schemaConfig.RuntimeParams, "replication")

// Connect using pgconn for replication. This is a prerequisite, as
pgxc, err := pgx.ConnectConfig(ctx, schemaConfig)
if err != nil {
return fmt.Errorf("error connecting to postgres host for schemas: %w", err)
}
p.queryConn = pgxc
return nil

}

func (p *pg) Stop() {
atomic.StoreInt32(&p.stopped, 1)
_ = p.Close(context.Background())
}

func (p *pg) Close(ctx context.Context) error {
atomic.StoreInt32(&p.stopped, 1)
_ = p.conn.Close(ctx)
_ = p.queryConn.Close(ctx)
return nil
Expand Down Expand Up @@ -253,7 +273,14 @@ func (p *pg) Pull(ctx context.Context, cc chan *changeset.Changeset) error {

t := time.NewTicker(p.heartbeatTime)
for range t.C {
if ctx.Err() != nil || p.queryConn.IsClosed() {
if p.queryConn.IsClosed() {
if err := p.createQueryConn(ctx); err != nil {
p.log.Error("error reconnecting for heartbeat", "error", err, "host", p.opts.Config.Host)
return
}
}

if atomic.LoadInt32(&p.stopped) == 1 || ctx.Err() != nil {
return
}

Expand Down

0 comments on commit 279c5cb

Please sign in to comment.