Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(async-tar port): keep state accross yield points when reading data #9

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 67 additions & 25 deletions src/archive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ impl<R: Read + Unpin> Archive<R> {

Ok(Entries {
archive: self.clone(),
next: 0,
current: (0, None, 0, None),
gnu_longlink: None,
gnu_longname: None,
pax_extensions: None,
Expand All @@ -195,7 +195,7 @@ impl<R: Read + Unpin> Archive<R> {

Ok(RawEntries {
archive: self.clone(),
next: 0,
current: (0, None, 0),
})
}

Expand Down Expand Up @@ -236,7 +236,7 @@ impl<R: Read + Unpin> Archive<R> {
/// Stream of `Entry`s.
pub struct Entries<R: Read + Unpin> {
archive: Archive<R>,
next: u64,
current: (u64, Option<Header>, usize, Option<GnuExtSparseHeader>),
gnu_longname: Option<Vec<u8>>,
gnu_longlink: Option<Vec<u8>>,
pax_extensions: Option<Vec<u8>>,
Expand Down Expand Up @@ -266,8 +266,15 @@ impl<R: Read + Unpin> Stream for Entries<R> {

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
let entry = ready_opt_err!(poll_next_raw(self.archive.clone(), &mut self.next, cx));

let archive = self.archive.clone();
let (next, current_header, current_header_pos, _) = &mut self.current;
let entry = ready_opt_err!(poll_next_raw(
archive,
next,
current_header,
current_header_pos,
cx
));
let is_recognized_header =
entry.header().as_gnu().is_some() || entry.header().as_ustar().is_some();
if is_recognized_header && entry.header().entry_type().is_gnu_longname() {
Expand Down Expand Up @@ -315,9 +322,14 @@ impl<R: Read + Unpin> Stream for Entries<R> {
fields.long_linkname = self.gnu_longlink.take();
fields.pax_extensions = self.pax_extensions.take();

let archive = self.archive.clone();
let (next, _, current_pos, current_ext) = &mut self.current;

ready_err!(poll_parse_sparse_header(
self.archive.clone(),
&mut self.next,
archive,
next,
current_ext,
current_pos,
&mut fields,
cx
));
Expand All @@ -330,36 +342,48 @@ impl<R: Read + Unpin> Stream for Entries<R> {
/// Stream of raw `Entry`s.
pub struct RawEntries<R: Read + Unpin> {
archive: Archive<R>,
next: u64,
current: (u64, Option<Header>, usize),
}

impl<R: Read + Unpin> Stream for RawEntries<R> {
type Item = io::Result<Entry<Archive<R>>>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
poll_next_raw(self.archive.clone(), &mut self.next, cx)
let archive = self.archive.clone();
let (next, current_header, current_header_pos) = &mut self.current;
poll_next_raw(archive, next, current_header, current_header_pos, cx)
}
}

fn poll_next_raw<R: Read + Unpin>(
mut archive: Archive<R>,
next: &mut u64,
current_header: &mut Option<Header>,
current_header_pos: &mut usize,
cx: &mut Context<'_>,
) -> Poll<Option<io::Result<Entry<Archive<R>>>>> {
let mut header = Header::new_old();
let mut header_pos = *next;

loop {
// Seek to the start of the next header in the archive
let delta = *next - archive.inner.pos.load(Ordering::SeqCst);

match futures_core::ready!(poll_skip(&mut archive, cx, delta)) {
Ok(_) => {}
Err(err) => return Poll::Ready(Some(Err(err))),
if current_header.is_none() {
let delta = *next - archive.inner.pos.load(Ordering::SeqCst);
match futures_core::ready!(poll_skip(&mut archive, cx, delta)) {
Ok(_) => {}
Err(err) => return Poll::Ready(Some(Err(err))),
}
*current_header = Some(Header::new_old());
*current_header_pos = 0;
}

let header = current_header.as_mut().unwrap();
// EOF is an indicator that we are at the end of the archive.
match futures_core::ready!(poll_try_read_all(&mut archive, cx, header.as_mut_bytes())) {
match futures_core::ready!(poll_try_read_all(
&mut archive,
cx,
header.as_mut_bytes(),
current_header_pos
)) {
Ok(true) => {}
Ok(false) => return Poll::Ready(None),
Err(err) => return Poll::Ready(Some(Err(err))),
Expand All @@ -381,6 +405,7 @@ fn poll_next_raw<R: Read + Unpin>(
header_pos = *next;
}

let header = current_header.as_mut().unwrap();
// Make sure the checksum is ok
let sum = header.as_bytes()[..148]
.iter()
Expand All @@ -397,6 +422,9 @@ fn poll_next_raw<R: Read + Unpin>(

let mut data = VecDeque::with_capacity(1);
data.push_back(EntryIo::Data(archive.clone().take(size)));
drop(header);

let header = current_header.take().unwrap();

let ret = EntryFields {
size,
Expand Down Expand Up @@ -424,6 +452,8 @@ fn poll_next_raw<R: Read + Unpin>(
fn poll_parse_sparse_header<R: Read + Unpin>(
mut archive: Archive<R>,
next: &mut u64,
current_ext: &mut Option<GnuExtSparseHeader>,
current_ext_pos: &mut usize,
entry: &mut EntryFields<Archive<R>>,
cx: &mut Context<'_>,
) -> Poll<io::Result<()>> {
Expand Down Expand Up @@ -500,11 +530,22 @@ fn poll_parse_sparse_header<R: Read + Unpin>(
add_block(block)?
}
if gnu.is_extended() {
let mut ext = GnuExtSparseHeader::new();
ext.isextended[0] = 1;
let started_header = current_ext.is_some();
if !started_header {
let mut ext = GnuExtSparseHeader::new();
ext.isextended[0] = 1;
*current_ext = Some(ext);
*current_ext_pos = 0;
}

let ext = current_ext.as_mut().unwrap();
while ext.is_extended() {
match futures_core::ready!(poll_try_read_all(&mut archive, cx, ext.as_mut_bytes()))
{
match futures_core::ready!(poll_try_read_all(
&mut archive,
cx,
ext.as_mut_bytes(),
current_ext_pos
)) {
Ok(true) => {}
Ok(false) => return Poll::Ready(Err(other("failed to read extension"))),
Err(err) => return Poll::Ready(Err(err)),
Expand Down Expand Up @@ -567,23 +608,24 @@ fn poll_try_read_all<R: Read + Unpin>(
mut source: R,
cx: &mut Context<'_>,
buf: &mut [u8],
pos: &mut usize,
) -> Poll<io::Result<bool>> {
let mut read = 0;
while read < buf.len() {
let mut read_buf = io::ReadBuf::new(&mut buf[read..]);
while *pos < buf.len() {
let mut read_buf = io::ReadBuf::new(&mut buf[*pos..]);
match futures_core::ready!(Pin::new(&mut source).poll_read(cx, &mut read_buf)) {
Ok(()) if read_buf.filled().is_empty() => {
if read == 0 {
if *pos == 0 {
return Poll::Ready(Ok(false));
}

return Poll::Ready(Err(other("failed to read entire block")));
}
Ok(()) => read += read_buf.filled().len(),
Ok(()) => *pos += read_buf.filled().len(),
Err(err) => return Poll::Ready(Err(err)),
}
}

*pos = 0;
Poll::Ready(Ok(true))
}

Expand Down