From 061b56b73887d6b390ba2ab4312c45ddc6c4a5cf Mon Sep 17 00:00:00 2001 From: Tony Holdstock-Brown Date: Fri, 20 Sep 2024 22:56:10 +0000 Subject: [PATCH] Commit latest LSNs and only load LSN from saver if watermark is non-nil --- pkg/eventwriter/callback_writer.go | 17 ++++++++--------- pkg/replicator/pgreplicator/pg.go | 5 ++++- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/pkg/eventwriter/callback_writer.go b/pkg/eventwriter/callback_writer.go index 1614e76..fa47c76 100644 --- a/pkg/eventwriter/callback_writer.go +++ b/pkg/eventwriter/callback_writer.go @@ -55,10 +55,13 @@ func (a *cbWriter) Listen(ctx context.Context, committer changeset.WatermarkComm select { case <-ctx.Done(): + if i == 0 { + // Do nothing. + return + } + // Shutting down. Send the existing batch. - if err := a.onChangeset(buf); err != nil { - // TODO: Fail. What do we do here? - } else { + if err := a.onChangeset(buf); err == nil { committer.Commit(buf[i-1].Watermark) } return @@ -70,9 +73,7 @@ func (a *cbWriter) Listen(ctx context.Context, committer changeset.WatermarkComm } // We have events after a timeout - send them. - if err := a.onChangeset(buf); err != nil { - // TODO: Fail. What do we do here? - } else { + if err := a.onChangeset(buf); err == nil { // Commit the last LSN. committer.Commit(buf[i-1].Watermark) } @@ -83,9 +84,7 @@ func (a *cbWriter) Listen(ctx context.Context, committer changeset.WatermarkComm case msg := <-a.cs: if i == a.batchSize { // send this batch, as we're full. - if err := a.onChangeset(buf); err != nil { - // TODO: Fail. What do we do here? - } else { + if err := a.onChangeset(buf); err == nil { committer.Commit(buf[i-1].Watermark) } // reset the buffer diff --git a/pkg/replicator/pgreplicator/pg.go b/pkg/replicator/pgreplicator/pg.go index 71beff0..d115f6e 100644 --- a/pkg/replicator/pgreplicator/pg.go +++ b/pkg/replicator/pgreplicator/pg.go @@ -96,6 +96,7 @@ func New(ctx context.Context, opts Opts) (PostgresReplicator, error) { } return &pg{ + opts: opts, conn: replConn, queryConn: pgxc, decoder: decoder.NewV1LogicalDecoder(sl), @@ -195,7 +196,9 @@ func (p *pg) Pull(ctx context.Context, cc chan *changeset.Changeset) error { if err != nil { return fmt.Errorf("error loading watermark: %w", err) } - startLSN = watermark.LSN + if watermark != nil && watermark.LSN > 0 { + startLSN = watermark.LSN + } } if err := p.Connect(ctx, pglogrepl.LSN(startLSN)); err != nil {