From 2c2a480abc1dbb53939b50ff85cae562777d1909 Mon Sep 17 00:00:00 2001
From: Kyle Wong <37189875+kyle-a-wong@users.noreply.github.com>
Date: Mon, 7 Oct 2024 12:57:37 -0400
Subject: [PATCH] tablemetadatacache: use AOST in update job queries

To reduce contention on system tables in the table metadata
update job, an `AS OF SYSTEM TIME` clause has been added to the
tableMetadataBatchIterator "fetch-table-metadata-batch" query.

Resolves: #131829
Epic: CRDB-37558
Release note: None
---
 pkg/sql/tablemetadatacache/BUILD.bazel        |  1 +
 .../table_metadata_batch_iterator.go          | 96 +++++++++++--------
 .../table_metadata_updater.go                 |  2 +-
 .../table_metadata_updater_test.go            |  9 +-
 4 files changed, 65 insertions(+), 43 deletions(-)

diff --git a/pkg/sql/tablemetadatacache/BUILD.bazel b/pkg/sql/tablemetadatacache/BUILD.bazel
index 50c4a992871a..918ba53be4b5 100644
--- a/pkg/sql/tablemetadatacache/BUILD.bazel
+++ b/pkg/sql/tablemetadatacache/BUILD.bazel
@@ -64,6 +64,7 @@ go_test(
         "//pkg/testutils/testcluster",
         "//pkg/util/leaktest",
         "//pkg/util/log",
+        "//pkg/util/timeutil",
        "@com_github_cockroachdb_datadriven//:datadriven",
        "@com_github_stretchr_testify//require",
    ],
diff --git a/pkg/sql/tablemetadatacache/table_metadata_batch_iterator.go b/pkg/sql/tablemetadatacache/table_metadata_batch_iterator.go
index 6668215a44a7..2b84804d609a 100644
--- a/pkg/sql/tablemetadatacache/table_metadata_batch_iterator.go
+++ b/pkg/sql/tablemetadatacache/table_metadata_batch_iterator.go
@@ -8,6 +8,7 @@ package tablemetadatacache
 import (
 	"context"
 	"errors"
+	"fmt"
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/sql/isql"
@@ -68,9 +69,13 @@ type tableMetadataBatchIterator struct {
 	lastID paginationKey
 	// The current batch of rows.
 	batchRows []tableMetadataIterRow
+	// query statement to use for retrieving batches
+	queryStatement string
 }
 
-func newTableMetadataBatchIterator(ie isql.Executor) *tableMetadataBatchIterator {
+func newTableMetadataBatchIterator(
+	ie isql.Executor, aostClause string,
+) *tableMetadataBatchIterator {
 	return &tableMetadataBatchIterator{
 		ie:        ie,
 		batchRows: make([]tableMetadataIterRow, 0, tableBatchSize),
@@ -79,6 +84,7 @@ func newTableMetadataBatchIterator(ie isql.Executor) *tableMetadataBatchIterator
 			schemaID: 0,
 			name:     "",
 		},
+		queryStatement: newBatchQueryStatement(aostClause),
 	}
 }
 
@@ -110,46 +116,7 @@ func (batchIter *tableMetadataBatchIterator) fetchNextBatch(
 		ctx,
 		"fetch-table-metadata-batch",
 		nil, /* txn */
-		sessiondata.NodeUserWithBulkLowPriSessionDataOverride, `
-WITH tables AS (SELECT n.id,
-                       n.name,
-                       n."parentID",
-                       n."parentSchemaID",
-                       d.descriptor,
-                       crdb_internal.table_span(n.id) as span
-                FROM system.namespace n
-                JOIN system.descriptor d ON n.id = d.id
-                WHERE (n."parentID", n."parentSchemaID", n.name) > ($1, $2, $3)
-                ORDER BY (n."parentID", n."parentSchemaID", n.name)
-                LIMIT $4),
-span_array AS (SELECT array_agg((span[1], span[2])) as all_spans FROM tables),
-span_stats AS (SELECT stats, t.id FROM crdb_internal.tenant_span_stats((SELECT all_spans FROM span_array)) s
-                        JOIN tables t on t.span[1] = s.start_key and t.span[2] = s.end_key)
-SELECT t.id,
-       t.name,
-       t."parentID",
-       db_name.name,
-       t."parentSchemaID",
-       schema_name.name,
-       json_array_length(d -> 'table' -> 'columns') as columns,
-       COALESCE(json_array_length(d -> 'table' -> 'indexes'), 0),
-       s.stats,
-       CASE
-           WHEN d->'table'->>'isMaterializedView' = 'true' THEN 'MATERIALIZED_VIEW'
-           WHEN d->'table'->>'viewQuery' IS NOT NULL THEN 'VIEW'
-           WHEN d->'table'->'sequenceOpts' IS NOT NULL THEN 'SEQUENCE'
-           ELSE 'TABLE'
-       END as table_type,
-       (d->'table'->'autoStatsSettings'->>'enabled')::BOOL as auto_stats_enabled,
-       ts.last_updated as stats_last_updated
-FROM tables t
-LEFT JOIN span_stats s ON t.id = s.id
-LEFT JOIN (select "tableID", max("createdAt") as last_updated from system.table_statistics group by "tableID") ts ON ts."tableID" = t.id
-LEFT JOIN system.namespace db_name ON t."parentID" = db_name.id
-LEFT JOIN system.namespace schema_name ON t."parentSchemaID" = schema_name.id,
-crdb_internal.pb_to_json('cockroach.sql.sqlbase.Descriptor', t.descriptor) AS d
-ORDER BY (t."parentID", t."parentSchemaID", t.name)
-`,
+		sessiondata.NodeUserWithBulkLowPriSessionDataOverride, batchIter.queryStatement,
 		batchIter.lastID.parentID, batchIter.lastID.schemaID, batchIter.lastID.name, batchSize,
 	)
 	if err != nil {
@@ -219,3 +186,50 @@ ORDER BY (t."parentID", t."parentSchemaID", t.name)
 
 	return len(batchIter.batchRows) > 0, nil
 }
+
+// newBatchQueryStatement creates a query statement to fetch batches of table metadata to insert into
+// system.table_metadata.
+func newBatchQueryStatement(aostClause string) string {
+	return fmt.Sprintf(`
+WITH tables AS (SELECT n.id,
+                       n.name,
+                       n."parentID",
+                       n."parentSchemaID",
+                       d.descriptor,
+                       crdb_internal.table_span(n.id) as span
+                FROM system.namespace n
+                JOIN system.descriptor d ON n.id = d.id
+                %[1]s
+                WHERE (n."parentID", n."parentSchemaID", n.name) > ($1, $2, $3)
+                ORDER BY (n."parentID", n."parentSchemaID", n.name)
+                LIMIT $4),
+span_array AS (SELECT array_agg((span[1], span[2])) as all_spans FROM tables),
+span_stats AS (SELECT stats, t.id FROM crdb_internal.tenant_span_stats((SELECT all_spans FROM span_array)) s
+                        JOIN tables t on t.span[1] = s.start_key and t.span[2] = s.end_key)
+SELECT t.id,
+       t.name,
+       t."parentID",
+       db_name.name,
+       t."parentSchemaID",
+       schema_name.name,
+       json_array_length(d -> 'table' -> 'columns') as columns,
+       COALESCE(json_array_length(d -> 'table' -> 'indexes'), 0),
+       s.stats,
+       CASE
+           WHEN d->'table'->>'isMaterializedView' = 'true' THEN 'MATERIALIZED_VIEW'
+           WHEN d->'table'->>'viewQuery' IS NOT NULL THEN 'VIEW'
+           WHEN d->'table'->'sequenceOpts' IS NOT NULL THEN 'SEQUENCE'
+           ELSE 'TABLE'
+       END as table_type,
+       (d->'table'->'autoStatsSettings'->>'enabled')::BOOL as auto_stats_enabled,
+       ts.last_updated as stats_last_updated
+FROM tables t
+LEFT JOIN span_stats s ON t.id = s.id
+LEFT JOIN (select "tableID", max("createdAt") as last_updated from system.table_statistics group by "tableID") ts ON ts."tableID" = t.id
+LEFT JOIN system.namespace db_name ON t."parentID" = db_name.id
+LEFT JOIN system.namespace schema_name ON t."parentSchemaID" = schema_name.id,
+crdb_internal.pb_to_json('cockroach.sql.sqlbase.Descriptor', t.descriptor) AS d
+%[1]s
+ORDER BY (t."parentID", t."parentSchemaID", t.name)
+`, aostClause)
+}
diff --git a/pkg/sql/tablemetadatacache/table_metadata_updater.go b/pkg/sql/tablemetadatacache/table_metadata_updater.go
index c7dd261c854f..34f9acf8bb75 100644
--- a/pkg/sql/tablemetadatacache/table_metadata_updater.go
+++ b/pkg/sql/tablemetadatacache/table_metadata_updater.go
@@ -84,7 +84,7 @@ func (u *tableMetadataUpdater) updateCache(ctx context.Context) (updated int, er
 	// upsertQuery is the query used to upsert table metadata rows,
 	// it is reused for each batch to avoid allocations between batches.
 	upsert := newTableMetadataBatchUpsertQuery(tableBatchSize)
-	it := newTableMetadataBatchIterator(u.ie)
+	it := newTableMetadataBatchIterator(u.ie, u.testKnobs.GetAOSTClause())
 	estimatedRowsToUpdate, err := u.getRowsToUpdateCount(ctx)
 	if err != nil {
 		log.Errorf(ctx, "failed to get estimated row count. err=%s", err.Error())
diff --git a/pkg/sql/tablemetadatacache/table_metadata_updater_test.go b/pkg/sql/tablemetadatacache/table_metadata_updater_test.go
index eaac9b8d2c43..e006b7ea3682 100644
--- a/pkg/sql/tablemetadatacache/table_metadata_updater_test.go
+++ b/pkg/sql/tablemetadatacache/table_metadata_updater_test.go
@@ -24,6 +24,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/datadriven"
 	"github.com/stretchr/testify/require"
 )
@@ -146,11 +147,17 @@ func TestTableMetadataUpdateJobProgressAndMetrics(t *testing.T) {
 	require.Greater(t, metrics.Duration.CumulativeSnapshot().Mean(), float64(0))
 	count = 0
 	currentProgress = 0
-
+	sw := timeutil.NewStopWatch()
+	sw.Start()
 	// generate 500 random tables
 	conn.Exec(t, `SELECT crdb_internal.generate_test_objects('{"names": "random_table_", "counts": [500], "randomize_columns": true}'::JSONB)`)
+	sw.Stop()
+	t.Logf("Time elapsed to generate tables: %s", sw.Elapsed())
+	sw.Start()
 	require.NoError(t, updater.RunUpdater(ctx))
+	sw.Stop()
+	t.Logf("Time elapsed to run update job: %s", sw.Elapsed())
 	// The updated tables metric doesn't reset between runs, so it should increase by updatedTables + 500, because 500
 	// random tables were previously generated
 	expectedTablesUpdated := (updatedTables * 2) + 500
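
Note: the updater diff above calls u.testKnobs.GetAOSTClause() but this patch does not
include that helper's definition. The sketch below is a guess at how such a knob is
typically wired, mirroring the GetAOSTClause pattern used elsewhere in CockroachDB
(e.g. the sqlstats jobs): production reads at follower_read_timestamp(), while tests
install knobs that return an empty clause so they can see rows written moments earlier.
The type and field names here are hypothetical, not taken from this patch.

package tablemetadatacache

// tableMetadataTestingKnobs is a hypothetical stand-in for the testing
// knobs struct referenced by u.testKnobs in the patch.
type tableMetadataTestingKnobs struct {
	// aostClause replaces the default clause when knobs are installed.
	// Tests typically leave it as "" so queries read current data.
	aostClause string
}

// GetAOSTClause returns the AS OF SYSTEM TIME clause spliced into the
// %[1]s slots of newBatchQueryStatement. With no knobs installed
// (production), the batch query reads at follower_read_timestamp(), so
// the scans of system.namespace and system.descriptor are served at a
// slightly stale timestamp and stop contending with foreground writes
// to those system tables.
func (k *tableMetadataTestingKnobs) GetAOSTClause() string {
	if k != nil {
		return k.aostClause
	}
	return "AS OF SYSTEM TIME follower_read_timestamp()"
}

Under this assumption, the default clause makes the tables CTE read
"... JOIN system.descriptor d ON n.id = d.id AS OF SYSTEM TIME
follower_read_timestamp() WHERE ...", while the empty test override collapses both
%[1]s slots so the query behaves exactly as it did before this patch.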