Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: pass now to qlog #2212

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions neqo-bin/src/client/http09.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl Handler<'_> {
impl super::Handler for Handler<'_> {
type Client = Connection;

fn handle(&mut self, client: &mut Self::Client) -> Res<bool> {
fn handle(&mut self, client: &mut Self::Client, now: Instant) -> Res<bool> {
while let Some(event) = client.next_event() {
if self.needs_key_update {
match client.initiate_key_update() {
Expand All @@ -69,7 +69,7 @@ impl super::Handler for Handler<'_> {

match event {
ConnectionEvent::AuthenticationNeeded => {
client.authenticated(AuthenticationStatus::Ok, Instant::now());
client.authenticated(AuthenticationStatus::Ok, now);
}
ConnectionEvent::RecvStreamReadable { stream_id } => {
self.read(client, stream_id)?;
Expand Down
50 changes: 33 additions & 17 deletions neqo-bin/src/client/http3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ impl Handler<'_> {
impl super::Handler for Handler<'_> {
type Client = Http3Client;

fn handle(&mut self, client: &mut Http3Client) -> Res<bool> {
fn handle(&mut self, client: &mut Http3Client, now: Instant) -> Res<bool> {
while let Some(event) = client.next_event() {
match event {
Http3ClientEvent::AuthenticationNeeded => {
Expand All @@ -182,7 +182,7 @@ impl super::Handler for Handler<'_> {
qwarn!("Data on unexpected stream: {stream_id}");
}
if fin {
self.url_handler.on_stream_fin(client, stream_id);
self.url_handler.on_stream_fin(client, stream_id, now);
}
}
Http3ClientEvent::DataReadable { stream_id } => {
Expand Down Expand Up @@ -215,7 +215,7 @@ impl super::Handler for Handler<'_> {
}

if stream_done {
self.url_handler.on_stream_fin(client, stream_id);
self.url_handler.on_stream_fin(client, stream_id, now);
}
}
Http3ClientEvent::DataWritable { stream_id } => {
Expand All @@ -224,20 +224,20 @@ impl super::Handler for Handler<'_> {
qwarn!("Data on unexpected stream: {stream_id}");
}
Some(handler) => {
handler.process_data_writable(client, stream_id);
handler.process_data_writable(client, stream_id, now);
}
}
}
Http3ClientEvent::StateChange(Http3State::Connected)
| Http3ClientEvent::RequestsCreatable => {
qinfo!("{event:?}");
self.url_handler.process_urls(client);
self.url_handler.process_urls(client, now);
}
Http3ClientEvent::ZeroRttRejected => {
qinfo!("{event:?}");
// All 0-RTT data was rejected. We need to retransmit it.
self.reinit();
self.url_handler.process_urls(client);
self.url_handler.process_urls(client, now);
}
Http3ClientEvent::ResumptionToken(t) => self.token = Some(t),
_ => {
Expand All @@ -263,7 +263,12 @@ trait StreamHandler {
data: &[u8],
output_read_data: bool,
) -> Res<bool>;
fn process_data_writable(&mut self, client: &mut Http3Client, stream_id: StreamId);
fn process_data_writable(
&mut self,
client: &mut Http3Client,
stream_id: StreamId,
now: Instant,
);
}

struct DownloadStreamHandler {
Expand Down Expand Up @@ -308,7 +313,13 @@ impl StreamHandler for DownloadStreamHandler {
Ok(true)
}

fn process_data_writable(&mut self, _client: &mut Http3Client, _stream_id: StreamId) {}
fn process_data_writable(
&mut self,
_client: &mut Http3Client,
_stream_id: StreamId,
_now: Instant,
) {
}
}

struct UploadStreamHandler {
Expand Down Expand Up @@ -341,12 +352,17 @@ impl StreamHandler for UploadStreamHandler {
Ok(true)
}

fn process_data_writable(&mut self, client: &mut Http3Client, stream_id: StreamId) {
fn process_data_writable(
&mut self,
client: &mut Http3Client,
stream_id: StreamId,
now: Instant,
) {
let done = self
.data
.send(|chunk| client.send_data(stream_id, chunk).unwrap());
.send(|chunk| client.send_data(stream_id, chunk, now).unwrap());
if done {
client.stream_close_send(stream_id).unwrap();
client.stream_close_send(stream_id, now).unwrap();
}
}
}
Expand All @@ -364,21 +380,21 @@ impl UrlHandler<'_> {
self.stream_handlers.get_mut(&stream_id)
}

fn process_urls(&mut self, client: &mut Http3Client) {
fn process_urls(&mut self, client: &mut Http3Client, now: Instant) {
loop {
if self.url_queue.is_empty() {
break;
}
if self.stream_handlers.len() >= self.args.concurrency {
break;
}
if !self.next_url(client) {
if !self.next_url(client, now) {
break;
}
}
}

fn next_url(&mut self, client: &mut Http3Client) -> bool {
fn next_url(&mut self, client: &mut Http3Client, now: Instant) -> bool {
let url = self
.url_queue
.pop_front()
Expand All @@ -400,7 +416,7 @@ impl UrlHandler<'_> {
self.args.output_dir.as_ref(),
&mut self.all_paths,
);
client.stream_close_send(client_stream_id).unwrap();
client.stream_close_send(client_stream_id, now).unwrap();
Box::new(DownloadStreamHandler { out_file })
}
"POST" => Box::new(UploadStreamHandler {
Expand Down Expand Up @@ -432,9 +448,9 @@ impl UrlHandler<'_> {
self.stream_handlers.is_empty() && self.url_queue.is_empty()
}

fn on_stream_fin(&mut self, client: &mut Http3Client, stream_id: StreamId) {
fn on_stream_fin(&mut self, client: &mut Http3Client, stream_id: StreamId, now: Instant) {
self.stream_handlers.remove(&stream_id);
self.process_urls(client);
self.process_urls(client, now);
}
}

Expand Down
4 changes: 2 additions & 2 deletions neqo-bin/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ async fn ready(
trait Handler {
type Client: Client;

fn handle(&mut self, client: &mut Self::Client) -> Res<bool>;
fn handle(&mut self, client: &mut Self::Client, now: Instant) -> Res<bool>;
fn take_token(&mut self) -> Option<ResumptionToken>;
}

Expand Down Expand Up @@ -418,7 +418,7 @@ impl<'a, H: Handler> Runner<'a, H> {

async fn run(mut self) -> Res<Option<ResumptionToken>> {
loop {
let handler_done = self.handler.handle(&mut self.client)?;
let handler_done = self.handler.handle(&mut self.client, Instant::now())?;
self.process_output().await?;
if self.client.has_events() {
continue;
Expand Down
17 changes: 9 additions & 8 deletions neqo-bin/src/server/http3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl super::HttpServer for HttpServer {
self.server.process(dgram, now)
}

fn process_events(&mut self, _now: Instant) {
fn process_events(&mut self, now: Instant) {
while let Some(event) = self.server.next_event() {
match event {
Http3ServerEvent::Headers {
Expand Down Expand Up @@ -117,7 +117,7 @@ impl super::HttpServer for HttpServer {
stream
.send_headers(&[Header::new(":status", "404")])
.unwrap();
stream.stream_close_send().unwrap();
stream.stream_close_send(now).unwrap();
continue;
}
}
Expand All @@ -135,20 +135,21 @@ impl super::HttpServer for HttpServer {
Header::new("content-length", response.len().to_string()),
])
.unwrap();
let done = response.send(|chunk| stream.send_data(chunk).unwrap());
let done = response.send(|chunk| stream.send_data(chunk, now).unwrap());
if done {
stream.stream_close_send().unwrap();
stream.stream_close_send(now).unwrap();
} else {
self.remaining_data.insert(stream.stream_id(), response);
}
}
Http3ServerEvent::DataWritable { stream } => {
if self.posts.get_mut(&stream).is_none() {
if let Some(remaining) = self.remaining_data.get_mut(&stream.stream_id()) {
let done = remaining.send(|chunk| stream.send_data(chunk).unwrap());
let done =
remaining.send(|chunk| stream.send_data(chunk, now).unwrap());
if done {
self.remaining_data.remove(&stream.stream_id());
stream.stream_close_send().unwrap();
stream.stream_close_send(now).unwrap();
}
}
}
Expand All @@ -164,8 +165,8 @@ impl super::HttpServer for HttpServer {
stream
.send_headers(&[Header::new(":status", "200")])
.unwrap();
stream.send_data(&msg).unwrap();
stream.stream_close_send().unwrap();
stream.send_data(&msg, now).unwrap();
stream.stream_close_send(now).unwrap();
}
}
}
Expand Down
12 changes: 7 additions & 5 deletions neqo-common/src/qlog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::{
io::BufWriter,
path::PathBuf,
rc::Rc,
time::SystemTime,
time::{Instant, SystemTime},
};

use qlog::{
Expand Down Expand Up @@ -95,33 +95,35 @@ impl NeqoQlog {
}

/// If logging enabled, closure may generate an event to be logged.
pub fn add_event<F>(&self, f: F)
pub fn add_event<F>(&self, f: F, now: Instant)
where
F: FnOnce() -> Option<qlog::events::Event>,
{
self.add_event_with_stream(|s| {
if let Some(evt) = f() {
s.add_event(evt)?;
s.add_event_with_instant(evt, now)?;
}
Ok(())
});
}

/// If logging enabled, closure may generate an event to be logged.
pub fn add_event_data<F>(&self, f: F)
pub fn add_event_data<F>(&self, f: F, now: Instant)
where
F: FnOnce() -> Option<qlog::events::EventData>,
{
self.add_event_with_stream(|s| {
if let Some(ev_data) = f() {
s.add_event_data_now(ev_data)?;
s.add_event_data_with_instant(ev_data, now)?;
}
Ok(())
});
}

/// If logging enabled, closure is given the Qlog stream to write events and
/// frames to.
//
// TODO: What about this one?
pub fn add_event_with_stream<F>(&self, f: F)
where
F: FnOnce(&mut QlogStreamer) -> Result<(), qlog::Error>,
Expand Down
17 changes: 12 additions & 5 deletions neqo-http3/src/buffered_send_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.

use std::time::Instant;

use neqo_common::qtrace;
use neqo_transport::{Connection, StreamId};

Expand Down Expand Up @@ -61,7 +63,7 @@ impl BufferedStream {
/// # Errors
///
/// Returns `neqo_transport` errors.
pub fn send_buffer(&mut self, conn: &mut Connection) -> Res<usize> {
pub fn send_buffer(&mut self, conn: &mut Connection, now: Instant) -> Res<usize> {
let label = ::neqo_common::log_subject!(::log::Level::Debug, self);
let Self::Initialized { stream_id, buf } = self else {
return Ok(0);
Expand All @@ -79,16 +81,21 @@ impl BufferedStream {
let b = buf.split_off(sent);
*buf = b;
}
qlog::h3_data_moved_down(conn.qlog_mut(), *stream_id, sent);
qlog::h3_data_moved_down(conn.qlog_mut(), *stream_id, sent, now);
Ok(sent)
}

/// # Errors
///
/// Returns `neqo_transport` errors.
pub fn send_atomic(&mut self, conn: &mut Connection, to_send: &[u8]) -> Res<bool> {
pub fn send_atomic(
&mut self,
conn: &mut Connection,
to_send: &[u8],
now: Instant,
) -> Res<bool> {
// First try to send anything that is in the buffer.
self.send_buffer(conn)?;
self.send_buffer(conn, now)?;
let Self::Initialized { stream_id, buf } = self else {
return Ok(false);
};
Expand All @@ -97,7 +104,7 @@ impl BufferedStream {
}
let res = conn.stream_send_atomic(*stream_id, to_send)?;
if res {
qlog::h3_data_moved_down(conn.qlog_mut(), *stream_id, to_send.len());
qlog::h3_data_moved_down(conn.qlog_mut(), *stream_id, to_send.len(), now);
}
Ok(res)
}
Expand Down
Loading
Loading