Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
130171: replica_rac2: integrate LogTracker r=sumeerbhola a=pav-kv

This PR integrates the `LogTracker` into RACv2 `Processor`. The design sketch is below.

- `LogTracker` is created when `raft.RawNode` is initialized. It reads the initial stable state from `RawNode.LogMark()`, and initializes the stable and admitted indices to match this mark.
- `LogTracker` observes all log storage appends in `handleRaftReady`, and all log and snapshot syncs in `OnLogSync` and `OnSnapSync` handlers. This guarantees that the stable mark in `LogTracker` is always accurate.
- `LogTracker` observes all entries subject to admission control, and their corresponding admissions. This allows updating the admitted vector accurately.

The admitted vector is sent to the leader from two places:

- In `sendRaftMessage`, any successful `MsgAppResp` message is intercepted, and the corresponding `RaftMessageRequest` is annotated with the admitted vector if it is in the coordinate system of the receiver, i.e. has the same leader term. This flow supports the fast path when logical admission happens without delays: by the time the `MsgAppResp` is sent, the entries are already admitted, and the admitted vector has advanced.
- In `handleRaftReady`, the admitted vector is sent to the `Piggybacker`, which then attaches it to any `RaftMessageRequestBatch` going to the same node as the receiver replica. This serves cases when admission is lagging the log syncs, and there might be no `MsgAppResp` to attach the vector to. Such admissions are batched into one `Ready` cycle for efficiency reasons.

Part of cockroachdb#129508

Co-authored-by: Pavel Kalinnikov <[email protected]>
  • Loading branch information
craig[bot] and pav-kv committed Sep 10, 2024
2 parents c6956d5 + 51043f9 commit 7571549
Show file tree
Hide file tree
Showing 10 changed files with 454 additions and 310 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (a AdmittedState) String() string {
}

func (a AdmittedState) SafeFormat(w redact.SafePrinter, _ rune) {
w.Printf("admitted=t%d/%s", a.Term, a.Admitted)
w.Printf("admitted=t%d/%v", a.Term, a.Admitted)
}

func (a PiggybackedAdmittedState) String() string {
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
"admission.go",
"close_scheduler.go",
"doc.go",
"log_tracker.go",
"processor.go",
"raft_node.go",
],
Expand Down
131 changes: 131 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/log_tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package replica_rac2

import (
"context"
"fmt"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/rac2"
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)

// logTracker wraps rac2.LogTracker with a mutex and state that helps track
// admitted vector changes and schedule their delivery to the leader. The
// semantics and requirements for all the methods are equivalent to those of
// the corresponding methods of rac2.LogTracker.
//
// The logTracker has its own mutex in order to avoid interference with objects
// that use wider mutexes such as raftMu.
type logTracker struct {
// Mutex is embedded; every method of logTracker acquires it before touching
// the fields below.
syncutil.Mutex
// lt is the wrapped tracker. It is assigned in init.
lt rac2.LogTracker
// dirty is true when the admitted vector has changed and should be sent to
// the leader. It is set by append, logSynced, snapSynced and logAdmitted when
// the wrapped tracker reports a change, and cleared by admitted.
dirty bool
// scheduled is true when the admitted vector change has been scheduled for
// processing by raft Ready. It is set by logAdmitted and cleared by
// admitted(true).
scheduled bool
}

// init creates the wrapped rac2.LogTracker from the given stable mark and
// stores it in l.lt while holding the mutex. The dirty and scheduled flags are
// not modified by this call.
func (l *logTracker) init(stable rac2.LogMark) {
l.Lock()
defer l.Unlock()
l.lt = rac2.NewLogTracker(stable)
}

