Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: implement XA transaction SQL statements #129448

Merged
merged 4 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions docs/generated/metrics/metrics.html
Original file line number Diff line number Diff line change
Expand Up @@ -1835,14 +1835,26 @@
<tr><td>APPLICATION</td><td>sql.txn.commit.count.internal</td><td>Number of SQL transaction COMMIT statements successfully executed (internal queries)</td><td>SQL Internal Statements</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sql.txn.commit.started.count</td><td>Number of SQL transaction COMMIT statements started</td><td>SQL Statements</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sql.txn.commit.started.count.internal</td><td>Number of SQL transaction COMMIT statements started (internal queries)</td><td>SQL Internal Statements</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sql.txn.commit_prepared.count</td><td>Number of SQL COMMIT PREPARED statements successfully executed</td><td>SQL Statements</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sql.txn.commit_prepared.count.internal</td><td>Number of SQL COMMIT PREPARED statements successfully executed (internal queries)</td><td>SQL Internal Statements</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sql.txn.commit_prepared.started.count</td><td>Number of SQL COMMIT PREPARED statements started</td><td>SQL Statements</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sql.txn.commit_prepared.started.count.internal</td><td>Number of SQL COMMIT PREPARED statements started (internal queries)</td><td>SQL Internal Statements</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sql.txn.contended.count</td><td>Number of SQL transactions experienced contention</td><td>Contention</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sql.txn.contended.count.internal</td><td>Number of SQL transactions experienced contention (internal queries)</td><td>SQL Internal Statements</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sql.txn.latency</td><td>Latency of SQL transactions</td><td>Latency</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>sql.txn.latency.internal</td><td>Latency of SQL transactions (internal queries)</td><td>SQL Internal Statements</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>sql.txn.prepare.count</td><td>Number of SQL PREPARE TRANSACTION statements successfully executed</td><td>SQL Statements</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sql.txn.prepare.count.internal</td><td>Number of SQL PREPARE TRANSACTION statements successfully executed (internal queries)</td><td>SQL Internal Statements</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sql.txn.prepare.started.count</td><td>Number of SQL PREPARE TRANSACTION statements started</td><td>SQL Statements</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sql.txn.prepare.started.count.internal</td><td>Number of SQL PREPARE TRANSACTION statements started (internal queries)</td><td>SQL Internal Statements</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sql.txn.rollback.count</td><td>Number of SQL transaction ROLLBACK statements successfully executed</td><td>SQL Statements</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sql.txn.rollback.count.internal</td><td>Number of SQL transaction ROLLBACK statements successfully executed (internal queries)</td><td>SQL Internal Statements</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sql.txn.rollback.started.count</td><td>Number of SQL transaction ROLLBACK statements started</td><td>SQL Statements</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sql.txn.rollback.started.count.internal</td><td>Number of SQL transaction ROLLBACK statements started (internal queries)</td><td>SQL Internal Statements</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sql.txn.rollback_prepared.count</td><td>Number of SQL ROLLBACK PREPARED statements successfully executed</td><td>SQL Statements</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sql.txn.rollback_prepared.count.internal</td><td>Number of SQL ROLLBACK PREPARED statements successfully executed (internal queries)</td><td>SQL Internal Statements</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sql.txn.rollback_prepared.started.count</td><td>Number of SQL ROLLBACK PREPARED statements started</td><td>SQL Statements</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sql.txn.rollback_prepared.started.count.internal</td><td>Number of SQL ROLLBACK PREPARED statements started (internal queries)</td><td>SQL Internal Statements</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sql.txn.upgraded_iso_level.count</td><td>Number of times a weak isolation level was automatically upgraded to a stronger one</td><td>SQL Statements</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sql.txn.upgraded_iso_level.count.internal</td><td>Number of times a weak isolation level was automatically upgraded to a stronger one (internal queries)</td><td>SQL Internal Statements</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sql.txns.open</td><td>Number of currently open user SQL transactions</td><td>Open SQL Transactions</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
Expand Down
7 changes: 7 additions & 0 deletions pkg/ccl/logictestccl/tests/3node-tenant/generated_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/cli/clisqlshell/testdata/describe
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ pg_catalog,pg_policies,table,node,permanent,prefix,pg_policies was created for c
pg_catalog,pg_policy,table,node,permanent,prefix,pg_policy was created for compatibility and is currently unimplemented
pg_catalog,pg_prepared_statements,table,node,permanent,prefix,"prepared statements
https://www.postgresql.org/docs/9.6/view-pg-prepared-statements.html"
pg_catalog,pg_prepared_xacts,table,node,permanent,prefix,"prepared transactions (empty - feature does not exist)
pg_catalog,pg_prepared_xacts,table,node,permanent,prefix,"prepared transactions
https://www.postgresql.org/docs/9.6/view-pg-prepared-xacts.html"
pg_catalog,pg_proc,table,node,permanent,prefix,"built-in functions (incomplete)
https://www.postgresql.org/docs/16/catalog-pg-proc.html"
Expand Down
16 changes: 0 additions & 16 deletions pkg/cmd/roachtest/tests/pgjdbc_blocklist.go
Original file line number Diff line number Diff line change
Expand Up @@ -621,23 +621,7 @@ var pgjdbcBlockList = blocklist{
`org.postgresql.test.util.PasswordUtilTest.encryptionTypeValueOfOn()`: "73337",
`org.postgresql.test.util.PasswordUtilTest.mD5()`: "73337",
`org.postgresql.test.util.PasswordUtilTest.scramSha256()`: "unknown",
`org.postgresql.test.xa.XADataSourceTest.autoCommit()`: "unknown",
`org.postgresql.test.xa.XADataSourceTest.commitByDifferentConnection()`: "unknown",
`org.postgresql.test.xa.XADataSourceTest.commitUnknownXid()`: "unknown",
`org.postgresql.test.xa.XADataSourceTest.committingCommittedXid()`: "unknown",
`org.postgresql.test.xa.XADataSourceTest.databaseRemovesPreparedBeforeCommit()`: "unknown",
`org.postgresql.test.xa.XADataSourceTest.databaseRemovesPreparedBeforeRollback()`: "unknown",
`org.postgresql.test.xa.XADataSourceTest.mappingOfConstraintViolations()`: "unknown",
`org.postgresql.test.xa.XADataSourceTest.networkIssueOnCommit()`: "unknown",
`org.postgresql.test.xa.XADataSourceTest.networkIssueOnRollback()`: "unknown",
`org.postgresql.test.xa.XADataSourceTest.onePhaseCommitOfPrepared()`: "unknown",
`org.postgresql.test.xa.XADataSourceTest.preparingPreparedXid()`: "unknown",
`org.postgresql.test.xa.XADataSourceTest.recover()`: "unknown",
`org.postgresql.test.xa.XADataSourceTest.repeatedRolledBack()`: "unknown",
`org.postgresql.test.xa.XADataSourceTest.rollback()`: "unknown",
`org.postgresql.test.xa.XADataSourceTest.rollbackByDifferentConnection()`: "unknown",
`org.postgresql.test.xa.XADataSourceTest.rollbackUnknownXid()`: "unknown",
`org.postgresql.test.xa.XADataSourceTest.twoPhaseCommit()`: "unknown",
}

