Skip to content

Commit

Permalink
Create dtrace probes for transactions (#7248)
Browse files Browse the repository at this point in the history
- Adds a "NonRetryHelper" struct to help instrument non-retryable
transactions
- Adds `transaction__start` and `transaction__done` probes which wrap
our retryable (and now, non-retryable transactions)

Intended to supplement
#7244 , and provide
transaction names
  • Loading branch information
smklein authored Dec 13, 2024
1 parent 7e7c3bb commit 06f8045
Show file tree
Hide file tree
Showing 13 changed files with 140 additions and 81 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ derive_more = "0.99.18"
derive-where = "1.2.7"
# Having the i-implement-... feature here makes diesel go away from the workspace-hack
diesel = { version = "2.2.4", features = ["i-implement-a-third-party-backend-and-opt-into-breaking-changes", "postgres", "r2d2", "chrono", "serde_json", "network-address", "uuid"] }
diesel-dtrace = "0.4.0"
diesel-dtrace = "0.4.2"
dns-server = { path = "dns-server" }
dns-server-api = { path = "dns-server-api" }
dns-service-client = { path = "clients/dns-service-client" }
Expand Down
2 changes: 1 addition & 1 deletion clippy.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@ disallowed-methods = [
# and can fail spuriously.
# Instead, the "transaction_retry_wrapper" should be preferred, as it
# automatically retries transactions experiencing contention.
{ path = "async_bb8_diesel::AsyncConnection::transaction_async", reason = "Prefer to use transaction_retry_wrapper, if possible. Feel free to override this for tests and nested transactions." },
{ path = "async_bb8_diesel::AsyncConnection::transaction_async", reason = "Prefer to use transaction_retry_wrapper, if possible. For tests and nested transactions, use transaction_non_retry_wrapper to at least get dtrace probes" },
]
7 changes: 4 additions & 3 deletions nexus/db-queries/src/db/datastore/deployment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use crate::db::DbConnection;
use crate::db::TransactionError;
use crate::transaction_retry::OptionalError;
use anyhow::Context;
use async_bb8_diesel::AsyncConnection;
use async_bb8_diesel::AsyncRunQueryDsl;
use chrono::DateTime;
use chrono::Utc;
Expand Down Expand Up @@ -106,7 +105,7 @@ impl DataStore {
blueprint: &Blueprint,
) -> Result<(), Error> {
let conn = self.pool_connection_authorized(opctx).await?;
Self::blueprint_insert_on_connection(&conn, opctx, blueprint).await
self.blueprint_insert_on_connection(&conn, opctx, blueprint).await
}

/// Creates a transaction iff the current blueprint is "bp_id".
Expand Down Expand Up @@ -182,6 +181,7 @@ impl DataStore {
/// Variant of [Self::blueprint_insert] which may be called from a
/// transaction context.
pub(crate) async fn blueprint_insert_on_connection(
&self,
conn: &async_bb8_diesel::Connection<DbConnection>,
opctx: &OpContext,
blueprint: &Blueprint,
Expand Down Expand Up @@ -340,7 +340,8 @@ impl DataStore {
// as most of the operations should be insertions rather than in-place
// modifications of existing tables.
#[allow(clippy::disallowed_methods)]
conn.transaction_async(|conn| async move {
self.transaction_non_retry_wrapper("blueprint_insert")
.transaction(&conn, |conn| async move {
// Insert the row for the blueprint.
{
use db::schema::blueprint::dsl;
Expand Down
31 changes: 17 additions & 14 deletions nexus/db-queries/src/db/datastore/dns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use crate::db::pagination::Paginator;
use crate::db::pool::DbConnection;
use crate::db::TransactionError;
use crate::transaction_retry::OptionalError;
use async_bb8_diesel::AsyncConnection;
use async_bb8_diesel::AsyncRunQueryDsl;
use diesel::prelude::*;
use futures::future::BoxFuture;
Expand Down Expand Up @@ -453,20 +452,24 @@ impl DataStore {

// This method is used in nested transactions, which are not supported
// with retryable transactions.
#[allow(clippy::disallowed_methods)]
conn.transaction_async(|c| async move {
let version = self
.dns_group_latest_version_conn(opctx, conn, update.dns_group)
.await?;
self.dns_write_version_internal(
&c,
update,
zones,
Generation(version.version.next()),
)
self.transaction_non_retry_wrapper("dns_update_incremental")
.transaction(&conn, |c| async move {
let version = self
.dns_group_latest_version_conn(
opctx,
conn,
update.dns_group,
)
.await?;
self.dns_write_version_internal(
&c,
update,
zones,
Generation(version.version.next()),
)
.await
})
.await
})
.await
}

// This must only be used inside a transaction. Otherwise, it may make
Expand Down
6 changes: 3 additions & 3 deletions nexus/db-queries/src/db/datastore/inventory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,13 +278,13 @@ impl DataStore {
// batch rather than making a bunch of round-trips to the database.
// We'd do that if we had an interface for doing that with bound
// parameters, etc. See oxidecomputer/omicron#973.
let pool = self.pool_connection_authorized(opctx).await?;
let conn = self.pool_connection_authorized(opctx).await?;

// The risk of a serialization error is possible here, but low,
// as most of the operations should be insertions rather than in-place
// modifications of existing tables.
#[allow(clippy::disallowed_methods)]
pool.transaction_async(|conn| async move {
self.transaction_non_retry_wrapper("inventory_insert_collection")
.transaction(&conn, |conn| async move {
// Insert records (and generate ids) for any baseboards that do not
// already exist in the database. These rows are not scoped to a
// particular collection. They contain only immutable data --
Expand Down
8 changes: 8 additions & 0 deletions nexus/db-queries/src/db/datastore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,14 @@ impl DataStore {
)
}

/// Constructs a non-retryable transaction helper
pub fn transaction_non_retry_wrapper(
&self,
name: &'static str,
) -> crate::transaction_retry::NonRetryHelper {
crate::transaction_retry::NonRetryHelper::new(&self.log, name)
}

#[cfg(test)]
pub(crate) fn transaction_retry_producer(
&self,
Expand Down
11 changes: 4 additions & 7 deletions nexus/db-queries/src/db/datastore/rack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use crate::db::model::Zpool;
use crate::db::pagination::paginated;
use crate::db::pool::DbConnection;
use crate::db::TransactionError;
use async_bb8_diesel::AsyncConnection;
use async_bb8_diesel::AsyncRunQueryDsl;
use chrono::Utc;
use diesel::prelude::*;
Expand Down Expand Up @@ -676,11 +675,9 @@ impl DataStore {

// This method uses nested transactions, which are not supported
// with retryable transactions.
#[allow(clippy::disallowed_methods)]
let rack = self
.pool_connection_authorized(opctx)
.await?
.transaction_async(|conn| {
let conn = self.pool_connection_authorized(opctx).await?;
let rack = self.transaction_non_retry_wrapper("rack_set_initialized")
.transaction(&conn, |conn| {
let err = err.clone();
let log = log.clone();
let authz_service_pool = authz_service_pool.clone();
Expand Down Expand Up @@ -752,7 +749,7 @@ impl DataStore {
}

// Insert the RSS-generated blueprint.
Self::blueprint_insert_on_connection(
self.blueprint_insert_on_connection(
&conn, opctx, &blueprint,
)
.await
Expand Down
8 changes: 3 additions & 5 deletions nexus/db-queries/src/db/datastore/role.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use crate::db::model::RoleAssignment;
use crate::db::model::RoleBuiltin;
use crate::db::pagination::paginated_multicolumn;
use crate::db::pool::DbConnection;
use async_bb8_diesel::AsyncConnection;
use async_bb8_diesel::AsyncRunQueryDsl;
use diesel::prelude::*;
use nexus_db_fixed_data::role_assignment::BUILTIN_ROLE_ASSIGNMENTS;
Expand Down Expand Up @@ -213,10 +212,9 @@ impl DataStore {
// This method should probably be retryable, but this is slightly
// complicated by the cloning semantics of the queries, which
// must be Clone to be retried.
#[allow(clippy::disallowed_methods)]
self.pool_connection_authorized(opctx)
.await?
.transaction_async(|conn| async move {
let conn = self.pool_connection_authorized(opctx).await?;
self.transaction_non_retry_wrapper("role_assignment_replace_visible")
.transaction(&conn, |conn| async move {
delete_old_query.execute_async(&conn).await?;
Ok(insert_new_query.get_results_async(&conn).await?)
})
Expand Down
80 changes: 40 additions & 40 deletions nexus/db-queries/src/db/datastore/silo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use crate::db::model::VirtualProvisioningCollection;
use crate::db::pagination::paginated;
use crate::db::pagination::Paginator;
use crate::db::pool::DbConnection;
use async_bb8_diesel::AsyncConnection;
use async_bb8_diesel::AsyncRunQueryDsl;
use chrono::Utc;
use diesel::prelude::*;
Expand Down Expand Up @@ -227,9 +226,9 @@ impl DataStore {

// This method uses nested transactions, which are not supported
// with retryable transactions.
#[allow(clippy::disallowed_methods)]
let silo = conn
.transaction_async(|conn| async move {
let silo = self
.transaction_non_retry_wrapper("silo_create")
.transaction(&conn, |conn| async move {
let silo = silo_create_query
.get_result_async(&conn)
.await
Expand Down Expand Up @@ -429,48 +428,49 @@ impl DataStore {

// This method uses nested transactions, which are not supported
// with retryable transactions.
#[allow(clippy::disallowed_methods)]
conn.transaction_async(|conn| async move {
let updated_rows = diesel::update(silo::dsl::silo)
.filter(silo::dsl::time_deleted.is_null())
.filter(silo::dsl::id.eq(id))
.filter(silo::dsl::rcgen.eq(rcgen))
.set(silo::dsl::time_deleted.eq(now))
.execute_async(&conn)
.await
.map_err(|e| {
public_error_from_diesel(
e,
ErrorHandler::NotFoundByResource(authz_silo),
)
})?;
self.transaction_non_retry_wrapper("silo_delete")
.transaction(&conn, |conn| async move {
let updated_rows = diesel::update(silo::dsl::silo)
.filter(silo::dsl::time_deleted.is_null())
.filter(silo::dsl::id.eq(id))
.filter(silo::dsl::rcgen.eq(rcgen))
.set(silo::dsl::time_deleted.eq(now))
.execute_async(&conn)
.await
.map_err(|e| {
public_error_from_diesel(
e,
ErrorHandler::NotFoundByResource(authz_silo),
)
})?;

if updated_rows == 0 {
return Err(TxnError::CustomError(Error::invalid_request(
"silo deletion failed due to concurrent modification",
)));
}
if updated_rows == 0 {
return Err(TxnError::CustomError(Error::invalid_request(
"silo deletion failed due to concurrent modification",
)));
}

self.silo_quotas_delete(opctx, &conn, &authz_silo).await?;
self.silo_quotas_delete(opctx, &conn, &authz_silo).await?;

self.virtual_provisioning_collection_delete_on_connection(
&opctx.log, &conn, id,
)
.await?;
self.virtual_provisioning_collection_delete_on_connection(
&opctx.log, &conn, id,
)
.await?;

self.dns_update_incremental(dns_opctx, &conn, dns_update).await?;
self.dns_update_incremental(dns_opctx, &conn, dns_update)
.await?;

info!(opctx.log, "deleted silo {}", id);
info!(opctx.log, "deleted silo {}", id);

Ok(())
})
.await
.map_err(|e| match e {
TxnError::CustomError(e) => e,
TxnError::Database(e) => {
public_error_from_diesel(e, ErrorHandler::Server)
}
})?;
Ok(())
})
.await
.map_err(|e| match e {
TxnError::CustomError(e) => e,
TxnError::Database(e) => {
public_error_from_diesel(e, ErrorHandler::Server)
}
})?;

// TODO-correctness This needs to happen in a saga or some other
// mechanism that ensures it happens even if we crash at this point.
Expand Down
8 changes: 3 additions & 5 deletions nexus/db-queries/src/db/datastore/silo_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use crate::db::model::SiloGroup;
use crate::db::model::SiloGroupMembership;
use crate::db::pagination::paginated;
use crate::db::IncompleteOnConflictExt;
use async_bb8_diesel::AsyncConnection;
use async_bb8_diesel::AsyncRunQueryDsl;
use chrono::Utc;
use diesel::prelude::*;
Expand Down Expand Up @@ -198,12 +197,11 @@ impl DataStore {
type TxnError = TransactionError<SiloDeleteError>;

let group_id = authz_silo_group.id();
let conn = self.pool_connection_authorized(opctx).await?;

// Prefer to use "transaction_retry_wrapper"
#[allow(clippy::disallowed_methods)]
self.pool_connection_authorized(opctx)
.await?
.transaction_async(|conn| async move {
self.transaction_non_retry_wrapper("silo_group_delete")
.transaction(&conn, |conn| async move {
use db::schema::silo_group_membership;

// Don't delete groups that still have memberships
Expand Down
6 changes: 6 additions & 0 deletions nexus/db-queries/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,10 @@ mod probes {

// Fires when we fail to find a VNI in the provided range.
fn vni__search__range__empty(_: &usdt::UniqueId) {}

// Fires when a transaction has started
fn transaction__start(conn_id: uuid::Uuid, name: &str) {}

// Fires when a transaction has completed
fn transaction__done(conn_id: uuid::Uuid, name: &str) {}
}
Loading

0 comments on commit 06f8045

Please sign in to comment.