From 42687c1346c9f5be1faf41362247db1201c426a8 Mon Sep 17 00:00:00 2001 From: Matt Keeter Date: Tue, 3 Dec 2024 13:04:40 -0500 Subject: [PATCH] Move Nexus notifications to a standalone task --- Cargo.lock | 2 - upstairs/Cargo.toml | 10 +- upstairs/src/client.rs | 2 +- upstairs/src/downstairs.rs | 772 +++++++++---------------------------- upstairs/src/lib.rs | 62 +-- upstairs/src/notify.rs | 354 +++++++++++++++++ upstairs/src/upstairs.rs | 24 +- 7 files changed, 558 insertions(+), 668 deletions(-) create mode 100644 upstairs/src/notify.rs diff --git a/Cargo.lock b/Cargo.lock index 79bb0fb7c..cce38ec7a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -928,7 +928,6 @@ dependencies = [ "expectorate", "futures", "futures-core", - "http 0.2.12", "humantime", "internal-dns", "itertools 0.13.0", @@ -940,7 +939,6 @@ dependencies = [ "openapiv3", "oximeter", "oximeter-producer", - "progenitor-client 0.9.1", "proptest", "rand 0.8.5", "rand_chacha 0.3.1", diff --git a/upstairs/Cargo.toml b/upstairs/Cargo.toml index 0e6c89e3a..b344d4e28 100644 --- a/upstairs/Cargo.toml +++ b/upstairs/Cargo.toml @@ -12,7 +12,7 @@ path = "src/lib.rs" [features] asm = ["usdt/asm"] -notify-nexus = ["nexus-client", "internal-dns", "progenitor-client", "http", "omicron-uuid-kinds"] +notify-nexus = [] integration-tests = [] [dependencies] @@ -56,11 +56,9 @@ aes-gcm-siv.workspace = true rand_chacha.workspace = true reqwest.workspace = true crucible-workspace-hack.workspace = true -nexus-client = { workspace = true, optional = true } -internal-dns = { workspace = true, optional = true } -omicron-uuid-kinds = { workspace = true, optional = true } -progenitor-client = { workspace = true, optional = true } -http = { workspace = true, optional = true } +nexus-client.workspace = true +omicron-uuid-kinds.workspace = true +internal-dns.workspace = true [dev-dependencies] expectorate.workspace = true diff --git a/upstairs/src/client.rs b/upstairs/src/client.rs index 416c185c9..1f75ae013 100644 --- a/upstairs/src/client.rs +++ b/upstairs/src/client.rs @@ -1919,7 +1919,7 @@ impl DownstairsClient { self.client_delay_us.load(Ordering::Relaxed) } - #[cfg(feature = "notify-nexus")] + /// Looks up the region UUID pub(crate) fn id(&self) -> Option { self.region_uuid } diff --git a/upstairs/src/downstairs.rs b/upstairs/src/downstairs.rs index c69ca5422..b1f7499c3 100644 --- a/upstairs/src/downstairs.rs +++ b/upstairs/src/downstairs.rs @@ -10,11 +10,13 @@ use crate::{ cdt, client::{ ClientAction, ClientFaultReason, ClientNegotiationFailed, - ClientStopReason, DownstairsClient, EnqueueResult, NegotiationState, + ClientRunResult, ClientStopReason, DownstairsClient, EnqueueResult, + NegotiationState, }, guest::GuestBlockRes, io_limits::{IOLimitGuard, IOLimits}, live_repair::ExtentInfo, + notify::{NotifyQueue, NotifyRequest}, stats::DownstairsStatOuter, upstairs::{UpstairsConfig, UpstairsState}, AckStatus, ActiveJobs, AllocRingBuffer, BlockRes, Buffer, ClientData, @@ -34,36 +36,11 @@ use ringbuffer::RingBuffer; use slog::{debug, error, info, o, warn, Logger}; use uuid::Uuid; -cfg_if::cfg_if! { - if #[cfg(feature = "notify-nexus")] { - use chrono::Utc; - - use crate::client::ClientRunResult; - use crate::get_nexus_client; - - use nexus_client::types::DownstairsClientStopped; - use nexus_client::types::DownstairsClientStoppedReason; - use nexus_client::types::DownstairsUnderRepair; - use nexus_client::types::RepairFinishInfo; - use nexus_client::types::RepairProgress; - use nexus_client::types::RepairStartInfo; - use nexus_client::types::UpstairsRepairType; - - use omicron_uuid_kinds::DownstairsKind; - use omicron_uuid_kinds::GenericUuid; - use omicron_uuid_kinds::TypedUuid; - use omicron_uuid_kinds::UpstairsKind; - use omicron_uuid_kinds::UpstairsRepairKind; - use omicron_uuid_kinds::UpstairsSessionKind; - } -} - /// Downstairs data /// /// This data structure is responsible for tracking outstanding jobs from the /// perspective of the (3x) downstairs. It contains a list of all active jobs, /// as well as three `DownstairsClient` with per-client data. -#[derive(Debug)] pub(crate) struct Downstairs { /// Shared configuration cfg: Arc, @@ -140,9 +117,8 @@ pub(crate) struct Downstairs { /// Handle for stats stats: DownstairsStatOuter, - /// A reqwest client, to be reused when creating Nexus clients - #[cfg(feature = "notify-nexus")] - reqwest_client: reqwest::Client, + /// A queue to send notifications to Nexus + notify: Option, } /// Tuple storing a job and an optional result @@ -343,6 +319,7 @@ impl Downstairs { tls_context: Option>, stats: DownstairsStatOuter, io_limits: &IOLimits, + notify: Option, log: Logger, ) -> Self { let mut clients = [None, None, None]; @@ -356,6 +333,7 @@ impl Downstairs { tls_context.clone(), )); } + let clients = clients.map(Option::unwrap); Self { clients: ClientData(clients), @@ -393,13 +371,7 @@ impl Downstairs { repair: None, ddef: None, stats, - - #[cfg(feature = "notify-nexus")] - reqwest_client: reqwest::ClientBuilder::new() - .connect_timeout(std::time::Duration::from_secs(15)) - .timeout(std::time::Duration::from_secs(15)) - .build() - .unwrap(), + notify, } } @@ -422,8 +394,15 @@ impl Downstairs { // Build a set of fake IO limits that we won't hit let io_limits = IOLimits::new(u32::MAX as usize, u32::MAX as usize); - let mut ds = - Self::new(cfg, ClientMap::new(), None, stats, &io_limits, log); + let mut ds = Self::new( + cfg, + ClientMap::new(), + None, + stats, + &io_limits, + None, + log, + ); // Create a fake repair address so this field is populated. for cid in ClientId::iter() { @@ -996,11 +975,7 @@ impl Downstairs { reconcile.reconcile_repair_needed, ); - #[cfg(feature = "notify-nexus")] - { - self.notify_nexus_of_reconcile_start(&reconcile); - } - + self.notify_reconcile_start(&reconcile); self.reconcile = Some(reconcile); self.reconcile_repaired = 0; @@ -1095,11 +1070,8 @@ impl Downstairs { self.repair.as_ref().unwrap().id ); - #[cfg(feature = "notify-nexus")] - { - let repair = self.repair.as_ref().unwrap(); - self.notify_nexus_of_live_repair_start(repair); - } + let repair = self.repair.as_ref().unwrap(); + self.notify_live_repair_start(repair); // We'll be back in on_live_repair once the initial job finishes true @@ -1281,17 +1253,14 @@ impl Downstairs { let aborting = repair.aborting_repair; let source_downstairs = repair.source_downstairs; - #[cfg(feature = "notify-nexus")] - { - let repair_id = repair.id; - let extent_count = repair.extent_count; + let repair_id = repair.id; + let extent_count = repair.extent_count; - self.notify_nexus_of_live_repair_progress( - repair_id, - active_extent, - extent_count, - ); - } + self.notify_live_repair_progress( + repair_id, + active_extent, + extent_count, + ); self.begin_repair_for( active_extent, @@ -1316,11 +1285,8 @@ impl Downstairs { } } - #[cfg(feature = "notify-nexus")] - { - let repair = self.repair.as_ref().unwrap(); - self.notify_nexus_of_live_repair_finish(repair); - } + let repair = self.repair.as_ref().unwrap(); + self.notify_live_repair_finish(repair); // Set `self.repair` to `None` on our way out the door (because // repair is done, one way or the other) @@ -1838,22 +1804,19 @@ impl Downstairs { reconcile.current_work = Some(next); - #[cfg(feature = "notify-nexus")] - { - let reconcile_id = reconcile.id; - - // `on_reconciliation_job_done` increments one of these and - // decrements the other, so add them together to get total task - // count to send to Nexus. - let task_count = - self.reconcile_repaired + reconcile.reconcile_repair_needed; - - self.notify_nexus_of_reconcile_progress( - reconcile_id, - self.reconcile_repaired, - task_count, - ); - } + let reconcile_id = reconcile.id; + + // `on_reconciliation_job_done` increments one of these and + // decrements the other, so add them together to get total task + // count to send to Nexus. + let task_count = + self.reconcile_repaired + reconcile.reconcile_repair_needed; + + self.notify_reconcile_progress( + reconcile_id, + self.reconcile_repaired, + task_count, + ); false } @@ -1964,16 +1927,7 @@ impl Downstairs { info!(self.log, "Clear out existing repair work queue"); if let Some(r) = self.reconcile.take() { - #[cfg(feature = "notify-nexus")] - { - self.notify_nexus_of_reconcile_finished( - &r, true, /* aborted */ - ); - } - #[cfg(not(feature = "notify-nexus"))] - { - let _ = r; // avoid unused warning - } + self.notify_reconcile_finished(&r, true /* aborted */); self.reconcile_repaired = 0; } else { // If reconcile is None, then these should also be cleared @@ -1997,13 +1951,7 @@ impl Downstairs { // reconciliation completed let r = self.reconcile.take().unwrap(); assert!(r.task_list.is_empty()); - - #[cfg(feature = "notify-nexus")] - { - self.notify_nexus_of_reconcile_finished( - &r, false, /* aborted */ - ); - } + self.notify_reconcile_finished(&r, false /* aborted */); } else { // no reconciliation was required assert!(self.reconcile.is_none()); @@ -3771,562 +3719,194 @@ impl Downstairs { } } - #[cfg(feature = "notify-nexus")] - fn get_target_addrs(&self) -> Vec { - self.clients - .iter() - .filter_map(|client| client.target_addr) - .collect() - } - - #[cfg(feature = "notify-nexus")] - fn notify_nexus_of_live_repair_start(&self, repair: &LiveRepairData) { - let log = self.log.new(o!("repair" => repair.id.to_string())); - - let mut repairs = Vec::with_capacity(repair.repair_downstairs.len()); - - for cid in &repair.repair_downstairs { - let Some(region_uuid) = self.clients[*cid].id() else { - // A downstairs doesn't have an id but is being repaired...? - warn!(log, "downstairs {cid} has a None id?"); - continue; - }; + fn notify_live_repair_start(&self, repair: &LiveRepairData) { + if let Some(notify) = &self.notify { + let log = self.log.new(o!("repair" => repair.id.to_string())); + let mut repairs = + Vec::with_capacity(repair.repair_downstairs.len()); - let Some(target_addr) = self.clients[*cid].target_addr else { - // A downstairs doesn't have a target_addr but is being - // repaired...? - warn!(log, "downstairs {cid} has a None target_addr?"); - continue; - }; + for cid in &repair.repair_downstairs { + let Some(region_uuid) = self.clients[*cid].id() else { + // A downstairs doesn't have an id but is being repaired...? + warn!(log, "downstairs {cid} has a None id?"); + continue; + }; - repairs.push(DownstairsUnderRepair { - region_uuid: region_uuid.into(), - target_addr: target_addr.to_string(), - }); - } + let Some(target_addr) = self.clients[*cid].target_addr else { + // A downstairs doesn't have a target_addr but is being + // repaired...? + warn!(log, "downstairs {cid} has a None target_addr?"); + continue; + }; - let upstairs_id: TypedUuid = - TypedUuid::from_untyped_uuid(self.cfg.upstairs_id); - let session_id: TypedUuid = - TypedUuid::from_untyped_uuid(self.cfg.session_id); - let repair_id: TypedUuid = - TypedUuid::from_untyped_uuid(repair.id); - - let now = Utc::now(); - - // Spawn a task so we don't block the main loop talking to - // Nexus. - let target_addrs = self.get_target_addrs(); - let client = self.reqwest_client.clone(); - tokio::spawn(async move { - let Some(nexus_client) = - get_nexus_client(&log, client, &target_addrs).await - else { - // Exit if no Nexus client returned from DNS - our notification - // is best effort. - error!( - log, - "no Nexus client from DNS, aborting start notification" - ); - return; - }; + repairs.push((region_uuid, target_addr)); + } - match omicron_common::retry_until_known_result(&log, || async { - nexus_client - .cpapi_upstairs_repair_start( - &upstairs_id, - &RepairStartInfo { - time: now, - repair_id, - repair_type: UpstairsRepairType::Live, - session_id, - repairs: repairs.clone(), - }, - ) - .await + notify.send(NotifyRequest::LiveRepairStart { + upstairs_id: self.cfg.upstairs_id, + session_id: self.cfg.session_id, + repair_id: repair.id, + repairs, }) - .await - { - Ok(_) => { - info!(log, "notified Nexus of repair start"); - } - - Err(e) => { - error!(log, "failed to notify Nexus of repair start! {e}"); - } - } - }); + } } - #[cfg(feature = "notify-nexus")] - fn notify_nexus_of_live_repair_finish(&self, repair: &LiveRepairData) { - let log = self.log.new(o!("repair" => repair.id.to_string())); + fn notify_live_repair_finish(&self, repair: &LiveRepairData) { + if let Some(notify) = &self.notify { + let log = self.log.new(o!("repair" => repair.id.to_string())); - let aborted = repair.aborting_repair; + let mut repairs = + Vec::with_capacity(repair.repair_downstairs.len()); - let mut repairs = Vec::with_capacity(repair.repair_downstairs.len()); + for cid in &repair.repair_downstairs { + let Some(region_uuid) = self.clients[*cid].id() else { + // A downstairs doesn't have an id but is being repaired...? + warn!(log, "downstairs {cid} has a None id?"); + continue; + }; - for cid in &repair.repair_downstairs { - let Some(region_uuid) = self.clients[*cid].id() else { - // A downstairs doesn't have an id but is being repaired...? - warn!(log, "downstairs {cid} has a None id?"); - continue; - }; + let Some(target_addr) = self.clients[*cid].target_addr else { + // A downstairs doesn't have a target_addr but is being + // repaired...? + warn!(log, "downstairs {cid} has a None target_addr?"); + continue; + }; - let Some(target_addr) = self.clients[*cid].target_addr else { - // A downstairs doesn't have a target_addr but is being - // repaired...? - warn!(log, "downstairs {cid} has a None target_addr?"); - continue; - }; + repairs.push((region_uuid, target_addr)); + } - repairs.push(DownstairsUnderRepair { - region_uuid: region_uuid.into(), - target_addr: target_addr.to_string(), + notify.send(NotifyRequest::LiveRepairFinish { + upstairs_id: self.cfg.upstairs_id, + session_id: self.cfg.session_id, + repair_id: repair.id, + repairs, + aborted: repair.aborting_repair, }); } - - let upstairs_id: TypedUuid = - TypedUuid::from_untyped_uuid(self.cfg.upstairs_id); - let session_id: TypedUuid = - TypedUuid::from_untyped_uuid(self.cfg.session_id); - let repair_id: TypedUuid = - TypedUuid::from_untyped_uuid(repair.id); - - let now = Utc::now(); - - // Spawn a task so we don't block the main loop talking to - // Nexus. - let target_addrs = self.get_target_addrs(); - let client = self.reqwest_client.clone(); - tokio::spawn(async move { - let Some(nexus_client) = - get_nexus_client(&log, client, &target_addrs).await - else { - // Exit if no Nexus client returned from DNS - our notification - // is best effort. - error!( - log, - "no Nexus client from DNS, aborting finish notification" - ); - return; - }; - - match omicron_common::retry_until_known_result(&log, || async { - nexus_client - .cpapi_upstairs_repair_finish( - &upstairs_id, - &RepairFinishInfo { - time: now, - repair_id, - repair_type: UpstairsRepairType::Live, - session_id, - repairs: repairs.clone(), - aborted, - }, - ) - .await - }) - .await - { - Ok(_) => { - info!(log, "notified Nexus of repair finish"); - } - - Err(e) => { - error!(log, "failed to notify Nexus of repair finish! {e}"); - } - } - }); } - #[cfg(feature = "notify-nexus")] - fn notify_nexus_of_live_repair_progress( + fn notify_live_repair_progress( &self, repair_id: Uuid, current_extent: ExtentId, extent_count: u32, ) { - let upstairs_id: TypedUuid = - TypedUuid::from_untyped_uuid(self.cfg.upstairs_id); - let repair_id: TypedUuid = - TypedUuid::from_untyped_uuid(repair_id); - - let now = Utc::now(); - let log = self.log.new(o!("repair" => repair_id.to_string())); - - // Spawn a task so we don't block the main loop talking to - // Nexus. - let target_addrs = self.get_target_addrs(); - let client = self.reqwest_client.clone(); - tokio::spawn(async move { - let Some(nexus_client) = - get_nexus_client(&log, client, &target_addrs).await - else { - // Exit if no Nexus client returned from DNS - our notification - // is best effort. - error!( - log, - "no Nexus client from DNS, aborting progress notification" - ); - return; - }; - - match omicron_common::retry_until_known_result(&log, || async { - nexus_client - .cpapi_upstairs_repair_progress( - &upstairs_id, - &repair_id, - &RepairProgress { - time: now, - // surely we won't have u64::MAX extents - current_item: current_extent.0 as i64, - // i am serious, and don't call me shirley - total_items: extent_count as i64, - }, - ) - .await - }) - .await - { - Ok(_) => { - info!(log, "notified Nexus of repair progress"); - } - - Err(e) => { - error!( - log, - "failed to notify Nexus of repair progress! {e}" - ); - } - } - }); + if let Some(notify) = &self.notify { + notify.send(NotifyRequest::LiveRepairProgress { + upstairs_id: self.cfg.upstairs_id, + repair_id, + // surely we won't have u64::MAX extents + current_item: current_extent.0 as i64, + // i am serious, and don't call me shirley + total_items: extent_count as i64, + }); + } } - #[cfg(feature = "notify-nexus")] - fn notify_nexus_of_reconcile_start(&self, reconcile: &ReconcileData) { - let log = self.log.new(o!("reconcile" => reconcile.id.to_string())); + fn notify_reconcile_start(&self, reconcile: &ReconcileData) { + if let Some(notify) = &self.notify { + let log = self.log.new(o!("reconcile" => reconcile.id.to_string())); - // Reconcilation involves everyone - let mut repairs = Vec::with_capacity(self.clients.len()); + // Reconcilation involves everyone + let mut repairs = Vec::with_capacity(self.clients.len()); - for (cid, client) in self.clients.iter().enumerate() { - let Some(region_uuid) = client.id() else { - // A downstairs doesn't have an id but is being reconciled...? - warn!(log, "downstairs {cid} has a None id?"); - continue; - }; + for (cid, client) in self.clients.iter().enumerate() { + let Some(region_uuid) = client.id() else { + // A downstairs doesn't have an id but is being reconciled...? + warn!(log, "downstairs {cid} has a None id?"); + continue; + }; - let Some(target_addr) = client.target_addr else { - // A downstairs doesn't have a target_addr but is being - // reconciled...? - warn!(log, "downstairs {cid} has a None target_addr?"); - continue; - }; + let Some(target_addr) = client.target_addr else { + // A downstairs doesn't have a target_addr but is being + // reconciled...? + warn!(log, "downstairs {cid} has a None target_addr?"); + continue; + }; - repairs.push(DownstairsUnderRepair { - region_uuid: region_uuid.into(), - target_addr: target_addr.to_string(), + repairs.push((region_uuid, target_addr)); + } + + notify.send(NotifyRequest::ReconcileStart { + upstairs_id: self.cfg.upstairs_id, + repair_id: reconcile.id, + session_id: self.cfg.session_id, + repairs, }); } - - let upstairs_id: TypedUuid = - TypedUuid::from_untyped_uuid(self.cfg.upstairs_id); - let session_id: TypedUuid = - TypedUuid::from_untyped_uuid(self.cfg.session_id); - let repair_id: TypedUuid = - TypedUuid::from_untyped_uuid(reconcile.id); - - let now = Utc::now(); - - // Spawn a task so we don't block the main loop talking to - // Nexus. - let target_addrs = self.get_target_addrs(); - let client = self.reqwest_client.clone(); - tokio::spawn(async move { - let Some(nexus_client) = - get_nexus_client(&log, client, &target_addrs).await - else { - // Exit if no Nexus client returned from DNS - our notification - // is best effort. - error!( - log, - "no Nexus client from DNS, aborting start notification" - ); - return; - }; - - match omicron_common::retry_until_known_result(&log, || async { - nexus_client - .cpapi_upstairs_repair_start( - &upstairs_id, - &RepairStartInfo { - time: now, - repair_id, - repair_type: UpstairsRepairType::Reconciliation, - session_id, - repairs: repairs.clone(), - }, - ) - .await - }) - .await - { - Ok(_) => { - info!(log, "notified Nexus of reconcile start"); - } - - Err(e) => { - error!( - log, - "error notifying Nexus of reconcile start! {e}" - ); - } - } - }); } - #[cfg(feature = "notify-nexus")] - fn notify_nexus_of_reconcile_finished( + fn notify_reconcile_finished( &self, reconcile: &ReconcileData, aborted: bool, ) { - let log = self.log.new(o!("reconcile" => reconcile.id.to_string())); + if let Some(notify) = &self.notify { + let log = self.log.new(o!("reconcile" => reconcile.id.to_string())); - // Reconcilation involves everyone - let mut repairs = Vec::with_capacity(self.clients.len()); + // Reconcilation involves everyone + let mut repairs = Vec::with_capacity(self.clients.len()); - for (cid, client) in self.clients.iter().enumerate() { - let Some(region_uuid) = client.id() else { - // A downstairs doesn't have an id but is being reconciled...? - warn!(log, "downstairs {cid} has a None id?"); - continue; - }; + for (cid, client) in self.clients.iter().enumerate() { + let Some(region_uuid) = client.id() else { + // A downstairs doesn't have an id but is being reconciled...? + warn!(log, "downstairs {cid} has a None id?"); + continue; + }; - let Some(target_addr) = client.target_addr else { - // A downstairs doesn't have a target_addr but is being - // reconciled...? - warn!(log, "downstairs {cid} has a None target_addr?"); - continue; - }; + let Some(target_addr) = client.target_addr else { + // A downstairs doesn't have a target_addr but is being + // reconciled...? + warn!(log, "downstairs {cid} has a None target_addr?"); + continue; + }; + + repairs.push((region_uuid, target_addr)); + } - repairs.push(DownstairsUnderRepair { - region_uuid: region_uuid.into(), - target_addr: target_addr.to_string(), + notify.send(NotifyRequest::ReconcileFinish { + upstairs_id: self.cfg.upstairs_id, + session_id: self.cfg.session_id, + repair_id: reconcile.id, + aborted, + repairs, }); } - - let upstairs_id: TypedUuid = - TypedUuid::from_untyped_uuid(self.cfg.upstairs_id); - let session_id: TypedUuid = - TypedUuid::from_untyped_uuid(self.cfg.session_id); - let repair_id: TypedUuid = - TypedUuid::from_untyped_uuid(reconcile.id); - - let now = Utc::now(); - - // Spawn a task so we don't block the main loop talking to - // Nexus. - let target_addrs = self.get_target_addrs(); - let client = self.reqwest_client.clone(); - tokio::spawn(async move { - let Some(nexus_client) = - get_nexus_client(&log, client, &target_addrs).await - else { - // Exit if no Nexus client returned from DNS - our notification - // is best effort. - error!( - log, - "no Nexus client from DNS, aborting finish notification" - ); - return; - }; - - match omicron_common::retry_until_known_result(&log, || async { - nexus_client - .cpapi_upstairs_repair_finish( - &upstairs_id, - &RepairFinishInfo { - time: now, - repair_id, - repair_type: UpstairsRepairType::Reconciliation, - session_id, - repairs: repairs.clone(), - aborted, - }, - ) - .await - }) - .await - { - Ok(_) => { - info!(log, "notified Nexus of reconcile finish"); - } - - Err(e) => { - error!( - log, - "failed to notify Nexus of reconcile finish! {e}" - ); - } - } - }); } - #[cfg(feature = "notify-nexus")] - fn notify_nexus_of_reconcile_progress( + fn notify_reconcile_progress( &self, reconcile_id: Uuid, current_task: usize, task_count: usize, ) { - let upstairs_id: TypedUuid = - TypedUuid::from_untyped_uuid(self.cfg.upstairs_id); - let repair_id: TypedUuid = - TypedUuid::from_untyped_uuid(reconcile_id); - - let now = Utc::now(); - let log = self.log.new(o!("reconcile" => repair_id.to_string())); - - // Spawn a task so we don't block the main loop talking to - // Nexus. - let target_addrs = self.get_target_addrs(); - let client = self.reqwest_client.clone(); - tokio::spawn(async move { - let Some(nexus_client) = - get_nexus_client(&log, client, &target_addrs).await - else { - // Exit if no Nexus client returned from DNS - our notification - // is best effort. - error!( - log, - "no Nexus client from DNS, aborting progress notification" - ); - return; - }; - - match omicron_common::retry_until_known_result(&log, || async { - nexus_client - .cpapi_upstairs_repair_progress( - &upstairs_id, - &repair_id, - &RepairProgress { - time: now, - // surely we won't have usize::MAX extents - current_item: current_task as i64, - // i am serious, and don't call me shirley - total_items: task_count as i64, - }, - ) - .await - }) - .await - { - Ok(_) => { - info!(log, "notified Nexus of reconcile progress"); - } - - Err(e) => { - error!( - log, - "failed to notify Nexus of reconcile progress! {e}" - ); - } - } - }); + if let Some(notify) = &self.notify { + notify.send(NotifyRequest::ReconcileProgress { + upstairs_id: self.cfg.upstairs_id, + repair_id: reconcile_id, + // surely we won't have usize::MAX extents + current_item: current_task as i64, + // i am serious, and don't call me shirley + total_items: task_count as i64, + }); + } } - #[cfg(feature = "notify-nexus")] - pub(crate) fn notify_nexus_of_client_task_stopped( + pub(crate) fn notify_client_task_stopped( &self, client_id: ClientId, reason: ClientRunResult, ) { - let upstairs_id: TypedUuid = - TypedUuid::from_untyped_uuid(self.cfg.upstairs_id); - - let Some(downstairs_id) = self.clients[client_id].id() else { - return; - }; - let downstairs_id: TypedUuid = - TypedUuid::from_untyped_uuid(downstairs_id); - - let now = Utc::now(); - let log = self - .log - .new(o!("downstairs_id" => downstairs_id.to_string())); - - let reason = match reason { - ClientRunResult::ConnectionTimeout => { - DownstairsClientStoppedReason::ConnectionTimeout - } - ClientRunResult::ConnectionFailed(_) => { - // skip this notification, it's too noisy during connection - // retries - //DownstairsClientStoppedReason::ConnectionFailed - return; - } - ClientRunResult::Timeout => DownstairsClientStoppedReason::Timeout, - ClientRunResult::WriteFailed(_) => { - DownstairsClientStoppedReason::WriteFailed - } - ClientRunResult::ReadFailed(_) => { - DownstairsClientStoppedReason::ReadFailed - } - ClientRunResult::RequestedStop => { - // skip this notification, it fires for *every* Upstairs - // deactivation - //DownstairsClientStoppedReason::RequestedStop - return; - } - ClientRunResult::Finished => { - DownstairsClientStoppedReason::Finished - } - ClientRunResult::QueueClosed => { - DownstairsClientStoppedReason::QueueClosed - } - ClientRunResult::ReceiveTaskCancelled => { - DownstairsClientStoppedReason::ReceiveTaskCancelled - } - }; - - // Spawn a task so we don't block the main loop talking to - // Nexus. - let target_addrs = self.get_target_addrs(); - let client = self.reqwest_client.clone(); - tokio::spawn(async move { - let Some(nexus_client) = - get_nexus_client(&log, client, &target_addrs).await - else { - // Exit if no Nexus client returned from DNS - our notification - // is best effort. + if let Some(notify) = &self.notify { + let Some(downstairs_id) = self.clients[client_id].id() else { return; }; - - match omicron_common::retry_until_known_result(&log, || async { - nexus_client - .cpapi_downstairs_client_stopped( - &upstairs_id, - &downstairs_id, - &DownstairsClientStopped { time: now, reason }, - ) - .await - }) - .await - { - Ok(_) => { - info!(log, "notified Nexus of client stopped"); - } - - Err(e) => { - error!( - log, - "failed to notify Nexus of client stopped: {e}" - ); - } - } - }); + notify.send(NotifyRequest::ClientTaskStopped { + upstairs_id: self.cfg.upstairs_id, + downstairs_id, + reason, + }); + } } pub(crate) fn set_ddef(&mut self, ddef: RegionDefinition) { diff --git a/upstairs/src/lib.rs b/upstairs/src/lib.rs index f6cf449a9..4bc371e7a 100644 --- a/upstairs/src/lib.rs +++ b/upstairs/src/lib.rs @@ -82,6 +82,8 @@ use active_jobs::ActiveJobs; use async_trait::async_trait; +pub(crate) mod notify; + mod client; mod downstairs; mod upstairs; @@ -1622,63 +1624,3 @@ pub fn up_main( Ok(join_handle) } - -/// Gets a Nexus client based on any IPv6 address -#[cfg(feature = "notify-nexus")] -pub(crate) async fn get_nexus_client( - log: &Logger, - client: reqwest::Client, - target_addrs: &[SocketAddr], -) -> Option { - use internal_dns::resolver::Resolver; - use internal_dns::ServiceName; - use std::net::Ipv6Addr; - - // Use any rack internal address for `Resolver::new_from_ip`, as that will - // use the AZ_PREFIX to find internal DNS servers. - let mut addr: Option = None; - - for target_addr in target_addrs { - match &target_addr { - SocketAddr::V6(target_addr) => { - addr = Some(*target_addr.ip()); - break; - } - - SocketAddr::V4(_) => { - // This branch is seen if compiling with the `notify-nexus` - // feature but deploying in an ipv4 environment, usually during - // development. `Resolver::new_from_ip` only accepts IPv6 - // addresses, so we can't use it to look up an address for the - // Nexus client. - } - } - } - - let Some(addr) = addr else { - return None; - }; - - let resolver = match Resolver::new_from_ip(log.clone(), addr) { - Ok(resolver) => resolver, - Err(e) => { - error!(log, "could not make resolver: {e}"); - return None; - } - }; - - let nexus_address = - match resolver.lookup_socket_v6(ServiceName::Nexus).await { - Ok(addr) => addr, - Err(e) => { - error!(log, "lookup Nexus address failed: {e}"); - return None; - } - }; - - Some(nexus_client::Client::new_with_client( - &format!("http://{}", nexus_address), - client, - log.clone(), - )) -} diff --git a/upstairs/src/notify.rs b/upstairs/src/notify.rs new file mode 100644 index 000000000..916ded694 --- /dev/null +++ b/upstairs/src/notify.rs @@ -0,0 +1,354 @@ +// Copyright 2024 Oxide Computer Company +//! Runs a task to send best-effort notifications to Nexus +//! +//! The queue receives Crucible-flavored types, and converts them to +//! Nexus-flavored types internally. + +use chrono::{DateTime, Utc}; +use slog::{debug, error, info, o, warn, Logger}; +use std::net::{Ipv6Addr, SocketAddr}; +use tokio::sync::mpsc; +use uuid::Uuid; + +use crate::client::ClientRunResult; +use nexus_client::types::{ + DownstairsClientStopped, DownstairsClientStoppedReason, + DownstairsUnderRepair, RepairFinishInfo, RepairProgress, RepairStartInfo, + UpstairsRepairType, +}; +use omicron_uuid_kinds::{GenericUuid, TypedUuid}; + +#[derive(Debug)] +pub(crate) enum NotifyRequest { + ClientTaskStopped { + upstairs_id: Uuid, + downstairs_id: Uuid, + reason: ClientRunResult, + }, + LiveRepairStart { + upstairs_id: Uuid, + repair_id: Uuid, + session_id: Uuid, + repairs: Vec<(Uuid, SocketAddr)>, + }, + LiveRepairProgress { + upstairs_id: Uuid, + repair_id: Uuid, + current_item: i64, + total_items: i64, + }, + LiveRepairFinish { + upstairs_id: Uuid, + repair_id: Uuid, + session_id: Uuid, + repairs: Vec<(Uuid, SocketAddr)>, + aborted: bool, + }, + ReconcileStart { + upstairs_id: Uuid, + repair_id: Uuid, + session_id: Uuid, + repairs: Vec<(Uuid, SocketAddr)>, + }, + ReconcileProgress { + upstairs_id: Uuid, + repair_id: Uuid, + current_item: i64, + total_items: i64, + }, + ReconcileFinish { + upstairs_id: Uuid, + repair_id: Uuid, + session_id: Uuid, + repairs: Vec<(Uuid, SocketAddr)>, + aborted: bool, + }, +} + +pub(crate) struct NotifyQueue { + tx: mpsc::Sender<(DateTime, NotifyRequest)>, + log: Logger, +} + +impl NotifyQueue { + /// Insert a time-stamped request into the queue + pub fn send(&self, r: NotifyRequest) { + let now = Utc::now(); + if let Err(r) = self.tx.try_send((now, r)) { + warn!(self.log, "could not send notify {r:?}; queue is full"); + } + } +} + +pub(crate) fn spawn_notify_task(addr: Ipv6Addr, log: &Logger) -> NotifyQueue { + let (tx, rx) = mpsc::channel(128); + let task_log = log.new(slog::o!("job" => "notify")); + tokio::spawn(async move { notify_task_nexus(addr, rx, task_log).await }); + NotifyQueue { + tx, + log: log.new(o!("job" => "notify_queue")), + } +} + +async fn notify_task_nexus( + addr: Ipv6Addr, + mut rx: mpsc::Receiver<(DateTime, NotifyRequest)>, + log: Logger, +) { + let reqwest_client = reqwest::ClientBuilder::new() + .connect_timeout(std::time::Duration::from_secs(15)) + .timeout(std::time::Duration::from_secs(15)) + .build() + .unwrap(); + while let Some((time, m)) = rx.recv().await { + debug!(log, "notify {m:?}"); + let client = reqwest_client.clone(); + let Some(nexus_client) = get_nexus_client(&log, client, addr).await + else { + // Skip if no Nexus client returned from DNS; our notification is + // best effort. + warn!( + log, + "could not find nexus client; \ + dropping nexus notification {m:?}" + ); + continue; + }; + let (r, s) = match m { + NotifyRequest::ClientTaskStopped { + upstairs_id, + downstairs_id, + reason, + } => { + let upstairs_id = TypedUuid::from_untyped_uuid(upstairs_id); + let downstairs_id = TypedUuid::from_untyped_uuid(downstairs_id); + let reason = match reason { + ClientRunResult::ConnectionTimeout => { + DownstairsClientStoppedReason::ConnectionTimeout + } + ClientRunResult::ConnectionFailed(_) => { + // skip this notification, it's too noisy during connection + // retries + //DownstairsClientStoppedReason::ConnectionFailed + continue; + } + ClientRunResult::Timeout => { + DownstairsClientStoppedReason::Timeout + } + ClientRunResult::WriteFailed(_) => { + DownstairsClientStoppedReason::WriteFailed + } + ClientRunResult::ReadFailed(_) => { + DownstairsClientStoppedReason::ReadFailed + } + ClientRunResult::RequestedStop => { + // skip this notification, it fires for *every* Upstairs + // deactivation + //DownstairsClientStoppedReason::RequestedStop + continue; + } + ClientRunResult::Finished => { + DownstairsClientStoppedReason::Finished + } + ClientRunResult::QueueClosed => { + DownstairsClientStoppedReason::QueueClosed + } + ClientRunResult::ReceiveTaskCancelled => { + DownstairsClientStoppedReason::ReceiveTaskCancelled + } + }; + + ( + omicron_common::retry_until_known_result(&log, || async { + nexus_client + .cpapi_downstairs_client_stopped( + &upstairs_id, + &downstairs_id, + &DownstairsClientStopped { time, reason }, + ) + .await + }) + .await, + "client stopped", + ) + } + NotifyRequest::LiveRepairStart { + upstairs_id, + repair_id, + session_id, + ref repairs, + } + | NotifyRequest::ReconcileStart { + upstairs_id, + repair_id, + session_id, + ref repairs, + } => { + let upstairs_id = TypedUuid::from_untyped_uuid(upstairs_id); + let (description, repair_type) = + if matches!(m, NotifyRequest::LiveRepairStart { .. }) { + ("live repair start", UpstairsRepairType::Live) + } else { + ("reconcile start", UpstairsRepairType::Reconciliation) + }; + let info = RepairStartInfo { + time, + repair_id: TypedUuid::from_untyped_uuid(repair_id), + repair_type, + session_id: TypedUuid::from_untyped_uuid(session_id), + repairs: repairs + .iter() + .map(|(region_uuid, target_addr)| { + DownstairsUnderRepair { + region_uuid: (*region_uuid).into(), + target_addr: target_addr.to_string(), + } + }) + .collect(), + }; + + ( + omicron_common::retry_until_known_result(&log, || async { + nexus_client + .cpapi_upstairs_repair_start(&upstairs_id, &info) + .await + }) + .await, + description, + ) + } + NotifyRequest::LiveRepairProgress { + upstairs_id, + repair_id, + current_item, + total_items, + } + | NotifyRequest::ReconcileProgress { + upstairs_id, + repair_id, + current_item, + total_items, + } => { + let upstairs_id = TypedUuid::from_untyped_uuid(upstairs_id); + let repair_id = TypedUuid::from_untyped_uuid(repair_id); + let description = + if matches!(m, NotifyRequest::LiveRepairProgress { .. }) { + "live repair progress" + } else { + "reconcile progress" + }; + + ( + omicron_common::retry_until_known_result(&log, || async { + nexus_client + .cpapi_upstairs_repair_progress( + &upstairs_id, + &repair_id, + &RepairProgress { + current_item, + total_items, + time, + }, + ) + .await + }) + .await, + description, + ) + } + NotifyRequest::LiveRepairFinish { + upstairs_id, + repair_id, + session_id, + aborted, + ref repairs, + } + | NotifyRequest::ReconcileFinish { + upstairs_id, + repair_id, + session_id, + aborted, + ref repairs, + } => { + let upstairs_id = TypedUuid::from_untyped_uuid(upstairs_id); + let (description, repair_type) = + if matches!(m, NotifyRequest::LiveRepairFinish { .. }) { + ("live repair finish", UpstairsRepairType::Live) + } else { + ("reconcile finish", UpstairsRepairType::Reconciliation) + }; + let info = RepairFinishInfo { + time, + repair_id: TypedUuid::from_untyped_uuid(repair_id), + repair_type, + session_id: TypedUuid::from_untyped_uuid(session_id), + repairs: repairs + .iter() + .map(|(region_uuid, target_addr)| { + DownstairsUnderRepair { + region_uuid: (*region_uuid).into(), + target_addr: target_addr.to_string(), + } + }) + .collect(), + aborted, + }; + + ( + omicron_common::retry_until_known_result(&log, || async { + nexus_client + .cpapi_upstairs_repair_finish(&upstairs_id, &info) + .await + }) + .await, + description, + ) + } + }; + match r { + Ok(_) => { + info!(log, "notified Nexus of {s}"); + } + + Err(e) => { + error!(log, "failed to notify Nexus of {s}: {e}"); + } + } + } + info!(log, "notify_task exiting"); +} + +/// Gets a Nexus client based on any rack-internal IPv6 address +pub(crate) async fn get_nexus_client( + log: &Logger, + client: reqwest::Client, + addr: Ipv6Addr, +) -> Option { + use internal_dns::resolver::Resolver; + use internal_dns::ServiceName; + + // Use any rack internal address for `Resolver::new_from_ip`, as that will + // use the AZ_PREFIX to find internal DNS servers. + let resolver = match Resolver::new_from_ip(log.clone(), addr) { + Ok(resolver) => resolver, + Err(e) => { + error!(log, "could not make resolver: {e}"); + return None; + } + }; + + let nexus_address = + match resolver.lookup_socket_v6(ServiceName::Nexus).await { + Ok(addr) => addr, + Err(e) => { + error!(log, "lookup Nexus address failed: {e}"); + return None; + } + }; + + Some(nexus_client::Client::new_with_client( + &format!("http://{}", nexus_address), + client, + log.clone(), + )) +} diff --git a/upstairs/src/upstairs.rs b/upstairs/src/upstairs.rs index 01d2c11c6..ff4d7555b 100644 --- a/upstairs/src/upstairs.rs +++ b/upstairs/src/upstairs.rs @@ -387,12 +387,32 @@ impl Upstairs { }); info!(log, "Crucible stats registered with UUID: {}", uuid); + + // Use one of the Downstairs addresses for the Nexus notify task, since + // we just need a rack-internal IPv6 address. Otherwise, we don't do + // any notifications (passing `notify: None`). + let ipv6_addr = ds_target.iter().find_map(|(_i, a)| match a { + std::net::SocketAddr::V6(addr) => Some(*addr.ip()), + _ => None, + }); + let notify = match ipv6_addr { + Some(addr) if cfg!(feature = "notify-nexus") => { + Some(crate::notify::spawn_notify_task(addr, &log)) + } + None if cfg!(feature = "notify-nexus") => { + warn!(log, "could not find Downstairs address for Nexus"); + None + } + _ => None, + }; + let mut downstairs = Downstairs::new( cfg.clone(), ds_target, tls_context, stats.ds_stats(), guest.io_limits(), + notify, log.new(o!("" => "downstairs")), ); let flush_timeout_secs = opt.flush_timeout.unwrap_or(0.5); @@ -2093,10 +2113,8 @@ impl Upstairs { "downstairs task for {client_id} stopped due to {reason:?}" ); - #[cfg(feature = "notify-nexus")] self.downstairs - .notify_nexus_of_client_task_stopped(client_id, reason); - + .notify_client_task_stopped(client_id, reason); self.downstairs.reinitialize(client_id, &self.state); }