Skip to content

Commit

Permalink
pgcdc: move all pg conn usage to a single goroutine
Browse files Browse the repository at this point in the history
s.pgConn is not safe to use in multiple goroutines, so move all its
usage to the single goroutine that is emitting changes.

This also allows us to do the annoying "remapping LSN for last message
in a txn" locally which simplifies a lot of things and also should speed
stuff up because we no longer need to wait for a txn to be acked before
moving on to the next one.
  • Loading branch information
rockwotj committed Dec 3, 2024
1 parent 7b3d8ea commit 94b1ea1
Showing 1 changed file with 88 additions and 43 deletions.
131 changes: 88 additions & 43 deletions internal/impl/postgresql/pglogicalstream/logical_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"fmt"
"slices"
"strings"
"sync"
"time"

"github.com/Jeffail/shutdown"
Expand All @@ -36,7 +37,9 @@ type Stream struct {

shutSig *shutdown.Signaller

clientXLogPos LSN
ackedLSNMu sync.Mutex
// The LSN acked by the stream, we may not have acked this to postgres yet (ack, ack, ack)
ackedLSN LSN

standbyMessageTimeout time.Duration
nextStandbyMessageDeadline time.Time
Expand Down Expand Up @@ -209,7 +212,9 @@ func NewPgStream(ctx context.Context, config *Config) (*Stream, error) {
} else {
lsnrestart, _ = ParseLSN(confirmedLSNFromDB)
}
stream.clientXLogPos = lsnrestart
if lsnrestart > 0 {
stream.ackedLSN = lsnrestart - 1
}

stream.standbyMessageTimeout = config.PgStandbyTimeout
stream.nextStandbyMessageDeadline = time.Now().Add(stream.standbyMessageTimeout)
Expand All @@ -225,7 +230,7 @@ func NewPgStream(ctx context.Context, config *Config) (*Stream, error) {
}
})

