From c3e577ff3ecbbf4423c78bf26f5eb4d9759366e8 Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Sat, 21 Oct 2023 23:13:01 +1100 Subject: [PATCH 01/22] Impl vendored jobserver implementation It supports non-blocking `try_acquire` and is much simpler than the one provided by `jobserver` Signed-off-by: Jiahao XU --- Cargo.toml | 5 +- gen-windows-sys-binding/windows_sys.list | 10 + src/job_token.rs | 275 ++++++++++++++--------- src/job_token/unix.rs | 182 +++++++++++++++ src/job_token/windows.rs | 62 +++++ src/windows_sys.rs | 23 ++ 6 files changed, 442 insertions(+), 115 deletions(-) create mode 100644 src/job_token/unix.rs create mode 100644 src/job_token/windows.rs diff --git a/Cargo.toml b/Cargo.toml index c7ba95a00..9a6c573cf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,16 +18,13 @@ exclude = ["/.github"] edition = "2018" rust-version = "1.53" -[dependencies] -jobserver = { version = "0.1.16", optional = true } - [target.'cfg(unix)'.dependencies] # Don't turn on the feature "std" for this, see https://github.com/rust-lang/cargo/issues/4866 # which is still an issue with `resolver = "1"`. libc = { version = "0.2.62", default-features = false } [features] -parallel = ["jobserver"] +parallel = [] [dev-dependencies] tempfile = "3" diff --git a/gen-windows-sys-binding/windows_sys.list b/gen-windows-sys-binding/windows_sys.list index 82d11c9de..6c84d7f98 100644 --- a/gen-windows-sys-binding/windows_sys.list +++ b/gen-windows-sys-binding/windows_sys.list @@ -6,6 +6,9 @@ Windows.Win32.Foundation.SysFreeString Windows.Win32.Foundation.SysStringLen Windows.Win32.Foundation.S_FALSE Windows.Win32.Foundation.S_OK +Windows.Win32.Foundation.FALSE +Windows.Win32.Foundation.HANDLE +Windows.Win32.Foundation.WAIT_OBJECT_0 Windows.Win32.System.Com.SAFEARRAY Windows.Win32.System.Com.SAFEARRAYBOUND @@ -25,3 +28,10 @@ Windows.Win32.System.Registry.HKEY_LOCAL_MACHINE Windows.Win32.System.Registry.KEY_READ Windows.Win32.System.Registry.KEY_WOW64_32KEY Windows.Win32.System.Registry.REG_SZ + +Windows.Win32.System.Threading.ReleaseSemaphore +Windows.Win32.System.Threading.WaitForSingleObject +Windows.Win32.System.Threading.SEMAPHORE_MODIFY_STATE +Windows.Win32.System.Threading.THREAD_SYNCHRONIZE + +Windows.Win32.System.WindowsProgramming.OpenSemaphoreA diff --git a/src/job_token.rs b/src/job_token.rs index 818917c8d..de462fcb8 100644 --- a/src/job_token.rs +++ b/src/job_token.rs @@ -1,29 +1,16 @@ -use jobserver::{Acquired, Client, HelperThread}; -use std::{ - env, - mem::MaybeUninit, - sync::{ - mpsc::{self, Receiver, Sender}, - Once, - }, -}; - -pub(crate) struct JobToken { - /// The token can either be a fresh token obtained from the jobserver or - if `token` is None - an implicit token for this process. - /// Both are valid values to put into queue. - token: Option, - /// A pool to which `token` should be returned. `pool` is optional, as one might want to release a token straight away instead - /// of storing it back in the pool - see [`Self::forget()`] function for that. - pool: Option>>>, -} +use std::{mem::MaybeUninit, sync::Once}; -impl Drop for JobToken { - fn drop(&mut self) { - if let Some(pool) = &self.pool { - // Always send back an Ok() variant as we know that the acquisition for this token has succeeded. - let _ = pool.send(self.token.take().map(|token| Ok(token))); - } - } +#[cfg(unix)] +#[path = "job_token/unix.rs"] +mod sys; + +#[cfg(windows)] +#[path = "job_token/windows.rs"] +mod sys; + +pub(super) enum JobToken { + Inherited(inherited_jobserver::JobToken), + InProcess(inprocess_jobserver::JobToken), } impl JobToken { @@ -31,109 +18,175 @@ impl JobToken { /// This also leads to releasing it sooner for other processes to use, /// which is a correct thing to do once it is known that there won't be /// any more token acquisitions. - pub(crate) fn forget(&mut self) { - self.pool.take(); + pub(super) fn forget(&mut self) { + if let Self::Inherited(inherited_jobtoken) = self { + inherited_jobtoken.forget(); + } } } -/// A thin wrapper around jobserver's Client. -/// It would be perfectly fine to just use jobserver's Client, but we also want to reuse -/// our own implicit token assigned for this build script. This struct manages that and -/// gives out tokens without exposing whether they're implicit tokens or tokens from jobserver. -/// Furthermore, instead of giving up job tokens, it keeps them around -/// for reuse if we know we're going to request another token after freeing the current one. -pub(crate) struct JobTokenServer { - helper: HelperThread, - tx: Sender>>, - rx: Receiver>>, +pub(super) enum JobTokenServer { + Inherited(inherited_jobserver::JobServer), + InProcess(inprocess_jobserver::JobServer), } impl JobTokenServer { pub(crate) fn new() -> &'static Self { - jobserver() - } - fn new_inner(client: Client) -> Result { - let (tx, rx) = mpsc::channel(); - // Push the implicit token. Since JobTokens only give back what they got, - // there should be at most one global implicit token in the wild. - tx.send(None).unwrap(); - let pool = tx.clone(); - let helper = client.into_helper_thread(move |acq| { - let _ = pool.send(Some(acq.map_err(|e| e.into()))); - })?; - Ok(Self { helper, tx, rx }) + static INIT: Once = Once::new(); + static mut JOBSERVER: MaybeUninit = MaybeUninit::uninit(); + + unsafe { + INIT.call_once(|| { + let server = inherited_jobserver::JobServer::from_env() + .map(Self::Inherited) + .unwrap_or_else(|| Self::InProcess(inprocess_jobserver::JobServer::new())); + JOBSERVER = MaybeUninit::new(server); + }); + // TODO: Poor man's assume_init_ref, as that'd require a MSRV of 1.55. + &*JOBSERVER.as_ptr() + } } - pub(crate) fn acquire(&self) -> Result { - let token = if let Ok(token) = self.rx.try_recv() { - // Opportunistically check if there's a token that can be reused. - token - } else { - // Cold path, request a token and block - self.helper.request_token(); - self.rx.recv().unwrap() - }; - let token = if let Some(token) = token { - Some(token?) - } else { - None - }; - Ok(JobToken { - token, - pool: Some(self.tx.clone()), - }) + pub(crate) fn try_acquire(&self) -> Result, crate::Error> { + match self { + Self::Inherited(jobserver) => jobserver + .try_acquire() + .map(|option| option.map(JobToken::Inherited)), + Self::InProcess(jobserver) => Ok(jobserver.try_acquire().map(JobToken::InProcess)), + } } } -/// Returns a suitable `JobTokenServer` used to coordinate -/// parallelism between build scripts. A global `JobTokenServer` is used as this ensures -/// that only one implicit job token is used in the wild. -/// Having multiple separate job token servers would lead to each of them assuming that they have control -/// over the implicit job token. -/// As it stands, each caller of `jobserver` can receive an implicit job token and there will be at most -/// one implicit job token in the wild. -fn jobserver() -> &'static JobTokenServer { - static INIT: Once = Once::new(); - static mut JOBSERVER: MaybeUninit = MaybeUninit::uninit(); - - fn _assert_sync() {} - _assert_sync::(); - - unsafe { - INIT.call_once(|| { - let server = default_jobserver(); - JOBSERVER = MaybeUninit::new( - JobTokenServer::new_inner(server).expect("Job server initialization failed"), - ); - }); - // Poor man's assume_init_ref, as that'd require a MSRV of 1.55. - &*JOBSERVER.as_ptr() +mod inherited_jobserver { + use super::sys; + + use std::{ + env::var_os, + sync::mpsc::{self, Receiver, Sender}, + }; + + pub(super) struct JobServer { + inner: sys::JobServerClient, + tx: Sender>, + rx: Receiver>, } -} -unsafe fn default_jobserver() -> jobserver::Client { - // Try to use the environmental jobserver which Cargo typically - // initializes for us... - if let Some(client) = jobserver::Client::from_env() { - return client; + impl JobServer { + pub(super) unsafe fn from_env() -> Option { + let var = var_os("CARGO_MAKEFLAGS") + .or_else(|| var_os("MAKEFLAGS")) + .or_else(|| var_os("MFLAGS"))?; + + let inner = sys::JobServerClient::open(var)?; + + let (tx, rx) = mpsc::channel(); + // Push the implicit token. Since JobTokens only give back what they got, + // there should be at most one global implicit token in the wild. + tx.send(Ok(())).unwrap(); + + Some(Self { inner, tx, rx }) + } + + pub(super) fn try_acquire(&self) -> Result, crate::Error> { + if let Ok(token) = self.rx.try_recv() { + // Opportunistically check if there's a token that can be reused. + token? + } else { + // Cold path, request a token + if self.inner.try_acquire()?.is_none() { + return Ok(None); + } + }; + Ok(Some(JobToken { + pool: Some(self.tx.clone()), + jobserver: self, + })) + } } - // ... but if that fails for whatever reason select something - // reasonable and crate a new jobserver. Use `NUM_JOBS` if set (it's - // configured by Cargo) and otherwise just fall back to a - // semi-reasonable number. Note that we could use `num_cpus` here - // but it's an extra dependency that will almost never be used, so - // it's generally not too worth it. - let mut parallelism = 4; - if let Ok(amt) = env::var("NUM_JOBS") { - if let Ok(amt) = amt.parse() { - parallelism = amt; + /// A thin wrapper around jobserver Client. + /// It would be perfectly fine to just use jobserver Client, but we also want to reuse + /// our own implicit token assigned for this build script. This struct manages that and + /// gives out tokens without exposing whether they're implicit tokens or tokens from jobserver. + /// Furthermore, instead of giving up job tokens, it keeps them around + /// for reuse if we know we're going to request another token after freeing the current one. + pub(super) struct JobToken { + /// A pool to which `token` should be returned. `pool` is optional, as one might want to release a token straight away instead + /// of storing it back in the pool - see [`Self::forget()`] function for that. + pool: Option>>, + jobserver: &'static JobServer, + } + + impl Drop for JobToken { + fn drop(&mut self) { + if let Some(pool) = &self.pool { + // Always send back an Ok() variant as we know that the acquisition for this token has succeeded. + let _ = pool.send(Ok(())); + } else { + let _ = self.jobserver.inner.release(); + } + } + } + + impl JobToken { + /// Ensure that this token is not put back into queue once it's dropped. + /// This also leads to releasing it sooner for other processes to use, + /// which is a correct thing to do once it is known that there won't be + /// any more token acquisitions. + pub(super) fn forget(&mut self) { + self.pool.take(); + } + } +} + +mod inprocess_jobserver { + use std::{ + env::var, + sync::atomic::{AtomicU32, Ordering::Relaxed}, + }; + + pub(super) struct JobServer(AtomicU32); + + impl JobServer { + pub(super) fn new() -> Self { + // Use `NUM_JOBS` if set (it's configured by Cargo) and otherwise + // just fall back to a semi-reasonable number. + // + // Note that we could use `num_cpus` here but it's an extra + // dependency that will almost never be used, so + // it's generally not too worth it. + let mut parallelism = 4; + if let Ok(amt) = var("NUM_JOBS") { + if let Ok(amt) = amt.parse() { + parallelism = amt; + } + } + + Self(AtomicU32::new(parallelism)) + } + + pub(super) fn try_acquire(&self) -> Option { + let res = self.0.fetch_update(Relaxed, Relaxed, |tokens| { + if tokens > 0 { + Some(tokens - 1) + } else { + None + } + }); + + if res.is_ok() { + Some(JobToken(self)) + } else { + None + } } } - // If we create our own jobserver then be sure to reserve one token - // for ourselves. - let client = jobserver::Client::new(parallelism).expect("failed to create jobserver"); - client.acquire_raw().expect("failed to acquire initial"); - return client; + pub(super) struct JobToken(&'static JobServer); + + impl Drop for JobToken { + fn drop(&mut self) { + self.0 .0.fetch_add(1, Relaxed); + } + } } diff --git a/src/job_token/unix.rs b/src/job_token/unix.rs new file mode 100644 index 000000000..bf332abd2 --- /dev/null +++ b/src/job_token/unix.rs @@ -0,0 +1,182 @@ +use std::{ + ffi::{OsStr, OsString}, + fs::{self, File}, + io::{self, Read, Write}, + mem::ManuallyDrop, + os::{ + raw::c_int, + unix::{ + ffi::{OsStrExt, OsStringExt}, + prelude::*, + }, + }, + path::Path, +}; + +pub(super) struct JobServerClient { + read: File, + write: Option, +} + +impl JobServerClient { + pub(super) unsafe fn open(var: OsString) -> Option { + let bytes = var.into_vec(); + + let s = bytes + .split(u8::is_ascii_whitespace) + .filter_map(|arg| { + arg.strip_prefix(b"--jobserver-fds=") + .or_else(|| arg.strip_prefix(b"--jobserver-auth=")) + }) + .find(|bytes| !bytes.is_empty())?; + + if let Some(fifo) = s.strip_prefix(b"fifo:") { + Self::from_fifo(Path::new(OsStr::from_bytes(fifo))) + } else { + Self::from_pipe(OsStr::from_bytes(s).to_str()?) + } + } + + /// `--jobserver-auth=fifo:PATH` + fn from_fifo(path: &Path) -> Option { + let file = fs::OpenOptions::new() + .read(true) + .write(true) + .open(path) + .ok()?; + + if is_pipe(&file)? { + // File in Rust is always closed-on-exec as long as it's opened by + // `File::open` or `fs::OpenOptions::open`. + set_nonblocking(&file)?; + + Some(Self { + read: file, + write: None, + }) + } else { + None + } + } + + /// `--jobserver-auth=fd-for-R,fd-for-W` + unsafe fn from_pipe(s: &str) -> Option { + let (read, write) = s.split_once(',')?; + + let read = read.parse().ok()?; + let write = write.parse().ok()?; + + let read = ManuallyDrop::new(File::from_raw_fd(read)); + let write = ManuallyDrop::new(File::from_raw_fd(write)); + + // Ok so we've got two integers that look like file descriptors, but + // for extra sanity checking let's see if they actually look like + // instances of a pipe before we return the client. + // + // If we're called from `make` *without* the leading + on our rule + // then we'll have `MAKEFLAGS` env vars but won't actually have + // access to the file descriptors. + if is_pipe(&read)? + && is_pipe(&write)? + && get_access_mode(&read) == Some(libc::O_RDONLY) + && get_access_mode(&write) == Some(libc::O_WRONLY) + { + let read = read.try_clone().ok()?; + let write = write.try_clone().ok()?; + + // Set read and write end to nonblocking + set_nonblocking(&read)?; + set_nonblocking(&write)?; + + Some(Self { + read, + write: Some(write), + }) + } else { + None + } + } + + pub(super) fn try_acquire(&self) -> io::Result> { + let mut fds = [libc::pollfd { + fd: self.read.as_raw_fd(), + events: libc::POLLIN, + revents: 0, + }]; + + let ret = cvt(unsafe { libc::poll(fds.as_mut_ptr(), 1, 0) })?; + if ret == 1 { + let mut buf = [0]; + match (&self.read).read(&mut buf) { + Ok(1) => Ok(Some(())), + Ok(_) => Ok(None), // 0, eof + Err(e) + if e.kind() == io::ErrorKind::Interrupted + || e.kind() == io::ErrorKind::WouldBlock => + { + Ok(None) + } + Err(e) => Err(e), + } + } else { + Ok(None) + } + } + + pub(super) fn release(&self) -> io::Result<()> { + // Note that the fd may be nonblocking but we're going to go ahead + // and assume that the writes here are always nonblocking (we can + // always quickly release a token). + // + // For write to block, this would mean that pipe is full. + // If all every release are pair with an acquire, then this cannot + // happen. + // + // If it does happen, it is likely a bug in the program using this + // crate or some other programs that use the same jobserver have a + // bug in their code + // + // If that turns out to not be the case we'll get an error anyway! + let mut write = self.write.as_ref().unwrap_or(&self.read); + match write.write(&[b'+'])? { + 1 => Ok(()), + _ => Err(io::Error::from(io::ErrorKind::UnexpectedEof)), + } + } +} + +fn set_nonblocking(file: &File) -> Option<()> { + // F_SETFL can only set the O_APPEND, O_ASYNC, O_DIRECT, O_NOATIME, and + // O_NONBLOCK flags. + // + // For pipe, only O_NONBLOCK is meaningful, so it is ok to + // not issue a F_GETFL fcntl syscall. + let ret = unsafe { libc::fcntl(file.as_raw_fd(), libc::F_SETFL, libc::O_NONBLOCK) }; + + if ret == -1 { + None + } else { + Some(()) + } +} + +fn cvt(t: c_int) -> io::Result { + if t == -1 { + Err(io::Error::last_os_error()) + } else { + Ok(t) + } +} + +fn is_pipe(file: &File) -> Option { + Some(file.metadata().ok()?.file_type().is_fifo()) +} + +fn get_access_mode(file: &File) -> Option { + let ret = unsafe { libc::fcntl(file.as_raw_fd(), libc::F_GETFL) }; + if ret == -1 { + return None; + } + + Some(ret & libc::O_ACCMODE) +} diff --git a/src/job_token/windows.rs b/src/job_token/windows.rs new file mode 100644 index 000000000..19ff50239 --- /dev/null +++ b/src/job_token/windows.rs @@ -0,0 +1,62 @@ +use std::{ + ffi::{CString, OsString}, + io, ptr, +}; + +use crate::windows_sys::{ + OpenSemaphoreA, ReleaseSemaphore, WaitForSingleObject, FALSE, HANDLE, SEMAPHORE_MODIFY_STATE, + THREAD_SYNCHRONIZE, WAIT_OBJECT_0, +}; + +pub(super) struct JobServerClient { + sem: HANDLE, +} + +unsafe impl Sync for JobServerClient {} +unsafe impl Send for JobServerClient {} + +impl JobServerClient { + pub(super) unsafe fn open(var: OsString) -> Option { + let s = var + .to_str()? + .split_ascii_whitespace() + .filter_map(|arg| { + arg.strip_prefix("--jobserver-fds=") + .or_else(|| arg.strip_prefix("--jobserver-auth=")) + }) + .find(|s| !s.is_empty())?; + + let name = CString::new(s).ok()?; + + let sem = OpenSemaphoreA( + THREAD_SYNCHRONIZE | SEMAPHORE_MODIFY_STATE, + FALSE, + name.as_bytes().as_ptr(), + ); + if sem != ptr::null_mut() { + Some(Self { sem }) + } else { + None + } + } + + pub(super) fn try_acquire(&self) -> io::Result> { + let r = unsafe { WaitForSingleObject(self.sem, 0) }; + if r == WAIT_OBJECT_0 { + Ok(Some(())) + } else { + Err(io::Error::last_os_error()) + } + } + + pub(super) fn release(&self) -> io::Result<()> { + // SAFETY: ReleaseSemaphore will write to prev_count is it is Some + // and release semaphore self.sem by 1. + let r = unsafe { ReleaseSemaphore(self.sem, 1, ptr::null_mut()) }; + if r != 0 { + Ok(()) + } else { + Err(io::Error::last_os_error()) + } + } +} diff --git a/src/windows_sys.rs b/src/windows_sys.rs index ee4704d25..eea210853 100644 --- a/src/windows_sys.rs +++ b/src/windows_sys.rs @@ -62,6 +62,22 @@ extern "system" { nsize: u32, ) -> BOOL; } +#[link(name = "kernel32")] +extern "system" { + pub fn OpenSemaphoreA(dwdesiredaccess: u32, binherithandle: BOOL, lpname: PCSTR) -> HANDLE; +} +#[link(name = "kernel32")] +extern "system" { + pub fn ReleaseSemaphore( + hsemaphore: HANDLE, + lreleasecount: i32, + lppreviouscount: *mut i32, + ) -> BOOL; +} +#[link(name = "kernel32")] +extern "system" { + pub fn WaitForSingleObject(hhandle: HANDLE, dwmilliseconds: u32) -> WIN32_ERROR; +} #[link(name = "ole32")] extern "system" { pub fn CoCreateInstance( @@ -93,6 +109,7 @@ pub type COINIT = i32; pub const COINIT_MULTITHREADED: COINIT = 0i32; pub const ERROR_NO_MORE_ITEMS: WIN32_ERROR = 259u32; pub const ERROR_SUCCESS: WIN32_ERROR = 0u32; +pub const FALSE: BOOL = 0i32; #[repr(C)] pub struct FILETIME { pub dwLowDateTime: u32, @@ -135,6 +152,7 @@ pub const INVALID_HANDLE_VALUE: HANDLE = invalid_mut(-1i32 as _); pub type IUnknown = *mut ::core::ffi::c_void; pub const KEY_READ: REG_SAM_FLAGS = 131097u32; pub const KEY_WOW64_32KEY: REG_SAM_FLAGS = 512u32; +pub type PCSTR = *const u8; pub type PCWSTR = *const u16; pub type PWSTR = *mut u16; pub type REG_SAM_FLAGS = u32; @@ -178,8 +196,13 @@ impl ::core::clone::Clone for SECURITY_ATTRIBUTES { *self } } +pub const SEMAPHORE_MODIFY_STATE: SYNCHRONIZATION_ACCESS_RIGHTS = 2u32; +pub type SYNCHRONIZATION_ACCESS_RIGHTS = u32; pub const S_FALSE: HRESULT = 1i32; pub const S_OK: HRESULT = 0i32; +pub type THREAD_ACCESS_RIGHTS = u32; +pub const THREAD_SYNCHRONIZE: THREAD_ACCESS_RIGHTS = 1048576u32; +pub const WAIT_OBJECT_0: WIN32_ERROR = 0u32; pub type WIN32_ERROR = u32; /// Adapted from From 14c1c9be9a77ca13d8d9863f7eb6b06e1fef2205 Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Sun, 22 Oct 2023 00:02:25 +1100 Subject: [PATCH 02/22] Convert parallel `compile_objects` to use future instead of threads Also fixed compilation errors in mod `job_token` Signed-off-by: Jiahao XU --- src/job_token.rs | 14 +++--- src/lib.rs | 127 +++++++++++++++++++++++++++++++---------------- 2 files changed, 90 insertions(+), 51 deletions(-) diff --git a/src/job_token.rs b/src/job_token.rs index de462fcb8..6207d46fa 100644 --- a/src/job_token.rs +++ b/src/job_token.rs @@ -47,7 +47,7 @@ impl JobTokenServer { } } - pub(crate) fn try_acquire(&self) -> Result, crate::Error> { + pub(crate) fn try_acquire(&'static self) -> Result, crate::Error> { match self { Self::Inherited(jobserver) => jobserver .try_acquire() @@ -65,7 +65,7 @@ mod inherited_jobserver { sync::mpsc::{self, Receiver, Sender}, }; - pub(super) struct JobServer { + pub(crate) struct JobServer { inner: sys::JobServerClient, tx: Sender>, rx: Receiver>, @@ -87,7 +87,7 @@ mod inherited_jobserver { Some(Self { inner, tx, rx }) } - pub(super) fn try_acquire(&self) -> Result, crate::Error> { + pub(super) fn try_acquire(&'static self) -> Result, crate::Error> { if let Ok(token) = self.rx.try_recv() { // Opportunistically check if there's a token that can be reused. token? @@ -110,7 +110,7 @@ mod inherited_jobserver { /// gives out tokens without exposing whether they're implicit tokens or tokens from jobserver. /// Furthermore, instead of giving up job tokens, it keeps them around /// for reuse if we know we're going to request another token after freeing the current one. - pub(super) struct JobToken { + pub(crate) struct JobToken { /// A pool to which `token` should be returned. `pool` is optional, as one might want to release a token straight away instead /// of storing it back in the pool - see [`Self::forget()`] function for that. pool: Option>>, @@ -145,7 +145,7 @@ mod inprocess_jobserver { sync::atomic::{AtomicU32, Ordering::Relaxed}, }; - pub(super) struct JobServer(AtomicU32); + pub(crate) struct JobServer(AtomicU32); impl JobServer { pub(super) fn new() -> Self { @@ -165,7 +165,7 @@ mod inprocess_jobserver { Self(AtomicU32::new(parallelism)) } - pub(super) fn try_acquire(&self) -> Option { + pub(super) fn try_acquire(&'static self) -> Option { let res = self.0.fetch_update(Relaxed, Relaxed, |tokens| { if tokens > 0 { Some(tokens - 1) @@ -182,7 +182,7 @@ mod inprocess_jobserver { } } - pub(super) struct JobToken(&'static JobServer); + pub(crate) struct JobToken(&'static JobServer); impl Drop for JobToken { fn drop(&mut self) { diff --git a/src/lib.rs b/src/lib.rs index 77f1a1db1..5414ab0cb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1297,7 +1297,13 @@ impl Build { #[cfg(feature = "parallel")] fn compile_objects(&self, objs: &[Object], print: &PrintThread) -> Result<(), Error> { - use std::sync::mpsc; + use std::{ + future::Future, + pin::Pin, + ptr, + sync::mpsc, + task::{Context, Poll, RawWaker, RawWakerVTable, Waker}, + }; if objs.len() <= 1 { for obj in objs { @@ -1332,17 +1338,13 @@ impl Build { let (tx, rx) = mpsc::channel::<(_, String, KillOnDrop, crate::job_token::JobToken)>(); - // Since jobserver::Client::acquire can block, waiting - // must be done in parallel so that acquire won't block forever. - let wait_thread = thread::Builder::new().spawn(move || { + let mut wait_future = async move { let mut error = None; let mut pendings = Vec::new(); // Buffer the stdout let mut stdout = io::BufWriter::with_capacity(128, io::stdout()); - let mut backoff_cnt = 0; loop { - let mut has_made_progress = false; // If the other end of the pipe is already disconnected, then we're not gonna get any new jobs, // so it doesn't make sense to reuse the tokens; in fact, // releasing them as soon as possible (once we know that the other end is disconnected) is beneficial. @@ -1353,10 +1355,7 @@ impl Build { // Reading new pending tasks loop { match rx.try_recv() { - Ok(pending) => { - has_made_progress = true; - pendings.push(pending) - } + Ok(pending) => pendings.push(pending), Err(mpsc::TryRecvError::Disconnected) if pendings.is_empty() => { let _ = stdout.flush(); return if let Some(err) = error { @@ -1381,13 +1380,11 @@ impl Build { if is_disconnected { token.forget(); } - has_made_progress = true; false } Ok(None) => true, // Task still not finished, keep the entry Err(err) => { // Task fail, remove the entry. - has_made_progress = true; if is_disconnected { token.forget(); } @@ -1401,45 +1398,58 @@ impl Build { } }); - if !has_made_progress { - if backoff_cnt > 3 { - // We have yielded at least three times without making' - // any progress, so we will sleep for a while. - let duration = - std::time::Duration::from_millis(100 * (backoff_cnt - 3).min(10)); - thread::sleep(duration); + let _ = stdout.flush(); + YieldOnce::default().await; + } + }; + let mut spawn_future = async move { + for obj in objs { + let (mut cmd, program) = self.create_compile_object_cmd(obj)?; + let token = loop { + if let Some(token) = tokens.try_acquire()? { + break token; } else { - // Given that we spawned a lot of compilation tasks, it is unlikely - // that OS cannot find other ready task to execute. - // - // If all of them are done, then we will yield them and spawn more, - // or simply returns. - // - // Thus this will not be turned into a busy-wait loop and it will not - // waste CPU resource. - thread::yield_now(); + YieldOnce::default().await } - } - - backoff_cnt = if has_made_progress { - 0 - } else { - backoff_cnt + 1 }; - } - })?; - for obj in objs { - let (mut cmd, program) = self.create_compile_object_cmd(obj)?; - let token = tokens.acquire()?; - let child = spawn(&mut cmd, &program, print.pipe_writer_cloned()?.unwrap())?; + let child = spawn(&mut cmd, &program, print.pipe_writer_cloned()?.unwrap())?; - tx.send((cmd, program, KillOnDrop(child), token)) + tx.send((cmd, program, KillOnDrop(child), token)) .expect("Wait thread must be alive until all compilation jobs are done, otherwise we risk deadlock"); + } + // Drop tx so that the wait_thread could return + drop(tx); + + Ok::<_, Error>(()) + }; + + // Shadows the future so that it can never be moved and is guaranteed + // to be pinned. + // + // The same trick used in `pin!` macro. + let mut wait_future = Some(unsafe { Pin::new_unchecked(&mut wait_future) }); + let mut spawn_future = Some(unsafe { Pin::new_unchecked(&mut spawn_future) }); + + let waker = unsafe { Waker::from_raw(NOOP_RAW_WAKER) }; + let mut context = Context::from_waker(&waker); + + while wait_future.is_some() || spawn_future.is_some() { + if let Some(fut) = spawn_future.as_mut() { + if let Poll::Ready(res) = fut.as_mut().poll(&mut context) { + spawn_future = None; + res?; + } + } + + if let Some(fut) = wait_future.as_mut() { + if let Poll::Ready(res) = fut.as_mut().poll(&mut context) { + wait_future = None; + res?; + } + } } - // Drop tx so that the wait_thread could return - drop(tx); - return wait_thread.join().expect("wait_thread panics"); + return Ok(()); struct KillOnDrop(Child); @@ -1450,6 +1460,35 @@ impl Build { child.kill().ok(); } } + + #[derive(Default)] + struct YieldOnce(bool); + + impl Future for YieldOnce { + type Output = (); + + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> { + let flag = &mut std::pin::Pin::into_inner(self).0; + if !*flag { + *flag = true; + Poll::Pending + } else { + Poll::Ready(()) + } + } + } + + const NOOP_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new( + // Cloning just returns a new no-op raw waker + |_| NOOP_RAW_WAKER, + // `wake` does nothing + |_| {}, + // `wake_by_ref` does nothing + |_| {}, + // Dropping does nothing as we don't allocate anything + |_| {}, + ); + const NOOP_RAW_WAKER: RawWaker = RawWaker::new(ptr::null(), &NOOP_WAKER_VTABLE); } #[cfg(not(feature = "parallel"))] From 83eccca9e1f5dbc7fb78f4858bc6fcfe895caabd Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Sun, 22 Oct 2023 00:25:25 +1100 Subject: [PATCH 03/22] Optimize parallel `compile_objects` Remove use of mpsc since the future is executed on one single thread only. Signed-off-by: Jiahao XU --- src/lib.rs | 111 +++++++++++++++++++++++++++++------------------------ 1 file changed, 60 insertions(+), 51 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 5414ab0cb..99ba6c954 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1298,10 +1298,10 @@ impl Build { #[cfg(feature = "parallel")] fn compile_objects(&self, objs: &[Object], print: &PrintThread) -> Result<(), Error> { use std::{ + cell::Cell, future::Future, pin::Pin, ptr, - sync::mpsc, task::{Context, Poll, RawWaker, RawWakerVTable, Waker}, }; @@ -1336,11 +1336,16 @@ impl Build { // acquire the appropriate tokens, Once all objects have been compiled // we wait on all the processes and propagate the results of compilation. - let (tx, rx) = mpsc::channel::<(_, String, KillOnDrop, crate::job_token::JobToken)>(); + let pendings = Cell::new(Vec::<( + Command, + String, + KillOnDrop, + crate::job_token::JobToken, + )>::new()); + let is_disconnected = Cell::new(false); - let mut wait_future = async move { + let mut wait_future = async { let mut error = None; - let mut pendings = Vec::new(); // Buffer the stdout let mut stdout = io::BufWriter::with_capacity(128, io::stdout()); @@ -1351,58 +1356,51 @@ impl Build { // Imagine that the last file built takes an hour to finish; in this scenario, // by not releasing the tokens before that last file is done we would effectively block other processes from // starting sooner - even though we only need one token for that last file, not N others that were acquired. - let mut is_disconnected = false; - // Reading new pending tasks - loop { - match rx.try_recv() { - Ok(pending) => pendings.push(pending), - Err(mpsc::TryRecvError::Disconnected) if pendings.is_empty() => { - let _ = stdout.flush(); - return if let Some(err) = error { - Err(err) - } else { - Ok(()) - }; - } - Err(mpsc::TryRecvError::Disconnected) => { - is_disconnected = true; - break; - } - _ => break, - } - } - // Try waiting on them. - retain_unordered_mut(&mut pendings, |(cmd, program, child, token)| { - match try_wait_on_child(cmd, program, &mut child.0, &mut stdout) { - Ok(Some(())) => { - // Task done, remove the entry - if is_disconnected { - token.forget(); - } - false - } - Ok(None) => true, // Task still not finished, keep the entry - Err(err) => { - // Task fail, remove the entry. - if is_disconnected { - token.forget(); + let mut pendings_is_empty = false; + + cell_update(&pendings, |mut pendings| { + // Try waiting on them. + retain_unordered_mut(&mut pendings, |(cmd, program, child, token)| { + match try_wait_on_child(cmd, program, &mut child.0, &mut stdout) { + Ok(Some(())) => { + // Task done, remove the entry + if is_disconnected.get() { + token.forget(); + } + false } - // Since we can only return one error, log the error to make - // sure users always see all the compilation failures. - let _ = writeln!(stdout, "cargo:warning={}", err); - error = Some(err); + Ok(None) => true, // Task still not finished, keep the entry + Err(err) => { + // Task fail, remove the entry. + if is_disconnected.get() { + token.forget(); + } + // Since we can only return one error, log the error to make + // sure users always see all the compilation failures. + let _ = writeln!(stdout, "cargo:warning={}", err); + error = Some(err); - false + false + } } - } + }); + pendings_is_empty = pendings.is_empty(); + pendings }); - let _ = stdout.flush(); + if pendings_is_empty && is_disconnected.get() { + break if let Some(err) = error { + Err(err) + } else { + Ok(()) + }; + } + YieldOnce::default().await; } }; - let mut spawn_future = async move { + let mut spawn_future = async { for obj in objs { let (mut cmd, program) = self.create_compile_object_cmd(obj)?; let token = loop { @@ -1414,11 +1412,12 @@ impl Build { }; let child = spawn(&mut cmd, &program, print.pipe_writer_cloned()?.unwrap())?; - tx.send((cmd, program, KillOnDrop(child), token)) - .expect("Wait thread must be alive until all compilation jobs are done, otherwise we risk deadlock"); + cell_update(&pendings, |mut pendings| { + pendings.push((cmd, program, KillOnDrop(child), token)); + pendings + }); } - // Drop tx so that the wait_thread could return - drop(tx); + is_disconnected.set(true); Ok::<_, Error>(()) }; @@ -1489,6 +1488,16 @@ impl Build { |_| {}, ); const NOOP_RAW_WAKER: RawWaker = RawWaker::new(ptr::null(), &NOOP_WAKER_VTABLE); + + fn cell_update(cell: &Cell, f: F) + where + T: Default, + F: FnOnce(T) -> T, + { + let old = cell.take(); + let new = f(old); + cell.set(new); + } } #[cfg(not(feature = "parallel"))] From 3ca2a7fbb87d97fdd2f97fad56d4ecd6f3dad624 Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Sun, 22 Oct 2023 10:47:08 +1100 Subject: [PATCH 04/22] Fix `job_token`: Remove mpsc and make sure tokens are relased The mpsc is stored in a global variable and Rust never calls `Drop::drop` on global variables, so they are never released. This commit removes the mpsc and replaces that with an `AtomicBool` for the implicit token to fix this, also dramatically simplifies the code. Signed-off-by: Jiahao XU --- src/job_token.rs | 75 +++++++++++++++++------------------------------- src/lib.rs | 8 +----- 2 files changed, 27 insertions(+), 56 deletions(-) diff --git a/src/job_token.rs b/src/job_token.rs index 6207d46fa..2d4ef6f45 100644 --- a/src/job_token.rs +++ b/src/job_token.rs @@ -13,18 +13,6 @@ pub(super) enum JobToken { InProcess(inprocess_jobserver::JobToken), } -impl JobToken { - /// Ensure that this token is not put back into queue once it's dropped. - /// This also leads to releasing it sooner for other processes to use, - /// which is a correct thing to do once it is known that there won't be - /// any more token acquisitions. - pub(super) fn forget(&mut self) { - if let Self::Inherited(inherited_jobtoken) = self { - inherited_jobtoken.forget(); - } - } -} - pub(super) enum JobTokenServer { Inherited(inherited_jobserver::JobServer), InProcess(inprocess_jobserver::JobServer), @@ -62,13 +50,19 @@ mod inherited_jobserver { use std::{ env::var_os, - sync::mpsc::{self, Receiver, Sender}, + sync::atomic::{AtomicBool, Ordering::Relaxed}, }; pub(crate) struct JobServer { + /// Implicit token for this process which is obtained and will be + /// released in parent. Since JobTokens only give back what they got, + /// there should be at most one global implicit token in the wild. + /// + /// Since Rust does not execute any `Drop` for global variables, + /// we can't just put it back to jobserver and then re-acquire it at + /// the end of the process. + global_implicit_token: AtomicBool, inner: sys::JobServerClient, - tx: Sender>, - rx: Receiver>, } impl JobServer { @@ -79,28 +73,20 @@ mod inherited_jobserver { let inner = sys::JobServerClient::open(var)?; - let (tx, rx) = mpsc::channel(); - // Push the implicit token. Since JobTokens only give back what they got, - // there should be at most one global implicit token in the wild. - tx.send(Ok(())).unwrap(); - - Some(Self { inner, tx, rx }) + Some(Self { + inner, + global_implicit_token: AtomicBool::new(true), + }) } pub(super) fn try_acquire(&'static self) -> Result, crate::Error> { - if let Ok(token) = self.rx.try_recv() { - // Opportunistically check if there's a token that can be reused. - token? - } else { - // Cold path, request a token + if !self.global_implicit_token.swap(false, Relaxed) { + // Cold path, no global implicit token, obtain one if self.inner.try_acquire()?.is_none() { return Ok(None); } - }; - Ok(Some(JobToken { - pool: Some(self.tx.clone()), - jobserver: self, - })) + } + Ok(Some(JobToken { jobserver: self })) } } @@ -111,32 +97,23 @@ mod inherited_jobserver { /// Furthermore, instead of giving up job tokens, it keeps them around /// for reuse if we know we're going to request another token after freeing the current one. pub(crate) struct JobToken { - /// A pool to which `token` should be returned. `pool` is optional, as one might want to release a token straight away instead - /// of storing it back in the pool - see [`Self::forget()`] function for that. - pool: Option>>, jobserver: &'static JobServer, } impl Drop for JobToken { fn drop(&mut self) { - if let Some(pool) = &self.pool { - // Always send back an Ok() variant as we know that the acquisition for this token has succeeded. - let _ = pool.send(Ok(())); - } else { - let _ = self.jobserver.inner.release(); + let jobserver = self.jobserver; + if jobserver + .global_implicit_token + .compare_exchange(false, true, Relaxed, Relaxed) + .is_err() + { + // There's already a global implicit token, so this token must + // be released back into jobserver + let _ = jobserver.inner.release(); } } } - - impl JobToken { - /// Ensure that this token is not put back into queue once it's dropped. - /// This also leads to releasing it sooner for other processes to use, - /// which is a correct thing to do once it is known that there won't be - /// any more token acquisitions. - pub(super) fn forget(&mut self) { - self.pool.take(); - } - } } mod inprocess_jobserver { diff --git a/src/lib.rs b/src/lib.rs index 99ba6c954..7136e0f9c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1361,21 +1361,15 @@ impl Build { cell_update(&pendings, |mut pendings| { // Try waiting on them. - retain_unordered_mut(&mut pendings, |(cmd, program, child, token)| { + retain_unordered_mut(&mut pendings, |(cmd, program, child, _token)| { match try_wait_on_child(cmd, program, &mut child.0, &mut stdout) { Ok(Some(())) => { // Task done, remove the entry - if is_disconnected.get() { - token.forget(); - } false } Ok(None) => true, // Task still not finished, keep the entry Err(err) => { // Task fail, remove the entry. - if is_disconnected.get() { - token.forget(); - } // Since we can only return one error, log the error to make // sure users always see all the compilation failures. let _ = writeln!(stdout, "cargo:warning={}", err); From 1f1e8578c77ec2b6595f8b178b134556101b9ada Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Sun, 22 Oct 2023 11:00:06 +1100 Subject: [PATCH 05/22] Optimize `job_token`: Make `JobToken` zero-sized Signed-off-by: Jiahao XU --- src/job_token.rs | 59 ++++++++++++++++++++---------------------------- 1 file changed, 25 insertions(+), 34 deletions(-) diff --git a/src/job_token.rs b/src/job_token.rs index 2d4ef6f45..0c72da737 100644 --- a/src/job_token.rs +++ b/src/job_token.rs @@ -1,5 +1,7 @@ use std::{mem::MaybeUninit, sync::Once}; +use crate::Error; + #[cfg(unix)] #[path = "job_token/unix.rs"] mod sys; @@ -8,9 +10,15 @@ mod sys; #[path = "job_token/windows.rs"] mod sys; -pub(super) enum JobToken { - Inherited(inherited_jobserver::JobToken), - InProcess(inprocess_jobserver::JobToken), +pub(super) struct JobToken(); + +impl Drop for JobToken { + fn drop(&mut self) { + match JobTokenServer::new() { + JobTokenServer::Inherited(jobserver) => jobserver.release_token_raw(), + JobTokenServer::InProcess(jobserver) => jobserver.release_token_raw(), + } + } } pub(super) enum JobTokenServer { @@ -35,18 +43,16 @@ impl JobTokenServer { } } - pub(crate) fn try_acquire(&'static self) -> Result, crate::Error> { + pub(crate) fn try_acquire(&'static self) -> Result, Error> { match self { - Self::Inherited(jobserver) => jobserver - .try_acquire() - .map(|option| option.map(JobToken::Inherited)), - Self::InProcess(jobserver) => Ok(jobserver.try_acquire().map(JobToken::InProcess)), + Self::Inherited(jobserver) => jobserver.try_acquire(), + Self::InProcess(jobserver) => Ok(jobserver.try_acquire()), } } } mod inherited_jobserver { - use super::sys; + use super::{sys, Error, JobToken}; use std::{ env::var_os, @@ -79,44 +85,33 @@ mod inherited_jobserver { }) } - pub(super) fn try_acquire(&'static self) -> Result, crate::Error> { + pub(super) fn try_acquire(&'static self) -> Result, Error> { if !self.global_implicit_token.swap(false, Relaxed) { // Cold path, no global implicit token, obtain one if self.inner.try_acquire()?.is_none() { return Ok(None); } } - Ok(Some(JobToken { jobserver: self })) + Ok(Some(JobToken())) } - } - /// A thin wrapper around jobserver Client. - /// It would be perfectly fine to just use jobserver Client, but we also want to reuse - /// our own implicit token assigned for this build script. This struct manages that and - /// gives out tokens without exposing whether they're implicit tokens or tokens from jobserver. - /// Furthermore, instead of giving up job tokens, it keeps them around - /// for reuse if we know we're going to request another token after freeing the current one. - pub(crate) struct JobToken { - jobserver: &'static JobServer, - } - - impl Drop for JobToken { - fn drop(&mut self) { - let jobserver = self.jobserver; - if jobserver + pub(super) fn release_token_raw(&self) { + if self .global_implicit_token .compare_exchange(false, true, Relaxed, Relaxed) .is_err() { // There's already a global implicit token, so this token must // be released back into jobserver - let _ = jobserver.inner.release(); + let _ = self.inner.release(); } } } } mod inprocess_jobserver { + use super::JobToken; + use std::{ env::var, sync::atomic::{AtomicU32, Ordering::Relaxed}, @@ -152,18 +147,14 @@ mod inprocess_jobserver { }); if res.is_ok() { - Some(JobToken(self)) + Some(JobToken()) } else { None } } - } - - pub(crate) struct JobToken(&'static JobServer); - impl Drop for JobToken { - fn drop(&mut self) { - self.0 .0.fetch_add(1, Relaxed); + pub(super) fn release_token_raw(&self) { + self.0.fetch_add(1, Relaxed); } } } From dfcbae569df4f99c379ed7eda19066b1376c6234 Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Sun, 22 Oct 2023 11:33:16 +1100 Subject: [PATCH 06/22] Fix `windows::JobServerClient::try_acquire` impl Return `Ok(None)` instead of `Err()` if no token is ready. Signed-off-by: Jiahao XU --- src/job_token/windows.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/job_token/windows.rs b/src/job_token/windows.rs index 19ff50239..81cb8b1db 100644 --- a/src/job_token/windows.rs +++ b/src/job_token/windows.rs @@ -44,8 +44,10 @@ impl JobServerClient { let r = unsafe { WaitForSingleObject(self.sem, 0) }; if r == WAIT_OBJECT_0 { Ok(Some(())) - } else { + } else if r == 0 { Err(io::Error::last_os_error()) + } else { + Ok(None) } } From 19abb40bb164530cf60e3b2b9ed3030530e713c3 Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Sun, 22 Oct 2023 11:40:18 +1100 Subject: [PATCH 07/22] Fix `unix::JobServerClient::from_pipe`: Accept more fd access modes `O_RDWR` is a valid access mode for both read and write end of the pipe. Signed-off-by: Jiahao XU --- src/job_token/unix.rs | 43 +++++++++++++++++++++++++------------------ 1 file changed, 25 insertions(+), 18 deletions(-) diff --git a/src/job_token/unix.rs b/src/job_token/unix.rs index bf332abd2..6af5e3944 100644 --- a/src/job_token/unix.rs +++ b/src/job_token/unix.rs @@ -76,24 +76,31 @@ impl JobServerClient { // If we're called from `make` *without* the leading + on our rule // then we'll have `MAKEFLAGS` env vars but won't actually have // access to the file descriptors. - if is_pipe(&read)? - && is_pipe(&write)? - && get_access_mode(&read) == Some(libc::O_RDONLY) - && get_access_mode(&write) == Some(libc::O_WRONLY) - { - let read = read.try_clone().ok()?; - let write = write.try_clone().ok()?; - - // Set read and write end to nonblocking - set_nonblocking(&read)?; - set_nonblocking(&write)?; - - Some(Self { - read, - write: Some(write), - }) - } else { - None + match ( + is_pipe(&read), + is_pipe(&write), + get_access_mode(&read), + get_access_mode(&write), + ) { + ( + Some(true), + Some(true), + Some(libc::O_RDONLY) | Some(libc::O_RDWR), + Some(libc::O_WRONLY) | Some(libc::O_RDWR), + ) => { + let read = read.try_clone().ok()?; + let write = write.try_clone().ok()?; + + // Set read and write end to nonblocking + set_nonblocking(&read)?; + set_nonblocking(&write)?; + + Some(Self { + read, + write: Some(write), + }) + } + _ => None, } } From 7543bb60076b7c6a45ae070890758aaa79a5e9bb Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Sun, 22 Oct 2023 11:43:16 +1100 Subject: [PATCH 08/22] Rm unnecessary `'static` bound in parameter of `job_token` Signed-off-by: Jiahao XU --- src/job_token.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/job_token.rs b/src/job_token.rs index 0c72da737..30049b56e 100644 --- a/src/job_token.rs +++ b/src/job_token.rs @@ -43,7 +43,7 @@ impl JobTokenServer { } } - pub(crate) fn try_acquire(&'static self) -> Result, Error> { + pub(crate) fn try_acquire(&self) -> Result, Error> { match self { Self::Inherited(jobserver) => jobserver.try_acquire(), Self::InProcess(jobserver) => Ok(jobserver.try_acquire()), @@ -85,7 +85,7 @@ mod inherited_jobserver { }) } - pub(super) fn try_acquire(&'static self) -> Result, Error> { + pub(super) fn try_acquire(&self) -> Result, Error> { if !self.global_implicit_token.swap(false, Relaxed) { // Cold path, no global implicit token, obtain one if self.inner.try_acquire()?.is_none() { @@ -137,7 +137,7 @@ mod inprocess_jobserver { Self(AtomicU32::new(parallelism)) } - pub(super) fn try_acquire(&'static self) -> Option { + pub(super) fn try_acquire(&self) -> Option { let res = self.0.fetch_update(Relaxed, Relaxed, |tokens| { if tokens > 0 { Some(tokens - 1) From df659cd7ad9bc73d228c90096fe26654344f75eb Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Sun, 22 Oct 2023 12:44:34 +1100 Subject: [PATCH 09/22] Optimize parallel `compile_objects`: Sleep/yield if no progress is made Signed-off-by: Jiahao XU --- src/lib.rs | 44 +++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 41 insertions(+), 3 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 7136e0f9c..8264d9df0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1343,6 +1343,7 @@ impl Build { crate::job_token::JobToken, )>::new()); let is_disconnected = Cell::new(false); + let has_made_progress = Cell::new(false); let mut wait_future = async { let mut error = None; @@ -1365,6 +1366,7 @@ impl Build { match try_wait_on_child(cmd, program, &mut child.0, &mut stdout) { Ok(Some(())) => { // Task done, remove the entry + has_made_progress.set(true); false } Ok(None) => true, // Task still not finished, keep the entry @@ -1372,6 +1374,8 @@ impl Build { // Task fail, remove the entry. // Since we can only return one error, log the error to make // sure users always see all the compilation failures. + has_made_progress.set(true); + let _ = writeln!(stdout, "cargo:warning={}", err); error = Some(err); @@ -1410,6 +1414,8 @@ impl Build { pendings.push((cmd, program, KillOnDrop(child), token)); pendings }); + + has_made_progress.set(true); } is_disconnected.set(true); @@ -1426,7 +1432,11 @@ impl Build { let waker = unsafe { Waker::from_raw(NOOP_RAW_WAKER) }; let mut context = Context::from_waker(&waker); - while wait_future.is_some() || spawn_future.is_some() { + let mut backoff_cnt = 0; + + loop { + has_made_progress.set(false); + if let Some(fut) = spawn_future.as_mut() { if let Poll::Ready(res) = fut.as_mut().poll(&mut context) { spawn_future = None; @@ -1440,9 +1450,37 @@ impl Build { res?; } } - } - return Ok(()); + if wait_future.is_none() && spawn_future.is_none() { + return Ok(()); + } + + if !has_made_progress.get() { + if backoff_cnt > 3 { + // We have yielded at least three times without making' + // any progress, so we will sleep for a while. + let duration = + std::time::Duration::from_millis(100 * (backoff_cnt - 3).min(10)); + thread::sleep(duration); + } else { + // Given that we spawned a lot of compilation tasks, it is unlikely + // that OS cannot find other ready task to execute. + // + // If all of them are done, then we will yield them and spawn more, + // or simply returns. + // + // Thus this will not be turned into a busy-wait loop and it will not + // waste CPU resource. + thread::yield_now(); + } + } + + backoff_cnt = if has_made_progress.get() { + 0 + } else { + backoff_cnt + 1 + }; + } struct KillOnDrop(Child); From 43527dda0da62d7c9ffb114b0a6591350a47a431 Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Mon, 23 Oct 2023 22:24:19 +1100 Subject: [PATCH 10/22] Fix windows implementation: Match all return value explicitly Signed-off-by: Jiahao XU --- gen-windows-sys-binding/windows_sys.list | 3 +++ src/job_token/windows.rs | 19 +++++++++++-------- src/windows_sys.rs | 3 +++ 3 files changed, 17 insertions(+), 8 deletions(-) diff --git a/gen-windows-sys-binding/windows_sys.list b/gen-windows-sys-binding/windows_sys.list index 6c84d7f98..f5ed55e16 100644 --- a/gen-windows-sys-binding/windows_sys.list +++ b/gen-windows-sys-binding/windows_sys.list @@ -9,6 +9,9 @@ Windows.Win32.Foundation.S_OK Windows.Win32.Foundation.FALSE Windows.Win32.Foundation.HANDLE Windows.Win32.Foundation.WAIT_OBJECT_0 +Windows.Win32.Foundation.WAIT_TIMEOUT +Windows.Win32.Foundation.WAIT_FAILED +Windows.Win32.Foundation.WAIT_ABANDONED Windows.Win32.System.Com.SAFEARRAY Windows.Win32.System.Com.SAFEARRAYBOUND diff --git a/src/job_token/windows.rs b/src/job_token/windows.rs index 81cb8b1db..e77c9d74c 100644 --- a/src/job_token/windows.rs +++ b/src/job_token/windows.rs @@ -5,9 +5,13 @@ use std::{ use crate::windows_sys::{ OpenSemaphoreA, ReleaseSemaphore, WaitForSingleObject, FALSE, HANDLE, SEMAPHORE_MODIFY_STATE, - THREAD_SYNCHRONIZE, WAIT_OBJECT_0, + THREAD_SYNCHRONIZE, WAIT_ABANDONED, WAIT_FAILED, WAIT_OBJECT_0, WAIT_TIMEOUT, }; +const WAIT_ABANDOEND_ERR_MSG: &str = r#" The specified object is a mutex object that was not released by the thread that owned the mutex object before the owning thread terminated. Ownership of the mutex object is granted to the calling thread and the mutex state is set to nonsignaled. + +If the mutex was protecting persistent state information, you should check it for consistency."#; + pub(super) struct JobServerClient { sem: HANDLE, } @@ -41,13 +45,12 @@ impl JobServerClient { } pub(super) fn try_acquire(&self) -> io::Result> { - let r = unsafe { WaitForSingleObject(self.sem, 0) }; - if r == WAIT_OBJECT_0 { - Ok(Some(())) - } else if r == 0 { - Err(io::Error::last_os_error()) - } else { - Ok(None) + match unsafe { WaitForSingleObject(self.sem, 0) } { + WAIT_OBJECT_0 => Ok(Some(())), + WAIT_TIMEOUT => Ok(None), + WAIT_FAILED => Err(io::Error::last_os_error()), + WAIT_ABANDONED => Err(io::Error::new(io::ErrorKind::Other, WAIT_ABANDOEND_ERR_MSG)), + _ => unreachable!("Unexpected return value from WaitForSingleObject"), } } diff --git a/src/windows_sys.rs b/src/windows_sys.rs index eea210853..20a256076 100644 --- a/src/windows_sys.rs +++ b/src/windows_sys.rs @@ -202,7 +202,10 @@ pub const S_FALSE: HRESULT = 1i32; pub const S_OK: HRESULT = 0i32; pub type THREAD_ACCESS_RIGHTS = u32; pub const THREAD_SYNCHRONIZE: THREAD_ACCESS_RIGHTS = 1048576u32; +pub const WAIT_ABANDONED: WIN32_ERROR = 128u32; +pub const WAIT_FAILED: WIN32_ERROR = 4294967295u32; pub const WAIT_OBJECT_0: WIN32_ERROR = 0u32; +pub const WAIT_TIMEOUT: WIN32_ERROR = 258u32; pub type WIN32_ERROR = u32; /// Adapted from From 4f127d255dd79e62e3d69aab2c1daf2fb350b6a7 Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Wed, 25 Oct 2023 09:26:46 +1000 Subject: [PATCH 11/22] Use Result::ok() in job_token.rs Co-authored-by: Piotr Osiewicz <24362066+osiewicz@users.noreply.github.com> --- src/job_token.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/job_token.rs b/src/job_token.rs index 30049b56e..fd747785c 100644 --- a/src/job_token.rs +++ b/src/job_token.rs @@ -146,11 +146,7 @@ mod inprocess_jobserver { } }); - if res.is_ok() { - Some(JobToken()) - } else { - None - } + res.ok().map(|_| JobToken()) } pub(super) fn release_token_raw(&self) { From 2f1b5aaddd5c20b3f52f635d8bed59b21ae36524 Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Wed, 25 Oct 2023 09:28:30 +1000 Subject: [PATCH 12/22] Fix grammer in comments Co-authored-by: Piotr Osiewicz <24362066+osiewicz@users.noreply.github.com> --- src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index 8264d9df0..b9d987180 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1467,7 +1467,7 @@ impl Build { // that OS cannot find other ready task to execute. // // If all of them are done, then we will yield them and spawn more, - // or simply returns. + // or simply return. // // Thus this will not be turned into a busy-wait loop and it will not // waste CPU resource. From 7bb7e407133e3342db7a45ef6f27c93f55420707 Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Wed, 25 Oct 2023 09:58:29 +1000 Subject: [PATCH 13/22] simplify job_token impl Co-authored-by: Piotr Osiewicz <24362066+osiewicz@users.noreply.github.com> --- src/job_token.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/job_token.rs b/src/job_token.rs index fd747785c..7520f91dc 100644 --- a/src/job_token.rs +++ b/src/job_token.rs @@ -139,11 +139,7 @@ mod inprocess_jobserver { pub(super) fn try_acquire(&self) -> Option { let res = self.0.fetch_update(Relaxed, Relaxed, |tokens| { - if tokens > 0 { - Some(tokens - 1) - } else { - None - } + tokens.checked_sub(1) }); res.ok().map(|_| JobToken()) From 1a45c58f3b4bf8d935482f195ea037d768c61c6d Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Wed, 25 Oct 2023 22:13:12 +1100 Subject: [PATCH 14/22] Add more comment explaining the design choice Signed-off-by: Jiahao XU --- src/job_token.rs | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/src/job_token.rs b/src/job_token.rs index 7520f91dc..05a5cd026 100644 --- a/src/job_token.rs +++ b/src/job_token.rs @@ -27,6 +27,14 @@ pub(super) enum JobTokenServer { } impl JobTokenServer { + /// This function returns a static reference to the jobserver because + /// - creating a jobserver from env is a bit fd-unsafe (e.g. the fd might + /// be closed by other jobserver users in the process) and better do it + /// at the start of the program. + /// - in case a jobserver cannot be created from env (e.g. it's not + /// present), we will create a global in-process only jobserver + /// that has to be static so that it will be shared by all cc + /// compilation. pub(crate) fn new() -> &'static Self { static INIT: Once = Once::new(); static mut JOBSERVER: MaybeUninit = MaybeUninit::uninit(); @@ -96,6 +104,9 @@ mod inherited_jobserver { } pub(super) fn release_token_raw(&self) { + // All tokens will be put back into the jobserver immediately + // and they cannot be cached, since Rust does not call `Drop::drop` + // on global variables. if self .global_implicit_token .compare_exchange(false, true, Relaxed, Relaxed) @@ -138,9 +149,9 @@ mod inprocess_jobserver { } pub(super) fn try_acquire(&self) -> Option { - let res = self.0.fetch_update(Relaxed, Relaxed, |tokens| { - tokens.checked_sub(1) - }); + let res = self + .0 + .fetch_update(Relaxed, Relaxed, |tokens| tokens.checked_sub(1)); res.ok().map(|_| JobToken()) } From 31b3480df8232723f7e23839dca6abf29042d156 Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Wed, 25 Oct 2023 22:40:16 +1100 Subject: [PATCH 15/22] Refactor: Extract new mod `async_executor` Signed-off-by: Jiahao XU --- src/async_executor.rs | 118 ++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 105 +++---------------------------------- 2 files changed, 126 insertions(+), 97 deletions(-) create mode 100644 src/async_executor.rs diff --git a/src/async_executor.rs b/src/async_executor.rs new file mode 100644 index 000000000..ad9e62a65 --- /dev/null +++ b/src/async_executor.rs @@ -0,0 +1,118 @@ +use std::{ + cell::Cell, + future::Future, + pin::Pin, + ptr, + task::{Context, Poll, RawWaker, RawWakerVTable, Waker}, + thread, + time::Duration, +}; + +use crate::Error; + +const NOOP_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new( + // Cloning just returns a new no-op raw waker + |_| NOOP_RAW_WAKER, + // `wake` does nothing + |_| {}, + // `wake_by_ref` does nothing + |_| {}, + // Dropping does nothing as we don't allocate anything + |_| {}, +); +const NOOP_RAW_WAKER: RawWaker = RawWaker::new(ptr::null(), &NOOP_WAKER_VTABLE); + +#[derive(Default)] +pub(super) struct YieldOnce(bool); + +impl Future for YieldOnce { + type Output = (); + + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> { + let flag = &mut std::pin::Pin::into_inner(self).0; + if !*flag { + *flag = true; + Poll::Pending + } else { + Poll::Ready(()) + } + } +} + +/// Execute the futures and return when they are all done. +/// +/// Here we use our own homebrew async executor since cc is used in the build +/// script of many popular projects, pulling in additional dependencies would +/// significantly slow down its compilation. +pub(super) fn block_on( + mut fut1: Fut1, + mut fut2: Fut2, + has_made_progress: &Cell, +) -> Result<(), Error> +where + Fut1: Future>, + Fut2: Future>, +{ + // Shadows the future so that it can never be moved and is guaranteed + // to be pinned. + // + // The same trick used in `pin!` macro. + // + // TODO: Once MSRV is bumped to 1.68, replace this with `std::pin::pin!` + let mut fut1 = Some(unsafe { Pin::new_unchecked(&mut fut1) }); + let mut fut2 = Some(unsafe { Pin::new_unchecked(&mut fut2) }); + + // TODO: Once `Waker::noop` stablised and our MSRV is bumped to the version + // which it is stablised, replace this wth `Waker::noop`. + let waker = unsafe { Waker::from_raw(NOOP_RAW_WAKER) }; + let mut context = Context::from_waker(&waker); + + let mut backoff_cnt = 0; + + loop { + has_made_progress.set(false); + + if let Some(fut) = fut2.as_mut() { + if let Poll::Ready(res) = fut.as_mut().poll(&mut context) { + fut2 = None; + res?; + } + } + + if let Some(fut) = fut1.as_mut() { + if let Poll::Ready(res) = fut.as_mut().poll(&mut context) { + fut1 = None; + res?; + } + } + + if fut1.is_none() && fut2.is_none() { + return Ok(()); + } + + if !has_made_progress.get() { + if backoff_cnt > 3 { + // We have yielded at least three times without making' + // any progress, so we will sleep for a while. + let duration = Duration::from_millis(100 * (backoff_cnt - 3).min(10)); + thread::sleep(duration); + } else { + // Given that we spawned a lot of compilation tasks, it is unlikely + // that OS cannot find other ready task to execute. + // + // If all of them are done, then we will yield them and spawn more, + // or simply return. + // + // Thus this will not be turned into a busy-wait loop and it will not + // waste CPU resource. + thread::yield_now(); + } + } + + backoff_cnt = if has_made_progress.get() { + 0 + } else { + backoff_cnt + 1 + }; + } +} diff --git a/src/lib.rs b/src/lib.rs index b9d987180..86e55f438 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -66,6 +66,8 @@ use std::process::{Child, Command, Stdio}; use std::sync::{Arc, Mutex}; use std::thread::{self, JoinHandle}; +#[cfg(feature = "parallel")] +mod async_executor; #[cfg(feature = "parallel")] mod job_token; mod os_pipe; @@ -1297,13 +1299,9 @@ impl Build { #[cfg(feature = "parallel")] fn compile_objects(&self, objs: &[Object], print: &PrintThread) -> Result<(), Error> { - use std::{ - cell::Cell, - future::Future, - pin::Pin, - ptr, - task::{Context, Poll, RawWaker, RawWakerVTable, Waker}, - }; + use std::cell::Cell; + + use async_executor::{block_on, YieldOnce}; if objs.len() <= 1 { for obj in objs { @@ -1345,7 +1343,7 @@ impl Build { let is_disconnected = Cell::new(false); let has_made_progress = Cell::new(false); - let mut wait_future = async { + let wait_future = async { let mut error = None; // Buffer the stdout let mut stdout = io::BufWriter::with_capacity(128, io::stdout()); @@ -1398,7 +1396,7 @@ impl Build { YieldOnce::default().await; } }; - let mut spawn_future = async { + let spawn_future = async { for obj in objs { let (mut cmd, program) = self.create_compile_object_cmd(obj)?; let token = loop { @@ -1422,65 +1420,7 @@ impl Build { Ok::<_, Error>(()) }; - // Shadows the future so that it can never be moved and is guaranteed - // to be pinned. - // - // The same trick used in `pin!` macro. - let mut wait_future = Some(unsafe { Pin::new_unchecked(&mut wait_future) }); - let mut spawn_future = Some(unsafe { Pin::new_unchecked(&mut spawn_future) }); - - let waker = unsafe { Waker::from_raw(NOOP_RAW_WAKER) }; - let mut context = Context::from_waker(&waker); - - let mut backoff_cnt = 0; - - loop { - has_made_progress.set(false); - - if let Some(fut) = spawn_future.as_mut() { - if let Poll::Ready(res) = fut.as_mut().poll(&mut context) { - spawn_future = None; - res?; - } - } - - if let Some(fut) = wait_future.as_mut() { - if let Poll::Ready(res) = fut.as_mut().poll(&mut context) { - wait_future = None; - res?; - } - } - - if wait_future.is_none() && spawn_future.is_none() { - return Ok(()); - } - - if !has_made_progress.get() { - if backoff_cnt > 3 { - // We have yielded at least three times without making' - // any progress, so we will sleep for a while. - let duration = - std::time::Duration::from_millis(100 * (backoff_cnt - 3).min(10)); - thread::sleep(duration); - } else { - // Given that we spawned a lot of compilation tasks, it is unlikely - // that OS cannot find other ready task to execute. - // - // If all of them are done, then we will yield them and spawn more, - // or simply return. - // - // Thus this will not be turned into a busy-wait loop and it will not - // waste CPU resource. - thread::yield_now(); - } - } - - backoff_cnt = if has_made_progress.get() { - 0 - } else { - backoff_cnt + 1 - }; - } + return block_on(wait_future, spawn_future, &has_made_progress); struct KillOnDrop(Child); @@ -1492,35 +1432,6 @@ impl Build { } } - #[derive(Default)] - struct YieldOnce(bool); - - impl Future for YieldOnce { - type Output = (); - - fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> { - let flag = &mut std::pin::Pin::into_inner(self).0; - if !*flag { - *flag = true; - Poll::Pending - } else { - Poll::Ready(()) - } - } - } - - const NOOP_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new( - // Cloning just returns a new no-op raw waker - |_| NOOP_RAW_WAKER, - // `wake` does nothing - |_| {}, - // `wake_by_ref` does nothing - |_| {}, - // Dropping does nothing as we don't allocate anything - |_| {}, - ); - const NOOP_RAW_WAKER: RawWaker = RawWaker::new(ptr::null(), &NOOP_WAKER_VTABLE); - fn cell_update(cell: &Cell, f: F) where T: Default, From 5cd8470069d6ca1d79bae11baffe9d800086994b Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Mon, 6 Nov 2023 08:58:52 +1100 Subject: [PATCH 16/22] Update src/job_token/unix.rs Co-authored-by: Thom Chiovoloni --- src/job_token/unix.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/job_token/unix.rs b/src/job_token/unix.rs index 6af5e3944..7c8a6a978 100644 --- a/src/job_token/unix.rs +++ b/src/job_token/unix.rs @@ -141,7 +141,7 @@ impl JobServerClient { // // If it does happen, it is likely a bug in the program using this // crate or some other programs that use the same jobserver have a - // bug in their code + // bug in their code. // // If that turns out to not be the case we'll get an error anyway! let mut write = self.write.as_ref().unwrap_or(&self.read); From 34ef631e01cd6b2eab92324c6f9030099364360b Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Mon, 6 Nov 2023 22:58:03 +1100 Subject: [PATCH 17/22] Remove outdated comment Signed-off-by: Jiahao XU --- src/job_token/unix.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/job_token/unix.rs b/src/job_token/unix.rs index 7c8a6a978..af626c24d 100644 --- a/src/job_token/unix.rs +++ b/src/job_token/unix.rs @@ -131,10 +131,6 @@ impl JobServerClient { } pub(super) fn release(&self) -> io::Result<()> { - // Note that the fd may be nonblocking but we're going to go ahead - // and assume that the writes here are always nonblocking (we can - // always quickly release a token). - // // For write to block, this would mean that pipe is full. // If all every release are pair with an acquire, then this cannot // happen. From b06b9206908f7751e76dab99f54f0d1da99e498b Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Mon, 6 Nov 2023 23:02:34 +1100 Subject: [PATCH 18/22] Do not check for `--jobserver-fds` on windows Since the manual specifies that only `--jobsewrver-auth` will be used and windows does not have the concept of fds anyway. Signed-off-by: Jiahao XU --- src/job_token/windows.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/job_token/windows.rs b/src/job_token/windows.rs index e77c9d74c..7f97d1144 100644 --- a/src/job_token/windows.rs +++ b/src/job_token/windows.rs @@ -24,10 +24,7 @@ impl JobServerClient { let s = var .to_str()? .split_ascii_whitespace() - .filter_map(|arg| { - arg.strip_prefix("--jobserver-fds=") - .or_else(|| arg.strip_prefix("--jobserver-auth=")) - }) + .filter_map(|arg| arg.strip_prefix("--jobserver-auth=")) .find(|s| !s.is_empty())?; let name = CString::new(s).ok()?; From c36493fc0b286ac763f6cde5c804abc6a6cb8bd9 Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Mon, 6 Nov 2023 23:14:56 +1100 Subject: [PATCH 19/22] Accept ASCII only in windows `JobServerClient::open` impl Signed-off-by: Jiahao XU --- src/job_token/windows.rs | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/src/job_token/windows.rs b/src/job_token/windows.rs index 7f97d1144..a44fabc8b 100644 --- a/src/job_token/windows.rs +++ b/src/job_token/windows.rs @@ -21,8 +21,19 @@ unsafe impl Send for JobServerClient {} impl JobServerClient { pub(super) unsafe fn open(var: OsString) -> Option { + let var = var.to_str()?; + if !var.is_ascii() { + // `OpenSemaphoreA` only accepts ASCII, not utf-8. + // + // Upstream implementation jobserver and jobslot also uses the + // same function and they works without problem, so there's no + // motivation to support utf-8 here using `OpenSemaphoreW` + // which only makes the code harder to maintain by making it more + // different than upstream. + return None; + } + let s = var - .to_str()? .split_ascii_whitespace() .filter_map(|arg| arg.strip_prefix("--jobserver-auth=")) .find(|s| !s.is_empty())?; From c7f2d74eac356894808a272be880ecb599b7bb60 Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Mon, 6 Nov 2023 23:25:24 +1100 Subject: [PATCH 20/22] Use acquire and release ordering for atomic operation in `JobServer` Signed-off-by: Jiahao XU --- src/job_token.rs | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/src/job_token.rs b/src/job_token.rs index 05a5cd026..ac503a4ec 100644 --- a/src/job_token.rs +++ b/src/job_token.rs @@ -64,7 +64,10 @@ mod inherited_jobserver { use std::{ env::var_os, - sync::atomic::{AtomicBool, Ordering::Relaxed}, + sync::atomic::{ + AtomicBool, + Ordering::{AcqRel, Acquire}, + }, }; pub(crate) struct JobServer { @@ -94,7 +97,7 @@ mod inherited_jobserver { } pub(super) fn try_acquire(&self) -> Result, Error> { - if !self.global_implicit_token.swap(false, Relaxed) { + if !self.global_implicit_token.swap(false, AcqRel) { // Cold path, no global implicit token, obtain one if self.inner.try_acquire()?.is_none() { return Ok(None); @@ -109,7 +112,7 @@ mod inherited_jobserver { // on global variables. if self .global_implicit_token - .compare_exchange(false, true, Relaxed, Relaxed) + .compare_exchange(false, true, AcqRel, Acquire) .is_err() { // There's already a global implicit token, so this token must @@ -125,7 +128,10 @@ mod inprocess_jobserver { use std::{ env::var, - sync::atomic::{AtomicU32, Ordering::Relaxed}, + sync::atomic::{ + AtomicU32, + Ordering::{AcqRel, Acquire}, + }, }; pub(crate) struct JobServer(AtomicU32); @@ -151,13 +157,13 @@ mod inprocess_jobserver { pub(super) fn try_acquire(&self) -> Option { let res = self .0 - .fetch_update(Relaxed, Relaxed, |tokens| tokens.checked_sub(1)); + .fetch_update(AcqRel, Acquire, |tokens| tokens.checked_sub(1)); res.ok().map(|_| JobToken()) } pub(super) fn release_token_raw(&self) { - self.0.fetch_add(1, Relaxed); + self.0.fetch_add(1, AcqRel); } } } From 4e5536c6e4463eb18f5d7fdbf6ab1f5366e79c9e Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Tue, 7 Nov 2023 20:15:29 +1100 Subject: [PATCH 21/22] Add a TODO for use of `NUM_JOBS` Signed-off-by: Jiahao XU --- src/job_token.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/job_token.rs b/src/job_token.rs index ac503a4ec..4b4fa989e 100644 --- a/src/job_token.rs +++ b/src/job_token.rs @@ -145,6 +145,8 @@ mod inprocess_jobserver { // dependency that will almost never be used, so // it's generally not too worth it. let mut parallelism = 4; + // TODO: Use std::thread::available_parallelism as an upper bound + // when MSRV is bumped. if let Ok(amt) = var("NUM_JOBS") { if let Ok(amt) = amt.parse() { parallelism = amt; From e7dbd3e8325d660ad155054c5a22d33929ebda14 Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Tue, 7 Nov 2023 20:22:28 +1100 Subject: [PATCH 22/22] Simplify windows jobserver `WAIT_ABANDONED` errmsg Signed-off-by: Jiahao XU --- src/job_token/windows.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/job_token/windows.rs b/src/job_token/windows.rs index a44fabc8b..03ec8869b 100644 --- a/src/job_token/windows.rs +++ b/src/job_token/windows.rs @@ -8,10 +8,6 @@ use crate::windows_sys::{ THREAD_SYNCHRONIZE, WAIT_ABANDONED, WAIT_FAILED, WAIT_OBJECT_0, WAIT_TIMEOUT, }; -const WAIT_ABANDOEND_ERR_MSG: &str = r#" The specified object is a mutex object that was not released by the thread that owned the mutex object before the owning thread terminated. Ownership of the mutex object is granted to the calling thread and the mutex state is set to nonsignaled. - -If the mutex was protecting persistent state information, you should check it for consistency."#; - pub(super) struct JobServerClient { sem: HANDLE, } @@ -57,7 +53,12 @@ impl JobServerClient { WAIT_OBJECT_0 => Ok(Some(())), WAIT_TIMEOUT => Ok(None), WAIT_FAILED => Err(io::Error::last_os_error()), - WAIT_ABANDONED => Err(io::Error::new(io::ErrorKind::Other, WAIT_ABANDOEND_ERR_MSG)), + // We believe this should be impossible for a semaphore, but still + // check the error code just in case it happens. + WAIT_ABANDONED => Err(io::Error::new( + io::ErrorKind::Other, + "Wait on jobserver semaphore returned WAIT_ABANDONED", + )), _ => unreachable!("Unexpected return value from WaitForSingleObject"), } }