Skip to content

Commit

Permalink
Enable on-demand download in WalIngest. (#3233)
Browse files Browse the repository at this point in the history
Makes the top-level functions in WalIngest async, and replaces
no_ondemand_download calls with with_ondemand_download.

This hopefully fixes the problem reported in issue #3230, although I
don't have a self-contained test case for it.
  • Loading branch information
hlinnaka authored Jan 3, 2023
1 parent 0a0e55c commit 8b692e1
Show file tree
Hide file tree
Showing 4 changed files with 187 additions and 194 deletions.
15 changes: 7 additions & 8 deletions pageserver/src/basebackup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,10 +214,11 @@ where

let mut segment_data: Vec<u8> = vec![];
for blknum in startblk..endblk {
let img = self
.timeline
.get_rel_page_at_lsn(tag, blknum, self.lsn, false)
.no_ondemand_download()?;
let img = with_ondemand_download(|| {
self.timeline
.get_rel_page_at_lsn(tag, blknum, self.lsn, false)
})
.await?;
segment_data.extend_from_slice(&img[..]);
}

Expand Down Expand Up @@ -313,10 +314,8 @@ where
// XLOG_TBLSPC_DROP records. But we probably should just
// throw an error on CREATE TABLESPACE in the first place.
if !has_relmap_file
&& self
.timeline
.list_rels(spcnode, dbnode, self.lsn)
.no_ondemand_download()?
&& with_ondemand_download(|| self.timeline.list_rels(spcnode, dbnode, self.lsn))
.await?
.is_empty()
{
return Ok(());
Expand Down
13 changes: 7 additions & 6 deletions pageserver/src/import_datadir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ pub async fn import_timeline_from_postgres_datadir(
tline,
Lsn(pg_control.checkPointCopy.redo),
pgdata_lsn,
)?;
)
.await?;

Ok(())
}
Expand Down Expand Up @@ -240,7 +241,7 @@ async fn import_slru(

/// Scan PostgreSQL WAL files in given directory and load all records between
/// 'startpoint' and 'endpoint' into the repository.
fn import_wal(
async fn import_wal(
walpath: &Path,
tline: &Timeline,
startpoint: Lsn,
Expand All @@ -253,7 +254,7 @@ fn import_wal(
let mut offset = startpoint.segment_offset(WAL_SEGMENT_SIZE);
let mut last_lsn = startpoint;

let mut walingest = WalIngest::new(tline, startpoint).no_ondemand_download()?;
let mut walingest = WalIngest::new(tline, startpoint).await?;

while last_lsn <= endpoint {
// FIXME: assume postgresql tli 1 for now
Expand Down Expand Up @@ -291,7 +292,7 @@ fn import_wal(
if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
walingest
.ingest_record(recdata, lsn, &mut modification, &mut decoded)
.no_ondemand_download()?;
.await?;
last_lsn = lsn;

nrecords += 1;
Expand Down Expand Up @@ -375,7 +376,7 @@ pub async fn import_wal_from_tar(
let mut segno = start_lsn.segment_number(WAL_SEGMENT_SIZE);
let mut offset = start_lsn.segment_offset(WAL_SEGMENT_SIZE);
let mut last_lsn = start_lsn;
let mut walingest = WalIngest::new(tline, start_lsn).no_ondemand_download()?;
let mut walingest = WalIngest::new(tline, start_lsn).await?;

// Ingest wal until end_lsn
info!("importing wal until {}", end_lsn);
Expand Down Expand Up @@ -425,7 +426,7 @@ pub async fn import_wal_from_tar(
if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
walingest
.ingest_record(recdata, lsn, &mut modification, &mut decoded)
.no_ondemand_download()?;
.await?;
last_lsn = lsn;

debug!("imported record at {} (end {})", lsn, end_lsn);
Expand Down
Loading

0 comments on commit 8b692e1

Please sign in to comment.