From 883a616f0bf3258f1898e0fad0c3707c630f81b0 Mon Sep 17 00:00:00 2001 From: Alessandro Ghedini Date: Thu, 7 Sep 2023 11:38:40 +0100 Subject: [PATCH] stream: split RecvBuf and SendBuf into submodules --- quiche/src/lib.rs | 2 +- quiche/src/{stream.rs => stream/mod.rs} | 2113 +++-------------------- quiche/src/stream/recv_buf.rs | 977 +++++++++++ quiche/src/stream/send_buf.rs | 775 +++++++++ 4 files changed, 1980 insertions(+), 1887 deletions(-) rename quiche/src/{stream.rs => stream/mod.rs} (58%) create mode 100644 quiche/src/stream/recv_buf.rs create mode 100644 quiche/src/stream/send_buf.rs diff --git a/quiche/src/lib.rs b/quiche/src/lib.rs index 3ba2c0f9..39d2c5b0 100644 --- a/quiche/src/lib.rs +++ b/quiche/src/lib.rs @@ -6801,7 +6801,7 @@ impl Connection { let was_readable = stream.is_readable(); let priority_key = Arc::clone(&stream.priority_key); - let was_draining = stream.is_draining(); + let was_draining = stream.recv.is_draining(); stream.recv.write(data)?; diff --git a/quiche/src/stream.rs b/quiche/src/stream/mod.rs similarity index 58% rename from quiche/src/stream.rs rename to quiche/src/stream/mod.rs index 6b1a620a..708b6bed 100644 --- a/quiche/src/stream.rs +++ b/quiche/src/stream/mod.rs @@ -29,12 +29,8 @@ use std::cmp; use std::sync::Arc; use std::collections::hash_map; -use std::collections::BTreeMap; use std::collections::HashMap; use std::collections::HashSet; -use std::collections::VecDeque; - -use std::time; use intrusive_collections::intrusive_adapter; use intrusive_collections::KeyAdapter; @@ -46,17 +42,8 @@ use smallvec::SmallVec; use crate::Error; use crate::Result; -use crate::flowcontrol; -use crate::ranges; - const DEFAULT_URGENCY: u8 = 127; -#[cfg(test)] -const SEND_BUFFER_SIZE: usize = 5; - -#[cfg(not(test))] -const SEND_BUFFER_SIZE: usize = 4096; - // The default size of the receiver stream flow control window. const DEFAULT_STREAM_WINDOW: u64 = 32 * 1024; @@ -662,10 +649,10 @@ impl StreamMap { /// A QUIC stream. 
pub struct Stream { /// Receive-side stream buffer. - pub recv: RecvBuf, + pub recv: recv_buf::RecvBuf, /// Send-side stream buffer. - pub send: SendBuf, + pub send: send_buf::SendBuf, pub send_lowat: usize, @@ -696,8 +683,8 @@ impl Stream { }); Stream { - recv: RecvBuf::new(max_rx_data, max_window), - send: SendBuf::new(max_tx_data), + recv: recv_buf::RecvBuf::new(max_rx_data, max_window), + send: send_buf::SendBuf::new(max_tx_data), send_lowat: 1, bidi, local, @@ -715,15 +702,16 @@ impl Stream { /// Returns true if the stream has enough flow control capacity to be /// written to, and is not finished. pub fn is_writable(&self) -> bool { - !self.send.shutdown && + !self.send.is_shutdown() && !self.send.is_fin() && - (self.send.off + self.send_lowat as u64) < self.send.max_data + (self.send.off_back() + self.send_lowat as u64) < + self.send.max_off() } /// Returns true if the stream has data to send and is allowed to send at /// least some of it. pub fn is_flushable(&self) -> bool { - self.send.ready() && self.send.off_front() < self.send.max_data + self.send.ready() && self.send.off_front() < self.send.max_off() } /// Returns true if the stream is complete. @@ -750,11 +738,6 @@ impl Stream { (false, false) => self.recv.is_fin(), } } - - /// Returns true if the stream is not storing incoming data. - pub fn is_draining(&self) -> bool { - self.recv.drain - } } /// Returns true if the stream was created locally. @@ -789,1851 +772,259 @@ impl Default for StreamPriorityKey { flushable: Default::default(), } } -} - -impl PartialEq for StreamPriorityKey { - fn eq(&self, other: &Self) -> bool { - self.id == other.id - } -} - -impl Eq for StreamPriorityKey {} - -impl PartialOrd for StreamPriorityKey { - fn partial_cmp(&self, other: &Self) -> Option { - // Ignore priority if ID matches. - if self.id == other.id { - return Some(std::cmp::Ordering::Equal); - } - - // First, order by urgency... 
- if self.urgency != other.urgency { - return self.urgency.partial_cmp(&other.urgency); - } - - // ...when the urgency is the same, and both are not incremental, order - // by stream ID... - if !self.incremental && !other.incremental { - return self.id.partial_cmp(&other.id); - } - - // ...non-incremental takes priority over incremental... - if self.incremental && !other.incremental { - return Some(std::cmp::Ordering::Greater); - } - if !self.incremental && other.incremental { - return Some(std::cmp::Ordering::Less); - } - - // ...finally, when both are incremental, `other` takes precedence (so - // `self` is always sorted after other same-urgency incremental - // entries). - Some(std::cmp::Ordering::Greater) - } -} - -impl Ord for StreamPriorityKey { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - // `partial_cmp()` never returns `None`, so this should be safe. - self.partial_cmp(other).unwrap() - } -} - -intrusive_adapter!(pub StreamWritablePriorityAdapter = Arc: StreamPriorityKey { writable: RBTreeAtomicLink }); - -impl<'a> KeyAdapter<'a> for StreamWritablePriorityAdapter { - type Key = StreamPriorityKey; - - fn get_key(&self, s: &StreamPriorityKey) -> Self::Key { - s.clone() - } -} - -intrusive_adapter!(pub StreamReadablePriorityAdapter = Arc: StreamPriorityKey { readable: RBTreeAtomicLink }); - -impl<'a> KeyAdapter<'a> for StreamReadablePriorityAdapter { - type Key = StreamPriorityKey; - - fn get_key(&self, s: &StreamPriorityKey) -> Self::Key { - s.clone() - } -} - -intrusive_adapter!(pub StreamFlushablePriorityAdapter = Arc: StreamPriorityKey { flushable: RBTreeAtomicLink }); - -impl<'a> KeyAdapter<'a> for StreamFlushablePriorityAdapter { - type Key = StreamPriorityKey; - - fn get_key(&self, s: &StreamPriorityKey) -> Self::Key { - s.clone() - } -} - -/// An iterator over QUIC streams. 
-#[derive(Default)] -pub struct StreamIter { - streams: SmallVec<[u64; 8]>, - index: usize, -} - -impl StreamIter { - #[inline] - fn from(streams: &StreamIdHashSet) -> Self { - StreamIter { - streams: streams.iter().copied().collect(), - index: 0, - } - } -} - -impl Iterator for StreamIter { - type Item = u64; - - #[inline] - fn next(&mut self) -> Option { - let v = self.streams.get(self.index)?; - self.index += 1; - Some(*v) - } -} - -impl ExactSizeIterator for StreamIter { - #[inline] - fn len(&self) -> usize { - self.streams.len() - self.index - } -} - -/// Receive-side stream buffer. -/// -/// Stream data received by the peer is buffered in a list of data chunks -/// ordered by offset in ascending order. Contiguous data can then be read -/// into a slice. -#[derive(Debug, Default)] -pub struct RecvBuf { - /// Chunks of data received from the peer that have not yet been read by - /// the application, ordered by offset. - data: BTreeMap, - - /// The lowest data offset that has yet to be read by the application. - off: u64, - - /// The total length of data received on this stream. - len: u64, - - /// Receiver flow controller. - flow_control: flowcontrol::FlowControl, - - /// The final stream offset received from the peer, if any. - fin_off: Option, - - /// The error code received via RESET_STREAM. - error: Option, - - /// Whether incoming data is validated but not buffered. - drain: bool, -} - -impl RecvBuf { - /// Creates a new receive buffer. - fn new(max_data: u64, max_window: u64) -> RecvBuf { - RecvBuf { - flow_control: flowcontrol::FlowControl::new( - max_data, - cmp::min(max_data, DEFAULT_STREAM_WINDOW), - max_window, - ), - ..RecvBuf::default() - } - } - - /// Inserts the given chunk of data in the buffer. - /// - /// This also takes care of enforcing stream flow control limits, as well - /// as handling incoming data that overlaps data that is already in the - /// buffer. 
- pub fn write(&mut self, buf: RangeBuf) -> Result<()> { - if buf.max_off() > self.max_data() { - return Err(Error::FlowControl); - } - - if let Some(fin_off) = self.fin_off { - // Stream's size is known, forbid data beyond that point. - if buf.max_off() > fin_off { - return Err(Error::FinalSize); - } - - // Stream's size is already known, forbid changing it. - if buf.fin() && fin_off != buf.max_off() { - return Err(Error::FinalSize); - } - } - - // Stream's known size is lower than data already received. - if buf.fin() && buf.max_off() < self.len { - return Err(Error::FinalSize); - } - - // We already saved the final offset, so there's nothing else we - // need to keep from the RangeBuf if it's empty. - if self.fin_off.is_some() && buf.is_empty() { - return Ok(()); - } - - if buf.fin() { - self.fin_off = Some(buf.max_off()); - } - - // No need to store empty buffer that doesn't carry the fin flag. - if !buf.fin() && buf.is_empty() { - return Ok(()); - } - - // Check if data is fully duplicate, that is the buffer's max offset is - // lower or equal to the offset already stored in the recv buffer. - if self.off >= buf.max_off() { - // An exception is applied to empty range buffers, because an empty - // buffer's max offset matches the max offset of the recv buffer. - // - // By this point all spurious empty buffers should have already been - // discarded, so allowing empty buffers here should be safe. - if !buf.is_empty() { - return Ok(()); - } - } - - let mut tmp_bufs = VecDeque::with_capacity(2); - tmp_bufs.push_back(buf); - - 'tmp: while let Some(mut buf) = tmp_bufs.pop_front() { - // Discard incoming data below current stream offset. Bytes up to - // `self.off` have already been received so we should not buffer - // them again. This is also important to make sure `ready()` doesn't - // get stuck when a buffer with lower offset than the stream's is - // buffered. 
- if self.off_front() > buf.off() { - buf = buf.split_off((self.off_front() - buf.off()) as usize); - } - - // Handle overlapping data. If the incoming data's starting offset - // is above the previous maximum received offset, there is clearly - // no overlap so this logic can be skipped. However do still try to - // merge an empty final buffer (i.e. an empty buffer with the fin - // flag set, which is the only kind of empty buffer that should - // reach this point). - if buf.off() < self.max_off() || buf.is_empty() { - for (_, b) in self.data.range(buf.off()..) { - let off = buf.off(); - - // We are past the current buffer. - if b.off() > buf.max_off() { - break; - } - - // New buffer is fully contained in existing buffer. - if off >= b.off() && buf.max_off() <= b.max_off() { - continue 'tmp; - } - - // New buffer's start overlaps existing buffer. - if off >= b.off() && off < b.max_off() { - buf = buf.split_off((b.max_off() - off) as usize); - } - - // New buffer's end overlaps existing buffer. - if off < b.off() && buf.max_off() > b.off() { - tmp_bufs - .push_back(buf.split_off((b.off() - off) as usize)); - } - } - } - - self.len = cmp::max(self.len, buf.max_off()); - - if !self.drain { - self.data.insert(buf.max_off(), buf); - } - } - - Ok(()) - } - - /// Writes data from the receive buffer into the given output buffer. - /// - /// Only contiguous data is written to the output buffer, starting from - /// offset 0. The offset is incremented as data is read out of the receive - /// buffer into the application buffer. If there is no data at the expected - /// read offset, the `Done` error is returned. - /// - /// On success the amount of data read, and a flag indicating if there is - /// no more data in the buffer, are returned as a tuple. 
- pub fn emit(&mut self, out: &mut [u8]) -> Result<(usize, bool)> { - let mut len = 0; - let mut cap = out.len(); - - if !self.ready() { - return Err(Error::Done); - } - - // The stream was reset, so return the error code instead. - if let Some(e) = self.error { - return Err(Error::StreamReset(e)); - } - - while cap > 0 && self.ready() { - let mut entry = match self.data.first_entry() { - Some(entry) => entry, - None => break, - }; - - let buf = entry.get_mut(); - - let buf_len = cmp::min(buf.len(), cap); - - out[len..len + buf_len].copy_from_slice(&buf[..buf_len]); - - self.off += buf_len as u64; - - len += buf_len; - cap -= buf_len; - - if buf_len < buf.len() { - buf.consume(buf_len); - - // We reached the maximum capacity, so end here. - break; - } - - entry.remove(); - } - - // Update consumed bytes for flow control. - self.flow_control.add_consumed(len as u64); - - Ok((len, self.is_fin())) - } - - /// Resets the stream at the given offset. - pub fn reset(&mut self, error_code: u64, final_size: u64) -> Result { - // Stream's size is already known, forbid changing it. - if let Some(fin_off) = self.fin_off { - if fin_off != final_size { - return Err(Error::FinalSize); - } - } - - // Stream's known size is lower than data already received. - if final_size < self.len { - return Err(Error::FinalSize); - } - - // Calculate how many bytes need to be removed from the connection flow - // control. - let max_data_delta = final_size - self.len; - - if self.error.is_some() { - return Ok(max_data_delta as usize); - } - - self.error = Some(error_code); - - // Clear all data already buffered. - self.off = final_size; - - self.data.clear(); - - // In order to ensure the application is notified when the stream is - // reset, enqueue a zero-length buffer at the final size offset. - let buf = RangeBuf::from(b"", final_size, true); - self.write(buf)?; - - Ok(max_data_delta as usize) - } - - /// Commits the new max_data limit. 
- pub fn update_max_data(&mut self, now: time::Instant) { - self.flow_control.update_max_data(now); - } - - /// Return the new max_data limit. - pub fn max_data_next(&mut self) -> u64 { - self.flow_control.max_data_next() - } - - /// Return the current flow control limit. - fn max_data(&self) -> u64 { - self.flow_control.max_data() - } - - /// Return the current window. - pub fn window(&self) -> u64 { - self.flow_control.window() - } - - /// Autotune the window size. - pub fn autotune_window(&mut self, now: time::Instant, rtt: time::Duration) { - self.flow_control.autotune_window(now, rtt); - } - - /// Shuts down receiving data. - pub fn shutdown(&mut self) -> Result<()> { - if self.drain { - return Err(Error::Done); - } - - self.drain = true; - - self.data.clear(); - - self.off = self.max_off(); - - Ok(()) - } - - /// Returns the lowest offset of data buffered. - pub fn off_front(&self) -> u64 { - self.off - } - - /// Returns true if we need to update the local flow control limit. - pub fn almost_full(&self) -> bool { - self.fin_off.is_none() && self.flow_control.should_update_max_data() - } - - /// Returns the largest offset ever received. - pub fn max_off(&self) -> u64 { - self.len - } - - /// Returns true if the receive-side of the stream is complete. - /// - /// This happens when the stream's receive final size is known, and the - /// application has read all data from the stream. - pub fn is_fin(&self) -> bool { - if self.fin_off == Some(self.off) { - return true; - } - - false - } - - /// Returns true if the stream has data to be read. - fn ready(&self) -> bool { - let (_, buf) = match self.data.first_key_value() { - Some(v) => v, - None => return false, - }; - - buf.off() == self.off - } -} - -/// Send-side stream buffer. -/// -/// Stream data scheduled to be sent to the peer is buffered in a list of data -/// chunks ordered by offset in ascending order. Contiguous data can then be -/// read into a slice. 
-/// -/// By default, new data is appended at the end of the stream, but data can be -/// inserted at the start of the buffer (this is to allow data that needs to be -/// retransmitted to be re-buffered). -#[derive(Debug, Default)] -pub struct SendBuf { - /// Chunks of data to be sent, ordered by offset. - data: VecDeque, - - /// The index of the buffer that needs to be sent next. - pos: usize, - - /// The maximum offset of data buffered in the stream. - off: u64, - - /// The maximum offset of data sent to the peer, regardless of - /// retransmissions. - emit_off: u64, - - /// The amount of data currently buffered. - len: u64, - - /// The maximum offset we are allowed to send to the peer. - max_data: u64, - - /// The last offset the stream was blocked at, if any. - blocked_at: Option, - - /// The final stream offset written to the stream, if any. - fin_off: Option, - - /// Whether the stream's send-side has been shut down. - shutdown: bool, - - /// Ranges of data offsets that have been acked. - acked: ranges::RangeSet, - - /// The error code received via STOP_SENDING. - error: Option, -} - -impl SendBuf { - /// Creates a new send buffer. - fn new(max_data: u64) -> SendBuf { - SendBuf { - max_data, - ..SendBuf::default() - } - } - - /// Inserts the given slice of data at the end of the buffer. - /// - /// The number of bytes that were actually stored in the buffer is returned - /// (this may be lower than the size of the input buffer, in case of partial - /// writes). - pub fn write(&mut self, mut data: &[u8], mut fin: bool) -> Result { - let max_off = self.off + data.len() as u64; - - // Get the stream send capacity. This will return an error if the stream - // was stopped. - let capacity = self.cap()?; - - if data.len() > capacity { - // Truncate the input buffer according to the stream's capacity. - let len = capacity; - data = &data[..len]; - - // We are not buffering the full input, so clear the fin flag. 
- fin = false; - } - - if let Some(fin_off) = self.fin_off { - // Can't write past final offset. - if max_off > fin_off { - return Err(Error::FinalSize); - } - - // Can't "undo" final offset. - if max_off == fin_off && !fin { - return Err(Error::FinalSize); - } - } - - if fin { - self.fin_off = Some(max_off); - } - - // Don't queue data that was already fully acked. - if self.ack_off() >= max_off { - return Ok(data.len()); - } - - // We already recorded the final offset, so we can just discard the - // empty buffer now. - if data.is_empty() { - return Ok(data.len()); - } - - let mut len = 0; - - // Split the remaining input data into consistently-sized buffers to - // avoid fragmentation. - for chunk in data.chunks(SEND_BUFFER_SIZE) { - len += chunk.len(); - - let fin = len == data.len() && fin; - - let buf = RangeBuf::from(chunk, self.off, fin); - - // The new data can simply be appended at the end of the send buffer. - self.data.push_back(buf); - - self.off += chunk.len() as u64; - self.len += chunk.len() as u64; - } - - Ok(len) - } - - /// Writes data from the send buffer into the given output buffer. - pub fn emit(&mut self, out: &mut [u8]) -> Result<(usize, bool)> { - let mut out_len = out.len(); - let out_off = self.off_front(); - - let mut next_off = out_off; - - while out_len > 0 && - self.ready() && - self.off_front() == next_off && - self.off_front() < self.max_data - { - let buf = match self.data.get_mut(self.pos) { - Some(v) => v, - - None => break, - }; - - if buf.is_empty() { - self.pos += 1; - continue; - } - - let buf_len = cmp::min(buf.len(), out_len); - let partial = buf_len < buf.len(); - - // Copy data to the output buffer. - let out_pos = (next_off - out_off) as usize; - out[out_pos..out_pos + buf_len].copy_from_slice(&buf[..buf_len]); - - self.len -= buf_len as u64; - - out_len -= buf_len; - - next_off = buf.off() + buf_len as u64; - - buf.consume(buf_len); - - if partial { - // We reached the maximum capacity, so end here. 
- break; - } - - self.pos += 1; - } - - // Override the `fin` flag set for the output buffer by matching the - // buffer's maximum offset against the stream's final offset (if known). - // - // This is more efficient than tracking `fin` using the range buffers - // themselves, and lets us avoid queueing empty buffers just so we can - // propagate the final size. - let fin = self.fin_off == Some(next_off); - - // Record the largest offset that has been sent so we can accurately - // report final_size - self.emit_off = cmp::max(self.emit_off, next_off); - - Ok((out.len() - out_len, fin)) - } - - /// Updates the max_data limit to the given value. - pub fn update_max_data(&mut self, max_data: u64) { - self.max_data = cmp::max(self.max_data, max_data); - } - - /// Updates the last offset the stream was blocked at, if any. - pub fn update_blocked_at(&mut self, blocked_at: Option) { - self.blocked_at = blocked_at; - } - - /// The last offset the stream was blocked at, if any. - pub fn blocked_at(&self) -> Option { - self.blocked_at - } - - /// Increments the acked data offset. - pub fn ack(&mut self, off: u64, len: usize) { - self.acked.insert(off..off + len as u64); - } - - pub fn ack_and_drop(&mut self, off: u64, len: usize) { - self.ack(off, len); - - let ack_off = self.ack_off(); - - if self.data.is_empty() { - return; - } - - if off > ack_off { - return; - } - - let mut drop_until = None; - - // Drop contiguously acked data from the front of the buffer. - for (i, buf) in self.data.iter_mut().enumerate() { - // Newly acked range is past highest contiguous acked range, so we - // can't drop it. - if buf.off >= ack_off { - break; - } - - // Highest contiguous acked range falls within newly acked range, - // so we can't drop it. - if buf.off < ack_off && ack_off < buf.max_off() { - break; - } - - // Newly acked range can be dropped. 
- drop_until = Some(i); - } - - if let Some(drop) = drop_until { - self.data.drain(..=drop); - - // When a buffer is marked for retransmission, but then acked before - // it could be retransmitted, we might end up decreasing the SendBuf - // position too much, so make sure that doesn't happen. - self.pos = self.pos.saturating_sub(drop + 1); - } - } - - pub fn retransmit(&mut self, off: u64, len: usize) { - let max_off = off + len as u64; - let ack_off = self.ack_off(); - - if self.data.is_empty() { - return; - } - - if max_off <= ack_off { - return; - } - - for i in 0..self.data.len() { - let buf = &mut self.data[i]; - - if buf.off >= max_off { - break; - } - - if off > buf.max_off() { - continue; - } - - // Split the buffer into 2 if the retransmit range ends before the - // buffer's final offset. - let new_buf = if buf.off < max_off && max_off < buf.max_off() { - Some(buf.split_off((max_off - buf.off) as usize)) - } else { - None - }; - - let prev_pos = buf.pos; - - // Reduce the buffer's position (expand the buffer) if the retransmit - // range is past the buffer's starting offset. - buf.pos = if off > buf.off && off <= buf.max_off() { - cmp::min(buf.pos, buf.start + (off - buf.off) as usize) - } else { - buf.start - }; - - self.pos = cmp::min(self.pos, i); - - self.len += (prev_pos - buf.pos) as u64; - - if let Some(b) = new_buf { - self.data.insert(i + 1, b); - } - } - } - - /// Resets the stream at the current offset and clears all buffered data. - pub fn reset(&mut self) -> (u64, u64) { - let unsent_off = cmp::max(self.off_front(), self.emit_off); - let unsent_len = self.off_back().saturating_sub(unsent_off); - - self.fin_off = Some(unsent_off); - - // Drop all buffered data. - self.data.clear(); - - // Mark all data as acked. - self.ack(0, self.off as usize); - - self.pos = 0; - self.len = 0; - self.off = unsent_off; - - (self.emit_off, unsent_len) - } - - /// Resets the streams and records the received error code. 
- /// - /// Calling this again after the first time has no effect. - pub fn stop(&mut self, error_code: u64) -> Result<(u64, u64)> { - if self.error.is_some() { - return Err(Error::Done); - } - - let (max_off, unsent) = self.reset(); - - self.error = Some(error_code); - - Ok((max_off, unsent)) - } - - /// Shuts down sending data. - pub fn shutdown(&mut self) -> Result<(u64, u64)> { - if self.shutdown { - return Err(Error::Done); - } - - self.shutdown = true; - - Ok(self.reset()) - } - - /// Returns the largest offset of data buffered. - pub fn off_back(&self) -> u64 { - self.off - } - - /// Returns the lowest offset of data buffered. - pub fn off_front(&self) -> u64 { - let mut pos = self.pos; - - // Skip empty buffers from the start of the queue. - while let Some(b) = self.data.get(pos) { - if !b.is_empty() { - return b.off(); - } - - pos += 1; - } - - self.off - } - - /// The maximum offset we are allowed to send to the peer. - pub fn max_off(&self) -> u64 { - self.max_data - } - - /// Returns true if all data in the stream has been sent. - /// - /// This happens when the stream's send final size is known, and the - /// application has already written data up to that point. - pub fn is_fin(&self) -> bool { - if self.fin_off == Some(self.off) { - return true; - } - - false - } - - /// Returns true if the send-side of the stream is complete. - /// - /// This happens when the stream's send final size is known, and the peer - /// has already acked all stream data up to that point. - pub fn is_complete(&self) -> bool { - if let Some(fin_off) = self.fin_off { - if self.acked == (0..fin_off) { - return true; - } - } - - false - } - - /// Returns true if the stream was stopped before completion. - pub fn is_stopped(&self) -> bool { - self.error.is_some() - } - - /// Returns true if there is data to be written. - fn ready(&self) -> bool { - !self.data.is_empty() && self.off_front() < self.off - } - - /// Returns the highest contiguously acked offset. 
- fn ack_off(&self) -> u64 { - match self.acked.iter().next() { - // Only consider the initial range if it contiguously covers the - // start of the stream (i.e. from offset 0). - Some(std::ops::Range { start: 0, end }) => end, - - Some(_) | None => 0, - } - } - - /// Returns the outgoing flow control capacity. - pub fn cap(&self) -> Result { - // The stream was stopped, so return the error code instead. - if let Some(e) = self.error { - return Err(Error::StreamStopped(e)); - } - - Ok((self.max_data - self.off) as usize) - } -} - -/// Buffer holding data at a specific offset. -/// -/// The data is stored in a `Vec` in such a way that it can be shared -/// between multiple `RangeBuf` objects. -/// -/// Each `RangeBuf` will have its own view of that buffer, where the `start` -/// value indicates the initial offset within the `Vec`, and `len` indicates the -/// number of bytes, starting from `start` that are included. -/// -/// In addition, `pos` indicates the current offset within the `Vec`, starting -/// from the very beginning of the `Vec`. -/// -/// Finally, `off` is the starting offset for the specific `RangeBuf` within the -/// stream the buffer belongs to. -#[derive(Clone, Debug, Default, Eq)] -pub struct RangeBuf { - /// The internal buffer holding the data. - /// - /// To avoid needless allocations when a RangeBuf is split, this field is - /// reference-counted and can be shared between multiple RangeBuf objects, - /// and sliced using the `start` and `len` values. - data: Arc>, - - /// The initial offset within the internal buffer. - start: usize, - - /// The current offset within the internal buffer. - pos: usize, - - /// The number of bytes in the buffer, from the initial offset. - len: usize, - - /// The offset of the buffer within a stream. - off: u64, - - /// Whether this contains the final byte in the stream. - fin: bool, -} - -impl RangeBuf { - /// Creates a new `RangeBuf` from the given slice. 
- pub fn from(buf: &[u8], off: u64, fin: bool) -> RangeBuf { - RangeBuf { - data: Arc::new(Vec::from(buf)), - start: 0, - pos: 0, - len: buf.len(), - off, - fin, - } - } - - /// Returns whether `self` holds the final offset in the stream. - pub fn fin(&self) -> bool { - self.fin - } - - /// Returns the starting offset of `self`. - pub fn off(&self) -> u64 { - (self.off - self.start as u64) + self.pos as u64 - } - - /// Returns the final offset of `self`. - pub fn max_off(&self) -> u64 { - self.off() + self.len() as u64 - } - - /// Returns the length of `self`. - pub fn len(&self) -> usize { - self.len - (self.pos - self.start) - } - - /// Returns true if `self` has a length of zero bytes. - pub fn is_empty(&self) -> bool { - self.len() == 0 - } - - /// Consumes the starting `count` bytes of `self`. - pub fn consume(&mut self, count: usize) { - self.pos += count; - } - - /// Splits the buffer into two at the given index. - pub fn split_off(&mut self, at: usize) -> RangeBuf { - assert!( - at <= self.len, - "`at` split index (is {}) should be <= len (is {})", - at, - self.len - ); - - let buf = RangeBuf { - data: self.data.clone(), - start: self.start + at, - pos: cmp::max(self.pos, self.start + at), - len: self.len - at, - off: self.off + at as u64, - fin: self.fin, - }; - - self.pos = cmp::min(self.pos, self.start + at); - self.len = at; - self.fin = false; - - buf - } -} - -impl std::ops::Deref for RangeBuf { - type Target = [u8]; - - fn deref(&self) -> &[u8] { - &self.data[self.pos..self.start + self.len] - } -} - -impl Ord for RangeBuf { - fn cmp(&self, other: &RangeBuf) -> cmp::Ordering { - // Invert ordering to implement min-heap. 
- self.off.cmp(&other.off).reverse() - } -} - -impl PartialOrd for RangeBuf { - fn partial_cmp(&self, other: &RangeBuf) -> Option { - Some(self.cmp(other)) - } -} - -impl PartialEq for RangeBuf { - fn eq(&self, other: &RangeBuf) -> bool { - self.off == other.off - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn empty_read() { - let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW); - assert_eq!(recv.len, 0); - - let mut buf = [0; 32]; - - assert_eq!(recv.emit(&mut buf), Err(Error::Done)); - } - - #[test] - fn empty_stream_frame() { - let mut recv = RecvBuf::new(15, DEFAULT_STREAM_WINDOW); - assert_eq!(recv.len, 0); - - let buf = RangeBuf::from(b"hello", 0, false); - assert!(recv.write(buf).is_ok()); - assert_eq!(recv.len, 5); - assert_eq!(recv.off, 0); - assert_eq!(recv.data.len(), 1); - - let mut buf = [0; 32]; - assert_eq!(recv.emit(&mut buf), Ok((5, false))); - - // Don't store non-fin empty buffer. - let buf = RangeBuf::from(b"", 10, false); - assert!(recv.write(buf).is_ok()); - assert_eq!(recv.len, 5); - assert_eq!(recv.off, 5); - assert_eq!(recv.data.len(), 0); - - // Check flow control for empty buffer. - let buf = RangeBuf::from(b"", 16, false); - assert_eq!(recv.write(buf), Err(Error::FlowControl)); - - // Store fin empty buffer. - let buf = RangeBuf::from(b"", 5, true); - assert!(recv.write(buf).is_ok()); - assert_eq!(recv.len, 5); - assert_eq!(recv.off, 5); - assert_eq!(recv.data.len(), 1); - - // Don't store additional fin empty buffers. - let buf = RangeBuf::from(b"", 5, true); - assert!(recv.write(buf).is_ok()); - assert_eq!(recv.len, 5); - assert_eq!(recv.off, 5); - assert_eq!(recv.data.len(), 1); - - // Don't store additional fin non-empty buffers. - let buf = RangeBuf::from(b"aa", 3, true); - assert!(recv.write(buf).is_ok()); - assert_eq!(recv.len, 5); - assert_eq!(recv.off, 5); - assert_eq!(recv.data.len(), 1); - - // Validate final size with fin empty buffers. 
- let buf = RangeBuf::from(b"", 6, true); - assert_eq!(recv.write(buf), Err(Error::FinalSize)); - let buf = RangeBuf::from(b"", 4, true); - assert_eq!(recv.write(buf), Err(Error::FinalSize)); - - let mut buf = [0; 32]; - assert_eq!(recv.emit(&mut buf), Ok((0, true))); - } - - #[test] - fn ordered_read() { - let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW); - assert_eq!(recv.len, 0); - - let mut buf = [0; 32]; - - let first = RangeBuf::from(b"hello", 0, false); - let second = RangeBuf::from(b"world", 5, false); - let third = RangeBuf::from(b"something", 10, true); - - assert!(recv.write(second).is_ok()); - assert_eq!(recv.len, 10); - assert_eq!(recv.off, 0); - - assert_eq!(recv.emit(&mut buf), Err(Error::Done)); - - assert!(recv.write(third).is_ok()); - assert_eq!(recv.len, 19); - assert_eq!(recv.off, 0); - - assert_eq!(recv.emit(&mut buf), Err(Error::Done)); - - assert!(recv.write(first).is_ok()); - assert_eq!(recv.len, 19); - assert_eq!(recv.off, 0); - - let (len, fin) = recv.emit(&mut buf).unwrap(); - assert_eq!(len, 19); - assert!(fin); - assert_eq!(&buf[..len], b"helloworldsomething"); - assert_eq!(recv.len, 19); - assert_eq!(recv.off, 19); - - assert_eq!(recv.emit(&mut buf), Err(Error::Done)); - } - - #[test] - fn split_read() { - let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW); - assert_eq!(recv.len, 0); - - let mut buf = [0; 32]; - - let first = RangeBuf::from(b"something", 0, false); - let second = RangeBuf::from(b"helloworld", 9, true); - - assert!(recv.write(first).is_ok()); - assert_eq!(recv.len, 9); - assert_eq!(recv.off, 0); - - assert!(recv.write(second).is_ok()); - assert_eq!(recv.len, 19); - assert_eq!(recv.off, 0); - - let (len, fin) = recv.emit(&mut buf[..10]).unwrap(); - assert_eq!(len, 10); - assert!(!fin); - assert_eq!(&buf[..len], b"somethingh"); - assert_eq!(recv.len, 19); - assert_eq!(recv.off, 10); - - let (len, fin) = recv.emit(&mut buf[..5]).unwrap(); - assert_eq!(len, 5); - assert!(!fin); - 
assert_eq!(&buf[..len], b"ellow"); - assert_eq!(recv.len, 19); - assert_eq!(recv.off, 15); - - let (len, fin) = recv.emit(&mut buf[..10]).unwrap(); - assert_eq!(len, 4); - assert!(fin); - assert_eq!(&buf[..len], b"orld"); - assert_eq!(recv.len, 19); - assert_eq!(recv.off, 19); - } - - #[test] - fn incomplete_read() { - let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW); - assert_eq!(recv.len, 0); - - let mut buf = [0; 32]; - - let first = RangeBuf::from(b"something", 0, false); - let second = RangeBuf::from(b"helloworld", 9, true); - - assert!(recv.write(second).is_ok()); - assert_eq!(recv.len, 19); - assert_eq!(recv.off, 0); - - assert_eq!(recv.emit(&mut buf), Err(Error::Done)); - - assert!(recv.write(first).is_ok()); - assert_eq!(recv.len, 19); - assert_eq!(recv.off, 0); - - let (len, fin) = recv.emit(&mut buf).unwrap(); - assert_eq!(len, 19); - assert!(fin); - assert_eq!(&buf[..len], b"somethinghelloworld"); - assert_eq!(recv.len, 19); - assert_eq!(recv.off, 19); - } - - #[test] - fn zero_len_read() { - let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW); - assert_eq!(recv.len, 0); - - let mut buf = [0; 32]; - - let first = RangeBuf::from(b"something", 0, false); - let second = RangeBuf::from(b"", 9, true); - - assert!(recv.write(first).is_ok()); - assert_eq!(recv.len, 9); - assert_eq!(recv.off, 0); - assert_eq!(recv.data.len(), 1); - - assert!(recv.write(second).is_ok()); - assert_eq!(recv.len, 9); - assert_eq!(recv.off, 0); - assert_eq!(recv.data.len(), 1); - - let (len, fin) = recv.emit(&mut buf).unwrap(); - assert_eq!(len, 9); - assert!(fin); - assert_eq!(&buf[..len], b"something"); - assert_eq!(recv.len, 9); - assert_eq!(recv.off, 9); - } - - #[test] - fn past_read() { - let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW); - assert_eq!(recv.len, 0); - - let mut buf = [0; 32]; - - let first = RangeBuf::from(b"something", 0, false); - let second = RangeBuf::from(b"hello", 3, false); - let third = RangeBuf::from(b"ello", 4, true); - 
let fourth = RangeBuf::from(b"ello", 5, true); - - assert!(recv.write(first).is_ok()); - assert_eq!(recv.len, 9); - assert_eq!(recv.off, 0); - assert_eq!(recv.data.len(), 1); - - let (len, fin) = recv.emit(&mut buf).unwrap(); - assert_eq!(len, 9); - assert!(!fin); - assert_eq!(&buf[..len], b"something"); - assert_eq!(recv.len, 9); - assert_eq!(recv.off, 9); - - assert!(recv.write(second).is_ok()); - assert_eq!(recv.len, 9); - assert_eq!(recv.off, 9); - assert_eq!(recv.data.len(), 0); - - assert_eq!(recv.write(third), Err(Error::FinalSize)); - - assert!(recv.write(fourth).is_ok()); - assert_eq!(recv.len, 9); - assert_eq!(recv.off, 9); - assert_eq!(recv.data.len(), 0); - - assert_eq!(recv.emit(&mut buf), Err(Error::Done)); - } - - #[test] - fn fully_overlapping_read() { - let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW); - assert_eq!(recv.len, 0); - - let mut buf = [0; 32]; - - let first = RangeBuf::from(b"something", 0, false); - let second = RangeBuf::from(b"hello", 4, false); - - assert!(recv.write(first).is_ok()); - assert_eq!(recv.len, 9); - assert_eq!(recv.off, 0); - assert_eq!(recv.data.len(), 1); - - assert!(recv.write(second).is_ok()); - assert_eq!(recv.len, 9); - assert_eq!(recv.off, 0); - assert_eq!(recv.data.len(), 1); - - let (len, fin) = recv.emit(&mut buf).unwrap(); - assert_eq!(len, 9); - assert!(!fin); - assert_eq!(&buf[..len], b"something"); - assert_eq!(recv.len, 9); - assert_eq!(recv.off, 9); - assert_eq!(recv.data.len(), 0); - - assert_eq!(recv.emit(&mut buf), Err(Error::Done)); - } - - #[test] - fn fully_overlapping_read2() { - let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW); - assert_eq!(recv.len, 0); - - let mut buf = [0; 32]; - - let first = RangeBuf::from(b"something", 0, false); - let second = RangeBuf::from(b"hello", 4, false); - - assert!(recv.write(second).is_ok()); - assert_eq!(recv.len, 9); - assert_eq!(recv.off, 0); - assert_eq!(recv.data.len(), 1); - - assert!(recv.write(first).is_ok()); - 
assert_eq!(recv.len, 9); - assert_eq!(recv.off, 0); - assert_eq!(recv.data.len(), 2); - - let (len, fin) = recv.emit(&mut buf).unwrap(); - assert_eq!(len, 9); - assert!(!fin); - assert_eq!(&buf[..len], b"somehello"); - assert_eq!(recv.len, 9); - assert_eq!(recv.off, 9); - assert_eq!(recv.data.len(), 0); - - assert_eq!(recv.emit(&mut buf), Err(Error::Done)); - } - - #[test] - fn fully_overlapping_read3() { - let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW); - assert_eq!(recv.len, 0); - - let mut buf = [0; 32]; - - let first = RangeBuf::from(b"something", 0, false); - let second = RangeBuf::from(b"hello", 3, false); - - assert!(recv.write(second).is_ok()); - assert_eq!(recv.len, 8); - assert_eq!(recv.off, 0); - assert_eq!(recv.data.len(), 1); - - assert!(recv.write(first).is_ok()); - assert_eq!(recv.len, 9); - assert_eq!(recv.off, 0); - assert_eq!(recv.data.len(), 3); - - let (len, fin) = recv.emit(&mut buf).unwrap(); - assert_eq!(len, 9); - assert!(!fin); - assert_eq!(&buf[..len], b"somhellog"); - assert_eq!(recv.len, 9); - assert_eq!(recv.off, 9); - assert_eq!(recv.data.len(), 0); - - assert_eq!(recv.emit(&mut buf), Err(Error::Done)); - } - - #[test] - fn fully_overlapping_read_multi() { - let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW); - assert_eq!(recv.len, 0); - - let mut buf = [0; 32]; - - let first = RangeBuf::from(b"somethingsomething", 0, false); - let second = RangeBuf::from(b"hello", 3, false); - let third = RangeBuf::from(b"hello", 12, false); - - assert!(recv.write(second).is_ok()); - assert_eq!(recv.len, 8); - assert_eq!(recv.off, 0); - assert_eq!(recv.data.len(), 1); - - assert!(recv.write(third).is_ok()); - assert_eq!(recv.len, 17); - assert_eq!(recv.off, 0); - assert_eq!(recv.data.len(), 2); - - assert!(recv.write(first).is_ok()); - assert_eq!(recv.len, 18); - assert_eq!(recv.off, 0); - assert_eq!(recv.data.len(), 5); - - let (len, fin) = recv.emit(&mut buf).unwrap(); - assert_eq!(len, 18); - assert!(!fin); - 
assert_eq!(&buf[..len], b"somhellogsomhellog"); - assert_eq!(recv.len, 18); - assert_eq!(recv.off, 18); - assert_eq!(recv.data.len(), 0); - - assert_eq!(recv.emit(&mut buf), Err(Error::Done)); - } - - #[test] - fn overlapping_start_read() { - let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW); - assert_eq!(recv.len, 0); - - let mut buf = [0; 32]; - - let first = RangeBuf::from(b"something", 0, false); - let second = RangeBuf::from(b"hello", 8, true); - - assert!(recv.write(first).is_ok()); - assert_eq!(recv.len, 9); - assert_eq!(recv.off, 0); - assert_eq!(recv.data.len(), 1); - - assert!(recv.write(second).is_ok()); - assert_eq!(recv.len, 13); - assert_eq!(recv.off, 0); - assert_eq!(recv.data.len(), 2); - - let (len, fin) = recv.emit(&mut buf).unwrap(); - assert_eq!(len, 13); - assert!(fin); - assert_eq!(&buf[..len], b"somethingello"); - assert_eq!(recv.len, 13); - assert_eq!(recv.off, 13); - - assert_eq!(recv.emit(&mut buf), Err(Error::Done)); - } - - #[test] - fn overlapping_end_read() { - let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW); - assert_eq!(recv.len, 0); - - let mut buf = [0; 32]; - - let first = RangeBuf::from(b"hello", 0, false); - let second = RangeBuf::from(b"something", 3, true); - - assert!(recv.write(second).is_ok()); - assert_eq!(recv.len, 12); - assert_eq!(recv.off, 0); - assert_eq!(recv.data.len(), 1); - - assert!(recv.write(first).is_ok()); - assert_eq!(recv.len, 12); - assert_eq!(recv.off, 0); - assert_eq!(recv.data.len(), 2); - - let (len, fin) = recv.emit(&mut buf).unwrap(); - assert_eq!(len, 12); - assert!(fin); - assert_eq!(&buf[..len], b"helsomething"); - assert_eq!(recv.len, 12); - assert_eq!(recv.off, 12); - - assert_eq!(recv.emit(&mut buf), Err(Error::Done)); - } - - #[test] - fn overlapping_end_twice_read() { - let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW); - assert_eq!(recv.len, 0); - - let mut buf = [0; 32]; - - let first = RangeBuf::from(b"he", 0, false); - let second = RangeBuf::from(b"ow", 
4, false); - let third = RangeBuf::from(b"rl", 7, false); - let fourth = RangeBuf::from(b"helloworld", 0, true); - - assert!(recv.write(third).is_ok()); - assert_eq!(recv.len, 9); - assert_eq!(recv.off, 0); - assert_eq!(recv.data.len(), 1); - - assert!(recv.write(second).is_ok()); - assert_eq!(recv.len, 9); - assert_eq!(recv.off, 0); - assert_eq!(recv.data.len(), 2); - - assert!(recv.write(first).is_ok()); - assert_eq!(recv.len, 9); - assert_eq!(recv.off, 0); - assert_eq!(recv.data.len(), 3); - - assert!(recv.write(fourth).is_ok()); - assert_eq!(recv.len, 10); - assert_eq!(recv.off, 0); - assert_eq!(recv.data.len(), 6); - - let (len, fin) = recv.emit(&mut buf).unwrap(); - assert_eq!(len, 10); - assert!(fin); - assert_eq!(&buf[..len], b"helloworld"); - assert_eq!(recv.len, 10); - assert_eq!(recv.off, 10); - - assert_eq!(recv.emit(&mut buf), Err(Error::Done)); - } - - #[test] - fn overlapping_end_twice_and_contained_read() { - let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW); - assert_eq!(recv.len, 0); - - let mut buf = [0; 32]; - - let first = RangeBuf::from(b"hellow", 0, false); - let second = RangeBuf::from(b"barfoo", 10, true); - let third = RangeBuf::from(b"rl", 7, false); - let fourth = RangeBuf::from(b"elloworldbarfoo", 1, true); - - assert!(recv.write(third).is_ok()); - assert_eq!(recv.len, 9); - assert_eq!(recv.off, 0); - assert_eq!(recv.data.len(), 1); - - assert!(recv.write(second).is_ok()); - assert_eq!(recv.len, 16); - assert_eq!(recv.off, 0); - assert_eq!(recv.data.len(), 2); - - assert!(recv.write(first).is_ok()); - assert_eq!(recv.len, 16); - assert_eq!(recv.off, 0); - assert_eq!(recv.data.len(), 3); - - assert!(recv.write(fourth).is_ok()); - assert_eq!(recv.len, 16); - assert_eq!(recv.off, 0); - assert_eq!(recv.data.len(), 5); - - let (len, fin) = recv.emit(&mut buf).unwrap(); - assert_eq!(len, 16); - assert!(fin); - assert_eq!(&buf[..len], b"helloworldbarfoo"); - assert_eq!(recv.len, 16); - assert_eq!(recv.off, 16); - - 
assert_eq!(recv.emit(&mut buf), Err(Error::Done)); - } - - #[test] - fn partially_multi_overlapping_reordered_read() { - let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW); - assert_eq!(recv.len, 0); - - let mut buf = [0; 32]; - - let first = RangeBuf::from(b"hello", 8, false); - let second = RangeBuf::from(b"something", 0, false); - let third = RangeBuf::from(b"moar", 11, true); - - assert!(recv.write(first).is_ok()); - assert_eq!(recv.len, 13); - assert_eq!(recv.off, 0); - assert_eq!(recv.data.len(), 1); - - assert!(recv.write(second).is_ok()); - assert_eq!(recv.len, 13); - assert_eq!(recv.off, 0); - assert_eq!(recv.data.len(), 2); - - assert!(recv.write(third).is_ok()); - assert_eq!(recv.len, 15); - assert_eq!(recv.off, 0); - assert_eq!(recv.data.len(), 3); - - let (len, fin) = recv.emit(&mut buf).unwrap(); - assert_eq!(len, 15); - assert!(fin); - assert_eq!(&buf[..len], b"somethinhelloar"); - assert_eq!(recv.len, 15); - assert_eq!(recv.off, 15); - assert_eq!(recv.data.len(), 0); - - assert_eq!(recv.emit(&mut buf), Err(Error::Done)); - } - - #[test] - fn partially_multi_overlapping_reordered_read2() { - let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW); - assert_eq!(recv.len, 0); - - let mut buf = [0; 32]; - - let first = RangeBuf::from(b"aaa", 0, false); - let second = RangeBuf::from(b"bbb", 2, false); - let third = RangeBuf::from(b"ccc", 4, false); - let fourth = RangeBuf::from(b"ddd", 6, false); - let fifth = RangeBuf::from(b"eee", 9, false); - let sixth = RangeBuf::from(b"fff", 11, false); - - assert!(recv.write(second).is_ok()); - assert_eq!(recv.len, 5); - assert_eq!(recv.off, 0); - assert_eq!(recv.data.len(), 1); - - assert!(recv.write(fourth).is_ok()); - assert_eq!(recv.len, 9); - assert_eq!(recv.off, 0); - assert_eq!(recv.data.len(), 2); - - assert!(recv.write(third).is_ok()); - assert_eq!(recv.len, 9); - assert_eq!(recv.off, 0); - assert_eq!(recv.data.len(), 3); - - assert!(recv.write(first).is_ok()); - assert_eq!(recv.len, 9); - 
assert_eq!(recv.off, 0); - assert_eq!(recv.data.len(), 4); - - assert!(recv.write(sixth).is_ok()); - assert_eq!(recv.len, 14); - assert_eq!(recv.off, 0); - assert_eq!(recv.data.len(), 5); - - assert!(recv.write(fifth).is_ok()); - assert_eq!(recv.len, 14); - assert_eq!(recv.off, 0); - assert_eq!(recv.data.len(), 6); - - let (len, fin) = recv.emit(&mut buf).unwrap(); - assert_eq!(len, 14); - assert!(!fin); - assert_eq!(&buf[..len], b"aabbbcdddeefff"); - assert_eq!(recv.len, 14); - assert_eq!(recv.off, 14); - assert_eq!(recv.data.len(), 0); - - assert_eq!(recv.emit(&mut buf), Err(Error::Done)); - } - - #[test] - fn empty_write() { - let mut buf = [0; 5]; - - let mut send = SendBuf::new(u64::MAX); - assert_eq!(send.len, 0); - - let (written, fin) = send.emit(&mut buf).unwrap(); - assert_eq!(written, 0); - assert!(!fin); - } - - #[test] - fn multi_write() { - let mut buf = [0; 128]; - - let mut send = SendBuf::new(u64::MAX); - assert_eq!(send.len, 0); - - let first = b"something"; - let second = b"helloworld"; - - assert!(send.write(first, false).is_ok()); - assert_eq!(send.len, 9); - - assert!(send.write(second, true).is_ok()); - assert_eq!(send.len, 19); - - let (written, fin) = send.emit(&mut buf[..128]).unwrap(); - assert_eq!(written, 19); - assert!(fin); - assert_eq!(&buf[..written], b"somethinghelloworld"); - assert_eq!(send.len, 0); - } - - #[test] - fn split_write() { - let mut buf = [0; 10]; - - let mut send = SendBuf::new(u64::MAX); - assert_eq!(send.len, 0); - - let first = b"something"; - let second = b"helloworld"; - - assert!(send.write(first, false).is_ok()); - assert_eq!(send.len, 9); - - assert!(send.write(second, true).is_ok()); - assert_eq!(send.len, 19); - - assert_eq!(send.off_front(), 0); - - let (written, fin) = send.emit(&mut buf[..10]).unwrap(); - assert_eq!(written, 10); - assert!(!fin); - assert_eq!(&buf[..written], b"somethingh"); - assert_eq!(send.len, 9); - - assert_eq!(send.off_front(), 10); - - let (written, fin) = send.emit(&mut 
buf[..5]).unwrap(); - assert_eq!(written, 5); - assert!(!fin); - assert_eq!(&buf[..written], b"ellow"); - assert_eq!(send.len, 4); - - assert_eq!(send.off_front(), 15); - - let (written, fin) = send.emit(&mut buf[..10]).unwrap(); - assert_eq!(written, 4); - assert!(fin); - assert_eq!(&buf[..written], b"orld"); - assert_eq!(send.len, 0); +} - assert_eq!(send.off_front(), 19); +impl PartialEq for StreamPriorityKey { + fn eq(&self, other: &Self) -> bool { + self.id == other.id } +} - #[test] - fn resend() { - let mut buf = [0; 15]; +impl Eq for StreamPriorityKey {} - let mut send = SendBuf::new(u64::MAX); - assert_eq!(send.len, 0); - assert_eq!(send.off_front(), 0); +impl PartialOrd for StreamPriorityKey { + fn partial_cmp(&self, other: &Self) -> Option { + // Ignore priority if ID matches. + if self.id == other.id { + return Some(std::cmp::Ordering::Equal); + } - let first = b"something"; - let second = b"helloworld"; + // First, order by urgency... + if self.urgency != other.urgency { + return self.urgency.partial_cmp(&other.urgency); + } - assert!(send.write(first, false).is_ok()); - assert_eq!(send.off_front(), 0); + // ...when the urgency is the same, and both are not incremental, order + // by stream ID... + if !self.incremental && !other.incremental { + return self.id.partial_cmp(&other.id); + } - assert!(send.write(second, true).is_ok()); - assert_eq!(send.off_front(), 0); + // ...non-incremental takes priority over incremental... + if self.incremental && !other.incremental { + return Some(std::cmp::Ordering::Greater); + } + if !self.incremental && other.incremental { + return Some(std::cmp::Ordering::Less); + } - assert_eq!(send.len, 19); + // ...finally, when both are incremental, `other` takes precedence (so + // `self` is always sorted after other same-urgency incremental + // entries). 
+ Some(std::cmp::Ordering::Greater) + } +} - let (written, fin) = send.emit(&mut buf[..4]).unwrap(); - assert_eq!(written, 4); - assert!(!fin); - assert_eq!(&buf[..written], b"some"); - assert_eq!(send.len, 15); - assert_eq!(send.off_front(), 4); +impl Ord for StreamPriorityKey { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + // `partial_cmp()` never returns `None`, so this should be safe. + self.partial_cmp(other).unwrap() + } +} - let (written, fin) = send.emit(&mut buf[..5]).unwrap(); - assert_eq!(written, 5); - assert!(!fin); - assert_eq!(&buf[..written], b"thing"); - assert_eq!(send.len, 10); - assert_eq!(send.off_front(), 9); +intrusive_adapter!(pub StreamWritablePriorityAdapter = Arc: StreamPriorityKey { writable: RBTreeAtomicLink }); - let (written, fin) = send.emit(&mut buf[..5]).unwrap(); - assert_eq!(written, 5); - assert!(!fin); - assert_eq!(&buf[..written], b"hello"); - assert_eq!(send.len, 5); - assert_eq!(send.off_front(), 14); +impl<'a> KeyAdapter<'a> for StreamWritablePriorityAdapter { + type Key = StreamPriorityKey; - send.retransmit(4, 5); - assert_eq!(send.len, 10); - assert_eq!(send.off_front(), 4); + fn get_key(&self, s: &StreamPriorityKey) -> Self::Key { + s.clone() + } +} - send.retransmit(0, 4); - assert_eq!(send.len, 14); - assert_eq!(send.off_front(), 0); +intrusive_adapter!(pub StreamReadablePriorityAdapter = Arc: StreamPriorityKey { readable: RBTreeAtomicLink }); - let (written, fin) = send.emit(&mut buf[..11]).unwrap(); - assert_eq!(written, 9); - assert!(!fin); - assert_eq!(&buf[..written], b"something"); - assert_eq!(send.len, 5); - assert_eq!(send.off_front(), 14); +impl<'a> KeyAdapter<'a> for StreamReadablePriorityAdapter { + type Key = StreamPriorityKey; - let (written, fin) = send.emit(&mut buf[..11]).unwrap(); - assert_eq!(written, 5); - assert!(fin); - assert_eq!(&buf[..written], b"world"); - assert_eq!(send.len, 0); - assert_eq!(send.off_front(), 19); + fn get_key(&self, s: &StreamPriorityKey) -> Self::Key { + 
s.clone() } +} - #[test] - fn write_blocked_by_off() { - let mut buf = [0; 10]; +intrusive_adapter!(pub StreamFlushablePriorityAdapter = Arc: StreamPriorityKey { flushable: RBTreeAtomicLink }); - let mut send = SendBuf::default(); - assert_eq!(send.len, 0); +impl<'a> KeyAdapter<'a> for StreamFlushablePriorityAdapter { + type Key = StreamPriorityKey; - let first = b"something"; - let second = b"helloworld"; + fn get_key(&self, s: &StreamPriorityKey) -> Self::Key { + s.clone() + } +} - assert_eq!(send.write(first, false), Ok(0)); - assert_eq!(send.len, 0); +/// An iterator over QUIC streams. +#[derive(Default)] +pub struct StreamIter { + streams: SmallVec<[u64; 8]>, + index: usize, +} - assert_eq!(send.write(second, true), Ok(0)); - assert_eq!(send.len, 0); +impl StreamIter { + #[inline] + fn from(streams: &StreamIdHashSet) -> Self { + StreamIter { + streams: streams.iter().copied().collect(), + index: 0, + } + } +} - send.update_max_data(5); +impl Iterator for StreamIter { + type Item = u64; - assert_eq!(send.write(first, false), Ok(5)); - assert_eq!(send.len, 5); + #[inline] + fn next(&mut self) -> Option { + let v = self.streams.get(self.index)?; + self.index += 1; + Some(*v) + } +} - assert_eq!(send.write(second, true), Ok(0)); - assert_eq!(send.len, 5); +impl ExactSizeIterator for StreamIter { + #[inline] + fn len(&self) -> usize { + self.streams.len() - self.index + } +} - assert_eq!(send.off_front(), 0); +/// Buffer holding data at a specific offset. +/// +/// The data is stored in a `Vec` in such a way that it can be shared +/// between multiple `RangeBuf` objects. +/// +/// Each `RangeBuf` will have its own view of that buffer, where the `start` +/// value indicates the initial offset within the `Vec`, and `len` indicates the +/// number of bytes, starting from `start` that are included. +/// +/// In addition, `pos` indicates the current offset within the `Vec`, starting +/// from the very beginning of the `Vec`. 
+/// +/// Finally, `off` is the starting offset for the specific `RangeBuf` within the +/// stream the buffer belongs to. +#[derive(Clone, Debug, Default, Eq)] +pub struct RangeBuf { + /// The internal buffer holding the data. + /// + /// To avoid needless allocations when a RangeBuf is split, this field is + /// reference-counted and can be shared between multiple RangeBuf objects, + /// and sliced using the `start` and `len` values. + data: Arc>, - let (written, fin) = send.emit(&mut buf[..10]).unwrap(); - assert_eq!(written, 5); - assert!(!fin); - assert_eq!(&buf[..written], b"somet"); - assert_eq!(send.len, 0); + /// The initial offset within the internal buffer. + start: usize, - assert_eq!(send.off_front(), 5); + /// The current offset within the internal buffer. + pos: usize, - let (written, fin) = send.emit(&mut buf[..10]).unwrap(); - assert_eq!(written, 0); - assert!(!fin); - assert_eq!(&buf[..written], b""); - assert_eq!(send.len, 0); + /// The number of bytes in the buffer, from the initial offset. + len: usize, - send.update_max_data(15); + /// The offset of the buffer within a stream. + off: u64, - assert_eq!(send.write(&first[5..], false), Ok(4)); - assert_eq!(send.len, 4); + /// Whether this contains the final byte in the stream. + fin: bool, +} - assert_eq!(send.write(second, true), Ok(6)); - assert_eq!(send.len, 10); +impl RangeBuf { + /// Creates a new `RangeBuf` from the given slice. + pub fn from(buf: &[u8], off: u64, fin: bool) -> RangeBuf { + RangeBuf { + data: Arc::new(Vec::from(buf)), + start: 0, + pos: 0, + len: buf.len(), + off, + fin, + } + } - assert_eq!(send.off_front(), 5); + /// Returns whether `self` holds the final offset in the stream. + pub fn fin(&self) -> bool { + self.fin + } - let (written, fin) = send.emit(&mut buf[..10]).unwrap(); - assert_eq!(written, 10); - assert!(!fin); - assert_eq!(&buf[..10], b"hinghellow"); - assert_eq!(send.len, 0); + /// Returns the starting offset of `self`. 
+ pub fn off(&self) -> u64 { + (self.off - self.start as u64) + self.pos as u64 + } - send.update_max_data(25); + /// Returns the final offset of `self`. + pub fn max_off(&self) -> u64 { + self.off() + self.len() as u64 + } - assert_eq!(send.write(&second[6..], true), Ok(4)); - assert_eq!(send.len, 4); + /// Returns the length of `self`. + pub fn len(&self) -> usize { + self.len - (self.pos - self.start) + } - assert_eq!(send.off_front(), 15); + /// Returns true if `self` has a length of zero bytes. + pub fn is_empty(&self) -> bool { + self.len() == 0 + } - let (written, fin) = send.emit(&mut buf[..10]).unwrap(); - assert_eq!(written, 4); - assert!(fin); - assert_eq!(&buf[..written], b"orld"); - assert_eq!(send.len, 0); + /// Consumes the starting `count` bytes of `self`. + pub fn consume(&mut self, count: usize) { + self.pos += count; } - #[test] - fn zero_len_write() { - let mut buf = [0; 10]; + /// Splits the buffer into two at the given index. + pub fn split_off(&mut self, at: usize) -> RangeBuf { + assert!( + at <= self.len, + "`at` split index (is {}) should be <= len (is {})", + at, + self.len + ); + + let buf = RangeBuf { + data: self.data.clone(), + start: self.start + at, + pos: cmp::max(self.pos, self.start + at), + len: self.len - at, + off: self.off + at as u64, + fin: self.fin, + }; - let mut send = SendBuf::new(u64::MAX); - assert_eq!(send.len, 0); + self.pos = cmp::min(self.pos, self.start + at); + self.len = at; + self.fin = false; - let first = b"something"; + buf + } +} - assert!(send.write(first, false).is_ok()); - assert_eq!(send.len, 9); +impl std::ops::Deref for RangeBuf { + type Target = [u8]; - assert!(send.write(&[], true).is_ok()); - assert_eq!(send.len, 9); + fn deref(&self) -> &[u8] { + &self.data[self.pos..self.start + self.len] + } +} - assert_eq!(send.off_front(), 0); +impl Ord for RangeBuf { + fn cmp(&self, other: &RangeBuf) -> cmp::Ordering { + // Invert ordering to implement min-heap. 
+ self.off.cmp(&other.off).reverse() + } +} - let (written, fin) = send.emit(&mut buf[..10]).unwrap(); - assert_eq!(written, 9); - assert!(fin); - assert_eq!(&buf[..written], b"something"); - assert_eq!(send.len, 0); +impl PartialOrd for RangeBuf { + fn partial_cmp(&self, other: &RangeBuf) -> Option { + Some(self.cmp(other)) + } +} + +impl PartialEq for RangeBuf { + fn eq(&self, other: &RangeBuf) -> bool { + self.off == other.off } +} + +#[cfg(test)] +mod tests { + use super::*; #[test] fn recv_flow_control() { @@ -2658,7 +1049,7 @@ mod tests { assert!(stream.recv.almost_full()); - stream.recv.update_max_data(time::Instant::now()); + stream.recv.update_max_data(std::time::Instant::now()); assert_eq!(stream.recv.max_data_next(), 25); assert!(!stream.recv.almost_full()); @@ -3072,7 +1463,7 @@ mod tests { assert_eq!(stream.send.write(b"olleh", false), Ok(5)); assert_eq!(stream.send.write(b"dlrow", true), Ok(5)); assert_eq!(stream.send.off_front(), 0); - assert_eq!(stream.send.data.len(), 4); + assert_eq!(stream.send.bufs_count(), 4); assert!(stream.is_flushable()); @@ -3124,7 +1515,7 @@ mod tests { assert_eq!(stream.send.write(b"olleh", false), Ok(5)); assert_eq!(stream.send.write(b"dlrow", true), Ok(5)); assert_eq!(stream.send.off_front(), 0); - assert_eq!(stream.send.data.len(), 4); + assert_eq!(stream.send.bufs_count(), 4); assert!(stream.is_flushable()); @@ -3139,7 +1530,7 @@ mod tests { assert_eq!(&buf[..4], b"owor"); stream.send.ack_and_drop(0, 5); - assert_eq!(stream.send.data.len(), 3); + assert_eq!(stream.send.bufs_count(), 3); assert!(stream.send.ready()); assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false))); @@ -3147,7 +1538,7 @@ mod tests { assert_eq!(&buf[..2], b"ld"); stream.send.ack_and_drop(7, 5); - assert_eq!(stream.send.data.len(), 3); + assert_eq!(stream.send.bufs_count(), 3); assert!(stream.send.ready()); assert_eq!(stream.send.emit(&mut buf[..1]), Ok((1, false))); @@ -3160,7 +1551,7 @@ mod tests { assert_eq!(&buf[..5], b"llehd"); 
stream.send.ack_and_drop(5, 7); - assert_eq!(stream.send.data.len(), 2); + assert_eq!(stream.send.bufs_count(), 2); assert!(stream.send.ready()); assert_eq!(stream.send.emit(&mut buf[..5]), Ok((4, true))); @@ -3174,10 +1565,10 @@ mod tests { assert_eq!(stream.send.off_front(), 20); stream.send.ack_and_drop(22, 4); - assert_eq!(stream.send.data.len(), 2); + assert_eq!(stream.send.bufs_count(), 2); stream.send.ack_and_drop(20, 1); - assert_eq!(stream.send.data.len(), 2); + assert_eq!(stream.send.bufs_count(), 2); } #[test] @@ -3191,7 +1582,7 @@ mod tests { assert_eq!(stream.send.write(b"olleh", false), Ok(5)); assert_eq!(stream.send.write(b"dlrow", true), Ok(5)); assert_eq!(stream.send.off_front(), 0); - assert_eq!(stream.send.data.len(), 4); + assert_eq!(stream.send.bufs_count(), 4); assert!(stream.is_flushable()); @@ -3470,59 +1861,6 @@ mod tests { ); } - /// Check SendBuf::len calculation on a retransmit case - #[test] - fn send_buf_len_on_retransmit() { - let mut buf = [0; 15]; - - let mut send = SendBuf::new(u64::MAX); - assert_eq!(send.len, 0); - assert_eq!(send.off_front(), 0); - - let first = b"something"; - - assert!(send.write(first, false).is_ok()); - assert_eq!(send.off_front(), 0); - - assert_eq!(send.len, 9); - - let (written, fin) = send.emit(&mut buf[..4]).unwrap(); - assert_eq!(written, 4); - assert!(!fin); - assert_eq!(&buf[..written], b"some"); - assert_eq!(send.len, 5); - assert_eq!(send.off_front(), 4); - - send.retransmit(3, 5); - assert_eq!(send.len, 6); - assert_eq!(send.off_front(), 3); - } - - #[test] - fn send_buf_final_size_retransmit() { - let mut buf = [0; 50]; - let mut send = SendBuf::new(u64::MAX); - - send.write(&buf, false).unwrap(); - assert_eq!(send.off_front(), 0); - - // Emit the whole buffer - let (written, _fin) = send.emit(&mut buf).unwrap(); - assert_eq!(written, buf.len()); - assert_eq!(send.off_front(), buf.len() as u64); - - // Server decides to retransmit the last 10 bytes. 
It's possible - // it's not actually lost and that the client did receive it. - send.retransmit(40, 10); - - // Server receives STOP_SENDING from client. The final_size we - // send in the RESET_STREAM should be 50. If we send anything less, - // it's a FINAL_SIZE_ERROR. - let (fin_off, unsent) = send.stop(0).unwrap(); - assert_eq!(fin_off, 50); - assert_eq!(unsent, 0); - } - fn cycle_stream_priority(stream_id: u64, streams: &mut StreamMap) { let key = streams.get(stream_id).unwrap().priority_key.clone(); streams.update_priority(&key.clone(), &key); @@ -3867,3 +2205,6 @@ mod tests { assert_eq!(walk_2, vec![0, 0, 4, 4, 8, 8, 12, 12]); } } + +mod recv_buf; +mod send_buf; diff --git a/quiche/src/stream/recv_buf.rs b/quiche/src/stream/recv_buf.rs new file mode 100644 index 00000000..f1890672 --- /dev/null +++ b/quiche/src/stream/recv_buf.rs @@ -0,0 +1,977 @@ +// Copyright (C) 2023, Cloudflare, Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright notice, +// this list of conditions and the following disclaimer. +// +// * Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS +// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, +// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +// PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT HOLDER OR +// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use std::cmp; +use std::time; + +use std::collections::BTreeMap; +use std::collections::VecDeque; + +use crate::Error; +use crate::Result; + +use crate::flowcontrol; + +use super::RangeBuf; +use super::DEFAULT_STREAM_WINDOW; + +/// Receive-side stream buffer. +/// +/// Stream data received by the peer is buffered in a list of data chunks +/// ordered by offset in ascending order. Contiguous data can then be read +/// into a slice. +#[derive(Debug, Default)] +pub struct RecvBuf { + /// Chunks of data received from the peer that have not yet been read by + /// the application, ordered by offset. + data: BTreeMap, + + /// The lowest data offset that has yet to be read by the application. + off: u64, + + /// The total length of data received on this stream. + len: u64, + + /// Receiver flow controller. + flow_control: flowcontrol::FlowControl, + + /// The final stream offset received from the peer, if any. + fin_off: Option, + + /// The error code received via RESET_STREAM. + error: Option, + + /// Whether incoming data is validated but not buffered. + drain: bool, +} + +impl RecvBuf { + /// Creates a new receive buffer. + pub fn new(max_data: u64, max_window: u64) -> RecvBuf { + RecvBuf { + flow_control: flowcontrol::FlowControl::new( + max_data, + cmp::min(max_data, DEFAULT_STREAM_WINDOW), + max_window, + ), + ..RecvBuf::default() + } + } + + /// Inserts the given chunk of data in the buffer. 
+ /// + /// This also takes care of enforcing stream flow control limits, as well + /// as handling incoming data that overlaps data that is already in the + /// buffer. + pub fn write(&mut self, buf: RangeBuf) -> Result<()> { + if buf.max_off() > self.max_data() { + return Err(Error::FlowControl); + } + + if let Some(fin_off) = self.fin_off { + // Stream's size is known, forbid data beyond that point. + if buf.max_off() > fin_off { + return Err(Error::FinalSize); + } + + // Stream's size is already known, forbid changing it. + if buf.fin() && fin_off != buf.max_off() { + return Err(Error::FinalSize); + } + } + + // Stream's known size is lower than data already received. + if buf.fin() && buf.max_off() < self.len { + return Err(Error::FinalSize); + } + + // We already saved the final offset, so there's nothing else we + // need to keep from the RangeBuf if it's empty. + if self.fin_off.is_some() && buf.is_empty() { + return Ok(()); + } + + if buf.fin() { + self.fin_off = Some(buf.max_off()); + } + + // No need to store empty buffer that doesn't carry the fin flag. + if !buf.fin() && buf.is_empty() { + return Ok(()); + } + + // Check if data is fully duplicate, that is the buffer's max offset is + // lower or equal to the offset already stored in the recv buffer. + if self.off >= buf.max_off() { + // An exception is applied to empty range buffers, because an empty + // buffer's max offset matches the max offset of the recv buffer. + // + // By this point all spurious empty buffers should have already been + // discarded, so allowing empty buffers here should be safe. + if !buf.is_empty() { + return Ok(()); + } + } + + let mut tmp_bufs = VecDeque::with_capacity(2); + tmp_bufs.push_back(buf); + + 'tmp: while let Some(mut buf) = tmp_bufs.pop_front() { + // Discard incoming data below current stream offset. Bytes up to + // `self.off` have already been received so we should not buffer + // them again. 
This is also important to make sure `ready()` doesn't + // get stuck when a buffer with lower offset than the stream's is + // buffered. + if self.off_front() > buf.off() { + buf = buf.split_off((self.off_front() - buf.off()) as usize); + } + + // Handle overlapping data. If the incoming data's starting offset + // is above the previous maximum received offset, there is clearly + // no overlap so this logic can be skipped. However do still try to + // merge an empty final buffer (i.e. an empty buffer with the fin + // flag set, which is the only kind of empty buffer that should + // reach this point). + if buf.off() < self.max_off() || buf.is_empty() { + for (_, b) in self.data.range(buf.off()..) { + let off = buf.off(); + + // We are past the current buffer. + if b.off() > buf.max_off() { + break; + } + + // New buffer is fully contained in existing buffer. + if off >= b.off() && buf.max_off() <= b.max_off() { + continue 'tmp; + } + + // New buffer's start overlaps existing buffer. + if off >= b.off() && off < b.max_off() { + buf = buf.split_off((b.max_off() - off) as usize); + } + + // New buffer's end overlaps existing buffer. + if off < b.off() && buf.max_off() > b.off() { + tmp_bufs + .push_back(buf.split_off((b.off() - off) as usize)); + } + } + } + + self.len = cmp::max(self.len, buf.max_off()); + + if !self.drain { + self.data.insert(buf.max_off(), buf); + } + } + + Ok(()) + } + + /// Writes data from the receive buffer into the given output buffer. + /// + /// Only contiguous data is written to the output buffer, starting from + /// offset 0. The offset is incremented as data is read out of the receive + /// buffer into the application buffer. If there is no data at the expected + /// read offset, the `Done` error is returned. + /// + /// On success the amount of data read, and a flag indicating if there is + /// no more data in the buffer, are returned as a tuple. 
+    pub fn emit(&mut self, out: &mut [u8]) -> Result<(usize, bool)> {
+        let mut len = 0;
+        let mut cap = out.len();
+
+        if !self.ready() {
+            return Err(Error::Done);
+        }
+
+        // The stream was reset, so return the error code instead.
+        if let Some(e) = self.error {
+            return Err(Error::StreamReset(e));
+        }
+
+        // Only the first stored buffer is ever read, and only while it starts
+        // exactly at the current read offset (which is what `ready()` checks).
+        while cap > 0 && self.ready() {
+            let mut entry = match self.data.first_entry() {
+                Some(entry) => entry,
+                None => break,
+            };
+
+            let buf = entry.get_mut();
+
+            let buf_len = cmp::min(buf.len(), cap);
+
+            out[len..len + buf_len].copy_from_slice(&buf[..buf_len]);
+
+            self.off += buf_len as u64;
+
+            len += buf_len;
+            cap -= buf_len;
+
+            if buf_len < buf.len() {
+                // Keep the unread tail of the buffer for the next call.
+                buf.consume(buf_len);
+
+                // We reached the maximum capacity, so end here.
+                break;
+            }
+
+            // The buffer was read in full, so it can be dropped.
+            entry.remove();
+        }
+
+        // Update consumed bytes for flow control.
+        self.flow_control.add_consumed(len as u64);
+
+        Ok((len, self.is_fin()))
+    }
+
+    /// Resets the stream at the given offset.
+    ///
+    /// Records `error_code` (which `emit()` then reports as
+    /// `Error::StreamReset`), discards all buffered data and moves the read
+    /// offset to `final_size`.
+    ///
+    /// On success returns the difference between `final_size` and the largest
+    /// offset received so far. If the stream was already reset, only that
+    /// difference is computed and no state is changed.
+    ///
+    /// Returns `Error::FinalSize` if `final_size` differs from an already
+    /// known final size, or is lower than the amount of data already
+    /// received. Errors from the internal `write()` call are propagated.
+    pub fn reset(&mut self, error_code: u64, final_size: u64) -> Result<usize> {
+        // Stream's size is already known, forbid changing it.
+        if let Some(fin_off) = self.fin_off {
+            if fin_off != final_size {
+                return Err(Error::FinalSize);
+            }
+        }
+
+        // Stream's known size is lower than data already received.
+        if final_size < self.len {
+            return Err(Error::FinalSize);
+        }
+
+        // Calculate how many bytes need to be removed from the connection flow
+        // control.
+        let max_data_delta = final_size - self.len;
+
+        // Already reset: keep the first error code and leave state untouched.
+        if self.error.is_some() {
+            return Ok(max_data_delta as usize);
+        }
+
+        self.error = Some(error_code);
+
+        // Clear all data already buffered.
+        self.off = final_size;
+
+        self.data.clear();
+
+        // In order to ensure the application is notified when the stream is
+        // reset, enqueue a zero-length buffer at the final size offset.
+        //
+        // NOTE(review): `write()` returns early for an empty buffer once the
+        // final size is already known, so in that case no marker appears to
+        // be enqueued here -- confirm this is the intended behavior.
+        let buf = RangeBuf::from(b"", final_size, true);
+        self.write(buf)?;
+
+        Ok(max_data_delta as usize)
+    }
+
+    /// Commits the new max_data limit.
+    pub fn update_max_data(&mut self, now: time::Instant) {
+        self.flow_control.update_max_data(now);
+    }
+
+    /// Returns the max_data limit that would be advertised next, as computed
+    /// by the stream's flow controller.
+    pub fn max_data_next(&mut self) -> u64 {
+        self.flow_control.max_data_next()
+    }
+
+    /// Returns the flow control limit currently in force for this stream.
+    pub fn max_data(&self) -> u64 {
+        self.flow_control.max_data()
+    }
+
+    /// Returns the size of the current flow control window.
+    pub fn window(&self) -> u64 {
+        self.flow_control.window()
+    }
+
+    /// Lets the flow controller autotune the window size, given the current
+    /// time and RTT.
+    pub fn autotune_window(&mut self, now: time::Instant, rtt: time::Duration) {
+        self.flow_control.autotune_window(now, rtt);
+    }
+
+    /// Stops receiving data on this stream.
+    ///
+    /// Buffered data is discarded and the read offset is moved to the largest
+    /// offset received. Returns `Error::Done` if the stream was already shut
+    /// down.
+    pub fn shutdown(&mut self) -> Result<()> {
+        if self.is_draining() {
+            return Err(Error::Done);
+        }
+
+        // Fast-forward past everything received so far, then drop whatever
+        // is still buffered and stop storing new data.
+        self.off = self.len;
+        self.data.clear();
+        self.drain = true;
+
+        Ok(())
+    }
+
+    /// Returns the current read offset, i.e. the lowest offset that can still
+    /// be buffered.
+    pub fn off_front(&self) -> u64 {
+        self.off
+    }
+
+    /// Returns true if the local flow control limit should be updated.
+    ///
+    /// This is never the case once the stream's final size is known.
+    pub fn almost_full(&self) -> bool {
+        if self.fin_off.is_some() {
+            return false;
+        }
+
+        self.flow_control.should_update_max_data()
+    }
+
+    /// Returns the highest offset received on this stream so far.
+    pub fn max_off(&self) -> u64 {
+        self.len
+    }
+
+    /// Returns true if the receive-side of the stream is complete.
+    ///
+    /// That is, the final size is known and the read offset has reached it.
+    pub fn is_fin(&self) -> bool {
+        self.fin_off == Some(self.off)
+    }
+
+    /// Returns true if incoming data is being discarded rather than stored.
+    pub fn is_draining(&self) -> bool {
+        self.drain
+    }
+
+    /// Returns true if the stream has data to be read.
+ pub fn ready(&self) -> bool { + let (_, buf) = match self.data.first_key_value() { + Some(v) => v, + None => return false, + }; + + buf.off() == self.off + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn empty_read() { + let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW); + assert_eq!(recv.len, 0); + + let mut buf = [0; 32]; + + assert_eq!(recv.emit(&mut buf), Err(Error::Done)); + } + + #[test] + fn empty_stream_frame() { + let mut recv = RecvBuf::new(15, DEFAULT_STREAM_WINDOW); + assert_eq!(recv.len, 0); + + let buf = RangeBuf::from(b"hello", 0, false); + assert!(recv.write(buf).is_ok()); + assert_eq!(recv.len, 5); + assert_eq!(recv.off, 0); + assert_eq!(recv.data.len(), 1); + + let mut buf = [0; 32]; + assert_eq!(recv.emit(&mut buf), Ok((5, false))); + + // Don't store non-fin empty buffer. + let buf = RangeBuf::from(b"", 10, false); + assert!(recv.write(buf).is_ok()); + assert_eq!(recv.len, 5); + assert_eq!(recv.off, 5); + assert_eq!(recv.data.len(), 0); + + // Check flow control for empty buffer. + let buf = RangeBuf::from(b"", 16, false); + assert_eq!(recv.write(buf), Err(Error::FlowControl)); + + // Store fin empty buffer. + let buf = RangeBuf::from(b"", 5, true); + assert!(recv.write(buf).is_ok()); + assert_eq!(recv.len, 5); + assert_eq!(recv.off, 5); + assert_eq!(recv.data.len(), 1); + + // Don't store additional fin empty buffers. + let buf = RangeBuf::from(b"", 5, true); + assert!(recv.write(buf).is_ok()); + assert_eq!(recv.len, 5); + assert_eq!(recv.off, 5); + assert_eq!(recv.data.len(), 1); + + // Don't store additional fin non-empty buffers. + let buf = RangeBuf::from(b"aa", 3, true); + assert!(recv.write(buf).is_ok()); + assert_eq!(recv.len, 5); + assert_eq!(recv.off, 5); + assert_eq!(recv.data.len(), 1); + + // Validate final size with fin empty buffers. 
+ let buf = RangeBuf::from(b"", 6, true); + assert_eq!(recv.write(buf), Err(Error::FinalSize)); + let buf = RangeBuf::from(b"", 4, true); + assert_eq!(recv.write(buf), Err(Error::FinalSize)); + + let mut buf = [0; 32]; + assert_eq!(recv.emit(&mut buf), Ok((0, true))); + } + + #[test] + fn ordered_read() { + let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW); + assert_eq!(recv.len, 0); + + let mut buf = [0; 32]; + + let first = RangeBuf::from(b"hello", 0, false); + let second = RangeBuf::from(b"world", 5, false); + let third = RangeBuf::from(b"something", 10, true); + + assert!(recv.write(second).is_ok()); + assert_eq!(recv.len, 10); + assert_eq!(recv.off, 0); + + assert_eq!(recv.emit(&mut buf), Err(Error::Done)); + + assert!(recv.write(third).is_ok()); + assert_eq!(recv.len, 19); + assert_eq!(recv.off, 0); + + assert_eq!(recv.emit(&mut buf), Err(Error::Done)); + + assert!(recv.write(first).is_ok()); + assert_eq!(recv.len, 19); + assert_eq!(recv.off, 0); + + let (len, fin) = recv.emit(&mut buf).unwrap(); + assert_eq!(len, 19); + assert!(fin); + assert_eq!(&buf[..len], b"helloworldsomething"); + assert_eq!(recv.len, 19); + assert_eq!(recv.off, 19); + + assert_eq!(recv.emit(&mut buf), Err(Error::Done)); + } + + #[test] + fn split_read() { + let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW); + assert_eq!(recv.len, 0); + + let mut buf = [0; 32]; + + let first = RangeBuf::from(b"something", 0, false); + let second = RangeBuf::from(b"helloworld", 9, true); + + assert!(recv.write(first).is_ok()); + assert_eq!(recv.len, 9); + assert_eq!(recv.off, 0); + + assert!(recv.write(second).is_ok()); + assert_eq!(recv.len, 19); + assert_eq!(recv.off, 0); + + let (len, fin) = recv.emit(&mut buf[..10]).unwrap(); + assert_eq!(len, 10); + assert!(!fin); + assert_eq!(&buf[..len], b"somethingh"); + assert_eq!(recv.len, 19); + assert_eq!(recv.off, 10); + + let (len, fin) = recv.emit(&mut buf[..5]).unwrap(); + assert_eq!(len, 5); + assert!(!fin); + 
assert_eq!(&buf[..len], b"ellow"); + assert_eq!(recv.len, 19); + assert_eq!(recv.off, 15); + + let (len, fin) = recv.emit(&mut buf[..10]).unwrap(); + assert_eq!(len, 4); + assert!(fin); + assert_eq!(&buf[..len], b"orld"); + assert_eq!(recv.len, 19); + assert_eq!(recv.off, 19); + } + + #[test] + fn incomplete_read() { + let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW); + assert_eq!(recv.len, 0); + + let mut buf = [0; 32]; + + let first = RangeBuf::from(b"something", 0, false); + let second = RangeBuf::from(b"helloworld", 9, true); + + assert!(recv.write(second).is_ok()); + assert_eq!(recv.len, 19); + assert_eq!(recv.off, 0); + + assert_eq!(recv.emit(&mut buf), Err(Error::Done)); + + assert!(recv.write(first).is_ok()); + assert_eq!(recv.len, 19); + assert_eq!(recv.off, 0); + + let (len, fin) = recv.emit(&mut buf).unwrap(); + assert_eq!(len, 19); + assert!(fin); + assert_eq!(&buf[..len], b"somethinghelloworld"); + assert_eq!(recv.len, 19); + assert_eq!(recv.off, 19); + } + + #[test] + fn zero_len_read() { + let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW); + assert_eq!(recv.len, 0); + + let mut buf = [0; 32]; + + let first = RangeBuf::from(b"something", 0, false); + let second = RangeBuf::from(b"", 9, true); + + assert!(recv.write(first).is_ok()); + assert_eq!(recv.len, 9); + assert_eq!(recv.off, 0); + assert_eq!(recv.data.len(), 1); + + assert!(recv.write(second).is_ok()); + assert_eq!(recv.len, 9); + assert_eq!(recv.off, 0); + assert_eq!(recv.data.len(), 1); + + let (len, fin) = recv.emit(&mut buf).unwrap(); + assert_eq!(len, 9); + assert!(fin); + assert_eq!(&buf[..len], b"something"); + assert_eq!(recv.len, 9); + assert_eq!(recv.off, 9); + } + + #[test] + fn past_read() { + let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW); + assert_eq!(recv.len, 0); + + let mut buf = [0; 32]; + + let first = RangeBuf::from(b"something", 0, false); + let second = RangeBuf::from(b"hello", 3, false); + let third = RangeBuf::from(b"ello", 4, true); + 
let fourth = RangeBuf::from(b"ello", 5, true); + + assert!(recv.write(first).is_ok()); + assert_eq!(recv.len, 9); + assert_eq!(recv.off, 0); + assert_eq!(recv.data.len(), 1); + + let (len, fin) = recv.emit(&mut buf).unwrap(); + assert_eq!(len, 9); + assert!(!fin); + assert_eq!(&buf[..len], b"something"); + assert_eq!(recv.len, 9); + assert_eq!(recv.off, 9); + + assert!(recv.write(second).is_ok()); + assert_eq!(recv.len, 9); + assert_eq!(recv.off, 9); + assert_eq!(recv.data.len(), 0); + + assert_eq!(recv.write(third), Err(Error::FinalSize)); + + assert!(recv.write(fourth).is_ok()); + assert_eq!(recv.len, 9); + assert_eq!(recv.off, 9); + assert_eq!(recv.data.len(), 0); + + assert_eq!(recv.emit(&mut buf), Err(Error::Done)); + } + + #[test] + fn fully_overlapping_read() { + let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW); + assert_eq!(recv.len, 0); + + let mut buf = [0; 32]; + + let first = RangeBuf::from(b"something", 0, false); + let second = RangeBuf::from(b"hello", 4, false); + + assert!(recv.write(first).is_ok()); + assert_eq!(recv.len, 9); + assert_eq!(recv.off, 0); + assert_eq!(recv.data.len(), 1); + + assert!(recv.write(second).is_ok()); + assert_eq!(recv.len, 9); + assert_eq!(recv.off, 0); + assert_eq!(recv.data.len(), 1); + + let (len, fin) = recv.emit(&mut buf).unwrap(); + assert_eq!(len, 9); + assert!(!fin); + assert_eq!(&buf[..len], b"something"); + assert_eq!(recv.len, 9); + assert_eq!(recv.off, 9); + assert_eq!(recv.data.len(), 0); + + assert_eq!(recv.emit(&mut buf), Err(Error::Done)); + } + + #[test] + fn fully_overlapping_read2() { + let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW); + assert_eq!(recv.len, 0); + + let mut buf = [0; 32]; + + let first = RangeBuf::from(b"something", 0, false); + let second = RangeBuf::from(b"hello", 4, false); + + assert!(recv.write(second).is_ok()); + assert_eq!(recv.len, 9); + assert_eq!(recv.off, 0); + assert_eq!(recv.data.len(), 1); + + assert!(recv.write(first).is_ok()); + 
assert_eq!(recv.len, 9); + assert_eq!(recv.off, 0); + assert_eq!(recv.data.len(), 2); + + let (len, fin) = recv.emit(&mut buf).unwrap(); + assert_eq!(len, 9); + assert!(!fin); + assert_eq!(&buf[..len], b"somehello"); + assert_eq!(recv.len, 9); + assert_eq!(recv.off, 9); + assert_eq!(recv.data.len(), 0); + + assert_eq!(recv.emit(&mut buf), Err(Error::Done)); + } + + #[test] + fn fully_overlapping_read3() { + let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW); + assert_eq!(recv.len, 0); + + let mut buf = [0; 32]; + + let first = RangeBuf::from(b"something", 0, false); + let second = RangeBuf::from(b"hello", 3, false); + + assert!(recv.write(second).is_ok()); + assert_eq!(recv.len, 8); + assert_eq!(recv.off, 0); + assert_eq!(recv.data.len(), 1); + + assert!(recv.write(first).is_ok()); + assert_eq!(recv.len, 9); + assert_eq!(recv.off, 0); + assert_eq!(recv.data.len(), 3); + + let (len, fin) = recv.emit(&mut buf).unwrap(); + assert_eq!(len, 9); + assert!(!fin); + assert_eq!(&buf[..len], b"somhellog"); + assert_eq!(recv.len, 9); + assert_eq!(recv.off, 9); + assert_eq!(recv.data.len(), 0); + + assert_eq!(recv.emit(&mut buf), Err(Error::Done)); + } + + #[test] + fn fully_overlapping_read_multi() { + let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW); + assert_eq!(recv.len, 0); + + let mut buf = [0; 32]; + + let first = RangeBuf::from(b"somethingsomething", 0, false); + let second = RangeBuf::from(b"hello", 3, false); + let third = RangeBuf::from(b"hello", 12, false); + + assert!(recv.write(second).is_ok()); + assert_eq!(recv.len, 8); + assert_eq!(recv.off, 0); + assert_eq!(recv.data.len(), 1); + + assert!(recv.write(third).is_ok()); + assert_eq!(recv.len, 17); + assert_eq!(recv.off, 0); + assert_eq!(recv.data.len(), 2); + + assert!(recv.write(first).is_ok()); + assert_eq!(recv.len, 18); + assert_eq!(recv.off, 0); + assert_eq!(recv.data.len(), 5); + + let (len, fin) = recv.emit(&mut buf).unwrap(); + assert_eq!(len, 18); + assert!(!fin); + 
assert_eq!(&buf[..len], b"somhellogsomhellog"); + assert_eq!(recv.len, 18); + assert_eq!(recv.off, 18); + assert_eq!(recv.data.len(), 0); + + assert_eq!(recv.emit(&mut buf), Err(Error::Done)); + } + + #[test] + fn overlapping_start_read() { + let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW); + assert_eq!(recv.len, 0); + + let mut buf = [0; 32]; + + let first = RangeBuf::from(b"something", 0, false); + let second = RangeBuf::from(b"hello", 8, true); + + assert!(recv.write(first).is_ok()); + assert_eq!(recv.len, 9); + assert_eq!(recv.off, 0); + assert_eq!(recv.data.len(), 1); + + assert!(recv.write(second).is_ok()); + assert_eq!(recv.len, 13); + assert_eq!(recv.off, 0); + assert_eq!(recv.data.len(), 2); + + let (len, fin) = recv.emit(&mut buf).unwrap(); + assert_eq!(len, 13); + assert!(fin); + assert_eq!(&buf[..len], b"somethingello"); + assert_eq!(recv.len, 13); + assert_eq!(recv.off, 13); + + assert_eq!(recv.emit(&mut buf), Err(Error::Done)); + } + + #[test] + fn overlapping_end_read() { + let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW); + assert_eq!(recv.len, 0); + + let mut buf = [0; 32]; + + let first = RangeBuf::from(b"hello", 0, false); + let second = RangeBuf::from(b"something", 3, true); + + assert!(recv.write(second).is_ok()); + assert_eq!(recv.len, 12); + assert_eq!(recv.off, 0); + assert_eq!(recv.data.len(), 1); + + assert!(recv.write(first).is_ok()); + assert_eq!(recv.len, 12); + assert_eq!(recv.off, 0); + assert_eq!(recv.data.len(), 2); + + let (len, fin) = recv.emit(&mut buf).unwrap(); + assert_eq!(len, 12); + assert!(fin); + assert_eq!(&buf[..len], b"helsomething"); + assert_eq!(recv.len, 12); + assert_eq!(recv.off, 12); + + assert_eq!(recv.emit(&mut buf), Err(Error::Done)); + } + + #[test] + fn overlapping_end_twice_read() { + let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW); + assert_eq!(recv.len, 0); + + let mut buf = [0; 32]; + + let first = RangeBuf::from(b"he", 0, false); + let second = RangeBuf::from(b"ow", 
4, false); + let third = RangeBuf::from(b"rl", 7, false); + let fourth = RangeBuf::from(b"helloworld", 0, true); + + assert!(recv.write(third).is_ok()); + assert_eq!(recv.len, 9); + assert_eq!(recv.off, 0); + assert_eq!(recv.data.len(), 1); + + assert!(recv.write(second).is_ok()); + assert_eq!(recv.len, 9); + assert_eq!(recv.off, 0); + assert_eq!(recv.data.len(), 2); + + assert!(recv.write(first).is_ok()); + assert_eq!(recv.len, 9); + assert_eq!(recv.off, 0); + assert_eq!(recv.data.len(), 3); + + assert!(recv.write(fourth).is_ok()); + assert_eq!(recv.len, 10); + assert_eq!(recv.off, 0); + assert_eq!(recv.data.len(), 6); + + let (len, fin) = recv.emit(&mut buf).unwrap(); + assert_eq!(len, 10); + assert!(fin); + assert_eq!(&buf[..len], b"helloworld"); + assert_eq!(recv.len, 10); + assert_eq!(recv.off, 10); + + assert_eq!(recv.emit(&mut buf), Err(Error::Done)); + } + + #[test] + fn overlapping_end_twice_and_contained_read() { + let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW); + assert_eq!(recv.len, 0); + + let mut buf = [0; 32]; + + let first = RangeBuf::from(b"hellow", 0, false); + let second = RangeBuf::from(b"barfoo", 10, true); + let third = RangeBuf::from(b"rl", 7, false); + let fourth = RangeBuf::from(b"elloworldbarfoo", 1, true); + + assert!(recv.write(third).is_ok()); + assert_eq!(recv.len, 9); + assert_eq!(recv.off, 0); + assert_eq!(recv.data.len(), 1); + + assert!(recv.write(second).is_ok()); + assert_eq!(recv.len, 16); + assert_eq!(recv.off, 0); + assert_eq!(recv.data.len(), 2); + + assert!(recv.write(first).is_ok()); + assert_eq!(recv.len, 16); + assert_eq!(recv.off, 0); + assert_eq!(recv.data.len(), 3); + + assert!(recv.write(fourth).is_ok()); + assert_eq!(recv.len, 16); + assert_eq!(recv.off, 0); + assert_eq!(recv.data.len(), 5); + + let (len, fin) = recv.emit(&mut buf).unwrap(); + assert_eq!(len, 16); + assert!(fin); + assert_eq!(&buf[..len], b"helloworldbarfoo"); + assert_eq!(recv.len, 16); + assert_eq!(recv.off, 16); + + 
assert_eq!(recv.emit(&mut buf), Err(Error::Done)); + } + + #[test] + fn partially_multi_overlapping_reordered_read() { + let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW); + assert_eq!(recv.len, 0); + + let mut buf = [0; 32]; + + let first = RangeBuf::from(b"hello", 8, false); + let second = RangeBuf::from(b"something", 0, false); + let third = RangeBuf::from(b"moar", 11, true); + + assert!(recv.write(first).is_ok()); + assert_eq!(recv.len, 13); + assert_eq!(recv.off, 0); + assert_eq!(recv.data.len(), 1); + + assert!(recv.write(second).is_ok()); + assert_eq!(recv.len, 13); + assert_eq!(recv.off, 0); + assert_eq!(recv.data.len(), 2); + + assert!(recv.write(third).is_ok()); + assert_eq!(recv.len, 15); + assert_eq!(recv.off, 0); + assert_eq!(recv.data.len(), 3); + + let (len, fin) = recv.emit(&mut buf).unwrap(); + assert_eq!(len, 15); + assert!(fin); + assert_eq!(&buf[..len], b"somethinhelloar"); + assert_eq!(recv.len, 15); + assert_eq!(recv.off, 15); + assert_eq!(recv.data.len(), 0); + + assert_eq!(recv.emit(&mut buf), Err(Error::Done)); + } + + #[test] + fn partially_multi_overlapping_reordered_read2() { + let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW); + assert_eq!(recv.len, 0); + + let mut buf = [0; 32]; + + let first = RangeBuf::from(b"aaa", 0, false); + let second = RangeBuf::from(b"bbb", 2, false); + let third = RangeBuf::from(b"ccc", 4, false); + let fourth = RangeBuf::from(b"ddd", 6, false); + let fifth = RangeBuf::from(b"eee", 9, false); + let sixth = RangeBuf::from(b"fff", 11, false); + + assert!(recv.write(second).is_ok()); + assert_eq!(recv.len, 5); + assert_eq!(recv.off, 0); + assert_eq!(recv.data.len(), 1); + + assert!(recv.write(fourth).is_ok()); + assert_eq!(recv.len, 9); + assert_eq!(recv.off, 0); + assert_eq!(recv.data.len(), 2); + + assert!(recv.write(third).is_ok()); + assert_eq!(recv.len, 9); + assert_eq!(recv.off, 0); + assert_eq!(recv.data.len(), 3); + + assert!(recv.write(first).is_ok()); + assert_eq!(recv.len, 9); + 
assert_eq!(recv.off, 0); + assert_eq!(recv.data.len(), 4); + + assert!(recv.write(sixth).is_ok()); + assert_eq!(recv.len, 14); + assert_eq!(recv.off, 0); + assert_eq!(recv.data.len(), 5); + + assert!(recv.write(fifth).is_ok()); + assert_eq!(recv.len, 14); + assert_eq!(recv.off, 0); + assert_eq!(recv.data.len(), 6); + + let (len, fin) = recv.emit(&mut buf).unwrap(); + assert_eq!(len, 14); + assert!(!fin); + assert_eq!(&buf[..len], b"aabbbcdddeefff"); + assert_eq!(recv.len, 14); + assert_eq!(recv.off, 14); + assert_eq!(recv.data.len(), 0); + + assert_eq!(recv.emit(&mut buf), Err(Error::Done)); + } +} diff --git a/quiche/src/stream/send_buf.rs b/quiche/src/stream/send_buf.rs new file mode 100644 index 00000000..f0688157 --- /dev/null +++ b/quiche/src/stream/send_buf.rs @@ -0,0 +1,775 @@ +// Copyright (C) 2023, Cloudflare, Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright notice, +// this list of conditions and the following disclaimer. +// +// * Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS +// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, +// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +// PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
+// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+use std::cmp;
+
+use std::collections::VecDeque;
+
+use crate::Error;
+use crate::Result;
+
+use crate::ranges;
+
+use super::RangeBuf;
+
+#[cfg(test)]
+const SEND_BUFFER_SIZE: usize = 5;
+
+#[cfg(not(test))]
+const SEND_BUFFER_SIZE: usize = 4096;
+
+/// Send-side stream buffer.
+///
+/// Stream data scheduled to be sent to the peer is buffered in a list of data
+/// chunks ordered by offset in ascending order. Contiguous data can then be
+/// read into a slice.
+///
+/// By default, new data is appended at the end of the stream, but data can be
+/// inserted at the start of the buffer (this is to allow data that needs to be
+/// retransmitted to be re-buffered).
+#[derive(Debug, Default)]
+pub struct SendBuf {
+    /// Chunks of data to be sent, ordered by offset.
+    data: VecDeque<RangeBuf>,
+
+    /// The index of the buffer that needs to be sent next.
+    pos: usize,
+
+    /// The maximum offset of data buffered in the stream.
+    off: u64,
+
+    /// The maximum offset of data sent to the peer, regardless of
+    /// retransmissions.
+    emit_off: u64,
+
+    /// The amount of data currently buffered.
+    len: u64,
+
+    /// The maximum offset we are allowed to send to the peer.
+    max_data: u64,
+
+    /// The last offset the stream was blocked at, if any.
+    blocked_at: Option<u64>,
+
+    /// The final stream offset written to the stream, if any.
+    fin_off: Option<u64>,
+
+    /// Whether the stream's send-side has been shut down.
+    shutdown: bool,
+
+    /// Ranges of data offsets that have been acked.
+    acked: ranges::RangeSet,
+
+    /// The error code received via STOP_SENDING.
+    error: Option<u64>,
+}
+
+impl SendBuf {
+    /// Creates a new send buffer.
+    ///
+    /// `max_data` is the initial maximum offset that may be sent to the peer;
+    /// every other field starts at its default value.
+    pub fn new(max_data: u64) -> SendBuf {
+        SendBuf {
+            max_data,
+            ..SendBuf::default()
+        }
+    }
+
+    /// Inserts the given slice of data at the end of the buffer.
+    ///
+    /// The number of bytes that were actually stored in the buffer is returned
+    /// (this may be lower than the size of the input buffer, in case of partial
+    /// writes).
+    ///
+    /// Returns `Error::StreamStopped` if the stream was stopped by the peer,
+    /// and `Error::FinalSize` if the write is inconsistent with a final size
+    /// that was already recorded.
+    pub fn write(&mut self, mut data: &[u8], mut fin: bool) -> Result<usize> {
+        // NOTE(review): this is computed from the untruncated input, so the
+        // final-size and fully-acked checks below use the requested end
+        // offset even when the write gets truncated -- confirm this is
+        // intended.
+        let max_off = self.off + data.len() as u64;
+
+        // Get the stream send capacity. This will return an error if the stream
+        // was stopped.
+        let capacity = self.cap()?;
+
+        if data.len() > capacity {
+            // Truncate the input buffer according to the stream's capacity.
+            let len = capacity;
+            data = &data[..len];
+
+            // We are not buffering the full input, so clear the fin flag.
+            fin = false;
+        }
+
+        if let Some(fin_off) = self.fin_off {
+            // Can't write past final offset.
+            if max_off > fin_off {
+                return Err(Error::FinalSize);
+            }
+
+            // Can't "undo" final offset.
+            if max_off == fin_off && !fin {
+                return Err(Error::FinalSize);
+            }
+        }
+
+        if fin {
+            self.fin_off = Some(max_off);
+        }
+
+        // Don't queue data that was already fully acked.
+        if self.ack_off() >= max_off {
+            return Ok(data.len());
+        }
+
+        // We already recorded the final offset, so we can just discard the
+        // empty buffer now.
+        if data.is_empty() {
+            return Ok(data.len());
+        }
+
+        let mut len = 0;
+
+        // Split the remaining input data into consistently-sized buffers to
+        // avoid fragmentation.
+        for chunk in data.chunks(SEND_BUFFER_SIZE) {
+            len += chunk.len();
+
+            // Only the last chunk of a complete write carries the fin flag.
+            let fin = len == data.len() && fin;
+
+            let buf = RangeBuf::from(chunk, self.off, fin);
+
+            // The new data can simply be appended at the end of the send buffer.
+            self.data.push_back(buf);
+
+            self.off += chunk.len() as u64;
+            self.len += chunk.len() as u64;
+        }
+
+        Ok(len)
+    }
+
+    /// Writes data from the send buffer into the given output buffer.
+    ///
+    /// Data is copied starting at `off_front()` for as long as the queued
+    /// buffers are contiguous, there is room left in `out`, and the offset
+    /// is below the `max_data` limit.
+    ///
+    /// Returns the number of bytes copied into `out`, and whether the emitted
+    /// data ends exactly at the stream's final offset.
+    pub fn emit(&mut self, out: &mut [u8]) -> Result<(usize, bool)> {
+        let mut out_len = out.len();
+        let out_off = self.off_front();
+
+        let mut next_off = out_off;
+
+        while out_len > 0 &&
+            self.ready() &&
+            self.off_front() == next_off &&
+            self.off_front() < self.max_data
+        {
+            let buf = match self.data.get_mut(self.pos) {
+                Some(v) => v,
+
+                None => break,
+            };
+
+            // Nothing left to send in this buffer, move on to the next one.
+            if buf.is_empty() {
+                self.pos += 1;
+                continue;
+            }
+
+            let buf_len = cmp::min(buf.len(), out_len);
+            let partial = buf_len < buf.len();
+
+            // Copy data to the output buffer.
+            let out_pos = (next_off - out_off) as usize;
+            out[out_pos..out_pos + buf_len].copy_from_slice(&buf[..buf_len]);
+
+            self.len -= buf_len as u64;
+
+            out_len -= buf_len;
+
+            next_off = buf.off() + buf_len as u64;
+
+            buf.consume(buf_len);
+
+            if partial {
+                // We reached the maximum capacity, so end here.
+                break;
+            }
+
+            self.pos += 1;
+        }
+
+        // Override the `fin` flag set for the output buffer by matching the
+        // buffer's maximum offset against the stream's final offset (if known).
+        //
+        // This is more efficient than tracking `fin` using the range buffers
+        // themselves, and lets us avoid queueing empty buffers just so we can
+        // propagate the final size.
+        let fin = self.fin_off == Some(next_off);
+
+        // Record the largest offset that has been sent so we can accurately
+        // report final_size
+        self.emit_off = cmp::max(self.emit_off, next_off);
+
+        Ok((out.len() - out_len, fin))
+    }
+
+    /// Updates the max_data limit to the given value.
+    ///
+    /// The limit never decreases: lower values are ignored.
+    pub fn update_max_data(&mut self, max_data: u64) {
+        self.max_data = cmp::max(self.max_data, max_data);
+    }
+
+    /// Updates the last offset the stream was blocked at, if any.
+    pub fn update_blocked_at(&mut self, blocked_at: Option<u64>) {
+        self.blocked_at = blocked_at;
+    }
+
+    /// The last offset the stream was blocked at, if any.
+    pub fn blocked_at(&self) -> Option<u64> {
+        self.blocked_at
+    }
+
+    /// Records the range `off..off + len` as acked.
+    pub fn ack(&mut self, off: u64, len: usize) {
+        self.acked.insert(off..off + len as u64);
+    }
+
+    /// Records the range `off..off + len` as acked, then drops from the front
+    /// of the queue every buffer that lies entirely below the highest
+    /// contiguously acked offset (see `ack_off()`).
+    pub fn ack_and_drop(&mut self, off: u64, len: usize) {
+        self.ack(off, len);
+
+        let ack_off = self.ack_off();
+
+        if self.data.is_empty() {
+            return;
+        }
+
+        // The newly acked range starts past the contiguously acked prefix, so
+        // it cannot have made any additional buffer droppable.
+        if off > ack_off {
+            return;
+        }
+
+        let mut drop_until = None;
+
+        // Drop contiguously acked data from the front of the buffer.
+        for (i, buf) in self.data.iter_mut().enumerate() {
+            // Newly acked range is past highest contiguous acked range, so we
+            // can't drop it.
+            if buf.off >= ack_off {
+                break;
+            }
+
+            // Highest contiguous acked range falls within newly acked range,
+            // so we can't drop it.
+            if buf.off < ack_off && ack_off < buf.max_off() {
+                break;
+            }
+
+            // Newly acked range can be dropped.
+            drop_until = Some(i);
+        }
+
+        if let Some(drop) = drop_until {
+            self.data.drain(..=drop);
+
+            // When a buffer is marked for retransmission, but then acked before
+            // it could be retransmitted, we might end up decreasing the SendBuf
+            // position too much, so make sure that doesn't happen.
+            self.pos = self.pos.saturating_sub(drop + 1);
+        }
+    }
+
+    /// Marks the range `off..off + len` as needing to be sent again.
+    ///
+    /// Nothing happens if no data is buffered, or if the whole range lies
+    /// at or below the highest contiguously acked offset. Otherwise the
+    /// position of every buffer overlapping the range is moved back, and
+    /// `self.pos` / `self.len` are adjusted to match.
+    pub fn retransmit(&mut self, off: u64, len: usize) {
+        let max_off = off + len as u64;
+        let ack_off = self.ack_off();
+
+        if self.data.is_empty() {
+            return;
+        }
+
+        if max_off <= ack_off {
+            return;
+        }
+
+        for i in 0..self.data.len() {
+            let buf = &mut self.data[i];
+
+            // Buffers are ordered by offset, so nothing further can overlap.
+            if buf.off >= max_off {
+                break;
+            }
+
+            // This buffer ends before the retransmit range starts.
+            if off > buf.max_off() {
+                continue;
+            }
+
+            // Split the buffer into 2 if the retransmit range ends before the
+            // buffer's final offset.
+            let new_buf = if buf.off < max_off && max_off < buf.max_off() {
+                Some(buf.split_off((max_off - buf.off) as usize))
+            } else {
+                None
+            };
+
+            let prev_pos = buf.pos;
+
+            // Reduce the buffer's position (expand the buffer) if the retransmit
+            // range is past the buffer's starting offset.
+            buf.pos = if off > buf.off && off <= buf.max_off() {
+                cmp::min(buf.pos, buf.start + (off - buf.off) as usize)
+            } else {
+                buf.start
+            };
+
+            // Make sure sending resumes no later than this buffer.
+            self.pos = cmp::min(self.pos, i);
+
+            // The bytes uncovered by moving the position back count as
+            // buffered data again.
+            self.len += (prev_pos - buf.pos) as u64;
+
+            if let Some(b) = new_buf {
+                self.data.insert(i + 1, b);
+            }
+        }
+    }
+
+    /// Resets the stream at the current offset and clears all buffered data.
+    ///
+    /// The final size becomes the larger of `off_front()` and the largest
+    /// offset emitted so far. Returns the largest emitted offset, and how
+    /// much buffered data past that final size was discarded.
+    pub fn reset(&mut self) -> (u64, u64) {
+        let unsent_off = cmp::max(self.off_front(), self.emit_off);
+        let unsent_len = self.off_back().saturating_sub(unsent_off);
+
+        self.fin_off = Some(unsent_off);
+
+        // Drop all buffered data.
+        self.data.clear();
+
+        // Mark all data as acked.
+        self.ack(0, self.off as usize);
+
+        self.pos = 0;
+        self.len = 0;
+        self.off = unsent_off;
+
+        (self.emit_off, unsent_len)
+    }
+
+    /// Resets the streams and records the received error code.
+    ///
+    /// Calling this again after the first time has no effect, and returns
+    /// `Error::Done`. On success the values returned by `reset()` are
+    /// passed through.
+    pub fn stop(&mut self, error_code: u64) -> Result<(u64, u64)> {
+        if self.error.is_some() {
+            return Err(Error::Done);
+        }
+
+        let (max_off, unsent) = self.reset();
+
+        self.error = Some(error_code);
+
+        Ok((max_off, unsent))
+    }
+
+    /// Shuts down sending data.
+    ///
+    /// Returns `Error::Done` if the send-side was already shut down,
+    /// otherwise resets the stream and returns the values from `reset()`.
+    pub fn shutdown(&mut self) -> Result<(u64, u64)> {
+        if self.shutdown {
+            return Err(Error::Done);
+        }
+
+        self.shutdown = true;
+
+        Ok(self.reset())
+    }
+
+    /// Returns the largest offset of data buffered.
+    pub fn off_back(&self) -> u64 {
+        self.off
+    }
+
+    /// Returns the lowest offset of data buffered.
+    ///
+    /// If every buffer from the current position onwards is empty, this is
+    /// the same as `off_back()`.
+    pub fn off_front(&self) -> u64 {
+        let mut pos = self.pos;
+
+        // Skip empty buffers from the start of the queue.
+        while let Some(b) = self.data.get(pos) {
+            if !b.is_empty() {
+                return b.off();
+            }
+
+            pos += 1;
+        }
+
+        self.off
+    }
+
+    /// The maximum offset we are allowed to send to the peer.
+    pub fn max_off(&self) -> u64 {
+        self.max_data
+    }
+
+    /// Returns true if all data in the stream has been sent.
+    ///
+    /// This happens when the stream's send final size is known, and the
+    /// application has already written data up to that point.
+    pub fn is_fin(&self) -> bool {
+        self.fin_off == Some(self.off)
+    }
+
+    /// Returns true if the send-side of the stream is complete.
+    ///
+    /// This happens when the stream's send final size is known, and the peer
+    /// has already acked all stream data up to that point.
+    pub fn is_complete(&self) -> bool {
+        if let Some(fin_off) = self.fin_off {
+            if self.acked == (0..fin_off) {
+                return true;
+            }
+        }
+
+        false
+    }
+
+    /// Returns true if the stream was stopped before completion.
+    pub fn is_stopped(&self) -> bool {
+        self.error.is_some()
+    }
+
+    /// Returns true if the stream was shut down.
+    pub fn is_shutdown(&self) -> bool {
+        self.shutdown
+    }
+
+    /// Returns true if there is data to be written.
+    pub fn ready(&self) -> bool {
+        !self.data.is_empty() && self.off_front() < self.off
+    }
+
+    /// Returns the highest contiguously acked offset.
+    pub fn ack_off(&self) -> u64 {
+        match self.acked.iter().next() {
+            // Only consider the initial range if it contiguously covers the
+            // start of the stream (i.e. from offset 0).
+            Some(std::ops::Range { start: 0, end }) => end,
+
+            Some(_) | None => 0,
+        }
+    }
+
+    /// Returns the outgoing flow control capacity.
+    ///
+    /// This is the distance between the end of the buffered data and the
+    /// `max_data` limit. Returns `Error::StreamStopped` if the stream was
+    /// stopped.
+    pub fn cap(&self) -> Result<usize> {
+        // The stream was stopped, so return the error code instead.
+        if let Some(e) = self.error {
+            return Err(Error::StreamStopped(e));
+        }
+
+        Ok((self.max_data - self.off) as usize)
+    }
+
+    /// Returns the number of separate buffers stored.
+    #[allow(dead_code)]
+    pub fn bufs_count(&self) -> usize {
+        // Number of entries currently held in the buffer queue.
+        self.data.len()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    // Payloads shared by most of the tests below.
+    const FIRST: &[u8] = b"something";
+    const SECOND: &[u8] = b"helloworld";
+
+    /// Emits pending data into `out` and checks, in order, the number of
+    /// bytes reported, the fin flag and the emitted bytes themselves.
+    fn check_emit(sb: &mut SendBuf, out: &mut [u8], data: &[u8], fin: bool) {
+        let (written, got_fin) = sb.emit(out).unwrap();
+        assert_eq!(written, data.len());
+        assert_eq!(got_fin, fin);
+        assert_eq!(&out[..written], data);
+    }
+
+    /// Emitting from a buffer that was never written to yields nothing.
+    #[test]
+    fn empty_write() {
+        let mut out = [0u8; 5];
+
+        let mut sb = SendBuf::new(u64::MAX);
+        assert_eq!(sb.len, 0);
+
+        check_emit(&mut sb, &mut out, b"", false);
+    }
+
+    /// Two writes are emitted back-to-back in a single call.
+    #[test]
+    fn multi_write() {
+        let mut out = [0u8; 128];
+
+        let mut sb = SendBuf::new(u64::MAX);
+        assert_eq!(sb.len, 0);
+
+        assert!(sb.write(FIRST, false).is_ok());
+        assert_eq!(sb.len, 9);
+
+        assert!(sb.write(SECOND, true).is_ok());
+        assert_eq!(sb.len, 19);
+
+        check_emit(&mut sb, &mut out[..128], b"somethinghelloworld", true);
+        assert_eq!(sb.len, 0);
+    }
+
+    /// Data is emitted in pieces when the output slice is too small.
+    #[test]
+    fn split_write() {
+        let mut out = [0u8; 10];
+
+        let mut sb = SendBuf::new(u64::MAX);
+        assert_eq!(sb.len, 0);
+
+        assert!(sb.write(FIRST, false).is_ok());
+        assert_eq!(sb.len, 9);
+
+        assert!(sb.write(SECOND, true).is_ok());
+        assert_eq!(sb.len, 19);
+
+        assert_eq!(sb.off_front(), 0);
+
+        check_emit(&mut sb, &mut out[..10], b"somethingh", false);
+        assert_eq!(sb.len, 9);
+
+        assert_eq!(sb.off_front(), 10);
+
+        check_emit(&mut sb, &mut out[..5], b"ellow", false);
+        assert_eq!(sb.len, 4);
+
+        assert_eq!(sb.off_front(), 15);
+
+        check_emit(&mut sb, &mut out[..10], b"orld", true);
+        assert_eq!(sb.len, 0);
+
+        assert_eq!(sb.off_front(), 19);
+    }
+
+    /// Retransmitted ranges are emitted again before the remaining data.
+    #[test]
+    fn resend() {
+        let mut out = [0u8; 15];
+
+        let mut sb = SendBuf::new(u64::MAX);
+        assert_eq!(sb.len, 0);
+        assert_eq!(sb.off_front(), 0);
+
+        assert!(sb.write(FIRST, false).is_ok());
+        assert_eq!(sb.off_front(), 0);
+
+        assert!(sb.write(SECOND, true).is_ok());
+        assert_eq!(sb.off_front(), 0);
+
+        assert_eq!(sb.len, 19);
+
+        check_emit(&mut sb, &mut out[..4], b"some", false);
+        assert_eq!(sb.len, 15);
+        assert_eq!(sb.off_front(), 4);
+
+        check_emit(&mut sb, &mut out[..5], b"thing", false);
+        assert_eq!(sb.len, 10);
+        assert_eq!(sb.off_front(), 9);
+
+        check_emit(&mut sb, &mut out[..5], b"hello", false);
+        assert_eq!(sb.len, 5);
+        assert_eq!(sb.off_front(), 14);
+
+        // Queue "thing" and then "some" for retransmission.
+        sb.retransmit(4, 5);
+        assert_eq!(sb.len, 10);
+        assert_eq!(sb.off_front(), 4);
+
+        sb.retransmit(0, 4);
+        assert_eq!(sb.len, 14);
+        assert_eq!(sb.off_front(), 0);
+
+        check_emit(&mut sb, &mut out[..11], b"something", false);
+        assert_eq!(sb.len, 5);
+        assert_eq!(sb.off_front(), 14);
+
+        check_emit(&mut sb, &mut out[..11], b"world", true);
+        assert_eq!(sb.len, 0);
+        assert_eq!(sb.off_front(), 19);
+    }
+
+    /// Writes are truncated to the limit set through `update_max_data()`.
+    #[test]
+    fn write_blocked_by_off() {
+        let mut out = [0u8; 10];
+
+        let mut sb = SendBuf::default();
+        assert_eq!(sb.len, 0);
+
+        // Nothing is accepted before the limit is raised.
+        assert_eq!(sb.write(FIRST, false), Ok(0));
+        assert_eq!(sb.len, 0);
+
+        assert_eq!(sb.write(SECOND, true), Ok(0));
+        assert_eq!(sb.len, 0);
+
+        sb.update_max_data(5);
+
+        assert_eq!(sb.write(FIRST, false), Ok(5));
+        assert_eq!(sb.len, 5);
+
+        assert_eq!(sb.write(SECOND, true), Ok(0));
+        assert_eq!(sb.len, 5);
+
+        assert_eq!(sb.off_front(), 0);
+
+        check_emit(&mut sb, &mut out[..10], b"somet", false);
+        assert_eq!(sb.len, 0);
+
+        assert_eq!(sb.off_front(), 5);
+
+        check_emit(&mut sb, &mut out[..10], b"", false);
+        assert_eq!(sb.len, 0);
+
+        sb.update_max_data(15);
+
+        assert_eq!(sb.write(&FIRST[5..], false), Ok(4));
+        assert_eq!(sb.len, 4);
+
+        assert_eq!(sb.write(SECOND, true), Ok(6));
+        assert_eq!(sb.len, 10);
+
+        assert_eq!(sb.off_front(), 5);
+
+        check_emit(&mut sb, &mut out[..10], b"hinghellow", false);
+        assert_eq!(sb.len, 0);
+
+        sb.update_max_data(25);
+
+        assert_eq!(sb.write(&SECOND[6..], true), Ok(4));
+        assert_eq!(sb.len, 4);
+
+        assert_eq!(sb.off_front(), 15);
+
+        check_emit(&mut sb, &mut out[..10], b"orld", true);
+        assert_eq!(sb.len, 0);
+    }
+
+    /// An empty write carrying fin finishes the stream without adding data.
+    #[test]
+    fn zero_len_write() {
+        let mut out = [0u8; 10];
+
+        let mut sb = SendBuf::new(u64::MAX);
+        assert_eq!(sb.len, 0);
+
+        assert!(sb.write(FIRST, false).is_ok());
+        assert_eq!(sb.len, 9);
+
+        assert!(sb.write(&[], true).is_ok());
+        assert_eq!(sb.len, 9);
+
+        assert_eq!(sb.off_front(), 0);
+
+        check_emit(&mut sb, &mut out[..10], b"something", true);
+        assert_eq!(sb.len, 0);
+    }
+
+    /// Check SendBuf::len calculation on a retransmit case
+    #[test]
+    fn send_buf_len_on_retransmit() {
+        let mut out = [0u8; 15];
+
+        let mut sb = SendBuf::new(u64::MAX);
+        assert_eq!(sb.len, 0);
+        assert_eq!(sb.off_front(), 0);
+
+        assert!(sb.write(FIRST, false).is_ok());
+        assert_eq!(sb.off_front(), 0);
+
+        assert_eq!(sb.len, 9);
+
+        check_emit(&mut sb, &mut out[..4], b"some", false);
+        assert_eq!(sb.len, 5);
+        assert_eq!(sb.off_front(), 4);
+
+        // The retransmit range starts inside the part that was emitted.
+        sb.retransmit(3, 5);
+        assert_eq!(sb.len, 6);
+        assert_eq!(sb.off_front(), 3);
+    }
+
+    /// The offset reported by `stop()` is not lowered by a retransmission.
+    #[test]
+    fn send_buf_final_size_retransmit() {
+        let mut payload = [0u8; 50];
+        let mut sb = SendBuf::new(u64::MAX);
+
+        sb.write(&payload, false).unwrap();
+        assert_eq!(sb.off_front(), 0);
+
+        // Emit the whole buffer
+        let (emitted, _fin) = sb.emit(&mut payload).unwrap();
+        assert_eq!(emitted, payload.len());
+        assert_eq!(sb.off_front(), payload.len() as u64);
+
+        // Server decides to retransmit the last 10 bytes. It's possible
+        // it's not actually lost and that the client did receive it.
+        sb.retransmit(40, 10);
+
+        // Server receives STOP_SENDING from client. The final_size we
+        // send in the RESET_STREAM should be 50. If we send anything less,
+        // it's a FINAL_SIZE_ERROR.
+        let (final_off, unsent_len) = sb.stop(0).unwrap();
+        assert_eq!(final_off, 50);
+        assert_eq!(unsent_len, 0);
+    }
+}