diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index 2f73db38e2b6..1efb5e1e0aa3 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -399,4 +399,4 @@ trace.snapshot.rate duration 0s if non-zero, interval at which background trace trace.span_registry.enabled boolean true if set, ongoing traces can be seen at https:///#/debug/tracez application trace.zipkin.collector string the address of a Zipkin instance to receive traces, as :. If no port is specified, 9411 will be used. application ui.display_timezone enumeration etc/utc the timezone used to format timestamps in the ui [etc/utc = 0, america/new_york = 1] application -version version 1000024.2-upgrading-to-1000024.3-step-016 set the active cluster version in the format '.' application +version version 1000024.2-upgrading-to-1000024.3-step-020 set the active cluster version in the format '.' application diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 326200f4b4a7..2e395a64237a 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -356,6 +356,6 @@
trace.span_registry.enabled
booleantrueif set, ongoing traces can be seen at https://<ui>/#/debug/tracezServerless/Dedicated/Self-Hosted
trace.zipkin.collector
stringthe address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.Serverless/Dedicated/Self-Hosted
ui.display_timezone
enumerationetc/utcthe timezone used to format timestamps in the ui [etc/utc = 0, america/new_york = 1]Serverless/Dedicated/Self-Hosted -
version
version1000024.2-upgrading-to-1000024.3-step-016set the active cluster version in the format '<major>.<minor>'Serverless/Dedicated/Self-Hosted +
version
version1000024.2-upgrading-to-1000024.3-step-020set the active cluster version in the format '<major>.<minor>'Serverless/Dedicated/Self-Hosted diff --git a/pkg/ccl/backupccl/alter_backup_schedule.go b/pkg/ccl/backupccl/alter_backup_schedule.go index 4ff80bc47645..7bdabce2d71d 100644 --- a/pkg/ccl/backupccl/alter_backup_schedule.go +++ b/pkg/ccl/backupccl/alter_backup_schedule.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/privilege" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/syntheticprivilege" + "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" pbtypes "github.com/gogo/protobuf/types" ) @@ -464,6 +465,13 @@ func processFullBackupRecurrence( s.incStmt = &tree.Backup{} *s.incStmt = *s.fullStmt s.incStmt.AppendToLatest = true + // Pre 23.2 schedules did not have a cluster ID, so if we are altering a + // schedule that was created before 23.2, we need to set the cluster ID on + // the newly created incremental manually. + schedDetails := *s.fullJob.ScheduleDetails() + if schedDetails.ClusterID.Equal(uuid.Nil) { + schedDetails.ClusterID = p.ExtendedEvalContext().ClusterID + } rec := s.fullJob.ScheduleExpr() incRecurrence, err := schedulebase.ComputeScheduleRecurrence(env.Now(), &rec) @@ -476,7 +484,7 @@ func processFullBackupRecurrence( p.User(), s.fullJob.ScheduleLabel(), incRecurrence, - *s.fullJob.ScheduleDetails(), + schedDetails, jobspb.InvalidScheduleID, s.fullArgs.UpdatesLastBackupMetric, s.incStmt, diff --git a/pkg/ccl/backupccl/alter_backup_schedule_test.go b/pkg/ccl/backupccl/alter_backup_schedule_test.go index ed20d00aa92b..da53ddbb2fc5 100644 --- a/pkg/ccl/backupccl/alter_backup_schedule_test.go +++ b/pkg/ccl/backupccl/alter_backup_schedule_test.go @@ -158,5 +158,42 @@ INSERT INTO t1 values (1), (10), (100); rows = th.sqlDB.QueryStr(t, fmt.Sprintf(`ALTER BACKUP SCHEDULE %d EXECUTE IMMEDIATELY;`, scheduleID)) require.Equal(t, trim(th.env.Now().String()), trim(rows[0][3])) +} + +func TestAlterBackupScheduleSetsIncrementalClusterID(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + th, cleanup := newAlterSchedulesTestHelper(t, nil) + defer cleanup() + rows := th.sqlDB.QueryStr( + t, + `CREATE SCHEDULE FOR BACKUP INTO 'nodelocal://1/backup/alter-schedule' RECURRING '@daily' FULL BACKUP ALWAYS;`, + ) + require.Len(t, rows, 1) + scheduleID, err := strconv.Atoi(rows[0][0]) + require.NoError(t, err) + + // Artificially remove cluster ID from full backup to simulate pre-23.2 schedule. + th.sqlDB.QueryStr( + t, + fmt.Sprintf(`UPDATE system.scheduled_jobs + SET + schedule_details = crdb_internal.json_to_pb( + 'cockroach.jobs.jobspb.ScheduleDetails', + json_remove_path( + crdb_internal.pb_to_json('cockroach.jobs.jobspb.ScheduleDetails', schedule_details), + ARRAY['clusterId'] + ) + ) + WHERE schedule_id=%d;`, scheduleID), + ) + + // Ensure creating incremental from a full backup schedule without a cluster ID passes + rows = th.sqlDB.QueryStr(t, fmt.Sprintf( + `ALTER BACKUP SCHEDULE %d SET RECURRING '@hourly', SET FULL BACKUP '@daily'`, + scheduleID), + ) + require.Len(t, rows, 2) } diff --git a/pkg/clusterversion/cockroach_versions.go b/pkg/clusterversion/cockroach_versions.go index 098e0b8513d4..dc054f88d8a2 100644 --- a/pkg/clusterversion/cockroach_versions.go +++ b/pkg/clusterversion/cockroach_versions.go @@ -247,6 +247,15 @@ const ( // policies. V24_3_MaybePreventUpgradeForCoreLicenseDeprecation + // V24_3_UseRACV2WithV1EntryEncoding is the earliest version which supports + // ranges using replication flow control v2, still with v1 entry encoding. + V24_3_UseRACV2WithV1EntryEncoding + + // V24_3_UseRACV2Full is the earliest version which supports ranges using + // replication flow control v2, with v2 entry encoding. Replication flow + // control v1 is unsupported at this version. + V24_3_UseRACV2Full + // ************************************************* // Step (1) Add new versions above this comment. // Do not add new versions to a patch release. @@ -302,6 +311,8 @@ var versionTable = [numKeys]roachpb.Version{ V24_3_AdvanceCommitIndexViaMsgApps: {Major: 24, Minor: 2, Internal: 12}, V24_3_SQLInstancesAddDraining: {Major: 24, Minor: 2, Internal: 14}, V24_3_MaybePreventUpgradeForCoreLicenseDeprecation: {Major: 24, Minor: 2, Internal: 16}, + V24_3_UseRACV2WithV1EntryEncoding: {Major: 24, Minor: 2, Internal: 18}, + V24_3_UseRACV2Full: {Major: 24, Minor: 2, Internal: 20}, // ************************************************* // Step (2): Add new versions above this comment. diff --git a/pkg/kv/kvserver/flow_control_stores.go b/pkg/kv/kvserver/flow_control_stores.go index c46a62e0b6e4..013ddc988d13 100644 --- a/pkg/kv/kvserver/flow_control_stores.go +++ b/pkg/kv/kvserver/flow_control_stores.go @@ -174,7 +174,7 @@ func (sh *storeForFlowControl) LookupReplicationAdmissionHandle( } // NB: Admit is called soon after this lookup. level := repl.flowControlV2.GetEnabledWhenLeader() - useV1 := level == replica_rac2.NotEnabledWhenLeader + useV1 := level == kvflowcontrol.V2NotEnabledWhenLeader var v1Handle kvflowcontrol.ReplicationAdmissionHandle if useV1 { repl.mu.Lock() @@ -453,7 +453,7 @@ func (h admissionDemuxHandle) Admit( // can cause either value of admitted. See the comment in // ReplicationAdmissionHandle. level := h.r.flowControlV2.GetEnabledWhenLeader() - if level == replica_rac2.NotEnabledWhenLeader { + if level == kvflowcontrol.V2NotEnabledWhenLeader { return admitted, err } // Transition from v1 => v2 happened while waiting. Fall through to wait diff --git a/pkg/kv/kvserver/kvflowcontrol/BUILD.bazel b/pkg/kv/kvserver/kvflowcontrol/BUILD.bazel index ec8546679835..4eb362463094 100644 --- a/pkg/kv/kvserver/kvflowcontrol/BUILD.bazel +++ b/pkg/kv/kvserver/kvflowcontrol/BUILD.bazel @@ -15,6 +15,7 @@ go_library( "//pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb", "//pkg/roachpb", "//pkg/settings", + "//pkg/settings/cluster", "//pkg/util/admission/admissionpb", "//pkg/util/metamorphic", "@com_github_cockroachdb_redact//:redact", diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go index e1926603fa54..cb435052a5ff 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" "github.com/cockroachdb/cockroach/pkg/util/metamorphic" "github.com/cockroachdb/redact" @@ -118,6 +119,58 @@ var validateTokenRange = settings.WithValidateInt(func(b int64) error { return nil }) +// V2EnabledWhenLeaderLevel captures the level at which RACv2 is enabled when +// this replica is the leader. +// +// State transitions are V2NotEnabledWhenLeader => +// V2EnabledWhenLeaderV1Encoding => V2EnabledWhenLeaderV2Encoding, i.e., the +// level will never regress. +type V2EnabledWhenLeaderLevel = uint32 + +const ( + V2NotEnabledWhenLeader V2EnabledWhenLeaderLevel = iota + V2EnabledWhenLeaderV1Encoding + V2EnabledWhenLeaderV2Encoding +) + +// GetV2EnabledWhenLeaderLevel returns the level at which RACV2 is enabled when +// this replica is the leader. +// +// The level is determined by the cluster version, and is ratcheted up as the +// cluster version advances. The level is used to determine: +// +// 1. Whether the leader should use the RACv2 protocol. +// 2. Whether the leader should use the V1 or V2 entry encoding iff (1) is +// true. +// +// Upon the leader first seeing V24_3_UseRACV2WithV1EntryEncoding, it will +// create a RangeController and use the V1 entry encoding, operating in Push +// mode. Upon the leader first seeing V24_3_UseRACV2Full, it will continue +// using the RACV2 protocol, but will switch to the V2 entry encoding. Note the +// necessary migration for V2NotEnabledWhenLeader => +// V2EnabledWhenLeaderV1Encoding occurs before anything else in +// kvserver.handleRaftReadyRaftMuLocked. +// +// TODO(kvoli,sumeerbhola,pav-kv): When we introduce pull mode (and associated +// cluster setting), update this comment to mention that the cluster setting is +// only relevant when at V2EnabledWhenLeaderV2Encoding level. +func GetV2EnabledWhenLeaderLevel( + ctx context.Context, st *cluster.Settings, knobs *TestingKnobs, +) V2EnabledWhenLeaderLevel { + if knobs != nil && knobs.OverrideV2EnabledWhenLeaderLevel != nil { + return knobs.OverrideV2EnabledWhenLeaderLevel() + } + // TODO(kvoli): Enable once #130619 merges and tests affected by enabling v2 + // are addressed: + // if st.Version.IsActive(ctx, clusterversion.V24_3_UseRACV2Full) { + // return V2EnabledWhenLeaderV2Encoding + // } + // if st.Version.IsActive(ctx, clusterversion.V24_3_UseRACV2WithV1EntryEncoding) { + // return V2EnabledWhenLeaderV1Encoding + // } + return V2NotEnabledWhenLeader +} + // Stream models the stream over which we replicate data traffic, the // transmission for which we regulate using flow control. It's segmented by the // specific store the traffic is bound for and the tenant driving it. Despite diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go index ed42c9f5b628..1d2cbb074ba9 100644 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go @@ -183,19 +183,6 @@ type RangeControllerFactory interface { New(ctx context.Context, state rangeControllerInitState) rac2.RangeController } -// EnabledWhenLeaderLevel captures the level at which RACv2 is enabled when -// this replica is the leader. -// -// State transitions are NotEnabledWhenLeader => EnabledWhenLeaderV1Encoding -// => EnabledWhenLeaderV2Encoding, i.e., the level will never regress. -type EnabledWhenLeaderLevel = uint32 - -const ( - NotEnabledWhenLeader EnabledWhenLeaderLevel = iota - EnabledWhenLeaderV1Encoding - EnabledWhenLeaderV2Encoding -) - // ProcessorOptions are specified when creating a new Processor. type ProcessorOptions struct { // Various constant fields that are duplicated from Replica, since we @@ -216,7 +203,7 @@ type ProcessorOptions struct { Settings *cluster.Settings EvalWaitMetrics *rac2.EvalWaitMetrics - EnabledWhenLeaderLevel EnabledWhenLeaderLevel + EnabledWhenLeaderLevel kvflowcontrol.V2EnabledWhenLeaderLevel Knobs *kvflowcontrol.TestingKnobs } @@ -289,14 +276,14 @@ type Processor interface { // This may be a noop if the level has already been reached. // // raftMu is held. - SetEnabledWhenLeaderRaftMuLocked(ctx context.Context, level EnabledWhenLeaderLevel) + SetEnabledWhenLeaderRaftMuLocked(ctx context.Context, level kvflowcontrol.V2EnabledWhenLeaderLevel) // GetEnabledWhenLeader returns the current level. It may be used in // highly concurrent settings at the leaseholder, when waiting for eval, // and when encoding a proposal. Note that if the leaseholder is not the // leader and the leader has switched to a higher level, there is no harm // done, since the leaseholder can continue waiting for v1 tokens and use // the v1 entry encoding. - GetEnabledWhenLeader() EnabledWhenLeaderLevel + GetEnabledWhenLeader() kvflowcontrol.V2EnabledWhenLeaderLevel // OnDescChangedLocked provides a possibly updated RangeDescriptor. The // tenantID passed in all calls must be the same. @@ -502,7 +489,7 @@ type processorImpl struct { // enabledWhenLeader indicates the RACv2 mode of operation when this replica // is the leader. Atomic value, for serving GetEnabledWhenLeader. Updated only // while holding raftMu. Can be read non-atomically if raftMu is held. - enabledWhenLeader EnabledWhenLeaderLevel + enabledWhenLeader kvflowcontrol.V2EnabledWhenLeaderLevel v1EncodingPriorityMismatch log.EveryN } @@ -547,14 +534,15 @@ func (p *processorImpl) OnDestroyRaftMuLocked(ctx context.Context) { // SetEnabledWhenLeaderRaftMuLocked implements Processor. func (p *processorImpl) SetEnabledWhenLeaderRaftMuLocked( - ctx context.Context, level EnabledWhenLeaderLevel, + ctx context.Context, level kvflowcontrol.V2EnabledWhenLeaderLevel, ) { p.opts.Replica.RaftMuAssertHeld() if p.destroyed || p.enabledWhenLeader >= level { return } atomic.StoreUint32(&p.enabledWhenLeader, level) - if level != EnabledWhenLeaderV1Encoding || p.desc.replicas == nil { + if level != kvflowcontrol.V2EnabledWhenLeaderV1Encoding || + p.desc.replicas == nil { return } // May need to create RangeController. @@ -576,7 +564,7 @@ func (p *processorImpl) SetEnabledWhenLeaderRaftMuLocked( } // GetEnabledWhenLeader implements Processor. -func (p *processorImpl) GetEnabledWhenLeader() EnabledWhenLeaderLevel { +func (p *processorImpl) GetEnabledWhenLeader() kvflowcontrol.V2EnabledWhenLeaderLevel { return atomic.LoadUint32(&p.enabledWhenLeader) } @@ -693,7 +681,7 @@ func (p *processorImpl) makeStateConsistentRaftMuLocked( return } // Is the leader. - if p.enabledWhenLeader == NotEnabledWhenLeader { + if p.enabledWhenLeader == kvflowcontrol.V2NotEnabledWhenLeader { return } if p.leader.rc != nil && termChanged { diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go index 1c5ee707d0cd..16db8f88143f 100644 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go @@ -281,7 +281,7 @@ func TestProcessorBasic(t *testing.T) { var st *cluster.Settings var p *processorImpl tenantID := roachpb.MustMakeTenantID(4) - reset := func(enabled EnabledWhenLeaderLevel) { + reset := func(enabled kvflowcontrol.V2EnabledWhenLeaderLevel) { b.Reset() r = newTestReplica(&b) sched = testRaftScheduler{b: &b} @@ -533,31 +533,33 @@ func parseAdmissionPriority(t *testing.T, td *datadriven.TestData) admissionpb.W return admissionpb.NormalPri } -func parseEnabledLevel(t *testing.T, td *datadriven.TestData) EnabledWhenLeaderLevel { +func parseEnabledLevel( + t *testing.T, td *datadriven.TestData, +) kvflowcontrol.V2EnabledWhenLeaderLevel { if td.HasArg("enabled-level") { var str string td.ScanArgs(t, "enabled-level", &str) switch str { case "not-enabled": - return NotEnabledWhenLeader + return kvflowcontrol.V2NotEnabledWhenLeader case "v1-encoding": - return EnabledWhenLeaderV1Encoding + return kvflowcontrol.V2EnabledWhenLeaderV1Encoding case "v2-encoding": - return EnabledWhenLeaderV2Encoding + return kvflowcontrol.V2EnabledWhenLeaderV2Encoding default: t.Fatalf("unrecoginized level %s", str) } } - return NotEnabledWhenLeader + return kvflowcontrol.V2NotEnabledWhenLeader } -func enabledLevelString(enabledLevel EnabledWhenLeaderLevel) string { +func enabledLevelString(enabledLevel kvflowcontrol.V2EnabledWhenLeaderLevel) string { switch enabledLevel { - case NotEnabledWhenLeader: + case kvflowcontrol.V2NotEnabledWhenLeader: return "not-enabled" - case EnabledWhenLeaderV1Encoding: + case kvflowcontrol.V2EnabledWhenLeaderV1Encoding: return "v1-encoding" - case EnabledWhenLeaderV2Encoding: + case kvflowcontrol.V2EnabledWhenLeaderV2Encoding: return "v2-encoding" } return "unknown-level" diff --git a/pkg/kv/kvserver/kvflowcontrol/testing_knobs.go b/pkg/kv/kvserver/kvflowcontrol/testing_knobs.go index bbac739698c2..1dc470b44c07 100644 --- a/pkg/kv/kvserver/kvflowcontrol/testing_knobs.go +++ b/pkg/kv/kvserver/kvflowcontrol/testing_knobs.go @@ -23,6 +23,9 @@ type TestingKnobs struct { // OverrideTokenDeduction is used to override how many tokens are deducted // post-evaluation. OverrideTokenDeduction func() Tokens + // OverrideV2EnabledWhenLeaderLevel is used to override the level at which + // RACv2 is enabled when a replica is the leader. + OverrideV2EnabledWhenLeaderLevel func() V2EnabledWhenLeaderLevel } // TestingKnobsV1 are the testing knobs that appply to replication flow control diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index d3189cb82718..9f9f6160a7ca 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -29,6 +29,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/gc" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/replica_rac2" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" @@ -327,7 +328,7 @@ type Replica struct { // being applied to the state machine. bytesAccount logstore.BytesAccount - flowControlLevel replica_rac2.EnabledWhenLeaderLevel + flowControlLevel kvflowcontrol.V2EnabledWhenLeaderLevel // Scratch for populating RaftEvent for flowControlV2. msgAppScratchForFlowControl map[roachpb.ReplicaID][]raftpb.Message @@ -2526,13 +2527,6 @@ func (r *Replica) GetMutexForTesting() *ReplicaMutex { return &r.mu.ReplicaMutex } -func racV2EnabledWhenLeaderLevel( - ctx context.Context, st *cluster.Settings, -) replica_rac2.EnabledWhenLeaderLevel { - // TODO(sumeer): implement fully, once all the dependencies are implemented. - return replica_rac2.NotEnabledWhenLeader -} - // maybeEnqueueProblemRange will enqueue the replica for processing into the // replicate queue iff: // diff --git a/pkg/kv/kvserver/replica_init.go b/pkg/kv/kvserver/replica_init.go index c03682ffedf5..0f69c1aa7114 100644 --- a/pkg/kv/kvserver/replica_init.go +++ b/pkg/kv/kvserver/replica_init.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/plan" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/tracker" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/replica_rac2" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage" @@ -224,7 +225,8 @@ func newUninitializedReplicaWithoutRaftGroup( makeStoreFlowControlHandleFactory(r.store), r.store.TestingKnobs().FlowControlTestingKnobs, ) - r.raftMu.flowControlLevel = racV2EnabledWhenLeaderLevel(r.raftCtx, store.cfg.Settings) + r.raftMu.flowControlLevel = kvflowcontrol.GetV2EnabledWhenLeaderLevel( + r.raftCtx, store.ClusterSettings(), store.TestingKnobs().FlowControlTestingKnobs) r.raftMu.msgAppScratchForFlowControl = map[roachpb.ReplicaID][]raftpb.Message{} r.flowControlV2 = replica_rac2.NewProcessor(replica_rac2.ProcessorOptions{ NodeID: store.NodeID(), diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index 754c2be200c1..f8a6158e307c 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -360,7 +360,7 @@ func (r *Replica) evalAndPropose( } func (r *Replica) encodePriorityForRACv2() bool { - return r.flowControlV2.GetEnabledWhenLeader() == replica_rac2.EnabledWhenLeaderV2Encoding + return r.flowControlV2.GetEnabledWhenLeader() == kvflowcontrol.V2EnabledWhenLeaderV2Encoding } // propose encodes a command, starts tracking it, and proposes it to Raft. @@ -833,11 +833,12 @@ func (r *Replica) handleRaftReadyRaftMuLocked( // Before doing anything, including calling Ready(), see if we need to // ratchet up the flow control level. This code will go away when RACv1 => // RACv2 transition is complete and RACv1 code is removed. - if r.raftMu.flowControlLevel < replica_rac2.EnabledWhenLeaderV2Encoding { + if r.raftMu.flowControlLevel < kvflowcontrol.V2EnabledWhenLeaderV2Encoding { // Not already at highest level. - level := racV2EnabledWhenLeaderLevel(ctx, r.store.cfg.Settings) + level := kvflowcontrol.GetV2EnabledWhenLeaderLevel( + ctx, r.store.ClusterSettings(), r.store.TestingKnobs().FlowControlTestingKnobs) if level > r.raftMu.flowControlLevel { - if r.raftMu.flowControlLevel == replica_rac2.NotEnabledWhenLeader { + if r.raftMu.flowControlLevel == kvflowcontrol.V2NotEnabledWhenLeader { func() { r.mu.Lock() defer r.mu.Unlock() @@ -1938,7 +1939,7 @@ func (r *Replica) sendRaftMessage(ctx context.Context, msg raftpb.Message) { FromReplica: fromReplica, Message: msg, RangeStartKey: startKey, // usually nil - UsingRac2Protocol: r.flowControlV2.GetEnabledWhenLeader() >= replica_rac2.EnabledWhenLeaderV1Encoding, + UsingRac2Protocol: r.flowControlV2.GetEnabledWhenLeader() >= kvflowcontrol.V2EnabledWhenLeaderV1Encoding, } // For RACv2, annotate successful MsgAppResp messages with the vector of // admitted log indices, by priority.