From 7cf9492fe21d5105ba0487bfd5f0ca0efb5c75c0 Mon Sep 17 00:00:00 2001 From: Austen McClernon Date: Wed, 18 Sep 2024 17:40:41 -0400 Subject: [PATCH] kvserver: move racv2 enabled level to kvflowcontrol and add test knob Move `EnabledWhenLeaderLevel` from `replica_rac2` to the parent package `kvflowcontrol` and rename `V2EnabledWhenLeaderLevel` to reflect the move to a shared v1/v2 package. Also move the corresponding function `racV2EnabledWhenLeaderLevel` to `kvflowcontrol`. `GetV2EnabledWhenLeaderLevel` will check if there are testing knob overrides for the enabled level, and if not continue returning `V2NotEnabledWhenLeader`. Some commentary and todos are also left around this function, for when we enable the protocol and separately, pull mode. Part of: #130187 Release note: None --- pkg/kv/kvserver/flow_control_stores.go | 4 +- pkg/kv/kvserver/kvflowcontrol/BUILD.bazel | 1 + .../kvserver/kvflowcontrol/kvflowcontrol.go | 53 +++++++++++++++++++ .../kvflowcontrol/replica_rac2/processor.go | 30 ++++------- .../replica_rac2/processor_test.go | 22 ++++---- .../kvserver/kvflowcontrol/testing_knobs.go | 3 ++ pkg/kv/kvserver/replica.go | 10 +--- pkg/kv/kvserver/replica_init.go | 4 +- pkg/kv/kvserver/replica_raft.go | 11 ++-- 9 files changed, 91 insertions(+), 47 deletions(-) diff --git a/pkg/kv/kvserver/flow_control_stores.go b/pkg/kv/kvserver/flow_control_stores.go index c46a62e0b6e4..013ddc988d13 100644 --- a/pkg/kv/kvserver/flow_control_stores.go +++ b/pkg/kv/kvserver/flow_control_stores.go @@ -174,7 +174,7 @@ func (sh *storeForFlowControl) LookupReplicationAdmissionHandle( } // NB: Admit is called soon after this lookup. level := repl.flowControlV2.GetEnabledWhenLeader() - useV1 := level == replica_rac2.NotEnabledWhenLeader + useV1 := level == kvflowcontrol.V2NotEnabledWhenLeader var v1Handle kvflowcontrol.ReplicationAdmissionHandle if useV1 { repl.mu.Lock() @@ -453,7 +453,7 @@ func (h admissionDemuxHandle) Admit( // can cause either value of admitted. See the comment in // ReplicationAdmissionHandle. level := h.r.flowControlV2.GetEnabledWhenLeader() - if level == replica_rac2.NotEnabledWhenLeader { + if level == kvflowcontrol.V2NotEnabledWhenLeader { return admitted, err } // Transition from v1 => v2 happened while waiting. Fall through to wait diff --git a/pkg/kv/kvserver/kvflowcontrol/BUILD.bazel b/pkg/kv/kvserver/kvflowcontrol/BUILD.bazel index ec8546679835..4eb362463094 100644 --- a/pkg/kv/kvserver/kvflowcontrol/BUILD.bazel +++ b/pkg/kv/kvserver/kvflowcontrol/BUILD.bazel @@ -15,6 +15,7 @@ go_library( "//pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb", "//pkg/roachpb", "//pkg/settings", + "//pkg/settings/cluster", "//pkg/util/admission/admissionpb", "//pkg/util/metamorphic", "@com_github_cockroachdb_redact//:redact", diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go index e1926603fa54..cb435052a5ff 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" "github.com/cockroachdb/cockroach/pkg/util/metamorphic" "github.com/cockroachdb/redact" @@ -118,6 +119,58 @@ var validateTokenRange = settings.WithValidateInt(func(b int64) error { return nil }) +// V2EnabledWhenLeaderLevel captures the level at which RACv2 is enabled when +// this replica is the leader. +// +// State transitions are V2NotEnabledWhenLeader => +// V2EnabledWhenLeaderV1Encoding => V2EnabledWhenLeaderV2Encoding, i.e., the +// level will never regress. +type V2EnabledWhenLeaderLevel = uint32 + +const ( + V2NotEnabledWhenLeader V2EnabledWhenLeaderLevel = iota + V2EnabledWhenLeaderV1Encoding + V2EnabledWhenLeaderV2Encoding +) + +// GetV2EnabledWhenLeaderLevel returns the level at which RACV2 is enabled when +// this replica is the leader. +// +// The level is determined by the cluster version, and is ratcheted up as the +// cluster version advances. The level is used to determine: +// +// 1. Whether the leader should use the RACv2 protocol. +// 2. Whether the leader should use the V1 or V2 entry encoding iff (1) is +// true. +// +// Upon the leader first seeing V24_3_UseRACV2WithV1EntryEncoding, it will +// create a RangeController and use the V1 entry encoding, operating in Push +// mode. Upon the leader first seeing V24_3_UseRACV2Full, it will continue +// using the RACV2 protocol, but will switch to the V2 entry encoding. Note the +// necessary migration for V2NotEnabledWhenLeader => +// V2EnabledWhenLeaderV1Encoding occurs before anything else in +// kvserver.handleRaftReadyRaftMuLocked. +// +// TODO(kvoli,sumeerbhola,pav-kv): When we introduce pull mode (and associated +// cluster setting), update this comment to mention that the cluster setting is +// only relevant when at V2EnabledWhenLeaderV2Encoding level. +func GetV2EnabledWhenLeaderLevel( + ctx context.Context, st *cluster.Settings, knobs *TestingKnobs, +) V2EnabledWhenLeaderLevel { + if knobs != nil && knobs.OverrideV2EnabledWhenLeaderLevel != nil { + return knobs.OverrideV2EnabledWhenLeaderLevel() + } + // TODO(kvoli): Enable once #130619 merges and tests affected by enabling v2 + // are addressed: + // if st.Version.IsActive(ctx, clusterversion.V24_3_UseRACV2Full) { + // return V2EnabledWhenLeaderV2Encoding + // } + // if st.Version.IsActive(ctx, clusterversion.V24_3_UseRACV2WithV1EntryEncoding) { + // return V2EnabledWhenLeaderV1Encoding + // } + return V2NotEnabledWhenLeader +} + // Stream models the stream over which we replicate data traffic, the // transmission for which we regulate using flow control. It's segmented by the // specific store the traffic is bound for and the tenant driving it. Despite diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go index ed42c9f5b628..1d2cbb074ba9 100644 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go @@ -183,19 +183,6 @@ type RangeControllerFactory interface { New(ctx context.Context, state rangeControllerInitState) rac2.RangeController } -// EnabledWhenLeaderLevel captures the level at which RACv2 is enabled when -// this replica is the leader. -// -// State transitions are NotEnabledWhenLeader => EnabledWhenLeaderV1Encoding -// => EnabledWhenLeaderV2Encoding, i.e., the level will never regress. -type EnabledWhenLeaderLevel = uint32 - -const ( - NotEnabledWhenLeader EnabledWhenLeaderLevel = iota - EnabledWhenLeaderV1Encoding - EnabledWhenLeaderV2Encoding -) - // ProcessorOptions are specified when creating a new Processor. type ProcessorOptions struct { // Various constant fields that are duplicated from Replica, since we @@ -216,7 +203,7 @@ type ProcessorOptions struct { Settings *cluster.Settings EvalWaitMetrics *rac2.EvalWaitMetrics - EnabledWhenLeaderLevel EnabledWhenLeaderLevel + EnabledWhenLeaderLevel kvflowcontrol.V2EnabledWhenLeaderLevel Knobs *kvflowcontrol.TestingKnobs } @@ -289,14 +276,14 @@ type Processor interface { // This may be a noop if the level has already been reached. // // raftMu is held. - SetEnabledWhenLeaderRaftMuLocked(ctx context.Context, level EnabledWhenLeaderLevel) + SetEnabledWhenLeaderRaftMuLocked(ctx context.Context, level kvflowcontrol.V2EnabledWhenLeaderLevel) // GetEnabledWhenLeader returns the current level. It may be used in // highly concurrent settings at the leaseholder, when waiting for eval, // and when encoding a proposal. Note that if the leaseholder is not the // leader and the leader has switched to a higher level, there is no harm // done, since the leaseholder can continue waiting for v1 tokens and use // the v1 entry encoding. - GetEnabledWhenLeader() EnabledWhenLeaderLevel + GetEnabledWhenLeader() kvflowcontrol.V2EnabledWhenLeaderLevel // OnDescChangedLocked provides a possibly updated RangeDescriptor. The // tenantID passed in all calls must be the same. @@ -502,7 +489,7 @@ type processorImpl struct { // enabledWhenLeader indicates the RACv2 mode of operation when this replica // is the leader. Atomic value, for serving GetEnabledWhenLeader. Updated only // while holding raftMu. Can be read non-atomically if raftMu is held. - enabledWhenLeader EnabledWhenLeaderLevel + enabledWhenLeader kvflowcontrol.V2EnabledWhenLeaderLevel v1EncodingPriorityMismatch log.EveryN } @@ -547,14 +534,15 @@ func (p *processorImpl) OnDestroyRaftMuLocked(ctx context.Context) { // SetEnabledWhenLeaderRaftMuLocked implements Processor. func (p *processorImpl) SetEnabledWhenLeaderRaftMuLocked( - ctx context.Context, level EnabledWhenLeaderLevel, + ctx context.Context, level kvflowcontrol.V2EnabledWhenLeaderLevel, ) { p.opts.Replica.RaftMuAssertHeld() if p.destroyed || p.enabledWhenLeader >= level { return } atomic.StoreUint32(&p.enabledWhenLeader, level) - if level != EnabledWhenLeaderV1Encoding || p.desc.replicas == nil { + if level != kvflowcontrol.V2EnabledWhenLeaderV1Encoding || + p.desc.replicas == nil { return } // May need to create RangeController. @@ -576,7 +564,7 @@ func (p *processorImpl) SetEnabledWhenLeaderRaftMuLocked( } // GetEnabledWhenLeader implements Processor. -func (p *processorImpl) GetEnabledWhenLeader() EnabledWhenLeaderLevel { +func (p *processorImpl) GetEnabledWhenLeader() kvflowcontrol.V2EnabledWhenLeaderLevel { return atomic.LoadUint32(&p.enabledWhenLeader) } @@ -693,7 +681,7 @@ func (p *processorImpl) makeStateConsistentRaftMuLocked( return } // Is the leader. - if p.enabledWhenLeader == NotEnabledWhenLeader { + if p.enabledWhenLeader == kvflowcontrol.V2NotEnabledWhenLeader { return } if p.leader.rc != nil && termChanged { diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go index 1c5ee707d0cd..16db8f88143f 100644 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go @@ -281,7 +281,7 @@ func TestProcessorBasic(t *testing.T) { var st *cluster.Settings var p *processorImpl tenantID := roachpb.MustMakeTenantID(4) - reset := func(enabled EnabledWhenLeaderLevel) { + reset := func(enabled kvflowcontrol.V2EnabledWhenLeaderLevel) { b.Reset() r = newTestReplica(&b) sched = testRaftScheduler{b: &b} @@ -533,31 +533,33 @@ func parseAdmissionPriority(t *testing.T, td *datadriven.TestData) admissionpb.W return admissionpb.NormalPri } -func parseEnabledLevel(t *testing.T, td *datadriven.TestData) EnabledWhenLeaderLevel { +func parseEnabledLevel( + t *testing.T, td *datadriven.TestData, +) kvflowcontrol.V2EnabledWhenLeaderLevel { if td.HasArg("enabled-level") { var str string td.ScanArgs(t, "enabled-level", &str) switch str { case "not-enabled": - return NotEnabledWhenLeader + return kvflowcontrol.V2NotEnabledWhenLeader case "v1-encoding": - return EnabledWhenLeaderV1Encoding + return kvflowcontrol.V2EnabledWhenLeaderV1Encoding case "v2-encoding": - return EnabledWhenLeaderV2Encoding + return kvflowcontrol.V2EnabledWhenLeaderV2Encoding default: t.Fatalf("unrecoginized level %s", str) } } - return NotEnabledWhenLeader + return kvflowcontrol.V2NotEnabledWhenLeader } -func enabledLevelString(enabledLevel EnabledWhenLeaderLevel) string { +func enabledLevelString(enabledLevel kvflowcontrol.V2EnabledWhenLeaderLevel) string { switch enabledLevel { - case NotEnabledWhenLeader: + case kvflowcontrol.V2NotEnabledWhenLeader: return "not-enabled" - case EnabledWhenLeaderV1Encoding: + case kvflowcontrol.V2EnabledWhenLeaderV1Encoding: return "v1-encoding" - case EnabledWhenLeaderV2Encoding: + case kvflowcontrol.V2EnabledWhenLeaderV2Encoding: return "v2-encoding" } return "unknown-level" diff --git a/pkg/kv/kvserver/kvflowcontrol/testing_knobs.go b/pkg/kv/kvserver/kvflowcontrol/testing_knobs.go index bbac739698c2..1dc470b44c07 100644 --- a/pkg/kv/kvserver/kvflowcontrol/testing_knobs.go +++ b/pkg/kv/kvserver/kvflowcontrol/testing_knobs.go @@ -23,6 +23,9 @@ type TestingKnobs struct { // OverrideTokenDeduction is used to override how many tokens are deducted // post-evaluation. OverrideTokenDeduction func() Tokens + // OverrideV2EnabledWhenLeaderLevel is used to override the level at which + // RACv2 is enabled when a replica is the leader. + OverrideV2EnabledWhenLeaderLevel func() V2EnabledWhenLeaderLevel } // TestingKnobsV1 are the testing knobs that appply to replication flow control diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index 36620181e6ae..eff277ac18c4 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -29,6 +29,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/gc" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/replica_rac2" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" @@ -328,7 +329,7 @@ type Replica struct { // being applied to the state machine. bytesAccount logstore.BytesAccount - flowControlLevel replica_rac2.EnabledWhenLeaderLevel + flowControlLevel kvflowcontrol.V2EnabledWhenLeaderLevel // Scratch for populating RaftEvent for flowControlV2. msgAppScratchForFlowControl map[roachpb.ReplicaID][]raftpb.Message @@ -2527,13 +2528,6 @@ func (r *Replica) GetMutexForTesting() *ReplicaMutex { return &r.mu.ReplicaMutex } -func racV2EnabledWhenLeaderLevel( - ctx context.Context, st *cluster.Settings, -) replica_rac2.EnabledWhenLeaderLevel { - // TODO(sumeer): implement fully, once all the dependencies are implemented. - return replica_rac2.NotEnabledWhenLeader -} - // maybeEnqueueProblemRange will enqueue the replica for processing into the // replicate queue iff: // diff --git a/pkg/kv/kvserver/replica_init.go b/pkg/kv/kvserver/replica_init.go index c03682ffedf5..0f69c1aa7114 100644 --- a/pkg/kv/kvserver/replica_init.go +++ b/pkg/kv/kvserver/replica_init.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/plan" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/tracker" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/replica_rac2" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage" @@ -224,7 +225,8 @@ func newUninitializedReplicaWithoutRaftGroup( makeStoreFlowControlHandleFactory(r.store), r.store.TestingKnobs().FlowControlTestingKnobs, ) - r.raftMu.flowControlLevel = racV2EnabledWhenLeaderLevel(r.raftCtx, store.cfg.Settings) + r.raftMu.flowControlLevel = kvflowcontrol.GetV2EnabledWhenLeaderLevel( + r.raftCtx, store.ClusterSettings(), store.TestingKnobs().FlowControlTestingKnobs) r.raftMu.msgAppScratchForFlowControl = map[roachpb.ReplicaID][]raftpb.Message{} r.flowControlV2 = replica_rac2.NewProcessor(replica_rac2.ProcessorOptions{ NodeID: store.NodeID(), diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index d8d2a4950f0b..ba45035bb822 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -360,7 +360,7 @@ func (r *Replica) evalAndPropose( } func (r *Replica) encodePriorityForRACv2() bool { - return r.flowControlV2.GetEnabledWhenLeader() == replica_rac2.EnabledWhenLeaderV2Encoding + return r.flowControlV2.GetEnabledWhenLeader() == kvflowcontrol.V2EnabledWhenLeaderV2Encoding } // propose encodes a command, starts tracking it, and proposes it to Raft. @@ -833,11 +833,12 @@ func (r *Replica) handleRaftReadyRaftMuLocked( // Before doing anything, including calling Ready(), see if we need to // ratchet up the flow control level. This code will go away when RACv1 => // RACv2 transition is complete and RACv1 code is removed. - if r.raftMu.flowControlLevel < replica_rac2.EnabledWhenLeaderV2Encoding { + if r.raftMu.flowControlLevel < kvflowcontrol.V2EnabledWhenLeaderV2Encoding { // Not already at highest level. - level := racV2EnabledWhenLeaderLevel(ctx, r.store.cfg.Settings) + level := kvflowcontrol.GetV2EnabledWhenLeaderLevel( + ctx, r.store.ClusterSettings(), r.store.TestingKnobs().FlowControlTestingKnobs) if level > r.raftMu.flowControlLevel { - if r.raftMu.flowControlLevel == replica_rac2.NotEnabledWhenLeader { + if r.raftMu.flowControlLevel == kvflowcontrol.V2NotEnabledWhenLeader { func() { r.mu.Lock() defer r.mu.Unlock() @@ -1938,7 +1939,7 @@ func (r *Replica) sendRaftMessage(ctx context.Context, msg raftpb.Message) { FromReplica: fromReplica, Message: msg, RangeStartKey: startKey, // usually nil - UsingRac2Protocol: r.flowControlV2.GetEnabledWhenLeader() >= replica_rac2.EnabledWhenLeaderV1Encoding, + UsingRac2Protocol: r.flowControlV2.GetEnabledWhenLeader() >= kvflowcontrol.V2EnabledWhenLeaderV1Encoding, } // For RACv2, annotate successful MsgAppResp messages with the vector of // admitted log indices, by priority.