Skip to content

Commit

Permalink
Send heartbeats immedately when pulling messages (#5)
Browse files Browse the repository at this point in the history
This ensures there's some DB activity immediately after connecting to
stream WAL changes.  For some hosts, this is required.
  • Loading branch information
tonyhb authored Oct 16, 2024
1 parent 8fe5f77 commit 334036d
Showing 1 changed file with 53 additions and 21 deletions.
74 changes: 53 additions & 21 deletions pkg/replicator/pgreplicator/pg.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func New(ctx context.Context, opts Opts) (PostgresReplicator, error) {
if opts.Log == nil {
opts.Log = slog.New(slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{
Level: slog.LevelInfo,
}))
})).With("host", opts.Config.Host)
}

cfg := opts.Config
Expand Down Expand Up @@ -256,6 +256,8 @@ func (p *pg) Pull(ctx context.Context, cc chan *changeset.Changeset) error {
return err
}

p.log.Debug("connected to replication slot")

// Postgres batches every individual insert, update, etc. within a BEGIN/COMMIT message.
// This is great for replication. However, for Inngest events, we don't want superflous begin
// or commit messages as events.
Expand All @@ -268,64 +270,91 @@ func (p *pg) Pull(ctx context.Context, cc chan *changeset.Changeset) error {
go func() {
if p.version < pgconsts.MessagesVersion {
// doesn't support wal messages; ignore.
p.log.Debug("heartbeat not supported", "pg_version", p.version)
return
}

// Send a hearbeat immediately.
if err := p.heartbeat(ctx); err != nil {
p.log.Warn("unable to emit immediate heartbeat", "error", err)
}

t := time.NewTicker(p.heartbeatTime)
for range t.C {
if p.queryConn.IsClosed() {
if err := p.createQueryConn(ctx); err != nil {
p.log.Error("error reconnecting for heartbeat", "error", err, "host", p.opts.Config.Host)
doneCheck := time.NewTicker(time.Second)
for {
select {
case <-ctx.Done():
return
case <-doneCheck.C:
// Check for the stopped signal internally every second. This lets us log
// the stopped message relatively close to the stop signal occurring.
if atomic.LoadInt32(&p.stopped) == 1 {
p.log.Debug("stopping heartbeat", "ctx_err", ctx.Err(), "stopped", atomic.LoadInt32(&p.stopped))
return
}
}

if atomic.LoadInt32(&p.stopped) == 1 || ctx.Err() != nil {
return
}

// Send a hearbeat every minute
p.queryLock.Lock()
_, err := p.queryConn.Exec(ctx, "SELECT pg_logical_emit_message(false, 'heartbeat', now()::varchar);")
p.queryLock.Unlock()

if isConnClosedErr(err) && p.queryConn.IsClosed() {
continue
}
case <-t.C:
if p.queryConn.IsClosed() {
if err := p.createQueryConn(ctx); err != nil {
p.log.Error("error reconnecting for heartbeat", "error", err)
return
}
}

// Send a hearbeat every minute
err := p.heartbeat(ctx)
if err != nil {
p.log.Warn("unable to emit heartbeat", "error", err)
continue
}

if err != nil {
p.log.Warn("unable to emit heartbeat", "error", err, "host", p.opts.Config.Host)
p.log.Debug("sent heartbeat", "error", err)
}
}
}()

for {
if ctx.Err() != nil || atomic.LoadInt32(&p.stopped) == 1 || p.conn.IsClosed() {
// Always call Close automatically.
p.log.Debug("stopping cdc connection", "conn_closed", p.conn.IsClosed())
p.Close(ctx)
return nil
}

changes, err := p.fetch(ctx)
if err != nil {
p.log.Warn("error pulling messages", "error", err)
return err
}
if changes == nil {
p.log.Debug("no messages pulled")
continue
}

if changes.Operation == changeset.OperationHeartbeat {
p.log.Debug("heartbeat pulled")
p.Commit(changes.Watermark)
if err := p.forceNextReport(ctx); err != nil {
p.log.Warn("unable to report lsn on heartbeat", "error", err, "host", p.opts.Config.Host)
}
continue
}

p.log.Debug("message pulled", "op", changes.Operation)

unwrapper.Process(changes)
}
}

func (p *pg) heartbeat(ctx context.Context) error {
// Send a hearbeat every minute
p.queryLock.Lock()
_, err := p.queryConn.Exec(ctx, "SELECT pg_logical_emit_message(false, 'heartbeat', now()::varchar);")
p.queryLock.Unlock()
return err

}

func (p *pg) fetch(ctx context.Context) (*changeset.Changeset, error) {
var err error

Expand Down Expand Up @@ -457,6 +486,9 @@ func (p *pg) report(ctx context.Context, forceReply bool) error {
if lsn == 0 {
return nil
}

p.log.Debug("reporting lsn to source", "lsn", p.LSN().String())

err := pglogrepl.SendStandbyStatusUpdate(ctx,
p.conn.PgConn(),
pglogrepl.StandbyStatusUpdate{
Expand Down Expand Up @@ -533,6 +565,6 @@ func standardizeErr(err error) (bool, error) {
return false, err
}

func isConnClosedErr(err error) bool {
func IsConnClosedErr(err error) bool {
return err != nil && strings.Contains(err.Error(), "conn closed")
}

0 comments on commit 334036d

Please sign in to comment.