Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
129378: stats: add support for most common values in histograms r=rytaft a=rytaft

Fixes cockroachdb#71828

Release note (sql change): Added a new cluster setting to control
whether most common values are collected as part of histogram collection
for use by the optimizer. The setting is called
`sql.stats.histogram_buckets.include_most_common_values.enabled`. When enabled,
the histogram collection logic will ensure that the most common sampled values
are represented as histogram bucket upper bounds. Since histograms in
CockroachDB track the number of elements equal to the upper bound in addition
to the number of elements less, this allows the optimizer to identify the most
common values in the histogram and better estimate the rows processed by a
query plan. To set the number of most common values to include in a histogram,
a second setting `sql.stats.histogram_buckets.max_fraction_most_common_values`
was added. Currently, the default is 0.1, or 10% of the number of buckets. So
with a 200 bucket histogram, by default, at most 20 buckets may be adjusted to
include a most common value (or "heavy hitter") as the upper bound.


129521: kvserver: wiring of RACv2 to eval, and v1 => v2 transition code r=kvoli a=sumeerbhola

The replicaFlowControlIntegration object for v1, which is a member of Replica.mu, is destroyed and replaced by a noop implementation.

The admissionDemuxHandle implements the new ReplicationAdmissionHandle and handles a switch from v1 to v2 during the wait.

At raft log entry encoding time, for the entries that were subject to replication admission control, we look at the latest value of the EnabledWhenLeaderLevel to decide whether we can use the v2 encoding.

Fixes cockroachdb#129129
Fixes cockroachdb#128756

Epic: CRDB-37515

Release note: None

Co-authored-by: Rebecca Taft <[email protected]>
Co-authored-by: sumeerbhola <[email protected]>
  • Loading branch information
3 people committed Aug 26, 2024
3 parents 1436a84 + f931e69 + f387afe commit 81f6a82
Show file tree
Hide file tree
Showing 14 changed files with 557 additions and 29 deletions.
2 changes: 2 additions & 0 deletions docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,8 @@ sql.stats.forecasts.max_decrease float 0.3333333333333333 the most a prediction
sql.stats.forecasts.min_goodness_of_fit float 0.95 the minimum R² (goodness of fit) measurement required from all predictive models to use a forecast application
sql.stats.forecasts.min_observations integer 3 the mimimum number of observed statistics required to produce a statistics forecast application
sql.stats.histogram_buckets.count integer 200 maximum number of histogram buckets to build during table statistics collection application
sql.stats.histogram_buckets.include_most_common_values.enabled boolean true whether to include most common values as histogram buckets application
sql.stats.histogram_buckets.max_fraction_most_common_values float 0.1 maximum fraction of histogram buckets to use for most common values application
sql.stats.histogram_collection.enabled boolean true histogram collection mode application
sql.stats.histogram_samples.count integer 0 number of rows sampled for histogram construction during table statistics collection. Not setting this or setting a value of 0 means that a reasonable sample size will be automatically picked based on the table size. application
sql.stats.multi_column_collection.enabled boolean true multi-column statistics collection mode application
Expand Down
2 changes: 2 additions & 0 deletions docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,8 @@
<tr><td><div id="setting-sql-stats-forecasts-min-goodness-of-fit" class="anchored"><code>sql.stats.forecasts.min_goodness_of_fit</code></div></td><td>float</td><td><code>0.95</code></td><td>the minimum R² (goodness of fit) measurement required from all predictive models to use a forecast</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-sql-stats-forecasts-min-observations" class="anchored"><code>sql.stats.forecasts.min_observations</code></div></td><td>integer</td><td><code>3</code></td><td>the mimimum number of observed statistics required to produce a statistics forecast</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-sql-stats-histogram-buckets-count" class="anchored"><code>sql.stats.histogram_buckets.count</code></div></td><td>integer</td><td><code>200</code></td><td>maximum number of histogram buckets to build during table statistics collection</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-sql-stats-histogram-buckets-include-most-common-values-enabled" class="anchored"><code>sql.stats.histogram_buckets.include_most_common_values.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>whether to include most common values as histogram buckets</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-sql-stats-histogram-buckets-max-fraction-most-common-values" class="anchored"><code>sql.stats.histogram_buckets.max_fraction_most_common_values</code></div></td><td>float</td><td><code>0.1</code></td><td>maximum fraction of histogram buckets to use for most common values</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-sql-stats-histogram-collection-enabled" class="anchored"><code>sql.stats.histogram_collection.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>histogram collection mode</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-sql-stats-histogram-samples-count" class="anchored"><code>sql.stats.histogram_samples.count</code></div></td><td>integer</td><td><code>0</code></td><td>number of rows sampled for histogram construction during table statistics collection. Not setting this or setting a value of 0 means that a reasonable sample size will be automatically picked based on the table size.</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-sql-stats-multi-column-collection-enabled" class="anchored"><code>sql.stats.multi_column_collection.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>multi-column statistics collection mode</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
Expand Down
16 changes: 16 additions & 0 deletions pkg/kv/kvserver/flow_control_replica_integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,22 @@ func (f *replicaFlowControlIntegrationImpl) clearState(ctx context.Context) {
f.disconnectedStreams = nil
}

