Skip to content

Commit

Permalink
kvserver: add TestFlowControlSendQueueRangeSplitMerge test
Browse files Browse the repository at this point in the history
Add a new rac2 flow control integration test,
`TestFlowControlSendQueueRangeSplitMerge`.

This test takes the following steps:

```sql
-- We will exhaust the tokens across all streams while admission is blocked on
-- n3, using a single 4 MiB (deduction, the write itself is small) write. Then,
-- we will write a 1 MiB put to the range, split it, write a 1 MiB put to the
-- LHS range, merge the ranges, and write a 1 MiB put to the merged range. We
-- expect that at each stage where a send queue develops n1->s3, the send queue
-- will be flushed by the range merge and range split range operations.``sql
```

Note that the RHS is not written to post-split, pre-merge. See the
relevant comments, this will be resolved via #136649, or some variation,
which enforces the timely replication on subsume requests.

Also note that epoch leases are used in the test, the added todo
describes why this is currently necessary and an intention to re-enable
them.

Part of: #132614
Release note: None
  • Loading branch information
kvoli committed Dec 10, 2024
1 parent 790ae95 commit 202bb43
Show file tree
Hide file tree
Showing 2 changed files with 461 additions and 1 deletion.
212 changes: 211 additions & 1 deletion pkg/kv/kvserver/flow_control_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5676,6 +5676,216 @@ func TestFlowControlSendQueueRangeMigrate(t *testing.T) {
h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)
}

// TestFlowControlSendQueueRangeSplitMerge exercises the send queue formation,
// prevention and force flushing due to range split and merge operations. See
// the initial comment for an overview of the test structure.
func TestFlowControlSendQueueRangeSplitMerge(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
const numNodes = 3
settings := cluster.MakeTestingClusterSettings()
kvflowcontrol.Mode.Override(ctx, &settings.SV, kvflowcontrol.ApplyToAll)
// We want to exhaust tokens but not overload the test, so we set the limits
// lower (8 and 16 MiB default).
kvflowcontrol.ElasticTokensPerStream.Override(ctx, &settings.SV, 2<<20)
kvflowcontrol.RegularTokensPerStream.Override(ctx, &settings.SV, 4<<20)
// TODO(kvoli): There are unexpected messages popping up, which cause a send
// queue to be created on the RHS range post-split. This appears related to
// leader leases, or at least disablng them deflakes the test. We should
// re-enable leader leases likely by adjusting the test to ignore the 500b
// send queue formatiion:
//
// r3=(is_state_replicate=true has_send_queue=true send_queue_size=500 B / 1 entries
// [idx_to_send=12 next_raft_idx=13 next_raft_idx_initial=13 force_flush_stop_idx=0])
//
// See #136258 for more debug info.
kvserver.OverrideDefaultLeaseType(ctx, &settings.SV, roachpb.LeaseEpoch)
disableWorkQueueGrantingServers := make([]atomic.Bool, numNodes)
setTokenReturnEnabled := func(enabled bool, serverIdxs ...int) {
for _, serverIdx := range serverIdxs {
disableWorkQueueGrantingServers[serverIdx].Store(!enabled)
}
}

argsPerServer := make(map[int]base.TestServerArgs)
for i := range disableWorkQueueGrantingServers {
disableWorkQueueGrantingServers[i].Store(true)
argsPerServer[i] = base.TestServerArgs{
Settings: settings,
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{
UseOnlyForScratchRanges: true,
OverrideTokenDeduction: func(tokens kvflowcontrol.Tokens) kvflowcontrol.Tokens {
// Deduct every write as 1 MiB, regardless of how large it
// actually is.
return kvflowcontrol.Tokens(1 << 20)
},
// We want to test the behavior of the send queue, so we want to
// always have up-to-date stats. This ensures that the send queue
// stats are always refreshed on each call to
// RangeController.HandleRaftEventRaftMuLocked.
OverrideAlwaysRefreshSendStreamStats: true,
},
},
AdmissionControl: &admission.TestingKnobs{
DisableWorkQueueFastPath: true,
DisableWorkQueueGranting: func() bool {
idx := i
return disableWorkQueueGrantingServers[idx].Load()
},
},
},
}
}

tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgsPerNode: argsPerServer,
})
defer tc.Stopper().Stop(ctx)

k := tc.ScratchRange(t)
tc.AddVotersOrFatal(t, k, tc.Targets(1, 2)...)

h := newFlowControlTestHelper(
t, tc, "flow_control_integration_v2", /* testdata */
kvflowcontrol.V2EnabledWhenLeaderV2Encoding, true, /* isStatic */
)
h.init(kvflowcontrol.ApplyToAll)
defer h.close("send_queue_range_split_merge")

desc, err := tc.LookupRange(k)
require.NoError(t, err)
h.enableVerboseRaftMsgLoggingForRange(desc.RangeID)
h.enableVerboseRaftMsgLoggingForRange(desc.RangeID + 1)
n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0))
h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */)
// Reset the token metrics, since a send queue may have instantly
// formed when adding one of the replicas, before being quickly
// drained.
h.resetV2TokenMetrics(ctx)

// TODO(kvoli): Update this comment to also mention writing to the RHS range,
// once we resolve the subsume wait for application issue. See #136649.
h.comment(`
-- We will exhaust the tokens across all streams while admission is blocked on
-- n3, using a single 4 MiB (deduction, the write itself is small) write. Then,
-- we will write a 1 MiB put to the range, split it, write a 1 MiB put to the
-- LHS range, merge the ranges, and write a 1 MiB put to the merged range. We
-- expect that at each stage where a send queue develops n1->s3, the send queue
-- will be flushed by the range merge and range split range operations.`)
h.comment(`
-- Start by exhausting the tokens from n1->s3 and blocking admission on s3.
-- (Issuing 4x1MiB regular, 3x replicated write that's not admitted on s3.)`)
setTokenReturnEnabled(true /* enabled */, 0, 1)
setTokenReturnEnabled(false /* enabled */, 2)
h.put(ctx, k, 1, admissionpb.NormalPri)
h.put(ctx, k, 1, admissionpb.NormalPri)
h.put(ctx, k, 1, admissionpb.NormalPri)
h.put(ctx, k, 1, admissionpb.NormalPri)
h.waitForTotalTrackedTokens(ctx, desc.RangeID, 4<<20 /* 4 MiB */, 0 /* serverIdx */)

h.comment(`(Sending 1 MiB put request to pre-split range)`)
h.put(ctx, k, 1, admissionpb.NormalPri)
h.comment(`(Sent 1 MiB put request to pre-split range)`)

h.waitForTotalTrackedTokens(ctx, desc.RangeID, 4<<20 /* 4 MiB */, 0 /* serverIdx */)
h.waitForAllTokensReturnedForStreamsV2(ctx, 0, /* serverIdx */
testingMkFlowStream(0), testingMkFlowStream(1))
h.waitForSendQueueSize(ctx, desc.RangeID, 1<<20 /* expSize 1 MiB */, 0 /* serverIdx */)

h.comment(`
-- Send queue metrics from n1, n3's send queue should have 1 MiB for s3.`)
h.query(n1, flowSendQueueQueryStr)
h.comment(`
-- Observe the total tracked tokens per-stream on n1, s3's entries will still
-- be tracked here.`)
h.query(n1, `
SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8)
FROM crdb_internal.kv_flow_control_handles_v2
`, "range_id", "store_id", "total_tracked_tokens")
h.comment(`
-- Per-store tokens available from n1, these should reflect the lack of tokens
-- for s3.`)
h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)

h.comment(`-- (Splitting range.)`)
left, right := tc.SplitRangeOrFatal(t, k.Next())
h.waitForConnectedStreams(ctx, left.RangeID, 3, 0 /* serverIdx */)
h.waitForConnectedStreams(ctx, right.RangeID, 3, 0 /* serverIdx */)
h.waitForSendQueueSize(ctx, left.RangeID, 0 /* expSize 0 MiB */, 0 /* serverIdx */)
h.waitForSendQueueSize(ctx, right.RangeID, 0 /* expSize 0 MiB */, 0 /* serverIdx */)

