Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pageserver: fix race cleaning up timeline files when shut down during bootstrap #10532

Merged
merged 3 commits into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions pageserver/src/http/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3108,12 +3108,16 @@ async fn put_tenant_timeline_import_basebackup(

let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);

let span = info_span!("import_basebackup", tenant_id=%tenant_id, timeline_id=%timeline_id, base_lsn=%base_lsn, end_lsn=%end_lsn, pg_version=%pg_version);
let tenant_shard_id = TenantShardId::unsharded(tenant_id);

let span = info_span!("import_basebackup",
tenant_id=%tenant_id, timeline_id=%timeline_id, shard_id=%tenant_shard_id.shard_slug(),
base_lsn=%base_lsn, end_lsn=%end_lsn, pg_version=%pg_version);
async move {
let state = get_state(&request);
let tenant = state
.tenant_manager
.get_attached_tenant_shard(TenantShardId::unsharded(tenant_id))?;
.get_attached_tenant_shard(tenant_shard_id)?;

let broker_client = state.broker_client.clone();

Expand Down
66 changes: 29 additions & 37 deletions pageserver/src/tenant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2424,7 +2424,7 @@ impl Tenant {
// Make sure the freeze_and_flush reaches remote storage.
tline.remote_client.wait_completion().await.unwrap();

let tl = uninit_tl.finish_creation()?;
let tl = uninit_tl.finish_creation().await?;
// The non-test code would call tl.activate() here.
tl.set_state(TimelineState::Active);
Ok(tl)
Expand Down Expand Up @@ -4688,7 +4688,7 @@ impl Tenant {
)
.await?;

let new_timeline = uninitialized_timeline.finish_creation()?;
let new_timeline = uninitialized_timeline.finish_creation().await?;

// Root timeline gets its layers during creation and uploads them along with the metadata.
// A branch timeline though, when created, can get no writes for some time, hence won't get any layers created.
Expand Down Expand Up @@ -4878,10 +4878,11 @@ impl Tenant {
}

// this new directory is very temporary, set to remove it immediately after bootstrap, we don't need it
let pgdata_path_deferred = pgdata_path.clone();
scopeguard::defer! {
if let Err(e) = fs::remove_dir_all(&pgdata_path) {
if let Err(e) = fs::remove_dir_all(&pgdata_path_deferred) {
// this is unlikely, but we will remove the directory on pageserver restart or another bootstrap call
error!("Failed to remove temporary initdb directory '{pgdata_path}': {e}");
error!("Failed to remove temporary initdb directory '{pgdata_path_deferred}': {e}");
}
}
if let Some(existing_initdb_timeline_id) = load_existing_initdb {
Expand Down Expand Up @@ -4948,7 +4949,7 @@ impl Tenant {
pgdata_lsn,
pg_version,
);
let raw_timeline = self
let mut raw_timeline = self
.prepare_new_timeline(
timeline_id,
&new_metadata,
Expand All @@ -4959,42 +4960,33 @@ impl Tenant {
.await?;

let tenant_shard_id = raw_timeline.owning_tenant.tenant_shard_id;
let unfinished_timeline = raw_timeline.raw_timeline()?;

// Flush the new layer files to disk, before we make the timeline as available to
// the outside world.
//
// Flush loop needs to be spawned in order to be able to flush.
unfinished_timeline.maybe_spawn_flush_loop();

import_datadir::import_timeline_from_postgres_datadir(
unfinished_timeline,
&pgdata_path,
pgdata_lsn,
ctx,
)
.await
.with_context(|| {
format!("Failed to import pgdatadir for timeline {tenant_shard_id}/{timeline_id}")
})?;
raw_timeline
.write(|unfinished_timeline| async move {
import_datadir::import_timeline_from_postgres_datadir(
&unfinished_timeline,
&pgdata_path,
pgdata_lsn,
ctx,
)
.await
.with_context(|| {
format!(
"Failed to import pgdatadir for timeline {tenant_shard_id}/{timeline_id}"
)
})?;

fail::fail_point!("before-checkpoint-new-timeline", |_| {
Err(CreateTimelineError::Other(anyhow::anyhow!(
"failpoint before-checkpoint-new-timeline"
)))
});
fail::fail_point!("before-checkpoint-new-timeline", |_| {
Err(CreateTimelineError::Other(anyhow::anyhow!(
"failpoint before-checkpoint-new-timeline"
)))
});

unfinished_timeline
.freeze_and_flush()
.await
.with_context(|| {
format!(
"Failed to flush after pgdatadir import for timeline {tenant_shard_id}/{timeline_id}"
)
})?;
Ok(())
})
.await?;

// All done!
let timeline = raw_timeline.finish_creation()?;
let timeline = raw_timeline.finish_creation().await?;

// Callers are responsible to wait for uploads to complete and for activating the timeline.

Expand Down
129 changes: 95 additions & 34 deletions pageserver/src/tenant/timeline/uninit.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::hash_map::Entry, fs, sync::Arc};
use std::{collections::hash_map::Entry, fs, future::Future, sync::Arc};

use anyhow::Context;
use camino::Utf8PathBuf;
Expand All @@ -8,7 +8,8 @@ use utils::{fs_ext, id::TimelineId, lsn::Lsn, sync::gate::GateGuard};
use crate::{
context::RequestContext,
import_datadir,
tenant::{CreateTimelineIdempotency, Tenant, TimelineOrOffloaded},
span::debug_assert_current_span_has_tenant_and_timeline_id,
tenant::{CreateTimelineError, CreateTimelineIdempotency, Tenant, TimelineOrOffloaded},
};

use super::Timeline;
Expand All @@ -24,6 +25,9 @@ pub struct UninitializedTimeline<'t> {
pub(crate) owning_tenant: &'t Tenant,
timeline_id: TimelineId,
raw_timeline: Option<(Arc<Timeline>, TimelineCreateGuard)>,
/// Whether we spawned the inner Timeline's tasks such that we must later shut it down
/// if aborting the timeline creation
needs_shutdown: bool,
}

impl<'t> UninitializedTimeline<'t> {
Expand All @@ -36,6 +40,50 @@ impl<'t> UninitializedTimeline<'t> {
owning_tenant,
timeline_id,
raw_timeline,
needs_shutdown: false,
}
}

/// When writing data to this timeline during creation, use this wrapper: it will take care of
/// setup of Timeline tasks required for I/O (flush loop) and making sure they are torn down
/// later.
pub(crate) async fn write<F, Fut>(&mut self, f: F) -> anyhow::Result<()>
where
F: FnOnce(Arc<Timeline>) -> Fut,
Fut: Future<Output = Result<(), CreateTimelineError>>,
{
debug_assert_current_span_has_tenant_and_timeline_id();

// Remember that we did I/O (spawned the flush loop), so that we can check we shut it down on drop
self.needs_shutdown = true;

let timeline = self.raw_timeline()?;

// Spawn flush loop so that the Timeline is ready to accept writes
timeline.maybe_spawn_flush_loop();

// Invoke the provided function, which will write some data into the new timeline
if let Err(e) = f(timeline.clone()).await {
self.abort().await;
return Err(e.into());
}

// Flush the underlying timeline's ephemeral layers to disk
if let Err(e) = timeline
.freeze_and_flush()
.await
.context("Failed to flush after timeline creation writes")
{
self.abort().await;
return Err(e);
}

Ok(())
}

pub(crate) async fn abort(&self) {
if let Some((raw_timeline, _)) = self.raw_timeline.as_ref() {
raw_timeline.shutdown(super::ShutdownMode::Hard).await;
}
}

Expand All @@ -44,11 +92,13 @@ impl<'t> UninitializedTimeline<'t> {
/// This function launches the flush loop if not already done.
///
/// The caller is responsible for activating the timeline (function `.activate()`).
pub(crate) fn finish_creation(mut self) -> anyhow::Result<Arc<Timeline>> {
pub(crate) async fn finish_creation(mut self) -> anyhow::Result<Arc<Timeline>> {
let timeline_id = self.timeline_id;
let tenant_shard_id = self.owning_tenant.tenant_shard_id;

if self.raw_timeline.is_none() {
self.abort().await;

return Err(anyhow::anyhow!(
"No timeline for initialization found for {tenant_shard_id}/{timeline_id}"
));
Expand All @@ -62,16 +112,25 @@ impl<'t> UninitializedTimeline<'t> {
.0
.get_disk_consistent_lsn();

anyhow::ensure!(
new_disk_consistent_lsn.is_valid(),
"new timeline {tenant_shard_id}/{timeline_id} has invalid disk_consistent_lsn"
);
if !new_disk_consistent_lsn.is_valid() {
self.abort().await;

return Err(anyhow::anyhow!(
"new timeline {tenant_shard_id}/{timeline_id} has invalid disk_consistent_lsn"
));
}

let mut timelines = self.owning_tenant.timelines.lock().unwrap();
match timelines.entry(timeline_id) {
Entry::Occupied(_) => anyhow::bail!(
Entry::Occupied(_) => {
// Unexpected, bug in the caller. Tenant is responsible for preventing concurrent creation of the same timeline.
//
// We do not call Self::abort here. Because we don't cleanly shut down our Timeline, [`Self::drop`] should
// skip trying to delete the timeline directory too.
anyhow::bail!(
"Found freshly initialized timeline {tenant_shard_id}/{timeline_id} in the tenant map"
),
)
}
Entry::Vacant(v) => {
// after taking here should be no fallible operations, because the drop guard will not
// cleanup after and would block for example the tenant deletion
Expand All @@ -93,36 +152,31 @@ impl<'t> UninitializedTimeline<'t> {

/// Prepares timeline data by loading it from the basebackup archive.
pub(crate) async fn import_basebackup_from_tar(
self,
mut self,
tenant: Arc<Tenant>,
copyin_read: &mut (impl tokio::io::AsyncRead + Send + Sync + Unpin),
base_lsn: Lsn,
broker_client: storage_broker::BrokerClientChannel,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Timeline>> {
let raw_timeline = self.raw_timeline()?;
self.write(|raw_timeline| async move {
import_datadir::import_basebackup_from_tar(&raw_timeline, copyin_read, base_lsn, ctx)
.await
.context("Failed to import basebackup")
.map_err(CreateTimelineError::Other)?;

import_datadir::import_basebackup_from_tar(raw_timeline, copyin_read, base_lsn, ctx)
.await
.context("Failed to import basebackup")?;

// Flush the new layer files to disk, before we make the timeline as available to
// the outside world.
//
// Flush loop needs to be spawned in order to be able to flush.
raw_timeline.maybe_spawn_flush_loop();

fail::fail_point!("before-checkpoint-new-timeline", |_| {
anyhow::bail!("failpoint before-checkpoint-new-timeline");
});
fail::fail_point!("before-checkpoint-new-timeline", |_| {
Err(CreateTimelineError::Other(anyhow::anyhow!(
"failpoint before-checkpoint-new-timeline"
)))
});

raw_timeline
.freeze_and_flush()
.await
.context("Failed to flush after basebackup import")?;
Ok(())
})
.await?;

// All the data has been imported. Insert the Timeline into the tenant's timelines map
let tl = self.finish_creation()?;
let tl = self.finish_creation().await?;
tl.activate(tenant, broker_client, None, ctx);
Ok(tl)
}
Expand All @@ -143,12 +197,19 @@ impl<'t> UninitializedTimeline<'t> {

impl Drop for UninitializedTimeline<'_> {
fn drop(&mut self) {
if let Some((_, create_guard)) = self.raw_timeline.take() {
if let Some((timeline, create_guard)) = self.raw_timeline.take() {
let _entered = info_span!("drop_uninitialized_timeline", tenant_id = %self.owning_tenant.tenant_shard_id.tenant_id, shard_id = %self.owning_tenant.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id).entered();
// This is unusual, but can happen harmlessly if the pageserver is stopped while
// creating a timeline.
info!("Timeline got dropped without initializing, cleaning its files");
cleanup_timeline_directory(create_guard);
if self.needs_shutdown && !timeline.gate.close_complete() {
// This should not happen: caller should call [`Self::abort`] on failures
tracing::warn!(
"Timeline not shut down after initialization failure, cannot clean up files"
);
} else {
// This is unusual, but can happen harmlessly if the pageserver is stopped while
// creating a timeline.
info!("Timeline got dropped without initializing, cleaning its files");
cleanup_timeline_directory(create_guard);
}
}
}
}
Expand Down
Loading