From 202bb43c644a730d586bceb4220a130b395a9a08 Mon Sep 17 00:00:00 2001 From: Austen McClernon Date: Fri, 15 Nov 2024 17:05:39 -0500 Subject: [PATCH] kvserver: add TestFlowControlSendQueueRangeSplitMerge test Add a new rac2 flow control integration test, `TestFlowControlSendQueueRangeSplitMerge`. This test takes the following steps: ```sql -- We will exhaust the tokens across all streams while admission is blocked on -- n3, using a single 4 MiB (deduction, the write itself is small) write. Then, -- we will write a 1 MiB put to the range, split it, write a 1 MiB put to the -- LHS range, merge the ranges, and write a 1 MiB put to the merged range. We -- expect that at each stage where a send queue develops n1->s3, the send queue -- will be flushed by the range merge and range split range operations.``sql ``` Note that the RHS is not written to post-split, pre-merge. See the relevant comments, this will be resolved via #136649, or some variation, which enforces the timely replication on subsume requests. Also note that epoch leases are used in the test, the added todo describes why this is currently necessary and an intention to re-enable them. Part of: #132614 Release note: None --- .../kvserver/flow_control_integration_test.go | 212 ++++++++++++++- .../send_queue_range_split_merge | 250 ++++++++++++++++++ 2 files changed, 461 insertions(+), 1 deletion(-) create mode 100644 pkg/kv/kvserver/testdata/flow_control_integration_v2/send_queue_range_split_merge diff --git a/pkg/kv/kvserver/flow_control_integration_test.go b/pkg/kv/kvserver/flow_control_integration_test.go index 924b52b9e710..86d7c2193cf6 100644 --- a/pkg/kv/kvserver/flow_control_integration_test.go +++ b/pkg/kv/kvserver/flow_control_integration_test.go @@ -5676,6 +5676,216 @@ func TestFlowControlSendQueueRangeMigrate(t *testing.T) { h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...) } +// TestFlowControlSendQueueRangeSplitMerge exercises the send queue formation, +// prevention and force flushing due to range split and merge operations. See +// the initial comment for an overview of the test structure. +func TestFlowControlSendQueueRangeSplitMerge(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + const numNodes = 3 + settings := cluster.MakeTestingClusterSettings() + kvflowcontrol.Mode.Override(ctx, &settings.SV, kvflowcontrol.ApplyToAll) + // We want to exhaust tokens but not overload the test, so we set the limits + // lower (8 and 16 MiB default). + kvflowcontrol.ElasticTokensPerStream.Override(ctx, &settings.SV, 2<<20) + kvflowcontrol.RegularTokensPerStream.Override(ctx, &settings.SV, 4<<20) + // TODO(kvoli): There are unexpected messages popping up, which cause a send + // queue to be created on the RHS range post-split. This appears related to + // leader leases, or at least disablng them deflakes the test. We should + // re-enable leader leases likely by adjusting the test to ignore the 500b + // send queue formatiion: + // + // r3=(is_state_replicate=true has_send_queue=true send_queue_size=500 B / 1 entries + // [idx_to_send=12 next_raft_idx=13 next_raft_idx_initial=13 force_flush_stop_idx=0]) + // + // See #136258 for more debug info. + kvserver.OverrideDefaultLeaseType(ctx, &settings.SV, roachpb.LeaseEpoch) + disableWorkQueueGrantingServers := make([]atomic.Bool, numNodes) + setTokenReturnEnabled := func(enabled bool, serverIdxs ...int) { + for _, serverIdx := range serverIdxs { + disableWorkQueueGrantingServers[serverIdx].Store(!enabled) + } + } + + argsPerServer := make(map[int]base.TestServerArgs) + for i := range disableWorkQueueGrantingServers { + disableWorkQueueGrantingServers[i].Store(true) + argsPerServer[i] = base.TestServerArgs{ + Settings: settings, + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{ + UseOnlyForScratchRanges: true, + OverrideTokenDeduction: func(tokens kvflowcontrol.Tokens) kvflowcontrol.Tokens { + // Deduct every write as 1 MiB, regardless of how large it + // actually is. + return kvflowcontrol.Tokens(1 << 20) + }, + // We want to test the behavior of the send queue, so we want to + // always have up-to-date stats. This ensures that the send queue + // stats are always refreshed on each call to + // RangeController.HandleRaftEventRaftMuLocked. + OverrideAlwaysRefreshSendStreamStats: true, + }, + }, + AdmissionControl: &admission.TestingKnobs{ + DisableWorkQueueFastPath: true, + DisableWorkQueueGranting: func() bool { + idx := i + return disableWorkQueueGrantingServers[idx].Load() + }, + }, + }, + } + } + + tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgsPerNode: argsPerServer, + }) + defer tc.Stopper().Stop(ctx) + + k := tc.ScratchRange(t) + tc.AddVotersOrFatal(t, k, tc.Targets(1, 2)...) + + h := newFlowControlTestHelper( + t, tc, "flow_control_integration_v2", /* testdata */ + kvflowcontrol.V2EnabledWhenLeaderV2Encoding, true, /* isStatic */ + ) + h.init(kvflowcontrol.ApplyToAll) + defer h.close("send_queue_range_split_merge") + + desc, err := tc.LookupRange(k) + require.NoError(t, err) + h.enableVerboseRaftMsgLoggingForRange(desc.RangeID) + h.enableVerboseRaftMsgLoggingForRange(desc.RangeID + 1) + n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */) + // Reset the token metrics, since a send queue may have instantly + // formed when adding one of the replicas, before being quickly + // drained. + h.resetV2TokenMetrics(ctx) + + // TODO(kvoli): Update this comment to also mention writing to the RHS range, + // once we resolve the subsume wait for application issue. See #136649. + h.comment(` +-- We will exhaust the tokens across all streams while admission is blocked on +-- n3, using a single 4 MiB (deduction, the write itself is small) write. Then, +-- we will write a 1 MiB put to the range, split it, write a 1 MiB put to the +-- LHS range, merge the ranges, and write a 1 MiB put to the merged range. We +-- expect that at each stage where a send queue develops n1->s3, the send queue +-- will be flushed by the range merge and range split range operations.`) + h.comment(` +-- Start by exhausting the tokens from n1->s3 and blocking admission on s3. +-- (Issuing 4x1MiB regular, 3x replicated write that's not admitted on s3.)`) + setTokenReturnEnabled(true /* enabled */, 0, 1) + setTokenReturnEnabled(false /* enabled */, 2) + h.put(ctx, k, 1, admissionpb.NormalPri) + h.put(ctx, k, 1, admissionpb.NormalPri) + h.put(ctx, k, 1, admissionpb.NormalPri) + h.put(ctx, k, 1, admissionpb.NormalPri) + h.waitForTotalTrackedTokens(ctx, desc.RangeID, 4<<20 /* 4 MiB */, 0 /* serverIdx */) + + h.comment(`(Sending 1 MiB put request to pre-split range)`) + h.put(ctx, k, 1, admissionpb.NormalPri) + h.comment(`(Sent 1 MiB put request to pre-split range)`) + + h.waitForTotalTrackedTokens(ctx, desc.RangeID, 4<<20 /* 4 MiB */, 0 /* serverIdx */) + h.waitForAllTokensReturnedForStreamsV2(ctx, 0, /* serverIdx */ + testingMkFlowStream(0), testingMkFlowStream(1)) + h.waitForSendQueueSize(ctx, desc.RangeID, 1<<20 /* expSize 1 MiB */, 0 /* serverIdx */) + + h.comment(` +-- Send queue metrics from n1, n3's send queue should have 1 MiB for s3.`) + h.query(n1, flowSendQueueQueryStr) + h.comment(` +-- Observe the total tracked tokens per-stream on n1, s3's entries will still +-- be tracked here.`) + h.query(n1, ` + SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 +`, "range_id", "store_id", "total_tracked_tokens") + h.comment(` +-- Per-store tokens available from n1, these should reflect the lack of tokens +-- for s3.`) + h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...) + + h.comment(`-- (Splitting range.)`) + left, right := tc.SplitRangeOrFatal(t, k.Next()) + h.waitForConnectedStreams(ctx, left.RangeID, 3, 0 /* serverIdx */) + h.waitForConnectedStreams(ctx, right.RangeID, 3, 0 /* serverIdx */) + h.waitForSendQueueSize(ctx, left.RangeID, 0 /* expSize 0 MiB */, 0 /* serverIdx */) + h.waitForSendQueueSize(ctx, right.RangeID, 0 /* expSize 0 MiB */, 0 /* serverIdx */) + + h.comment(`-- Observe the newly split off replica, with its own three streams.`) + h.query(n1, ` + SELECT range_id, count(*) AS streams + FROM crdb_internal.kv_flow_control_handles_v2 +GROUP BY (range_id) +ORDER BY streams DESC; +`, "range_id", "stream_count") + h.comment(` +-- Send queue and flow token metrics from n1, post-split. +-- We expect to see a force flush of the send queue for s3.`) + h.query(n1, flowSendQueueQueryStr) + h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...) + + h.comment(`(Sending 1 MiB put request to post-split LHS range)`) + h.put(ctx, roachpb.Key(left.StartKey), 1, admissionpb.NormalPri) + h.comment(`(Sent 1 MiB put request to post-split LHS range)`) + h.waitForAllTokensReturnedForStreamsV2(ctx, 0, /* serverIdx */ + testingMkFlowStream(0), testingMkFlowStream(1)) + + // TODO(kvoli): Uncomment once we resolve the subsume wait for application + // issue. See #136649. + // h.comment(`(Sending 1 MiB put request to post-split RHS range)`) + // h.put(ctx, roachpb.Key(right.StartKey), 1, admissionpb.NormalPri) + // h.comment(`(Sent 1 MiB put request to post-split RHS range)`) + // h.waitForAllTokensReturnedForStreamsV2(ctx, 0, /* serverIdx */ + // testingMkFlowStream(0), testingMkFlowStream(1)) + + h.comment(` +-- Send queue and flow token metrics from n1, post-split and 1 MiB put on +-- each side.`) + h.query(n1, flowSendQueueQueryStr) + h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...) + + h.comment(`-- (Merging ranges.)`) + merged := tc.MergeRangesOrFatal(t, left.StartKey.AsRawKey()) + h.waitForConnectedStreams(ctx, merged.RangeID, 3, 0 /* serverIdx */) + h.waitForSendQueueSize(ctx, merged.RangeID, 0 /* expSize 0 MiB */, 0 /* serverIdx */) + + h.comment(` +-- Send queue and flow token metrics from n1, post-split-merge. +-- We expect to see a force flush of the send queue for s3 again.`) + h.query(n1, flowSendQueueQueryStr) + h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...) + + h.comment(`(Sending 1 MiB put request to post-split-merge range)`) + h.put(ctx, k, 1, admissionpb.NormalPri) + h.comment(`(Sent 1 MiB put request to post-split-merge range)`) + h.waitForAllTokensReturnedForStreamsV2(ctx, 0, /* serverIdx */ + testingMkFlowStream(0), testingMkFlowStream(1)) + h.waitForSendQueueSize(ctx, merged.RangeID, 1<<20 /* expSize 1 MiB */, 0 /* serverIdx */) + + h.comment(` +-- Send queue and flow token metrics from n1, post-split-merge and 1 MiB put. +-- We expect to see the send queue develop for s3 again.`) + h.query(n1, flowSendQueueQueryStr) + h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...) + + h.comment(`-- (Allowing below-raft admission to proceed on [n1,n2,n3].)`) + setTokenReturnEnabled(true /* enabled */, 0, 1, 2) + + h.waitForAllTokensReturned(ctx, 3, 0 /* serverIdx */) + h.comment(` +-- Send queue and flow token metrics from n1, all tokens should be returned.`) + h.query(n1, flowSendQueueQueryStr) + h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...) +} + type flowControlTestHelper struct { t testing.TB tc *testcluster.TestCluster @@ -5788,7 +5998,7 @@ func (h *flowControlTestHelper) checkSendQueueSize( h.tc.GetFirstStoreFromServer(h.t, serverIdx).GetReplicaIfExists(rangeID).SendStreamStats(&stats) _, sizeBytes := stats.SumSendQueues() if sizeBytes != expSize { - return errors.Errorf("expected send queue size %d, got %d [%v]", expSize, sizeBytes, stats) + return errors.Errorf("expected send queue size %d, got %d [%v]", expSize, sizeBytes, &stats) } return nil } diff --git a/pkg/kv/kvserver/testdata/flow_control_integration_v2/send_queue_range_split_merge b/pkg/kv/kvserver/testdata/flow_control_integration_v2/send_queue_range_split_merge new file mode 100644 index 000000000000..8c89bc52dd7b --- /dev/null +++ b/pkg/kv/kvserver/testdata/flow_control_integration_v2/send_queue_range_split_merge @@ -0,0 +1,250 @@ +echo +---- +---- +-- We will exhaust the tokens across all streams while admission is blocked on +-- n3, using a single 4 MiB (deduction, the write itself is small) write. Then, +-- we will write a 1 MiB put to the range, split it, write a 1 MiB put to the +-- LHS range, merge the ranges, and write a 1 MiB put to the merged range. We +-- expect that at each stage where a send queue develops n1->s3, the send queue +-- will be flushed by the range merge and range split range operations. + + +-- Start by exhausting the tokens from n1->s3 and blocking admission on s3. +-- (Issuing 4x1MiB regular, 3x replicated write that's not admitted on s3.) + + +(Sending 1 MiB put request to pre-split range) + + +(Sent 1 MiB put request to pre-split range) + + +-- Send queue metrics from n1, n3's send queue should have 1 MiB for s3. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%send_queue%' + AND name != 'kvflowcontrol.send_queue.count' +ORDER BY name ASC; + + kvflowcontrol.send_queue.bytes | 1.0 MiB + kvflowcontrol.send_queue.prevent.count | 0 B + kvflowcontrol.send_queue.scheduled.deducted_bytes | 0 B + kvflowcontrol.send_queue.scheduled.force_flush | 0 B + kvflowcontrol.tokens.send.elastic.deducted.force_flush_send_queue | 0 B + kvflowcontrol.tokens.send.elastic.deducted.prevent_send_queue | 0 B + kvflowcontrol.tokens.send.regular.deducted.prevent_send_queue | 0 B + + +-- Observe the total tracked tokens per-stream on n1, s3's entries will still +-- be tracked here. +SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 + + range_id | store_id | total_tracked_tokens +-----------+----------+----------------------- + 74 | 1 | 0 B + 74 | 2 | 0 B + 74 | 3 | 4.0 MiB + + +-- Per-store tokens available from n1, these should reflect the lack of tokens +-- for s3. +SELECT store_id, + crdb_internal.humanize_bytes(available_eval_regular_tokens), + crdb_internal.humanize_bytes(available_eval_elastic_tokens), + crdb_internal.humanize_bytes(available_send_regular_tokens), + crdb_internal.humanize_bytes(available_send_elastic_tokens) + FROM crdb_internal.kv_flow_controller_v2 + ORDER BY store_id ASC; + + store_id | eval_regular_available | eval_elastic_available | send_regular_available | send_elastic_available +-----------+------------------------+------------------------+------------------------+------------------------- + 1 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 2 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 3 | 0 B | -3.0 MiB | 0 B | -2.0 MiB + + +-- (Splitting range.) + + +-- Observe the newly split off replica, with its own three streams. +SELECT range_id, count(*) AS streams + FROM crdb_internal.kv_flow_control_handles_v2 +GROUP BY (range_id) +ORDER BY streams DESC; + + range_id | stream_count +-----------+--------------- + 74 | 3 + 75 | 3 + + +-- Send queue and flow token metrics from n1, post-split. +-- We expect to see a force flush of the send queue for s3. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%send_queue%' + AND name != 'kvflowcontrol.send_queue.count' +ORDER BY name ASC; + + kvflowcontrol.send_queue.bytes | 0 B + kvflowcontrol.send_queue.prevent.count | 0 B + kvflowcontrol.send_queue.scheduled.deducted_bytes | 0 B + kvflowcontrol.send_queue.scheduled.force_flush | 0 B + kvflowcontrol.tokens.send.elastic.deducted.force_flush_send_queue | 1.0 MiB + kvflowcontrol.tokens.send.elastic.deducted.prevent_send_queue | 0 B + kvflowcontrol.tokens.send.regular.deducted.prevent_send_queue | 0 B +SELECT store_id, + crdb_internal.humanize_bytes(available_eval_regular_tokens), + crdb_internal.humanize_bytes(available_eval_elastic_tokens), + crdb_internal.humanize_bytes(available_send_regular_tokens), + crdb_internal.humanize_bytes(available_send_elastic_tokens) + FROM crdb_internal.kv_flow_controller_v2 + ORDER BY store_id ASC; + + store_id | eval_regular_available | eval_elastic_available | send_regular_available | send_elastic_available +-----------+------------------------+------------------------+------------------------+------------------------- + 1 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 2 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 3 | 0 B | -3.0 MiB | 0 B | -3.0 MiB + + +(Sending 1 MiB put request to post-split LHS range) + + +(Sent 1 MiB put request to post-split LHS range) + + +-- Send queue and flow token metrics from n1, post-split and 1 MiB put on +-- each side. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%send_queue%' + AND name != 'kvflowcontrol.send_queue.count' +ORDER BY name ASC; + + kvflowcontrol.send_queue.bytes | 1.0 MiB + kvflowcontrol.send_queue.prevent.count | 0 B + kvflowcontrol.send_queue.scheduled.deducted_bytes | 0 B + kvflowcontrol.send_queue.scheduled.force_flush | 0 B + kvflowcontrol.tokens.send.elastic.deducted.force_flush_send_queue | 1.0 MiB + kvflowcontrol.tokens.send.elastic.deducted.prevent_send_queue | 0 B + kvflowcontrol.tokens.send.regular.deducted.prevent_send_queue | 0 B +SELECT store_id, + crdb_internal.humanize_bytes(available_eval_regular_tokens), + crdb_internal.humanize_bytes(available_eval_elastic_tokens), + crdb_internal.humanize_bytes(available_send_regular_tokens), + crdb_internal.humanize_bytes(available_send_elastic_tokens) + FROM crdb_internal.kv_flow_controller_v2 + ORDER BY store_id ASC; + + store_id | eval_regular_available | eval_elastic_available | send_regular_available | send_elastic_available +-----------+------------------------+------------------------+------------------------+------------------------- + 1 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 2 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 3 | 0 B | -4.0 MiB | 0 B | -3.0 MiB + + +-- (Merging ranges.) + + +-- Send queue and flow token metrics from n1, post-split-merge. +-- We expect to see a force flush of the send queue for s3 again. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%send_queue%' + AND name != 'kvflowcontrol.send_queue.count' +ORDER BY name ASC; + + kvflowcontrol.send_queue.bytes | 0 B + kvflowcontrol.send_queue.prevent.count | 0 B + kvflowcontrol.send_queue.scheduled.deducted_bytes | 0 B + kvflowcontrol.send_queue.scheduled.force_flush | 0 B + kvflowcontrol.tokens.send.elastic.deducted.force_flush_send_queue | 2.0 MiB + kvflowcontrol.tokens.send.elastic.deducted.prevent_send_queue | 0 B + kvflowcontrol.tokens.send.regular.deducted.prevent_send_queue | 0 B +SELECT store_id, + crdb_internal.humanize_bytes(available_eval_regular_tokens), + crdb_internal.humanize_bytes(available_eval_elastic_tokens), + crdb_internal.humanize_bytes(available_send_regular_tokens), + crdb_internal.humanize_bytes(available_send_elastic_tokens) + FROM crdb_internal.kv_flow_controller_v2 + ORDER BY store_id ASC; + + store_id | eval_regular_available | eval_elastic_available | send_regular_available | send_elastic_available +-----------+------------------------+------------------------+------------------------+------------------------- + 1 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 2 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 3 | 0 B | -4.0 MiB | 0 B | -4.0 MiB + + +(Sending 1 MiB put request to post-split-merge range) + + +(Sent 1 MiB put request to post-split-merge range) + + +-- Send queue and flow token metrics from n1, post-split-merge and 1 MiB put. +-- We expect to see the send queue develop for s3 again. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%send_queue%' + AND name != 'kvflowcontrol.send_queue.count' +ORDER BY name ASC; + + kvflowcontrol.send_queue.bytes | 1.0 MiB + kvflowcontrol.send_queue.prevent.count | 0 B + kvflowcontrol.send_queue.scheduled.deducted_bytes | 0 B + kvflowcontrol.send_queue.scheduled.force_flush | 0 B + kvflowcontrol.tokens.send.elastic.deducted.force_flush_send_queue | 2.0 MiB + kvflowcontrol.tokens.send.elastic.deducted.prevent_send_queue | 0 B + kvflowcontrol.tokens.send.regular.deducted.prevent_send_queue | 0 B +SELECT store_id, + crdb_internal.humanize_bytes(available_eval_regular_tokens), + crdb_internal.humanize_bytes(available_eval_elastic_tokens), + crdb_internal.humanize_bytes(available_send_regular_tokens), + crdb_internal.humanize_bytes(available_send_elastic_tokens) + FROM crdb_internal.kv_flow_controller_v2 + ORDER BY store_id ASC; + + store_id | eval_regular_available | eval_elastic_available | send_regular_available | send_elastic_available +-----------+------------------------+------------------------+------------------------+------------------------- + 1 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 2 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 3 | 0 B | -5.0 MiB | 0 B | -4.0 MiB + + +-- (Allowing below-raft admission to proceed on [n1,n2,n3].) + + +-- Send queue and flow token metrics from n1, all tokens should be returned. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%send_queue%' + AND name != 'kvflowcontrol.send_queue.count' +ORDER BY name ASC; + + kvflowcontrol.send_queue.bytes | 0 B + kvflowcontrol.send_queue.prevent.count | 0 B + kvflowcontrol.send_queue.scheduled.deducted_bytes | 0 B + kvflowcontrol.send_queue.scheduled.force_flush | 0 B + kvflowcontrol.tokens.send.elastic.deducted.force_flush_send_queue | 2.0 MiB + kvflowcontrol.tokens.send.elastic.deducted.prevent_send_queue | 0 B + kvflowcontrol.tokens.send.regular.deducted.prevent_send_queue | 0 B +SELECT store_id, + crdb_internal.humanize_bytes(available_eval_regular_tokens), + crdb_internal.humanize_bytes(available_eval_elastic_tokens), + crdb_internal.humanize_bytes(available_send_regular_tokens), + crdb_internal.humanize_bytes(available_send_elastic_tokens) + FROM crdb_internal.kv_flow_controller_v2 + ORDER BY store_id ASC; + + store_id | eval_regular_available | eval_elastic_available | send_regular_available | send_elastic_available +-----------+------------------------+------------------------+------------------------+------------------------- + 1 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 2 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 3 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB +---- +---- + +# vim:ft=sql