// admitted reports the current admitted vector together with a flag telling
// whether this vector has not been observed by a previous admitted call. A
// caller may skip sending the vector to the leader when the flag is false,
// because the same vector has already been handed out.
//
// Pass sched=true when calling from the Ready handler. Doing so clears the
// scheduled flag, so that a subsequent logAdmitted call may return true and
// request another Ready iteration. This keeps redundant Ready scheduling
// events to a minimum.
func (l *logTracker) admitted(sched bool) (av rac2.AdmittedVector, dirty bool) {
	l.Lock()
	defer l.Unlock()

	// Consume the dirty flag: remember its value for the caller, then reset it.
	wasDirty := l.dirty
	l.dirty = false
	if sched {
		l.scheduled = false
	}
	return l.lt.Admitted(), wasDirty
}

// append forwards the log append to the wrapped tracker under the mutex, and
// marks the tracker dirty if the wrapped Append call returns true.
func (l *logTracker) append(ctx context.Context, after uint64, to rac2.LogMark) {
	l.Lock()
	defer l.Unlock()

	changed := l.lt.Append(ctx, after, to)
	// Never clear an already-set dirty flag here; only admitted resets it.
	l.dirty = l.dirty || changed
}

// register forwards the call to the wrapped tracker's Register method while
// holding the mutex. It does not touch the dirty or scheduled flags.
//
// NOTE(review): presumably this records the entry at the given mark and
// priority as subject to admission control -- confirm against
// rac2.LogTracker.Register.
func (l *logTracker) register(ctx context.Context, at rac2.LogMark, pri raftpb.Priority) {
l.Lock()
defer l.Unlock()
l.lt.Register(ctx, at, pri)
}

// logSynced forwards the log sync notification to the wrapped tracker under
// the mutex, and marks the tracker dirty if the wrapped LogSynced call returns
// true.
func (l *logTracker) logSynced(ctx context.Context, stable rac2.LogMark) {
	l.Lock()
	defer l.Unlock()

	if changed := l.lt.LogSynced(ctx, stable); !changed {
		return
	}
	l.dirty = true
}

// logAdmitted reports whether the admitted vector has advanced and its
// delivery to the leader needs to be scheduled. Currently, scheduling means
// triggering a Ready handling cycle.
//
// The result is true at most once between two admitted(true) calls: a burst of
// consecutive logAdmitted calls produces a single scheduling request, and the
// next one is permitted only after admitted(true) has cleared the scheduled
// flag.
func (l *logTracker) logAdmitted(ctx context.Context, at rac2.LogMark, pri raftpb.Priority) bool {
	l.Lock()
	defer l.Unlock()

	if advanced := l.lt.LogAdmitted(ctx, at, pri); !advanced {
		return false
	}
	l.dirty = true

	// Ask the caller to schedule only if nothing is scheduled yet; either way
	// the flag ends up set.
	mustSchedule := !l.scheduled
	l.scheduled = true
	return mustSchedule
}

// snapSynced forwards the snapshot sync notification to the wrapped tracker
// under the mutex, and marks the tracker dirty if the wrapped SnapSynced call
// returns true.
func (l *logTracker) snapSynced(ctx context.Context, mark rac2.LogMark) {
	l.Lock()
	defer l.Unlock()

	changed := l.lt.SnapSynced(ctx, mark)
	// Never clear an already-set dirty flag here; only admitted resets it.
	l.dirty = l.dirty || changed
}

// debugString renders the tracker state for debugging, under the mutex. The
// output is "LogTracker", followed by an optional " [+dirty+sched]"-style list
// of the flags that are currently set, followed by ": " and the wrapped
// tracker's own DebugString output.
func (l *logTracker) debugString() string {
	l.Lock()
	defer l.Unlock()

	set := ""
	if l.dirty {
		set = "+dirty"
	}
	if l.scheduled {
		set += "+sched"
	}
	// The bracketed section is omitted entirely when no flag is set.
	suffix := ""
	if set != "" {
		suffix = " [" + set + "]"
	}
	return fmt.Sprintf("LogTracker%s: %s", suffix, l.lt.DebugString())
}
Loading

0 comments on commit 7571549

Please sign in to comment.