diff --git a/pkg/ccl/crosscluster/logical/BUILD.bazel b/pkg/ccl/crosscluster/logical/BUILD.bazel index 847b02a8181e..42b6731903a4 100644 --- a/pkg/ccl/crosscluster/logical/BUILD.bazel +++ b/pkg/ccl/crosscluster/logical/BUILD.bazel @@ -86,7 +86,6 @@ go_library( "//pkg/util/timeutil", "//pkg/util/tracing", "@com_github_cockroachdb_errors//:errors", - "@com_github_cockroachdb_errors//issuelink", "@com_github_cockroachdb_logtags//:logtags", "@com_github_cockroachdb_redact//:redact", "@com_github_lib_pq//oid", diff --git a/pkg/ccl/crosscluster/logical/create_logical_replication_stmt.go b/pkg/ccl/crosscluster/logical/create_logical_replication_stmt.go index 4ab0c236528d..5a3377183295 100644 --- a/pkg/ccl/crosscluster/logical/create_logical_replication_stmt.go +++ b/pkg/ccl/crosscluster/logical/create_logical_replication_stmt.go @@ -38,7 +38,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" - "github.com/cockroachdb/errors/issuelink" ) func init() { @@ -94,7 +93,7 @@ func createLogicalReplicationStreamPlanHook( } if stmt.From.Database != "" { - return errors.UnimplementedErrorf(issuelink.IssueLink{}, "logical replication streams on databases are unsupported") + return errors.UnimplementedErrorf(errors.IssueLink{}, "logical replication streams on databases are unsupported") } if len(stmt.From.Tables) != len(stmt.Into.Tables) { return pgerror.New(pgcode.InvalidParameterValue, "the same number of source and destination tables must be specified") diff --git a/pkg/ccl/crosscluster/streamclient/BUILD.bazel b/pkg/ccl/crosscluster/streamclient/BUILD.bazel index c1b29423b012..e4a89f7186ca 100644 --- a/pkg/ccl/crosscluster/streamclient/BUILD.bazel +++ b/pkg/ccl/crosscluster/streamclient/BUILD.bazel @@ -38,7 +38,6 @@ go_library( "@com_github_golang_snappy//:snappy", "@com_github_jackc_pgconn//:pgconn", "@com_github_jackc_pgx_v4//:pgx", - "@com_github_pkg_errors//:errors", ], ) diff --git a/pkg/ccl/crosscluster/streamclient/client_helpers.go b/pkg/ccl/crosscluster/streamclient/client_helpers.go index ff73c52f4686..3789bda677c6 100644 --- a/pkg/ccl/crosscluster/streamclient/client_helpers.go +++ b/pkg/ccl/crosscluster/streamclient/client_helpers.go @@ -11,9 +11,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster" "github.com/cockroachdb/cockroach/pkg/repstream/streampb" "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/errors" "github.com/golang/snappy" "github.com/jackc/pgx/v4" - "github.com/pkg/errors" ) func subscribeInternal( diff --git a/pkg/cmd/roachtest/tests/logical_data_replication.go b/pkg/cmd/roachtest/tests/logical_data_replication.go index 82ec7674dd50..1ed5f3b261e2 100644 --- a/pkg/cmd/roachtest/tests/logical_data_replication.go +++ b/pkg/cmd/roachtest/tests/logical_data_replication.go @@ -71,7 +71,7 @@ type LDRWorkload struct { func registerLogicalDataReplicationTests(r registry.Registry) { specs := []ldrTestSpec{ { - name: "ldr/kv0/workload=both/ingestion", + name: "ldr/kv0/workload=both/basic/immediate", clusterSpec: multiClusterSpec{ leftNodes: 3, rightNodes: 3, @@ -82,10 +82,11 @@ func registerLogicalDataReplicationTests(r registry.Registry) { spec.VolumeSize(100), }, }, - run: TestLDRBasic, + mode: ModeImmediate, + run: TestLDRBasic, }, { - name: "ldr/kv0/workload=both/update_heavy", + name: "ldr/kv0/workload=both/basic/validated", clusterSpec: multiClusterSpec{ leftNodes: 3, rightNodes: 3, @@ -96,7 +97,38 @@ func registerLogicalDataReplicationTests(r registry.Registry) { spec.VolumeSize(100), }, }, - run: TestLDRUpdateHeavy, + mode: ModeValidated, + run: TestLDRBasic, + }, + { + name: "ldr/kv0/workload=both/update_heavy/immediate", + clusterSpec: multiClusterSpec{ + leftNodes: 3, + rightNodes: 3, + clusterOpts: []spec.Option{ + spec.CPU(8), + spec.WorkloadNode(), + spec.WorkloadNodeCPU(8), + spec.VolumeSize(100), + }, + }, + mode: ModeImmediate, + run: TestLDRUpdateHeavy, + }, + { + name: "ldr/kv0/workload=both/update_heavy/validated", + clusterSpec: multiClusterSpec{ + leftNodes: 3, + rightNodes: 3, + clusterOpts: []spec.Option{ + spec.CPU(8), + spec.WorkloadNode(), + spec.WorkloadNodeCPU(8), + spec.VolumeSize(100), + }, + }, + mode: ModeValidated, + run: TestLDRUpdateHeavy, }, { name: "ldr/kv0/workload=both/shutdown_node", @@ -143,6 +175,7 @@ func registerLogicalDataReplicationTests(r registry.Registry) { } for _, sp := range specs { + r.Add(registry.TestSpec{ Name: sp.name, Owner: registry.OwnerDisasterRecovery, @@ -162,14 +195,15 @@ func registerLogicalDataReplicationTests(r registry.Registry) { } setup, cleanup := mc.Start(ctx, t) defer cleanup() - sp.run(ctx, t, c, setup) - + sp.run(ctx, t, c, setup, sp.mode) }, }) } } -func TestLDRBasic(ctx context.Context, t test.Test, c cluster.Cluster, setup multiClusterSetup) { +func TestLDRBasic( + ctx context.Context, t test.Test, c cluster.Cluster, setup multiClusterSetup, mode mode, +) { duration := 15 * time.Minute initRows := 1000 maxBlockBytes := 1024 @@ -191,7 +225,7 @@ func TestLDRBasic(ctx context.Context, t test.Test, c cluster.Cluster, setup mul tableName: "kv", } - leftJobID, rightJobID := setupLDR(ctx, t, c, setup, ldrWorkload) + leftJobID, rightJobID := setupLDR(ctx, t, c, setup, ldrWorkload, mode) // Setup latency verifiers maxExpectedLatency := 2 * time.Minute @@ -236,7 +270,7 @@ func TestLDRBasic(ctx context.Context, t test.Test, c cluster.Cluster, setup mul } func TestLDRSchemaChange( - ctx context.Context, t test.Test, c cluster.Cluster, setup multiClusterSetup, + ctx context.Context, t test.Test, c cluster.Cluster, setup multiClusterSetup, mode mode, ) { duration := 15 * time.Minute if c.IsLocal() { @@ -254,7 +288,7 @@ func TestLDRSchemaChange( tableName: "kv", } - leftJobID, rightJobID := setupLDR(ctx, t, c, setup, ldrWorkload) + leftJobID, rightJobID := setupLDR(ctx, t, c, setup, ldrWorkload, mode) // Setup latency verifiers maxExpectedLatency := 2 * time.Minute @@ -312,7 +346,7 @@ func TestLDRSchemaChange( } func TestLDRUpdateHeavy( - ctx context.Context, t test.Test, c cluster.Cluster, setup multiClusterSetup, + ctx context.Context, t test.Test, c cluster.Cluster, setup multiClusterSetup, mode mode, ) { duration := 10 * time.Minute @@ -331,7 +365,7 @@ func TestLDRUpdateHeavy( tableName: "usertable", } - leftJobID, rightJobID := setupLDR(ctx, t, c, setup, ldrWorkload) + leftJobID, rightJobID := setupLDR(ctx, t, c, setup, ldrWorkload, mode) // Setup latency verifiers maxExpectedLatency := 3 * time.Minute @@ -376,7 +410,7 @@ func TestLDRUpdateHeavy( } func TestLDROnNodeShutdown( - ctx context.Context, t test.Test, c cluster.Cluster, setup multiClusterSetup, + ctx context.Context, t test.Test, c cluster.Cluster, setup multiClusterSetup, mode mode, ) { duration := 10 * time.Minute @@ -395,7 +429,7 @@ func TestLDROnNodeShutdown( tableName: "kv", } - leftJobID, rightJobID := setupLDR(ctx, t, c, setup, ldrWorkload) + leftJobID, rightJobID := setupLDR(ctx, t, c, setup, ldrWorkload, mode) // Setup latency verifiers, remembering to account for latency spike from killing a node maxExpectedLatency := 5 * time.Minute @@ -472,7 +506,7 @@ func TestLDROnNodeShutdown( // aim to keep the workload going on both sides and wait for reconciliation // once the network partition has completed func TestLDROnNetworkPartition( - ctx context.Context, t test.Test, c cluster.Cluster, setup multiClusterSetup, + ctx context.Context, t test.Test, c cluster.Cluster, setup multiClusterSetup, mode mode, ) { duration := 10 * time.Minute if c.IsLocal() { @@ -490,7 +524,7 @@ func TestLDROnNetworkPartition( tableName: "kv", } - leftJobID, rightJobID := setupLDR(ctx, t, c, setup, ldrWorkload) + leftJobID, rightJobID := setupLDR(ctx, t, c, setup, ldrWorkload, mode) monitor := c.NewMonitor(ctx, setup.CRDBNodes()) monitor.Go(func(ctx context.Context) error { @@ -552,9 +586,29 @@ func getLogicalDataReplicationJobInfo(db *gosql.DB, jobID int) (jobInfo, error) type ldrTestSpec struct { name string clusterSpec multiClusterSpec - run func(context.Context, test.Test, cluster.Cluster, multiClusterSetup) + run func(context.Context, test.Test, cluster.Cluster, multiClusterSetup, mode) + mode mode +} + +type mode int + +func (m mode) String() string { + switch m { + case ModeImmediate: + return "immediate" + case ModeValidated: + return "validated" + default: + return "default" + } } +const ( + Default = iota + ModeImmediate + ModeValidated +) + type multiClusterSpec struct { clusterOpts []spec.Option @@ -669,6 +723,7 @@ func setupLDR( c cluster.Cluster, setup multiClusterSetup, ldrWorkload LDRWorkload, + mode mode, ) (int, int) { c.Run(ctx, option.WithNodes(setup.workloadNode), @@ -680,9 +735,13 @@ func setupLDR( dbName, tableName := ldrWorkload.dbName, ldrWorkload.tableName startLDR := func(targetDB *sqlutils.SQLRunner, sourceURL string) int { + options := "" + if mode.String() != "" { + options = fmt.Sprintf("WITH mode='%s'", mode) + } targetDB.Exec(t, fmt.Sprintf("USE %s", dbName)) r := targetDB.QueryRow(t, - fmt.Sprintf("CREATE LOGICAL REPLICATION STREAM FROM TABLE %s ON $1 INTO TABLE %s", tableName, tableName), + fmt.Sprintf("CREATE LOGICAL REPLICATION STREAM FROM TABLE %s ON $1 INTO TABLE %s %s", tableName, tableName, options), sourceURL, ) var jobID int diff --git a/pkg/raft/BUILD.bazel b/pkg/raft/BUILD.bazel index cefc4cad06c5..86ce5ee107ae 100644 --- a/pkg/raft/BUILD.bazel +++ b/pkg/raft/BUILD.bazel @@ -26,6 +26,7 @@ go_library( "//pkg/raft/raftstoreliveness", "//pkg/raft/tracker", "//pkg/util/hlc", + "@com_github_cockroachdb_errors//:errors", "@org_golang_x_exp//maps", ], ) diff --git a/pkg/raft/bootstrap.go b/pkg/raft/bootstrap.go index f54f6d998320..b9c8d41a346c 100644 --- a/pkg/raft/bootstrap.go +++ b/pkg/raft/bootstrap.go @@ -18,9 +18,8 @@ package raft import ( - "errors" - pb "github.com/cockroachdb/cockroach/pkg/raft/raftpb" + "github.com/cockroachdb/errors" ) // Bootstrap initializes the RawNode for first use by appending configuration diff --git a/pkg/raft/confchange/BUILD.bazel b/pkg/raft/confchange/BUILD.bazel index 6042995d6f67..5f482696c648 100644 --- a/pkg/raft/confchange/BUILD.bazel +++ b/pkg/raft/confchange/BUILD.bazel @@ -12,6 +12,7 @@ go_library( "//pkg/raft/quorum", "//pkg/raft/raftpb", "//pkg/raft/tracker", + "@com_github_cockroachdb_errors//:errors", ], ) @@ -29,5 +30,6 @@ go_test( "//pkg/raft/raftpb", "//pkg/raft/tracker", "@com_github_cockroachdb_datadriven//:datadriven", + "@com_github_cockroachdb_errors//:errors", ], ) diff --git a/pkg/raft/confchange/confchange.go b/pkg/raft/confchange/confchange.go index e10006cb6e2d..b0cc8507e304 100644 --- a/pkg/raft/confchange/confchange.go +++ b/pkg/raft/confchange/confchange.go @@ -18,13 +18,13 @@ package confchange import ( - "errors" "fmt" "strings" "github.com/cockroachdb/cockroach/pkg/raft/quorum" pb "github.com/cockroachdb/cockroach/pkg/raft/raftpb" "github.com/cockroachdb/cockroach/pkg/raft/tracker" + "github.com/cockroachdb/errors" ) // Changer facilitates configuration changes. It exposes methods to handle diff --git a/pkg/raft/confchange/datadriven_test.go b/pkg/raft/confchange/datadriven_test.go index e979402db81b..0acfca86c1f3 100644 --- a/pkg/raft/confchange/datadriven_test.go +++ b/pkg/raft/confchange/datadriven_test.go @@ -18,7 +18,6 @@ package confchange import ( - "errors" "fmt" "strconv" "strings" @@ -28,6 +27,7 @@ import ( pb "github.com/cockroachdb/cockroach/pkg/raft/raftpb" "github.com/cockroachdb/cockroach/pkg/raft/tracker" "github.com/cockroachdb/datadriven" + "github.com/cockroachdb/errors" ) func TestConfChangeDataDriven(t *testing.T) { diff --git a/pkg/raft/raft.go b/pkg/raft/raft.go index 61bd86c82878..d2b370fe3063 100644 --- a/pkg/raft/raft.go +++ b/pkg/raft/raft.go @@ -21,7 +21,6 @@ import ( "bytes" "context" "crypto/rand" - "errors" "fmt" "math" "math/big" @@ -36,6 +35,7 @@ import ( pb "github.com/cockroachdb/cockroach/pkg/raft/raftpb" "github.com/cockroachdb/cockroach/pkg/raft/raftstoreliveness" "github.com/cockroachdb/cockroach/pkg/raft/tracker" + "github.com/cockroachdb/errors" "golang.org/x/exp/maps" ) diff --git a/pkg/raft/rafttest/interaction_env_handler_add_nodes.go b/pkg/raft/rafttest/interaction_env_handler_add_nodes.go index 51d4167546a0..82b15d68cbd2 100644 --- a/pkg/raft/rafttest/interaction_env_handler_add_nodes.go +++ b/pkg/raft/rafttest/interaction_env_handler_add_nodes.go @@ -19,7 +19,6 @@ package rafttest import ( "context" - "errors" "fmt" "reflect" "testing" @@ -31,6 +30,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/datadriven" + "github.com/cockroachdb/errors" ) func (env *InteractionEnv) handleAddNodes(t *testing.T, d datadriven.TestData) error { diff --git a/pkg/raft/rafttest/interaction_env_handler_process_append_thread.go b/pkg/raft/rafttest/interaction_env_handler_process_append_thread.go index 65039532ac9b..92bb80db89e3 100644 --- a/pkg/raft/rafttest/interaction_env_handler_process_append_thread.go +++ b/pkg/raft/rafttest/interaction_env_handler_process_append_thread.go @@ -18,13 +18,13 @@ package rafttest import ( - "errors" "fmt" "testing" "github.com/cockroachdb/cockroach/pkg/raft" "github.com/cockroachdb/cockroach/pkg/raft/raftpb" "github.com/cockroachdb/datadriven" + "github.com/cockroachdb/errors" ) func (env *InteractionEnv) handleProcessAppendThread(t *testing.T, d datadriven.TestData) error { diff --git a/pkg/raft/rafttest/interaction_env_handler_report_unreachable.go b/pkg/raft/rafttest/interaction_env_handler_report_unreachable.go index 6c8e310be9fd..496f3ccef858 100644 --- a/pkg/raft/rafttest/interaction_env_handler_report_unreachable.go +++ b/pkg/raft/rafttest/interaction_env_handler_report_unreachable.go @@ -18,10 +18,10 @@ package rafttest import ( - "errors" "testing" "github.com/cockroachdb/datadriven" + "github.com/cockroachdb/errors" ) func (env *InteractionEnv) handleReportUnreachable(t *testing.T, d datadriven.TestData) error { diff --git a/pkg/raft/rawnode.go b/pkg/raft/rawnode.go index 179f415b748a..3ac2e7399922 100644 --- a/pkg/raft/rawnode.go +++ b/pkg/raft/rawnode.go @@ -18,10 +18,9 @@ package raft import ( - "errors" - pb "github.com/cockroachdb/cockroach/pkg/raft/raftpb" "github.com/cockroachdb/cockroach/pkg/raft/tracker" + "github.com/cockroachdb/errors" ) // ErrStepLocalMsg is returned when try to step a local raft message diff --git a/pkg/raft/storage.go b/pkg/raft/storage.go index 97d75c81384c..2f55a3745973 100644 --- a/pkg/raft/storage.go +++ b/pkg/raft/storage.go @@ -18,11 +18,11 @@ package raft import ( - "errors" "sync" "github.com/cockroachdb/cockroach/pkg/raft/raftlogger" pb "github.com/cockroachdb/cockroach/pkg/raft/raftpb" + "github.com/cockroachdb/errors" ) // ErrCompacted is returned by Storage.Entries/Compact when a requested diff --git a/pkg/sql/tablemetadatacache/BUILD.bazel b/pkg/sql/tablemetadatacache/BUILD.bazel index d16b61412a6b..13252325a0d0 100644 --- a/pkg/sql/tablemetadatacache/BUILD.bazel +++ b/pkg/sql/tablemetadatacache/BUILD.bazel @@ -64,6 +64,7 @@ go_test( "//pkg/util/log", "//pkg/util/timeutil", "@com_github_cockroachdb_datadriven//:datadriven", + "@com_github_cockroachdb_errors//:errors", "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/sql/tablemetadatacache/cluster_settings.go b/pkg/sql/tablemetadatacache/cluster_settings.go index d5d19f0458e2..aca5c598fcec 100644 --- a/pkg/sql/tablemetadatacache/cluster_settings.go +++ b/pkg/sql/tablemetadatacache/cluster_settings.go @@ -6,10 +6,10 @@ package tablemetadatacache import ( - "errors" "time" "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/errors" ) const defaultDataValidDuration = time.Minute * 20 diff --git a/pkg/sql/tablemetadatacache/table_metadata_batch_iterator.go b/pkg/sql/tablemetadatacache/table_metadata_batch_iterator.go index 162e607aba76..c788ff9b68cb 100644 --- a/pkg/sql/tablemetadatacache/table_metadata_batch_iterator.go +++ b/pkg/sql/tablemetadatacache/table_metadata_batch_iterator.go @@ -7,13 +7,13 @@ package tablemetadatacache import ( "context" - "errors" "fmt" "time" "github.com/cockroachdb/cockroach/pkg/sql/isql" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/errors" ) const ( diff --git a/pkg/sql/tablemetadatacache/update_table_metadata_cache_job_test.go b/pkg/sql/tablemetadatacache/update_table_metadata_cache_job_test.go index b65e3d6d88f1..6d7b46505b5b 100644 --- a/pkg/sql/tablemetadatacache/update_table_metadata_cache_job_test.go +++ b/pkg/sql/tablemetadatacache/update_table_metadata_cache_job_test.go @@ -7,7 +7,6 @@ package tablemetadatacache import ( "context" - "errors" "fmt" "testing" "time" @@ -22,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" ) diff --git a/pkg/testutils/lint/lint_test.go b/pkg/testutils/lint/lint_test.go index b99ef3e2e9b0..802be711d6fd 100644 --- a/pkg/testutils/lint/lint_test.go +++ b/pkg/testutils/lint/lint_test.go @@ -1866,12 +1866,19 @@ func TestLint(t *testing.T) { pkgs, err := packages.Load( &packages.Config{ Mode: packages.NeedImports | packages.NeedName, + Dir: crdbDir, }, pkgPath, ) if err != nil { return errors.Wrapf(err, "error loading package %s", pkgPath) } + // NB: if no packages were found, this API confusingly + // returns no error, so we need to explicitly check that + // something was returned. + if len(pkgs) == 0 { + return errors.Newf("could not list packages under %s", pkgPath) + } for _, pkg := range pkgs { for _, s := range pkg.Imports { arg.Out <- pkg.PkgPath + ": " + s.PkgPath @@ -1891,6 +1898,7 @@ func TestLint(t *testing.T) { stream.GrepNot(`cockroach/pkg/util/sysutil: syscall$`), stream.GrepNot(`cockroachdb/cockroach/pkg/build/bazel/util/tinystringer: errors$`), stream.GrepNot(`cockroachdb/cockroach/pkg/build/engflow: github\.com/golang/protobuf/proto$`), + stream.GrepNot(`cockroachdb/cockroach/pkg/build/engflow: log$`), stream.GrepNot(`cockroachdb/cockroach/pkg/util/grpcutil: github\.com/cockroachdb\/errors\/errbase$`), stream.GrepNot(`cockroachdb/cockroach/pkg/util/future: github\.com/cockroachdb\/errors\/errbase$`), stream.GrepNot(`cockroach/pkg/roachprod/install: syscall$`), // TODO: switch to sysutil @@ -1904,6 +1912,10 @@ func TestLint(t *testing.T) { stream.GrepNot(`cockroachdb/cockroach/pkg/kv/kvpb/gen: log$`), stream.GrepNot(`cockroachdb/cockroach/pkg/util/log/gen: log$`), stream.GrepNot(`cockroach/pkg/util/uuid: github\.com/satori/go\.uuid$`), + // See #132262. + stream.GrepNot(`github.com/cockroachdb/cockroach/pkg/raft/raftlogger: log$`), + stream.GrepNot(`github.com/cockroachdb/cockroach/pkg/raft/rafttest: log$`), + stream.GrepNot(`github.com/cockroachdb/cockroach/pkg/workload/debug: log$`), ), func(s string) { pkgStr := strings.Split(s, ": ") importingPkg, importedPkg := pkgStr[0], pkgStr[1]