diff --git a/Cargo.lock b/Cargo.lock index 65ac0fd91c..04cea43c75 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2321,9 +2321,9 @@ dependencies = [ [[package]] name = "diesel-dtrace" -version = "0.4.0" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e5130181059723aae1cfdb678d3698052a225aaadb18000f77fec4200047acc" +checksum = "2750c8bd7a42381620b57f370ca5f757a71d554814e02a43c1bf54ae2656e01d" dependencies = [ "diesel", "serde", diff --git a/Cargo.toml b/Cargo.toml index ec862fb722..39d8f33ca6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -361,7 +361,7 @@ derive_more = "0.99.18" derive-where = "1.2.7" # Having the i-implement-... feature here makes diesel go away from the workspace-hack diesel = { version = "2.2.4", features = ["i-implement-a-third-party-backend-and-opt-into-breaking-changes", "postgres", "r2d2", "chrono", "serde_json", "network-address", "uuid"] } -diesel-dtrace = "0.4.0" +diesel-dtrace = "0.4.2" dns-server = { path = "dns-server" } dns-server-api = { path = "dns-server-api" } dns-service-client = { path = "clients/dns-service-client" } diff --git a/clippy.toml b/clippy.toml index 31e28d5911..677fb67ade 100644 --- a/clippy.toml +++ b/clippy.toml @@ -15,5 +15,5 @@ disallowed-methods = [ # and can fail spuriously. # Instead, the "transaction_retry_wrapper" should be preferred, as it # automatically retries transactions experiencing contention. - { path = "async_bb8_diesel::AsyncConnection::transaction_async", reason = "Prefer to use transaction_retry_wrapper, if possible. Feel free to override this for tests and nested transactions." }, + { path = "async_bb8_diesel::AsyncConnection::transaction_async", reason = "Prefer to use transaction_retry_wrapper, if possible. For tests and nested transactions, use transaction_non_retry_wrapper to at least get dtrace probes" }, ] diff --git a/nexus/db-queries/src/db/datastore/deployment.rs b/nexus/db-queries/src/db/datastore/deployment.rs index 0c73ae1ae2..4210f2bee2 100644 --- a/nexus/db-queries/src/db/datastore/deployment.rs +++ b/nexus/db-queries/src/db/datastore/deployment.rs @@ -16,7 +16,6 @@ use crate::db::DbConnection; use crate::db::TransactionError; use crate::transaction_retry::OptionalError; use anyhow::Context; -use async_bb8_diesel::AsyncConnection; use async_bb8_diesel::AsyncRunQueryDsl; use chrono::DateTime; use chrono::Utc; @@ -106,7 +105,7 @@ impl DataStore { blueprint: &Blueprint, ) -> Result<(), Error> { let conn = self.pool_connection_authorized(opctx).await?; - Self::blueprint_insert_on_connection(&conn, opctx, blueprint).await + self.blueprint_insert_on_connection(&conn, opctx, blueprint).await } /// Creates a transaction iff the current blueprint is "bp_id". @@ -182,6 +181,7 @@ impl DataStore { /// Variant of [Self::blueprint_insert] which may be called from a /// transaction context. pub(crate) async fn blueprint_insert_on_connection( + &self, conn: &async_bb8_diesel::Connection, opctx: &OpContext, blueprint: &Blueprint, @@ -340,7 +340,8 @@ impl DataStore { // as most of the operations should be insertions rather than in-place // modifications of existing tables. #[allow(clippy::disallowed_methods)] - conn.transaction_async(|conn| async move { + self.transaction_non_retry_wrapper("blueprint_insert") + .transaction(&conn, |conn| async move { // Insert the row for the blueprint. { use db::schema::blueprint::dsl; diff --git a/nexus/db-queries/src/db/datastore/dns.rs b/nexus/db-queries/src/db/datastore/dns.rs index 3f0f7828fa..114d553aac 100644 --- a/nexus/db-queries/src/db/datastore/dns.rs +++ b/nexus/db-queries/src/db/datastore/dns.rs @@ -20,7 +20,6 @@ use crate::db::pagination::Paginator; use crate::db::pool::DbConnection; use crate::db::TransactionError; use crate::transaction_retry::OptionalError; -use async_bb8_diesel::AsyncConnection; use async_bb8_diesel::AsyncRunQueryDsl; use diesel::prelude::*; use futures::future::BoxFuture; @@ -453,20 +452,24 @@ impl DataStore { // This method is used in nested transactions, which are not supported // with retryable transactions. - #[allow(clippy::disallowed_methods)] - conn.transaction_async(|c| async move { - let version = self - .dns_group_latest_version_conn(opctx, conn, update.dns_group) - .await?; - self.dns_write_version_internal( - &c, - update, - zones, - Generation(version.version.next()), - ) + self.transaction_non_retry_wrapper("dns_update_incremental") + .transaction(&conn, |c| async move { + let version = self + .dns_group_latest_version_conn( + opctx, + conn, + update.dns_group, + ) + .await?; + self.dns_write_version_internal( + &c, + update, + zones, + Generation(version.version.next()), + ) + .await + }) .await - }) - .await } // This must only be used inside a transaction. Otherwise, it may make diff --git a/nexus/db-queries/src/db/datastore/inventory.rs b/nexus/db-queries/src/db/datastore/inventory.rs index 9269b233f3..08c64a3254 100644 --- a/nexus/db-queries/src/db/datastore/inventory.rs +++ b/nexus/db-queries/src/db/datastore/inventory.rs @@ -278,13 +278,13 @@ impl DataStore { // batch rather than making a bunch of round-trips to the database. // We'd do that if we had an interface for doing that with bound // parameters, etc. See oxidecomputer/omicron#973. - let pool = self.pool_connection_authorized(opctx).await?; + let conn = self.pool_connection_authorized(opctx).await?; // The risk of a serialization error is possible here, but low, // as most of the operations should be insertions rather than in-place // modifications of existing tables. - #[allow(clippy::disallowed_methods)] - pool.transaction_async(|conn| async move { + self.transaction_non_retry_wrapper("inventory_insert_collection") + .transaction(&conn, |conn| async move { // Insert records (and generate ids) for any baseboards that do not // already exist in the database. These rows are not scoped to a // particular collection. They contain only immutable data -- diff --git a/nexus/db-queries/src/db/datastore/mod.rs b/nexus/db-queries/src/db/datastore/mod.rs index 25f1f282f7..c90f1cc92e 100644 --- a/nexus/db-queries/src/db/datastore/mod.rs +++ b/nexus/db-queries/src/db/datastore/mod.rs @@ -320,6 +320,14 @@ impl DataStore { ) } + /// Constructs a non-retryable transaction helper + pub fn transaction_non_retry_wrapper( + &self, + name: &'static str, + ) -> crate::transaction_retry::NonRetryHelper { + crate::transaction_retry::NonRetryHelper::new(&self.log, name) + } + #[cfg(test)] pub(crate) fn transaction_retry_producer( &self, diff --git a/nexus/db-queries/src/db/datastore/rack.rs b/nexus/db-queries/src/db/datastore/rack.rs index dc3175c22d..067b8ed3ce 100644 --- a/nexus/db-queries/src/db/datastore/rack.rs +++ b/nexus/db-queries/src/db/datastore/rack.rs @@ -26,7 +26,6 @@ use crate::db::model::Zpool; use crate::db::pagination::paginated; use crate::db::pool::DbConnection; use crate::db::TransactionError; -use async_bb8_diesel::AsyncConnection; use async_bb8_diesel::AsyncRunQueryDsl; use chrono::Utc; use diesel::prelude::*; @@ -676,11 +675,9 @@ impl DataStore { // This method uses nested transactions, which are not supported // with retryable transactions. - #[allow(clippy::disallowed_methods)] - let rack = self - .pool_connection_authorized(opctx) - .await? - .transaction_async(|conn| { + let conn = self.pool_connection_authorized(opctx).await?; + let rack = self.transaction_non_retry_wrapper("rack_set_initialized") + .transaction(&conn, |conn| { let err = err.clone(); let log = log.clone(); let authz_service_pool = authz_service_pool.clone(); @@ -752,7 +749,7 @@ impl DataStore { } // Insert the RSS-generated blueprint. - Self::blueprint_insert_on_connection( + self.blueprint_insert_on_connection( &conn, opctx, &blueprint, ) .await diff --git a/nexus/db-queries/src/db/datastore/role.rs b/nexus/db-queries/src/db/datastore/role.rs index ed8ec6fcd9..757658351b 100644 --- a/nexus/db-queries/src/db/datastore/role.rs +++ b/nexus/db-queries/src/db/datastore/role.rs @@ -20,7 +20,6 @@ use crate::db::model::RoleAssignment; use crate::db::model::RoleBuiltin; use crate::db::pagination::paginated_multicolumn; use crate::db::pool::DbConnection; -use async_bb8_diesel::AsyncConnection; use async_bb8_diesel::AsyncRunQueryDsl; use diesel::prelude::*; use nexus_db_fixed_data::role_assignment::BUILTIN_ROLE_ASSIGNMENTS; @@ -213,10 +212,9 @@ impl DataStore { // This method should probably be retryable, but this is slightly // complicated by the cloning semantics of the queries, which // must be Clone to be retried. - #[allow(clippy::disallowed_methods)] - self.pool_connection_authorized(opctx) - .await? - .transaction_async(|conn| async move { + let conn = self.pool_connection_authorized(opctx).await?; + self.transaction_non_retry_wrapper("role_assignment_replace_visible") + .transaction(&conn, |conn| async move { delete_old_query.execute_async(&conn).await?; Ok(insert_new_query.get_results_async(&conn).await?) }) diff --git a/nexus/db-queries/src/db/datastore/silo.rs b/nexus/db-queries/src/db/datastore/silo.rs index b862f3c461..c963e13ae4 100644 --- a/nexus/db-queries/src/db/datastore/silo.rs +++ b/nexus/db-queries/src/db/datastore/silo.rs @@ -24,7 +24,6 @@ use crate::db::model::VirtualProvisioningCollection; use crate::db::pagination::paginated; use crate::db::pagination::Paginator; use crate::db::pool::DbConnection; -use async_bb8_diesel::AsyncConnection; use async_bb8_diesel::AsyncRunQueryDsl; use chrono::Utc; use diesel::prelude::*; @@ -227,9 +226,9 @@ impl DataStore { // This method uses nested transactions, which are not supported // with retryable transactions. - #[allow(clippy::disallowed_methods)] - let silo = conn - .transaction_async(|conn| async move { + let silo = self + .transaction_non_retry_wrapper("silo_create") + .transaction(&conn, |conn| async move { let silo = silo_create_query .get_result_async(&conn) .await @@ -429,48 +428,49 @@ impl DataStore { // This method uses nested transactions, which are not supported // with retryable transactions. - #[allow(clippy::disallowed_methods)] - conn.transaction_async(|conn| async move { - let updated_rows = diesel::update(silo::dsl::silo) - .filter(silo::dsl::time_deleted.is_null()) - .filter(silo::dsl::id.eq(id)) - .filter(silo::dsl::rcgen.eq(rcgen)) - .set(silo::dsl::time_deleted.eq(now)) - .execute_async(&conn) - .await - .map_err(|e| { - public_error_from_diesel( - e, - ErrorHandler::NotFoundByResource(authz_silo), - ) - })?; + self.transaction_non_retry_wrapper("silo_delete") + .transaction(&conn, |conn| async move { + let updated_rows = diesel::update(silo::dsl::silo) + .filter(silo::dsl::time_deleted.is_null()) + .filter(silo::dsl::id.eq(id)) + .filter(silo::dsl::rcgen.eq(rcgen)) + .set(silo::dsl::time_deleted.eq(now)) + .execute_async(&conn) + .await + .map_err(|e| { + public_error_from_diesel( + e, + ErrorHandler::NotFoundByResource(authz_silo), + ) + })?; - if updated_rows == 0 { - return Err(TxnError::CustomError(Error::invalid_request( - "silo deletion failed due to concurrent modification", - ))); - } + if updated_rows == 0 { + return Err(TxnError::CustomError(Error::invalid_request( + "silo deletion failed due to concurrent modification", + ))); + } - self.silo_quotas_delete(opctx, &conn, &authz_silo).await?; + self.silo_quotas_delete(opctx, &conn, &authz_silo).await?; - self.virtual_provisioning_collection_delete_on_connection( - &opctx.log, &conn, id, - ) - .await?; + self.virtual_provisioning_collection_delete_on_connection( + &opctx.log, &conn, id, + ) + .await?; - self.dns_update_incremental(dns_opctx, &conn, dns_update).await?; + self.dns_update_incremental(dns_opctx, &conn, dns_update) + .await?; - info!(opctx.log, "deleted silo {}", id); + info!(opctx.log, "deleted silo {}", id); - Ok(()) - }) - .await - .map_err(|e| match e { - TxnError::CustomError(e) => e, - TxnError::Database(e) => { - public_error_from_diesel(e, ErrorHandler::Server) - } - })?; + Ok(()) + }) + .await + .map_err(|e| match e { + TxnError::CustomError(e) => e, + TxnError::Database(e) => { + public_error_from_diesel(e, ErrorHandler::Server) + } + })?; // TODO-correctness This needs to happen in a saga or some other // mechanism that ensures it happens even if we crash at this point. diff --git a/nexus/db-queries/src/db/datastore/silo_group.rs b/nexus/db-queries/src/db/datastore/silo_group.rs index e6168f4e42..4d5543cc9c 100644 --- a/nexus/db-queries/src/db/datastore/silo_group.rs +++ b/nexus/db-queries/src/db/datastore/silo_group.rs @@ -16,7 +16,6 @@ use crate::db::model::SiloGroup; use crate::db::model::SiloGroupMembership; use crate::db::pagination::paginated; use crate::db::IncompleteOnConflictExt; -use async_bb8_diesel::AsyncConnection; use async_bb8_diesel::AsyncRunQueryDsl; use chrono::Utc; use diesel::prelude::*; @@ -198,12 +197,11 @@ impl DataStore { type TxnError = TransactionError; let group_id = authz_silo_group.id(); + let conn = self.pool_connection_authorized(opctx).await?; // Prefer to use "transaction_retry_wrapper" - #[allow(clippy::disallowed_methods)] - self.pool_connection_authorized(opctx) - .await? - .transaction_async(|conn| async move { + self.transaction_non_retry_wrapper("silo_group_delete") + .transaction(&conn, |conn| async move { use db::schema::silo_group_membership; // Don't delete groups that still have memberships diff --git a/nexus/db-queries/src/lib.rs b/nexus/db-queries/src/lib.rs index 003310f920..c02c781c8e 100644 --- a/nexus/db-queries/src/lib.rs +++ b/nexus/db-queries/src/lib.rs @@ -38,4 +38,10 @@ mod probes { // Fires when we fail to find a VNI in the provided range. fn vni__search__range__empty(_: &usdt::UniqueId) {} + + // Fires when a transaction has started + fn transaction__start(conn_id: uuid::Uuid, name: &str) {} + + // Fires when a transaction has completed + fn transaction__done(conn_id: uuid::Uuid, name: &str) {} } diff --git a/nexus/db-queries/src/transaction_retry.rs b/nexus/db-queries/src/transaction_retry.rs index 02d00f8215..cf8ee22376 100644 --- a/nexus/db-queries/src/transaction_retry.rs +++ b/nexus/db-queries/src/transaction_retry.rs @@ -106,9 +106,15 @@ impl RetryHelper { + Send + Sync, { + crate::probes::transaction__start!(|| { + (conn.as_sync_conn().id(), self.name) + }); let result = conn .transaction_async_with_retry(f, || self.retry_callback()) .await; + crate::probes::transaction__done!(|| { + (conn.as_sync_conn().id(), self.name) + }); let retry_info = self.inner.lock().unwrap(); if retry_info.has_retried() { @@ -179,6 +185,48 @@ impl oximeter::Producer for Producer { } } +/// Helper utility to have a similar interface to RetryHelper (also emitting +/// probes) but for non-retryable transactions. +pub struct NonRetryHelper { + log: Logger, + name: &'static str, +} + +impl NonRetryHelper { + pub(crate) fn new(log: &Logger, name: &'static str) -> Self { + Self { log: log.new(o!("transaction" => name)), name } + } + + /// Calls the function "f" in an asynchronous, non-retryable transaction. + pub async fn transaction( + self, + conn: &async_bb8_diesel::Connection, + f: Func, + ) -> Result + where + R: Send + 'static, + E: From + std::fmt::Debug + Send + 'static, + Fut: std::future::Future> + Send, + Func: FnOnce(async_bb8_diesel::Connection) -> Fut + + Send + + Sync, + { + crate::probes::transaction__start!(|| { + (conn.as_sync_conn().id(), self.name) + }); + + #[allow(clippy::disallowed_methods)] + let result = conn.transaction_async(f).await; + crate::probes::transaction__done!(|| { + (conn.as_sync_conn().id(), self.name) + }); + if let Err(err) = result.as_ref() { + warn!(self.log, "Non-retryable transaction failure"; "err" => ?err); + } + result + } +} + /// Helper utility for passing non-retryable errors out-of-band from /// transactions. ///