type noopReplicaFlowControlIntegration struct{}

func (n noopReplicaFlowControlIntegration) onBecameLeader(context.Context) {}
func (n noopReplicaFlowControlIntegration) onBecameFollower(context.Context) {}
func (n noopReplicaFlowControlIntegration) onDescChanged(context.Context) {}
func (n noopReplicaFlowControlIntegration) onFollowersPaused(context.Context) {}
func (n noopReplicaFlowControlIntegration) onRaftTransportDisconnected(
context.Context, ...roachpb.StoreID,
) {
}
func (n noopReplicaFlowControlIntegration) onRaftTicked(context.Context) {}
func (n noopReplicaFlowControlIntegration) onDestroyed(context.Context) {}
func (n noopReplicaFlowControlIntegration) handle() (kvflowcontrol.Handle, bool) {
return nil, false
}

type replicaForRACv2 Replica

var _ replica_rac2.Replica = &replicaForRACv2{}
Expand Down
100 changes: 93 additions & 7 deletions pkg/kv/kvserver/flow_control_stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@ package kvserver

import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowhandle"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/replica_rac2"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/log"
)

Expand Down Expand Up @@ -53,6 +55,14 @@ func (sh *storesForFlowControl) Lookup(
return handle, found
}

// LookupReplicationAdmissionHandle is part of the StoresForFlowControl
// interface.
func (sh *storesForFlowControl) LookupReplicationAdmissionHandle(
rangeID roachpb.RangeID,
) (kvflowcontrol.ReplicationAdmissionHandle, bool) {
return sh.Lookup(rangeID)
}

