[nexus] Use retryable transactions more extensively #7212

Merged · 1 commit · Dec 6, 2024
6 changes: 6 additions & 0 deletions clippy.toml
@@ -10,4 +10,10 @@ disallowed-methods = [
# `IncompleteOnConflictExt::as_partial_index` in `nexus-db-queries`.
# See the documentation of that method for more.
"diesel::upsert::DecoratableTarget::filter_target",

# This form of transaction is susceptible to serialization failures,
# and can fail spuriously.
# Instead, the "transaction_retry_wrapper" should be preferred, as it
# automatically retries transactions experiencing contention.
{ path = "async_bb8_diesel::AsyncConnection::transaction_async", reason = "Prefer to use transaction_retry_wrapper, if possible. Feel free to override this for tests and nested transactions." },
]
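
For reference, the pattern this lint steers code toward looks roughly like the sketch below. It is assembled only from the helpers visible in this diff (`transaction_retry_wrapper`, `OptionalError`, `TransactionError`, `public_error_from_diesel`); the `widget` table, the `Widget`/`OpContext`/`Uuid` types, and the error message are illustrative placeholders rather than code from the repository.

```rust
// A minimal sketch (not code from this PR) of the preferred pattern.
// Helper names come from the diff below; the `widget` table and the
// surrounding types are hypothetical placeholders.
use crate::db::TransactionError;
use crate::transaction_retry::OptionalError;
use async_bb8_diesel::AsyncRunQueryDsl;
use diesel::prelude::*;
// (plus `public_error_from_diesel` / `ErrorHandler` from this crate's
// error helpers, as imported by the files below)

impl DataStore {
    pub async fn widget_delete(
        &self,
        opctx: &OpContext,
        widget_id: Uuid,
    ) -> Result<(), Error> {
        let conn = self.pool_connection_authorized(opctx).await?;
        let err = OptionalError::new();

        self.transaction_retry_wrapper("widget_delete")
            .transaction(&conn, |conn| {
                // Clone the error slot so the (possibly retried) closure
                // can hand a custom, non-retryable error back to us.
                let err = err.clone();
                async move {
                    use crate::db::schema::widget::dsl; // hypothetical table
                    let nrows = diesel::delete(
                        dsl::widget.filter(dsl::id.eq(widget_id)),
                    )
                    .execute_async(&conn)
                    .await?;
                    if nrows == 0 {
                        // `bail` records the custom error and aborts the
                        // transaction without triggering a retry.
                        return Err(err.bail(TransactionError::CustomError(
                            Error::conflict(format!(
                                "widget {widget_id} was already deleted"
                            )),
                        )));
                    }
                    Ok(())
                }
            })
            .await
            .map_err(|e| match err.take() {
                // The closure bailed with a custom error; surface it.
                Some(err) => err.into(),
                // Otherwise this was a genuine database error.
                None => public_error_from_diesel(e, ErrorHandler::Server),
            })
    }
}
```

The wrapper retries the closure when the database reports a retryable serialization failure, while anything reported through `err.bail` is treated as a terminal result rather than a reason to retry.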
2 changes: 2 additions & 0 deletions dev-tools/omdb/src/bin/omdb/db.rs
@@ -14,6 +14,8 @@

// NOTE: emanates from Tabled macros
#![allow(clippy::useless_vec)]
// NOTE: allowing "transaction_async" without retry
#![allow(clippy::disallowed_methods)]

use crate::check_allow_destructive::DestructiveOperationToken;
use crate::helpers::const_max_len;
35 changes: 22 additions & 13 deletions nexus/db-queries/src/db/datastore/deployment.rs
@@ -335,6 +335,11 @@ impl DataStore {
// batch rather than making a bunch of round-trips to the database.
// We'd do that if we had an interface for doing that with bound
// parameters, etc. See oxidecomputer/omicron#973.

// A serialization error is possible here, but the risk is low: most of
// these operations should be insertions rather than in-place
// modifications of existing tables.
#[allow(clippy::disallowed_methods)]
conn.transaction_async(|conn| async move {
// Insert the row for the blueprint.
{
@@ -1087,6 +1092,7 @@ impl DataStore {
// start removing it and we'd also need to make sure we didn't leak a
// collection if we crash while deleting it.
let conn = self.pool_connection_authorized(opctx).await?;
let err = OptionalError::new();

let (
nblueprints,
@@ -1101,19 +1107,23 @@
nclickhouse_cluster_configs,
nclickhouse_keepers,
nclickhouse_servers,
) = conn
.transaction_async(|conn| async move {
) = self.transaction_retry_wrapper("blueprint_delete")
.transaction(&conn, |conn| {
let err = err.clone();
async move {
// Ensure that blueprint we're about to delete is not the
// current target.
let current_target =
Self::blueprint_current_target_only(&conn).await?;
let current_target = Self::blueprint_current_target_only(&conn)
.await
.map_err(|txn_err| txn_err.into_diesel(&err))?;

if current_target.target_id == blueprint_id {
return Err(TransactionError::CustomError(
return Err(err.bail(TransactionError::CustomError(
Error::conflict(format!(
"blueprint {blueprint_id} is the \
current target and cannot be deleted",
)),
));
)));
}

// Remove the record describing the blueprint itself.
@@ -1130,9 +1140,9 @@
// references to it in any of the remaining tables either, since
// deletion always goes through this transaction.
if nblueprints == 0 {
return Err(TransactionError::CustomError(
return Err(err.bail(TransactionError::CustomError(
authz_blueprint.not_found(),
));
)));
}

// Remove rows associated with sled states.
@@ -1259,13 +1269,12 @@
nclickhouse_keepers,
nclickhouse_servers,
))
}
})
.await
.map_err(|error| match error {
TransactionError::CustomError(e) => e,
TransactionError::Database(e) => {
public_error_from_diesel(e, ErrorHandler::Server)
}
.map_err(|e| match err.take() {
Some(err) => err.into(),
None => public_error_from_diesel(e, ErrorHandler::Server),
})?;

info!(&opctx.log, "removed blueprint";
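
Where this change deliberately keeps the non-retrying form, as in the blueprint insertion hunk above, the shape is roughly the sketch below: an insert-only transaction with low contention risk, with the lint overridden locally and the justification recorded in a comment. The `widget` table and `Widget` type are hypothetical, and imports are assumed to match the files in this diff (notably `async_bb8_diesel::AsyncConnection`).

```rust
// Sketch of the "opt out with a justification" pattern; `widget` and
// `Widget` are placeholders.
impl DataStore {
    pub async fn widget_batch_insert(
        &self,
        opctx: &OpContext,
        rows: Vec<Widget>,
    ) -> Result<(), Error> {
        let conn = self.pool_connection_authorized(opctx).await?;

        // This transaction only inserts freshly generated rows, so the
        // risk of a serialization failure is low and the retry wrapper
        // buys little; keep `transaction_async` and silence the lint.
        #[allow(clippy::disallowed_methods)]
        conn.transaction_async(|conn| async move {
            use crate::db::schema::widget::dsl; // hypothetical table
            diesel::insert_into(dsl::widget)
                .values(rows)
                .execute_async(&conn)
                .await?;
            Ok::<(), TransactionError<Error>>(())
        })
        .await
        .map_err(|e| match e {
            TransactionError::CustomError(e) => e,
            TransactionError::Database(e) => {
                public_error_from_diesel(e, ErrorHandler::Server)
            }
        })?;

        Ok(())
    }
}
```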
79 changes: 47 additions & 32 deletions nexus/db-queries/src/db/datastore/dns.rs
@@ -19,6 +19,7 @@ use crate::db::pagination::paginated;
use crate::db::pagination::Paginator;
use crate::db::pool::DbConnection;
use crate::db::TransactionError;
use crate::transaction_retry::OptionalError;
use async_bb8_diesel::AsyncConnection;
use async_bb8_diesel::AsyncRunQueryDsl;
use diesel::prelude::*;
@@ -363,40 +364,49 @@
) -> Result<(), Error> {
opctx.authorize(authz::Action::Modify, &authz::DNS_CONFIG).await?;
let conn = self.pool_connection_authorized(opctx).await?;
conn.transaction_async(|c| async move {
let zones = self
.dns_zones_list_all_on_connection(opctx, &c, update.dns_group)
.await?;
// This looks like a time-of-check-to-time-of-use race, but this
// approach works because we're inside a transaction and the
// isolation level is SERIALIZABLE.
let version = self
.dns_group_latest_version_conn(opctx, &c, update.dns_group)
.await?;
if version.version != old_version {
return Err(TransactionError::CustomError(Error::conflict(
format!(
"expected current DNS version to be {}, found {}",
*old_version, *version.version,
),
)));
}

self.dns_write_version_internal(
&c,
update,
zones,
Generation(old_version.next()),
)
let err = OptionalError::new();

self.transaction_retry_wrapper("dns_update_from_version")
.transaction(&conn, |c| {
let err = err.clone();
let update = update.clone();
async move {
let zones = self
.dns_zones_list_all_on_connection(opctx, &c, update.dns_group)
.await
.map_err(|txn_error| txn_error.into_diesel(&err))?;
// This looks like a time-of-check-to-time-of-use race, but this
// approach works because we're inside a transaction and the
// isolation level is SERIALIZABLE.
let version = self
.dns_group_latest_version_conn(opctx, &c, update.dns_group)
.await
.map_err(|txn_error| txn_error.into_diesel(&err))?;
if version.version != old_version {
return Err(err.bail(TransactionError::CustomError(Error::conflict(
format!(
"expected current DNS version to be {}, found {}",
*old_version, *version.version,
),
))));
}

self.dns_write_version_internal(
&c,
update,
zones,
Generation(old_version.next()),
)
.await
.map_err(|txn_error| txn_error.into_diesel(&err))
}
})
.await
})
.await
.map_err(|e| match e {
TransactionError::CustomError(e) => e,
TransactionError::Database(e) => {
public_error_from_diesel(e, ErrorHandler::Server)
}
})
.map_err(|e| match err.take() {
Some(err) => err.into(),
None => public_error_from_diesel(e, ErrorHandler::Server),
})
}

/// Update the configuration of a DNS zone as specified in `update`
@@ -441,6 +451,9 @@
.dns_zones_list_all_on_connection(opctx, conn, update.dns_group)
.await?;

// This method is used in nested transactions, which are not supported
// with retryable transactions.
#[allow(clippy::disallowed_methods)]
conn.transaction_async(|c| async move {
let version = self
.dns_group_latest_version_conn(opctx, conn, update.dns_group)
@@ -1724,6 +1737,8 @@ mod test {

let cds = datastore.clone();
let copctx = opctx.child(std::collections::BTreeMap::new());

#[allow(clippy::disallowed_methods)]
let mut fut = conn1
.transaction_async(|c1| async move {
cds.dns_update_incremental(&copctx, &c1, update1)
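
The remaining overrides in this file cover nested transactions and tests, where the retry wrapper cannot be used because it has to own the whole transaction. A rough sketch of that case follows; the connection type and method shape are assumptions modeled on `dns_update_incremental`, and the body is a placeholder.

```rust
// Sketch of the nested-transaction exemption; the helper may be called
// with a connection that is already inside a transaction (as the test
// above does), so it keeps `transaction_async` and overrides the lint.
impl DataStore {
    pub async fn widget_update_incremental(
        &self,
        conn: &async_bb8_diesel::Connection<DbConnection>,
    ) -> Result<(), Error> {
        // The retry wrapper must own the whole transaction, so it cannot
        // be used for a transaction that may nest inside another one.
        #[allow(clippy::disallowed_methods)]
        conn.transaction_async(|c| async move {
            // Placeholder for incremental updates issued against `c`,
            // which may be running inside an outer transaction.
            let _ = &c;
            Ok::<(), TransactionError<Error>>(())
        })
        .await
        .map_err(|e| match e {
            TransactionError::CustomError(e) => e,
            TransactionError::Database(e) => {
                public_error_from_diesel(e, ErrorHandler::Server)
            }
        })?;
        Ok(())
    }
}
```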