broker: apply the flush interval when evaluating journal suspension
Previously, a journal was eligible for suspension as soon as all of its
fragments were persisted.

Though this is safe and correct, it doesn't account for raced evaluations
in the broader context of a busy cluster with other topology changes and
journal churn. Specifically, a journal with regular appends may be
temporarily eligible for suspension only because its topology was recently
changed, causing its fragments to be flushed.

Worse, the journal's suspension (and soon, re-activation) may trigger churn
which causes *other* journals to flush and themselves become suspended.

Groups of journals may also have correlated write schedules, such as journals
which are written once daily at midnight. We'd like to avoid thundering
herds of journal suspensions and re-activations that might impact other
journals due to assignment churn.

To resolve this, prevent a journal from being suspended until its
fragments are fully remote AND are at least as old as its configured
flush interval (or 24h, if not defined).
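
For illustration, the new policy reduces to a single predicate over the last fragment's persisted ModTime. A minimal sketch of that rule follows; the function and parameter names are illustrative, not the broker's actual symbols.

package main

import (
	"fmt"
	"time"
)

// eligibleForSuspension restates the rule: the index's last fragment must be
// persisted (a zero modTime means it is still local-only), and its remote
// ModTime must be at least one flush interval old. A zero interval defaults to 24h.
func eligibleForSuspension(lastModTime int64, flushInterval time.Duration, now time.Time) bool {
	if flushInterval == 0 {
		flushInterval = 24 * time.Hour
	}
	return lastModTime != 0 && lastModTime < now.Add(-flushInterval).Unix()
}

func main() {
	var now = time.Now()

	// Last fragment persisted two days ago, with a 24h interval: eligible.
	fmt.Println(eligibleForSuspension(now.Add(-48*time.Hour).Unix(), 24*time.Hour, now))
	// Persisted an hour ago: too recent, so the journal may still be in use.
	fmt.Println(eligibleForSuspension(now.Add(-time.Hour).Unix(), 24*time.Hour, now))
	// Zero ModTime: the last fragment hasn't reached remote storage, so never eligible.
	fmt.Println(eligibleForSuspension(0, 0, now))
}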
jgraettinger committed Jan 18, 2025
1 parent f510dce commit 5277ddd
Showing 4 changed files with 31 additions and 9 deletions.
20 changes: 19 additions & 1 deletion broker/append_fsm.go
@@ -6,6 +6,7 @@ import (
"fmt"
"hash"
"io"
"time"

"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
@@ -435,9 +436,26 @@ func (b *appendFSM) onValidatePreconditions() {
// reflected in our index, if a commit wasn't accepted by all peers.
// Such writes are reported as failed to the client and are retried
// (this failure mode is what makes journals at-least-once).
var indexMin, indexMax, indexDirty = b.resolved.replica.index.OffsetRange()
var indexMin, indexMax, indexModTime = b.resolved.replica.index.Summary()
var suspend = b.resolved.journalSpec.Suspend

// The index is "clean" if all fragments have been remote for the journal's
// flush interval, where the flush interval is interpreted as an upper-bound
// expectation of the period between appends if the journal remains "in use".
// Thus, if a journal doesn't receive an append for more than its interval,
// it's presumed to be idle and is eligible for suspension.
//
// To see why, consider a group of journals which are appended to at midnight,
// configured with a 24h flush interval. These journals will not auto-suspend
// ordinarily. If we instead used a more aggressive policy, they might trigger
// storms of suspensions and re-activations which could impact other journals
// due to assignment churn.
var flushInterval = int64(b.resolved.journalSpec.Fragment.FlushInterval.Seconds())
if flushInterval == 0 {
flushInterval = 24 * 60 * 60 // Default to 24h.
}
var indexDirty = indexModTime != 0 && indexModTime < time.Now().Unix()-flushInterval

var maxOffset = b.pln.spool.End
if indexMax > maxOffset {
maxOffset = indexMax
12 changes: 8 additions & 4 deletions broker/fragment/index.go
@@ -125,13 +125,17 @@ func (fi *Index) Query(ctx context.Context, req *pb.ReadRequest) (*pb.ReadRespon
}
}

// OffsetRange returns the [Begin, End) offset range of all Fragments in the index,
// and a boolean indicating if the index has local-only fragments.
func (fi *Index) OffsetRange() (int64, int64, bool) {
// Summary returns the [Begin, End) offset range of all Fragments in the index,
// and the persisted ModTime of the last Fragment (or zero, if it's local).
func (fi *Index) Summary() (int64, int64, int64) {
defer fi.mu.RUnlock()
fi.mu.RLock()

return fi.set.BeginOffset(), fi.set.EndOffset(), len(fi.local) != 0
if l := len(fi.set); l == 0 {
return 0, 0, 0
} else {
return fi.set[0].Begin, fi.set[l-1].End, fi.set[l-1].ModTime
}
}

// SpoolCommit adds local Spool Fragment |frag| to the index.
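
For context on the new Summary signature, a hypothetical caller might interpret the returned triple as sketched below; the helper, sample values, and timestamps are illustrative, not code from this commit.

package main

import "fmt"

// describeSummary interprets Summary's results: begin and end bound all fragment
// offsets, while modTime is the persisted ModTime of the last fragment, or zero
// while that fragment exists only in the local spool.
func describeSummary(begin, end, modTime int64) string {
	switch {
	case begin == 0 && end == 0 && modTime == 0:
		return "empty index"
	case modTime == 0:
		return "offsets covered, but the last fragment is local-only (not yet flushed)"
	default:
		return fmt.Sprintf("fully persisted through offset %d at unix time %d", end, modTime)
	}
}

func main() {
	fmt.Println(describeSummary(0, 0, 0))             // No fragments at all.
	fmt.Println(describeSummary(0, 1024, 0))          // Last fragment still in the local spool.
	fmt.Println(describeSummary(0, 1024, 1737158400)) // Last fragment persisted remotely.
}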
4 changes: 2 additions & 2 deletions broker/fragment/index_test.go
@@ -288,7 +288,7 @@ func (s *IndexSuite) TestWalkStoresAndURLSigning(c *gc.C) {
<-ind.FirstRefreshCh()

c.Check(ind.set, gc.HasLen, 3)
var bo, eo, _ = ind.OffsetRange()
var bo, eo, _ = ind.Summary()
c.Check(bo, gc.Equals, int64(0x0))
c.Check(eo, gc.Equals, int64(0x255))

@@ -306,7 +306,7 @@ func (s *IndexSuite) TestWalkStoresAndURLSigning(c *gc.C) {
ind.ReplaceRemote(set)

c.Check(ind.set, gc.HasLen, 4) // Combined Fragments are reflected.
bo, eo, _ = ind.OffsetRange()
bo, eo, _ = ind.Summary()
c.Check(bo, gc.Equals, int64(0x0))
c.Check(eo, gc.Equals, int64(0x555))

4 changes: 2 additions & 2 deletions broker/replicate_api_test.go
@@ -40,7 +40,7 @@ func TestReplicateStreamAndCommit(t *testing.T) {
require.NoError(t, stream.Send(&pb.ReplicateRequest{Content: []byte("bazbing"), ContentDelta: 6}))

// Precondition: content not observable in the Fragment index.
var _, eo, _ = broker.replica("a/journal").index.OffsetRange()
var _, eo, _ = broker.replica("a/journal").index.Summary()
require.Equal(t, int64(0), eo)

// Commit.
@@ -58,7 +58,7 @@ func TestReplicateStreamAndCommit(t *testing.T) {
expectReplResponse(t, stream, &pb.ReplicateResponse{Status: pb.Status_OK})

// Post-condition: content is now observable.
_, eo, _ = broker.replica("a/journal").index.OffsetRange()
_, eo, _ = broker.replica("a/journal").index.Summary()
require.Equal(t, int64(13), eo)

// Send EOF and expect it's returned.
