Skip to content

Commit

Permalink
handle conn closed in heartbeats
Browse files Browse the repository at this point in the history
  • Loading branch information
tonyhb committed Oct 15, 2024
1 parent ef02a7d commit 0dcc4eb
Showing 1 changed file with 10 additions and 2 deletions.
12 changes: 10 additions & 2 deletions pkg/replicator/pgreplicator/pg.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ func (p *pg) Pull(ctx context.Context, cc chan *changeset.Changeset) error {

t := time.NewTicker(p.heartbeatTime)
for range t.C {
if ctx.Err() != nil {
if ctx.Err() != nil || p.queryConn.IsClosed() {
return
}

Expand All @@ -262,14 +262,18 @@ func (p *pg) Pull(ctx context.Context, cc chan *changeset.Changeset) error {
_, err := p.queryConn.Exec(ctx, "SELECT pg_logical_emit_message(false, 'heartbeat', now()::varchar);")
p.queryLock.Unlock()

if isConnClosedErr(err) && p.queryConn.IsClosed() {
return
}

if err != nil {
p.log.Warn("unable to emit heartbeat", "error", err, "host", p.opts.Config.Host)
}
}
}()

for {
if ctx.Err() != nil || atomic.LoadInt32(&p.stopped) == 1 {
if ctx.Err() != nil || atomic.LoadInt32(&p.stopped) == 1 || p.conn.IsClosed() {
// Always call Close automatically.
p.Close(ctx)
return nil
Expand Down Expand Up @@ -501,3 +505,7 @@ func standardizeErr(err error) (bool, error) {
}
return false, err
}

func isConnClosedErr(err error) bool {
return err != nil && strings.Contains(err.Error(), "conn closed")
}

0 comments on commit 0dcc4eb

Please sign in to comment.