diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs
index 5452719bcd54..d2eaf31c30f5 100644
--- a/pageserver/src/http/routes.rs
+++ b/pageserver/src/http/routes.rs
@@ -3108,12 +3108,16 @@ async fn put_tenant_timeline_import_basebackup(
 
     let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
 
-    let span = info_span!("import_basebackup", tenant_id=%tenant_id, timeline_id=%timeline_id, base_lsn=%base_lsn, end_lsn=%end_lsn, pg_version=%pg_version);
+    let tenant_shard_id = TenantShardId::unsharded(tenant_id);
+
+    let span = info_span!("import_basebackup",
+        tenant_id=%tenant_id, timeline_id=%timeline_id, shard_id=%tenant_shard_id.shard_slug(),
+        base_lsn=%base_lsn, end_lsn=%end_lsn, pg_version=%pg_version);
     async move {
         let state = get_state(&request);
         let tenant = state
             .tenant_manager
-            .get_attached_tenant_shard(TenantShardId::unsharded(tenant_id))?;
+            .get_attached_tenant_shard(tenant_shard_id)?;
 
         let broker_client = state.broker_client.clone();
 
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index efe89cb98239..3df084a93170 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -2424,7 +2424,7 @@ impl Tenant {
             // Make sure the freeze_and_flush reaches remote storage.
             tline.remote_client.wait_completion().await.unwrap();
 
-            let tl = uninit_tl.finish_creation()?;
+            let tl = uninit_tl.finish_creation().await?;
             // The non-test code would call tl.activate() here.
             tl.set_state(TimelineState::Active);
             Ok(tl)
@@ -4688,7 +4688,7 @@ impl Tenant {
             )
             .await?;
 
-        let new_timeline = uninitialized_timeline.finish_creation()?;
+        let new_timeline = uninitialized_timeline.finish_creation().await?;
 
         // Root timeline gets its layers during creation and uploads them along with the metadata.
         // A branch timeline though, when created, can get no writes for some time, hence won't get any layers created.
@@ -4878,10 +4878,11 @@ impl Tenant {
         }
 
         // this new directory is very temporary, set to remove it immediately after bootstrap, we don't need it
+        let pgdata_path_deferred = pgdata_path.clone();
         scopeguard::defer! {
-            if let Err(e) = fs::remove_dir_all(&pgdata_path) {
+            if let Err(e) = fs::remove_dir_all(&pgdata_path_deferred) {
                 // this is unlikely, but we will remove the directory on pageserver restart or another bootstrap call
-                error!("Failed to remove temporary initdb directory '{pgdata_path}': {e}");
+                error!("Failed to remove temporary initdb directory '{pgdata_path_deferred}': {e}");
             }
         }
         if let Some(existing_initdb_timeline_id) = load_existing_initdb {
@@ -4948,7 +4949,7 @@ impl Tenant {
             pgdata_lsn,
             pg_version,
         );
-        let raw_timeline = self
+        let mut raw_timeline = self
             .prepare_new_timeline(
                 timeline_id,
                 &new_metadata,
@@ -4959,42 +4960,33 @@ impl Tenant {
             .await?;
 
         let tenant_shard_id = raw_timeline.owning_tenant.tenant_shard_id;
-        let unfinished_timeline = raw_timeline.raw_timeline()?;
-
-        // Flush the new layer files to disk, before we make the timeline as available to
-        // the outside world.
-        //
-        // Flush loop needs to be spawned in order to be able to flush.
-        unfinished_timeline.maybe_spawn_flush_loop();
-
-        import_datadir::import_timeline_from_postgres_datadir(
-            unfinished_timeline,
-            &pgdata_path,
-            pgdata_lsn,
-            ctx,
-        )
-        .await
-        .with_context(|| {
-            format!("Failed to import pgdatadir for timeline {tenant_shard_id}/{timeline_id}")
-        })?;
+        raw_timeline
+            .write(|unfinished_timeline| async move {
+                import_datadir::import_timeline_from_postgres_datadir(
+                    &unfinished_timeline,
+                    &pgdata_path,
+                    pgdata_lsn,
+                    ctx,
+                )
+                .await
+                .with_context(|| {
+                    format!(
+                        "Failed to import pgdatadir for timeline {tenant_shard_id}/{timeline_id}"
+                    )
+                })?;
 
-        fail::fail_point!("before-checkpoint-new-timeline", |_| {
-            Err(CreateTimelineError::Other(anyhow::anyhow!(
-                "failpoint before-checkpoint-new-timeline"
-            )))
-        });
+                fail::fail_point!("before-checkpoint-new-timeline", |_| {
+                    Err(CreateTimelineError::Other(anyhow::anyhow!(
+                        "failpoint before-checkpoint-new-timeline"
+                    )))
+                });
 
-        unfinished_timeline
-            .freeze_and_flush()
-            .await
-            .with_context(|| {
-                format!(
-                    "Failed to flush after pgdatadir import for timeline {tenant_shard_id}/{timeline_id}"
-                )
-            })?;
+                Ok(())
+            })
+            .await?;
 
         // All done!
-        let timeline = raw_timeline.finish_creation()?;
+        let timeline = raw_timeline.finish_creation().await?;
 
         // Callers are responsible to wait for uploads to complete and for activating the timeline.
 
diff --git a/pageserver/src/tenant/timeline/uninit.rs b/pageserver/src/tenant/timeline/uninit.rs
index 80a09b4840d0..3074463384fb 100644
--- a/pageserver/src/tenant/timeline/uninit.rs
+++ b/pageserver/src/tenant/timeline/uninit.rs
@@ -1,4 +1,4 @@
-use std::{collections::hash_map::Entry, fs, sync::Arc};
+use std::{collections::hash_map::Entry, fs, future::Future, sync::Arc};
 
 use anyhow::Context;
 use camino::Utf8PathBuf;
@@ -8,7 +8,8 @@ use utils::{fs_ext, id::TimelineId, lsn::Lsn, sync::gate::GateGuard};
 use crate::{
     context::RequestContext,
     import_datadir,
-    tenant::{CreateTimelineIdempotency, Tenant, TimelineOrOffloaded},
+    span::debug_assert_current_span_has_tenant_and_timeline_id,
+    tenant::{CreateTimelineError, CreateTimelineIdempotency, Tenant, TimelineOrOffloaded},
 };
 
 use super::Timeline;
@@ -24,6 +25,9 @@ pub struct UninitializedTimeline<'t> {
     pub(crate) owning_tenant: &'t Tenant,
     timeline_id: TimelineId,
     raw_timeline: Option<(Arc<Timeline>, TimelineCreateGuard)>,
+    /// Whether we spawned the inner Timeline's tasks such that we must later shut it down
+    /// if aborting the timeline creation
+    needs_shutdown: bool,
 }
 
 impl<'t> UninitializedTimeline<'t> {
@@ -36,6 +40,50 @@ impl<'t> UninitializedTimeline<'t> {
             owning_tenant,
             timeline_id,
             raw_timeline,
+            needs_shutdown: false,
+        }
+    }
+
+    /// When writing data to this timeline during creation, use this wrapper: it will take care of
+    /// setup of Timeline tasks required for I/O (flush loop) and making sure they are torn down
+    /// later.
+    pub(crate) async fn write<F, Fut>(&mut self, f: F) -> anyhow::Result<()>
+    where
+        F: FnOnce(Arc<Timeline>) -> Fut,
+        Fut: Future<Output = Result<(), CreateTimelineError>>,
+    {
+        debug_assert_current_span_has_tenant_and_timeline_id();
+
+        // Remember that we did I/O (spawned the flush loop), so that we can check we shut it down on drop
+        self.needs_shutdown = true;
+
+        let timeline = self.raw_timeline()?;
+
+        // Spawn flush loop so that the Timeline is ready to accept writes
+        timeline.maybe_spawn_flush_loop();
+
+        // Invoke the provided function, which will write some data into the new timeline
+        if let Err(e) = f(timeline.clone()).await {
+            self.abort().await;
+            return Err(e.into());
+        }
+
+        // Flush the underlying timeline's ephemeral layers to disk
+        if let Err(e) = timeline
+            .freeze_and_flush()
+            .await
+            .context("Failed to flush after timeline creation writes")
+        {
+            self.abort().await;
+            return Err(e);
+        }
+
+        Ok(())
+    }
+
+    pub(crate) async fn abort(&self) {
+        if let Some((raw_timeline, _)) = self.raw_timeline.as_ref() {
+            raw_timeline.shutdown(super::ShutdownMode::Hard).await;
         }
     }
 
@@ -44,11 +92,13 @@ impl<'t> UninitializedTimeline<'t> {
     /// This function launches the flush loop if not already done.
     ///
    /// The caller is responsible for activating the timeline (function `.activate()`).
-    pub(crate) fn finish_creation(mut self) -> anyhow::Result<Arc<Timeline>> {
+    pub(crate) async fn finish_creation(mut self) -> anyhow::Result<Arc<Timeline>> {
         let timeline_id = self.timeline_id;
         let tenant_shard_id = self.owning_tenant.tenant_shard_id;
 
         if self.raw_timeline.is_none() {
+            self.abort().await;
+
             return Err(anyhow::anyhow!(
                 "No timeline for initialization found for {tenant_shard_id}/{timeline_id}"
             ));
@@ -62,16 +112,25 @@ impl<'t> UninitializedTimeline<'t> {
             .0
             .get_disk_consistent_lsn();
 
-        anyhow::ensure!(
-            new_disk_consistent_lsn.is_valid(),
-            "new timeline {tenant_shard_id}/{timeline_id} has invalid disk_consistent_lsn"
-        );
+        if !new_disk_consistent_lsn.is_valid() {
+            self.abort().await;
+
+            return Err(anyhow::anyhow!(
+                "new timeline {tenant_shard_id}/{timeline_id} has invalid disk_consistent_lsn"
+            ));
+        }
 
         let mut timelines = self.owning_tenant.timelines.lock().unwrap();
         match timelines.entry(timeline_id) {
-            Entry::Occupied(_) => anyhow::bail!(
+            Entry::Occupied(_) => {
+                // Unexpected, bug in the caller. Tenant is responsible for preventing concurrent creation of the same timeline.
+                //
+                // We do not call Self::abort here. Because we don't cleanly shut down our Timeline, [`Self::drop`] should
+                // skip trying to delete the timeline directory too.
+                anyhow::bail!(
                 "Found freshly initialized timeline {tenant_shard_id}/{timeline_id} in the tenant map"
-            ),
+                )
+            }
             Entry::Vacant(v) => {
                 // after taking here should be no fallible operations, because the drop guard will not
                 // cleanup after and would block for example the tenant deletion
@@ -93,36 +152,31 @@ impl<'t> UninitializedTimeline<'t> {
 
     /// Prepares timeline data by loading it from the basebackup archive.
     pub(crate) async fn import_basebackup_from_tar(
-        self,
+        mut self,
         tenant: Arc<Tenant>,
         copyin_read: &mut (impl tokio::io::AsyncRead + Send + Sync + Unpin),
         base_lsn: Lsn,
         broker_client: storage_broker::BrokerClientChannel,
         ctx: &RequestContext,
     ) -> anyhow::Result<Arc<Timeline>> {
-        let raw_timeline = self.raw_timeline()?;
+        self.write(|raw_timeline| async move {
+            import_datadir::import_basebackup_from_tar(&raw_timeline, copyin_read, base_lsn, ctx)
+                .await
+                .context("Failed to import basebackup")
+                .map_err(CreateTimelineError::Other)?;
 
-        import_datadir::import_basebackup_from_tar(raw_timeline, copyin_read, base_lsn, ctx)
-            .await
-            .context("Failed to import basebackup")?;
-
-        // Flush the new layer files to disk, before we make the timeline as available to
-        // the outside world.
-        //
-        // Flush loop needs to be spawned in order to be able to flush.
-        raw_timeline.maybe_spawn_flush_loop();
-
-        fail::fail_point!("before-checkpoint-new-timeline", |_| {
-            anyhow::bail!("failpoint before-checkpoint-new-timeline");
-        });
+            fail::fail_point!("before-checkpoint-new-timeline", |_| {
+                Err(CreateTimelineError::Other(anyhow::anyhow!(
+                    "failpoint before-checkpoint-new-timeline"
+                )))
+            });
 
-        raw_timeline
-            .freeze_and_flush()
-            .await
-            .context("Failed to flush after basebackup import")?;
+            Ok(())
+        })
+        .await?;
 
         // All the data has been imported. Insert the Timeline into the tenant's timelines map
-        let tl = self.finish_creation()?;
+        let tl = self.finish_creation().await?;
         tl.activate(tenant, broker_client, None, ctx);
         Ok(tl)
     }
@@ -143,12 +197,19 @@ impl<'t> UninitializedTimeline<'t> {
 
 impl Drop for UninitializedTimeline<'_> {
     fn drop(&mut self) {
-        if let Some((_, create_guard)) = self.raw_timeline.take() {
+        if let Some((timeline, create_guard)) = self.raw_timeline.take() {
             let _entered = info_span!("drop_uninitialized_timeline", tenant_id = %self.owning_tenant.tenant_shard_id.tenant_id, shard_id = %self.owning_tenant.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id).entered();
-            // This is unusual, but can happen harmlessly if the pageserver is stopped while
-            // creating a timeline.
-            info!("Timeline got dropped without initializing, cleaning its files");
-            cleanup_timeline_directory(create_guard);
+            if self.needs_shutdown && !timeline.gate.close_complete() {
+                // This should not happen: caller should call [`Self::abort`] on failures
+                tracing::warn!(
+                    "Timeline not shut down after initialization failure, cannot clean up files"
+                );
+            } else {
+                // This is unusual, but can happen harmlessly if the pageserver is stopped while
+                // creating a timeline.
+                info!("Timeline got dropped without initializing, cleaning its files");
+                cleanup_timeline_directory(create_guard);
+            }
         }
     }
 }
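
Aside (not part of the diff): the core of this change is the `write()` / `abort()` pattern in `uninit.rs`, where every failure path tears down the half-initialized `Timeline` before the error propagates, so that `Drop` only has to warn about the one case it cannot clean up. The sketch below is a small self-contained analogue of that pattern, not pageserver code: `Uninit`, `CreateError`, `flush`, `finish`, and the `Arc<Mutex<Vec<u8>>>` stand-in for `Arc<Timeline>` are all illustrative names, and it assumes the tokio crate (macros, rt, sync features) as the runtime.

```rust
use std::future::Future;
use std::sync::Arc;

use tokio::sync::Mutex;

/// Illustrative stand-in for the pageserver's `CreateTimelineError`.
#[derive(Debug)]
struct CreateError(String);

/// Illustrative stand-in for `UninitializedTimeline`: shared state plus a flag recording
/// that background work was started and must be torn down if creation is aborted.
struct Uninit {
    inner: Option<Arc<Mutex<Vec<u8>>>>,
    needs_shutdown: bool,
}

impl Uninit {
    fn new() -> Self {
        Self {
            inner: Some(Arc::new(Mutex::new(Vec::new()))),
            needs_shutdown: false,
        }
    }

    /// The pattern from the diff: run the caller's async writes, then flush; on any error,
    /// tear down before returning, so `Drop` never sees a live half-built object.
    async fn write<F, Fut>(&mut self, f: F) -> Result<(), CreateError>
    where
        F: FnOnce(Arc<Mutex<Vec<u8>>>) -> Fut,
        Fut: Future<Output = Result<(), CreateError>>,
    {
        self.needs_shutdown = true;
        let inner = self.inner.as_ref().expect("aborted or finished").clone();

        if let Err(e) = f(inner.clone()).await {
            self.abort().await;
            return Err(e);
        }
        if let Err(e) = self.flush(&inner).await {
            self.abort().await;
            return Err(e);
        }
        Ok(())
    }

    /// Stand-in for `Timeline::freeze_and_flush`.
    async fn flush(&self, inner: &Arc<Mutex<Vec<u8>>>) -> Result<(), CreateError> {
        println!("flushed {} bytes", inner.lock().await.len());
        Ok(())
    }

    /// Stand-in for `Timeline::shutdown(ShutdownMode::Hard)`.
    async fn abort(&mut self) {
        self.inner.take();
        self.needs_shutdown = false;
        println!("aborted: background state torn down");
    }

    /// Stand-in for `finish_creation`: hand the fully written state to the caller.
    fn finish(mut self) -> Arc<Mutex<Vec<u8>>> {
        self.needs_shutdown = false;
        self.inner.take().expect("write() must have succeeded")
    }
}

impl Drop for Uninit {
    fn drop(&mut self) {
        // Mirrors the Drop change above: if a failed creation was never aborted, only warn.
        if self.needs_shutdown && self.inner.is_some() {
            eprintln!("dropped without shutdown; skipping cleanup");
        }
    }
}

#[tokio::main]
async fn main() {
    // Happy path: write, flush, then finish.
    let mut uninit = Uninit::new();
    uninit
        .write(|buf| async move {
            buf.lock().await.extend_from_slice(b"initial data");
            Ok(())
        })
        .await
        .expect("creation should succeed");
    let created = uninit.finish();
    println!("created with {} bytes", created.lock().await.len());

    // Failure path: abort() runs before the error reaches the caller.
    let mut failing = Uninit::new();
    let err = failing
        .write(|_buf| async move { Err(CreateError("simulated import failure".into())) })
        .await
        .unwrap_err();
    println!("creation failed: {}", err.0);
}
```

The design point mirrored here is that `write()` owns the error paths: both the caller's closure and the flush route through `abort()`, so by the time the error reaches the caller nothing half-initialized is left running.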