// Inspect is part of the StoresForFlowControl interface.
func (sh *storesForFlowControl) Inspect() []roachpb.RangeID {
ls := (*Stores)(sh)
Expand Down Expand Up @@ -110,21 +120,57 @@ func makeStoreForFlowControl(store *Store) *storeForFlowControl {
func (sh *storeForFlowControl) Lookup(
rangeID roachpb.RangeID,
) (_ kvflowcontrol.Handle, found bool) {
s := (*Store)(sh)
repl := s.GetReplicaIfExists(rangeID)
repl := sh.lookupReplica(rangeID)
if repl == nil {
return nil, false
}
repl.mu.Lock()
defer repl.mu.Unlock()
return repl.mu.replicaFlowControlIntegration.handle()
}

// LookupReplicationAdmissionHandle is part of the StoresForFlowControl
// interface.
func (sh *storeForFlowControl) LookupReplicationAdmissionHandle(
rangeID roachpb.RangeID,
) (kvflowcontrol.ReplicationAdmissionHandle, bool) {
repl := sh.lookupReplica(rangeID)
if repl == nil {
return nil, false
}
// NB: Admit is called soon after this lookup.
level := repl.flowControlV2.GetEnabledWhenLeader()
useV1 := level == replica_rac2.NotEnabledWhenLeader
var v1Handle kvflowcontrol.ReplicationAdmissionHandle
if useV1 {
repl.mu.Lock()
var found bool
v1Handle, found = repl.mu.replicaFlowControlIntegration.handle()
repl.mu.Unlock()
if !found {
return nil, found
}
}
// INVARIANT: useV1 => v1Handle was found.
return admissionDemuxHandle{
v1Handle: v1Handle,
r: repl,
useV1: useV1,
}, true
}

func (sh *storeForFlowControl) lookupReplica(rangeID roachpb.RangeID) *Replica {
s := (*Store)(sh)
repl := s.GetReplicaIfExists(rangeID)
if repl == nil {
return nil
}
if knobs := s.TestingKnobs().FlowControlTestingKnobs; knobs != nil &&
knobs.UseOnlyForScratchRanges &&
!repl.IsScratchRange() {
return nil, false
return nil
}

repl.mu.Lock()
defer repl.mu.Unlock()
return repl.mu.replicaFlowControlIntegration.handle()
return repl
}

// ResetStreams is part of the StoresForFlowControl interface.
Expand Down Expand Up @@ -208,6 +254,14 @@ func (l NoopStoresFlowControlIntegration) Lookup(roachpb.RangeID) (kvflowcontrol
return nil, false
}

// LookupReplicationAdmissionHandle is part of the StoresForFlowControl
// interface.
func (l NoopStoresFlowControlIntegration) LookupReplicationAdmissionHandle(
rangeID roachpb.RangeID,
) (kvflowcontrol.ReplicationAdmissionHandle, bool) {
return l.Lookup(rangeID)
}

// ResetStreams is part of the StoresForFlowControl interface.
func (l NoopStoresFlowControlIntegration) ResetStreams(context.Context) {
}
Expand Down Expand Up @@ -298,3 +352,35 @@ func (ss *storesForRACv2) ScheduleAdmittedResponseForRangeRACv2(
s.scheduler.EnqueueRACv2PiggybackAdmitted(m.RangeID)
}
}

type admissionDemuxHandle struct {
v1Handle kvflowcontrol.ReplicationAdmissionHandle
r *Replica
useV1 bool
}

// Admit implements kvflowcontrol.ReplicationAdmissionHandle.
func (h admissionDemuxHandle) Admit(
ctx context.Context, pri admissionpb.WorkPriority, ct time.Time,
) (admitted bool, err error) {
if h.useV1 {
admitted, err = h.v1Handle.Admit(ctx, pri, ct)
if err != nil {
return admitted, err
}
// It is possible a transition from v1 => v2 happened while waiting, which
// can cause either value of admitted. See the comment in
// ReplicationAdmissionHandle.
level := h.r.flowControlV2.GetEnabledWhenLeader()
if level == replica_rac2.NotEnabledWhenLeader {
return admitted, err
}
// Transition from v1 => v2 happened while waiting. Fall through to wait
// on v2, since it is possible that nothing was waited on, or the
// overloaded stream was not waited on. This double wait is acceptable
// since during the transition from v1 => v2 only elastic work should be
// subject to replication AC, and we would like to err towards not
// overloading.
}
return h.r.flowControlV2.AdmitForEval(ctx, pri, ct)
}
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/kvadmission/kvadmission.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ func (n *controllerImpl) AdmitKVWork(
var admitted bool
attemptFlowControl := kvflowcontrol.Enabled.Get(&n.settings.SV)
if attemptFlowControl && !bypassAdmission {
kvflowHandle, found := n.kvflowHandles.Lookup(ba.RangeID)
kvflowHandle, found := n.kvflowHandles.LookupReplicationAdmissionHandle(ba.RangeID)
if !found {
return Handle{}, nil
}
Expand Down
37 changes: 29 additions & 8 deletions pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,28 @@ type Controller interface {
// See I2, I3a and [^7] in kvflowcontrol/doc.go.
}

// ReplicationAdmissionHandle abstracts waiting for admission across RACv1 and RACv2.
type ReplicationAdmissionHandle interface {
// Admit seeks admission to replicate data, regardless of size, for work
// with the given priority and create-time. This blocks until there are flow
// tokens available for connected streams. This returns true if the request
// was admitted through flow control. Ignore the first return type if err !=
// nil. admitted == false && err == nil is a valid return, when something
// caused the callee to not care whether flow tokens were available. This
// can happen for at least the following reasons:
// - Configuration specifies the given WorkPriority is not subject to
// replication AC.
// - The callee doesn't think it is the leader or has been closed/destroyed.
//
// The latter can happen in the midst of a transition from RACv1 => RACv2.
// In this case if the callee waited on at least one connectedStream and was
// admitted, it will return (true, nil). This includes the case where the
// connectedStream was closed while waiting. If there were no
// connectedStreams (because they were already closed) it will return
// (false, nil).
Admit(context.Context, admissionpb.WorkPriority, time.Time) (admitted bool, _ error)
}

// Handle is used to interface with replication flow control; it's typically
// backed by a node-level kvflowcontrol.Controller. Handles are held on replicas
// initiating replication traffic, i.e. are both the leaseholder and raft
Expand All @@ -195,14 +217,7 @@ type Controller interface {
// given priority, takes log position into account -- see
// kvflowcontrolpb.AdmittedRaftLogEntries for more details).
type Handle interface {
// Admit seeks admission to replicate data, regardless of size, for work with
// the given priority and create-time. This blocks until there are flow tokens
// available for all connected streams. This returns true if the request was
// admitted through flow control. Ignore the first return type if err != nil.
// admitted == false && err == nil is a valid return, when something (e.g.
// configuration) caused the callee to not care whether flow tokens were
// available.
Admit(context.Context, admissionpb.WorkPriority, time.Time) (admitted bool, _ error)
ReplicationAdmissionHandle
// DeductTokensFor deducts (without blocking) flow tokens for replicating
// work with given priority along connected streams. The deduction is
// tracked with respect to the specific raft log position it's expecting it
Expand Down Expand Up @@ -285,6 +300,12 @@ type Handles interface {
// part of #95563.
//
// Iterate(roachpb.StoreID, func(context.Context, Handle, Stream))

// LookupReplicationAdmissionHandle looks up the ReplicationAdmissionHandle
// for the specific range (or rather, the replica of the specific range
// that's locally held). The bool is false if no handle was found, in which
// case the caller must use the pre-replication-admission-control path.
LookupReplicationAdmissionHandle(roachpb.RangeID) (ReplicationAdmissionHandle, bool)
}

// HandleFactory is used to construct new Handles.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,12 @@ func (d dummyHandles) Lookup(id roachpb.RangeID) (kvflowcontrol.Handle, bool) {
return nil, false
}

func (d dummyHandles) LookupReplicationAdmissionHandle(
rangeID roachpb.RangeID,
) (kvflowcontrol.ReplicationAdmissionHandle, bool) {
return d.Lookup(rangeID)
}

func (d dummyHandles) ResetStreams(ctx context.Context) {}

func (d dummyHandles) Inspect() []roachpb.RangeID {
Expand Down
17 changes: 17 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,15 @@ type Processor interface {
AdmittedLogEntry(
ctx context.Context, state EntryForAdmissionCallbackState,
)

// AdmitForEval is called to admit work that wants to evaluate at the
// leaseholder.
//
// If the callee decided not to admit because replication admission
// control is disabled, or for any other reason, admitted will be false
// and error will be nil.
AdmitForEval(
ctx context.Context, pri admissionpb.WorkPriority, ct time.Time) (admitted bool, err error)
}

type processorImpl struct {
Expand Down Expand Up @@ -895,6 +904,14 @@ func (p *processorImpl) AdmittedLogEntry(
}
}

// AdmitForEval implements Processor.
func (p *processorImpl) AdmitForEval(
ctx context.Context, pri admissionpb.WorkPriority, ct time.Time,
) (admitted bool, err error) {
// TODO(sumeer):
return false, nil
}

func admittedIncreased(prev, next [raftpb.NumPriorities]uint64) bool {
for i := range prev {
if prev[i] < next[i] {
Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -871,6 +871,10 @@ type Replica struct {
// both the leaseholder and raft leader.
//
// Accessing it requires Replica.mu to be held, exclusively.
//
// There is a one-way transition from RACv1 => RACv2 that causes the
// existing real implementation to be destroyed and replaced with a real
// implementation.
replicaFlowControlIntegration replicaFlowControlIntegration
}

Expand Down
22 changes: 18 additions & 4 deletions pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,10 @@ func (r *Replica) evalAndPropose(
return proposalCh, abandon, idKey, writeBytes, nil
}

func (r *Replica) encodePriorityForRACv2() bool {
return r.flowControlV2.GetEnabledWhenLeader() == replica_rac2.EnabledWhenLeaderV2Encoding
}

// propose encodes a command, starts tracking it, and proposes it to Raft.
//
// The method hands ownership of the command over to the Raft machinery. After
Expand Down Expand Up @@ -409,7 +413,7 @@ func (r *Replica) propose(
data, err := raftlog.EncodeCommand(ctx, p.command, p.idKey,
raftlog.EncodeOptions{
RaftAdmissionMeta: raftAdmissionMeta,
EncodePriority: false,
EncodePriority: r.encodePriorityForRACv2(),
})
if err != nil {
return kvpb.NewError(err)
Expand Down Expand Up @@ -813,9 +817,19 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
// Not already at highest level.
level := racV2EnabledWhenLeaderLevel(ctx, r.store.cfg.Settings)
if level > r.raftMu.flowControlLevel {
// TODO(sumeer): close RACv1 leader stuff.
// if r.raftMu.flowControlLevel == replica_rac2.NotEnabledWhenLeader {
// }
if r.raftMu.flowControlLevel == replica_rac2.NotEnabledWhenLeader {
func() {
r.mu.Lock()
defer r.mu.Unlock()
// This will close all connected streams and consequently all
// requests waiting on v1 kvflowcontrol.ReplicationAdmissionHandles
// will return.
r.mu.replicaFlowControlIntegration.onDestroyed(ctx)
// Replace with a noop integration since want no code to execute on
// various calls.
r.mu.replicaFlowControlIntegration = noopReplicaFlowControlIntegration{}
}()
}
r.raftMu.flowControlLevel = level
r.flowControlV2.SetEnabledWhenLeaderRaftMuLocked(level)
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/stats/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ go_library(
"histogram.go",
"json.go",
"merge.go",
"most_common_values.go",
"new_stat.go",
"quantile.go",
"row_sampling.go",
Expand Down Expand Up @@ -49,6 +50,7 @@ go_library(
"//pkg/sql/sqlerrors",
"//pkg/sql/types",
"//pkg/util/cache",
"//pkg/util/container/heap",
"//pkg/util/encoding",
"//pkg/util/errorutil",
"//pkg/util/hlc",
Expand Down
Loading

0 comments on commit 81f6a82

Please sign in to comment.