Skip to content

Commit

Permalink
WIP - add timeout to replicate_api
Browse files Browse the repository at this point in the history
  • Loading branch information
jgraettinger committed Jan 14, 2025
1 parent 95692a2 commit afe1503
Showing 1 changed file with 11 additions and 0 deletions.
11 changes: 11 additions & 0 deletions broker/replicate_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ import (
"fmt"
"io"
"net"
"time"

log "github.com/sirupsen/logrus"
"go.gazette.dev/core/broker/fragment"
pb "go.gazette.dev/core/broker/protocol"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"
)

// Replicate dispatches the JournalServer.Replicate API.
Expand Down Expand Up @@ -65,6 +68,14 @@ func (svc *Service) Replicate(claims pb.Claims, stream pb.Journal_ReplicateServe
case <-resolved.invalidateCh: // Replica assignments changed.
addTrace(ctx, " ... resolution was invalidated")
// Loop to retry.
case <-time.After(time.Second):
// Use an aggressive timeout of the spool to prevent deadlocks
// during rapid topology changes.
//
// This returned error will bubble up to cancel the peer's
// replication pipeline, causing it to release resources and
// allowing another append pipeline to make progress.
return status.Error(codes.Unavailable, "timeout awaiting replica spool")
}
}

Expand Down

0 comments on commit afe1503

Please sign in to comment.