Skip to content

Commit

Permalink
Refact. Replace all tokio::time::interval() (rustdesk#7173)
Browse files Browse the repository at this point in the history
* Refact. Replace all `tokio::time::interval()`

Signed-off-by: fufesou <[email protected]>

* Refact Better min_interval for `ThrottledInterval`.

Signed-off-by: fufesou <[email protected]>

---------

Signed-off-by: fufesou <[email protected]>
  • Loading branch information
fufesou authored Feb 18, 2024
1 parent 5fdcc74 commit 8c10806
Show file tree
Hide file tree
Showing 9 changed files with 153 additions and 39 deletions.
14 changes: 7 additions & 7 deletions src/client/io_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use hbb_common::{
tokio::{
self,
sync::mpsc,
time::{self, Duration, Instant, Interval},
time::{self, Duration, Instant},
},
Stream,
};
Expand Down Expand Up @@ -62,7 +62,7 @@ pub struct Remote<T: InvokeUiSession> {
read_jobs: Vec<fs::TransferJob>,
write_jobs: Vec<fs::TransferJob>,
remove_jobs: HashMap<i32, RemoveJob>,
timer: Interval,
timer: crate::RustDeskInterval,
last_update_jobs_status: (Instant, HashMap<i32, u64>),
is_connected: bool,
first_frame: bool,
Expand Down Expand Up @@ -99,7 +99,7 @@ impl<T: InvokeUiSession> Remote<T> {
read_jobs: Vec::new(),
write_jobs: Vec::new(),
remove_jobs: Default::default(),
timer: time::interval(SEC30),
timer: crate::rustdesk_interval(time::interval(SEC30)),
last_update_jobs_status: (Instant::now(), Default::default()),
is_connected: false,
first_frame: false,
Expand Down Expand Up @@ -170,7 +170,7 @@ impl<T: InvokeUiSession> Remote<T> {
#[cfg(any(target_os = "windows", target_os = "linux", target_os = "macos"))]
let mut rx_clip_client = rx_clip_client_lock.lock().await;

let mut status_timer = time::interval(Duration::new(1, 0));
let mut status_timer = crate::rustdesk_interval(time::interval(Duration::new(1, 0)));
let mut fps_instant = Instant::now();

loop {
Expand Down Expand Up @@ -228,7 +228,7 @@ impl<T: InvokeUiSession> Remote<T> {
}
self.update_jobs_status();
} else {
self.timer = time::interval_at(Instant::now() + SEC30, SEC30);
self.timer = crate::rustdesk_interval(time::interval_at(Instant::now() + SEC30, SEC30));
}
}
_ = status_timer.tick() => {
Expand Down Expand Up @@ -537,7 +537,7 @@ impl<T: InvokeUiSession> Remote<T> {
}
let total_size = job.total_size();
self.read_jobs.push(job);
self.timer = time::interval(MILLI1);
self.timer = crate::rustdesk_interval(time::interval(MILLI1));
allow_err!(
peer.send(&fs::new_receive(id, to, file_num, files, total_size))
.await
Expand Down Expand Up @@ -597,7 +597,7 @@ impl<T: InvokeUiSession> Remote<T> {
);
job.is_last_job = true;
self.read_jobs.push(job);
self.timer = time::interval(MILLI1);
self.timer = crate::rustdesk_interval(time::interval(MILLI1));
}
}
}
Expand Down
127 changes: 124 additions & 3 deletions src/common.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{
future::Future,
sync::{Arc, Mutex, RwLock},
task::Poll,
};

#[derive(Debug, Eq, PartialEq)]
Expand Down Expand Up @@ -132,15 +133,20 @@ use hbb_common::{
bytes::Bytes,
compress::compress as compress_func,
config::{self, Config, CONNECT_TIMEOUT, READ_TIMEOUT},
futures_util::future::poll_fn,
get_version_number, log,
message_proto::*,
protobuf::Enum,
protobuf::Message as _,
protobuf::{Enum, Message as _},
rendezvous_proto::*,
socket_client,
sodiumoxide::crypto::{box_, secretbox, sign},
tcp::FramedStream,
timeout, tokio, ResultType,
timeout,
tokio::{
self,
time::{Duration, Instant, Interval},
},
ResultType,
};
// #[cfg(any(target_os = "android", target_os = "ios", feature = "cli"))]
use hbb_common::{config::RENDEZVOUS_PORT, futures::future::join_all};
Expand Down Expand Up @@ -1335,3 +1341,118 @@ pub fn using_public_server() -> bool {
&& crate::get_custom_rendezvous_server(get_option("custom-rendezvous-server")).is_empty()
}

pub struct ThrottledInterval {
interval: Interval,
last_tick: Instant,
min_interval: Duration,
}

impl ThrottledInterval {
pub fn new(i: Interval) -> ThrottledInterval {
let period = i.period();
ThrottledInterval {
interval: i,
last_tick: Instant::now() - period * 2,
min_interval: Duration::from_secs_f64(period.as_secs_f64() * 0.9),
}
}

pub async fn tick(&mut self) -> Instant {
loop {
let instant = poll_fn(|cx| self.poll_tick(cx));
if let Some(instant) = instant.await {
return instant;
}
}
}

pub fn poll_tick(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Option<Instant>> {
match self.interval.poll_tick(cx) {
Poll::Ready(instant) => {
if self.last_tick.elapsed() >= self.min_interval {
self.last_tick = Instant::now();
Poll::Ready(Some(instant))
} else {
// This call is required since tokio 1.27
cx.waker().wake_by_ref();
Poll::Ready(None)
}
}
Poll::Pending => {
Poll::Pending
},
}
}
}

pub type RustDeskInterval = ThrottledInterval;

#[inline]
pub fn rustdesk_interval(i: Interval) -> ThrottledInterval {
ThrottledInterval::new(i)
}

#[cfg(test)]
mod tests {
use super::*;
use chrono::{format::StrftimeItems, Local};
use hbb_common::tokio::{
self,
time::{interval, sleep, Duration},
};
use std::collections::HashSet;

#[tokio::test]
async fn test_tokio_time_interval() {
let mut timer = interval(Duration::from_secs(1));
let mut times = Vec::new();
sleep(Duration::from_secs(3)).await;
loop {
tokio::select! {
_ = timer.tick() => {
let format = StrftimeItems::new("%Y-%m-%d %H:%M:%S");
times.push(Local::now().format_with_items(format).to_string());
if times.len() == 5 {
break;
}
}
}
}
let times2: HashSet<String> = HashSet::from_iter(times.clone());
assert_eq!(times.len(), times2.len() + 3);
}

#[allow(non_snake_case)]
#[tokio::test]
async fn test_RustDesk_interval() {
let mut timer = rustdesk_interval(interval(Duration::from_secs(1)));
let mut times = Vec::new();
sleep(Duration::from_secs(3)).await;
loop {
tokio::select! {
_ = timer.tick() => {
let format = StrftimeItems::new("%Y-%m-%d %H:%M:%S");
times.push(Local::now().format_with_items(format).to_string());
if times.len() == 5 {
break;
}
}
}
}
let times2: HashSet<String> = HashSet::from_iter(times.clone());
assert_eq!(times.len(), times2.len());
}

#[test]
fn test_duration_multiplication() {
let dur = Duration::from_secs(1);

assert_eq!(dur * 2, Duration::from_secs(2));
assert_eq!(Duration::from_secs_f64(dur.as_secs_f64() * 0.9), Duration::from_millis(900));
assert_eq!(Duration::from_secs_f64(dur.as_secs_f64() * 0.923), Duration::from_millis(923));
assert_eq!(Duration::from_secs_f64(dur.as_secs_f64() * 0.923 * 1e-3), Duration::from_micros(923));
assert_eq!(Duration::from_secs_f64(dur.as_secs_f64() * 0.923 * 1e-6), Duration::from_nanos(923));
assert_eq!(Duration::from_secs_f64(dur.as_secs_f64() * 0.923 * 1e-9), Duration::from_nanos(1));
assert_eq!(Duration::from_secs_f64(dur.as_secs_f64() * 0.923 * 1e-10), Duration::from_nanos(0));
}
}
2 changes: 1 addition & 1 deletion src/hbbs_http/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ pub struct StrategyOptions {
#[cfg(not(any(target_os = "ios")))]
#[tokio::main(flavor = "current_thread")]
async fn start_hbbs_sync_async() {
let mut interval = tokio::time::interval_at(Instant::now() + TIME_CONN, TIME_CONN);
let mut interval = crate::rustdesk_interval(tokio::time::interval_at(Instant::now() + TIME_CONN, TIME_CONN));
let mut last_sent: Option<Instant> = None;
let mut info_uploaded: (bool, String, Option<Instant>) = (false, "".to_owned(), None);
loop {
Expand Down
4 changes: 4 additions & 0 deletions src/license.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ mod test {
host: "server.example.net".to_owned(),
key: "".to_owned(),
api: "".to_owned(),
relay: "".to_owned(),
}
);
assert_eq!(
Expand All @@ -124,6 +125,7 @@ mod test {
host: "server.example.net".to_owned(),
key: "".to_owned(),
api: "".to_owned(),
relay: "".to_owned(),
}
);
// key in these tests is "foobar.,2" base64 encoded
Expand All @@ -136,6 +138,7 @@ mod test {
host: "server.example.net".to_owned(),
key: "Zm9vYmFyLiwyCg==".to_owned(),
api: "abc".to_owned(),
relay: "".to_owned(),
}
);
assert_eq!(
Expand All @@ -145,6 +148,7 @@ mod test {
host: "server.example.net".to_owned(),
key: "Zm9vYmFyLiwyCg==".to_owned(),
api: "".to_owned(),
relay: "".to_owned(),
}
);
}
Expand Down
10 changes: 2 additions & 8 deletions src/rendezvous_mediator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,7 @@ impl RendezvousMediator {
keep_alive: DEFAULT_KEEP_ALIVE,
};

let mut timer = interval(TIMER_OUT);
let mut last_timer: Option<Instant> = None;
let mut timer = crate::rustdesk_interval(interval(TIMER_OUT));
const MIN_REG_TIMEOUT: i64 = 3_000;
const MAX_REG_TIMEOUT: i64 = 30_000;
let mut reg_timeout = MIN_REG_TIMEOUT;
Expand Down Expand Up @@ -215,11 +214,6 @@ impl RendezvousMediator {
break;
}
let now = Some(Instant::now());
if last_timer.map(|x| x.elapsed() < TIMER_OUT).unwrap_or(false) {
// a workaround of tokio timer bug
continue;
}
last_timer = now;
let expired = last_register_resp.map(|x| x.elapsed().as_millis() as i64 >= REG_INTERVAL).unwrap_or(true);
let timeout = last_register_sent.map(|x| x.elapsed().as_millis() as i64 >= reg_timeout).unwrap_or(false);
// temporarily disable exponential backoff for android before we add wakeup trigger to force connect in android
Expand Down Expand Up @@ -342,7 +336,7 @@ impl RendezvousMediator {
host_prefix: Self::get_host_prefix(&host),
keep_alive: DEFAULT_KEEP_ALIVE,
};
let mut timer = interval(TIMER_OUT);
let mut timer = crate::rustdesk_interval(interval(TIMER_OUT));
let mut last_register_sent: Option<Instant> = None;
let mut last_recv_msg = Instant::now();
// we won't support connecting to multiple rendzvous servers any more, so we can use a global variable here.
Expand Down
19 changes: 10 additions & 9 deletions src/server/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use hbb_common::{
tokio::{
net::TcpStream,
sync::mpsc,
time::{self, Duration, Instant, Interval},
time::{self, Duration, Instant},
},
tokio_util::codec::{BytesCodec, Framed},
};
Expand Down Expand Up @@ -175,8 +175,8 @@ pub struct Connection {
server: super::ServerPtrWeak,
hash: Hash,
read_jobs: Vec<fs::TransferJob>,
timer: Interval,
file_timer: Interval,
timer: crate::RustDeskInterval,
file_timer: crate::RustDeskInterval,
file_transfer: Option<(String, bool)>,
port_forward_socket: Option<Framed<TcpStream, BytesCodec>>,
port_forward_address: String,
Expand Down Expand Up @@ -327,8 +327,8 @@ impl Connection {
server,
hash,
read_jobs: Vec::new(),
timer: time::interval(SEC30),
file_timer: time::interval(SEC30),
timer: crate::rustdesk_interval(time::interval(SEC30)),
file_timer: crate::rustdesk_interval(time::interval(SEC30)),
file_transfer: None,
port_forward_socket: None,
port_forward_address: "".to_owned(),
Expand Down Expand Up @@ -419,7 +419,7 @@ impl Connection {
if !conn.block_input {
conn.send_permission(Permission::BlockInput, false).await;
}
let mut test_delay_timer = time::interval(TEST_DELAY_TIMEOUT);
let mut test_delay_timer = crate::rustdesk_interval(time::interval(TEST_DELAY_TIMEOUT));
let mut last_recv_time = Instant::now();

conn.stream.set_send_timeout(
Expand All @@ -432,7 +432,7 @@ impl Connection {

#[cfg(not(any(target_os = "android", target_os = "ios")))]
std::thread::spawn(move || Self::handle_input(_rx_input, tx_cloned));
let mut second_timer = time::interval(Duration::from_secs(1));
let mut second_timer = crate::rustdesk_interval(time::interval(Duration::from_secs(1)));

loop {
tokio::select! {
Expand Down Expand Up @@ -608,7 +608,7 @@ impl Connection {
}
}
} else {
conn.file_timer = time::interval_at(Instant::now() + SEC30, SEC30);
conn.file_timer = crate::rustdesk_interval(time::interval_at(Instant::now() + SEC30, SEC30));
}
}
Ok(conns) = hbbs_rx.recv() => {
Expand Down Expand Up @@ -2054,7 +2054,8 @@ impl Connection {
job.is_remote = true;
job.conn_id = self.inner.id();
self.read_jobs.push(job);
self.file_timer = time::interval(MILLI1);
self.file_timer =
crate::rustdesk_interval(time::interval(MILLI1));
self.post_file_audit(
FileAuditType::RemoteSend,
&s.path,
Expand Down
5 changes: 3 additions & 2 deletions src/server/portable_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,8 @@ pub mod server {

match ipc::connect(1000, postfix).await {
Ok(mut stream) => {
let mut timer = tokio::time::interval(Duration::from_secs(1));
let mut timer =
crate::rustdesk_interval(tokio::time::interval(Duration::from_secs(1)));
let mut nack = 0;
loop {
tokio::select! {
Expand Down Expand Up @@ -777,7 +778,7 @@ pub mod client {
tokio::spawn(async move {
let mut stream = Connection::new(stream);
let postfix = postfix.to_owned();
let mut timer = tokio::time::interval(Duration::from_secs(1));
let mut timer = crate::rustdesk_interval(tokio::time::interval(Duration::from_secs(1)));
let mut nack = 0;
let mut rx = rx_clone.lock().await;
loop {
Expand Down
2 changes: 1 addition & 1 deletion src/tray.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ async fn start_query_session_count(sender: std::sync::mpsc::Sender<Data>) {
let mut last_count = 0;
loop {
if let Ok(mut c) = crate::ipc::connect(1000, "").await {
let mut timer = tokio::time::interval(Duration::from_secs(1));
let mut timer = crate::rustdesk_interval(tokio::time::interval(Duration::from_secs(1)));
loop {
tokio::select! {
res = c.next() => {
Expand Down
Loading

0 comments on commit 8c10806

Please sign in to comment.