var pgjdbcIgnoreList = blocklist{
Expand Down
1 change: 1 addition & 0 deletions pkg/internal/sqlsmith/sqlsmith.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,7 @@ var DisableDDLs = simpleOption("disable DDLs", func(s *Smither) {
{2, makeRollbackToSavepoint},
{2, makeCommit},
{2, makeRollback},
// TODO(nvanbenschoten): add two-phase commit statements.
}
})

Expand Down
7 changes: 7 additions & 0 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -1323,6 +1323,13 @@ func (tc *TxnCoordSender) IsSerializablePushAndRefreshNotPossible() bool {
return isTxnSerializable && isTxnPushed && refreshAttemptNotPossible
}

// Key is part of the kv.TxnSender interface.
func (tc *TxnCoordSender) Key() roachpb.Key {
tc.mu.Lock()
defer tc.mu.Unlock()
return tc.mu.txn.Key
}

// Epoch is part of the kv.TxnSender interface.
func (tc *TxnCoordSender) Epoch() enginepb.TxnEpoch {
tc.mu.Lock()
Expand Down
3 changes: 3 additions & 0 deletions pkg/kv/mock_transactional_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,9 @@ func (m *MockTransactionalSender) CanUseSavepoint(context.Context, SavepointToke
panic("unimplemented")
}

// Key is part of the TxnSender interface.
func (m *MockTransactionalSender) Key() roachpb.Key { panic("unimplemented") }

// Epoch is part of the TxnSender interface.
func (m *MockTransactionalSender) Epoch() enginepb.TxnEpoch { panic("unimplemented") }

Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,10 @@ type TxnSender interface {
// https://github.com/cockroachdb/cockroach/issues/15012
Active() bool

// Key returns the current "anchor" key of the transaction, or nil if no such
// key has been set because the transaction has not yet acquired any locks.
Key() roachpb.Key

// Epoch returns the txn's epoch.
Epoch() enginepb.TxnEpoch

Expand Down
8 changes: 8 additions & 0 deletions pkg/kv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,14 @@ func (txn *Txn) ID() uuid.UUID {
return txn.mu.ID
}

// Key returns the current "anchor" key of the transaction, or nil if no such
// key has been set because the transaction has not yet acquired any locks.
func (txn *Txn) Key() roachpb.Key {
txn.mu.Lock()
defer txn.mu.Unlock()
return txn.mu.sender.Key()
}

// Epoch exports the txn's epoch.
func (txn *Txn) Epoch() enginepb.TxnEpoch {
txn.mu.Lock()
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ go_library(
"testutils.go",
"topk.go",
"truncate.go",
"two_phase_commit.go",
"txn_fingerprint_id_cache.go",
"txn_state.go",
"type_change.go",
Expand Down
31 changes: 27 additions & 4 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2161,7 +2161,7 @@ func (ex *connExecutor) resetExtraTxnState(ctx context.Context, ev txnEvent, pay
}

switch ev.eventType {
case txnCommit, txnRollback:
case txnCommit, txnRollback, txnPrepare:
ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos.closeAllPortals(
ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc,
)
Expand Down Expand Up @@ -4080,10 +4080,10 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper(
}

fallthrough
case txnRollback:
case txnRollback, txnPrepare:
ex.resetExtraTxnState(ex.Ctx(), advInfo.txnEvent, payloadErr)
// Since we're doing a complete rollback, there's no need to keep the
// prepared stmts for a txn rewind.
// Since we're finalizing the SQL transaction (commit, rollback, prepare),
// there's no need to keep the prepared stmts for a txn rewind.
ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos.closeAllPortals(
ex.Ctx(), &ex.extraTxnState.prepStmtsNamespaceMemAcc,
)
Expand Down Expand Up @@ -4494,6 +4494,11 @@ type StatementCounters struct {
TxnRollbackCount telemetry.CounterWithMetric
TxnUpgradedCount *metric.Counter

// Transaction XA two-phase commit operations.
TxnPrepareCount telemetry.CounterWithMetric
TxnCommitPreparedCount telemetry.CounterWithMetric
TxnRollbackPreparedCount telemetry.CounterWithMetric

// Savepoint operations. SavepointCount is for real SQL savepoints;
// the RestartSavepoint variants are for the
// cockroach-specific client-side retry protocol.
Expand Down Expand Up @@ -4531,6 +4536,12 @@ func makeStartedStatementCounters(internal bool) StatementCounters {
getMetricMeta(MetaTxnRollbackStarted, internal)),
TxnUpgradedCount: metric.NewCounter(
getMetricMeta(MetaTxnUpgradedFromWeakIsolation, internal)),
TxnPrepareCount: telemetry.NewCounterWithMetric(
getMetricMeta(MetaTxnPrepareStarted, internal)),
TxnCommitPreparedCount: telemetry.NewCounterWithMetric(
getMetricMeta(MetaTxnCommitPreparedStarted, internal)),
TxnRollbackPreparedCount: telemetry.NewCounterWithMetric(
getMetricMeta(MetaTxnRollbackPreparedStarted, internal)),
RestartSavepointCount: telemetry.NewCounterWithMetric(
getMetricMeta(MetaRestartSavepointStarted, internal)),
ReleaseRestartSavepointCount: telemetry.NewCounterWithMetric(
Expand Down Expand Up @@ -4576,6 +4587,12 @@ func makeExecutedStatementCounters(internal bool) StatementCounters {
getMetricMeta(MetaTxnRollbackExecuted, internal)),
TxnUpgradedCount: metric.NewCounter(
getMetricMeta(MetaTxnUpgradedFromWeakIsolation, internal)),
TxnPrepareCount: telemetry.NewCounterWithMetric(
getMetricMeta(MetaTxnPrepareExecuted, internal)),
TxnCommitPreparedCount: telemetry.NewCounterWithMetric(
getMetricMeta(MetaTxnCommitPreparedExecuted, internal)),
TxnRollbackPreparedCount: telemetry.NewCounterWithMetric(
getMetricMeta(MetaTxnRollbackPreparedExecuted, internal)),
RestartSavepointCount: telemetry.NewCounterWithMetric(
getMetricMeta(MetaRestartSavepointExecuted, internal)),
ReleaseRestartSavepointCount: telemetry.NewCounterWithMetric(
Expand Down Expand Up @@ -4638,6 +4655,12 @@ func (sc *StatementCounters) incrementCount(ex *connExecutor, stmt tree.Statemen
} else {
sc.TxnRollbackCount.Inc()
}
case *tree.PrepareTransaction:
sc.TxnPrepareCount.Inc()
case *tree.CommitPrepared:
sc.TxnCommitPreparedCount.Inc()
case *tree.RollbackPrepared:
sc.TxnRollbackPreparedCount.Inc()
case *tree.Savepoint:
if ex.isCommitOnReleaseSavepoint(t.Name) {
sc.RestartSavepointCount.Inc()
Expand Down
21 changes: 15 additions & 6 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -780,6 +780,10 @@ func (ex *connExecutor) execStmtInOpenState(
ev, payload := ex.execRollbackToSavepointInOpenState(ctx, s, res)
return ev, payload, nil

case *tree.PrepareTransaction:
ev, payload := ex.execPrepareTransactionInOpenState(ctx, s)
return ev, payload, nil

case *tree.ShowCommitTimestamp:
ev, payload := ex.execShowCommitTimestampInOpenState(ctx, s, res, canAutoCommit)
return ev, payload, nil
Expand Down Expand Up @@ -1799,6 +1803,10 @@ func (ex *connExecutor) execStmtInOpenStateWithPausablePortal(
ev, payload := ex.execRollbackToSavepointInOpenState(ctx, s, res)
return ev, payload, nil

case *tree.PrepareTransaction:
ev, payload := ex.execPrepareTransactionInOpenState(ctx, s)
return ev, payload, nil

case *tree.ShowCommitTimestamp:
ev, payload := ex.execShowCommitTimestampInOpenState(ctx, s, res, canAutoCommit)
return ev, payload, nil
Expand Down Expand Up @@ -3450,8 +3458,8 @@ func (ex *connExecutor) execStmtInNoTxnState(
)
case *tree.ShowCommitTimestamp:
return ex.execShowCommitTimestampInNoTxnState(ctx, s, res)
case *tree.CommitTransaction, *tree.ReleaseSavepoint,
*tree.RollbackTransaction, *tree.SetTransaction, *tree.Savepoint:
case *tree.CommitTransaction, *tree.RollbackTransaction, *tree.PrepareTransaction,
*tree.SetTransaction, *tree.Savepoint, *tree.ReleaseSavepoint:
if ex.sessionData().AutoCommitBeforeDDL {
// If autocommit_before_ddl is set, we allow these statements to be
// executed, and send a warning rather than an error.
Expand Down Expand Up @@ -3523,7 +3531,7 @@ func (ex *connExecutor) beginImplicitTxn(

// execStmtInAbortedState executes a statement in a txn that's in state
// Aborted or RestartWait. All statements result in error events except:
// - COMMIT / ROLLBACK: aborts the current transaction.
// - COMMIT / ROLLBACK / PREPARE TRANSACTION: aborts the current transaction.
// - ROLLBACK TO SAVEPOINT / SAVEPOINT: reopens the current transaction,
// allowing it to be retried.
func (ex *connExecutor) execStmtInAbortedState(
Expand All @@ -3545,9 +3553,10 @@ func (ex *connExecutor) execStmtInAbortedState(
}

switch s := ast.(type) {
case *tree.CommitTransaction, *tree.RollbackTransaction:
if _, ok := s.(*tree.CommitTransaction); ok {
// Note: Postgres replies to COMMIT of failed txn with "ROLLBACK" too.
case *tree.CommitTransaction, *tree.RollbackTransaction, *tree.PrepareTransaction:
if _, ok := s.(*tree.RollbackTransaction); !ok {
// Note: Postgres replies to COMMIT and PREPARE TRANSACTION of failed
// transactions with "ROLLBACK" too.
res.ResetStmtType((*tree.RollbackTransaction)(nil))
}
return ex.rollbackSQLTransaction(ctx, s)
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/conn_executor_prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,8 @@ func (ex *connExecutor) execDescribe(
// prepared and executed inside of an aborted transaction.
func (ex *connExecutor) isAllowedInAbortedTxn(ast tree.Statement) bool {
switch s := ast.(type) {
case *tree.CommitTransaction, *tree.RollbackTransaction, *tree.RollbackToSavepoint:
case *tree.CommitTransaction, *tree.PrepareTransaction,
*tree.RollbackTransaction, *tree.RollbackToSavepoint:
return true
case *tree.Savepoint:
if ex.isCommitOnReleaseSavepoint(s.Name) {
Expand Down
Loading
Loading