Skip to content

Commit

Permalink
Add range support to the streaming body extension
Browse files Browse the repository at this point in the history
  • Loading branch information
Icelk committed Nov 6, 2024
1 parent cf62b7f commit 59f5fbc
Showing 1 changed file with 35 additions and 9 deletions.
44 changes: 35 additions & 9 deletions src/extensions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1180,6 +1180,10 @@ impl<R> RuleSet<R> {
pub fn stream_body() -> Box<dyn PrepareCall> {
prepare!(req, host, path, _addr, {
debug!("Streaming body for {:?}", req.uri().path());
let range = utils::parse::sanitize_request(req)
.ok()
.and_then(|data| data.get_range());
let start = range.map_or(0, |range| range.0);
if let Some(path) = path {
let file = fs::File::open(path).await;
let meta = if let Ok(_file) = &file {
Expand Down Expand Up @@ -1237,15 +1241,29 @@ pub fn stream_body() -> Box<dyn PrepareCall> {
}

#[cfg(feature = "uring")]
let len = meta.stx_size as usize;
let file_len = meta.stx_size;
#[cfg(not(feature = "uring"))]
let len = meta.len() as usize;
let file_len = meta.len();

let end = if let Some((_, end)) = range {
end
} else {
file_len
};
let len = end - start;

#[cfg(not(feature = "uring"))]
{
use tokio::io::AsyncSeekExt;
if file.seek(io::SeekFrom::Start(start)).await.is_err() {
return default_error_response(StatusCode::NOT_FOUND, host, None).await;
};
}

#[allow(clippy::uninit_vec)]
let fut = response_pipe_fut!(response, _host, move |file: fs::File| {
let mut buf = Vec::with_capacity(1024 * 64);
#[cfg(feature = "uring")]
let mut pos = 0;
let mut pos = start;
unsafe { buf.set_len(buf.capacity()) };
let mut i = 0u32;
loop {
Expand All @@ -1264,15 +1282,20 @@ pub fn stream_body() -> Box<dyn PrepareCall> {
if read == 0 {
break;
}
#[cfg(feature = "uring")]
{
pos += read as u64;
}
pos += read as u64;

// it what was just read into memory, safe to cast to usize
#[allow(clippy::cast_possible_truncation)]
let buf_end = if pos > end {
read - (pos - end) as usize
} else {
read
};
// one chunk is max 64kB (see buffer above)
// we want to check connection status every, say, 10MB, to not
// exhaust resources.
// 10MB/64kB = 160
let data = Bytes::copy_from_slice(&buf[..read]);
let data = Bytes::copy_from_slice(&buf[..buf_end]);
let r = if i % 160 == 0 {
response.send_with_wait(data, 10 * 1024 * 1024).await
} else {
Expand All @@ -1284,6 +1307,9 @@ pub fn stream_body() -> Box<dyn PrepareCall> {
break;
}
}
if pos >= end {
break;
}
}
Err(err) => {
warn!("Failed to stream body from file: {err}");
Expand Down

0 comments on commit 59f5fbc

Please sign in to comment.