136861: sql: separate pausable portal code paths in `execStmtInOpenState` r=mgartner a=mgartner

#### sql: add `(*connExecutor).execStmtInOpenStateWithPausablePortal`

The `execStmtInOpenStateWithPausablePortal` method has been added to
`connExecutor`. It is currently an exact copy of `execStmtInOpenState`.
Future commits will simplify both methods.

Release note: None
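
The sketch below is a hypothetical illustration, not CockroachDB code: it shows the refactoring strategy of keeping two initially identical methods behind a single dispatch point, so that later commits can strip portal-specific branches from one copy and non-portal branches from the other. The `connExecutor`, `portal`, and method bodies are simplified stand-ins.

```go
package main

import "fmt"

type portal struct {
	name       string
	isPausable bool
}

type connExecutor struct{}

// execStmtInOpenState handles the regular (non-pausable-portal) path.
func (ex *connExecutor) execStmtInOpenState(stmt string) error {
	fmt.Println("plain path:", stmt)
	return nil
}

// execStmtInOpenStateWithPausablePortal starts out as an exact copy of the
// method above; keeping it separate lets each copy be simplified
// independently in later commits.
func (ex *connExecutor) execStmtInOpenStateWithPausablePortal(stmt string, p *portal) error {
	fmt.Println("pausable-portal path:", stmt, "portal:", p.name)
	return nil
}

// dispatch picks the code path once, instead of branching on portal
// pausability throughout a single shared method.
func (ex *connExecutor) dispatch(stmt string, p *portal) error {
	if p != nil && p.isPausable {
		return ex.execStmtInOpenStateWithPausablePortal(stmt, p)
	}
	return ex.execStmtInOpenState(stmt)
}

func main() {
	ex := &connExecutor{}
	_ = ex.dispatch("SELECT 1", nil)
	_ = ex.dispatch("SELECT 2", &portal{name: "p1", isPausable: true})
}
```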

#### sql: remove `isPausablePortal` anonymous function in `execStmtInOpenStateWithPausablePortal`

Release note: None

#### sql: remove pausable-portal-specific code paths in `execStmtInOpenState`

Release note: None

#### sql: remove `processCleanupFunc` in `execStmtInOpenState`

Fixes cockroachdb#135908

Release note: None

#### sql: remove `portal` parameter from `execStmtInOpenState`

Release note: None

#### sql: remove `localVars` struct in `execStmtInOpenState`

Refactoring in previous commits has prevented all of the `localVars`
variables, except for `cancelQuery`, from escaping to the heap, so the
struct is no longer needed. Removing it reduces heap allocations, since
previously the entire struct was allocated on the heap.

Release note: None
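
A minimal escape-analysis sketch of the effect described above, using simplified stand-in names (`queryLocals`, `sink`) rather than the real `connExecutor` code: when an escaping closure refers to any field of a locally declared struct, Go captures the whole struct variable and heap-allocates it, whereas with standalone variables only the captured one escapes. Running `go build -gcflags=-m` on a file like this shows the compiler's escape decisions.

```go
package main

// sink forces whatever is assigned to it to escape.
var sink func()

// queryLocals stands in for the removed localVars struct.
type queryLocals struct {
	cancelQuery func()
	retErr      error
	retPayload  any
}

func grouped() {
	var lv queryLocals
	lv.cancelQuery = func() {}
	// The closure refers to lv.cancelQuery, so it captures the variable lv as
	// a whole; because the closure escapes, the entire struct (including
	// retErr and retPayload) is heap allocated.
	sink = func() { lv.cancelQuery() }
	_ = lv.retErr
	_ = lv.retPayload
}

func separate() {
	cancelQuery := func() {}
	var retErr error
	var retPayload any
	// Only cancelQuery is captured by the escaping closure; retErr and
	// retPayload can stay on the stack.
	sink = func() { cancelQuery() }
	_ = retErr
	_ = retPayload
}

func main() {
	grouped()
	separate()
}
```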

#### sql: refactor errors in `execStmtInOpenState`

Release note: None

#### sql: add metamorphic test variable to force pausable portal logic

Release note: None

136977: kvserver,rac2,tracker: use RaftMaxInflightBytes when force-flushing r=sumeerbhola a=sumeerbhola

This limit is roughly respected, in that force-flush is paused when the
limit is exceeded, and unpaused when ready handling indicates that the
replicaSendStream is back within the limit.

Informs cockroachdb#135814

Epic: none

Release note: None
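
A non-authoritative sketch of the pausing rule described above, with simplified stand-ins for the real `replicaSendStream` fields and methods: force-flushing stops once inflight bytes reach `RaftMaxInflightBytes`, and it can resume when admitted (acknowledged) bytes bring the stream back under the limit.

```go
package main

import "fmt"

// sendStream is a simplified stand-in for replicaSendStream.
type sendStream struct {
	inflightBytes        uint64
	raftMaxInflightBytes uint64
	forceFlushActive     bool
}

func (s *sendStream) reachedInflightBytesThreshold() bool {
	return s.inflightBytes >= s.raftMaxInflightBytes
}

// maybeForceFlush sends queued entry sizes until the inflight limit is hit,
// at which point force-flushing pauses.
func (s *sendStream) maybeForceFlush(queuedEntryBytes []uint64) {
	if !s.forceFlushActive {
		return
	}
	for _, b := range queuedEntryBytes {
		if s.reachedInflightBytesThreshold() {
			fmt.Printf("force-flush paused at %d inflight bytes\n", s.inflightBytes)
			return
		}
		s.inflightBytes += b
	}
}

// onAdmitted models ready handling reporting acknowledged bytes; dropping
// back under the limit is what allows force-flushing to be scheduled again.
func (s *sendStream) onAdmitted(ackedBytes uint64) {
	s.inflightBytes -= ackedBytes
	if s.forceFlushActive && !s.reachedInflightBytesThreshold() {
		fmt.Printf("force-flush unpaused at %d inflight bytes\n", s.inflightBytes)
	}
}

func main() {
	s := &sendStream{raftMaxInflightBytes: 32 << 20, forceFlushActive: true}
	s.maybeForceFlush([]uint64{16 << 20, 16 << 20, 8 << 20}) // pauses once 32 MiB are inflight
	s.onAdmitted(16 << 20)                                   // back under the limit; would resume
}
```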

137313: raft: misc becomeFollower cleanups r=nvanbenschoten a=arulajmani

See individual commits. 

Co-authored-by: Marcus Gartner <[email protected]>
Co-authored-by: sumeerbhola <[email protected]>
Co-authored-by: Arul Ajmani <[email protected]>
4 people committed Dec 11, 2024
4 parents 00264aa + ca13fec + 6391681 + 144cfad commit 53fadf4
Showing 34 changed files with 2,496 additions and 1,731 deletions.
87 changes: 73 additions & 14 deletions pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go
@@ -566,8 +566,16 @@ type RangeControllerOptions struct {
EvalWaitMetrics *EvalWaitMetrics
RangeControllerMetrics *RangeControllerMetrics
WaitForEvalConfig *WaitForEvalConfig
ReplicaMutexAsserter ReplicaMutexAsserter
Knobs *kvflowcontrol.TestingKnobs
// RaftMaxInflightBytes is a soft limit on the maximum inflight bytes when
// using MsgAppPull mode. Currently, the RangeController only attempts to
// respect this when force-flushing a replicaSendStream, since the typical
// production configuration of this value (32MiB) is larger than the typical
// production configuration of the shared regular token pool (16MiB), so
// attempting to respect this when doing non-force-flush sends is
// unnecessary.
RaftMaxInflightBytes uint64
ReplicaMutexAsserter ReplicaMutexAsserter
Knobs *kvflowcontrol.TestingKnobs
}

// RangeControllerInitState is the initial state at the time of creation.
@@ -686,6 +694,9 @@ func NewRangeController(
if log.V(1) {
log.VInfof(ctx, 1, "r%v creating range controller", o.RangeID)
}
if o.RaftMaxInflightBytes == 0 {
o.RaftMaxInflightBytes = math.MaxUint64
}
rc := &rangeController{
opts: o,
term: init.Term,
@@ -876,10 +887,15 @@ type raftEventForReplica struct {
// Reminder: (ReplicaStateInfo.Match, ReplicaStateInfo.Next) are in-flight.
// nextRaftIndex is where the next entry will be added.
//
// ReplicaStateInfo.{State, Match} are the latest state.
// ReplicaStateInfo.{State, Match, InflightBytes} are the latest state.
// ReplicaStateInfo.Next represents the state preceding this raft event,
// i.e., it will be altered by sendingEntries. nextRaftIndex also represents
// the state preceding this event, and will be altered by newEntries.
// i.e., it will be altered by sendingEntries. Note that InflightBytes
// already incorporates sendingEntries -- we could choose to iterate over
// the sending entries in constructRaftEventForReplica and compensate for
// them, but we don't bother.
//
// nextRaftIndex also represents the state preceding this event, and will be
// altered by newEntries.
//
// createSendStream is set to true if the replicaSendStream should be
// (re)created.
@@ -1050,9 +1066,10 @@ func constructRaftEventForReplica(
refr := raftEventForReplica{
mode: mode,
replicaStateInfo: ReplicaStateInfo{
State: latestReplicaStateInfo.State,
Match: latestReplicaStateInfo.Match,
Next: next,
State: latestReplicaStateInfo.State,
Match: latestReplicaStateInfo.Match,
Next: next,
InflightBytes: latestReplicaStateInfo.InflightBytes,
},
nextRaftIndex: raftEventAppendState.rewoundNextRaftIndex,
newEntries: raftEventAppendState.newEntries,
@@ -2113,6 +2130,10 @@ type replicaSendStream struct {
tokenWatcherHandle SendTokenWatcherHandle
deductedForSchedulerTokens kvflowcontrol.Tokens
}
// inflightBytes is the sum of bytes that are inflight, i.e., in
// (ReplicaStateInfo.Match,ReplicaStateInfo.Next).
inflightBytes uint64

// TODO(sumeer): remove closed. Whenever a replicaSendStream is closed it
// is also no longer referenced by replicaState. The only motivation for
// closed is that replicaSendStream.Notify calls directly into
@@ -2517,6 +2538,13 @@ func (rs *replicaState) scheduledRaftMuLocked(
rss.tryHandleModeChangeRaftMuAndStreamLocked(ctx, mode, false, false)
return false, false
}
forceFlushActiveAndPaused := func() bool {
return rss.mu.sendQueue.forceFlushStopIndex.active() &&
rss.reachedInflightBytesThresholdRaftMuAndStreamLocked()
}
if forceFlushActiveAndPaused() {
return false, false
}
// 4MB. Don't want to hog the scheduler thread for too long.
const MaxBytesToSend kvflowcontrol.Tokens = 4 << 20
bytesToSend := MaxBytesToSend
@@ -2564,6 +2592,7 @@ func (rs *replicaState) scheduledRaftMuLocked(
rs.sendStream = nil
return false, true
}
rss.updateInflightRaftMuAndStreamLocked(slice)
rss.dequeueFromQueueAndSendRaftMuAndStreamLocked(ctx, msg)
isEmpty := rss.isEmptySendQueueStreamLocked()
if isEmpty {
@@ -2580,12 +2609,14 @@
// next tick. We accept a latency hiccup in this case for now.
rss.mu.sendQueue.forceFlushStopIndex = 0
}
forceFlushNeedsToPause := forceFlushActiveAndPaused()
watchForTokens :=
!rss.mu.sendQueue.forceFlushStopIndex.active() && rss.mu.sendQueue.deductedForSchedulerTokens == 0
if watchForTokens {
rss.startAttemptingToEmptySendQueueViaWatcherStreamLocked(ctx)
}
return !watchForTokens, false
scheduleAgain = !watchForTokens && !forceFlushNeedsToPause
return scheduleAgain, false
}

func (rs *replicaState) closeSendStreamRaftMuLocked(ctx context.Context) {
@@ -2644,6 +2675,9 @@ func (rss *replicaSendStream) handleReadyEntriesRaftMuAndStreamLocked(
wasEmptySendQ := rss.isEmptySendQueueStreamLocked()
rss.tryHandleModeChangeRaftMuAndStreamLocked(
ctx, event.mode, wasEmptySendQ, directive.forceFlushStopIndex.active())
// Use the latest inflight bytes, since it reflects the advancing Match.
wasExceedingInflightBytesThreshold := rss.reachedInflightBytesThresholdRaftMuAndStreamLocked()
rss.mu.inflightBytes = event.replicaStateInfo.InflightBytes
if event.mode == MsgAppPull {
// MsgAppPull mode (i.e., followers). Populate sendingEntries.
n := len(event.sendingEntries)
@@ -2652,12 +2686,18 @@
rss.parent.desc.ReplicaID == rss.parent.parent.opts.LocalReplicaID))
}
if directive.forceFlushStopIndex.active() {
// Must have a send-queue, so sendingEntries should stay empty (these
// will be queued).
if !rss.mu.sendQueue.forceFlushStopIndex.active() {
// Must have a send-queue, so sendingEntries should stay empty
// (these will be queued).
rss.startForceFlushRaftMuAndStreamLocked(ctx, directive.forceFlushStopIndex)
} else if rss.mu.sendQueue.forceFlushStopIndex != directive.forceFlushStopIndex {
rss.mu.sendQueue.forceFlushStopIndex = directive.forceFlushStopIndex
} else {
if rss.mu.sendQueue.forceFlushStopIndex != directive.forceFlushStopIndex {
rss.mu.sendQueue.forceFlushStopIndex = directive.forceFlushStopIndex
}
if wasExceedingInflightBytesThreshold &&
!rss.reachedInflightBytesThresholdRaftMuAndStreamLocked() {
rss.parent.parent.scheduleReplica(rss.parent.desc.ReplicaID)
}
}
} else {
// INVARIANT: !directive.forceFlushStopIndex.active()
@@ -2800,6 +2840,7 @@ func (rss *replicaSendStream) handleReadyEntriesRaftMuAndStreamLocked(
return false,
errors.Errorf("SendMsgApp could not send for replica %d", rss.parent.desc.ReplicaID)
}
rss.updateInflightRaftMuAndStreamLocked(slice)
rss.parent.parent.opts.MsgAppSender.SendMsgApp(ctx, msg, false)
}

@@ -2819,6 +2860,16 @@
return transitionedSendQState, nil
}

func (rss *replicaSendStream) updateInflightRaftMuAndStreamLocked(ls raft.LogSlice) {
rss.parent.parent.opts.ReplicaMutexAsserter.RaftMuAssertHeld()
rss.mu.AssertHeld()
entries := ls.Entries()
for i := range entries {
// NB: raft.payloadSize also uses len(raftpb.Entry.Data).
rss.mu.inflightBytes += uint64(len(entries[i].Data))
}
}

func (rss *replicaSendStream) tryHandleModeChangeRaftMuAndStreamLocked(
ctx context.Context, mode RaftMsgAppMode, isEmptySendQ bool, toldToForceFlush bool,
) {
@@ -2860,14 +2911,22 @@ func (rss *replicaSendStream) tryHandleModeChangeRaftMuAndStreamLocked(
}
}

func (rss *replicaSendStream) reachedInflightBytesThresholdRaftMuAndStreamLocked() bool {
rss.parent.parent.opts.ReplicaMutexAsserter.RaftMuAssertHeld()
rss.mu.AssertHeld()
return rss.mu.inflightBytes >= rss.parent.parent.opts.RaftMaxInflightBytes
}

func (rss *replicaSendStream) startForceFlushRaftMuAndStreamLocked(
ctx context.Context, forceFlushStopIndex forceFlushStopIndex,
) {
rss.parent.parent.opts.ReplicaMutexAsserter.RaftMuAssertHeld()
rss.mu.AssertHeld()
rss.parent.parent.opts.RangeControllerMetrics.SendQueue.ForceFlushedScheduledCount.Inc(1)
rss.mu.sendQueue.forceFlushStopIndex = forceFlushStopIndex
rss.parent.parent.scheduleReplica(rss.parent.desc.ReplicaID)
if !rss.reachedInflightBytesThresholdRaftMuAndStreamLocked() {
rss.parent.parent.scheduleReplica(rss.parent.desc.ReplicaID)
}
rss.stopAttemptingToEmptySendQueueViaWatcherRaftMuAndStreamLocked(ctx, false)
}

25 changes: 23 additions & 2 deletions pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go
@@ -41,6 +41,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/datadriven"
"github.com/cockroachdb/errors"
"github.com/dustin/go-humanize"
"github.com/gogo/protobuf/jsonpb"
"github.com/stretchr/testify/require"
)
@@ -69,6 +70,7 @@ type testingRCState struct {
setTokenCounters map[kvflowcontrol.Stream]struct{}
initialRegularTokens kvflowcontrol.Tokens
initialElasticTokens kvflowcontrol.Tokens
maxInflightBytes uint64
}

func (s *testingRCState) init(t *testing.T, ctx context.Context) {
@@ -235,10 +237,21 @@ func (s *testingRCState) sendStreamString(rangeID roachpb.RangeID) string {
rss.mu.Lock()
defer rss.mu.Unlock()

var inflightBytesStr string
// NB: inflightBytes is retrieved from the state of replicaSendStream,
// while the starting index of inflight (match + 1) is retrieved from
// testRepl. If the change in testRepl state has not been communicated to
// replicaSendStream, these can look inconsistent, which is completely
// explainable. Typically, test code that advances match (via admit)
// should also send an empty raft event so that replicaSendStream is aware
// of the change in match.
if rss.mu.inflightBytes != 0 {
inflightBytesStr = fmt.Sprintf(" (%s)", humanize.IBytes(rss.mu.inflightBytes))
}
fmt.Fprintf(&b,
"state=%v closed=%v inflight=[%v,%v) send_queue=[%v,%v) precise_q_size=%v",
"state=%v closed=%v inflight=[%v,%v)%s send_queue=[%v,%v) precise_q_size=%v",
rss.mu.connectedState, rss.mu.closed,
testRepl.info.Match+1, rss.mu.sendQueue.indexToSend,
testRepl.info.Match+1, rss.mu.sendQueue.indexToSend, inflightBytesStr,
rss.mu.sendQueue.indexToSend,
rss.mu.sendQueue.nextRaftIndex, rss.mu.sendQueue.preciseSizeSum)
if rss.mu.sendQueue.forceFlushStopIndex.active() {
@@ -355,6 +368,7 @@ func (s *testingRCState) getOrInitRange(
EvalWaitMetrics: s.evalMetrics,
RangeControllerMetrics: s.rcMetrics,
WaitForEvalConfig: s.waitForEvalConfig,
RaftMaxInflightBytes: s.maxInflightBytes,
ReplicaMutexAsserter: makeTestMutexAsserter(),
Knobs: &kvflowcontrol.TestingKnobs{},
}
@@ -1141,6 +1155,13 @@ func TestRangeController(t *testing.T) {
require.NoError(t, err)
state.initialElasticTokens = kvflowcontrol.Tokens(elasticInit)
}
var maxInflightBytesString string
d.MaybeScanArgs(t, "max_inflight_bytes", &maxInflightBytesString)
if maxInflightBytesString != "" {
maxInflightBytes, err := humanizeutil.ParseBytes(maxInflightBytesString)
require.NoError(t, err)
state.maxInflightBytes = uint64(maxInflightBytes)
}

for _, r := range scanRanges(t, d.Input) {
state.getOrInitRange(t, r, MsgAppPush)
28 changes: 20 additions & 8 deletions pkg/kv/kvserver/kvflowcontrol/rac2/testdata/range_controller/close
@@ -49,23 +49,23 @@ t1/s3: eval reg=+13 MiB/+16 MiB ela=+5.0 MiB/+8.0 MiB

stream_state range_id=1
----
(n1,s1):1: state=replicate closed=false inflight=[1,4) send_queue=[4,4) precise_q_size=+0 B
(n1,s1):1: state=replicate closed=false inflight=[1,4) (3.0 MiB) send_queue=[4,4) precise_q_size=+0 B
eval deducted: reg=+3.0 MiB ela=+0 B
eval original in send-q: reg=+0 B ela=+0 B
NormalPri:
term=1 index=1 tokens=1048576
term=1 index=2 tokens=1048576
term=1 index=3 tokens=1048576
++++
(n2,s2):2: state=replicate closed=false inflight=[1,4) send_queue=[4,4) precise_q_size=+0 B
(n2,s2):2: state=replicate closed=false inflight=[1,4) (3.0 MiB) send_queue=[4,4) precise_q_size=+0 B
eval deducted: reg=+3.0 MiB ela=+0 B
eval original in send-q: reg=+0 B ela=+0 B
NormalPri:
term=1 index=1 tokens=1048576
term=1 index=2 tokens=1048576
term=1 index=3 tokens=1048576
++++
(n3,s3):3: state=replicate closed=false inflight=[1,4) send_queue=[4,4) precise_q_size=+0 B
(n3,s3):3: state=replicate closed=false inflight=[1,4) (3.0 MiB) send_queue=[4,4) precise_q_size=+0 B
eval deducted: reg=+3.0 MiB ela=+0 B
eval original in send-q: reg=+0 B ela=+0 B
NormalPri:
@@ -86,22 +86,34 @@ t1/s2: eval reg=+14 MiB/+16 MiB ela=+6.0 MiB/+8.0 MiB
t1/s3: eval reg=+13 MiB/+16 MiB ela=+5.0 MiB/+8.0 MiB
send reg=+13 MiB/+16 MiB ela=+5.0 MiB/+8.0 MiB

# Send an empty event so that replicaSendStreams are aware of the update to
# match.
raft_event
range_id=1
----
t1/s1: eval reg=+15 MiB/+16 MiB ela=+7.0 MiB/+8.0 MiB
send reg=+15 MiB/+16 MiB ela=+7.0 MiB/+8.0 MiB
t1/s2: eval reg=+14 MiB/+16 MiB ela=+6.0 MiB/+8.0 MiB
send reg=+14 MiB/+16 MiB ela=+6.0 MiB/+8.0 MiB
t1/s3: eval reg=+13 MiB/+16 MiB ela=+5.0 MiB/+8.0 MiB
send reg=+13 MiB/+16 MiB ela=+5.0 MiB/+8.0 MiB

stream_state range_id=1
----
(n1,s1):1: state=replicate closed=false inflight=[3,4) send_queue=[4,4) precise_q_size=+0 B
(n1,s1):1: state=replicate closed=false inflight=[3,4) (1.0 MiB) send_queue=[4,4) precise_q_size=+0 B
eval deducted: reg=+1.0 MiB ela=+0 B
eval original in send-q: reg=+0 B ela=+0 B
NormalPri:
term=1 index=3 tokens=1048576
++++
(n2,s2):2: state=replicate closed=false inflight=[2,4) send_queue=[4,4) precise_q_size=+0 B
(n2,s2):2: state=replicate closed=false inflight=[2,4) (2.0 MiB) send_queue=[4,4) precise_q_size=+0 B
eval deducted: reg=+2.0 MiB ela=+0 B
eval original in send-q: reg=+0 B ela=+0 B
NormalPri:
term=1 index=2 tokens=1048576
term=1 index=3 tokens=1048576
++++
(n3,s3):3: state=replicate closed=false inflight=[1,4) send_queue=[4,4) precise_q_size=+0 B
(n3,s3):3: state=replicate closed=false inflight=[1,4) (3.0 MiB) send_queue=[4,4) precise_q_size=+0 B
eval deducted: reg=+3.0 MiB ela=+0 B
eval original in send-q: reg=+0 B ela=+0 B
NormalPri:
@@ -122,13 +134,13 @@ r1: [(n1,s1):1*,(n2,s2):2,(n3,s3):3]

stream_state range_id=1
----
(n1,s1):1: state=replicate closed=false inflight=[3,4) send_queue=[4,4) precise_q_size=+0 B
(n1,s1):1: state=replicate closed=false inflight=[3,4) (1.0 MiB) send_queue=[4,4) precise_q_size=+0 B
eval deducted: reg=+1.0 MiB ela=+0 B
eval original in send-q: reg=+0 B ela=+0 B
NormalPri:
term=1 index=3 tokens=1048576
++++
(n2,s2):2: state=replicate closed=false inflight=[2,4) send_queue=[4,4) precise_q_size=+0 B
(n2,s2):2: state=replicate closed=false inflight=[2,4) (2.0 MiB) send_queue=[4,4) precise_q_size=+0 B
eval deducted: reg=+2.0 MiB ela=+0 B
eval original in send-q: reg=+0 B ela=+0 B
NormalPri: