diff --git a/common/src/api/external/mod.rs b/common/src/api/external/mod.rs index 8adb45beb50..8f8c6e51228 100644 --- a/common/src/api/external/mod.rs +++ b/common/src/api/external/mod.rs @@ -748,16 +748,7 @@ impl From for i64 { // store values greater than that as negative values, but surely 2**63 is // enough.) #[derive( - Copy, - Clone, - Debug, - Eq, - Hash, - JsonSchema, - Ord, - PartialEq, - PartialOrd, - Serialize, + Copy, Clone, Debug, Eq, Hash, JsonSchema, Ord, PartialEq, PartialOrd, )] pub struct Generation(u64); @@ -808,6 +799,17 @@ impl<'de> Deserialize<'de> for Generation { } } +// This is the equivalent of applying `#[serde(transparent)]`, but that has a +// side effect of changing the JsonSchema derive to no longer emit a schema. +impl Serialize for Generation { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + self.0.serialize(serializer) + } +} + impl Display for Generation { fn fmt(&self, f: &mut Formatter<'_>) -> FormatResult { f.write_str(&self.0.to_string()) @@ -863,6 +865,17 @@ impl FromStr for Generation { } } +impl slog::Value for Generation { + fn serialize( + &self, + _rec: &slog::Record, + key: slog::Key, + serializer: &mut dyn slog::Serializer, + ) -> slog::Result { + serializer.emit_u64(key, self.0) + } +} + #[derive(Debug, thiserror::Error)] #[error("negative generation number")] pub struct GenerationNegativeError(()); diff --git a/dev-tools/omdb/src/bin/omdb/nexus.rs b/dev-tools/omdb/src/bin/omdb/nexus.rs index 0af00f45648..f12e619bdaf 100644 --- a/dev-tools/omdb/src/bin/omdb/nexus.rs +++ b/dev-tools/omdb/src/bin/omdb/nexus.rs @@ -2152,26 +2152,26 @@ fn print_task_support_bundle_collector(details: &serde_json::Value) { fn print_task_tuf_artifact_replication(details: &serde_json::Value) { fn print_counters(counters: TufArtifactReplicationCounters) { const ROWS: &[&str] = &[ + "put config ok:", + "put config err:", "list ok:", "list err:", "put ok:", "put err:", "copy ok:", "copy err:", - "delete ok:", - "delete err:", ]; const WIDTH: usize = const_max_len(ROWS); for (label, value) in ROWS.iter().zip([ + counters.put_config_ok, + counters.put_config_err, counters.list_ok, counters.list_err, counters.put_ok, counters.put_err, counters.copy_ok, counters.copy_err, - counters.delete_ok, - counters.delete_err, ]) { println!(" {label:3}"); } diff --git a/dev-tools/omdb/tests/successes.out b/dev-tools/omdb/tests/successes.out index 00c8c05b20f..f8109aea90f 100644 --- a/dev-tools/omdb/tests/successes.out +++ b/dev-tools/omdb/tests/successes.out @@ -730,23 +730,23 @@ task: "tuf_artifact_replication" request ringbuf: last run: - list ok: - list err: 0 - put ok: 0 - put err: 0 - copy ok: 0 - copy err: 0 - delete ok: 0 - delete err: 0 + put config ok: + put config err: 0 + list ok: + list err: 0 + put ok: 0 + put err: 0 + copy ok: 0 + copy err: 0 lifetime: - list ok: - list err: 0 - put ok: 0 - put err: 0 - copy ok: 0 - copy err: 0 - delete ok: 0 - delete err: 0 + put config ok: + put config err: 0 + list ok: + list err: 0 + put ok: 0 + put err: 0 + copy ok: 0 + copy err: 0 local repos: 0 task: "v2p_manager" @@ -1221,23 +1221,23 @@ task: "tuf_artifact_replication" request ringbuf: last run: - list ok: - list err: 0 - put ok: 0 - put err: 0 - copy ok: 0 - copy err: 0 - delete ok: 0 - delete err: 0 + put config ok: + put config err: 0 + list ok: + list err: 0 + put ok: 0 + put err: 0 + copy ok: 0 + copy err: 0 lifetime: - list ok: - list err: 0 - put ok: 0 - put err: 0 - copy ok: 0 - copy err: 0 - delete ok: 0 - delete err: 0 + put config 
ok: + put config err: 0 + list ok: + list err: 0 + put ok: 0 + put err: 0 + copy ok: 0 + copy err: 0 local repos: 0 task: "v2p_manager" diff --git a/dev-tools/omdb/tests/test_all_output.rs b/dev-tools/omdb/tests/test_all_output.rs index 148b8eb1452..f05f4f70b8d 100644 --- a/dev-tools/omdb/tests/test_all_output.rs +++ b/dev-tools/omdb/tests/test_all_output.rs @@ -227,6 +227,7 @@ async fn test_omdb_success_cases(cptestctx: &ControlPlaneTestContext) { // execution. These redactions work around the issue described in // https://github.com/oxidecomputer/omicron/issues/7417. redactor + .field("put config ok:", r"\d+") .field("list ok:", r"\d+") .section(&["task: \"tuf_artifact_replication\"", "request ringbuf:"]); diff --git a/docs/tuf-artifact-replication.adoc b/docs/tuf-artifact-replication.adoc new file mode 100644 index 00000000000..5a5115643ca --- /dev/null +++ b/docs/tuf-artifact-replication.adoc @@ -0,0 +1,185 @@ +:showtitle: +:numbered: + += TUF Artifact Replication (a.k.a. TUF Repo Depot) + +Our release artifact is a TUF repo consisting of all of the artifacts +the product requires to run. For the update system to work, it needs +access to those artifacts. There are some constraining factors: + +* Nexus is the only way into the system for these artifacts (either + through direct upload from an operator, or a download initiated by + Nexus to a service outside of the system). +* Nexus has no persistent local storage, nor can it directly use the + artifacts (OS and zone images, firmware, etc.) even if it did store + them. +* Sled Agent is generally what will directly use the artifacts (except + for SP and ROT images, which MGS needs), and it can also manage its + own local storage. + +Thus Nexus needs to accept artifacts from outside of the system and +immediately offload them to individual Sled Agents for persistent +storage and later use. + +We have chosen (see <>) the simplest possible implementation: +every Sled Agent stores a copy of every artifact on each of its M.2 +devices. This is storage inefficient but means that a Sled Agent can +directly use those resources to create zones from updated images, +install an updated OS, or manage the installation of updates on other +components, without Nexus having to ensure that it tells a sled to +have an artifact _before_ telling it to use it. A Nexus background task +periodically ensures that all sleds have all artifacts. + +== Sled Agent implementation + +Sled Agent stores artifacts as a content-addressed store on an *update* +dataset on each M.2 device: the file name of each stored artifact is its +SHA-256 hash. + +It also stores an _artifact configuration_ in memory: a list of all +artifact hashes that the sled should store, and a generation number. +The generation number is owned by Nexus, which increments the generation +number when the set of TUF repos on the system changes. Sled Agent +prevents modifying the configuration without an increase in the +generation number. + +Sled Agent offers the following APIs on the underlay network, intended +for Nexus: + +* `artifact_config_get`: Get the current artifact configuration. +* `artifact_config_put`: Put the artifact configuration that should be + in effect. This API is idempotent (putting the same configuration does + not change anything). Modified configurations must also increase the + generation number. +* `artifact_list`: List the artifacts present in the artifact + configuration along with the count of available copies of each + artifact across the *update* datasets. 
Also includes the current
+  generation number.
+* `artifact_put`: Put the request body into the artifact store.
+  Rejects the request if the artifact does not belong to the current
+  configuration.
+* `artifact_copy_from_depot`: Sends a request to another Sled Agent (via
+  the *TUF Repo Depot API*; see below) to fetch an artifact. The base
+  URL for the source sled is chosen by the requester. This API responds
+  after a successful HTTP response from the source sled and the copy
+  proceeds asynchronously. Rejects the request if the artifact does not
+  belong to the current configuration.
+
+Sled Agent also spawns another Dropshot API server called the *TUF Repo
+Depot API* which offers one API on the underlay network, intended for
+other Sled Agents:
+
+* `artifact_get_by_sha256`: Get the content of an artifact.
+
+In an asynchronous task called the _delete reconciler_, Sled Agent
+periodically scans the *update* datasets for artifacts that are not
+part of the present configuration and deletes them. Prior to each
+filesystem operation the task checks the configuration for presence of
+that artifact hash. The delete reconciler then waits for an artifact
+configuration change until running again.
+
+== Nexus implementation
+
+Nexus has a `tuf_artifact_replication` background task which runs this
+reliable persistent workflow:
+
+1. Collect the artifact configuration (the list of artifact hashes, and
+   the current generation number) from the database.
+2. Call `artifact_config_put` on all sleds. Stop if any sled rejects the
+   configuration (our information is already out of date).
+3. Call `artifact_list` on all sleds. Stop if any sled informs us of a
+   different generation number.
+4. Delete any local copies of repositories where all artifacts are
+   sufficiently replicated across sleds. ("Sufficiently replicated"
+   currently means that at least 3 sleds each have at least one copy.)
+5. For any artifacts this Nexus has a local copy of, send `artifact_put`
+   requests to N random sleds, where N is the number of puts required to
+   sufficiently replicate the artifact.
+6. Send `artifact_copy_from_depot` requests to all remaining sleds
+   missing copies of an artifact. Nexus chooses the source sled randomly
+   out of the list of sleds that have a copy of the artifact.
+
+In each task execution, Nexus will attempt to do all possible work
+that leads to every sled having a copy of the artifact. In the absence
+of random I/O errors, a repository will be fully replicated across
+all sleds in the system in the first execution, and the Nexus-local
+copy of the repository will be deleted in the second execution.
+`artifact_copy_from_depot` requests that require the presence of an
+artifact on a sled that does not yet have it are scheduled after all
+`artifact_put` requests complete.
+
+== Preventing conflicts and loss of artifacts
+
+The artifact configuration is used to prevent conflicts that may be
+caused by two Nexus instances running the `tuf_artifact_replication`
+background task simultaneously with different information. The worst
+case scenario for a conflict is the total loss of an artifact across the
+system, although there are lesser evils as well. This section describes
+a number of possible faults and the mitigations taken.
+
+=== Recently-uploaded repositories and artifact deletion
+
+When Sled Agent receives an artifact configuration change, the delete
+reconciler task begins scanning the *update* datasets for artifacts that
+are no longer required and deletes them.
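+
+A minimal sketch of how such a scan can re-check the configuration
+before each deletion is shown below. It is illustrative only: the helper
+names `artifacts_on_disk` and `delete_file` are hypothetical and stand in
+for the actual Sled Agent implementation.
+
+[source,rust]
+----
+// Re-read the latest configuration immediately before each removal, so a
+// hash that became wanted again (via a newer configuration) is kept.
+async fn reconcile_deletes(
+    config: &tokio::sync::watch::Receiver<ArtifactConfig>,
+    dataset: &camino::Utf8Path,
+) -> std::io::Result<()> {
+    for (hash, path) in artifacts_on_disk(dataset).await? {
+        if !config.borrow().artifacts.contains(&hash) {
+            delete_file(&path).await?;
+        }
+    }
+    Ok(())
+}
+----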
+
+Nexus maintains its local copy of recently-uploaded repositories
+until it confirms (via the `artifact_list` operation) that all of the
+artifacts in the repository are sufficiently replicated (currently, at
+least 3 sleds each have at least 1 copy).
+
+If the `artifact_list` operation lists any artifacts that could be
+deleted asynchronously, Nexus could incorrectly assume that an artifact
+is sufficiently replicated when it is not. This could happen if a
+repository is deleted, and another repository containing the same
+artifact is uploaded while another Nexus is running the background task.
+
+The artifact configuration is designed to mitigate this. The
+`artifact_list` operation filters the list of artifacts to contain
+only artifacts present in the current configuration. The delete
+reconciler decides whether to delete a file by re-checking the current
+configuration.
+
+When Nexus receives the `artifact_list` response, it verifies that
+the generation number reported is the same as the configuration it put
+earlier in the same task execution. Because the response only contains
+artifacts belonging to the current configuration, and that list of
+artifacts is based on the same configuration Nexus believes is current,
+it can trust that none of those artifacts are about to be deleted and
+safely delete local copies of sufficiently-replicated artifacts.
+
+=== Loss of all sleds with the only copy
+
+There are two potential situations where we could lose the only copy of
+an artifact. The first is a Nexus instance crashing or being replaced
+before a local artifact can be put to any sleds. Crashes are difficult
+to mitigate, as artifacts are currently stored in randomly-named
+temporary directories that are non-trivial to recover on startup;
+consequently there is no mitigation for this problem today. During
+graceful removal of Nexus zones, a quiesced Nexus (see <<rfd459>> and
+<<omicron5677>>) should remain alive until all local artifacts are
+sufficiently replicated.
+
+The second potential situation is a loss of all sleds that an artifact
+is copied to after Nexus deletes its local copy. This is mostly
+mitigated by Nexus attempting to fully replicate all artifacts onto
+all sleds in every execution of the background task; if there are no
+I/O errors, it only takes one task execution to ensure a repository is
+present across the entire system.
+
+=== Unnecessary work
+
+`artifact_put` and `artifact_copy_from_depot` requests include the
+current generation as a query string parameter. If the generation does
+not match the current configuration, or the artifact is not present in
+the configuration, Sled Agent rejects the request.
+
+[bibliography]
+== References
+
+* [[[rfd424]]] Oxide Computer Company.
+  https://rfd.shared.oxide.computer/rfd/424[TUF Repo Depot].
+* [[[rfd459]]] Oxide Computer Company.
+  https://rfd.shared.oxide.computer/rfd/459[Control plane component lifecycle].
+* [[[omicron5677]]] oxidecomputer/omicron.
+  https://github.com/oxidecomputer/omicron/issues/5677[nexus 'quiesce' support].
diff --git a/nexus/db-model/src/schema.rs b/nexus/db-model/src/schema.rs
index 8343d2ff2fc..7617557da87 100644
--- a/nexus/db-model/src/schema.rs
+++ b/nexus/db-model/src/schema.rs
@@ -1369,6 +1369,13 @@ allow_tables_to_appear_in_same_query!(
 joinable!(tuf_repo_artifact -> tuf_repo (tuf_repo_id));
 joinable!(tuf_repo_artifact -> tuf_artifact (tuf_artifact_id));
 
+table! {
+    tuf_generation (singleton) {
+        singleton -> Bool,
+        generation -> Int8,
+    }
+}
+
 table!
{
    support_bundle {
        id -> Uuid,
diff --git a/nexus/db-model/src/schema_versions.rs b/nexus/db-model/src/schema_versions.rs
index 79831241d99..c116e474710 100644
--- a/nexus/db-model/src/schema_versions.rs
+++ b/nexus/db-model/src/schema_versions.rs
@@ -17,7 +17,7 @@ use std::collections::BTreeMap;
 ///
 /// This must be updated when you change the database schema. Refer to
 /// schema/crdb/README.adoc in the root of this repository for details.
-pub const SCHEMA_VERSION: SemverVersion = SemverVersion::new(123, 0, 0);
+pub const SCHEMA_VERSION: SemverVersion = SemverVersion::new(124, 0, 0);
 
 /// List of all past database schema versions, in *reverse* order
 ///
@@ -29,6 +29,7 @@ static KNOWN_VERSIONS: Lazy<Vec<KnownVersion>> = Lazy::new(|| {
     //    |  leaving the first copy as an example for the next person.
     //    v
     // KnownVersion::new(next_int, "unique-dirname-with-the-sql-files"),
+    KnownVersion::new(124, "tuf-generation"),
     KnownVersion::new(123, "vpc-subnet-contention"),
     KnownVersion::new(122, "tuf-artifact-replication"),
     KnownVersion::new(121, "dataset-to-crucible-dataset"),
diff --git a/nexus/db-queries/src/db/datastore/update.rs b/nexus/db-queries/src/db/datastore/update.rs
index 8abddc8ada0..c4f5ac69d26 100644
--- a/nexus/db-queries/src/db/datastore/update.rs
+++ b/nexus/db-queries/src/db/datastore/update.rs
@@ -19,8 +19,8 @@ use diesel::prelude::*;
 use diesel::result::Error as DieselError;
 use nexus_db_model::{ArtifactHash, TufArtifact, TufRepo, TufRepoDescription};
 use omicron_common::api::external::{
-    self, CreateResult, DataPageParams, ListResultVec, LookupResult,
-    LookupType, ResourceType, TufRepoInsertStatus,
+    self, CreateResult, DataPageParams, Error, Generation, ListResultVec,
+    LookupResult, LookupType, ResourceType, TufRepoInsertStatus,
 };
 use omicron_uuid_kinds::TufRepoKind;
 use omicron_uuid_kinds::TypedUuid;
@@ -159,6 +159,26 @@ impl DataStore {
             .await
             .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))
     }
+
+    /// Returns the current TUF repo generation number.
+    pub async fn update_tuf_generation_get(
+        &self,
+        opctx: &OpContext,
+    ) -> LookupResult<Generation> {
+        opctx.authorize(authz::Action::Read, &authz::FLEET).await?;
+
+        use db::schema::tuf_generation::dsl;
+
+        let generation: i64 = dsl::tuf_generation
+            .filter(dsl::singleton.eq(true))
+            .select(dsl::generation)
+            .get_result_async(&*self.pool_connection_authorized(opctx).await?)
+            .await
+            .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))?;
+        Generation::try_from(generation).map_err(|e| Error::InternalError {
+            internal_message: e.to_string(),
+        })
+    }
 }
 
 // This is a separate method mostly to make rustfmt not bail out on long lines
@@ -292,11 +312,17 @@ async fn insert_impl(
             "new_artifacts" => ?new_artifacts,
         );
 
+        // If we are about to insert new artifacts, we need to bump the
+        // generation number.
+        if !new_artifacts.is_empty() {
+            increment_generation(&log, &conn).await?;
+        }
         // Insert new artifacts into the database.
         diesel::insert_into(dsl::tuf_artifact)
             .values(new_artifacts)
             .execute_async(&conn)
             .await?;
+
         all_artifacts
     };
 
@@ -330,6 +356,28 @@ async fn insert_impl(
     })
 }
 
+async fn increment_generation(
+    log: &slog::Logger,
+    conn: &async_bb8_diesel::Connection,
+) -> Result<(), DieselError> {
+    use db::schema::tuf_generation::dsl;
+
+    // We use `get_result_async` instead of `execute_async` to check that we
+    // updated a single row; since we have it we may as well log it too.
+ let generation: i64 = + diesel::update(dsl::tuf_generation.filter(dsl::singleton.eq(true))) + .set(dsl::generation.eq(dsl::generation + 1)) + .returning(dsl::generation) + .get_result_async(conn) + .await?; + debug!( + log, + "incrementing generation number"; + "generation" => generation, + ); + Ok(()) +} + #[derive(Clone, Debug)] enum InsertError { /// The SHA256 of the uploaded repository doesn't match the SHA256 of the diff --git a/nexus/src/app/background/tasks/tuf_artifact_replication.rs b/nexus/src/app/background/tasks/tuf_artifact_replication.rs index 3ae62b8d400..00bcab27c59 100644 --- a/nexus/src/app/background/tasks/tuf_artifact_replication.rs +++ b/nexus/src/app/background/tasks/tuf_artifact_replication.rs @@ -4,6 +4,9 @@ //! TUF Repo Depot: Artifact replication across sleds (RFD 424) //! +//! See docs/tuf-artifact-replication.adoc for an architectural overview of the +//! TUF artifact replication system. +//! //! `Nexus::updates_put_repository` accepts a TUF repository, which Nexus //! unpacks, verifies, and reasons about the artifacts in. This uses temporary //! storage within the Nexus zone. After that, the update artifacts have to @@ -21,26 +24,26 @@ //! 1. The task moves `ArtifactsWithPlan` objects off the `mpsc` channel and //! into a `Vec` that represents the set of artifacts stored locally in this //! Nexus zone. -//! 2. The task fetches the list of artifacts from CockroachDB, and queries +//! 2. The task fetches the artifact configuration (list of artifacts and +//! generation number) from CockroachDB. +//! 3. The task puts the artifact configuration to each sled, and queries //! the list of artifacts stored on each sled. Sled artifact storage is //! content-addressed by SHA-256 checksum. Errors querying a sled are //! logged but otherwise ignored: the task proceeds as if that sled has no //! artifacts. (This means that the task will always be trying to replicate //! artifacts to that sled until it comes back or is pulled out of service.) -//! 3. The task builds a directory of all artifacts and where they can be found +//! 4. The task builds a list of all artifacts and where they can be found //! (local `ArtifactsWithPlan` and/or sled agents). -//! 4. If all the artifacts belonging to an `ArtifactsWithPlan` object have +//! 5. If all the artifacts belonging to an `ArtifactsWithPlan` object have //! been replicated to at least `MIN_SLED_REPLICATION` sleds, the task drops //! the object from its `Vec` (thus cleaning up the local storage of those //! files). -//! 5. The task generates a list of requests that need to be sent: +//! 6. The task generates a list of requests that need to be sent: //! - PUT each locally-stored artifact not present on any sleds to //! `MIN_SLED_REPLICATION` random sleds. //! - For each partially-replicated artifact, choose a sled that is missing //! the artifact, and tell it (via `artifact_copy_from_depot`) to fetch the //! artifact from a random sled that has it. -//! - DELETE all artifacts no longer tracked in CockroachDB from all sleds -//! that have that artifact. //! //! # Rate limits //! @@ -53,15 +56,17 @@ //! also rate limit requests per second sent by Nexus, as well as limit the //! number of ongoing copy requests being processed at once by Sled Agent. 
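// Illustrative sketch (not part of this change) of the bounded-concurrency
// dispatch described above: requests become a stream that is polled with
// `buffer_unordered`, capping the number of in-flight requests, much as this
// task does with `MAX_REQUEST_CONCURRENCY`. The caller-supplied `send_one`
// closure is a stand-in for issuing a single request to a sled.
async fn send_all_bounded<R, F, Fut>(requests: Vec<R>, send_one: F)
where
    F: Fn(R) -> Fut,
    Fut: std::future::Future<Output = ()>,
{
    use futures::StreamExt;
    const LIMIT: usize = 8; // stand-in for MAX_REQUEST_CONCURRENCY
    futures::stream::iter(requests.into_iter().map(send_one))
        .buffer_unordered(LIMIT)
        .for_each(|()| async {})
        .await;
}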
-use std::collections::{BTreeMap, VecDeque}; +use std::collections::{BTreeMap, BTreeSet, VecDeque}; use std::future::Future; +use std::ops::ControlFlow; use std::str::FromStr; use std::sync::Arc; use anyhow::{Context, Result}; use chrono::Utc; -use futures::future::BoxFuture; -use futures::{FutureExt, Stream, StreamExt}; +use futures::future::{BoxFuture, FutureExt}; +use futures::stream::{FuturesUnordered, Stream, StreamExt}; +use http::StatusCode; use nexus_auth::context::OpContext; use nexus_db_queries::db::{ datastore::SQL_BATCH_SIZE, pagination::Paginator, DataStore, @@ -72,10 +77,12 @@ use nexus_types::internal_api::background::{ TufArtifactReplicationCounters, TufArtifactReplicationOperation, TufArtifactReplicationRequest, TufArtifactReplicationStatus, }; +use omicron_common::api::external::Generation; use omicron_common::update::ArtifactHash; use omicron_uuid_kinds::{GenericUuid, SledUuid}; use rand::seq::SliceRandom; use serde_json::json; +use sled_agent_client::types::ArtifactConfig; use slog_error_chain::InlineErrorChain; use tokio::sync::mpsc::error::TryRecvError; use tokio::sync::{mpsc, OwnedSemaphorePermit, Semaphore}; @@ -111,8 +118,6 @@ struct ArtifactPresence { sleds: BTreeMap, /// Handle to the artifact's local storage if present. local: Option, - /// An artifact is wanted if it is listed in the `tuf_artifact` table. - wanted: bool, } /// Wrapper enum for `ExtractedArtifactDataHandle` so that we don't need to @@ -147,101 +152,80 @@ impl Inventory { ) -> Requests<'a> { let mut requests = Requests::default(); for (hash, presence) in self.0 { - if presence.wanted { - let (sleds_present, mut sleds_not_present) = - sleds.iter().partition::, _>(|sled| { - presence - .sleds - .get(&sled.id) - .copied() - .unwrap_or_default() - > 0 + let (sleds_present, mut sleds_not_present) = + sleds.iter().partition::, _>(|sled| { + presence.sleds.get(&sled.id).copied().unwrap_or_default() + > 0 + }); + sleds_not_present.shuffle(rng); + + // If we have a local copy, PUT the artifact to more sleds until + // we meet `MIN_SLED_REPLICATION`. + let mut sled_puts = Vec::new(); + if let Some(handle) = presence.local { + let count = + min_sled_replication.saturating_sub(sleds_present.len()); + for _ in 0..count { + let Some(sled) = sleds_not_present.pop() else { + break; + }; + requests.put.push(Request::Put { + sled, + handle: handle.clone(), + hash, }); - sleds_not_present.shuffle(rng); - - // If we have a local copy, PUT the artifact to more sleds until - // we meet `MIN_SLED_REPLICATION`. - let mut sled_puts = Vec::new(); - if let Some(handle) = presence.local { - let count = min_sled_replication - .saturating_sub(sleds_present.len()); - for _ in 0..count { - let Some(sled) = sleds_not_present.pop() else { - break; - }; - requests.put.push(Request::Put { - sled, - handle: handle.clone(), - hash, - }); - sled_puts.push(sled); - } + sled_puts.push(sled); } + } - // Tell each remaining sled missing the artifact to fetch it - // from a random sled that has it. - for target_sled in sleds_not_present { - if let Some(source_sled) = - sleds_present.choose(rng).copied() - { - requests.other.push(Request::CopyFromDepot { + // Tell each remaining sled missing the artifact to fetch it + // from a random sled that has it. 
+ for target_sled in sleds_not_present { + if let Some(source_sled) = sleds_present.choose(rng).copied() { + requests.other.push(Request::CopyFromDepot { + target_sled, + source_sled, + hash, + }); + } else { + // There are no sleds that currently have the artifact, + // but we might have PUT requests going out. Choose one + // of the sleds we are PUTting this artifact onto and + // schedule it after all PUT requests are done. + if let Some(source_sled) = sled_puts.choose(rng).copied() { + requests.copy_after_put.push(Request::CopyFromDepot { target_sled, source_sled, hash, }); - } else { - // There are no sleds that currently have the artifact, - // but we might have PUT requests going out. Choose one - // of the sleds we are PUTting this artifact onto and - // schedule it after all PUT requests are done. - if let Some(source_sled) = - sled_puts.choose(rng).copied() - { - requests.copy_after_put.push( - Request::CopyFromDepot { - target_sled, - source_sled, - hash, - }, - ); - } } } + } - // If there are sleds with 0 < n < `EXPECTED_COUNT` copies, tell - // them to also fetch it from a random other sled. - for target_sled in sleds { - let Some(count) = presence.sleds.get(&target_sled.id) + // If there are sleds with 0 < n < `EXPECTED_COUNT` copies, tell + // them to also fetch it from a random other sled. + for target_sled in sleds { + let Some(count) = presence.sleds.get(&target_sled.id) else { + continue; + }; + if *count > 0 && *count < EXPECTED_COUNT { + let Ok(source_sled) = sleds_present + .choose_weighted(rng, |sled| { + if sled.id == target_sled.id { + 0 + } else { + 1 + } + }) + .copied() else { - continue; + break; }; - if *count > 0 && *count < EXPECTED_COUNT { - let Ok(source_sled) = sleds_present - .choose_weighted(rng, |sled| { - if sled.id == target_sled.id { - 0 - } else { - 1 - } - }) - .copied() - else { - break; - }; - requests.recopy.push(Request::CopyFromDepot { - target_sled, - source_sled, - hash, - }) - } - } - } else { - // We don't want this artifact to be stored anymore, so tell all - // sleds that have it to DELETE it. 
- for sled in sleds { - if presence.sleds.contains_key(&sled.id) { - requests.other.push(Request::Delete { sled, hash }); - } + requests.recopy.push(Request::CopyFromDepot { + target_sled, + source_sled, + hash, + }) } } } @@ -272,6 +256,7 @@ impl<'a> Requests<'a> { fn into_stream( self, log: &'a slog::Logger, + generation: Generation, ) -> impl Stream< Item = impl Future + use<'a>, > + use<'a> { @@ -293,10 +278,12 @@ impl<'a> Requests<'a> { } let put = futures::stream::iter(self.put.into_iter().zip(put_permits)) - .map(|(request, permit)| request.execute(log, Some(permit))); + .map(move |(request, permit)| { + request.execute(log, generation, Some(permit)) + }); let other = futures::stream::iter(self.other.into_iter().chain(self.recopy)) - .map(|request| request.execute(log, None)); + .map(move |request| request.execute(log, generation, None)); let copy_after_put = async move { // There's an awkward mix of usize and u32 in the `Semaphore` @@ -316,7 +303,7 @@ impl<'a> Requests<'a> { Vec::new() }; futures::stream::iter(iter) - .map(|request| request.execute(log, None)) + .map(move |request| request.execute(log, generation, None)) } .flatten_stream(); @@ -336,23 +323,24 @@ enum Request<'a> { source_sled: &'a Sled, hash: ArtifactHash, }, - Delete { - sled: &'a Sled, - hash: ArtifactHash, - }, } impl Request<'_> { async fn execute( self, log: &slog::Logger, + generation: Generation, _permit: Option, ) -> TufArtifactReplicationRequest { let err: Option> = async { match &self { Request::Put { sled, handle, hash } => { sled.client - .artifact_put(&hash.to_string(), handle.file().await?) + .artifact_put( + &hash.to_string(), + &generation, + handle.file().await?, + ) .await?; } Request::CopyFromDepot { target_sled, source_sled, hash } => { @@ -360,15 +348,13 @@ impl Request<'_> { .client .artifact_copy_from_depot( &hash.to_string(), + &generation, &sled_agent_client::types::ArtifactCopyFromDepotBody { depot_base_url: source_sled.depot_base_url.clone(), }, ) .await?; } - Request::Delete { sled, hash } => { - sled.client.artifact_delete(&hash.to_string()).await?; - } }; Ok(()) } @@ -378,8 +364,9 @@ impl Request<'_> { let time = Utc::now(); let (target_sled, hash) = match &self { Request::Put { sled, hash, .. } - | Request::CopyFromDepot { target_sled: sled, hash, .. } - | Request::Delete { sled, hash } => (sled, hash), + | Request::CopyFromDepot { target_sled: sled, hash, .. } => { + (sled, hash) + } }; let msg = match (&self, err.is_some()) { (Request::Put { .. }, true) => "Failed to put artifact", @@ -390,8 +377,6 @@ impl Request<'_> { (Request::CopyFromDepot { .. }, false) => { "Successfully requested artifact copy from depot" } - (Request::Delete { .. }, true) => "Failed to delete artifact", - (Request::Delete { .. }, false) => "Successfully deleted artifact", }; if let Some(ref err) = err { slog::warn!( @@ -423,9 +408,6 @@ impl Request<'_> { source_sled: source_sled.id, } } - Request::Delete { hash, .. } => { - TufArtifactReplicationOperation::Delete { hash } - } }, error: err.map(|err| err.to_string()), } @@ -461,6 +443,8 @@ impl BackgroundTask for ArtifactReplication { } } + // List sleds and artifacts from the database. These are the only + // parts of this task that can return a failure early. 
let sleds = match self .list_sleds(opctx) .await @@ -469,8 +453,7 @@ impl BackgroundTask for ArtifactReplication { Ok(sleds) => sleds, Err(err) => return json!({"error": format!("{err:#}")}), }; - let mut counters = TufArtifactReplicationCounters::default(); - let mut inventory = match self + let (config, inventory) = match self .list_artifacts_from_database(opctx) .await .context("failed to list artifacts from database") @@ -478,29 +461,87 @@ impl BackgroundTask for ArtifactReplication { Ok(inventory) => inventory, Err(err) => return json!({"error": format!("{err:#}")}), }; - self.list_artifacts_on_sleds( - opctx, - &sleds, - &mut inventory, - &mut counters, - ) - .await; - self.list_and_clean_up_local_artifacts(&mut inventory); - - let requests = inventory.into_requests( - &sleds, - &mut rand::thread_rng(), - self.min_sled_replication, - ); - let completed = requests - .into_stream(&opctx.log) - .buffer_unordered(MAX_REQUEST_CONCURRENCY) - .collect::>() + + // Create a channel to receive request ringbuf entries. + let (ringbuf_tx_owned, mut rx) = + mpsc::channel(MAX_REQUEST_CONCURRENCY); + let ringbuf_tx = &ringbuf_tx_owned; + let log_task_handle = tokio::task::spawn(async move { + let mut request_log = BTreeSet::new(); + let mut counters = TufArtifactReplicationCounters::default(); + while let Some(entry) = rx.recv().await { + counters.inc(&entry); + request_log.insert(entry); + } + (request_log, counters) + }); + + // Put the artifact configuration to and list the artifacts present + // on each sled. + let log = &opctx.log; + let config = &config; + let inventory = sleds + .iter() + .map(|sled| async move { + ( + sled, + Self::sled_put_config_and_list( + log, sled, config, ringbuf_tx, + ) + .await, + ) + }) + .collect::>() + .fold( + ControlFlow::Continue(inventory), + |inventory, (sled, result)| async { + let mut inventory = inventory?; + for (hash, count) in result? { + if let Some(entry) = inventory.0.get_mut(&hash) { + entry.sleds.insert(sled.id, count); + } + } + ControlFlow::Continue(inventory) + }, + ) .await; - self.insert_debug_requests(completed, &mut counters); + if let ControlFlow::Continue(mut inventory) = inventory { + self.list_and_clean_up_local_artifacts( + &opctx.log, + &mut inventory, + ); + let requests = inventory.into_requests( + &sleds, + &mut rand::thread_rng(), + self.min_sled_replication, + ); + requests + .into_stream(&opctx.log, config.generation) + .buffer_unordered(MAX_REQUEST_CONCURRENCY) + .for_each(|log_entry| async { + ringbuf_tx.send(log_entry).await.ok(); + }) + .await; + } + // Our work is done; prepare the status message. + drop(ringbuf_tx_owned); + let (request_log, counters) = log_task_handle.await.unwrap(); + { + // `Arc::make_mut` will either directly provide a mutable + // reference if there are no other references, or clone it if + // there are. At this point there should never be any other + // references; we only clone this Arc a few lines below when + // serializing the ringbuf to a `serde_json::Value`. 
+ let ringbuf = Arc::make_mut(&mut self.request_debug_ringbuf); + let to_delete = (ringbuf.len() + request_log.len()) + .saturating_sub(MAX_REQUEST_DEBUG_BUFFER_LEN); + ringbuf.drain(0..to_delete); + ringbuf.extend(request_log); + } self.lifetime_counters += counters; serde_json::to_value(TufArtifactReplicationStatus { + generation: config.generation, last_run_counters: counters, lifetime_counters: self.lifetime_counters, request_debug_ringbuf: self.request_debug_ringbuf.clone(), @@ -528,28 +569,6 @@ impl ArtifactReplication { } } - fn insert_debug_requests( - &mut self, - mut requests: Vec, - counters: &mut TufArtifactReplicationCounters, - ) { - for request in &requests { - counters.inc(request); - } - - // `Arc::make_mut` will either directly provide a mutable reference - // if there are no other references, or clone it if there are. At this - // point there should never be any other references; we only clone this - // Arc when serializing the ringbuf to a `serde_json::Value`. - let ringbuf = Arc::make_mut(&mut self.request_debug_ringbuf); - let to_delete = (ringbuf.len() + requests.len()) - .saturating_sub(MAX_REQUEST_DEBUG_BUFFER_LEN); - ringbuf.drain(0..to_delete); - - requests.sort(); - ringbuf.extend(requests); - } - async fn list_sleds(&self, opctx: &OpContext) -> Result> { Ok(self .datastore @@ -574,7 +593,9 @@ impl ArtifactReplication { async fn list_artifacts_from_database( &self, opctx: &OpContext, - ) -> Result { + ) -> Result<(ArtifactConfig, Inventory)> { + let generation = + self.datastore.update_tuf_generation_get(opctx).await?; let mut inventory = Inventory::default(); let mut paginator = Paginator::new(SQL_BATCH_SIZE); while let Some(p) = paginator.next() { @@ -585,93 +606,151 @@ impl ArtifactReplication { paginator = p.found_batch(&batch, &|a| a.id.into_untyped_uuid()); for artifact in batch { inventory.0.entry(artifact.sha256.0).or_insert_with(|| { - ArtifactPresence { - sleds: BTreeMap::new(), - local: None, - wanted: true, - } + ArtifactPresence { sleds: BTreeMap::new(), local: None } }); } } - Ok(inventory) + let config = ArtifactConfig { + generation, + artifacts: inventory.0.keys().map(|h| h.to_string()).collect(), + }; + Ok((config, inventory)) } - /// Ask all sled agents to list the artifacts they have, and mark those - /// artifacts as present on those sleds. - async fn list_artifacts_on_sleds( - &mut self, - opctx: &OpContext, - sleds: &[Sled], - inventory: &mut Inventory, - counters: &mut TufArtifactReplicationCounters, - ) { - let responses = - futures::future::join_all(sleds.iter().map(|sled| async move { - let response = sled.client.artifact_list().await; - (sled, Utc::now(), response) - })) - .await; - let mut requests = Vec::new(); - for (sled, time, response) in responses { - let mut error = None; - match response { - Ok(response) => { + /// Put `config` to `sled`, then query `sled` for a list of its artifacts. + /// + /// Returns [`ControlFlow::Break`] if `config` was rejected by the sled due + /// to an invalid generation number, or if the list response contained a + /// different generation. 
+ async fn sled_put_config_and_list( + log: &slog::Logger, + sled: &Sled, + config: &ArtifactConfig, + ringbuf_tx: &mpsc::Sender, + ) -> ControlFlow<(), BTreeMap> { + let response = sled.client.artifact_config_put(config).await; + ringbuf_tx + .send(TufArtifactReplicationRequest { + time: Utc::now(), + target_sled: sled.id, + operation: TufArtifactReplicationOperation::PutConfig { + generation: config.generation, + }, + error: response.as_ref().err().map(|err| { + error!( + log, + "Failed to put artifact config"; + "error" => InlineErrorChain::new(err), + "sled" => sled.client.baseurl(), + "generation" => &config.generation, + ); + err.to_string() + }), + }) + .await + .ok(); + // Bail without sending a list request if the config put failed. + if let Err(err) = response { + // If the request failed because the sled told us the + // generation was invalid, return `Break`. + if let sled_agent_client::Error::ErrorResponse(response) = err { + if response.status() == StatusCode::CONFLICT + && response.error_code.as_deref() + == Some("CONFIG_GENERATION") + { + return ControlFlow::Break(()); + } + } + return ControlFlow::Continue(BTreeMap::new()); + } + + let response = sled.client.artifact_list().await; + let time = Utc::now(); + let (result, error) = match response { + Ok(response) => { + let response = response.into_inner(); + if response.generation == config.generation { info!( - &opctx.log, + log, "Successfully got artifact list"; "sled" => sled.client.baseurl(), ); - for (hash, count) in response.into_inner() { - let Ok(hash) = ArtifactHash::from_str(&hash) else { - error = Some(format!( - "sled reported bogus artifact hash {hash:?}" - )); + match response + .list + .into_iter() + .map(|(hash, count)| { + match ArtifactHash::from_str(&hash) { + Ok(hash) => Ok((hash, count)), + Err(_) => Err(hash), + } + }) + .collect() + { + Ok(list) => (ControlFlow::Continue(list), None), + Err(bogus_hash) => { error!( - &opctx.log, - "Failed to get artifact list: \ - sled reported bogus artifact hash"; + log, + "Sled reported bogus artifact hash"; "sled" => sled.client.baseurl(), - "bogus_hash" => hash, + "bogus_hash" => &bogus_hash, ); - continue; - }; - let entry = - inventory.0.entry(hash).or_insert_with(|| { - ArtifactPresence { - sleds: BTreeMap::new(), - local: None, - // If we're inserting, this artifact wasn't - // listed in the database. 
- wanted: false, - } - }); - entry.sleds.insert(sled.id, count); + ( + ControlFlow::Continue(BTreeMap::new()), + Some(format!( + "sled reported bogus artifact hash \ + {bogus_hash:?}" + )), + ) + } } - } - Err(err) => { - warn!( - &opctx.log, - "Failed to get artifact list"; - "error" => InlineErrorChain::new(&err), + } else { + error!( + log, + "Failed to get artifact list: \ + sled reported different generation number"; "sled" => sled.client.baseurl(), + "sled_generation" => response.generation, + "config_generation" => config.generation, ); - error = Some(err.to_string()); + ( + ControlFlow::Break(()), + Some(format!( + "sled reported generation {}, expected {}", + response.generation, config.generation + )), + ) } - }; - requests.push(TufArtifactReplicationRequest { + } + Err(err) => { + error!( + log, + "Failed to get artifact list"; + "error" => InlineErrorChain::new(&err), + "sled" => sled.client.baseurl(), + ); + (ControlFlow::Continue(BTreeMap::new()), Some(err.to_string())) + } + }; + ringbuf_tx + .send(TufArtifactReplicationRequest { time, target_sled: sled.id, operation: TufArtifactReplicationOperation::List, error, - }); - } - self.insert_debug_requests(requests, counters); + }) + .await + .ok(); + result } /// Fill in the `local` field on the values of `inventory` with any local /// artifacts, while removing the locally-stored `ArtifactsWithPlan` objects /// once they reach the minimum requirement to be considered replicated. - fn list_and_clean_up_local_artifacts(&mut self, inventory: &mut Inventory) { + fn list_and_clean_up_local_artifacts( + &mut self, + log: &slog::Logger, + inventory: &mut Inventory, + ) { self.local.retain(|plan| { let mut keep_plan = false; for hash_id in plan.by_id().values().flatten() { @@ -685,6 +764,14 @@ impl ArtifactReplication { } } } + if !keep_plan { + let version = &plan.description().repo.system_version; + info!( + log, + "Cleaning up local repository"; + "repo_system_version" => version.to_string(), + ); + } keep_plan }) } @@ -743,9 +830,6 @@ mod tests { source_sled.id, target_sled.id, ) } - Request::Delete { sled, hash } => { - writeln!(s, "- DELETE {hash}\n from {}", sled.id) - } } .unwrap(); } @@ -792,15 +876,11 @@ mod tests { "request in `recopy` is not `CopyFromDepot`: {request:?}" ); } - // Everything in `other` should be `Copy` or `Delete`. + // Everything in `other` should be `Copy`. for request in &requests.other { assert!( - matches!( - request, - Request::CopyFromDepot { .. } | Request::Delete { .. } - ), - "request in `other` is not `CopyFromDepot` or `Delete`: \ - {request:?}" + matches!(request, Request::CopyFromDepot { .. }), + "request in `other` is not `CopyFromDepot`: {request:?}" ); } } @@ -817,7 +897,6 @@ mod tests { ArtifactPresence { sleds: BTreeMap::new(), local: Some(ArtifactHandle::Fake), - wanted: true, }, ); } @@ -851,11 +930,7 @@ mod tests { for _ in 0..10 { inventory.insert( ArtifactHash(rng.gen()), - ArtifactPresence { - sleds: sled_presence.clone(), - local: None, - wanted: true, - }, + ArtifactPresence { sleds: sled_presence.clone(), local: None }, ); } let requests = Inventory(inventory).into_requests( @@ -874,43 +949,6 @@ mod tests { ); } - #[test] - fn delete() { - // 4 sleds have an artifact we don't want anymore. 
- let mut rng = StdRng::from_seed(Default::default()); - let sleds = fake_sleds(4, &mut rng); - let mut inventory = BTreeMap::new(); - inventory.insert( - ArtifactHash(rng.gen()), - ArtifactPresence { - sleds: sleds.iter().map(|sled| (sled.id, 2)).collect(), - local: None, - wanted: false, - }, - ); - let requests = Inventory(inventory).into_requests( - &sleds, - &mut rng, - MIN_SLED_REPLICATION, - ); - check_consistency(&requests); - assert_eq!(requests.put.len(), 0); - assert_eq!(requests.copy_after_put.len(), 0); - assert_eq!(requests.recopy.len(), 0); - assert_eq!(requests.other.len(), 4); - assert!( - requests - .other - .iter() - .all(|request| matches!(request, Request::Delete { .. })), - "not all requests are deletes" - ); - assert_contents( - "tests/tuf-replication/delete.txt", - &requests_to_string(&requests), - ); - } - #[test] fn recopy() { // 3 sleds have two copies of an artifact; 1 has a single copy. @@ -926,7 +964,6 @@ mod tests { .map(|(i, sled)| (sled.id, if i == 0 { 1 } else { 2 })) .collect(), local: None, - wanted: true, }, ); let requests = Inventory(inventory).into_requests( @@ -956,7 +993,6 @@ mod tests { ArtifactPresence { sleds: sleds.iter().map(|sled| (sled.id, 2)).collect(), local: None, - wanted: true, }, ); let requests = Inventory(inventory).into_requests( diff --git a/nexus/tests/integration_tests/updates.rs b/nexus/tests/integration_tests/updates.rs index 37821a1b881..2a4c6c12f20 100644 --- a/nexus/tests/integration_tests/updates.rs +++ b/nexus/tests/integration_tests/updates.rs @@ -14,6 +14,7 @@ use clap::Parser; use dropshot::test_util::LogContext; use http::{Method, StatusCode}; use nexus_config::UpdatesConfig; +use nexus_db_queries::context::OpContext; use nexus_test_utils::background::run_tuf_artifact_replication_step; use nexus_test_utils::background::wait_tuf_artifact_replication_step; use nexus_test_utils::http_testing::{AuthnMode, NexusRequest, RequestBuilder}; @@ -65,7 +66,7 @@ async fn test_repo_upload_unconfigured() -> Result<()> { let status = run_tuf_artifact_replication_step(&cptestctx.internal_client).await; assert_eq!( - status.last_run_counters.sum() - status.last_run_counters.list_ok, + status.last_run_counters.put_ok + status.last_run_counters.copy_ok, 0 ); assert_eq!(status.local_repos, 0); @@ -108,6 +109,15 @@ async fn test_repo_upload() -> Result<()> { .await; let client = &cptestctx.external_client; + // The initial generation number should be 1. + let datastore = cptestctx.server.server_context().nexus.datastore(); + let opctx = + OpContext::for_tests(cptestctx.logctx.log.new(o!()), datastore.clone()); + assert_eq!( + datastore.update_tuf_generation_get(&opctx).await.unwrap(), + 1u32.into() + ); + // Build a fake TUF repo let archive_path = make_archive(&logctx.log).await?; @@ -152,16 +162,22 @@ async fn test_repo_upload() -> Result<()> { .iter() .any(|artifact| artifact.id.kind == KnownArtifactKind::ControlPlane.into())); + // The generation number should now be 2. + assert_eq!( + datastore.update_tuf_generation_get(&opctx).await.unwrap(), + 2u32.into() + ); // The artifact replication background task should have been activated, and // we should see a local repo and successful PUTs. 
let status = wait_tuf_artifact_replication_step(&cptestctx.internal_client).await; eprintln!("{status:?}"); + assert_eq!(status.generation, 2u32.into()); + assert_eq!(status.last_run_counters.put_config_ok, 4); assert_eq!(status.last_run_counters.list_ok, 4); assert_eq!(status.last_run_counters.put_ok, 3 * unique_sha256_count); assert_eq!(status.last_run_counters.copy_ok, unique_sha256_count); - assert_eq!(status.last_run_counters.delete_ok, 0); // The local repo is not deleted until the next task run. assert_eq!(status.local_repos, 1); @@ -170,8 +186,9 @@ async fn test_repo_upload() -> Result<()> { let status = run_tuf_artifact_replication_step(&cptestctx.internal_client).await; eprintln!("{status:?}"); + assert_eq!(status.last_run_counters.put_config_ok, 4); assert_eq!(status.last_run_counters.list_ok, 4); - assert_eq!(status.last_run_counters.sum(), 4); + assert_eq!(status.last_run_counters.sum(), 8); assert_eq!(status.local_repos, 0); // Upload the repository to Nexus again. This should return a 200 with an @@ -198,6 +215,12 @@ async fn test_repo_upload() -> Result<()> { "initial description matches reupload" ); + // We didn't insert a new repo, so the generation number should still be 2. + assert_eq!( + datastore.update_tuf_generation_get(&opctx).await.unwrap(), + 2u32.into() + ); + // Now get the repository that was just uploaded. let mut get_description = { let response = make_get_request( @@ -292,16 +315,20 @@ async fn test_repo_upload() -> Result<()> { .context("error deserializing response body")?; assert_eq!(response.status, TufRepoInsertStatus::Inserted); } - - // No artifacts changed, so the task should have nothing to do and should + // No artifacts changed, so the generation number should still be 2... + assert_eq!( + datastore.update_tuf_generation_get(&opctx).await.unwrap(), + 2u32.into() + ); + // ... and the task should have nothing to do and should immediately // delete the local artifacts. 
let status = wait_tuf_artifact_replication_step(&cptestctx.internal_client).await; eprintln!("{status:?}"); + assert_eq!(status.generation, 2u32.into()); assert_eq!(status.last_run_counters.list_ok, 4); assert_eq!(status.last_run_counters.put_ok, 0); assert_eq!(status.last_run_counters.copy_ok, 0); - assert_eq!(status.last_run_counters.delete_ok, 0); assert_eq!(status.local_repos, 0); cptestctx.teardown().await; diff --git a/nexus/tests/tuf-replication/delete.txt b/nexus/tests/tuf-replication/delete.txt deleted file mode 100644 index 375179de219..00000000000 --- a/nexus/tests/tuf-replication/delete.txt +++ /dev/null @@ -1,8 +0,0 @@ -- DELETE f44b581f23222c10916b17a369b4da039d075952b58036f2a7b561446592403c - from 0b20868c-c619-454c-82a1-c61be9902717 -- DELETE f44b581f23222c10916b17a369b4da039d075952b58036f2a7b561446592403c - from b24b8ec1-68fe-4896-b5b7-0ccb10f1a705 -- DELETE f44b581f23222c10916b17a369b4da039d075952b58036f2a7b561446592403c - from e061d0fd-fa18-4618-928b-0d44efef92cb -- DELETE f44b581f23222c10916b17a369b4da039d075952b58036f2a7b561446592403c - from 9b07815f-0449-4e2e-85d2-2cac3aa06141 diff --git a/nexus/types/src/internal_api/background.rs b/nexus/types/src/internal_api/background.rs index 80b42bcd412..a354a75ad09 100644 --- a/nexus/types/src/internal_api/background.rs +++ b/nexus/types/src/internal_api/background.rs @@ -4,6 +4,7 @@ use chrono::DateTime; use chrono::Utc; +use omicron_common::api::external::Generation; use omicron_common::update::ArtifactHash; use omicron_uuid_kinds::BlueprintUuid; use omicron_uuid_kinds::CollectionUuid; @@ -243,6 +244,7 @@ impl SupportBundleCollectionReport { /// The status of a `tuf_artifact_replication` background task activation #[derive(Debug, Serialize, Deserialize, PartialEq)] pub struct TufArtifactReplicationStatus { + pub generation: Generation, pub last_run_counters: TufArtifactReplicationCounters, pub lifetime_counters: TufArtifactReplicationCounters, pub request_debug_ringbuf: Arc>, @@ -254,7 +256,6 @@ impl TufArtifactReplicationStatus { self.last_run_counters.list_err == 0 && self.last_run_counters.put_err == 0 && self.last_run_counters.copy_err == 0 - && self.last_run_counters.delete_err == 0 } } @@ -269,19 +270,25 @@ impl TufArtifactReplicationStatus { derive_more::AddAssign, )] pub struct TufArtifactReplicationCounters { + pub put_config_ok: usize, + pub put_config_err: usize, pub list_ok: usize, pub list_err: usize, pub put_ok: usize, pub put_err: usize, pub copy_ok: usize, pub copy_err: usize, - pub delete_ok: usize, - pub delete_err: usize, } impl TufArtifactReplicationCounters { pub fn inc(&mut self, request: &TufArtifactReplicationRequest) { match (&request.operation, &request.error) { + (TufArtifactReplicationOperation::PutConfig { .. }, Some(_)) => { + self.put_config_err += 1 + } + (TufArtifactReplicationOperation::PutConfig { .. }, None) => { + self.put_config_ok += 1 + } (TufArtifactReplicationOperation::List, Some(_)) => { self.list_err += 1 } @@ -298,27 +305,21 @@ impl TufArtifactReplicationCounters { (TufArtifactReplicationOperation::Copy { .. }, None) => { self.copy_ok += 1 } - (TufArtifactReplicationOperation::Delete { .. }, Some(_)) => { - self.delete_err += 1 - } - (TufArtifactReplicationOperation::Delete { .. 
}, None) => { - self.delete_ok += 1 - } } } pub fn ok(&self) -> usize { - self.list_ok + self.put_config_ok + .saturating_add(self.list_ok) .saturating_add(self.put_ok) .saturating_add(self.copy_ok) - .saturating_add(self.delete_ok) } pub fn err(&self) -> usize { - self.list_err + self.put_config_err + .saturating_add(self.list_err) .saturating_add(self.put_err) .saturating_add(self.copy_err) - .saturating_add(self.delete_err) } pub fn sum(&self) -> usize { @@ -343,10 +344,10 @@ pub struct TufArtifactReplicationRequest { )] #[serde(tag = "operation", rename_all = "snake_case")] pub enum TufArtifactReplicationOperation { + PutConfig { generation: Generation }, List, Put { hash: ArtifactHash }, Copy { hash: ArtifactHash, source_sled: SledUuid }, - Delete { hash: ArtifactHash }, } /// The status of an `blueprint_rendezvous` background task activation. diff --git a/openapi/sled-agent.json b/openapi/sled-agent.json index 67a6d1f6981..8b4b8e9eb77 100644 --- a/openapi/sled-agent.json +++ b/openapi/sled-agent.json @@ -19,13 +19,7 @@ "content": { "application/json": { "schema": { - "title": "Map_of_uint", - "type": "object", - "additionalProperties": { - "type": "integer", - "format": "uint", - "minimum": 0 - } + "$ref": "#/components/schemas/ArtifactListResponse" } } } @@ -51,6 +45,14 @@ "type": "string", "format": "hex string (32 bytes)" } + }, + { + "in": "query", + "name": "generation", + "required": true, + "schema": { + "$ref": "#/components/schemas/Generation" + } } ], "requestBody": { @@ -82,31 +84,6 @@ "$ref": "#/components/responses/Error" } } - }, - "delete": { - "operationId": "artifact_delete", - "parameters": [ - { - "in": "path", - "name": "sha256", - "required": true, - "schema": { - "type": "string", - "format": "hex string (32 bytes)" - } - } - ], - "responses": { - "204": { - "description": "successful deletion" - }, - "4XX": { - "$ref": "#/components/responses/Error" - }, - "5XX": { - "$ref": "#/components/responses/Error" - } - } } }, "/artifacts/{sha256}/copy-from-depot": { @@ -121,6 +98,14 @@ "type": "string", "format": "hex string (32 bytes)" } + }, + { + "in": "query", + "name": "generation", + "required": true, + "schema": { + "$ref": "#/components/schemas/Generation" + } } ], "requestBody": { @@ -153,6 +138,53 @@ } } }, + "/artifacts-config": { + "get": { + "operationId": "artifact_config_get", + "responses": { + "200": { + "description": "successful operation", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ArtifactConfig" + } + } + } + }, + "4XX": { + "$ref": "#/components/responses/Error" + }, + "5XX": { + "$ref": "#/components/responses/Error" + } + } + }, + "put": { + "operationId": "artifact_config_put", + "requestBody": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ArtifactConfig" + } + } + }, + "required": true + }, + "responses": { + "204": { + "description": "resource updated" + }, + "4XX": { + "$ref": "#/components/responses/Error" + }, + "5XX": { + "$ref": "#/components/responses/Error" + } + } + } + }, "/boot-disk/{boot_disk}/os/write": { "post": { "summary": "Write a new host OS image to the specified boot disk", @@ -2139,6 +2171,26 @@ "start_request" ] }, + "ArtifactConfig": { + "type": "object", + "properties": { + "artifacts": { + "type": "array", + "items": { + "type": "string", + "format": "hex string (32 bytes)" + }, + "uniqueItems": true + }, + "generation": { + "$ref": "#/components/schemas/Generation" + } + }, + "required": [ + "artifacts", + "generation" + ] + }, 
"ArtifactCopyFromDepotBody": { "type": "object", "properties": { @@ -2153,6 +2205,26 @@ "ArtifactCopyFromDepotResponse": { "type": "object" }, + "ArtifactListResponse": { + "type": "object", + "properties": { + "generation": { + "$ref": "#/components/schemas/Generation" + }, + "list": { + "type": "object", + "additionalProperties": { + "type": "integer", + "format": "uint", + "minimum": 0 + } + } + }, + "required": [ + "generation", + "list" + ] + }, "ArtifactPutResponse": { "type": "object", "properties": { diff --git a/schema/crdb/dbinit.sql b/schema/crdb/dbinit.sql index 08fe84241a9..261c8d6420a 100644 --- a/schema/crdb/dbinit.sql +++ b/schema/crdb/dbinit.sql @@ -2397,6 +2397,28 @@ CREATE TABLE IF NOT EXISTS omicron.public.tuf_repo_artifact ( PRIMARY KEY (tuf_repo_id, tuf_artifact_id) ); +-- Generation number for the current list of TUF artifacts the system wants. +-- This is incremented whenever a TUF repo is added or removed. +CREATE TABLE IF NOT EXISTS omicron.public.tuf_generation ( + -- There should only be one row of this table for the whole DB. + -- It's a little goofy, but filter on "singleton = true" before querying + -- or applying updates, and you'll access the singleton row. + -- + -- We also add a constraint on this table to ensure it's not possible to + -- access the version of this table with "singleton = false". + singleton BOOL NOT NULL PRIMARY KEY, + -- Generation number owned and incremented by Nexus + generation INT NOT NULL, + + CHECK (singleton = true) +); +INSERT INTO omicron.public.tuf_generation ( + singleton, + generation +) VALUES + (TRUE, 1) +ON CONFLICT DO NOTHING; + /*******************************************************************/ /* @@ -4831,7 +4853,7 @@ INSERT INTO omicron.public.db_metadata ( version, target_version ) VALUES - (TRUE, NOW(), NOW(), '123.0.0', NULL) + (TRUE, NOW(), NOW(), '124.0.0', NULL) ON CONFLICT DO NOTHING; COMMIT; diff --git a/schema/crdb/tuf-generation/up.sql b/schema/crdb/tuf-generation/up.sql new file mode 100644 index 00000000000..3cc1d71c99e --- /dev/null +++ b/schema/crdb/tuf-generation/up.sql @@ -0,0 +1,11 @@ +CREATE TABLE IF NOT EXISTS omicron.public.tuf_generation ( + singleton BOOL NOT NULL PRIMARY KEY, + generation INT NOT NULL, + CHECK (singleton = true) +); +INSERT INTO omicron.public.tuf_generation ( + singleton, + generation +) VALUES + (TRUE, 1) +ON CONFLICT DO NOTHING; diff --git a/sled-agent/api/src/lib.rs b/sled-agent/api/src/lib.rs index 4a393624e6a..925a7c962e4 100644 --- a/sled-agent/api/src/lib.rs +++ b/sled-agent/api/src/lib.rs @@ -2,7 +2,7 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. 
-use std::collections::BTreeMap; +use std::collections::{BTreeMap, BTreeSet}; use std::time::Duration; use camino::Utf8PathBuf; @@ -16,6 +16,7 @@ use nexus_sled_agent_shared::inventory::{ Inventory, OmicronZonesConfig, SledRole, }; use omicron_common::{ + api::external::Generation, api::internal::{ nexus::{DiskRuntimeState, SledVmmState}, shared::{ @@ -402,13 +403,30 @@ pub trait SledAgentApi { body: TypedBody, ) -> Result, HttpError>; + #[endpoint { + method = GET, + path = "/artifacts-config" + }] + async fn artifact_config_get( + rqctx: RequestContext, + ) -> Result, HttpError>; + + #[endpoint { + method = PUT, + path = "/artifacts-config" + }] + async fn artifact_config_put( + rqctx: RequestContext, + body: TypedBody, + ) -> Result; + #[endpoint { method = GET, path = "/artifacts" }] async fn artifact_list( rqctx: RequestContext, - ) -> Result>, HttpError>; + ) -> Result, HttpError>; #[endpoint { method = POST, @@ -417,6 +435,7 @@ pub trait SledAgentApi { async fn artifact_copy_from_depot( rqctx: RequestContext, path_params: Path, + query_params: Query, body: TypedBody, ) -> Result, HttpError>; @@ -428,18 +447,10 @@ pub trait SledAgentApi { async fn artifact_put( rqctx: RequestContext, path_params: Path, + query_params: Query, body: StreamingBody, ) -> Result, HttpError>; - #[endpoint { - method = DELETE, - path = "/artifacts/{sha256}" - }] - async fn artifact_delete( - rqctx: RequestContext, - path_params: Path, - ) -> Result; - /// Take a snapshot of a disk that is attached to an instance #[endpoint { method = POST, @@ -793,17 +804,34 @@ pub struct DiskPathParam { pub disk_id: Uuid, } +#[derive(Debug, Clone, PartialEq, Deserialize, Serialize, JsonSchema)] +pub struct ArtifactConfig { + pub generation: Generation, + pub artifacts: BTreeSet, +} + #[derive(Deserialize, JsonSchema)] pub struct ArtifactPathParam { pub sha256: ArtifactHash, } +#[derive(Deserialize, JsonSchema)] +pub struct ArtifactQueryParam { + pub generation: Generation, +} + +#[derive(Debug, Serialize, JsonSchema)] +pub struct ArtifactListResponse { + pub generation: Generation, + pub list: BTreeMap, +} + #[derive(Deserialize, JsonSchema)] pub struct ArtifactCopyFromDepotBody { pub depot_base_url: String, } -#[derive(Serialize, JsonSchema)] +#[derive(Debug, Serialize, JsonSchema)] pub struct ArtifactCopyFromDepotResponse {} #[derive(Debug, Serialize, JsonSchema)] diff --git a/sled-agent/src/artifact_store.rs b/sled-agent/src/artifact_store.rs index 0ec7fb68f11..a0f60a72130 100644 --- a/sled-agent/src/artifact_store.rs +++ b/sled-agent/src/artifact_store.rs @@ -2,8 +2,11 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. -//! Manages TUF artifacts stored on this sled. The implementation is a very -//! basic content-addressed object store. +//! Manages TUF artifacts stored on this sled. The implementation is a +//! content-addressed object store. +//! +//! See docs/tuf-artifact-replication.adoc for an architectural overview of the +//! TUF artifact replication system. //! //! GET operations are handled by the "Repo Depot" API, which is deliberately //! a separate Dropshot service from the rest of Sled Agent. This is to avoid a @@ -11,10 +14,10 @@ //! it does not have from another Repo Depot that does have them (at Nexus's //! direction). This API's implementation is also part of this module. //! -//! POST, PUT, and DELETE operations are called by Nexus and handled by the Sled -//! Agent API. +//! 
Operations that list or modify artifacts or the configuration are called by +//! Nexus and handled by the Sled Agent API. -use std::collections::BTreeMap; +use std::future::Future; use std::io::ErrorKind; use std::net::SocketAddrV6; use std::str::FromStr; @@ -28,10 +31,13 @@ use dropshot::{ }; use futures::{Stream, TryStreamExt}; use omicron_common::address::REPO_DEPOT_PORT; +use omicron_common::api::external::Generation; use omicron_common::update::ArtifactHash; use repo_depot_api::*; use sha2::{Digest, Sha256}; -use sled_agent_api::ArtifactPutResponse; +use sled_agent_api::{ + ArtifactConfig, ArtifactListResponse, ArtifactPutResponse, +}; use sled_storage::dataset::M2_ARTIFACT_DATASET; use sled_storage::error::Error as StorageError; use sled_storage::manager::StorageHandle; @@ -39,11 +45,15 @@ use slog::{error, info, Logger}; use slog_error_chain::{InlineErrorChain, SlogInlineError}; use tokio::fs::{File, OpenOptions}; use tokio::io::AsyncWriteExt; +use tokio::sync::watch; const TEMP_SUBDIR: &str = "tmp"; /// Content-addressable local storage for software artifacts. /// +/// If you need to read a file managed by the artifact store from somewhere else +/// in Sled Agent, use [`ArtifactStore::get`]. +/// /// Storage for artifacts is backed by datasets that are explicitly designated /// for this purpose. The `T: DatasetsManager` parameter, which varies between /// the real sled agent, the simulated sled agent, and unit tests, specifies @@ -59,25 +69,48 @@ const TEMP_SUBDIR: &str = "tmp"; /// - for PUT, we try to write to all datasets, logging errors as we go; if we /// successfully write the artifact to at least one, we return OK. /// - for GET, we look in each dataset until we find it. -/// - for DELETE, we attempt to delete it from each dataset, logging errors as -/// we go, and failing if we saw any errors. #[derive(Clone)] pub(crate) struct ArtifactStore { log: Logger, reqwest_client: reqwest::Client, + // `tokio::sync::watch` channels are a `std::sync::RwLock` with an + // asynchronous notification mechanism for value updates. Modifying the + // value takes a write lock, and borrowing the value takes a read lock until + // the guard is dropped. + config: watch::Sender>, storage: T, + + /// Used for synchronization in unit tests. + #[cfg(test)] + delete_done: watch::Receiver, } impl ArtifactStore { pub(crate) fn new(log: &Logger, storage: T) -> ArtifactStore { + let log = log.new(slog::o!("component" => "ArtifactStore")); + let (tx, rx) = watch::channel(None); + + #[cfg(test)] + let (done_signal, delete_done) = watch::channel(0u32.into()); + tokio::task::spawn(delete_reconciler( + log.clone(), + storage.clone(), + rx, + #[cfg(test)] + done_signal, + )); ArtifactStore { - log: log.new(slog::o!("component" => "ArtifactStore")), + log, reqwest_client: reqwest::ClientBuilder::new() .connect_timeout(Duration::from_secs(15)) .read_timeout(Duration::from_secs(15)) .build() .unwrap(), + config: tx, storage, + + #[cfg(test)] + delete_done, } } } @@ -152,20 +185,70 @@ pub enum StartError { Dropshot(#[source] dropshot::BuildError), } -macro_rules! log_and_store { - ($last_error:expr, $log:expr, $verb:literal, $path:expr, $err:expr) => {{ +macro_rules! log_io_err { + ($log:expr, $verb:literal, $path:expr, $err:expr) => { error!( $log, concat!("Failed to ", $verb, " path"); "error" => &$err, "path" => $path.as_str(), - ); + ) + }; +} + +macro_rules! 
log_and_store { + ($last_error:expr, $log:expr, $verb:literal, $path:expr, $err:expr) => {{ + log_io_err!($log, $verb, $path, $err); $last_error = Some(Error::File { verb: $verb, path: $path, err: $err }); }}; } impl ArtifactStore { - /// GET operation (served by Repo Depot API) + /// Get the current [`ArtifactConfig`]. + pub(crate) fn get_config(&self) -> Option { + self.config.borrow().clone() + } + + /// Set a new [`ArtifactConfig`]. + /// + /// Rejects the configuration with an error if the configuration was + /// modified without increasing the generation number. + pub(crate) fn put_config( + &self, + new_config: ArtifactConfig, + ) -> Result<(), Error> { + let mut result = Ok(()); + let result_ref = &mut result; + // This closure runs during a write lock on `self.config`. This is the + // only write lock taken in this implementation. + self.config.send_if_modified(move |old_config| { + match old_config.as_mut() { + Some(old_config) => { + if new_config == *old_config { + false + } else if new_config.generation > old_config.generation { + *old_config = new_config; + true + } else { + *result_ref = Err(Error::GenerationConfig { + attempted_generation: new_config.generation, + current_generation: old_config.generation, + }); + false + } + } + None => { + *old_config = Some(new_config); + true + } + } + }); + result + } + + /// Open an artifact file by hash. + /// + /// Also the GET operation (served by Repo Depot API). /// /// We try all datasets, returning early if we find the artifact, logging /// errors as we go. If we don't find it we return the most recent error we @@ -201,66 +284,32 @@ impl ArtifactStore { /// /// We try all datasets, logging errors as we go; if we're experiencing I/O /// errors, Nexus should still be aware of the artifacts we think we have. - pub(crate) async fn list( - &self, - ) -> Result, Error> { - let mut map = BTreeMap::new(); + pub(crate) async fn list(&self) -> Result { + let mut response = if let Some(config) = self.config.borrow().as_ref() { + ArtifactListResponse { + generation: config.generation, + list: config.artifacts.iter().map(|hash| (*hash, 0)).collect(), + } + } else { + return Err(Error::NoConfig); + }; let mut any_datasets = false; for mountpoint in self.storage.artifact_storage_paths().await? { any_datasets = true; - let mut read_dir = match tokio::fs::read_dir(&mountpoint).await { - Ok(read_dir) => read_dir, - Err(err) => { - error!( - &self.log, - "Failed to read dir"; - "error" => &err, - "path" => mountpoint.as_str(), - ); - continue; - } - }; - // The semantics of tokio::fs::ReadDir are weird. At least with - // `std::fs::ReadDir`, we know when the end of the iterator is, - // because `.next()` returns `Option>`; we could - // theoretically log the error and continue trying to retrieve - // elements from the iterator (but whether this makes sense to do - // is not documented and likely system-dependent). - // - // The Tokio version returns `Result>`, which - // has no indication of whether there might be more items in - // the stream! (The stream adapter in tokio-stream simply calls - // `Result::transpose()`, so in theory an error is not the end of - // the stream.) - // - // For lack of any direction we stop reading entries from the stream - // on the first error. That way we at least don't get stuck retrying - // an operation that will always fail. 
- loop { - match read_dir.next_entry().await { - Ok(Some(entry)) => { - if let Ok(file_name) = entry.file_name().into_string() { - if let Ok(hash) = ArtifactHash::from_str(&file_name) - { - *map.entry(hash).or_default() += 1; - } - } - } - Ok(None) => break, + for (hash, count) in &mut response.list { + let path = mountpoint.join(hash.to_string()); + match tokio::fs::try_exists(&path).await { + Ok(true) => *count += 1, + Ok(false) => {} Err(err) => { - error!( - &self.log, - "Failed to read dir"; - "error" => &err, - "path" => mountpoint.as_str(), - ); - break; + log_io_err!(&self.log, "check existence of", path, err) } } } } if any_datasets { - Ok(map) + response.list.retain(|_, count| *count > 0); + Ok(response) } else { Err(Error::NoUpdateDataset) } @@ -281,7 +330,25 @@ impl ArtifactStore { async fn writer( &self, sha256: ArtifactHash, + attempted_generation: Generation, ) -> Result { + if let Some(config) = self.config.borrow().as_ref() { + if attempted_generation != config.generation { + return Err(Error::GenerationPut { + attempted_generation, + current_generation: config.generation, + }); + } + if !config.artifacts.contains(&sha256) { + return Err(Error::NotInConfig { + sha256, + generation: config.generation, + }); + } + } else { + return Err(Error::NoConfig); + } + let mut files = Vec::new(); let mut last_error = None; let mut datasets = 0; @@ -339,9 +406,10 @@ impl ArtifactStore { pub(crate) async fn put_body( &self, sha256: ArtifactHash, + generation: Generation, body: StreamingBody, ) -> Result { - self.writer(sha256) + self.writer(sha256, generation) .await? .write_stream(body.into_stream().map_err(Error::Body)) .await @@ -351,8 +419,12 @@ impl ArtifactStore { pub(crate) async fn copy_from_depot( &self, sha256: ArtifactHash, + generation: Generation, depot_base_url: &str, ) -> Result<(), Error> { + // Check that there's no conflict before we send the upstream request. + let writer = self.writer(sha256, generation).await?; + let client = repo_depot_client::Client::new_with_client( depot_base_url, self.reqwest_client.clone(), @@ -361,8 +433,6 @@ impl ArtifactStore { "base_url" => depot_base_url.to_owned(), )), ); - // Check that there's no conflict before we send the upstream request. - let writer = self.writer(sha256).await?; let response = client .artifact_get_by_sha256(&sha256.to_string()) .await @@ -392,55 +462,119 @@ impl ArtifactStore { }); Ok(()) } +} - /// DELETE operation (served by Sled Agent API) - /// - /// We attempt to delete the artifact in all datasets, logging errors as we - /// go. If any errors occurred we return the most recent error we logged. - pub(crate) async fn delete( - &self, - sha256: ArtifactHash, - ) -> Result<(), Error> { - let sha256 = sha256.to_string(); - let mut any_datasets = false; - let mut last_error = None; - for mountpoint in self.storage.artifact_storage_paths().await? 
{ - any_datasets = true; - let path = mountpoint.join(&sha256); - match tokio::fs::remove_file(&path).await { - Ok(()) => { - info!( - &self.log, - "Removed artifact"; - "sha256" => &sha256, - "path" => path.as_str(), +async fn delete_reconciler( + log: Logger, + storage: T, + mut receiver: watch::Receiver>, + #[cfg(test)] done_signal: watch::Sender, +) { + while let Ok(()) = receiver.changed().await { + let generation = match receiver.borrow_and_update().as_ref() { + Some(config) => config.generation, + None => continue, + }; + info!( + &log, + "Starting delete reconciler"; + "generation" => &generation, + ); + let mountpoints = match storage.artifact_storage_paths().await { + Ok(iter) => iter, + Err(err) => { + error!( + &log, + "Error retrieving dataset configuration"; + "error" => InlineErrorChain::new(&err), + ); + continue; + } + }; + for mountpoint in mountpoints { + let mut read_dir = match tokio::fs::read_dir(&mountpoint).await { + Ok(read_dir) => read_dir, + Err(err) => { + error!( + log, + "Failed to read dir"; + "error" => &err, + "path" => mountpoint.as_str(), ); + continue; } - Err(err) if err.kind() == ErrorKind::NotFound => {} - Err(err) => { - log_and_store!(last_error, &self.log, "remove", path, err); + }; + while let Some(result) = read_dir.next_entry().await.transpose() { + let entry = match result { + Ok(entry) => entry, + Err(err) => { + error!( + log, + "Failed to read dir"; + "error" => &err, + "path" => mountpoint.as_str(), + ); + // It's not clear whether we should expect future calls + // to `next_entry` to work after the first error; we + // take the conservative approach and stop iterating. + break; + } + }; + let Ok(file_name) = entry.file_name().into_string() else { + // Content-addressed paths are ASCII-only, so this is + // clearly not a hash. + continue; + }; + let Ok(hash) = ArtifactHash::from_str(&file_name) else { + continue; + }; + if let Some(config) = receiver.borrow().as_ref() { + if config.artifacts.contains(&hash) { + continue; + } + } else { + continue; + } + let sha256 = hash.to_string(); + let path = mountpoint.join(&sha256); + match tokio::fs::remove_file(&path).await { + Ok(()) => { + info!( + &log, + "Removed artifact"; + "sha256" => &sha256, + "path" => path.as_str(), + ); + } + Err(err) if err.kind() == ErrorKind::NotFound => {} + Err(err) => { + log_io_err!(&log, "remove", path, err); + } } } } - if let Some(last_error) = last_error { - Err(last_error) - } else if any_datasets { - Ok(()) - } else { - // If we're here because there aren't any update datasets, we should - // report Service Unavailable instead of a successful result. - Err(Error::NoUpdateDataset) - } + #[cfg(test)] + done_signal.send_if_modified(|old| { + let modified = *old != generation; + *old = generation; + modified + }); } + warn!(log, "Delete reconciler sender dropped"); } /// Abstracts over what kind of sled agent we are; each of the real sled agent, /// simulated sled agent, and this module's unit tests have different ways of /// keeping track of the datasets on the system. -pub(crate) trait DatasetsManager: Sync { - async fn artifact_storage_paths( +pub(crate) trait DatasetsManager: Clone + Send + Sync + 'static { + fn artifact_storage_paths( &self, - ) -> Result + '_, StorageError>; + ) -> impl Future< + Output = Result< + impl Iterator + Send + '_, + StorageError, + >, + > + Send; } impl DatasetsManager for StorageHandle { @@ -456,6 +590,7 @@ impl DatasetsManager for StorageHandle { } /// Abstraction that handles writing to several temporary files. 
+#[derive(Debug)] struct ArtifactWriter { datasets: usize, files: Vec, Utf8PathBuf)>>, @@ -654,34 +789,69 @@ pub(crate) enum Error { err: std::io::Error, }, + #[error( + "Attempt to modify config to generation {attempted_generation} \ + while at {current_generation}" + )] + GenerationConfig { + attempted_generation: Generation, + current_generation: Generation, + }, + + #[error( + "Attempt to put object with generation {attempted_generation} \ + while at {current_generation}" + )] + GenerationPut { + attempted_generation: Generation, + current_generation: Generation, + }, + #[error("Digest mismatch: expected {expected}, actual {actual}")] HashMismatch { expected: ArtifactHash, actual: ArtifactHash }, #[error("Blocking task failed")] Join(#[from] tokio::task::JoinError), - #[error("Artifact {sha256} not found")] - NotFound { sha256: ArtifactHash }, + #[error("No artifact configuration present")] + NoConfig, #[error("No update datasets present")] NoUpdateDataset, + + #[error("Artifact {sha256} not found")] + NotFound { sha256: ArtifactHash }, + + #[error( + "Attempt to put artifact {sha256} not in config generation {generation}" + )] + NotInConfig { sha256: ArtifactHash, generation: Generation }, } impl From for HttpError { fn from(err: Error) -> HttpError { match err { // 4xx errors - Error::HashMismatch { .. } => { + Error::HashMismatch { .. } + | Error::NoConfig + | Error::NotInConfig { .. } => { HttpError::for_bad_request(None, err.to_string()) } Error::NotFound { .. } => { HttpError::for_not_found(None, err.to_string()) } - Error::AlreadyInProgress { .. } => HttpError::for_client_error( - None, + Error::GenerationConfig { .. } => HttpError::for_client_error( + Some("CONFIG_GENERATION".to_string()), dropshot::ClientErrorStatusCode::CONFLICT, err.to_string(), ), + Error::AlreadyInProgress { .. } | Error::GenerationPut { .. } => { + HttpError::for_client_error( + None, + dropshot::ClientErrorStatusCode::CONFLICT, + err.to_string(), + ) + } // 5xx errors: ensure the error chain is logged Error::Body(inner) => inner, @@ -703,6 +873,9 @@ impl From for HttpError { #[cfg(test)] mod test { + use std::collections::BTreeSet; + use std::sync::Arc; + use camino_tempfile::Utf8TempDir; use futures::stream; use hex_literal::hex; @@ -714,19 +887,21 @@ mod test { use omicron_common::zpool_name::ZpoolName; use omicron_test_utils::dev::test_setup_log; use omicron_uuid_kinds::{DatasetUuid, ZpoolUuid}; + use sled_agent_api::ArtifactConfig; use sled_storage::error::Error as StorageError; use tokio::io::AsyncReadExt; use super::{ArtifactStore, DatasetsManager, Error}; + #[derive(Clone)] struct TestBackend { datasets: DatasetsConfig, - mountpoint_root: Utf8TempDir, + mountpoint_root: Arc, } impl TestBackend { fn new(len: usize) -> TestBackend { - let mountpoint_root = camino_tempfile::tempdir().unwrap(); + let mountpoint_root = Arc::new(camino_tempfile::tempdir().unwrap()); let mut datasets = DatasetsConfig::default(); if len > 0 { @@ -773,21 +948,131 @@ mod test { )); #[tokio::test] - async fn list_get_put_delete() { - let log = test_setup_log("get_put_delete"); + async fn generations() { + macro_rules! 
assert_generation_err { + ($f:expr, $attempted:expr, $current:expr) => {{ + let err = $f.unwrap_err(); + match err { + Error::GenerationConfig { + attempted_generation, + current_generation, + } => { + assert_eq!( + attempted_generation, $attempted, + "attempted generation does not match" + ); + assert_eq!( + current_generation, $current, + "current generation does not match" + ); + } + err => panic!("wrong error: {err:?}"), + } + }}; + } + + let log = test_setup_log("generations"); let backend = TestBackend::new(2); let store = ArtifactStore::new(&log.log, backend); + // get_config returns None + assert!(store.get_config().is_none()); + // put our first config + let mut config = ArtifactConfig { + generation: 1u32.into(), + artifacts: BTreeSet::new(), + }; + store.put_config(config.clone()).unwrap(); + assert_eq!(store.get_config().unwrap(), config); + + // putting an unmodified config from the same generation succeeds (puts + // are idempotent) + store.put_config(config.clone()).unwrap(); + assert_eq!(store.get_config().unwrap(), config); + // putting an unmodified config from an older generation fails + config.generation = 0u32.into(); + assert_generation_err!( + store.put_config(config.clone()), + 0u32.into(), + 1u32.into() + ); + // putting an unmodified config from a newer generation succeeds + config.generation = 2u32.into(); + store.put_config(config.clone()).unwrap(); + + // putting a modified config from the same generation fails + config = store.get_config().unwrap(); + config.artifacts.insert(TEST_HASH); + assert_generation_err!( + store.put_config(config.clone()), + 2u32.into(), + 2u32.into() + ); + // putting a modified config from an older generation fails + config.generation = 0u32.into(); + assert_generation_err!( + store.put_config(config.clone()), + 0u32.into(), + 2u32.into() + ); + // putting a modified config from a newer generation succeeds + config.generation = store.get_config().unwrap().generation.next(); + store.put_config(config.clone()).unwrap(); + + log.cleanup_successful(); + } + + #[tokio::test] + async fn list_get_put() { + let log = test_setup_log("list_get_put"); + let backend = TestBackend::new(2); + let mut store = ArtifactStore::new(&log.log, backend); + + // get fails, because it doesn't exist yet + assert!(matches!( + store.get(TEST_HASH).await, + Err(Error::NotFound { .. }) + )); + // list/put fail, no config + assert!(matches!(store.list().await.unwrap_err(), Error::NoConfig)); + assert!(matches!( + store.writer(TEST_HASH, 1u32.into()).await.unwrap_err(), + Error::NoConfig + )); + + // put our first config + let mut config = ArtifactConfig { + generation: 1u32.into(), + artifacts: BTreeSet::new(), + }; + config.artifacts.insert(TEST_HASH); + store.put_config(config.clone()).unwrap(); + // list succeeds with an empty result - assert!(store.list().await.unwrap().is_empty()); + let response = store.list().await.unwrap(); + assert_eq!(response.generation, 1u32.into()); + assert!(response.list.is_empty()); // get fails, because it doesn't exist yet assert!(matches!( store.get(TEST_HASH).await, Err(Error::NotFound { .. 
}) )); - // delete does not fail because we don't fail if the artifact is not - // present - assert!(matches!(store.delete(TEST_HASH).await, Ok(()))); + + // put with the wrong generation fails + for generation in [0u32, 2] { + let err = + store.writer(TEST_HASH, generation.into()).await.unwrap_err(); + match err { + Error::GenerationPut { + attempted_generation, + current_generation, + } => { + assert_eq!(attempted_generation, generation.into()); + assert_eq!(current_generation, 1u32.into()); + } + err => panic!("wrong error: {err}"), + } + } // test several things here: // 1. put succeeds @@ -796,7 +1081,7 @@ mod test { // 3. we don't fail trying to create TEMP_SUBDIR twice for _ in 0..2 { store - .writer(TEST_HASH) + .writer(TEST_HASH, 1u32.into()) .await .unwrap() .write_stream(stream::once(async { Ok(TEST_ARTIFACT) })) @@ -807,6 +1092,7 @@ mod test { .list() .await .unwrap() + .list .into_iter() .eq([(TEST_HASH, 2)])); // get succeeds, file reads back OK @@ -827,16 +1113,26 @@ mod test { ); } - // delete succeeds and is idempotent - for _ in 0..2 { - store.delete(TEST_HASH).await.unwrap(); - // list succeeds with an empty result - assert!(store.list().await.unwrap().is_empty()); - // get now fails because it no longer exists - assert!(matches!( - store.get(TEST_HASH).await, - Err(Error::NotFound { .. }) - )); + // clear `delete_done` so we can synchronize with the delete reconciler + store.delete_done.mark_unchanged(); + // put a new config that says we don't want the artifact anymore. + config.generation = config.generation.next(); + config.artifacts.remove(&TEST_HASH); + store.put_config(config.clone()).unwrap(); + // list succeeds with an empty result, regardless of whether deletion + // has actually occurred yet + assert!(store.list().await.unwrap().list.is_empty()); + // wait for deletion to actually complete + store.delete_done.changed().await.unwrap(); + // get fails, because it has been deleted + assert!(matches!( + store.get(TEST_HASH).await, + Err(Error::NotFound { .. }) + )); + // all datasets should no longer have the artifact + for mountpoint in store.storage.artifact_storage_paths().await.unwrap() + { + assert!(!mountpoint.join(TEST_HASH.to_string()).exists()); } log.cleanup_successful(); @@ -851,9 +1147,15 @@ mod test { let log = test_setup_log("no_dataset"); let backend = TestBackend::new(0); let store = ArtifactStore::new(&log.log, backend); + let mut config = ArtifactConfig { + generation: 1u32.into(), + artifacts: BTreeSet::new(), + }; + config.artifacts.insert(TEST_HASH); + store.put_config(config).unwrap(); assert!(matches!( - store.writer(TEST_HASH).await, + store.writer(TEST_HASH, 1u32.into()).await, Err(Error::NoUpdateDataset) )); assert!(matches!( @@ -861,10 +1163,6 @@ mod test { Err(Error::NotFound { .. 
}) )); assert!(matches!(store.list().await, Err(Error::NoUpdateDataset))); - assert!(matches!( - store.delete(TEST_HASH).await, - Err(Error::NoUpdateDataset) - )); log.cleanup_successful(); } @@ -878,8 +1176,14 @@ mod test { let log = test_setup_log("wrong_hash"); let backend = TestBackend::new(2); let store = ArtifactStore::new(&log.log, backend); + let mut config = ArtifactConfig { + generation: 1u32.into(), + artifacts: BTreeSet::new(), + }; + config.artifacts.insert(TEST_HASH); + store.put_config(config).unwrap(); let err = store - .writer(TEST_HASH) + .writer(TEST_HASH, 1u32.into()) .await .unwrap() .write_stream(stream::once(async { diff --git a/sled-agent/src/http_entrypoints.rs b/sled-agent/src/http_entrypoints.rs index bf736c215b9..7fa3fbb830e 100644 --- a/sled-agent/src/http_entrypoints.rs +++ b/sled-agent/src/http_entrypoints.rs @@ -30,7 +30,6 @@ use omicron_common::disk::{ DatasetsConfig, DatasetsManagementResult, DiskVariant, DisksManagementResult, M2Slot, OmicronPhysicalDisksConfig, }; -use omicron_common::update::ArtifactHash; use range_requests::RequestContextEx; use sled_agent_api::*; use sled_agent_types::boot_disk::{ @@ -586,22 +585,44 @@ impl SledAgentApi for SledAgentImpl { async fn artifact_list( rqctx: RequestContext, - ) -> Result>, HttpError> { + ) -> Result, HttpError> { Ok(HttpResponseOk(rqctx.context().artifact_store().list().await?)) } + async fn artifact_config_get( + rqctx: RequestContext, + ) -> Result, HttpError> { + match rqctx.context().artifact_store().get_config() { + Some(config) => Ok(HttpResponseOk(config)), + None => Err(HttpError::for_not_found( + None, + "No artifact configuration present".to_string(), + )), + } + } + + async fn artifact_config_put( + rqctx: RequestContext, + body: TypedBody, + ) -> Result { + rqctx.context().artifact_store().put_config(body.into_inner())?; + Ok(HttpResponseUpdatedNoContent()) + } + async fn artifact_copy_from_depot( rqctx: RequestContext, path_params: Path, + query_params: Query, body: TypedBody, ) -> Result, HttpError> { let sha256 = path_params.into_inner().sha256; + let generation = query_params.into_inner().generation; let depot_base_url = body.into_inner().depot_base_url; rqctx .context() .artifact_store() - .copy_from_depot(sha256, &depot_base_url) + .copy_from_depot(sha256, generation, &depot_base_url) .await?; Ok(HttpResponseAccepted(ArtifactCopyFromDepotResponse {})) } @@ -609,23 +630,20 @@ impl SledAgentApi for SledAgentImpl { async fn artifact_put( rqctx: RequestContext, path_params: Path, + query_params: Query, body: StreamingBody, ) -> Result, HttpError> { let sha256 = path_params.into_inner().sha256; + let generation = query_params.into_inner().generation; Ok(HttpResponseOk( - rqctx.context().artifact_store().put_body(sha256, body).await?, + rqctx + .context() + .artifact_store() + .put_body(sha256, generation, body) + .await?, )) } - async fn artifact_delete( - rqctx: RequestContext, - path_params: Path, - ) -> Result { - let sha256 = path_params.into_inner().sha256; - rqctx.context().artifact_store().delete(sha256).await?; - Ok(HttpResponseDeleted()) - } - async fn vmm_issue_disk_snapshot_request( rqctx: RequestContext, path_params: Path, diff --git a/sled-agent/src/sim/http_entrypoints.rs b/sled-agent/src/sim/http_entrypoints.rs index a65d2114f4e..61563f4cc8a 100644 --- a/sled-agent/src/sim/http_entrypoints.rs +++ b/sled-agent/src/sim/http_entrypoints.rs @@ -37,7 +37,6 @@ use omicron_common::disk::DatasetsConfig; use omicron_common::disk::DatasetsManagementResult; use 
omicron_common::disk::DisksManagementResult; use omicron_common::disk::OmicronPhysicalDisksConfig; -use omicron_common::update::ArtifactHash; use range_requests::RequestContextEx; use sled_agent_api::*; use sled_agent_types::boot_disk::BootDiskOsWriteStatus; @@ -171,24 +170,46 @@ impl SledAgentApi for SledAgentSimImpl { )) } + async fn artifact_config_get( + rqctx: RequestContext, + ) -> Result, HttpError> { + match rqctx.context().artifact_store().get_config() { + Some(config) => Ok(HttpResponseOk(config)), + None => Err(HttpError::for_not_found( + None, + "No artifact configuration present".to_string(), + )), + } + } + + async fn artifact_config_put( + rqctx: RequestContext, + body: TypedBody, + ) -> Result { + rqctx.context().artifact_store().put_config(body.into_inner())?; + Ok(HttpResponseUpdatedNoContent()) + } + async fn artifact_list( rqctx: RequestContext, - ) -> Result>, HttpError> { + ) -> Result, HttpError> { Ok(HttpResponseOk(rqctx.context().artifact_store().list().await?)) } async fn artifact_copy_from_depot( rqctx: RequestContext, path_params: Path, + query_params: Query, body: TypedBody, ) -> Result, HttpError> { let sha256 = path_params.into_inner().sha256; + let generation = query_params.into_inner().generation; let depot_base_url = body.into_inner().depot_base_url; rqctx .context() .artifact_store() - .copy_from_depot(sha256, &depot_base_url) + .copy_from_depot(sha256, generation, &depot_base_url) .await?; Ok(HttpResponseAccepted(ArtifactCopyFromDepotResponse {})) } @@ -196,23 +217,20 @@ impl SledAgentApi for SledAgentSimImpl { async fn artifact_put( rqctx: RequestContext, path_params: Path, + query_params: Query, body: StreamingBody, ) -> Result, HttpError> { let sha256 = path_params.into_inner().sha256; + let generation = query_params.into_inner().generation; Ok(HttpResponseOk( - rqctx.context().artifact_store().put_body(sha256, body).await?, + rqctx + .context() + .artifact_store() + .put_body(sha256, generation, body) + .await?, )) } - async fn artifact_delete( - rqctx: RequestContext, - path_params: Path, - ) -> Result { - let sha256 = path_params.into_inner().sha256; - rqctx.context().artifact_store().delete(sha256).await?; - Ok(HttpResponseDeleted()) - } - async fn vmm_issue_disk_snapshot_request( rqctx: RequestContext, path_params: Path,
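// Illustrative sketch (not part of this patch): how a caller such as Nexus
// might drive the generation-checked endpoints introduced above
// (PUT /artifacts-config, PUT /artifacts/{sha256}?generation=..., and
// GET /artifacts). The base URL, the raw reqwest/serde_json usage, and the
// function name are assumptions for the example; the real caller goes through
// the generated sled-agent client and streams artifact bodies.
use serde_json::json;
use sha2::{Digest, Sha256};

async fn push_one_artifact(
    base_url: &str,
    artifact: &[u8],
) -> Result<(), reqwest::Error> {
    // This sketch assumes reqwest built with the "json" feature.
    let client = reqwest::Client::new();
    let sha256: String =
        Sha256::digest(artifact).iter().map(|b| format!("{b:02x}")).collect();
    let generation = 2u64;

    // 1. Publish the artifact configuration at a new generation: the set of
    //    artifact hashes this sled should want to store.
    client
        .put(format!("{base_url}/artifacts-config"))
        .json(&json!({ "generation": generation, "artifacts": [sha256] }))
        .send()
        .await?
        .error_for_status()?;

    // 2. Upload the artifact body, passing the same generation as a query
    //    parameter so the store can reject writes made against a stale
    //    configuration.
    client
        .put(format!("{base_url}/artifacts/{sha256}?generation={generation}"))
        .body(artifact.to_vec())
        .send()
        .await?
        .error_for_status()?;

    // 3. List artifacts; the response now reports the config generation and,
    //    per artifact, how many datasets hold a copy.
    let list: serde_json::Value = client
        .get(format!("{base_url}/artifacts"))
        .send()
        .await?
        .error_for_status()?
        .json()
        .await?;
    println!("generation {}: {}", list["generation"], list["list"]);
    Ok(())
}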