diff --git a/Cargo.toml b/Cargo.toml index c7ba95a00..9a6c573cf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,16 +18,13 @@ exclude = ["/.github"] edition = "2018" rust-version = "1.53" -[dependencies] -jobserver = { version = "0.1.16", optional = true } - [target.'cfg(unix)'.dependencies] # Don't turn on the feature "std" for this, see https://github.com/rust-lang/cargo/issues/4866 # which is still an issue with `resolver = "1"`. libc = { version = "0.2.62", default-features = false } [features] -parallel = ["jobserver"] +parallel = [] [dev-dependencies] tempfile = "3" diff --git a/gen-windows-sys-binding/windows_sys.list b/gen-windows-sys-binding/windows_sys.list index 82d11c9de..f5ed55e16 100644 --- a/gen-windows-sys-binding/windows_sys.list +++ b/gen-windows-sys-binding/windows_sys.list @@ -6,6 +6,12 @@ Windows.Win32.Foundation.SysFreeString Windows.Win32.Foundation.SysStringLen Windows.Win32.Foundation.S_FALSE Windows.Win32.Foundation.S_OK +Windows.Win32.Foundation.FALSE +Windows.Win32.Foundation.HANDLE +Windows.Win32.Foundation.WAIT_OBJECT_0 +Windows.Win32.Foundation.WAIT_TIMEOUT +Windows.Win32.Foundation.WAIT_FAILED +Windows.Win32.Foundation.WAIT_ABANDONED Windows.Win32.System.Com.SAFEARRAY Windows.Win32.System.Com.SAFEARRAYBOUND @@ -25,3 +31,10 @@ Windows.Win32.System.Registry.HKEY_LOCAL_MACHINE Windows.Win32.System.Registry.KEY_READ Windows.Win32.System.Registry.KEY_WOW64_32KEY Windows.Win32.System.Registry.REG_SZ + +Windows.Win32.System.Threading.ReleaseSemaphore +Windows.Win32.System.Threading.WaitForSingleObject +Windows.Win32.System.Threading.SEMAPHORE_MODIFY_STATE +Windows.Win32.System.Threading.THREAD_SYNCHRONIZE + +Windows.Win32.System.WindowsProgramming.OpenSemaphoreA diff --git a/src/async_executor.rs b/src/async_executor.rs new file mode 100644 index 000000000..ad9e62a65 --- /dev/null +++ b/src/async_executor.rs @@ -0,0 +1,118 @@ +use std::{ + cell::Cell, + future::Future, + pin::Pin, + ptr, + task::{Context, Poll, RawWaker, RawWakerVTable, Waker}, + thread, + time::Duration, +}; + +use crate::Error; + +const NOOP_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new( + // Cloning just returns a new no-op raw waker + |_| NOOP_RAW_WAKER, + // `wake` does nothing + |_| {}, + // `wake_by_ref` does nothing + |_| {}, + // Dropping does nothing as we don't allocate anything + |_| {}, +); +const NOOP_RAW_WAKER: RawWaker = RawWaker::new(ptr::null(), &NOOP_WAKER_VTABLE); + +#[derive(Default)] +pub(super) struct YieldOnce(bool); + +impl Future for YieldOnce { + type Output = (); + + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> { + let flag = &mut std::pin::Pin::into_inner(self).0; + if !*flag { + *flag = true; + Poll::Pending + } else { + Poll::Ready(()) + } + } +} + +/// Execute the futures and return when they are all done. +/// +/// Here we use our own homebrew async executor since cc is used in the build +/// script of many popular projects, pulling in additional dependencies would +/// significantly slow down its compilation. +pub(super) fn block_on( + mut fut1: Fut1, + mut fut2: Fut2, + has_made_progress: &Cell, +) -> Result<(), Error> +where + Fut1: Future>, + Fut2: Future>, +{ + // Shadows the future so that it can never be moved and is guaranteed + // to be pinned. + // + // The same trick used in `pin!` macro. + // + // TODO: Once MSRV is bumped to 1.68, replace this with `std::pin::pin!` + let mut fut1 = Some(unsafe { Pin::new_unchecked(&mut fut1) }); + let mut fut2 = Some(unsafe { Pin::new_unchecked(&mut fut2) }); + + // TODO: Once `Waker::noop` stablised and our MSRV is bumped to the version + // which it is stablised, replace this wth `Waker::noop`. + let waker = unsafe { Waker::from_raw(NOOP_RAW_WAKER) }; + let mut context = Context::from_waker(&waker); + + let mut backoff_cnt = 0; + + loop { + has_made_progress.set(false); + + if let Some(fut) = fut2.as_mut() { + if let Poll::Ready(res) = fut.as_mut().poll(&mut context) { + fut2 = None; + res?; + } + } + + if let Some(fut) = fut1.as_mut() { + if let Poll::Ready(res) = fut.as_mut().poll(&mut context) { + fut1 = None; + res?; + } + } + + if fut1.is_none() && fut2.is_none() { + return Ok(()); + } + + if !has_made_progress.get() { + if backoff_cnt > 3 { + // We have yielded at least three times without making' + // any progress, so we will sleep for a while. + let duration = Duration::from_millis(100 * (backoff_cnt - 3).min(10)); + thread::sleep(duration); + } else { + // Given that we spawned a lot of compilation tasks, it is unlikely + // that OS cannot find other ready task to execute. + // + // If all of them are done, then we will yield them and spawn more, + // or simply return. + // + // Thus this will not be turned into a busy-wait loop and it will not + // waste CPU resource. + thread::yield_now(); + } + } + + backoff_cnt = if has_made_progress.get() { + 0 + } else { + backoff_cnt + 1 + }; + } +} diff --git a/src/job_token.rs b/src/job_token.rs index 818917c8d..4b4fa989e 100644 --- a/src/job_token.rs +++ b/src/job_token.rs @@ -1,139 +1,171 @@ -use jobserver::{Acquired, Client, HelperThread}; -use std::{ - env, - mem::MaybeUninit, - sync::{ - mpsc::{self, Receiver, Sender}, - Once, - }, -}; - -pub(crate) struct JobToken { - /// The token can either be a fresh token obtained from the jobserver or - if `token` is None - an implicit token for this process. - /// Both are valid values to put into queue. - token: Option, - /// A pool to which `token` should be returned. `pool` is optional, as one might want to release a token straight away instead - /// of storing it back in the pool - see [`Self::forget()`] function for that. - pool: Option>>>, -} +use std::{mem::MaybeUninit, sync::Once}; + +use crate::Error; + +#[cfg(unix)] +#[path = "job_token/unix.rs"] +mod sys; + +#[cfg(windows)] +#[path = "job_token/windows.rs"] +mod sys; + +pub(super) struct JobToken(); impl Drop for JobToken { fn drop(&mut self) { - if let Some(pool) = &self.pool { - // Always send back an Ok() variant as we know that the acquisition for this token has succeeded. - let _ = pool.send(self.token.take().map(|token| Ok(token))); + match JobTokenServer::new() { + JobTokenServer::Inherited(jobserver) => jobserver.release_token_raw(), + JobTokenServer::InProcess(jobserver) => jobserver.release_token_raw(), } } } -impl JobToken { - /// Ensure that this token is not put back into queue once it's dropped. - /// This also leads to releasing it sooner for other processes to use, - /// which is a correct thing to do once it is known that there won't be - /// any more token acquisitions. - pub(crate) fn forget(&mut self) { - self.pool.take(); - } -} - -/// A thin wrapper around jobserver's Client. -/// It would be perfectly fine to just use jobserver's Client, but we also want to reuse -/// our own implicit token assigned for this build script. This struct manages that and -/// gives out tokens without exposing whether they're implicit tokens or tokens from jobserver. -/// Furthermore, instead of giving up job tokens, it keeps them around -/// for reuse if we know we're going to request another token after freeing the current one. -pub(crate) struct JobTokenServer { - helper: HelperThread, - tx: Sender>>, - rx: Receiver>>, +pub(super) enum JobTokenServer { + Inherited(inherited_jobserver::JobServer), + InProcess(inprocess_jobserver::JobServer), } impl JobTokenServer { + /// This function returns a static reference to the jobserver because + /// - creating a jobserver from env is a bit fd-unsafe (e.g. the fd might + /// be closed by other jobserver users in the process) and better do it + /// at the start of the program. + /// - in case a jobserver cannot be created from env (e.g. it's not + /// present), we will create a global in-process only jobserver + /// that has to be static so that it will be shared by all cc + /// compilation. pub(crate) fn new() -> &'static Self { - jobserver() - } - fn new_inner(client: Client) -> Result { - let (tx, rx) = mpsc::channel(); - // Push the implicit token. Since JobTokens only give back what they got, - // there should be at most one global implicit token in the wild. - tx.send(None).unwrap(); - let pool = tx.clone(); - let helper = client.into_helper_thread(move |acq| { - let _ = pool.send(Some(acq.map_err(|e| e.into()))); - })?; - Ok(Self { helper, tx, rx }) - } + static INIT: Once = Once::new(); + static mut JOBSERVER: MaybeUninit = MaybeUninit::uninit(); - pub(crate) fn acquire(&self) -> Result { - let token = if let Ok(token) = self.rx.try_recv() { - // Opportunistically check if there's a token that can be reused. - token - } else { - // Cold path, request a token and block - self.helper.request_token(); - self.rx.recv().unwrap() - }; - let token = if let Some(token) = token { - Some(token?) - } else { - None - }; - Ok(JobToken { - token, - pool: Some(self.tx.clone()), - }) + unsafe { + INIT.call_once(|| { + let server = inherited_jobserver::JobServer::from_env() + .map(Self::Inherited) + .unwrap_or_else(|| Self::InProcess(inprocess_jobserver::JobServer::new())); + JOBSERVER = MaybeUninit::new(server); + }); + // TODO: Poor man's assume_init_ref, as that'd require a MSRV of 1.55. + &*JOBSERVER.as_ptr() + } } -} -/// Returns a suitable `JobTokenServer` used to coordinate -/// parallelism between build scripts. A global `JobTokenServer` is used as this ensures -/// that only one implicit job token is used in the wild. -/// Having multiple separate job token servers would lead to each of them assuming that they have control -/// over the implicit job token. -/// As it stands, each caller of `jobserver` can receive an implicit job token and there will be at most -/// one implicit job token in the wild. -fn jobserver() -> &'static JobTokenServer { - static INIT: Once = Once::new(); - static mut JOBSERVER: MaybeUninit = MaybeUninit::uninit(); - - fn _assert_sync() {} - _assert_sync::(); - - unsafe { - INIT.call_once(|| { - let server = default_jobserver(); - JOBSERVER = MaybeUninit::new( - JobTokenServer::new_inner(server).expect("Job server initialization failed"), - ); - }); - // Poor man's assume_init_ref, as that'd require a MSRV of 1.55. - &*JOBSERVER.as_ptr() + pub(crate) fn try_acquire(&self) -> Result, Error> { + match self { + Self::Inherited(jobserver) => jobserver.try_acquire(), + Self::InProcess(jobserver) => Ok(jobserver.try_acquire()), + } } } -unsafe fn default_jobserver() -> jobserver::Client { - // Try to use the environmental jobserver which Cargo typically - // initializes for us... - if let Some(client) = jobserver::Client::from_env() { - return client; +mod inherited_jobserver { + use super::{sys, Error, JobToken}; + + use std::{ + env::var_os, + sync::atomic::{ + AtomicBool, + Ordering::{AcqRel, Acquire}, + }, + }; + + pub(crate) struct JobServer { + /// Implicit token for this process which is obtained and will be + /// released in parent. Since JobTokens only give back what they got, + /// there should be at most one global implicit token in the wild. + /// + /// Since Rust does not execute any `Drop` for global variables, + /// we can't just put it back to jobserver and then re-acquire it at + /// the end of the process. + global_implicit_token: AtomicBool, + inner: sys::JobServerClient, } - // ... but if that fails for whatever reason select something - // reasonable and crate a new jobserver. Use `NUM_JOBS` if set (it's - // configured by Cargo) and otherwise just fall back to a - // semi-reasonable number. Note that we could use `num_cpus` here - // but it's an extra dependency that will almost never be used, so - // it's generally not too worth it. - let mut parallelism = 4; - if let Ok(amt) = env::var("NUM_JOBS") { - if let Ok(amt) = amt.parse() { - parallelism = amt; + impl JobServer { + pub(super) unsafe fn from_env() -> Option { + let var = var_os("CARGO_MAKEFLAGS") + .or_else(|| var_os("MAKEFLAGS")) + .or_else(|| var_os("MFLAGS"))?; + + let inner = sys::JobServerClient::open(var)?; + + Some(Self { + inner, + global_implicit_token: AtomicBool::new(true), + }) + } + + pub(super) fn try_acquire(&self) -> Result, Error> { + if !self.global_implicit_token.swap(false, AcqRel) { + // Cold path, no global implicit token, obtain one + if self.inner.try_acquire()?.is_none() { + return Ok(None); + } + } + Ok(Some(JobToken())) + } + + pub(super) fn release_token_raw(&self) { + // All tokens will be put back into the jobserver immediately + // and they cannot be cached, since Rust does not call `Drop::drop` + // on global variables. + if self + .global_implicit_token + .compare_exchange(false, true, AcqRel, Acquire) + .is_err() + { + // There's already a global implicit token, so this token must + // be released back into jobserver + let _ = self.inner.release(); + } } } +} + +mod inprocess_jobserver { + use super::JobToken; + + use std::{ + env::var, + sync::atomic::{ + AtomicU32, + Ordering::{AcqRel, Acquire}, + }, + }; + + pub(crate) struct JobServer(AtomicU32); - // If we create our own jobserver then be sure to reserve one token - // for ourselves. - let client = jobserver::Client::new(parallelism).expect("failed to create jobserver"); - client.acquire_raw().expect("failed to acquire initial"); - return client; + impl JobServer { + pub(super) fn new() -> Self { + // Use `NUM_JOBS` if set (it's configured by Cargo) and otherwise + // just fall back to a semi-reasonable number. + // + // Note that we could use `num_cpus` here but it's an extra + // dependency that will almost never be used, so + // it's generally not too worth it. + let mut parallelism = 4; + // TODO: Use std::thread::available_parallelism as an upper bound + // when MSRV is bumped. + if let Ok(amt) = var("NUM_JOBS") { + if let Ok(amt) = amt.parse() { + parallelism = amt; + } + } + + Self(AtomicU32::new(parallelism)) + } + + pub(super) fn try_acquire(&self) -> Option { + let res = self + .0 + .fetch_update(AcqRel, Acquire, |tokens| tokens.checked_sub(1)); + + res.ok().map(|_| JobToken()) + } + + pub(super) fn release_token_raw(&self) { + self.0.fetch_add(1, AcqRel); + } + } } diff --git a/src/job_token/unix.rs b/src/job_token/unix.rs new file mode 100644 index 000000000..af626c24d --- /dev/null +++ b/src/job_token/unix.rs @@ -0,0 +1,185 @@ +use std::{ + ffi::{OsStr, OsString}, + fs::{self, File}, + io::{self, Read, Write}, + mem::ManuallyDrop, + os::{ + raw::c_int, + unix::{ + ffi::{OsStrExt, OsStringExt}, + prelude::*, + }, + }, + path::Path, +}; + +pub(super) struct JobServerClient { + read: File, + write: Option, +} + +impl JobServerClient { + pub(super) unsafe fn open(var: OsString) -> Option { + let bytes = var.into_vec(); + + let s = bytes + .split(u8::is_ascii_whitespace) + .filter_map(|arg| { + arg.strip_prefix(b"--jobserver-fds=") + .or_else(|| arg.strip_prefix(b"--jobserver-auth=")) + }) + .find(|bytes| !bytes.is_empty())?; + + if let Some(fifo) = s.strip_prefix(b"fifo:") { + Self::from_fifo(Path::new(OsStr::from_bytes(fifo))) + } else { + Self::from_pipe(OsStr::from_bytes(s).to_str()?) + } + } + + /// `--jobserver-auth=fifo:PATH` + fn from_fifo(path: &Path) -> Option { + let file = fs::OpenOptions::new() + .read(true) + .write(true) + .open(path) + .ok()?; + + if is_pipe(&file)? { + // File in Rust is always closed-on-exec as long as it's opened by + // `File::open` or `fs::OpenOptions::open`. + set_nonblocking(&file)?; + + Some(Self { + read: file, + write: None, + }) + } else { + None + } + } + + /// `--jobserver-auth=fd-for-R,fd-for-W` + unsafe fn from_pipe(s: &str) -> Option { + let (read, write) = s.split_once(',')?; + + let read = read.parse().ok()?; + let write = write.parse().ok()?; + + let read = ManuallyDrop::new(File::from_raw_fd(read)); + let write = ManuallyDrop::new(File::from_raw_fd(write)); + + // Ok so we've got two integers that look like file descriptors, but + // for extra sanity checking let's see if they actually look like + // instances of a pipe before we return the client. + // + // If we're called from `make` *without* the leading + on our rule + // then we'll have `MAKEFLAGS` env vars but won't actually have + // access to the file descriptors. + match ( + is_pipe(&read), + is_pipe(&write), + get_access_mode(&read), + get_access_mode(&write), + ) { + ( + Some(true), + Some(true), + Some(libc::O_RDONLY) | Some(libc::O_RDWR), + Some(libc::O_WRONLY) | Some(libc::O_RDWR), + ) => { + let read = read.try_clone().ok()?; + let write = write.try_clone().ok()?; + + // Set read and write end to nonblocking + set_nonblocking(&read)?; + set_nonblocking(&write)?; + + Some(Self { + read, + write: Some(write), + }) + } + _ => None, + } + } + + pub(super) fn try_acquire(&self) -> io::Result> { + let mut fds = [libc::pollfd { + fd: self.read.as_raw_fd(), + events: libc::POLLIN, + revents: 0, + }]; + + let ret = cvt(unsafe { libc::poll(fds.as_mut_ptr(), 1, 0) })?; + if ret == 1 { + let mut buf = [0]; + match (&self.read).read(&mut buf) { + Ok(1) => Ok(Some(())), + Ok(_) => Ok(None), // 0, eof + Err(e) + if e.kind() == io::ErrorKind::Interrupted + || e.kind() == io::ErrorKind::WouldBlock => + { + Ok(None) + } + Err(e) => Err(e), + } + } else { + Ok(None) + } + } + + pub(super) fn release(&self) -> io::Result<()> { + // For write to block, this would mean that pipe is full. + // If all every release are pair with an acquire, then this cannot + // happen. + // + // If it does happen, it is likely a bug in the program using this + // crate or some other programs that use the same jobserver have a + // bug in their code. + // + // If that turns out to not be the case we'll get an error anyway! + let mut write = self.write.as_ref().unwrap_or(&self.read); + match write.write(&[b'+'])? { + 1 => Ok(()), + _ => Err(io::Error::from(io::ErrorKind::UnexpectedEof)), + } + } +} + +fn set_nonblocking(file: &File) -> Option<()> { + // F_SETFL can only set the O_APPEND, O_ASYNC, O_DIRECT, O_NOATIME, and + // O_NONBLOCK flags. + // + // For pipe, only O_NONBLOCK is meaningful, so it is ok to + // not issue a F_GETFL fcntl syscall. + let ret = unsafe { libc::fcntl(file.as_raw_fd(), libc::F_SETFL, libc::O_NONBLOCK) }; + + if ret == -1 { + None + } else { + Some(()) + } +} + +fn cvt(t: c_int) -> io::Result { + if t == -1 { + Err(io::Error::last_os_error()) + } else { + Ok(t) + } +} + +fn is_pipe(file: &File) -> Option { + Some(file.metadata().ok()?.file_type().is_fifo()) +} + +fn get_access_mode(file: &File) -> Option { + let ret = unsafe { libc::fcntl(file.as_raw_fd(), libc::F_GETFL) }; + if ret == -1 { + return None; + } + + Some(ret & libc::O_ACCMODE) +} diff --git a/src/job_token/windows.rs b/src/job_token/windows.rs new file mode 100644 index 000000000..03ec8869b --- /dev/null +++ b/src/job_token/windows.rs @@ -0,0 +1,76 @@ +use std::{ + ffi::{CString, OsString}, + io, ptr, +}; + +use crate::windows_sys::{ + OpenSemaphoreA, ReleaseSemaphore, WaitForSingleObject, FALSE, HANDLE, SEMAPHORE_MODIFY_STATE, + THREAD_SYNCHRONIZE, WAIT_ABANDONED, WAIT_FAILED, WAIT_OBJECT_0, WAIT_TIMEOUT, +}; + +pub(super) struct JobServerClient { + sem: HANDLE, +} + +unsafe impl Sync for JobServerClient {} +unsafe impl Send for JobServerClient {} + +impl JobServerClient { + pub(super) unsafe fn open(var: OsString) -> Option { + let var = var.to_str()?; + if !var.is_ascii() { + // `OpenSemaphoreA` only accepts ASCII, not utf-8. + // + // Upstream implementation jobserver and jobslot also uses the + // same function and they works without problem, so there's no + // motivation to support utf-8 here using `OpenSemaphoreW` + // which only makes the code harder to maintain by making it more + // different than upstream. + return None; + } + + let s = var + .split_ascii_whitespace() + .filter_map(|arg| arg.strip_prefix("--jobserver-auth=")) + .find(|s| !s.is_empty())?; + + let name = CString::new(s).ok()?; + + let sem = OpenSemaphoreA( + THREAD_SYNCHRONIZE | SEMAPHORE_MODIFY_STATE, + FALSE, + name.as_bytes().as_ptr(), + ); + if sem != ptr::null_mut() { + Some(Self { sem }) + } else { + None + } + } + + pub(super) fn try_acquire(&self) -> io::Result> { + match unsafe { WaitForSingleObject(self.sem, 0) } { + WAIT_OBJECT_0 => Ok(Some(())), + WAIT_TIMEOUT => Ok(None), + WAIT_FAILED => Err(io::Error::last_os_error()), + // We believe this should be impossible for a semaphore, but still + // check the error code just in case it happens. + WAIT_ABANDONED => Err(io::Error::new( + io::ErrorKind::Other, + "Wait on jobserver semaphore returned WAIT_ABANDONED", + )), + _ => unreachable!("Unexpected return value from WaitForSingleObject"), + } + } + + pub(super) fn release(&self) -> io::Result<()> { + // SAFETY: ReleaseSemaphore will write to prev_count is it is Some + // and release semaphore self.sem by 1. + let r = unsafe { ReleaseSemaphore(self.sem, 1, ptr::null_mut()) }; + if r != 0 { + Ok(()) + } else { + Err(io::Error::last_os_error()) + } + } +} diff --git a/src/lib.rs b/src/lib.rs index 77f1a1db1..86e55f438 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -66,6 +66,8 @@ use std::process::{Child, Command, Stdio}; use std::sync::{Arc, Mutex}; use std::thread::{self, JoinHandle}; +#[cfg(feature = "parallel")] +mod async_executor; #[cfg(feature = "parallel")] mod job_token; mod os_pipe; @@ -1297,7 +1299,9 @@ impl Build { #[cfg(feature = "parallel")] fn compile_objects(&self, objs: &[Object], print: &PrintThread) -> Result<(), Error> { - use std::sync::mpsc; + use std::cell::Cell; + + use async_executor::{block_on, YieldOnce}; if objs.len() <= 1 { for obj in objs { @@ -1330,116 +1334,93 @@ impl Build { // acquire the appropriate tokens, Once all objects have been compiled // we wait on all the processes and propagate the results of compilation. - let (tx, rx) = mpsc::channel::<(_, String, KillOnDrop, crate::job_token::JobToken)>(); + let pendings = Cell::new(Vec::<( + Command, + String, + KillOnDrop, + crate::job_token::JobToken, + )>::new()); + let is_disconnected = Cell::new(false); + let has_made_progress = Cell::new(false); - // Since jobserver::Client::acquire can block, waiting - // must be done in parallel so that acquire won't block forever. - let wait_thread = thread::Builder::new().spawn(move || { + let wait_future = async { let mut error = None; - let mut pendings = Vec::new(); // Buffer the stdout let mut stdout = io::BufWriter::with_capacity(128, io::stdout()); - let mut backoff_cnt = 0; loop { - let mut has_made_progress = false; // If the other end of the pipe is already disconnected, then we're not gonna get any new jobs, // so it doesn't make sense to reuse the tokens; in fact, // releasing them as soon as possible (once we know that the other end is disconnected) is beneficial. // Imagine that the last file built takes an hour to finish; in this scenario, // by not releasing the tokens before that last file is done we would effectively block other processes from // starting sooner - even though we only need one token for that last file, not N others that were acquired. - let mut is_disconnected = false; - // Reading new pending tasks - loop { - match rx.try_recv() { - Ok(pending) => { - has_made_progress = true; - pendings.push(pending) - } - Err(mpsc::TryRecvError::Disconnected) if pendings.is_empty() => { - let _ = stdout.flush(); - return if let Some(err) = error { - Err(err) - } else { - Ok(()) - }; - } - Err(mpsc::TryRecvError::Disconnected) => { - is_disconnected = true; - break; - } - _ => break, - } - } - // Try waiting on them. - retain_unordered_mut(&mut pendings, |(cmd, program, child, token)| { - match try_wait_on_child(cmd, program, &mut child.0, &mut stdout) { - Ok(Some(())) => { - // Task done, remove the entry - if is_disconnected { - token.forget(); - } - has_made_progress = true; - false - } - Ok(None) => true, // Task still not finished, keep the entry - Err(err) => { - // Task fail, remove the entry. - has_made_progress = true; - if is_disconnected { - token.forget(); + let mut pendings_is_empty = false; + + cell_update(&pendings, |mut pendings| { + // Try waiting on them. + retain_unordered_mut(&mut pendings, |(cmd, program, child, _token)| { + match try_wait_on_child(cmd, program, &mut child.0, &mut stdout) { + Ok(Some(())) => { + // Task done, remove the entry + has_made_progress.set(true); + false } - // Since we can only return one error, log the error to make - // sure users always see all the compilation failures. - let _ = writeln!(stdout, "cargo:warning={}", err); - error = Some(err); + Ok(None) => true, // Task still not finished, keep the entry + Err(err) => { + // Task fail, remove the entry. + // Since we can only return one error, log the error to make + // sure users always see all the compilation failures. + has_made_progress.set(true); - false + let _ = writeln!(stdout, "cargo:warning={}", err); + error = Some(err); + + false + } } - } + }); + pendings_is_empty = pendings.is_empty(); + pendings }); - if !has_made_progress { - if backoff_cnt > 3 { - // We have yielded at least three times without making' - // any progress, so we will sleep for a while. - let duration = - std::time::Duration::from_millis(100 * (backoff_cnt - 3).min(10)); - thread::sleep(duration); + if pendings_is_empty && is_disconnected.get() { + break if let Some(err) = error { + Err(err) } else { - // Given that we spawned a lot of compilation tasks, it is unlikely - // that OS cannot find other ready task to execute. - // - // If all of them are done, then we will yield them and spawn more, - // or simply returns. - // - // Thus this will not be turned into a busy-wait loop and it will not - // waste CPU resource. - thread::yield_now(); - } + Ok(()) + }; } - backoff_cnt = if has_made_progress { - 0 - } else { - backoff_cnt + 1 + YieldOnce::default().await; + } + }; + let spawn_future = async { + for obj in objs { + let (mut cmd, program) = self.create_compile_object_cmd(obj)?; + let token = loop { + if let Some(token) = tokens.try_acquire()? { + break token; + } else { + YieldOnce::default().await + } }; + let child = spawn(&mut cmd, &program, print.pipe_writer_cloned()?.unwrap())?; + + cell_update(&pendings, |mut pendings| { + pendings.push((cmd, program, KillOnDrop(child), token)); + pendings + }); + + has_made_progress.set(true); } - })?; - for obj in objs { - let (mut cmd, program) = self.create_compile_object_cmd(obj)?; - let token = tokens.acquire()?; - let child = spawn(&mut cmd, &program, print.pipe_writer_cloned()?.unwrap())?; + is_disconnected.set(true); - tx.send((cmd, program, KillOnDrop(child), token)) - .expect("Wait thread must be alive until all compilation jobs are done, otherwise we risk deadlock"); - } - // Drop tx so that the wait_thread could return - drop(tx); + Ok::<_, Error>(()) + }; - return wait_thread.join().expect("wait_thread panics"); + return block_on(wait_future, spawn_future, &has_made_progress); struct KillOnDrop(Child); @@ -1450,6 +1431,16 @@ impl Build { child.kill().ok(); } } + + fn cell_update(cell: &Cell, f: F) + where + T: Default, + F: FnOnce(T) -> T, + { + let old = cell.take(); + let new = f(old); + cell.set(new); + } } #[cfg(not(feature = "parallel"))] diff --git a/src/windows_sys.rs b/src/windows_sys.rs index ee4704d25..20a256076 100644 --- a/src/windows_sys.rs +++ b/src/windows_sys.rs @@ -62,6 +62,22 @@ extern "system" { nsize: u32, ) -> BOOL; } +#[link(name = "kernel32")] +extern "system" { + pub fn OpenSemaphoreA(dwdesiredaccess: u32, binherithandle: BOOL, lpname: PCSTR) -> HANDLE; +} +#[link(name = "kernel32")] +extern "system" { + pub fn ReleaseSemaphore( + hsemaphore: HANDLE, + lreleasecount: i32, + lppreviouscount: *mut i32, + ) -> BOOL; +} +#[link(name = "kernel32")] +extern "system" { + pub fn WaitForSingleObject(hhandle: HANDLE, dwmilliseconds: u32) -> WIN32_ERROR; +} #[link(name = "ole32")] extern "system" { pub fn CoCreateInstance( @@ -93,6 +109,7 @@ pub type COINIT = i32; pub const COINIT_MULTITHREADED: COINIT = 0i32; pub const ERROR_NO_MORE_ITEMS: WIN32_ERROR = 259u32; pub const ERROR_SUCCESS: WIN32_ERROR = 0u32; +pub const FALSE: BOOL = 0i32; #[repr(C)] pub struct FILETIME { pub dwLowDateTime: u32, @@ -135,6 +152,7 @@ pub const INVALID_HANDLE_VALUE: HANDLE = invalid_mut(-1i32 as _); pub type IUnknown = *mut ::core::ffi::c_void; pub const KEY_READ: REG_SAM_FLAGS = 131097u32; pub const KEY_WOW64_32KEY: REG_SAM_FLAGS = 512u32; +pub type PCSTR = *const u8; pub type PCWSTR = *const u16; pub type PWSTR = *mut u16; pub type REG_SAM_FLAGS = u32; @@ -178,8 +196,16 @@ impl ::core::clone::Clone for SECURITY_ATTRIBUTES { *self } } +pub const SEMAPHORE_MODIFY_STATE: SYNCHRONIZATION_ACCESS_RIGHTS = 2u32; +pub type SYNCHRONIZATION_ACCESS_RIGHTS = u32; pub const S_FALSE: HRESULT = 1i32; pub const S_OK: HRESULT = 0i32; +pub type THREAD_ACCESS_RIGHTS = u32; +pub const THREAD_SYNCHRONIZE: THREAD_ACCESS_RIGHTS = 1048576u32; +pub const WAIT_ABANDONED: WIN32_ERROR = 128u32; +pub const WAIT_FAILED: WIN32_ERROR = 4294967295u32; +pub const WAIT_OBJECT_0: WIN32_ERROR = 0u32; +pub const WAIT_TIMEOUT: WIN32_ERROR = 258u32; pub type WIN32_ERROR = u32; /// Adapted from