Skip to content

Commit

Permalink
Push on-demand download into Timeline::get() function itself.
Browse files Browse the repository at this point in the history
This makes Timeline::get() async, along with all functions that call
it directly or indirectly. The with_ondemand_download() mechanism
is gone, Timeline::get() now always downloads files, whether you want
it or not. That is what all the current callers want, so even though
this loses the capability to get a page only if it's already in the
pageserver, without downloading, we were not using that capability.
There were some places that used 'no_ondemand_download' in the WAL
ingestion code that would error out if a layer file was not found
locally, but those were dubious. We do actually want to on-demand
download in all of those places.

Per discussion at
#3233 (comment)
  • Loading branch information
hlinnaka committed Jan 11, 2023
1 parent 95bf19b commit a3a2ee1
Show file tree
Hide file tree
Showing 10 changed files with 489 additions and 693 deletions.
66 changes: 32 additions & 34 deletions pageserver/src/basebackup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use tracing::*;
///
use tokio_tar::{Builder, EntryType, Header};

use crate::tenant::{with_ondemand_download, Timeline};
use crate::tenant::Timeline;
use pageserver_api::reltag::{RelTag, SlruKind};

use postgres_ffi::pg_constants::{DEFAULTTABLESPACE_OID, GLOBALTABLESPACE_OID};
Expand Down Expand Up @@ -171,30 +171,23 @@ where
SlruKind::MultiXactOffsets,
SlruKind::MultiXactMembers,
] {
for segno in
with_ondemand_download(|| self.timeline.list_slru_segments(kind, self.lsn)).await?
{
for segno in self.timeline.list_slru_segments(kind, self.lsn).await? {
self.add_slru_segment(kind, segno).await?;
}
}

// Create tablespace directories
for ((spcnode, dbnode), has_relmap_file) in
with_ondemand_download(|| self.timeline.list_dbdirs(self.lsn)).await?
{
for ((spcnode, dbnode), has_relmap_file) in self.timeline.list_dbdirs(self.lsn).await? {
self.add_dbdir(spcnode, dbnode, has_relmap_file).await?;

// Gather and send relational files in each database if full backup is requested.
if self.full_backup {
for rel in
with_ondemand_download(|| self.timeline.list_rels(spcnode, dbnode, self.lsn))
.await?
{
for rel in self.timeline.list_rels(spcnode, dbnode, self.lsn).await? {
self.add_rel(rel).await?;
}
}
}
for xid in with_ondemand_download(|| self.timeline.list_twophase_files(self.lsn)).await? {
for xid in self.timeline.list_twophase_files(self.lsn).await? {
self.add_twophase_file(xid).await?;
}

Expand All @@ -210,8 +203,7 @@ where
}

async fn add_rel(&mut self, tag: RelTag) -> anyhow::Result<()> {
let nblocks =
with_ondemand_download(|| self.timeline.get_rel_size(tag, self.lsn, false)).await?;
let nblocks = self.timeline.get_rel_size(tag, self.lsn, false).await?;

// If the relation is empty, create an empty file
if nblocks == 0 {
Expand All @@ -229,11 +221,10 @@ where

let mut segment_data: Vec<u8> = vec![];
for blknum in startblk..endblk {
let img = with_ondemand_download(|| {
self.timeline
.get_rel_page_at_lsn(tag, blknum, self.lsn, false)
})
.await?;
let img = self
.timeline
.get_rel_page_at_lsn(tag, blknum, self.lsn, false)
.await?;
segment_data.extend_from_slice(&img[..]);
}

Expand All @@ -252,17 +243,17 @@ where
// Generate SLRU segment files from repository.
//
async fn add_slru_segment(&mut self, slru: SlruKind, segno: u32) -> anyhow::Result<()> {
let nblocks =
with_ondemand_download(|| self.timeline.get_slru_segment_size(slru, segno, self.lsn))
.await?;
let nblocks = self
.timeline
.get_slru_segment_size(slru, segno, self.lsn)
.await?;

let mut slru_buf: Vec<u8> = Vec::with_capacity(nblocks as usize * BLCKSZ as usize);
for blknum in 0..nblocks {
let img = with_ondemand_download(|| {
self.timeline
.get_slru_page_at_lsn(slru, segno, blknum, self.lsn)
})
.await?;
let img = self
.timeline
.get_slru_page_at_lsn(slru, segno, blknum, self.lsn)
.await?;

if slru == SlruKind::Clog {
ensure!(img.len() == BLCKSZ as usize || img.len() == BLCKSZ as usize + 8);
Expand Down Expand Up @@ -294,9 +285,10 @@ where
has_relmap_file: bool,
) -> anyhow::Result<()> {
let relmap_img = if has_relmap_file {
let img =
with_ondemand_download(|| self.timeline.get_relmap_file(spcnode, dbnode, self.lsn))
.await?;
let img = self
.timeline
.get_relmap_file(spcnode, dbnode, self.lsn)
.await?;
ensure!(img.len() == 512);
Some(img)
} else {
Expand Down Expand Up @@ -329,7 +321,9 @@ where
// XLOG_TBLSPC_DROP records. But we probably should just
// throw an error on CREATE TABLESPACE in the first place.
if !has_relmap_file
&& with_ondemand_download(|| self.timeline.list_rels(spcnode, dbnode, self.lsn))
&& self
.timeline
.list_rels(spcnode, dbnode, self.lsn)
.await?
.is_empty()
{
Expand Down Expand Up @@ -362,7 +356,7 @@ where
// Extract twophase state files
//
async fn add_twophase_file(&mut self, xid: TransactionId) -> anyhow::Result<()> {
let img = with_ondemand_download(|| self.timeline.get_twophase_file(xid, self.lsn)).await?;
let img = self.timeline.get_twophase_file(xid, self.lsn).await?;

let mut buf = BytesMut::new();
buf.extend_from_slice(&img[..]);
Expand Down Expand Up @@ -398,10 +392,14 @@ where
)
.await?;

let checkpoint_bytes = with_ondemand_download(|| self.timeline.get_checkpoint(self.lsn))
let checkpoint_bytes = self
.timeline
.get_checkpoint(self.lsn)
.await
.context("failed to get checkpoint bytes")?;
let pg_control_bytes = with_ondemand_download(|| self.timeline.get_control_file(self.lsn))
let pg_control_bytes = self
.timeline
.get_control_file(self.lsn)
.await
.context("failed get control bytes")?;

Expand Down
16 changes: 13 additions & 3 deletions pageserver/src/http/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use super::models::{
};
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::tenant::config::TenantConfOpt;
use crate::tenant::{with_ondemand_download, Timeline};
use crate::tenant::{PageReconstructError, Timeline};
use crate::{config::PageServerConf, tenant::mgr};
use utils::{
auth::JwtAuth,
Expand Down Expand Up @@ -77,6 +77,15 @@ fn check_permission(request: &Request<Body>, tenant_id: Option<TenantId>) -> Res
})
}

fn apierror_from_prerror(err: PageReconstructError) -> ApiError {
match err {
PageReconstructError::Other(err) => ApiError::InternalServerError(err),
PageReconstructError::WalRedo(err) => {
ApiError::InternalServerError(anyhow::Error::new(err))
}
}
}

// Helper function to construct a TimelineInfo struct for a timeline
async fn build_timeline_info(
timeline: &Arc<Timeline>,
Expand Down Expand Up @@ -298,9 +307,10 @@ async fn get_lsn_by_timestamp_handler(request: Request<Body>) -> Result<Response
.await
.and_then(|tenant| tenant.get_timeline(timeline_id, true))
.map_err(ApiError::NotFound)?;
let result = with_ondemand_download(|| timeline.find_lsn_for_timestamp(timestamp_pg))
let result = timeline
.find_lsn_for_timestamp(timestamp_pg)
.await
.map_err(ApiError::InternalServerError)?;
.map_err(apierror_from_prerror)?;

let result = match result {
LsnForTimestamp::Present(lsn) => format!("{lsn}"),
Expand Down
16 changes: 10 additions & 6 deletions pageserver/src/import_datadir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ async fn import_rel(
// Call put_rel_creation for every segment of the relation,
// because there is no guarantee about the order in which we are processing segments.
// ignore "relation already exists" error
if let Err(e) = modification.put_rel_creation(rel, nblocks as u32) {
if let Err(e) = modification.put_rel_creation(rel, nblocks as u32).await {
if e.to_string().contains("already exists") {
debug!("relation {} already exists. we must be extending it", rel);
} else {
Expand Down Expand Up @@ -178,7 +178,7 @@ async fn import_rel(
//
// If we process rel segments out of order,
// put_rel_extend will skip the update.
modification.put_rel_extend(rel, blknum)?;
modification.put_rel_extend(rel, blknum).await?;

Ok(())
}
Expand Down Expand Up @@ -206,7 +206,9 @@ async fn import_slru(

ensure!(nblocks <= pg_constants::SLRU_PAGES_PER_SEGMENT as usize);

modification.put_slru_segment_creation(slru, segno, nblocks as u32)?;
modification
.put_slru_segment_creation(slru, segno, nblocks as u32)
.await?;

let mut rpageno = 0;
loop {
Expand Down Expand Up @@ -492,7 +494,7 @@ async fn import_file(
}
"pg_filenode.map" => {
let bytes = read_all_bytes(reader).await?;
modification.put_relmap_file(spcnode, dbnode, bytes)?;
modification.put_relmap_file(spcnode, dbnode, bytes).await?;
debug!("imported relmap file")
}
"PG_VERSION" => {
Expand All @@ -515,7 +517,7 @@ async fn import_file(
match file_name.as_ref() {
"pg_filenode.map" => {
let bytes = read_all_bytes(reader).await?;
modification.put_relmap_file(spcnode, dbnode, bytes)?;
modification.put_relmap_file(spcnode, dbnode, bytes).await?;
debug!("imported relmap file")
}
"PG_VERSION" => {
Expand Down Expand Up @@ -545,7 +547,9 @@ async fn import_file(
let xid = u32::from_str_radix(file_name.as_ref(), 16)?;

let bytes = read_all_bytes(reader).await?;
modification.put_twophase_file(xid, Bytes::copy_from_slice(&bytes[..]))?;
modification
.put_twophase_file(xid, Bytes::copy_from_slice(&bytes[..]))
.await?;
debug!("imported twophase file");
} else if file_path.starts_with("pg_wal") {
debug!("found wal file in base section. ignore it");
Expand Down
24 changes: 8 additions & 16 deletions pageserver/src/page_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -546,10 +546,7 @@ impl PageServerHandler {
let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn)
.await?;

let exists = crate::tenant::with_ondemand_download(|| {
timeline.get_rel_exists(req.rel, lsn, req.latest)
})
.await?;
let exists = timeline.get_rel_exists(req.rel, lsn, req.latest).await?;

Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse {
exists,
Expand All @@ -566,10 +563,7 @@ impl PageServerHandler {
let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn)
.await?;

let n_blocks = crate::tenant::with_ondemand_download(|| {
timeline.get_rel_size(req.rel, lsn, req.latest)
})
.await?;
let n_blocks = timeline.get_rel_size(req.rel, lsn, req.latest).await?;

Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse {
n_blocks,
Expand All @@ -586,10 +580,9 @@ impl PageServerHandler {
let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn)
.await?;

let total_blocks = crate::tenant::with_ondemand_download(|| {
timeline.get_db_size(DEFAULTTABLESPACE_OID, req.dbnode, lsn, req.latest)
})
.await?;
let total_blocks = timeline
.get_db_size(DEFAULTTABLESPACE_OID, req.dbnode, lsn, req.latest)
.await?;
let db_size = total_blocks as i64 * BLCKSZ as i64;

Ok(PagestreamBeMessage::DbSize(PagestreamDbSizeResponse {
Expand All @@ -615,10 +608,9 @@ impl PageServerHandler {
}
*/

let page = crate::tenant::with_ondemand_download(|| {
timeline.get_rel_page_at_lsn(req.rel, req.blkno, lsn, req.latest)
})
.await?;
let page = timeline
.get_rel_page_at_lsn(req.rel, req.blkno, lsn, req.latest)
.await?;

Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
page,
Expand Down
Loading

0 comments on commit a3a2ee1

Please sign in to comment.