diff --git a/pkg/replicator/pgreplicator/pg.go b/pkg/replicator/pgreplicator/pg.go index a0c1967..4e50d62 100644 --- a/pkg/replicator/pgreplicator/pg.go +++ b/pkg/replicator/pgreplicator/pg.go @@ -66,7 +66,7 @@ func New(ctx context.Context, opts Opts) (PostgresReplicator, error) { if opts.Log == nil { opts.Log = slog.New(slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{ Level: slog.LevelInfo, - })) + })).With("host", opts.Config.Host) } cfg := opts.Config @@ -256,6 +256,8 @@ func (p *pg) Pull(ctx context.Context, cc chan *changeset.Changeset) error { return err } + p.log.Debug("connected to replication slot") + // Postgres batches every individual insert, update, etc. within a BEGIN/COMMIT message. // This is great for replication. However, for Inngest events, we don't want superflous begin // or commit messages as events. @@ -268,33 +270,45 @@ func (p *pg) Pull(ctx context.Context, cc chan *changeset.Changeset) error { go func() { if p.version < pgconsts.MessagesVersion { // doesn't support wal messages; ignore. + p.log.Debug("heartbeat not supported", "pg_version", p.version) return } + // Send a hearbeat immediately. + if err := p.heartbeat(ctx); err != nil { + p.log.Warn("unable to emit immediate heartbeat", "error", err) + } + t := time.NewTicker(p.heartbeatTime) - for range t.C { - if p.queryConn.IsClosed() { - if err := p.createQueryConn(ctx); err != nil { - p.log.Error("error reconnecting for heartbeat", "error", err, "host", p.opts.Config.Host) + doneCheck := time.NewTicker(time.Second) + for { + select { + case <-ctx.Done(): + return + case <-doneCheck.C: + // Check for the stopped signal internally every second. This lets us log + // the stopped message relatively close to the stop signal occurring. + if atomic.LoadInt32(&p.stopped) == 1 { + p.log.Debug("stopping heartbeat", "ctx_err", ctx.Err(), "stopped", atomic.LoadInt32(&p.stopped)) return } - } - - if atomic.LoadInt32(&p.stopped) == 1 || ctx.Err() != nil { - return - } - - // Send a hearbeat every minute - p.queryLock.Lock() - _, err := p.queryConn.Exec(ctx, "SELECT pg_logical_emit_message(false, 'heartbeat', now()::varchar);") - p.queryLock.Unlock() - - if isConnClosedErr(err) && p.queryConn.IsClosed() { continue - } + case <-t.C: + if p.queryConn.IsClosed() { + if err := p.createQueryConn(ctx); err != nil { + p.log.Error("error reconnecting for heartbeat", "error", err) + return + } + } + + // Send a hearbeat every minute + err := p.heartbeat(ctx) + if err != nil { + p.log.Warn("unable to emit heartbeat", "error", err) + continue + } - if err != nil { - p.log.Warn("unable to emit heartbeat", "error", err, "host", p.opts.Config.Host) + p.log.Debug("sent heartbeat", "error", err) } } }() @@ -302,19 +316,23 @@ func (p *pg) Pull(ctx context.Context, cc chan *changeset.Changeset) error { for { if ctx.Err() != nil || atomic.LoadInt32(&p.stopped) == 1 || p.conn.IsClosed() { // Always call Close automatically. + p.log.Debug("stopping cdc connection", "conn_closed", p.conn.IsClosed()) p.Close(ctx) return nil } changes, err := p.fetch(ctx) if err != nil { + p.log.Warn("error pulling messages", "error", err) return err } if changes == nil { + p.log.Debug("no messages pulled") continue } if changes.Operation == changeset.OperationHeartbeat { + p.log.Debug("heartbeat pulled") p.Commit(changes.Watermark) if err := p.forceNextReport(ctx); err != nil { p.log.Warn("unable to report lsn on heartbeat", "error", err, "host", p.opts.Config.Host) @@ -322,10 +340,21 @@ func (p *pg) Pull(ctx context.Context, cc chan *changeset.Changeset) error { continue } + p.log.Debug("message pulled", "op", changes.Operation) + unwrapper.Process(changes) } } +func (p *pg) heartbeat(ctx context.Context) error { + // Send a hearbeat every minute + p.queryLock.Lock() + _, err := p.queryConn.Exec(ctx, "SELECT pg_logical_emit_message(false, 'heartbeat', now()::varchar);") + p.queryLock.Unlock() + return err + +} + func (p *pg) fetch(ctx context.Context) (*changeset.Changeset, error) { var err error @@ -457,6 +486,9 @@ func (p *pg) report(ctx context.Context, forceReply bool) error { if lsn == 0 { return nil } + + p.log.Debug("reporting lsn to source", "lsn", p.LSN().String()) + err := pglogrepl.SendStandbyStatusUpdate(ctx, p.conn.PgConn(), pglogrepl.StandbyStatusUpdate{ @@ -533,6 +565,6 @@ func standardizeErr(err error) (bool, error) { return false, err } -func isConnClosedErr(err error) bool { +func IsConnClosedErr(err error) bool { return err != nil && strings.Contains(err.Error(), "conn closed") }