Skip to content

Commit

Permalink
always rollback the transaction
Browse files Browse the repository at this point in the history
Signed-off-by: Yang Keao <[email protected]>
  • Loading branch information
YangKeao committed Jan 15, 2025
1 parent 9c01dcd commit e5c9e5d
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 8 deletions.
22 changes: 14 additions & 8 deletions pkg/ttl/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,20 @@ func (s *session) ExecuteSQL(ctx context.Context, sql string, args ...any) ([]ch

// RunInTxn executes the specified function in a txn
func (s *session) RunInTxn(ctx context.Context, fn func() error, txnMode TxnMode) (err error) {
success := false
defer func() {
// Always try to `ROLLBACK` the transaction even if only the `BEGIN` fails. If the `BEGIN` is killed
// after it runs the first `Next`, the transaction is already active and needs to be `ROLLBACK`ed.
if !success {
// For now, the "ROLLBACK" can execute successfully even when the context has already been cancelled.
// Using another timeout context to avoid that this behavior will be changed in the future.
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
_, rollbackErr := s.ExecuteSQL(ctx, "ROLLBACK")
terror.Log(rollbackErr)
cancel()
}
}()

tracer := metrics.PhaseTracerFromCtx(ctx)
defer tracer.EnterPhase(tracer.Phase())

Expand All @@ -129,14 +143,6 @@ func (s *session) RunInTxn(ctx context.Context, fn func() error, txnMode TxnMode
}
tracer.EnterPhase(metrics.PhaseOther)

success := false
defer func() {
if !success {
_, rollbackErr := s.ExecuteSQL(ctx, "ROLLBACK")
terror.Log(rollbackErr)
}
}()

if err = fn(); err != nil {
return err
}
Expand Down
18 changes: 18 additions & 0 deletions pkg/ttl/ttlworker/scan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,3 +554,21 @@ func TestScanTaskCancelStmt(t *testing.T) {
task.ctx, cancel = context.WithCancel(context.Background())
testCancel(context.Background(), cancel)
}

// NewTTLScanTask creates a new TTL scan task for test.
func NewTTLScanTask(ctx context.Context, tbl *cache.PhysicalTable, ttlTask *cache.TTLTask) *ttlScanTask {
return &ttlScanTask{
ctx: ctx,
tbl: tbl,
TTLTask: ttlTask,
statistics: &ttlStatistics{},
}
}

// DoScan is an exported version of `doScan` for test.
func (t *ttlScanTask) DoScan(ctx context.Context, delCh chan<- *TTLDeleteTask, sessPool util.SessionPool) *ttlScanTaskExecResult {
return t.doScan(ctx, delCh, sessPool)
}

// TTLDeleteTask is an exported version of `ttlDeleteTask` for test.
type TTLDeleteTask = ttlDeleteTask

0 comments on commit e5c9e5d

Please sign in to comment.