Skip to content

Commit

Permalink
roachtest: run perf tests with sql and kv mode
Browse files Browse the repository at this point in the history
Epic: none

Release note: none
  • Loading branch information
msbutler committed Oct 10, 2024
1 parent 6a88862 commit 904f0ed
Showing 1 changed file with 77 additions and 18 deletions.
95 changes: 77 additions & 18 deletions pkg/cmd/roachtest/tests/logical_data_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ type LDRWorkload struct {
func registerLogicalDataReplicationTests(r registry.Registry) {
specs := []ldrTestSpec{
{
name: "ldr/kv0/workload=both/ingestion",
name: "ldr/kv0/workload=both/basic/immediate",
clusterSpec: multiClusterSpec{
leftNodes: 3,
rightNodes: 3,
Expand All @@ -87,10 +87,11 @@ func registerLogicalDataReplicationTests(r registry.Registry) {
spec.VolumeSize(100),
},
},
run: TestLDRBasic,
mode: ModeImmediate,
run: TestLDRBasic,
},
{
name: "ldr/kv0/workload=both/update_heavy",
name: "ldr/kv0/workload=both/basic/validated",
clusterSpec: multiClusterSpec{
leftNodes: 3,
rightNodes: 3,
Expand All @@ -101,7 +102,38 @@ func registerLogicalDataReplicationTests(r registry.Registry) {
spec.VolumeSize(100),
},
},
run: TestLDRUpdateHeavy,
mode: ModeValidated,
run: TestLDRBasic,
},
{
name: "ldr/kv0/workload=both/update_heavy/immediate",
clusterSpec: multiClusterSpec{
leftNodes: 3,
rightNodes: 3,
clusterOpts: []spec.Option{
spec.CPU(8),
spec.WorkloadNode(),
spec.WorkloadNodeCPU(8),
spec.VolumeSize(100),
},
},
mode: ModeImmediate,
run: TestLDRUpdateHeavy,
},
{
name: "ldr/kv0/workload=both/update_heavy/validated",
clusterSpec: multiClusterSpec{
leftNodes: 3,
rightNodes: 3,
clusterOpts: []spec.Option{
spec.CPU(8),
spec.WorkloadNode(),
spec.WorkloadNodeCPU(8),
spec.VolumeSize(100),
},
},
mode: ModeValidated,
run: TestLDRUpdateHeavy,
},
{
name: "ldr/kv0/workload=both/shutdown_node",
Expand Down Expand Up @@ -148,6 +180,7 @@ func registerLogicalDataReplicationTests(r registry.Registry) {
}

for _, sp := range specs {

r.Add(registry.TestSpec{
Name: sp.name,
Owner: registry.OwnerDisasterRecovery,
Expand All @@ -167,14 +200,15 @@ func registerLogicalDataReplicationTests(r registry.Registry) {
}
setup, cleanup := mc.Start(ctx, t)
defer cleanup()
sp.run(ctx, t, c, setup)

sp.run(ctx, t, c, setup, sp.mode)
},
})
}
}

func TestLDRBasic(ctx context.Context, t test.Test, c cluster.Cluster, setup multiClusterSetup) {
func TestLDRBasic(
ctx context.Context, t test.Test, c cluster.Cluster, setup multiClusterSetup, mode mode,
) {
duration := 15 * time.Minute
initRows := 1000
maxBlockBytes := 1024
Expand All @@ -196,7 +230,7 @@ func TestLDRBasic(ctx context.Context, t test.Test, c cluster.Cluster, setup mul
tableName: "kv",
}

leftJobID, rightJobID := setupLDR(ctx, t, c, setup, ldrWorkload)
leftJobID, rightJobID := setupLDR(ctx, t, c, setup, ldrWorkload, mode)

// Setup latency verifiers
maxExpectedLatency := 2 * time.Minute
Expand Down Expand Up @@ -241,7 +275,7 @@ func TestLDRBasic(ctx context.Context, t test.Test, c cluster.Cluster, setup mul
}

func TestLDRSchemaChange(
ctx context.Context, t test.Test, c cluster.Cluster, setup multiClusterSetup,
ctx context.Context, t test.Test, c cluster.Cluster, setup multiClusterSetup, mode mode,
) {
duration := 15 * time.Minute
if c.IsLocal() {
Expand All @@ -259,7 +293,7 @@ func TestLDRSchemaChange(
tableName: "kv",
}

leftJobID, rightJobID := setupLDR(ctx, t, c, setup, ldrWorkload)
leftJobID, rightJobID := setupLDR(ctx, t, c, setup, ldrWorkload, mode)

// Setup latency verifiers
maxExpectedLatency := 2 * time.Minute
Expand Down Expand Up @@ -317,7 +351,7 @@ func TestLDRSchemaChange(
}

func TestLDRUpdateHeavy(
ctx context.Context, t test.Test, c cluster.Cluster, setup multiClusterSetup,
ctx context.Context, t test.Test, c cluster.Cluster, setup multiClusterSetup, mode mode,
) {

duration := 10 * time.Minute
Expand All @@ -336,7 +370,7 @@ func TestLDRUpdateHeavy(
tableName: "usertable",
}

leftJobID, rightJobID := setupLDR(ctx, t, c, setup, ldrWorkload)
leftJobID, rightJobID := setupLDR(ctx, t, c, setup, ldrWorkload, mode)

// Setup latency verifiers
maxExpectedLatency := 3 * time.Minute
Expand Down Expand Up @@ -381,7 +415,7 @@ func TestLDRUpdateHeavy(
}

func TestLDROnNodeShutdown(
ctx context.Context, t test.Test, c cluster.Cluster, setup multiClusterSetup,
ctx context.Context, t test.Test, c cluster.Cluster, setup multiClusterSetup, mode mode,
) {

duration := 10 * time.Minute
Expand All @@ -400,7 +434,7 @@ func TestLDROnNodeShutdown(
tableName: "kv",
}

leftJobID, rightJobID := setupLDR(ctx, t, c, setup, ldrWorkload)
leftJobID, rightJobID := setupLDR(ctx, t, c, setup, ldrWorkload, mode)

// Setup latency verifiers, remembering to account for latency spike from killing a node
maxExpectedLatency := 5 * time.Minute
Expand Down Expand Up @@ -477,7 +511,7 @@ func TestLDROnNodeShutdown(
// aim to keep the workload going on both sides and wait for reconciliation
// once the network partition has completed
func TestLDROnNetworkPartition(
ctx context.Context, t test.Test, c cluster.Cluster, setup multiClusterSetup,
ctx context.Context, t test.Test, c cluster.Cluster, setup multiClusterSetup, mode mode,
) {
duration := 10 * time.Minute
if c.IsLocal() {
Expand All @@ -495,7 +529,7 @@ func TestLDROnNetworkPartition(
tableName: "kv",
}

leftJobID, rightJobID := setupLDR(ctx, t, c, setup, ldrWorkload)
leftJobID, rightJobID := setupLDR(ctx, t, c, setup, ldrWorkload, mode)

monitor := c.NewMonitor(ctx, setup.CRDBNodes())
monitor.Go(func(ctx context.Context) error {
Expand Down Expand Up @@ -557,9 +591,29 @@ func getLogicalDataReplicationJobInfo(db *gosql.DB, jobID int) (jobInfo, error)
type ldrTestSpec struct {
name string
clusterSpec multiClusterSpec
run func(context.Context, test.Test, cluster.Cluster, multiClusterSetup)
run func(context.Context, test.Test, cluster.Cluster, multiClusterSetup, mode)
mode mode
}

type mode int

func (m mode) String() string {
switch m {
case ModeImmediate:
return "immediate"
case ModeValidated:
return "validated"
default:
return "default"
}
}

const (
Default = iota
ModeImmediate
ModeValidated
)

type multiClusterSpec struct {
clusterOpts []spec.Option

Expand Down Expand Up @@ -674,6 +728,7 @@ func setupLDR(
c cluster.Cluster,
setup multiClusterSetup,
ldrWorkload LDRWorkload,
mode mode,
) (int, int) {
c.Run(ctx,
option.WithNodes(setup.workloadNode),
Expand All @@ -690,9 +745,13 @@ func setupLDR(
setup.right.sysSQL.Exec(t, timestampQuery)

startLDR := func(targetDB *sqlutils.SQLRunner, sourceURL string) int {
options := ""
if mode.String() != "" {
options = fmt.Sprintf("WITH mode='%s'", mode)
}
targetDB.Exec(t, fmt.Sprintf("USE %s", dbName))
r := targetDB.QueryRow(t,
fmt.Sprintf("CREATE LOGICAL REPLICATION STREAM FROM TABLE %s ON $1 INTO TABLE %s", tableName, tableName),
fmt.Sprintf("CREATE LOGICAL REPLICATION STREAM FROM TABLE %s ON $1 INTO TABLE %s %s", tableName, tableName, options),
sourceURL,
)
var jobID int
Expand Down

0 comments on commit 904f0ed

Please sign in to comment.