Skip to content

Commit

Permalink
Commit latest LSNs and only load LSN from saver if watermark is non-nil
Browse files Browse the repository at this point in the history
  • Loading branch information
tonyhb committed Sep 20, 2024
1 parent bae6446 commit 061b56b
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 10 deletions.
17 changes: 8 additions & 9 deletions pkg/eventwriter/callback_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,13 @@ func (a *cbWriter) Listen(ctx context.Context, committer changeset.WatermarkComm

select {
case <-ctx.Done():
if i == 0 {
// Do nothing.
return
}

// Shutting down. Send the existing batch.
if err := a.onChangeset(buf); err != nil {
// TODO: Fail. What do we do here?
} else {
if err := a.onChangeset(buf); err == nil {
committer.Commit(buf[i-1].Watermark)
}
return
Expand All @@ -70,9 +73,7 @@ func (a *cbWriter) Listen(ctx context.Context, committer changeset.WatermarkComm
}

// We have events after a timeout - send them.
if err := a.onChangeset(buf); err != nil {
// TODO: Fail. What do we do here?
} else {
if err := a.onChangeset(buf); err == nil {
// Commit the last LSN.
committer.Commit(buf[i-1].Watermark)
}
Expand All @@ -83,9 +84,7 @@ func (a *cbWriter) Listen(ctx context.Context, committer changeset.WatermarkComm
case msg := <-a.cs:
if i == a.batchSize {
// send this batch, as we're full.
if err := a.onChangeset(buf); err != nil {
// TODO: Fail. What do we do here?
} else {
if err := a.onChangeset(buf); err == nil {
committer.Commit(buf[i-1].Watermark)
}
// reset the buffer
Expand Down
5 changes: 4 additions & 1 deletion pkg/replicator/pgreplicator/pg.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func New(ctx context.Context, opts Opts) (PostgresReplicator, error) {
}

return &pg{
opts: opts,
conn: replConn,
queryConn: pgxc,
decoder: decoder.NewV1LogicalDecoder(sl),
Expand Down Expand Up @@ -195,7 +196,9 @@ func (p *pg) Pull(ctx context.Context, cc chan *changeset.Changeset) error {
if err != nil {
return fmt.Errorf("error loading watermark: %w", err)
}
startLSN = watermark.LSN
if watermark != nil && watermark.LSN > 0 {
startLSN = watermark.LSN
}
}

if err := p.Connect(ctx, pglogrepl.LSN(startLSN)); err != nil {
Expand Down

0 comments on commit 061b56b

Please sign in to comment.