Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
124306: ttljob: don't block job completion on stats queries r=rafiss a=rafiss

fixes cockroachdb#124305

Release note (bug fix): Previously, if the ttl_row_stats_poll_interval
storage parameter was non-zero for a table with row level TTL enabled,
the queries issued to update row stats could block the job from
completing. Now, if the job completes, these stats queries are cancelled
instead. This means that the jobs.row_level_ttl.total_rows and
jobs.row_level_ttl.total_expired_rows metrics will report 0 if the job
finishes before the row stats queries complete.

Co-authored-by: Rafi Shamim <[email protected]>
  • Loading branch information
craig[bot] and rafiss committed May 23, 2024
2 parents 5b8b7da + c614095 commit a19cc3f
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 30 deletions.
3 changes: 3 additions & 0 deletions pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -1739,6 +1739,9 @@ type TTLTestingKnobs struct {
// PreSelectStatement runs before the start of the TTL select-delete
// loop.
PreSelectStatement string
// ExtraStatsQuery is an additional query to run while gathering stats if
// the ttl_row_stats_poll_interval is set. It is always run first.
ExtraStatsQuery string
}

// ModuleTestingKnobs implements the base.ModuleTestingKnobs interface.
Expand Down
40 changes: 17 additions & 23 deletions pkg/sql/ttl/ttljob/ttljob.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,44 +131,30 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) (re
ttlExpr := rowLevelTTL.GetTTLExpr()

labelMetrics := rowLevelTTL.LabelMetrics
group := ctxgroup.WithContext(ctx)
statsCtx, statsCancel := context.WithCancelCause(ctx)
defer statsCancel(nil)
statsGroup := ctxgroup.WithContext(statsCtx)
err := func() error {
statsCloseCh := make(chan struct{})
defer close(statsCloseCh)
if rowLevelTTL.RowStatsPollInterval != 0 {

defer statsCancel(errors.New("cancelling TTL stats query because TTL job completed"))
metrics := execCfg.JobRegistry.MetricsStruct().RowLevelTTL.(*RowLevelTTLAggMetrics).loadMetrics(
labelMetrics,
relationName,
)

group.GoCtx(func(ctx context.Context) error {

handleError := func(err error) error {
if err == nil {
return nil
}
if knobs.ReturnStatsError {
return err
}
log.Warningf(ctx, "failed to get statistics for table id %d: %v", details.TableID, err)
return nil
}

statsGroup.GoCtx(func(ctx context.Context) error {
// Do once initially to ensure we have some base statistics.
err := metrics.fetchStatistics(ctx, execCfg, relationName, details, aostDuration, ttlExpr)
if err := handleError(err); err != nil {
if err := metrics.fetchStatistics(ctx, execCfg, relationName, details, aostDuration, ttlExpr); err != nil {
return err
}
// Wait until poll interval is reached, or early exit when we are done
// with the TTL job.
for {
select {
case <-statsCloseCh:
case <-ctx.Done():
return nil
case <-time.After(rowLevelTTL.RowStatsPollInterval):
err := metrics.fetchStatistics(ctx, execCfg, relationName, details, aostDuration, ttlExpr)
if err := handleError(err); err != nil {
if err := metrics.fetchStatistics(ctx, execCfg, relationName, details, aostDuration, ttlExpr); err != nil {
return err
}
}
Expand Down Expand Up @@ -299,7 +285,15 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) (re
return err
}

return group.Wait()
if err := statsGroup.Wait(); err != nil {
// If the stats group was cancelled, use that error instead.
err = errors.CombineErrors(context.Cause(statsCtx), err)
if knobs.ReturnStatsError {
return err
}
log.Warningf(ctx, "failed to get statistics for table id %d: %v", details.TableID, err)
}
return nil
}

// OnFailOrCancel implements the jobs.Resumer interface.
Expand Down
21 changes: 16 additions & 5 deletions pkg/sql/ttl/ttljob/ttljob_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,24 +197,35 @@ func (m *rowLevelTTLMetrics) fetchStatistics(
return err
}

for _, c := range []struct {
type statsQuery struct {
opName string
query string
args []interface{}
gauge *aggmetric.Gauge
}{
{
}
var statsQueries []statsQuery
if ttlKnobs := execCfg.TTLTestingKnobs; ttlKnobs != nil && ttlKnobs.ExtraStatsQuery != "" {
statsQueries = append(statsQueries, statsQuery{
opName: fmt.Sprintf("ttl extra stats query %s", relationName),
query: ttlKnobs.ExtraStatsQuery,
},
)
}
statsQueries = append(statsQueries,
statsQuery{
opName: fmt.Sprintf("ttl num rows stats %s", relationName),
query: `SELECT count(1) FROM [%d AS t] AS OF SYSTEM TIME %s`,
gauge: m.TotalRows,
},
{
statsQuery{
opName: fmt.Sprintf("ttl num expired rows stats %s", relationName),
query: `SELECT count(1) FROM [%d AS t] AS OF SYSTEM TIME %s WHERE (` + string(ttlExpr) + `) < $1`,
args: []interface{}{details.Cutoff},
gauge: m.TotalExpiredRows,
},
} {
)

for _, c := range statsQueries {
// Use a super low quality of service (lower than TTL low), as we don't
// really care if statistics get left behind and prefer the TTL job to
// have priority.
Expand Down
38 changes: 36 additions & 2 deletions pkg/sql/ttl/ttljob/ttljob_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -837,8 +837,7 @@ func TestRowLevelTTLJobRandomEntries(t *testing.T) {
th, cleanupFunc := newRowLevelTTLTestJobTestHelper(
t,
&sql.TTLTestingKnobs{
AOSTDuration: &zeroDuration,
ReturnStatsError: true,
AOSTDuration: &zeroDuration,
},
tc.numSplits == 0 && !tc.forceNonMultiTenant, // SPLIT AT does not work with multi-tenant
1, /* numNodes */
Expand Down Expand Up @@ -931,6 +930,41 @@ func TestRowLevelTTLJobRandomEntries(t *testing.T) {
}
}

func TestRowLevelTTLCancelStats(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

th, cleanupFunc := newRowLevelTTLTestJobTestHelper(
t,
&sql.TTLTestingKnobs{
AOSTDuration: &zeroDuration,
ReturnStatsError: true,
ExtraStatsQuery: "SELECT pg_sleep(100)",
},
false, /* testMultiTenant */
1, /* numNodes */
)
defer cleanupFunc()

th.sqlDB.Exec(t, `
CREATE TABLE t (
id INT PRIMARY KEY,
expire_at TIMESTAMPTZ
) WITH (
ttl_expiration_expression = 'expire_at',
ttl_row_stats_poll_interval = '1 minute'
)`)
th.sqlDB.Exec(t, `INSERT INTO t (id, expire_at) VALUES (1, '2020-01-01')`)

// Force the schedule to execute. Normally, the job would not fail due to a
// stats error, but we have set the ReturnStatsError knob to true in this
// test.
th.waitForScheduledJob(t, jobs.StatusFailed, "cancelling TTL stats query because TTL job completed")

results := th.sqlDB.QueryStr(t, "SELECT * FROM t")
require.Empty(t, results)
}

func TestOutboundForeignKey(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down

0 comments on commit a19cc3f

Please sign in to comment.