From e5c9e5dd22181539398af5fd6d477260cfeef974 Mon Sep 17 00:00:00 2001 From: Yang Keao Date: Wed, 15 Jan 2025 16:42:29 +0800 Subject: [PATCH] always rollback the transaction Signed-off-by: Yang Keao --- pkg/ttl/session/session.go | 22 ++++++++++++++-------- pkg/ttl/ttlworker/scan_test.go | 18 ++++++++++++++++++ 2 files changed, 32 insertions(+), 8 deletions(-) diff --git a/pkg/ttl/session/session.go b/pkg/ttl/session/session.go index 8a9ea06adba64..812108b3e4061 100644 --- a/pkg/ttl/session/session.go +++ b/pkg/ttl/session/session.go @@ -111,6 +111,20 @@ func (s *session) ExecuteSQL(ctx context.Context, sql string, args ...any) ([]ch // RunInTxn executes the specified function in a txn func (s *session) RunInTxn(ctx context.Context, fn func() error, txnMode TxnMode) (err error) { + success := false + defer func() { + // Always try to `ROLLBACK` the transaction even if only the `BEGIN` fails. If the `BEGIN` is killed + // after it runs the first `Next`, the transaction is already active and needs to be `ROLLBACK`ed. + if !success { + // For now, the "ROLLBACK" can execute successfully even when the context has already been cancelled. + // Using another timeout context to avoid that this behavior will be changed in the future. + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + _, rollbackErr := s.ExecuteSQL(ctx, "ROLLBACK") + terror.Log(rollbackErr) + cancel() + } + }() + tracer := metrics.PhaseTracerFromCtx(ctx) defer tracer.EnterPhase(tracer.Phase()) @@ -129,14 +143,6 @@ func (s *session) RunInTxn(ctx context.Context, fn func() error, txnMode TxnMode } tracer.EnterPhase(metrics.PhaseOther) - success := false - defer func() { - if !success { - _, rollbackErr := s.ExecuteSQL(ctx, "ROLLBACK") - terror.Log(rollbackErr) - } - }() - if err = fn(); err != nil { return err } diff --git a/pkg/ttl/ttlworker/scan_test.go b/pkg/ttl/ttlworker/scan_test.go index c860f89c1e510..d682859ac1a0f 100644 --- a/pkg/ttl/ttlworker/scan_test.go +++ b/pkg/ttl/ttlworker/scan_test.go @@ -554,3 +554,21 @@ func TestScanTaskCancelStmt(t *testing.T) { task.ctx, cancel = context.WithCancel(context.Background()) testCancel(context.Background(), cancel) } + +// NewTTLScanTask creates a new TTL scan task for test. +func NewTTLScanTask(ctx context.Context, tbl *cache.PhysicalTable, ttlTask *cache.TTLTask) *ttlScanTask { + return &ttlScanTask{ + ctx: ctx, + tbl: tbl, + TTLTask: ttlTask, + statistics: &ttlStatistics{}, + } +} + +// DoScan is an exported version of `doScan` for test. +func (t *ttlScanTask) DoScan(ctx context.Context, delCh chan<- *TTLDeleteTask, sessPool util.SessionPool) *ttlScanTaskExecResult { + return t.doScan(ctx, delCh, sessPool) +} + +// TTLDeleteTask is an exported version of `ttlDeleteTask` for test. +type TTLDeleteTask = ttlDeleteTask