stream.logger.Debugf("starting stream from LSN %s with clientXLogPos %s and snapshot name %s", lsnrestart.String(), stream.clientXLogPos.String(), stream.snapshotName)
stream.logger.Debugf("starting stream from LSN %s with snapshot name %s", lsnrestart.String(), stream.snapshotName)
// TODO(le-vlad): if snapshot processing is restarted we will just skip right to streaming...
if !freshlyCreatedSlot || !config.StreamOldData {
if err = stream.startLr(ctx, lsnrestart); err != nil {
Expand All @@ -234,7 +239,7 @@ func NewPgStream(ctx context.Context, config *Config) (*Stream, error) {

go func() {
defer stream.shutSig.TriggerHasStopped()
if err := stream.streamMessages(); err != nil {
if err := stream.streamMessages(lsnrestart); err != nil {
stream.errors <- fmt.Errorf("logical replication stream error: %w", err)
}
}()
Expand All @@ -250,7 +255,7 @@ func NewPgStream(ctx context.Context, config *Config) (*Stream, error) {
stream.errors <- fmt.Errorf("failed to start logical replication: %w", err)
return
}
if err := stream.streamMessages(); err != nil {
if err := stream.streamMessages(lsnrestart); err != nil {
stream.errors <- fmt.Errorf("logical replication stream error: %w", err)
}
}()
Expand Down Expand Up @@ -287,55 +292,76 @@ func (s *Stream) startLr(ctx context.Context, lsnStart LSN) error {
// AckLSN acknowledges the LSN up to which the stream has processed the messages.
// This makes Postgres to remove the WAL files that are no longer needed.
func (s *Stream) AckLSN(ctx context.Context, lsn string) error {
parsed, err := ParseLSN(lsn)
if err != nil {
return fmt.Errorf("unable to parse LSN: %w", err)
}
s.ackedLSNMu.Lock()
defer s.ackedLSNMu.Unlock()
if s.shutSig.IsHardStopSignalled() {
return fmt.Errorf("unable to ack LSN %s stream shutting down", lsn)
}
clientXLogPos, err := ParseLSN(lsn)
if err != nil {
return err
}
s.ackedLSN = parsed
return nil
}

err = SendStandbyStatusUpdate(
// getAckedLSN returns the current value of s.ackedLSN, reading it while
// holding s.ackedLSNMu.
func (s *Stream) getAckedLSN() LSN {
	s.ackedLSNMu.Lock()
	defer s.ackedLSNMu.Unlock()
	return s.ackedLSN
}

func (s *Stream) commitAckedLSN(ctx context.Context, lsn LSN) error {
err := SendStandbyStatusUpdate(
ctx,
s.pgConn,
StandbyStatusUpdate{
WALWritePosition: clientXLogPos + 1,
WALWritePosition: lsn + 1,
ReplyRequested: true,
},
)

if err != nil {
return fmt.Errorf("failed to send Standby status message at LSN %s: %w", clientXLogPos.String(), err)
return fmt.Errorf("failed to send standby status message at LSN %s: %w", lsn, err)
}

// Update client XLogPos after we ack the message
s.clientXLogPos = clientXLogPos
s.logger.Debugf("Sent Standby status message at LSN#%s", clientXLogPos.String())
s.nextStandbyMessageDeadline = time.Now().Add(s.standbyMessageTimeout)

return nil
}

func (s *Stream) streamMessages() error {
func (s *Stream) streamMessages(currentLSN LSN) error {
relations := map[uint32]*RelationMessage{}
typeMap := pgtype.NewMap()
// If we don't stream commit messages we could not ack them, which means postgres will replay the whole transaction
// so if we're at the end of a stream and we get an ack for the last message in a txn, we need to ack the txn not the
// last message.
lastEmittedLSN := currentLSN
lastEmittedCommitLSN := currentLSN

commitLSN := func(force bool) error {
ctx, _ := s.shutSig.HardStopCtx(context.Background())
ackedLSN := s.getAckedLSN()
if ackedLSN == lastEmittedLSN {
ackedLSN = lastEmittedCommitLSN
}
if force || ackedLSN > currentLSN {
if err := s.commitAckedLSN(ctx, ackedLSN); err != nil {
return err
}
// Update the currentLSN
currentLSN = ackedLSN
}
return nil
}
defer func() {
if err := commitLSN(false); err != nil {
s.logger.Errorf("unable to acknowledge LSN on stream shutdown: %v", err)
}
}()

ctx, _ := s.shutSig.SoftStopCtx(context.Background())
for !s.shutSig.IsSoftStopSignalled() {
if time.Now().After(s.nextStandbyMessageDeadline) {
pos := s.clientXLogPos
err := SendStandbyStatusUpdate(
ctx,
s.pgConn,
StandbyStatusUpdate{
WALWritePosition: pos + 1,
},
)
if err != nil {
return fmt.Errorf("unable to send standby status message at LSN %s: %w", pos, err)
}
s.logger.Debugf("Sent Standby status message at LSN#%s", pos.String())
s.nextStandbyMessageDeadline = time.Now().Add(s.standbyMessageTimeout)
if err := commitLSN(time.Now().After(s.nextStandbyMessageDeadline)); err != nil {
return err
}
recvCtx, cancel := context.WithDeadline(ctx, s.nextStandbyMessageDeadline)
rawMsg, err := s.pgConn.ReceiveMessage(recvCtx)
Expand Down Expand Up @@ -380,42 +406,61 @@ func (s *Stream) streamMessages() error {
if err != nil {
return fmt.Errorf("failed to parse XLogData: %w", err)
}
clientXLogPos := xld.WALStart + LSN(len(xld.WALData))
err = s.processChange(ctx, clientXLogPos, xld, relations, typeMap)
msgLSN := xld.WALStart + LSN(len(xld.WALData))
result, err := s.processChange(ctx, msgLSN, xld, relations, typeMap)
if err != nil {
return fmt.Errorf("decoding postgres changes failed: %w", err)
}
// See the explanation above about lastEmittedCommitLSN, but if this is a commit message, we want to
// only remap the commit of the last message in a transaction, so only update the remapped value if
// it was a suppressed commit; otherwise we just provide a noop mapping of commit LSN.
if result == changeResultSuppressedCommitMessage {
lastEmittedCommitLSN = msgLSN
} else if result == changeResultEmittedMessage {
lastEmittedLSN = msgLSN
lastEmittedCommitLSN = msgLSN
}
}
}
// clean shutdown, return nil
return nil
}

// processChangeResult describes what happened to a single decoded
// replication message.
type processChangeResult int

// The constants are explicitly typed so that they cannot be mixed up with
// plain integers; their numeric values (0, 1, 2) are unchanged.
const (
	// changeResultNoMessage means nothing was emitted for this message.
	changeResultNoMessage processChangeResult = iota
	// changeResultSuppressedCommitMessage means the message was a commit
	// marker that was deliberately not emitted.
	changeResultSuppressedCommitMessage
	// changeResultEmittedMessage means the message was emitted.
	changeResultEmittedMessage
)

// processChange handles the pgoutput output
func (s *Stream) processChange(ctx context.Context, clientXLogPos LSN, xld XLogData, relations map[uint32]*RelationMessage, typeMap *pgtype.Map) error {
func (s *Stream) processChange(ctx context.Context, msgLSN LSN, xld XLogData, relations map[uint32]*RelationMessage, typeMap *pgtype.Map) (processChangeResult, error) {
// parse changes inside the transaction
message, err := decodePgOutput(xld.WALData, relations, typeMap)
if err != nil {
return err
return changeResultNoMessage, err
}
if message == nil {
return nil
return changeResultNoMessage, nil
}

if !s.includeTxnMarkers {
switch message.Operation {
case BeginOpType, CommitOpType:
return nil
case CommitOpType:
return changeResultSuppressedCommitMessage, nil
case BeginOpType:
return changeResultNoMessage, nil
}
}

lsn := clientXLogPos.String()
lsn := msgLSN.String()
message.LSN = &lsn
select {
case s.messages <- *message:
return nil
return changeResultEmittedMessage, nil
case <-ctx.Done():
return ctx.Err()
return changeResultNoMessage, ctx.Err()
}
}

Expand Down

0 comments on commit 94b1ea1

Please sign in to comment.