From 1eda6110d5ae706844d7abe0e5af8b02cce4b0a2 Mon Sep 17 00:00:00 2001
From: Naveen Setlur
Date: Tue, 6 Aug 2024 15:39:31 -0400
Subject: [PATCH 1/2] roachtest: Add an update-heavy LDR test

As part of the effort to increase LDR test coverage, this test leverages
the YCSB workload to stress LDR under a very update-heavy workload. This
required a bit of refactoring, as some of the test structures assumed a
KV workload.

Release note: none

Epic: CRDB-40236
---
 .../tests/logical_data_replication.go | 230 ++++++++++++++----
 1 file changed, 186 insertions(+), 44 deletions(-)

diff --git a/pkg/cmd/roachtest/tests/logical_data_replication.go b/pkg/cmd/roachtest/tests/logical_data_replication.go
index 79a1db63ae52..3e842b17d01e 100644
--- a/pkg/cmd/roachtest/tests/logical_data_replication.go
+++ b/pkg/cmd/roachtest/tests/logical_data_replication.go
@@ -32,10 +32,50 @@ import (
 	"github.com/stretchr/testify/require"
 )
 
+type ycsbWorkload struct {
+	// workloadType is the YCSB workload letter, e.g. A, B, ..., F.
+	workloadType string
+	// initRows is the number of records to pre-load into the user table.
+	initRows int
+	// debugRunDuration is the duration the workload should run for.
+	debugRunDuration time.Duration
+	// initSplits is the number of initial splits to create before running the workload.
+	initSplits int
+}
+
+func (ycsb ycsbWorkload) sourceInitCmd(tenantName string, nodes option.NodeListOption) string {
+	cmd := roachtestutil.NewCommand(`./cockroach workload init ycsb`).
+		MaybeFlag(ycsb.initRows > 0, "insert-count", ycsb.initRows).
+		MaybeFlag(ycsb.initSplits > 0, "splits", ycsb.initSplits).
+		Arg("{pgurl%s:%s}", nodes, tenantName)
+	return cmd.String()
+}
+
+func (ycsb ycsbWorkload) sourceRunCmd(tenantName string, nodes option.NodeListOption) string {
+	cmd := roachtestutil.NewCommand(`./cockroach workload run ycsb`).
+		Option("tolerate-errors").
+		Flag("workload", ycsb.workloadType).
+		MaybeFlag(ycsb.debugRunDuration > 0, "duration", ycsb.debugRunDuration). 
+ Arg("{pgurl%s:%s}", nodes, tenantName) + return cmd.String() +} + +func (ycsb ycsbWorkload) runDriver( + workloadCtx context.Context, c cluster.Cluster, t test.Test, setup *c2cSetup, +) error { + return defaultWorkloadDriver(workloadCtx, setup, c, ycsb) +} + +type LDRWorkload struct { + workload streamingWorkload + dbName string + tableName string +} + func registerLogicalDataReplicationTests(r registry.Registry) { specs := []ldrTestSpec{ { - name: "ldr/kv0/workload=both/ingestion=both", + name: "ldr/kv0/workload=both/ingestion", clusterSpec: multiClusterSpec{ leftNodes: 3, rightNodes: 3, @@ -48,6 +88,20 @@ func registerLogicalDataReplicationTests(r registry.Registry) { }, run: TestLDRBasic, }, + { + name: "ldr/kv0/workload=both/update_heavy", + clusterSpec: multiClusterSpec{ + leftNodes: 3, + rightNodes: 3, + clusterOpts: []spec.Option{ + spec.CPU(8), + spec.WorkloadNode(), + spec.WorkloadNodeCPU(8), + spec.VolumeSize(100), + }, + }, + run: TestLDRUpdateHeavy, + }, { name: "ldr/kv0/workload=both/shutdown_node", clusterSpec: multiClusterSpec{ @@ -106,14 +160,23 @@ func registerLogicalDataReplicationTests(r registry.Registry) { } func TestLDRBasic(ctx context.Context, t test.Test, c cluster.Cluster, setup multiClusterSetup) { - kvWorkload := replicateKV{ - readPercent: 0, - debugRunDuration: 15 * time.Minute, - maxBlockBytes: 1024, - initRows: 1000, - initWithSplitAndScatter: true} + duration := 15 * time.Minute + if c.IsLocal() { + duration = 5 * time.Minute + } - leftJobID, rightJobID := setupLDR(ctx, t, c, setup, kvWorkload) + ldrWorkload := LDRWorkload{ + workload: replicateKV{ + readPercent: 0, + debugRunDuration: duration, + maxBlockBytes: 1024, + initRows: 1000, + initWithSplitAndScatter: true}, + dbName: "kv", + tableName: "kv", + } + + leftJobID, rightJobID := setupLDR(ctx, t, c, setup, ldrWorkload) // Setup latency verifiers maxExpectedLatency := 2 * time.Minute @@ -146,7 +209,7 @@ func TestLDRBasic(ctx context.Context, t test.Test, c cluster.Cluster, setup mul monitor.Go(func(ctx context.Context) error { defer close(workloadDoneCh) - return c.RunE(ctx, option.WithNodes(setup.workloadNode), kvWorkload.sourceRunCmd("system", setup.CRDBNodes())) + return c.RunE(ctx, option.WithNodes(setup.workloadNode), ldrWorkload.workload.sourceRunCmd("system", setup.CRDBNodes())) }) monitor.Wait() @@ -154,7 +217,71 @@ func TestLDRBasic(ctx context.Context, t test.Test, c cluster.Cluster, setup mul llv.assertValid(t) rlv.assertValid(t) - VerifyCorrectness(t, setup, leftJobID, rightJobID, 2*time.Minute) + VerifyCorrectness(t, setup, leftJobID, rightJobID, 2*time.Minute, ldrWorkload) +} + +func TestLDRUpdateHeavy( + ctx context.Context, t test.Test, c cluster.Cluster, setup multiClusterSetup, +) { + + duration := 10 * time.Minute + if c.IsLocal() { + duration = 3 * time.Minute + } + + ldrWorkload := LDRWorkload{ + workload: ycsbWorkload{ + workloadType: "A", + debugRunDuration: duration, + initRows: 1000, + initSplits: 1000, + }, + dbName: "ycsb", + tableName: "usertable", + } + + leftJobID, rightJobID := setupLDR(ctx, t, c, setup, ldrWorkload) + + // Setup latency verifiers + maxExpectedLatency := 3 * time.Minute + llv := makeLatencyVerifier("ldr-left", 0, maxExpectedLatency, t.L(), + getLogicalDataReplicationJobInfo, t.Status, false /* tolerateErrors */) + defer llv.maybeLogLatencyHist() + + rlv := makeLatencyVerifier("ldr-right", 0, maxExpectedLatency, t.L(), + getLogicalDataReplicationJobInfo, t.Status, false /* tolerateErrors */) + defer rlv.maybeLogLatencyHist() + + workloadDoneCh := 
make(chan struct{}) + debugZipFetcher := &sync.Once{} + + monitor := c.NewMonitor(ctx, setup.CRDBNodes()) + monitor.Go(func(ctx context.Context) error { + if err := llv.pollLatencyUntilJobSucceeds(ctx, setup.left.db, leftJobID, time.Second, workloadDoneCh); err != nil { + debugZipFetcher.Do(func() { getDebugZips(ctx, t, c, setup) }) + return err + } + return nil + }) + monitor.Go(func(ctx context.Context) error { + if err := rlv.pollLatencyUntilJobSucceeds(ctx, setup.right.db, rightJobID, time.Second, workloadDoneCh); err != nil { + debugZipFetcher.Do(func() { getDebugZips(ctx, t, c, setup) }) + return err + } + return nil + }) + + monitor.Go(func(ctx context.Context) error { + defer close(workloadDoneCh) + return c.RunE(ctx, option.WithNodes(setup.workloadNode), ldrWorkload.workload.sourceRunCmd("system", setup.CRDBNodes())) + }) + + monitor.Wait() + + llv.assertValid(t) + rlv.assertValid(t) + + VerifyCorrectness(t, setup, leftJobID, rightJobID, 2*time.Minute, ldrWorkload) } func TestLDROnNodeShutdown( @@ -166,14 +293,18 @@ func TestLDROnNodeShutdown( duration = 3 * time.Minute } - kvWorkload := replicateKV{ - readPercent: 0, - debugRunDuration: duration, - maxBlockBytes: 1024, - initRows: 1000, - initWithSplitAndScatter: true} + ldrWorkload := LDRWorkload{ + workload: replicateKV{ + readPercent: 0, + debugRunDuration: duration, + maxBlockBytes: 1024, + initRows: 1000, + initWithSplitAndScatter: true}, + dbName: "kv", + tableName: "kv", + } - leftJobID, rightJobID := setupLDR(ctx, t, c, setup, kvWorkload) + leftJobID, rightJobID := setupLDR(ctx, t, c, setup, ldrWorkload) // Setup latency verifiers, remembering to account for latency spike from killing a node maxExpectedLatency := 5 * time.Minute @@ -206,11 +337,11 @@ func TestLDROnNodeShutdown( monitor.Go(func(ctx context.Context) error { defer close(workloadDoneCh) - return c.RunE(ctx, option.WithNodes(setup.workloadNode), kvWorkload.sourceRunCmd("system", setup.CRDBNodes())) + return c.RunE(ctx, option.WithNodes(setup.workloadNode), ldrWorkload.workload.sourceRunCmd("system", setup.CRDBNodes())) }) // Let workload run for a bit before we kill a node - time.Sleep(kvWorkload.debugRunDuration / 10) + time.Sleep(ldrWorkload.workload.(replicateKV).debugRunDuration / 10) findNodeToStop := func(info *clusterInfo, rng *rand.Rand) int { for { @@ -241,7 +372,7 @@ func TestLDROnNodeShutdown( } monitor.Wait() - VerifyCorrectness(t, setup, leftJobID, rightJobID, 5*time.Minute) + VerifyCorrectness(t, setup, leftJobID, rightJobID, 5*time.Minute, ldrWorkload) } // TestLDROnNetworkPartition aims to see what happens when both clusters @@ -251,28 +382,31 @@ func TestLDROnNodeShutdown( func TestLDROnNetworkPartition( ctx context.Context, t test.Test, c cluster.Cluster, setup multiClusterSetup, ) { - duration := 10 * time.Minute if c.IsLocal() { duration = 3 * time.Minute } - kvWorkload := replicateKV{ - readPercent: 0, - debugRunDuration: duration, - maxBlockBytes: 1024, - initRows: 1000, - initWithSplitAndScatter: true} + ldrWorkload := LDRWorkload{ + workload: replicateKV{ + readPercent: 0, + debugRunDuration: duration, + maxBlockBytes: 1024, + initRows: 1000, + initWithSplitAndScatter: true}, + dbName: "kv", + tableName: "kv", + } - leftJobID, rightJobID := setupLDR(ctx, t, c, setup, kvWorkload) + leftJobID, rightJobID := setupLDR(ctx, t, c, setup, ldrWorkload) monitor := c.NewMonitor(ctx, setup.CRDBNodes()) monitor.Go(func(ctx context.Context) error { - return c.RunE(ctx, option.WithNodes(setup.workloadNode), kvWorkload.sourceRunCmd("system", 
setup.CRDBNodes())) + return c.RunE(ctx, option.WithNodes(setup.workloadNode), ldrWorkload.workload.sourceRunCmd("system", setup.CRDBNodes())) }) // Let workload run for a bit before we kill a node - time.Sleep(kvWorkload.debugRunDuration / 10) + time.Sleep(ldrWorkload.workload.(replicateKV).debugRunDuration / 10) failNodesLength := len(setup.CRDBNodes()) / 2 nodesToFail, err := setup.CRDBNodes().SeededRandList(setup.rng, failNodesLength) @@ -282,7 +416,7 @@ func TestLDROnNetworkPartition( // We're not using the entire blackholeFailer setup, so break the interface contract and use this directly blackholeFailer := &blackholeFailer{t: t, c: c, input: true, output: true} - disconnectDuration := kvWorkload.debugRunDuration / 5 + disconnectDuration := ldrWorkload.workload.(replicateKV).debugRunDuration / 5 t.L().Printf("Disconnecting nodes %v", nodesToFail) for _, nodeID := range nodesToFail { blackholeFailer.FailPartial(ctx, nodeID, setup.CRDBNodes()) @@ -297,7 +431,7 @@ func TestLDROnNetworkPartition( t.L().Printf("Nodes reconnected. Waiting for workload to complete") monitor.Wait() - VerifyCorrectness(t, setup, leftJobID, rightJobID, 5*time.Minute) + VerifyCorrectness(t, setup, leftJobID, rightJobID, 5*time.Minute, ldrWorkload) } type ldrJobInfo struct { @@ -442,23 +576,26 @@ func setupLDR( t test.Test, c cluster.Cluster, setup multiClusterSetup, - kvWorkload replicateKV, + ldrWorkload LDRWorkload, ) (int, int) { c.Run(ctx, option.WithNodes(setup.workloadNode), - kvWorkload.sourceInitCmd("system", setup.left.nodes)) + ldrWorkload.workload.sourceInitCmd("system", setup.left.nodes)) c.Run(ctx, option.WithNodes(setup.workloadNode), - kvWorkload.sourceInitCmd("system", setup.right.nodes)) + ldrWorkload.workload.sourceInitCmd("system", setup.right.nodes)) + + dbName, tableName := ldrWorkload.dbName, ldrWorkload.tableName // Setup LDR-specific columns - setup.left.sysSQL.Exec(t, "ALTER TABLE kv.kv ADD COLUMN crdb_replication_origin_timestamp DECIMAL NOT VISIBLE DEFAULT NULL ON UPDATE NULL") - setup.right.sysSQL.Exec(t, "ALTER TABLE kv.kv ADD COLUMN crdb_replication_origin_timestamp DECIMAL NOT VISIBLE DEFAULT NULL ON UPDATE NULL") + timestampQuery := fmt.Sprintf("ALTER TABLE %s.%s ADD COLUMN crdb_replication_origin_timestamp DECIMAL NOT VISIBLE DEFAULT NULL ON UPDATE NULL", dbName, tableName) + setup.left.sysSQL.Exec(t, timestampQuery) + setup.right.sysSQL.Exec(t, timestampQuery) startLDR := func(targetDB *sqlutils.SQLRunner, sourceURL string) int { - targetDB.Exec(t, "USE kv") + targetDB.Exec(t, fmt.Sprintf("USE %s", dbName)) r := targetDB.QueryRow(t, - "CREATE LOGICAL REPLICATION STREAM FROM TABLE kv ON $1 INTO TABLE kv", + fmt.Sprintf("CREATE LOGICAL REPLICATION STREAM FROM TABLE %s ON $1 INTO TABLE %s", tableName, tableName), sourceURL, ) var jobID int @@ -466,8 +603,8 @@ func setupLDR( return jobID } - leftJobID := startLDR(setup.left.sysSQL, setup.right.PgURLForDatabase("kv")) - rightJobID := startLDR(setup.right.sysSQL, setup.left.PgURLForDatabase("kv")) + leftJobID := startLDR(setup.left.sysSQL, setup.right.PgURLForDatabase(dbName)) + rightJobID := startLDR(setup.right.sysSQL, setup.left.PgURLForDatabase(dbName)) // TODO(ssd): We wait for the replicated time to // avoid starting the workload here until we @@ -481,7 +618,11 @@ func setupLDR( } func VerifyCorrectness( - t test.Test, setup multiClusterSetup, leftJobID, rightJobID int, waitTime time.Duration, + t test.Test, + setup multiClusterSetup, + leftJobID, rightJobID int, + waitTime time.Duration, + ldrWorkload LDRWorkload, ) { 
t.L().Printf("Verifying left and right tables") now := timeutil.Now() @@ -493,8 +634,9 @@ func VerifyCorrectness( // this table while we are using in-row storage // for crdb_internal_mvcc_timestamp. var leftCount, rightCount int - setup.left.sysSQL.QueryRow(t, "SELECT count(1) FROM kv.kv").Scan(&leftCount) - setup.right.sysSQL.QueryRow(t, "SELECT count(1) FROM kv.kv").Scan(&rightCount) + selectQuery := fmt.Sprintf("SELECT count(1) FROM %s.%s", ldrWorkload.dbName, ldrWorkload.tableName) + setup.left.sysSQL.QueryRow(t, selectQuery).Scan(&leftCount) + setup.right.sysSQL.QueryRow(t, selectQuery).Scan(&rightCount) require.Equal(t, leftCount, rightCount) } From f3a7df262a36085602c270c10e0db76907e5939b Mon Sep 17 00:00:00 2001 From: Andrew Baptist Date: Fri, 9 Aug 2024 10:31:25 -0400 Subject: [PATCH 2/2] roachtest: reduce max node count and metamorphic cloud Previously the max node count was (36 + 2) * 32 = 1216 which is more than the quota of 1024 that is allowed. This commit reduces the max count to 1024 and additionally changes the metamorphic tests to on average only run once per night (rather than once per cloud per night). The on average part is important since it might run multiple times in some nights and no times on other nights. Epic: none Release note: None --- .../tests/admission_control_latency.go | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/pkg/cmd/roachtest/tests/admission_control_latency.go b/pkg/cmd/roachtest/tests/admission_control_latency.go index 56588c6fb1bf..74ec5eaf58ed 100644 --- a/pkg/cmd/roachtest/tests/admission_control_latency.go +++ b/pkg/cmd/roachtest/tests/admission_control_latency.go @@ -90,6 +90,7 @@ type variations struct { perturbation perturbation workload workloadType acceptableChange float64 + cloud registry.CloudSet // These fields are set up at the start of the test run cluster cluster.Cluster @@ -102,9 +103,10 @@ const NUM_REGIONS = 3 var durationOptions = []time.Duration{10 * time.Second, 10 * time.Minute, 30 * time.Minute} var splitOptions = []int{1, 100, 10000} var maxBlockBytes = []int{1, 1024, 4096} -var numNodes = []int{5, 12, 36} +var numNodes = []int{5, 12, 30} var numVCPUs = []int{4, 8, 16, 32} var numDisks = []int{1, 2} +var cloudSets = []registry.CloudSet{registry.OnlyAWS, registry.OnlyGCE, registry.OnlyAzure} var leases = []registry.LeaseType{ registry.EpochLeases, @@ -114,9 +116,10 @@ var leases = []registry.LeaseType{ func (v variations) String() string { return fmt.Sprintf("seed: %d, fillDuration: %s, maxBlockBytes: %d, perturbationDuration: %s, "+ "validationDuration: %s, ratioOfMax: %f, splits: %d, numNodes: %d, numWorkloadNodes: %d, "+ - "partitionSite: %t, vcpu: %d, disks: %d, leaseType: %s", v.seed, v.fillDuration, v.maxBlockBytes, + "partitionSite: %t, vcpu: %d, disks: %d, leaseType: %s, cloud: %v", + v.seed, v.fillDuration, v.maxBlockBytes, v.perturbationDuration, v.validationDuration, v.ratioOfMax, v.splits, v.numNodes, v.numWorkloadNodes, - v.partitionSite, v.vcpu, v.disks, v.leaseType) + v.partitionSite, v.vcpu, v.disks, v.leaseType, v.cloud) } // Normally a single worker can handle 20-40 nodes. 
If we find this is @@ -145,6 +148,7 @@ func setupMetamorphic(p perturbation) variations { v.disks = numDisks[rng.Intn(len(numDisks))] v.partitionSite = rng.Intn(2) == 0 v.cleanRestart = rng.Intn(2) == 0 + v.cloud = cloudSets[rng.Intn(len(cloudSets))] v.perturbation = p return v } @@ -166,6 +170,7 @@ func setupFull(p perturbation) variations { v.perturbationDuration = 10 * time.Minute v.ratioOfMax = 0.5 v.cleanRestart = true + v.cloud = registry.OnlyGCE v.perturbation = p return v } @@ -187,6 +192,7 @@ func setupDev(p perturbation) variations { v.perturbationDuration = 30 * time.Second v.ratioOfMax = 0.5 v.cleanRestart = true + v.cloud = registry.AllClouds v.perturbation = p return v } @@ -245,11 +251,12 @@ func addMetamorphic(r registry.Registry, p perturbation, acceptableChange float6 // a given seed. r.Add(registry.TestSpec{ Name: fmt.Sprintf("perturbation/metamorphic/%s", reflect.TypeOf(p).Name()), - CompatibleClouds: registry.AllClouds, + CompatibleClouds: v.cloud, Suites: registry.Suites(registry.Nightly), Owner: registry.OwnerKV, Cluster: v.makeClusterSpec(), Leases: v.leaseType, + Randomized: true, Run: v.runTest, }) } @@ -259,7 +266,7 @@ func addFull(r registry.Registry, p perturbation, acceptableChange float64) { v.acceptableChange = acceptableChange r.Add(registry.TestSpec{ Name: fmt.Sprintf("perturbation/full/%s", reflect.TypeOf(p).Name()), - CompatibleClouds: registry.OnlyGCE, + CompatibleClouds: v.cloud, Suites: registry.Suites(registry.Nightly), Owner: registry.OwnerKV, Cluster: v.makeClusterSpec(), @@ -273,7 +280,7 @@ func addDev(r registry.Registry, p perturbation, acceptableChange float64) { v.acceptableChange = acceptableChange r.Add(registry.TestSpec{ Name: fmt.Sprintf("perturbation/dev/%s", reflect.TypeOf(p).Name()), - CompatibleClouds: registry.AllClouds, + CompatibleClouds: v.cloud, Suites: registry.ManualOnly, Owner: registry.OwnerKV, Cluster: v.makeClusterSpec(),