diff --git a/benches/latency.rs b/benches/latency.rs index 477b7f8f..da9ef9c0 100644 --- a/benches/latency.rs +++ b/benches/latency.rs @@ -107,8 +107,10 @@ fn latency_by_native_framed_tcp(c: &mut Criterion) { b.iter(|| { let encoded_size = encoding::encode_size(&[0xFF], &mut framming); + sender.set_nodelay(false).ok(); sender.write(&encoded_size).unwrap(); sender.write(&[0xFF]).unwrap(); + sender.set_nodelay(true).ok(); let mut message_received = false; while !message_received { diff --git a/examples/throughput/main.rs b/examples/throughput/main.rs index 971fd82e..c7cb5e0e 100644 --- a/examples/throughput/main.rs +++ b/examples/throughput/main.rs @@ -193,8 +193,10 @@ fn throughput_native_framed_tcp(packet_size: usize) { let start_time = Instant::now(); while total_sent < EXPECTED_BYTES { let encoded_size = encoding::encode_size(&message, &mut framming); + sender.set_nodelay(false).ok(); sender.write(&encoded_size).unwrap(); sender.write(&message).unwrap(); + sender.set_nodelay(true).ok(); total_sent += message.len(); } start_time diff --git a/src/adapters/framed_tcp.rs b/src/adapters/framed_tcp.rs index 9218ea76..9d41f5fb 100644 --- a/src/adapters/framed_tcp.rs +++ b/src/adapters/framed_tcp.rs @@ -88,15 +88,21 @@ impl Remote for RemoteResource { let mut buf = [0; MAX_ENCODED_SIZE]; // used to avoid a heap allocation let encoded_size = encoding::encode_size(data, &mut buf); + let stream = &self.stream; + // We want to send the message as a whole whatever it can be possible. + // In this protocol, sending few bytes than the message has no sense and adds latency: + // by the network sending small chunks, and by the receiver allocating memory to decode them. + // If the target is throughput, use TCP instead. + stream.set_nodelay(false).ok(); + let mut total_bytes_sent = 0; let total_bytes = encoded_size.len() + data.len(); - loop { + let status = loop { let data_to_send = match total_bytes_sent < encoded_size.len() { true => &encoded_size[total_bytes_sent..], false => &data[total_bytes_sent - encoded_size.len()..], }; - let stream = &self.stream; match stream.deref().write(data_to_send) { Ok(bytes_sent) => { total_bytes_sent += bytes_sent; @@ -110,7 +116,13 @@ impl Remote for RemoteResource { break SendStatus::ResourceNotFound // should not happen } } - } + }; + + // We have already the entire message in the OS buffer, send now, not wait for the next one. + // The message in this protocol has an information meanless. + // The user can process already this unit of data. Do not wait for other possible message. + stream.set_nodelay(true).ok(); + status } }