Skip to content

Commit

Permalink
sql: use internalDB in two_phase_commit.go
Browse files Browse the repository at this point in the history
Squash if this is what Rafi was looking for.
  • Loading branch information
nvanbenschoten committed Dec 17, 2024
1 parent 59b4dfd commit 9ecfba9
Showing 1 changed file with 33 additions and 43 deletions.
76 changes: 33 additions & 43 deletions pkg/sql/two_phase_commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,17 +98,21 @@ func (ex *connExecutor) execPrepareTransactionInOpenStateInternal(

// Insert the prepared transaction's row into the system table. We do this
// before preparing the transaction in the KV layer so that we can track the
// existence of the prepared transaction in the event of a crash.
if err := insertPreparedTransaction(
ctx,
ex.server.cfg.InternalDB.Executor(),
nil, /* sqlTxn */
globalID,
txnID,
txnKey,
ex.sessionData().User().Normalized(),
ex.sessionData().Database,
); err != nil {
// existence of the prepared transaction in the event of a crash. We do this
// non-transactionally so that the system row is committed and readable while
// the transaction that it references remains in the PREPARED state.
err := ex.server.cfg.InternalDB.Txn(ctx, func(ctx context.Context, sqlTxn isql.Txn) error {
return insertPreparedTransaction(
ctx,
sqlTxn,
globalID,
txnID,
txnKey,
ex.sessionData().User().Normalized(),
ex.sessionData().Database,
)
})
if err != nil {
if pgerror.GetPGCode(err) == pgcode.UniqueViolation {
return pgerror.Newf(pgcode.DuplicateObject, "transaction identifier %q is already in use", globalID)
}
Expand Down Expand Up @@ -169,13 +173,11 @@ func (ex *connExecutor) cleanupAfterFailedPrepareTransaction(ctx context.Context
}

// We're certain that the transaction has been rolled back and that its record
// is not in the PREPARED state. Clean up the system.prepared_transactions row.
err = deletePreparedTransaction(
ctx,
ex.server.cfg.InternalDB.Executor(),
nil, /* sqlTxn */
globalID,
)
// is not in the PREPARED state. Clean up the system.prepared_transactions row,
// non-transactionally.
err = ex.server.cfg.InternalDB.Txn(ctx, func(ctx context.Context, sqlTxn isql.Txn) error {
return deletePreparedTransaction(ctx, sqlTxn, globalID)
})
if err != nil {
log.Warningf(ctx, "cleanup prepared transaction row failed: %s", err)
}
Expand Down Expand Up @@ -277,12 +279,7 @@ func (f *endPreparedTxnNode) checkNoActiveTxn(params runParams) error {
func (f *endPreparedTxnNode) selectPreparedTxn(
params runParams,
) (txnID uuid.UUID, txnKey roachpb.Key, owner string, err error) {
row, err := selectPreparedTransaction(
params.ctx,
params.p.InternalSQLTxn(),
params.p.Txn(),
f.globalID,
)
row, err := selectPreparedTransaction(params.ctx, params.p.InternalSQLTxn(), f.globalID)
if err != nil {
return uuid.UUID{}, nil, "", err
}
Expand Down Expand Up @@ -354,12 +351,7 @@ func (f *endPreparedTxnNode) endPreparedTxn(

// deletePreparedTxn deletes the prepared transaction from the system table.
func (f *endPreparedTxnNode) deletePreparedTxn(params runParams) error {
return deletePreparedTransaction(
params.ctx,
params.p.InternalSQLTxn(),
params.p.Txn(),
f.globalID,
)
return deletePreparedTransaction(params.ctx, params.p.InternalSQLTxn(), f.globalID)
}

func (f *endPreparedTxnNode) Next(params runParams) (bool, error) { return false, nil }
Expand All @@ -368,17 +360,16 @@ func (f *endPreparedTxnNode) Close(ctx context.Context) {}

func insertPreparedTransaction(
ctx context.Context,
ie isql.Executor,
sqlTxn *kv.Txn,
sqlTxn isql.Txn,
globalID string,
txnID uuid.UUID,
txnKey roachpb.Key,
owner, database string,
) error {
_, err := ie.ExecEx(
_, err := sqlTxn.ExecEx(
ctx,
"insert-prepared-transaction",
sqlTxn,
sqlTxn.KV(),
sessiondata.NodeUserSessionDataOverride,
`INSERT INTO system.prepared_transactions
(global_id, transaction_id, transaction_key, owner, database)
Expand All @@ -392,13 +383,11 @@ func insertPreparedTransaction(
return err
}

func deletePreparedTransaction(
ctx context.Context, ie isql.Executor, sqlTxn *kv.Txn, globalID string,
) error {
_, err := ie.ExecEx(
func deletePreparedTransaction(ctx context.Context, sqlTxn isql.Txn, globalID string) error {
_, err := sqlTxn.ExecEx(
ctx,
"delete-prepared-transaction",
sqlTxn,
sqlTxn.KV(),
sessiondata.NodeUserSessionDataOverride,
`DELETE FROM system.prepared_transactions WHERE global_id = $1`,
globalID,
Expand All @@ -407,14 +396,15 @@ func deletePreparedTransaction(
}

func selectPreparedTransaction(
ctx context.Context, ie isql.Executor, sqlTxn *kv.Txn, globalID string,
ctx context.Context, sqlTxn isql.Txn, globalID string,
) (tree.Datums, error) {
return ie.QueryRowEx(
return sqlTxn.QueryRowEx(
ctx,
"select-prepared-txn",
sqlTxn,
sqlTxn.KV(),
sessiondata.NodeUserSessionDataOverride,
`SELECT transaction_id, transaction_key, owner FROM system.prepared_transactions WHERE global_id = $1 FOR UPDATE`,
`SELECT transaction_id, transaction_key, owner
FROM system.prepared_transactions WHERE global_id = $1 FOR UPDATE`,
globalID,
)
}
Expand Down

0 comments on commit 9ecfba9

Please sign in to comment.