Skip to content

Commit

Permalink
explicitly wait for background tasks to cleanup before exiting
Browse files Browse the repository at this point in the history
  • Loading branch information
arcnmx committed Dec 13, 2020
1 parent 8e21e57 commit 91dbda8
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 18 deletions.
19 changes: 13 additions & 6 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use config::{Config, ConfigEvent, ConfigSourceName};
use event::{Hotkey, UserEvent, ProcessedXEvent};
use qemu::Qemu;
use route::Route;
use spawner::Spawner;
use sources::Sources;
use process::Process;
use ddc::{Monitor, DdcMonitor};
Expand All @@ -35,27 +36,32 @@ mod sources;
mod exec;
mod process;
mod util;
mod spawner;

type Events = event::Events<Arc<ConfigEvent>>;

const EVENT_BUFFER: usize = 8;

fn main() {
let runtime = tokio::runtime::Runtime::new().unwrap();
let code = match runtime.block_on(main_result()) {
let spawner = Arc::new(Spawner::new());

let code = match runtime.block_on(main_result(&spawner)) {
Ok(code) => code,
Err(e) => {
let _ = writeln!(io::stderr(), "{:?} {}", e, e);
1
},
};

runtime.shutdown_timeout(Duration::from_secs(2));
runtime.block_on(spawner.join_timeout(Duration::from_secs(2))).unwrap();

runtime.shutdown_timeout(Duration::from_secs(1));

exit(code);
}

async fn main_result() -> Result<i32, Error> {
async fn main_result(spawner: &Arc<Spawner>) -> Result<i32, Error> {
env_logger::init();

let app = App::new("screenstub")
Expand Down Expand Up @@ -161,6 +167,7 @@ async fn main_result() -> Result<i32, Error> {
let process = Process::new(
config.qemu.routing, keyboard_driver, relative_driver, absolute_driver, config.exit_events,
qemu.clone(), events.clone(), sources, xreq_sender.clone(), event_sender.clone(), error_sender.clone(),
spawner.clone(),
);

process.devices_init().await?;
Expand All @@ -181,7 +188,7 @@ async fn main_result() -> Result<i32, Error> {
.x_config_key(repeat)
.id(&uinput_id);
}
let mut events_keyboard = route_keyboard.spawn(error_sender.clone());
let mut events_keyboard = route_keyboard.spawn(spawner, error_sender.clone());

let mut route_relative = Route::new(config.qemu.routing, qemu.clone(), "screenstub-route-mouse".into(), bus.clone(), repeat);
if let Some(builder) = route_relative.builder() {
Expand All @@ -190,7 +197,7 @@ async fn main_result() -> Result<i32, Error> {
.x_config_rel()
.id(&uinput_id);
}
let mut events_relative = route_relative.spawn(error_sender.clone());
let mut events_relative = route_relative.spawn(spawner, error_sender.clone());

let mut route_absolute = Route::new(config.qemu.routing, qemu.clone(), "screenstub-route-tablet".into(), bus, repeat);
if let Some(builder) = route_absolute.builder() {
Expand All @@ -199,7 +206,7 @@ async fn main_result() -> Result<i32, Error> {
.x_config_abs()
.id(&uinput_id);
}
let mut events_absolute = route_absolute.spawn(error_sender.clone());
let mut events_absolute = route_absolute.spawn(spawner, error_sender.clone());

let x_filter = process.x_filter();

Expand Down
8 changes: 6 additions & 2 deletions src/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use crate::grab::GrabEvdev;
use crate::exec::exec;
use x::XRequest;
use crate::Events;
use crate::spawner::Spawner;
use log::{trace, info, error};

pub struct GrabHandle {
Expand Down Expand Up @@ -49,6 +50,7 @@ pub struct Process {
event_sender: un_mpsc::Sender<InputEvent>,
error_sender: un_mpsc::Sender<Error>,
uinput_id: Arc<InputId>,
spawner: Arc<Spawner>,
}

#[derive(Debug, Copy, Clone)]
Expand All @@ -59,7 +61,7 @@ enum InputDevice {
}

impl Process {
pub fn new(routing: ConfigQemuRouting, driver_keyboard: ConfigQemuDriver, driver_relative: ConfigQemuDriver, driver_absolute: ConfigQemuDriver, exit_events: Vec<config::ConfigEvent>, qemu: Arc<Qemu>, events: Arc<Events>, sources: Sources, xreq_sender: un_mpsc::Sender<XRequest>, event_sender: un_mpsc::Sender<InputEvent>, error_sender: un_mpsc::Sender<Error>) -> Self {
pub fn new(routing: ConfigQemuRouting, driver_keyboard: ConfigQemuDriver, driver_relative: ConfigQemuDriver, driver_absolute: ConfigQemuDriver, exit_events: Vec<config::ConfigEvent>, qemu: Arc<Qemu>, events: Arc<Events>, sources: Sources, xreq_sender: un_mpsc::Sender<XRequest>, event_sender: un_mpsc::Sender<InputEvent>, error_sender: un_mpsc::Sender<Error>, spawner: Arc<Spawner>) -> Self {
Process {
routing,
driver_keyboard,
Expand All @@ -80,6 +82,7 @@ impl Process {
product: 0x05df,
version: 1,
}),
spawner,
}
}

Expand Down Expand Up @@ -170,6 +173,7 @@ impl Process {
let driver_relative = self.driver_relative;
let driver_absolute = self.driver_absolute;
let prev_is_mouse = self.is_mouse();
let spawner = self.spawner.clone();
let grab = GrabEvdev::new(devices, evdev_ignore.iter().cloned());

async move {
Expand Down Expand Up @@ -198,7 +202,7 @@ impl Process {
grab.grab(true)?;
}

uinput.spawn(error_sender.clone())
uinput.spawn(&spawner, error_sender.clone())
} else {
event_sender.unwrap()
};
Expand Down
21 changes: 11 additions & 10 deletions src/route.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::sync::Arc;
use std::path::Path;
use std::task::Poll;
use std::time::Duration;
use std::sync::Once;
use std::pin::Pin;
use std::iter;
use tokio::time::{Duration, Instant};
use input::{InputEvent, EventRef, KeyEvent, Key, RelativeAxis, AbsoluteAxis};
use futures::channel::mpsc;
use futures::{StreamExt, SinkExt, Future, FutureExt, TryFutureExt};
Expand All @@ -15,6 +15,7 @@ use qapi::{qmp, Any};
use qemu::Qemu;
use uinput;
use log::warn;
use crate::spawner::Spawner;

pub struct RouteQmp {
qemu: Arc<Qemu>,
Expand Down Expand Up @@ -94,10 +95,10 @@ impl RouteQmp {
e.into_iter().map(move |ref e| Self::convert_event(e, qkeycodes)).filter_map(|e| e)
}

pub fn spawn(&self, mut events: mpsc::Receiver<InputEvent>, mut error_sender: mpsc::Sender<Error>) {
pub fn spawn(&self, spawner: &Spawner, mut events: mpsc::Receiver<InputEvent>, mut error_sender: mpsc::Sender<Error>) {
let qemu = self.qemu.clone();
let qkeycodes = self.qkeycodes.clone();
tokio::spawn(async move {
spawner.spawn(async move {
let qmp = qemu.connect_qmp().await?;
let mut cmd = qmp::input_send_event {
device: Default::default(),
Expand Down Expand Up @@ -186,7 +187,7 @@ impl UInputCommands for RouteUInputVirtio {
let command = qmp::device_add::new(name, Some(self.id.clone()), self.bus.clone(), vec![
("evdev".into(), Any::String(path.display().to_string())),
]);
let deadline = tokio::time::Instant::now() + Duration::from_millis(512); // HACK: wait for udev to see device and change permissions
let deadline = Instant::now() + Duration::from_millis(512); // HACK: wait for udev to see device and change permissions
let qemu = qemu.clone();
async move {
qemu.device_add(command, deadline).await
Expand Down Expand Up @@ -244,11 +245,11 @@ impl<U> RouteUInput<U> {
}

impl<U: UInputCommands> RouteUInput<U> {
pub fn spawn(&self, mut events: mpsc::Receiver<InputEvent>, mut error_sender: mpsc::Sender<Error>) {
pub fn spawn(&self, spawner: &Spawner, mut events: mpsc::Receiver<InputEvent>, mut error_sender: mpsc::Sender<Error>) {
let qemu = self.qemu.clone();
let uinput = self.builder.create();
let commands = self.commands.clone();
tokio::spawn(async move {
spawner.spawn(async move {
let uinput = uinput?;
let path = uinput.path().to_owned();
let mut uinput = uinput.to_sink()?;
Expand Down Expand Up @@ -297,13 +298,13 @@ impl Route {
}
}

pub fn spawn(self, error_sender: mpsc::Sender<Error>) -> mpsc::Sender<InputEvent> {
pub fn spawn(self, spawner: &Spawner, error_sender: mpsc::Sender<Error>) -> mpsc::Sender<InputEvent> {
let (sender, events) = mpsc::channel(crate::EVENT_BUFFER);

match self {
Route::InputLinux(ref uinput) => uinput.spawn(events, error_sender),
Route::VirtioHost(ref uinput) => uinput.spawn(events, error_sender),
Route::Qmp(ref qmp) => qmp.spawn(events, error_sender),
Route::InputLinux(ref uinput) => uinput.spawn(spawner, events, error_sender),
Route::VirtioHost(ref uinput) => uinput.spawn(spawner, events, error_sender),
Route::Qmp(ref qmp) => qmp.spawn(spawner, events, error_sender),
}

sender
Expand Down
33 changes: 33 additions & 0 deletions src/spawner.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
use std::sync::Mutex;
use tokio::time::{Duration, Instant, timeout_at};
use tokio::task::JoinHandle;
use futures::{Future, future};
use failure::Error;

pub struct Spawner {
handles: Mutex<Vec<JoinHandle<()>>>,
}

impl Spawner {
pub fn new() -> Self {
Self {
handles: Mutex::new(Vec::new()),
}
}

pub fn spawn<F: Future<Output=()> + Send + 'static>(&self, f: F) {
let handle = tokio::spawn(f);
self.handles.lock().unwrap().push(handle);
}

pub async fn join_timeout(&self, timeout: Duration) -> Result<(), Error> {
let deadline = Instant::now() + timeout;
loop {
let handles: Vec<_> = self.handles.lock().unwrap().drain(..).collect();
if handles.is_empty() {
break Ok(())
}
let _: Vec<()> = timeout_at(deadline, future::try_join_all(handles)).await??;
}
}
}

0 comments on commit 91dbda8

Please sign in to comment.