From 9deaac49d6aa9ff24353a5be5f33031551cea117 Mon Sep 17 00:00:00 2001 From: Sean Klein Date: Thu, 5 Dec 2024 17:57:08 -0800 Subject: [PATCH] Use retryable transactions --- clippy.toml | 6 + dev-tools/omdb/src/bin/omdb/db.rs | 2 + .../db-queries/src/db/datastore/deployment.rs | 35 +-- nexus/db-queries/src/db/datastore/dns.rs | 79 ++++--- .../db-queries/src/db/datastore/inventory.rs | 171 +++++++------- nexus/db-queries/src/db/datastore/ip_pool.rs | 141 +++++++----- nexus/db-queries/src/db/datastore/rack.rs | 5 +- .../src/db/datastore/region_replacement.rs | 211 +++++++++--------- .../datastore/region_snapshot_replacement.rs | 158 +++++++------ nexus/db-queries/src/db/datastore/role.rs | 5 + nexus/db-queries/src/db/datastore/saga.rs | 1 + nexus/db-queries/src/db/datastore/silo.rs | 26 ++- .../db-queries/src/db/datastore/silo_group.rs | 2 + .../db-queries/src/db/datastore/silo_user.rs | 18 +- nexus/db-queries/src/db/pagination.rs | 1 + .../region_snapshot_replacement_start.rs | 1 + 16 files changed, 478 insertions(+), 384 deletions(-) diff --git a/clippy.toml b/clippy.toml index ffa3ffac70..31e28d5911 100644 --- a/clippy.toml +++ b/clippy.toml @@ -10,4 +10,10 @@ disallowed-methods = [ # `IncompleteOnConflictExt::as_partial_index` in `nexus-db-queries`. # See the documentation of that method for more. "diesel::upsert::DecoratableTarget::filter_target", + + # This form of transaction is susceptible to serialization failures, + # and can fail spuriously. + # Instead, the "transaction_retry_wrapper" should be preferred, as it + # automatically retries transactions experiencing contention. + { path = "async_bb8_diesel::AsyncConnection::transaction_async", reason = "Prefer to use transaction_retry_wrapper, if possible. Feel free to override this for tests and nested transactions." }, ] diff --git a/dev-tools/omdb/src/bin/omdb/db.rs b/dev-tools/omdb/src/bin/omdb/db.rs index 4cccc3c23e..667a666375 100644 --- a/dev-tools/omdb/src/bin/omdb/db.rs +++ b/dev-tools/omdb/src/bin/omdb/db.rs @@ -14,6 +14,8 @@ // NOTE: emanates from Tabled macros #![allow(clippy::useless_vec)] +// NOTE: allowing "transaction_async" without retry +#![allow(clippy::disallowed_methods)] use crate::check_allow_destructive::DestructiveOperationToken; use crate::helpers::const_max_len; diff --git a/nexus/db-queries/src/db/datastore/deployment.rs b/nexus/db-queries/src/db/datastore/deployment.rs index 380a5c1b00..0c73ae1ae2 100644 --- a/nexus/db-queries/src/db/datastore/deployment.rs +++ b/nexus/db-queries/src/db/datastore/deployment.rs @@ -335,6 +335,11 @@ impl DataStore { // batch rather than making a bunch of round-trips to the database. // We'd do that if we had an interface for doing that with bound // parameters, etc. See oxidecomputer/omicron#973. + + // The risk of a serialization error is possible here, but low, + // as most of the operations should be insertions rather than in-place + // modifications of existing tables. + #[allow(clippy::disallowed_methods)] conn.transaction_async(|conn| async move { // Insert the row for the blueprint. { @@ -1087,6 +1092,7 @@ impl DataStore { // start removing it and we'd also need to make sure we didn't leak a // collection if we crash while deleting it. let conn = self.pool_connection_authorized(opctx).await?; + let err = OptionalError::new(); let ( nblueprints, @@ -1101,19 +1107,23 @@ impl DataStore { nclickhouse_cluster_configs, nclickhouse_keepers, nclickhouse_servers, - ) = conn - .transaction_async(|conn| async move { + ) = self.transaction_retry_wrapper("blueprint_delete") + .transaction(&conn, |conn| { + let err = err.clone(); + async move { // Ensure that blueprint we're about to delete is not the // current target. - let current_target = - Self::blueprint_current_target_only(&conn).await?; + let current_target = Self::blueprint_current_target_only(&conn) + .await + .map_err(|txn_err| txn_err.into_diesel(&err))?; + if current_target.target_id == blueprint_id { - return Err(TransactionError::CustomError( + return Err(err.bail(TransactionError::CustomError( Error::conflict(format!( "blueprint {blueprint_id} is the \ current target and cannot be deleted", )), - )); + ))); } // Remove the record describing the blueprint itself. @@ -1130,9 +1140,9 @@ impl DataStore { // references to it in any of the remaining tables either, since // deletion always goes through this transaction. if nblueprints == 0 { - return Err(TransactionError::CustomError( + return Err(err.bail(TransactionError::CustomError( authz_blueprint.not_found(), - )); + ))); } // Remove rows associated with sled states. @@ -1259,13 +1269,12 @@ impl DataStore { nclickhouse_keepers, nclickhouse_servers, )) + } }) .await - .map_err(|error| match error { - TransactionError::CustomError(e) => e, - TransactionError::Database(e) => { - public_error_from_diesel(e, ErrorHandler::Server) - } + .map_err(|e| match err.take() { + Some(err) => err.into(), + None => public_error_from_diesel(e, ErrorHandler::Server), })?; info!(&opctx.log, "removed blueprint"; diff --git a/nexus/db-queries/src/db/datastore/dns.rs b/nexus/db-queries/src/db/datastore/dns.rs index a691ce43aa..3f0f7828fa 100644 --- a/nexus/db-queries/src/db/datastore/dns.rs +++ b/nexus/db-queries/src/db/datastore/dns.rs @@ -19,6 +19,7 @@ use crate::db::pagination::paginated; use crate::db::pagination::Paginator; use crate::db::pool::DbConnection; use crate::db::TransactionError; +use crate::transaction_retry::OptionalError; use async_bb8_diesel::AsyncConnection; use async_bb8_diesel::AsyncRunQueryDsl; use diesel::prelude::*; @@ -363,40 +364,49 @@ impl DataStore { ) -> Result<(), Error> { opctx.authorize(authz::Action::Modify, &authz::DNS_CONFIG).await?; let conn = self.pool_connection_authorized(opctx).await?; - conn.transaction_async(|c| async move { - let zones = self - .dns_zones_list_all_on_connection(opctx, &c, update.dns_group) - .await?; - // This looks like a time-of-check-to-time-of-use race, but this - // approach works because we're inside a transaction and the - // isolation level is SERIALIZABLE. - let version = self - .dns_group_latest_version_conn(opctx, &c, update.dns_group) - .await?; - if version.version != old_version { - return Err(TransactionError::CustomError(Error::conflict( - format!( - "expected current DNS version to be {}, found {}", - *old_version, *version.version, - ), - ))); - } - self.dns_write_version_internal( - &c, - update, - zones, - Generation(old_version.next()), - ) + let err = OptionalError::new(); + + self.transaction_retry_wrapper("dns_update_from_version") + .transaction(&conn, |c| { + let err = err.clone(); + let update = update.clone(); + async move { + let zones = self + .dns_zones_list_all_on_connection(opctx, &c, update.dns_group) + .await + .map_err(|txn_error| txn_error.into_diesel(&err))?; + // This looks like a time-of-check-to-time-of-use race, but this + // approach works because we're inside a transaction and the + // isolation level is SERIALIZABLE. + let version = self + .dns_group_latest_version_conn(opctx, &c, update.dns_group) + .await + .map_err(|txn_error| txn_error.into_diesel(&err))?; + if version.version != old_version { + return Err(err.bail(TransactionError::CustomError(Error::conflict( + format!( + "expected current DNS version to be {}, found {}", + *old_version, *version.version, + ), + )))); + } + + self.dns_write_version_internal( + &c, + update, + zones, + Generation(old_version.next()), + ) + .await + .map_err(|txn_error| txn_error.into_diesel(&err)) + } + }) .await - }) - .await - .map_err(|e| match e { - TransactionError::CustomError(e) => e, - TransactionError::Database(e) => { - public_error_from_diesel(e, ErrorHandler::Server) - } - }) + .map_err(|e| match err.take() { + Some(err) => err.into(), + None => public_error_from_diesel(e, ErrorHandler::Server), + }) } /// Update the configuration of a DNS zone as specified in `update` @@ -441,6 +451,9 @@ impl DataStore { .dns_zones_list_all_on_connection(opctx, conn, update.dns_group) .await?; + // This method is used in nested transactions, which are not supported + // with retryable transactions. + #[allow(clippy::disallowed_methods)] conn.transaction_async(|c| async move { let version = self .dns_group_latest_version_conn(opctx, conn, update.dns_group) @@ -1724,6 +1737,8 @@ mod test { let cds = datastore.clone(); let copctx = opctx.child(std::collections::BTreeMap::new()); + + #[allow(clippy::disallowed_methods)] let mut fut = conn1 .transaction_async(|c1| async move { cds.dns_update_incremental(&copctx, &c1, update1) diff --git a/nexus/db-queries/src/db/datastore/inventory.rs b/nexus/db-queries/src/db/datastore/inventory.rs index a6e2a6cf2a..9269b233f3 100644 --- a/nexus/db-queries/src/db/datastore/inventory.rs +++ b/nexus/db-queries/src/db/datastore/inventory.rs @@ -11,7 +11,6 @@ use crate::db::error::public_error_from_diesel_lookup; use crate::db::error::ErrorHandler; use crate::db::pagination::{paginated, paginated_multicolumn, Paginator}; use crate::db::queries::ALLOW_FULL_TABLE_SCAN_SQL; -use crate::db::TransactionError; use anyhow::Context; use async_bb8_diesel::AsyncConnection; use async_bb8_diesel::AsyncRunQueryDsl; @@ -280,6 +279,11 @@ impl DataStore { // We'd do that if we had an interface for doing that with bound // parameters, etc. See oxidecomputer/omicron#973. let pool = self.pool_connection_authorized(opctx).await?; + + // The risk of a serialization error is possible here, but low, + // as most of the operations should be insertions rather than in-place + // modifications of existing tables. + #[allow(clippy::disallowed_methods)] pool.transaction_async(|conn| async move { // Insert records (and generate ids) for any baseboards that do not // already exist in the database. These rows are not scoped to a @@ -1242,6 +1246,7 @@ impl DataStore { // collection if we crash while deleting it. let conn = self.pool_connection_authorized(opctx).await?; let db_collection_id = to_db_typed_uuid(collection_id); + let ( ncollections, nsps, @@ -1258,22 +1263,22 @@ impl DataStore { nzpools, nerrors, nclickhouse_keeper_membership, - ) = conn - .transaction_async(|conn| async move { - // Remove the record describing the collection itself. - let ncollections = { - use db::schema::inv_collection::dsl; - diesel::delete( - dsl::inv_collection - .filter(dsl::id.eq(db_collection_id)), - ) - .execute_async(&conn) - .await? - }; + ) = + self.transaction_retry_wrapper("inventory_delete_collection") + .transaction(&conn, |conn| async move { + // Remove the record describing the collection itself. + let ncollections = { + use db::schema::inv_collection::dsl; + diesel::delete( + dsl::inv_collection + .filter(dsl::id.eq(db_collection_id)), + ) + .execute_async(&conn) + .await? + }; - // Remove rows for service processors. - let nsps = - { + // Remove rows for service processors. + let nsps = { use db::schema::inv_service_processor::dsl; diesel::delete(dsl::inv_service_processor.filter( dsl::inv_collection_id.eq(db_collection_id), @@ -1282,9 +1287,8 @@ impl DataStore { .await? }; - // Remove rows for roots of trust. - let nrots = - { + // Remove rows for roots of trust. + let nrots = { use db::schema::inv_root_of_trust::dsl; diesel::delete(dsl::inv_root_of_trust.filter( dsl::inv_collection_id.eq(db_collection_id), @@ -1293,9 +1297,8 @@ impl DataStore { .await? }; - // Remove rows for cabooses found. - let ncabooses = - { + // Remove rows for cabooses found. + let ncabooses = { use db::schema::inv_caboose::dsl; diesel::delete(dsl::inv_caboose.filter( dsl::inv_collection_id.eq(db_collection_id), @@ -1304,9 +1307,8 @@ impl DataStore { .await? }; - // Remove rows for root of trust pages found. - let nrot_pages = - { + // Remove rows for root of trust pages found. + let nrot_pages = { use db::schema::inv_root_of_trust_page::dsl; diesel::delete(dsl::inv_root_of_trust_page.filter( dsl::inv_collection_id.eq(db_collection_id), @@ -1315,9 +1317,8 @@ impl DataStore { .await? }; - // Remove rows for sled agents found. - let nsled_agents = - { + // Remove rows for sled agents found. + let nsled_agents = { use db::schema::inv_sled_agent::dsl; diesel::delete(dsl::inv_sled_agent.filter( dsl::inv_collection_id.eq(db_collection_id), @@ -1326,9 +1327,8 @@ impl DataStore { .await? }; - // Remove rows for datasets - let ndatasets = - { + // Remove rows for datasets + let ndatasets = { use db::schema::inv_dataset::dsl; diesel::delete(dsl::inv_dataset.filter( dsl::inv_collection_id.eq(db_collection_id), @@ -1337,9 +1337,8 @@ impl DataStore { .await? }; - // Remove rows for physical disks found. - let nphysical_disks = - { + // Remove rows for physical disks found. + let nphysical_disks = { use db::schema::inv_physical_disk::dsl; diesel::delete(dsl::inv_physical_disk.filter( dsl::inv_collection_id.eq(db_collection_id), @@ -1348,9 +1347,8 @@ impl DataStore { .await? }; - // Remove rows for NVMe physical disk firmware found. - let nnvme_disk_firwmare = - { + // Remove rows for NVMe physical disk firmware found. + let nnvme_disk_firwmare = { use db::schema::inv_nvme_disk_firmware::dsl; diesel::delete(dsl::inv_nvme_disk_firmware.filter( dsl::inv_collection_id.eq(db_collection_id), @@ -1359,9 +1357,8 @@ impl DataStore { .await? }; - // Remove rows associated with Omicron zones - let nsled_agent_zones = - { + // Remove rows associated with Omicron zones + let nsled_agent_zones = { use db::schema::inv_sled_omicron_zones::dsl; diesel::delete(dsl::inv_sled_omicron_zones.filter( dsl::inv_collection_id.eq(db_collection_id), @@ -1370,8 +1367,7 @@ impl DataStore { .await? }; - let nzones = - { + let nzones = { use db::schema::inv_omicron_zone::dsl; diesel::delete(dsl::inv_omicron_zone.filter( dsl::inv_collection_id.eq(db_collection_id), @@ -1380,8 +1376,7 @@ impl DataStore { .await? }; - let nnics = - { + let nnics = { use db::schema::inv_omicron_zone_nic::dsl; diesel::delete(dsl::inv_omicron_zone_nic.filter( dsl::inv_collection_id.eq(db_collection_id), @@ -1390,8 +1385,7 @@ impl DataStore { .await? }; - let nzpools = - { + let nzpools = { use db::schema::inv_zpool::dsl; diesel::delete(dsl::inv_zpool.filter( dsl::inv_collection_id.eq(db_collection_id), @@ -1400,9 +1394,8 @@ impl DataStore { .await? }; - // Remove rows for errors encountered. - let nerrors = - { + // Remove rows for errors encountered. + let nerrors = { use db::schema::inv_collection_error::dsl; diesel::delete(dsl::inv_collection_error.filter( dsl::inv_collection_id.eq(db_collection_id), @@ -1411,43 +1404,40 @@ impl DataStore { .await? }; - // Remove rows for clickhouse keeper membership - let nclickhouse_keeper_membership = { - use db::schema::inv_clickhouse_keeper_membership::dsl; - diesel::delete( - dsl::inv_clickhouse_keeper_membership.filter( - dsl::inv_collection_id.eq(db_collection_id), - ), - ) - .execute_async(&conn) - .await? - }; - - Ok(( - ncollections, - nsps, - nrots, - ncabooses, - nrot_pages, - nsled_agents, - ndatasets, - nphysical_disks, - nnvme_disk_firwmare, - nsled_agent_zones, - nzones, - nnics, - nzpools, - nerrors, - nclickhouse_keeper_membership, - )) - }) - .await - .map_err(|error| match error { - TransactionError::CustomError(e) => e, - TransactionError::Database(e) => { - public_error_from_diesel(e, ErrorHandler::Server) - } - })?; + // Remove rows for clickhouse keeper membership + let nclickhouse_keeper_membership = { + use db::schema::inv_clickhouse_keeper_membership::dsl; + diesel::delete( + dsl::inv_clickhouse_keeper_membership.filter( + dsl::inv_collection_id.eq(db_collection_id), + ), + ) + .execute_async(&conn) + .await? + }; + + Ok(( + ncollections, + nsps, + nrots, + ncabooses, + nrot_pages, + nsled_agents, + ndatasets, + nphysical_disks, + nnvme_disk_firwmare, + nsled_agent_zones, + nzones, + nnics, + nzpools, + nerrors, + nclickhouse_keeper_membership, + )) + }) + .await + .map_err(|error| { + public_error_from_diesel(error, ErrorHandler::Server) + })?; info!(&opctx.log, "removed inventory collection"; "collection_id" => collection_id.to_string(), @@ -2429,6 +2419,9 @@ impl DataStoreInventoryTest for DataStore { .pool_connection_for_tests() .await .context("getting connection")?; + + // This transaction is used by tests, and does not need to retry. + #[allow(clippy::disallowed_methods)] conn.transaction_async(|conn| async move { conn.batch_execute_async(ALLOW_FULL_TABLE_SCAN_SQL) .await @@ -2484,6 +2477,8 @@ mod test { impl CollectionCounts { async fn new(conn: &DataStoreConnection) -> anyhow::Result { + // This transaction is used by tests, and does not need to retry. + #[allow(clippy::disallowed_methods)] conn.transaction_async(|conn| async move { conn.batch_execute_async(ALLOW_FULL_TABLE_SCAN_SQL) .await @@ -2933,6 +2928,8 @@ mod test { .expect("failed to delete collection"); assert!(datastore.inventory_collections().await.unwrap().is_empty()); + // This transaction is used by tests, and does not need to retry. + #[allow(clippy::disallowed_methods)] conn.transaction_async(|conn| async move { conn.batch_execute_async(ALLOW_FULL_TABLE_SCAN_SQL).await.unwrap(); let count = schema::inv_collection::dsl::inv_collection @@ -3055,6 +3052,8 @@ mod test { bail!("Tables missing from information_schema query"); } + // This transaction is used by tests, and does not need to retry. + #[allow(clippy::disallowed_methods)] conn.transaction_async(|conn| async move { // We need this to call "COUNT(*)" below. conn.batch_execute_async(ALLOW_FULL_TABLE_SCAN_SQL) diff --git a/nexus/db-queries/src/db/datastore/ip_pool.rs b/nexus/db-queries/src/db/datastore/ip_pool.rs index 9548003ee5..2409839eb4 100644 --- a/nexus/db-queries/src/db/datastore/ip_pool.rs +++ b/nexus/db-queries/src/db/datastore/ip_pool.rs @@ -30,7 +30,7 @@ use crate::db::pagination::Paginator; use crate::db::pool::DbConnection; use crate::db::queries::ip_pool::FilterOverlappingIpRanges; use crate::db::TransactionError; -use async_bb8_diesel::AsyncConnection; +use crate::transaction_retry::OptionalError; use async_bb8_diesel::AsyncRunQueryDsl; use chrono::Utc; use diesel::prelude::*; @@ -722,67 +722,90 @@ impl DataStore { } type TxnError = TransactionError; - conn.transaction_async(|conn| async move { - // note this is matching the specified silo, but could be any pool - let existing_default_for_silo = dsl::ip_pool_resource - .filter(dsl::resource_type.eq(IpPoolResourceType::Silo)) - .filter(dsl::resource_id.eq(silo_id)) - .filter(dsl::is_default.eq(true)) - .select(IpPoolResource::as_select()) - .get_result_async(&conn) - .await; - - // if there is an existing default, we need to unset it before we can - // set the new default - if let Ok(existing_default) = existing_default_for_silo { - // if the pool we're making default is already default for this - // silo, don't error: just noop - if existing_default.ip_pool_id == ip_pool_id { - return Ok(existing_default); - } + let err = OptionalError::new(); + + self.transaction_retry_wrapper("ip_pool_set_default") + .transaction(&conn, |conn| { + let err = err.clone(); + async move { + // note this is matching the specified silo, but could be any pool + let existing_default_for_silo = dsl::ip_pool_resource + .filter(dsl::resource_type.eq(IpPoolResourceType::Silo)) + .filter(dsl::resource_id.eq(silo_id)) + .filter(dsl::is_default.eq(true)) + .select(IpPoolResource::as_select()) + .get_result_async(&conn) + .await; + + // if there is an existing default, we need to unset it before we can + // set the new default + if let Ok(existing_default) = existing_default_for_silo { + // if the pool we're making default is already default for this + // silo, don't error: just noop + if existing_default.ip_pool_id == ip_pool_id { + return Ok(existing_default); + } + + let unset_default = + diesel::update(dsl::ip_pool_resource) + .filter( + dsl::resource_id + .eq(existing_default.resource_id), + ) + .filter( + dsl::ip_pool_id + .eq(existing_default.ip_pool_id), + ) + .filter( + dsl::resource_type + .eq(existing_default.resource_type), + ) + .set(dsl::is_default.eq(false)) + .execute_async(&conn) + .await; + if let Err(e) = unset_default { + return Err(err.bail(TxnError::CustomError( + IpPoolResourceUpdateError::FailedToUnsetDefault( + e, + ), + ))); + } + } - let unset_default = diesel::update(dsl::ip_pool_resource) - .filter(dsl::resource_id.eq(existing_default.resource_id)) - .filter(dsl::ip_pool_id.eq(existing_default.ip_pool_id)) - .filter( - dsl::resource_type.eq(existing_default.resource_type), + let updated_link = diesel::update(dsl::ip_pool_resource) + .filter(dsl::resource_id.eq(silo_id)) + .filter(dsl::ip_pool_id.eq(ip_pool_id)) + .filter(dsl::resource_type.eq(IpPoolResourceType::Silo)) + .set(dsl::is_default.eq(true)) + .returning(IpPoolResource::as_returning()) + .get_result_async(&conn) + .await?; + Ok(updated_link) + } + }) + .await + .map_err(|e| match err.take() { + Some(TxnError::CustomError( + IpPoolResourceUpdateError::FailedToUnsetDefault(err), + )) => public_error_from_diesel(err, ErrorHandler::Server), + Some(TxnError::Database(err)) => { + public_error_from_diesel(err, ErrorHandler::Server) + } + None => { + public_error_from_diesel( + e, + ErrorHandler::NotFoundByLookup( + ResourceType::IpPoolResource, + // TODO: would be nice to put the actual names and/or ids in + // here but LookupType on each of the two silos doesn't have + // a nice to_string yet or a way of composing them + LookupType::ByCompositeId( + "(pool, silo)".to_string(), + ), + ), ) - .set(dsl::is_default.eq(false)) - .execute_async(&conn) - .await; - if let Err(e) = unset_default { - return Err(TxnError::CustomError( - IpPoolResourceUpdateError::FailedToUnsetDefault(e), - )); } - } - - let updated_link = diesel::update(dsl::ip_pool_resource) - .filter(dsl::resource_id.eq(silo_id)) - .filter(dsl::ip_pool_id.eq(ip_pool_id)) - .filter(dsl::resource_type.eq(IpPoolResourceType::Silo)) - .set(dsl::is_default.eq(true)) - .returning(IpPoolResource::as_returning()) - .get_result_async(&conn) - .await?; - Ok(updated_link) - }) - .await - .map_err(|e| match e { - TransactionError::CustomError( - IpPoolResourceUpdateError::FailedToUnsetDefault(e), - ) => public_error_from_diesel(e, ErrorHandler::Server), - TransactionError::Database(e) => public_error_from_diesel( - e, - ErrorHandler::NotFoundByLookup( - ResourceType::IpPoolResource, - // TODO: would be nice to put the actual names and/or ids in - // here but LookupType on each of the two silos doesn't have - // a nice to_string yet or a way of composing them - LookupType::ByCompositeId("(pool, silo)".to_string()), - ), - ), - }) + }) } /// Ephemeral and snat IPs are associated with a silo through an instance, diff --git a/nexus/db-queries/src/db/datastore/rack.rs b/nexus/db-queries/src/db/datastore/rack.rs index 74b3440a7d..dc3175c22d 100644 --- a/nexus/db-queries/src/db/datastore/rack.rs +++ b/nexus/db-queries/src/db/datastore/rack.rs @@ -674,8 +674,9 @@ impl DataStore { let log = opctx.log.clone(); let err = Arc::new(OnceLock::new()); - // NOTE: This transaction cannot yet be made retryable, as it uses - // nested transactions. + // This method uses nested transactions, which are not supported + // with retryable transactions. + #[allow(clippy::disallowed_methods)] let rack = self .pool_connection_authorized(opctx) .await? diff --git a/nexus/db-queries/src/db/datastore/region_replacement.rs b/nexus/db-queries/src/db/datastore/region_replacement.rs index de047d6d0c..0fda6b46ba 100644 --- a/nexus/db-queries/src/db/datastore/region_replacement.rs +++ b/nexus/db-queries/src/db/datastore/region_replacement.rs @@ -21,7 +21,7 @@ use crate::db::pagination::Paginator; use crate::db::update_and_check::UpdateAndCheck; use crate::db::update_and_check::UpdateStatus; use crate::db::TransactionError; -use async_bb8_diesel::AsyncConnection; +use crate::transaction_retry::OptionalError; use async_bb8_diesel::AsyncRunQueryDsl; use diesel::prelude::*; use omicron_common::api::external::Error; @@ -52,21 +52,28 @@ impl DataStore { opctx: &OpContext, request: RegionReplacement, ) -> Result<(), Error> { - self.pool_connection_authorized(opctx) - .await? - .transaction_async(|conn| async move { - use db::schema::region_replacement::dsl; + let conn = self.pool_connection_authorized(opctx).await?; - Self::volume_repair_insert_query(request.volume_id, request.id) - .execute_async(&conn) - .await?; + self.transaction_retry_wrapper("insert_region_replacement_request") + .transaction(&conn, |conn| { + let request = request.clone(); + async move { + use db::schema::region_replacement::dsl; - diesel::insert_into(dsl::region_replacement) - .values(request) + Self::volume_repair_insert_query( + request.volume_id, + request.id, + ) .execute_async(&conn) .await?; - Ok(()) + diesel::insert_into(dsl::region_replacement) + .values(request) + .execute_async(&conn) + .await?; + + Ok(()) + } }) .await .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) @@ -666,60 +673,62 @@ impl DataStore { ) -> Result<(), Error> { type TxnError = TransactionError; - self.pool_connection_authorized(opctx) - .await? - .transaction_async(|conn| async move { - Self::volume_repair_delete_query( - request.volume_id, - request.id, - ) - .execute_async(&conn) - .await?; - - use db::schema::region_replacement::dsl; - - let result = diesel::update(dsl::region_replacement) - .filter(dsl::id.eq(request.id)) - .filter( - dsl::replacement_state.eq(RegionReplacementState::Completing), + let err = OptionalError::new(); + let conn = self.pool_connection_authorized(opctx).await?; + + self.transaction_retry_wrapper("set_region_replacement_complete") + .transaction(&conn, |conn| { + let err = err.clone(); + async move { + Self::volume_repair_delete_query( + request.volume_id, + request.id, ) - .filter(dsl::operating_saga_id.eq(operating_saga_id)) - .set(( - dsl::replacement_state.eq(RegionReplacementState::Complete), - dsl::operating_saga_id.eq(Option::::None), - )) - .check_if_exists::(request.id) - .execute_and_check(&conn) + .execute_async(&conn) .await?; - match result.status { - UpdateStatus::Updated => Ok(()), - UpdateStatus::NotUpdatedButExists => { - let record = result.found; - - if record.operating_saga_id == None - && record.replacement_state - == RegionReplacementState::Complete - { - Ok(()) - } else { - Err(TxnError::CustomError(Error::conflict(format!( - "region replacement {} set to {:?} (operating saga id {:?})", - request.id, - record.replacement_state, - record.operating_saga_id, - )))) + use db::schema::region_replacement::dsl; + + let result = diesel::update(dsl::region_replacement) + .filter(dsl::id.eq(request.id)) + .filter( + dsl::replacement_state.eq(RegionReplacementState::Completing), + ) + .filter(dsl::operating_saga_id.eq(operating_saga_id)) + .set(( + dsl::replacement_state.eq(RegionReplacementState::Complete), + dsl::operating_saga_id.eq(Option::::None), + )) + .check_if_exists::(request.id) + .execute_and_check(&conn) + .await?; + + match result.status { + UpdateStatus::Updated => Ok(()), + UpdateStatus::NotUpdatedButExists => { + let record = result.found; + + if record.operating_saga_id == None + && record.replacement_state + == RegionReplacementState::Complete + { + Ok(()) + } else { + Err(err.bail(TxnError::from(Error::conflict(format!( + "region replacement {} set to {:?} (operating saga id {:?})", + request.id, + record.replacement_state, + record.operating_saga_id, + ))))) + } } } } }) .await - .map_err(|e| match e { - TxnError::CustomError(error) => error, - - TxnError::Database(error) => { - public_error_from_diesel(error, ErrorHandler::Server) - } + .map_err(|e| match err.take() { + Some(err) => err.into(), + None => public_error_from_diesel(e, ErrorHandler::Server), }) } @@ -738,57 +747,59 @@ impl DataStore { RegionReplacementState::Requested, ); - self.pool_connection_authorized(opctx) - .await? - .transaction_async(|conn| async move { - Self::volume_repair_delete_query( - request.volume_id, - request.id, - ) - .execute_async(&conn) - .await?; - - use db::schema::region_replacement::dsl; - - let result = diesel::update(dsl::region_replacement) - .filter(dsl::id.eq(request.id)) - .filter( - dsl::replacement_state.eq(RegionReplacementState::Requested), + let err = OptionalError::new(); + let conn = self.pool_connection_authorized(opctx).await?; + + self.transaction_retry_wrapper("set_region_replacement_complete_from_requested") + .transaction(&conn, |conn| { + let err = err.clone(); + async move { + Self::volume_repair_delete_query( + request.volume_id, + request.id, ) - .filter(dsl::operating_saga_id.is_null()) - .set(( - dsl::replacement_state.eq(RegionReplacementState::Complete), - )) - .check_if_exists::(request.id) - .execute_and_check(&conn) + .execute_async(&conn) .await?; - match result.status { - UpdateStatus::Updated => Ok(()), - - UpdateStatus::NotUpdatedButExists => { - let record = result.found; - - if record.replacement_state == RegionReplacementState::Complete { - Ok(()) - } else { - Err(TxnError::CustomError(Error::conflict(format!( - "region replacement {} set to {:?} (operating saga id {:?})", - request.id, - record.replacement_state, - record.operating_saga_id, - )))) + use db::schema::region_replacement::dsl; + + let result = diesel::update(dsl::region_replacement) + .filter(dsl::id.eq(request.id)) + .filter( + dsl::replacement_state.eq(RegionReplacementState::Requested), + ) + .filter(dsl::operating_saga_id.is_null()) + .set(( + dsl::replacement_state.eq(RegionReplacementState::Complete), + )) + .check_if_exists::(request.id) + .execute_and_check(&conn) + .await?; + + match result.status { + UpdateStatus::Updated => Ok(()), + + UpdateStatus::NotUpdatedButExists => { + let record = result.found; + + if record.replacement_state == RegionReplacementState::Complete { + Ok(()) + } else { + Err(err.bail(TxnError::from(Error::conflict(format!( + "region replacement {} set to {:?} (operating saga id {:?})", + request.id, + record.replacement_state, + record.operating_saga_id, + ))))) + } } } } }) .await - .map_err(|e| match e { - TxnError::CustomError(error) => error, - - TxnError::Database(error) => { - public_error_from_diesel(error, ErrorHandler::Server) - } + .map_err(|e| match err.take() { + Some(err) => err.into(), + None => public_error_from_diesel(e, ErrorHandler::Server), }) } diff --git a/nexus/db-queries/src/db/datastore/region_snapshot_replacement.rs b/nexus/db-queries/src/db/datastore/region_snapshot_replacement.rs index 4faaf228f9..76a83cca2a 100644 --- a/nexus/db-queries/src/db/datastore/region_snapshot_replacement.rs +++ b/nexus/db-queries/src/db/datastore/region_snapshot_replacement.rs @@ -22,7 +22,7 @@ use crate::db::pagination::Paginator; use crate::db::update_and_check::UpdateAndCheck; use crate::db::update_and_check::UpdateStatus; use crate::db::TransactionError; -use async_bb8_diesel::AsyncConnection; +use crate::transaction_retry::OptionalError; use async_bb8_diesel::AsyncRunQueryDsl; use diesel::prelude::*; use omicron_common::api::external::Error; @@ -93,9 +93,14 @@ impl DataStore { request: RegionSnapshotReplacement, volume_id: Uuid, ) -> Result<(), Error> { - self.pool_connection_authorized(opctx) - .await? - .transaction_async(|conn| async move { + let conn = self.pool_connection_authorized(opctx).await?; + + self.transaction_retry_wrapper( + "insert_region_snapshot_replacement_request_with_volume_id", + ) + .transaction(&conn, |conn| { + let request = request.clone(); + async move { use db::schema::region_snapshot_replacement::dsl; use db::schema::volume_repair::dsl as volume_repair_dsl; @@ -116,9 +121,10 @@ impl DataStore { .await?; Ok(()) - }) - .await - .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) + } + }) + .await + .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) } pub async fn get_region_snapshot_replacement_request_by_id( @@ -563,67 +569,69 @@ impl DataStore { ) -> Result<(), Error> { type TxnError = TransactionError; - self.pool_connection_authorized(opctx) - .await? - .transaction_async(|conn| async move { - use db::schema::volume_repair::dsl as volume_repair_dsl; - - diesel::delete( - volume_repair_dsl::volume_repair.filter( - volume_repair_dsl::repair_id - .eq(region_snapshot_replacement_id), - ), - ) - .execute_async(&conn) - .await?; - - use db::schema::region_snapshot_replacement::dsl; + let err = OptionalError::new(); + let conn = self.pool_connection_authorized(opctx).await?; - let result = diesel::update(dsl::region_snapshot_replacement) - .filter(dsl::id.eq(region_snapshot_replacement_id)) - .filter( - dsl::replacement_state - .eq(RegionSnapshotReplacementState::Running), + self.transaction_retry_wrapper("set_region_snapshot_replacement_complete") + .transaction(&conn, |conn| { + let err = err.clone(); + async move { + use db::schema::volume_repair::dsl as volume_repair_dsl; + + diesel::delete( + volume_repair_dsl::volume_repair.filter( + volume_repair_dsl::repair_id + .eq(region_snapshot_replacement_id), + ), ) - .filter(dsl::operating_saga_id.is_null()) - .set((dsl::replacement_state - .eq(RegionSnapshotReplacementState::Complete),)) - .check_if_exists::( - region_snapshot_replacement_id, - ) - .execute_and_check(&conn) + .execute_async(&conn) .await?; - match result.status { - UpdateStatus::Updated => Ok(()), - UpdateStatus::NotUpdatedButExists => { - let record = result.found; + use db::schema::region_snapshot_replacement::dsl; - if record.replacement_state - == RegionSnapshotReplacementState::Complete - { - Ok(()) - } else { - Err(TxnError::CustomError(Error::conflict( - format!( - "region snapshot replacement {} set to {:?} \ - (operating saga id {:?})", - region_snapshot_replacement_id, - record.replacement_state, - record.operating_saga_id, - ), - ))) + let result = diesel::update(dsl::region_snapshot_replacement) + .filter(dsl::id.eq(region_snapshot_replacement_id)) + .filter( + dsl::replacement_state + .eq(RegionSnapshotReplacementState::Running), + ) + .filter(dsl::operating_saga_id.is_null()) + .set((dsl::replacement_state + .eq(RegionSnapshotReplacementState::Complete),)) + .check_if_exists::( + region_snapshot_replacement_id, + ) + .execute_and_check(&conn) + .await?; + + match result.status { + UpdateStatus::Updated => Ok(()), + UpdateStatus::NotUpdatedButExists => { + let record = result.found; + + if record.replacement_state + == RegionSnapshotReplacementState::Complete + { + Ok(()) + } else { + Err(err.bail(TxnError::from(Error::conflict( + format!( + "region snapshot replacement {} set to {:?} \ + (operating saga id {:?})", + region_snapshot_replacement_id, + record.replacement_state, + record.operating_saga_id, + ), + )))) + } } } } }) .await - .map_err(|e| match e { - TxnError::CustomError(error) => error, - - TxnError::Database(error) => { - public_error_from_diesel(error, ErrorHandler::Server) - } + .map_err(|e| match err.take() { + Some(err) => err.into(), + None => public_error_from_diesel(e, ErrorHandler::Server), }) } @@ -893,9 +901,15 @@ impl DataStore { ) -> Result<(), Error> { type TxnError = TransactionError; - self.pool_connection_authorized(opctx) - .await? - .transaction_async(|conn| async move { + let err = OptionalError::new(); + let conn = self.pool_connection_authorized(opctx).await?; + + self.transaction_retry_wrapper( + "set_region_snapshot_replacement_step_complete", + ) + .transaction(&conn, |conn| { + let err = err.clone(); + async move { use db::schema::volume_repair::dsl as volume_repair_dsl; diesel::delete( @@ -943,27 +957,25 @@ impl DataStore { { Ok(()) } else { - Err(TxnError::CustomError(Error::conflict( + Err(err.bail(TxnError::from(Error::conflict( format!( "region snapshot replacement step {} set \ - to {:?} (operating saga id {:?})", + to {:?} (operating saga id {:?})", region_snapshot_replacement_step_id, record.replacement_state, record.operating_saga_id, ), - ))) + )))) } } } - }) - .await - .map_err(|e| match e { - TxnError::CustomError(error) => error, - - TxnError::Database(error) => { - public_error_from_diesel(error, ErrorHandler::Server) - } - }) + } + }) + .await + .map_err(|e| match err.take() { + Some(err) => err.into(), + None => public_error_from_diesel(e, ErrorHandler::Server), + }) } /// Count all in-progress region snapshot replacement steps for a particular diff --git a/nexus/db-queries/src/db/datastore/role.rs b/nexus/db-queries/src/db/datastore/role.rs index b91597ad1d..ed8ec6fcd9 100644 --- a/nexus/db-queries/src/db/datastore/role.rs +++ b/nexus/db-queries/src/db/datastore/role.rs @@ -209,6 +209,11 @@ impl DataStore { // We might instead want to first-class the idea of Policies in the // database so that we can build up a whole new Policy in batches and // then flip the resource over to using it. + + // This method should probably be retryable, but this is slightly + // complicated by the cloning semantics of the queries, which + // must be Clone to be retried. + #[allow(clippy::disallowed_methods)] self.pool_connection_authorized(opctx) .await? .transaction_async(|conn| async move { diff --git a/nexus/db-queries/src/db/datastore/saga.rs b/nexus/db-queries/src/db/datastore/saga.rs index 4bc212e997..87d94e2377 100644 --- a/nexus/db-queries/src/db/datastore/saga.rs +++ b/nexus/db-queries/src/db/datastore/saga.rs @@ -654,6 +654,7 @@ mod test { .expect("failed to re-assign sagas"); // Fetch all the sagas and check their states. + #[allow(clippy::disallowed_methods)] let all_sagas: Vec<_> = datastore .pool_connection_for_tests() .await diff --git a/nexus/db-queries/src/db/datastore/silo.rs b/nexus/db-queries/src/db/datastore/silo.rs index 2b7afa3270..b862f3c461 100644 --- a/nexus/db-queries/src/db/datastore/silo.rs +++ b/nexus/db-queries/src/db/datastore/silo.rs @@ -67,10 +67,11 @@ impl DataStore { use db::schema::silo::dsl; use db::schema::silo_quotas::dsl as quotas_dsl; + let conn = self.pool_connection_authorized(opctx).await?; + let count = self - .pool_connection_authorized(opctx) - .await? - .transaction_async(|conn| async move { + .transaction_retry_wrapper("load_builtin_silos") + .transaction(&conn, |conn| async move { diesel::insert_into(quotas_dsl::silo_quotas) .values(SiloQuotas::arbitrarily_high_default( DEFAULT_SILO.id(), @@ -78,19 +79,17 @@ impl DataStore { .on_conflict(quotas_dsl::silo_id) .do_nothing() .execute_async(&conn) - .await - .map_err(TransactionError::CustomError) - .unwrap(); - diesel::insert_into(dsl::silo) + .await?; + let count = diesel::insert_into(dsl::silo) .values([&*DEFAULT_SILO, &*INTERNAL_SILO]) .on_conflict(dsl::id) .do_nothing() .execute_async(&conn) - .await - .map_err(TransactionError::CustomError) + .await?; + Ok(count) }) .await - .unwrap(); + .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))?; info!(opctx.log, "created {} built-in silos", count); @@ -226,6 +225,9 @@ impl DataStore { None }; + // This method uses nested transactions, which are not supported + // with retryable transactions. + #[allow(clippy::disallowed_methods)] let silo = conn .transaction_async(|conn| async move { let silo = silo_create_query @@ -424,6 +426,10 @@ impl DataStore { let now = Utc::now(); type TxnError = TransactionError; + + // This method uses nested transactions, which are not supported + // with retryable transactions. + #[allow(clippy::disallowed_methods)] conn.transaction_async(|conn| async move { let updated_rows = diesel::update(silo::dsl::silo) .filter(silo::dsl::time_deleted.is_null()) diff --git a/nexus/db-queries/src/db/datastore/silo_group.rs b/nexus/db-queries/src/db/datastore/silo_group.rs index b8ef759116..e6168f4e42 100644 --- a/nexus/db-queries/src/db/datastore/silo_group.rs +++ b/nexus/db-queries/src/db/datastore/silo_group.rs @@ -199,6 +199,8 @@ impl DataStore { let group_id = authz_silo_group.id(); + // Prefer to use "transaction_retry_wrapper" + #[allow(clippy::disallowed_methods)] self.pool_connection_authorized(opctx) .await? .transaction_async(|conn| async move { diff --git a/nexus/db-queries/src/db/datastore/silo_user.rs b/nexus/db-queries/src/db/datastore/silo_user.rs index 2825e2a310..40f6b3f0be 100644 --- a/nexus/db-queries/src/db/datastore/silo_user.rs +++ b/nexus/db-queries/src/db/datastore/silo_user.rs @@ -21,7 +21,6 @@ use crate::db::model::UserBuiltin; use crate::db::model::UserProvisionType; use crate::db::pagination::paginated; use crate::db::update_and_check::UpdateAndCheck; -use async_bb8_diesel::AsyncConnection; use async_bb8_diesel::AsyncRunQueryDsl; use chrono::Utc; use diesel::prelude::*; @@ -92,9 +91,10 @@ impl DataStore { // TODO-robustness We might consider the RFD 192 "rcgen" pattern as well // so that people can't, say, login while we do this. let authz_silo_user_id = authz_silo_user.id(); - self.pool_connection_authorized(opctx) - .await? - .transaction_async(|mut conn| async move { + + let conn = self.pool_connection_authorized(opctx).await?; + self.transaction_retry_wrapper("silo_user_delete") + .transaction(&conn, |conn| async move { // Delete the user record. { use db::schema::silo_user::dsl; @@ -103,7 +103,7 @@ impl DataStore { .filter(dsl::time_deleted.is_null()) .set(dsl::time_deleted.eq(Utc::now())) .check_if_exists::(authz_silo_user_id) - .execute_and_check(&mut conn) + .execute_and_check(&conn) .await?; } @@ -112,7 +112,7 @@ impl DataStore { use db::schema::console_session::dsl; diesel::delete(dsl::console_session) .filter(dsl::silo_user_id.eq(authz_silo_user_id)) - .execute_async(&mut conn) + .execute_async(&conn) .await?; } @@ -121,7 +121,7 @@ impl DataStore { use db::schema::device_access_token::dsl; diesel::delete(dsl::device_access_token) .filter(dsl::silo_user_id.eq(authz_silo_user_id)) - .execute_async(&mut conn) + .execute_async(&conn) .await?; } @@ -130,7 +130,7 @@ impl DataStore { use db::schema::silo_group_membership::dsl; diesel::delete(dsl::silo_group_membership) .filter(dsl::silo_user_id.eq(authz_silo_user_id)) - .execute_async(&mut conn) + .execute_async(&conn) .await?; } @@ -141,7 +141,7 @@ impl DataStore { .filter(dsl::silo_user_id.eq(authz_silo_user_id)) .filter(dsl::time_deleted.is_null()) .set(dsl::time_deleted.eq(Utc::now())) - .execute_async(&mut conn) + .execute_async(&conn) .await?; } diff --git a/nexus/db-queries/src/db/pagination.rs b/nexus/db-queries/src/db/pagination.rs index 01911eb802..1929632980 100644 --- a/nexus/db-queries/src/db/pagination.rs +++ b/nexus/db-queries/src/db/pagination.rs @@ -679,6 +679,7 @@ mod test { pagparams: &DataPageParams<'_, (i64, i64)>, ) -> Vec { let conn = pool.claim().await.unwrap(); + #[allow(clippy::disallowed_methods)] conn.transaction_async(|conn| async move { // I couldn't figure out how to make this work without requiring a full // table scan, and I just want the test to work so that I can get on diff --git a/nexus/src/app/sagas/region_snapshot_replacement_start.rs b/nexus/src/app/sagas/region_snapshot_replacement_start.rs index 55927f7de8..4855f64ac2 100644 --- a/nexus/src/app/sagas/region_snapshot_replacement_start.rs +++ b/nexus/src/app/sagas/region_snapshot_replacement_start.rs @@ -1041,6 +1041,7 @@ pub(crate) mod test { let conn = datastore.pool_connection_for_tests().await.unwrap(); + #[allow(clippy::disallowed_methods)] conn.transaction_async(|conn| async move { // Selecting all regions requires a full table scan conn.batch_execute_async(ALLOW_FULL_TABLE_SCAN_SQL).await.unwrap();