diff --git a/broker/append_fsm.go b/broker/append_fsm.go index f8b49023..68ea372c 100644 --- a/broker/append_fsm.go +++ b/broker/append_fsm.go @@ -6,6 +6,7 @@ import ( "fmt" "hash" "io" + "time" "github.com/pkg/errors" log "github.com/sirupsen/logrus" @@ -435,9 +436,26 @@ func (b *appendFSM) onValidatePreconditions() { // reflected in our index, if a commit wasn't accepted by all peers. // Such writes are reported as failed to the client and are retried // (this failure mode is what makes journals at-least-once). - var indexMin, indexMax, indexDirty = b.resolved.replica.index.OffsetRange() + var indexMin, indexMax, indexModTime = b.resolved.replica.index.Summary() var suspend = b.resolved.journalSpec.Suspend + // The index is "clean" if all fragments have been remote for the journal's + // flush interval, where the flush interval is interpreted as an upper-bound + // expectation of the period between appends if the journal remains "in use". + // Thus, if a journal doesn't receive an append for more than its interval, + // it's presumed to be idle and is eligible for suspension. + // + // To see why, consider a group of journals which are appended to at midnight, + // configured with a 24h flush interval. These journals will not auto-suspend + // ordinarily. If we instead used a more aggressive policy, they might trigger + // storms of suspensions and re-activations which could impact other journals + // due to assignment churn. + var flushInterval = int64(b.resolved.journalSpec.Fragment.FlushInterval.Seconds()) + if flushInterval == 0 { + flushInterval = 24 * 60 * 60 // Default to 24h. 
+ } + var indexDirty = indexModTime != 0 && indexModTime < time.Now().Unix()-flushInterval + var maxOffset = b.pln.spool.End if indexMax > maxOffset { maxOffset = indexMax diff --git a/broker/fragment/index.go b/broker/fragment/index.go index 8cdc7ca2..f6be5618 100644 --- a/broker/fragment/index.go +++ b/broker/fragment/index.go @@ -125,13 +125,17 @@ func (fi *Index) Query(ctx context.Context, req *pb.ReadRequest) (*pb.ReadRespon } } -// OffsetRange returns the [Begin, End) offset range of all Fragments in the index, -// and a boolean indicating if the index has local-only fragments. -func (fi *Index) OffsetRange() (int64, int64, bool) { +// Summary returns the [Begin, End) offset range of all Fragments in the index, +// and the persisted ModTime of the last Fragment (or zero, if it's local). +func (fi *Index) Summary() (int64, int64, int64) { defer fi.mu.RUnlock() fi.mu.RLock() - return fi.set.BeginOffset(), fi.set.EndOffset(), len(fi.local) != 0 + if l := len(fi.set); l == 0 { + return 0, 0, 0 + } else { + return fi.set[0].Begin, fi.set[l-1].End, fi.set[l-1].ModTime + } } // SpoolCommit adds local Spool Fragment |frag| to the index. diff --git a/broker/fragment/index_test.go b/broker/fragment/index_test.go index 9a4d6c08..b70add4c 100644 --- a/broker/fragment/index_test.go +++ b/broker/fragment/index_test.go @@ -288,7 +288,7 @@ func (s *IndexSuite) TestWalkStoresAndURLSigning(c *gc.C) { <-ind.FirstRefreshCh() c.Check(ind.set, gc.HasLen, 3) - var bo, eo, _ = ind.OffsetRange() + var bo, eo, _ = ind.Summary() c.Check(bo, gc.Equals, int64(0x0)) c.Check(eo, gc.Equals, int64(0x255)) @@ -306,7 +306,7 @@ func (s *IndexSuite) TestWalkStoresAndURLSigning(c *gc.C) { ind.ReplaceRemote(set) c.Check(ind.set, gc.HasLen, 4) // Combined Fragments are reflected. 
- bo, eo, _ = ind.OffsetRange() + bo, eo, _ = ind.Summary() c.Check(bo, gc.Equals, int64(0x0)) c.Check(eo, gc.Equals, int64(0x555)) diff --git a/broker/replicate_api_test.go b/broker/replicate_api_test.go index 637a3f21..65182ed2 100644 --- a/broker/replicate_api_test.go +++ b/broker/replicate_api_test.go @@ -40,7 +40,7 @@ func TestReplicateStreamAndCommit(t *testing.T) { require.NoError(t, stream.Send(&pb.ReplicateRequest{Content: []byte("bazbing"), ContentDelta: 6})) // Precondition: content not observable in the Fragment index. - var _, eo, _ = broker.replica("a/journal").index.OffsetRange() + var _, eo, _ = broker.replica("a/journal").index.Summary() require.Equal(t, int64(0), eo) // Commit. @@ -58,7 +58,7 @@ func TestReplicateStreamAndCommit(t *testing.T) { expectReplResponse(t, stream, &pb.ReplicateResponse{Status: pb.Status_OK}) // Post-condition: content is now observable. - _, eo, _ = broker.replica("a/journal").index.OffsetRange() + _, eo, _ = broker.replica("a/journal").index.Summary() require.Equal(t, int64(13), eo) // Send EOF and expect its returned.