h.comment(`-- Observe the newly split off replica, with its own three streams.`)
h.query(n1, `
SELECT range_id, count(*) AS streams
FROM crdb_internal.kv_flow_control_handles_v2
GROUP BY (range_id)
ORDER BY streams DESC;
`, "range_id", "stream_count")
h.comment(`
-- Send queue and flow token metrics from n1, post-split.
-- We expect to see a force flush of the send queue for s3.`)
h.query(n1, flowSendQueueQueryStr)
h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)

h.comment(`(Sending 1 MiB put request to post-split LHS range)`)
h.put(ctx, roachpb.Key(left.StartKey), 1, admissionpb.NormalPri)
h.comment(`(Sent 1 MiB put request to post-split LHS range)`)
h.waitForAllTokensReturnedForStreamsV2(ctx, 0, /* serverIdx */
testingMkFlowStream(0), testingMkFlowStream(1))

// TODO(kvoli): Uncomment once we resolve the subsume wait for application
// issue. See #136649.
// h.comment(`(Sending 1 MiB put request to post-split RHS range)`)
// h.put(ctx, roachpb.Key(right.StartKey), 1, admissionpb.NormalPri)
// h.comment(`(Sent 1 MiB put request to post-split RHS range)`)
// h.waitForAllTokensReturnedForStreamsV2(ctx, 0, /* serverIdx */
// testingMkFlowStream(0), testingMkFlowStream(1))

h.comment(`
-- Send queue and flow token metrics from n1, post-split and 1 MiB put on
-- each side.`)
h.query(n1, flowSendQueueQueryStr)
h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)

h.comment(`-- (Merging ranges.)`)
merged := tc.MergeRangesOrFatal(t, left.StartKey.AsRawKey())
h.waitForConnectedStreams(ctx, merged.RangeID, 3, 0 /* serverIdx */)
h.waitForSendQueueSize(ctx, merged.RangeID, 0 /* expSize 0 MiB */, 0 /* serverIdx */)

h.comment(`
-- Send queue and flow token metrics from n1, post-split-merge.
-- We expect to see a force flush of the send queue for s3 again.`)
h.query(n1, flowSendQueueQueryStr)
h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)

h.comment(`(Sending 1 MiB put request to post-split-merge range)`)
h.put(ctx, k, 1, admissionpb.NormalPri)
h.comment(`(Sent 1 MiB put request to post-split-merge range)`)
h.waitForAllTokensReturnedForStreamsV2(ctx, 0, /* serverIdx */
testingMkFlowStream(0), testingMkFlowStream(1))
h.waitForSendQueueSize(ctx, merged.RangeID, 1<<20 /* expSize 1 MiB */, 0 /* serverIdx */)

h.comment(`
-- Send queue and flow token metrics from n1, post-split-merge and 1 MiB put.
-- We expect to see the send queue develop for s3 again.`)
h.query(n1, flowSendQueueQueryStr)
h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)

h.comment(`-- (Allowing below-raft admission to proceed on [n1,n2,n3].)`)
setTokenReturnEnabled(true /* enabled */, 0, 1, 2)

h.waitForAllTokensReturned(ctx, 3, 0 /* serverIdx */)
h.comment(`
-- Send queue and flow token metrics from n1, all tokens should be returned.`)
h.query(n1, flowSendQueueQueryStr)
h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)
}

type flowControlTestHelper struct {
t testing.TB
tc *testcluster.TestCluster
Expand Down Expand Up @@ -5788,7 +5998,7 @@ func (h *flowControlTestHelper) checkSendQueueSize(
h.tc.GetFirstStoreFromServer(h.t, serverIdx).GetReplicaIfExists(rangeID).SendStreamStats(&stats)
_, sizeBytes := stats.SumSendQueues()
if sizeBytes != expSize {
return errors.Errorf("expected send queue size %d, got %d [%v]", expSize, sizeBytes, stats)
return errors.Errorf("expected send queue size %d, got %d [%v]", expSize, sizeBytes, &stats)
}
return nil
}
Expand Down
Loading

0 comments on commit 202bb43

Please sign in to comment.