diff --git a/src/job_token.rs b/src/job_token.rs new file mode 100644 index 000000000..818917c8d --- /dev/null +++ b/src/job_token.rs @@ -0,0 +1,139 @@ +use jobserver::{Acquired, Client, HelperThread}; +use std::{ + env, + mem::MaybeUninit, + sync::{ + mpsc::{self, Receiver, Sender}, + Once, + }, +}; + +pub(crate) struct JobToken { + /// The token can either be a fresh token obtained from the jobserver or - if `token` is None - an implicit token for this process. + /// Both are valid values to put into queue. + token: Option, + /// A pool to which `token` should be returned. `pool` is optional, as one might want to release a token straight away instead + /// of storing it back in the pool - see [`Self::forget()`] function for that. + pool: Option>>>, +} + +impl Drop for JobToken { + fn drop(&mut self) { + if let Some(pool) = &self.pool { + // Always send back an Ok() variant as we know that the acquisition for this token has succeeded. + let _ = pool.send(self.token.take().map(|token| Ok(token))); + } + } +} + +impl JobToken { + /// Ensure that this token is not put back into queue once it's dropped. + /// This also leads to releasing it sooner for other processes to use, + /// which is a correct thing to do once it is known that there won't be + /// any more token acquisitions. + pub(crate) fn forget(&mut self) { + self.pool.take(); + } +} + +/// A thin wrapper around jobserver's Client. +/// It would be perfectly fine to just use jobserver's Client, but we also want to reuse +/// our own implicit token assigned for this build script. This struct manages that and +/// gives out tokens without exposing whether they're implicit tokens or tokens from jobserver. +/// Furthermore, instead of giving up job tokens, it keeps them around +/// for reuse if we know we're going to request another token after freeing the current one. +pub(crate) struct JobTokenServer { + helper: HelperThread, + tx: Sender>>, + rx: Receiver>>, +} + +impl JobTokenServer { + pub(crate) fn new() -> &'static Self { + jobserver() + } + fn new_inner(client: Client) -> Result { + let (tx, rx) = mpsc::channel(); + // Push the implicit token. Since JobTokens only give back what they got, + // there should be at most one global implicit token in the wild. + tx.send(None).unwrap(); + let pool = tx.clone(); + let helper = client.into_helper_thread(move |acq| { + let _ = pool.send(Some(acq.map_err(|e| e.into()))); + })?; + Ok(Self { helper, tx, rx }) + } + + pub(crate) fn acquire(&self) -> Result { + let token = if let Ok(token) = self.rx.try_recv() { + // Opportunistically check if there's a token that can be reused. + token + } else { + // Cold path, request a token and block + self.helper.request_token(); + self.rx.recv().unwrap() + }; + let token = if let Some(token) = token { + Some(token?) + } else { + None + }; + Ok(JobToken { + token, + pool: Some(self.tx.clone()), + }) + } +} + +/// Returns a suitable `JobTokenServer` used to coordinate +/// parallelism between build scripts. A global `JobTokenServer` is used as this ensures +/// that only one implicit job token is used in the wild. +/// Having multiple separate job token servers would lead to each of them assuming that they have control +/// over the implicit job token. +/// As it stands, each caller of `jobserver` can receive an implicit job token and there will be at most +/// one implicit job token in the wild. +fn jobserver() -> &'static JobTokenServer { + static INIT: Once = Once::new(); + static mut JOBSERVER: MaybeUninit = MaybeUninit::uninit(); + + fn _assert_sync() {} + _assert_sync::(); + + unsafe { + INIT.call_once(|| { + let server = default_jobserver(); + JOBSERVER = MaybeUninit::new( + JobTokenServer::new_inner(server).expect("Job server initialization failed"), + ); + }); + // Poor man's assume_init_ref, as that'd require a MSRV of 1.55. + &*JOBSERVER.as_ptr() + } +} + +unsafe fn default_jobserver() -> jobserver::Client { + // Try to use the environmental jobserver which Cargo typically + // initializes for us... + if let Some(client) = jobserver::Client::from_env() { + return client; + } + + // ... but if that fails for whatever reason select something + // reasonable and crate a new jobserver. Use `NUM_JOBS` if set (it's + // configured by Cargo) and otherwise just fall back to a + // semi-reasonable number. Note that we could use `num_cpus` here + // but it's an extra dependency that will almost never be used, so + // it's generally not too worth it. + let mut parallelism = 4; + if let Ok(amt) = env::var("NUM_JOBS") { + if let Ok(amt) = amt.parse() { + parallelism = amt; + } + } + + // If we create our own jobserver then be sure to reserve one token + // for ourselves. + let client = jobserver::Client::new(parallelism).expect("failed to create jobserver"); + client.acquire_raw().expect("failed to acquire initial"); + return client; +} diff --git a/src/lib.rs b/src/lib.rs index 6257fa0df..21d4e0e45 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -66,8 +66,9 @@ use std::process::{Child, Command, Stdio}; use std::sync::{Arc, Mutex}; use std::thread::{self, JoinHandle}; +#[cfg(feature = "parallel")] +mod job_token; mod os_pipe; - // These modules are all glue to support reading the MSVC version from // the registry and from COM interfaces #[cfg(windows)] @@ -1293,7 +1294,7 @@ impl Build { #[cfg(feature = "parallel")] fn compile_objects(&self, objs: &[Object], print: &PrintThread) -> Result<(), Error> { - use std::sync::{mpsc, Once}; + use std::sync::mpsc; if objs.len() <= 1 { for obj in objs { @@ -1304,14 +1305,8 @@ impl Build { return Ok(()); } - // Limit our parallelism globally with a jobserver. Start off by - // releasing our own token for this process so we can have a bit of an - // easier to write loop below. If this fails, though, then we're likely - // on Windows with the main implicit token, so we just have a bit extra - // parallelism for a bit and don't reacquire later. - let server = jobserver(); - // Reacquire our process's token on drop - let _reacquire = server.release_raw().ok().map(|_| JobserverToken(server)); + // Limit our parallelism globally with a jobserver. + let tokens = crate::job_token::JobTokenServer::new(); // When compiling objects in parallel we do a few dirty tricks to speed // things up: @@ -1332,7 +1327,7 @@ impl Build { // acquire the appropriate tokens, Once all objects have been compiled // we wait on all the processes and propagate the results of compilation. - let (tx, rx) = mpsc::channel::<(_, String, KillOnDrop, _)>(); + let (tx, rx) = mpsc::channel::<(_, String, KillOnDrop, crate::job_token::JobToken)>(); // Since jobserver::Client::acquire can block, waiting // must be done in parallel so that acquire won't block forever. @@ -1345,7 +1340,13 @@ impl Build { loop { let mut has_made_progress = false; - + // If the other end of the pipe is already disconnected, then we're not gonna get any new jobs, + // so it doesn't make sense to reuse the tokens; in fact, + // releasing them as soon as possible (once we know that the other end is disconnected) is beneficial. + // Imagine that the last file built takes an hour to finish; in this scenario, + // by not releasing the tokens before that last file is done we would effectively block other processes from + // starting sooner - even though we only need one token for that last file, not N others that were acquired. + let mut is_disconnected = false; // Reading new pending tasks loop { match rx.try_recv() { @@ -1361,15 +1362,22 @@ impl Build { Ok(()) }; } + Err(mpsc::TryRecvError::Disconnected) => { + is_disconnected = true; + break; + } _ => break, } } // Try waiting on them. - pendings.retain_mut(|(cmd, program, child, _)| { + pendings.retain_mut(|(cmd, program, child, token)| { match try_wait_on_child(cmd, program, &mut child.0, &mut stdout) { Ok(Some(())) => { // Task done, remove the entry + if is_disconnected { + token.forget(); + } has_made_progress = true; false } @@ -1377,7 +1385,9 @@ impl Build { Err(err) => { // Task fail, remove the entry. has_made_progress = true; - + if is_disconnected { + token.forget(); + } // Since we can only return one error, log the error to make // sure users always see all the compilation failures. let _ = writeln!(stdout, "cargo:warning={}", err); @@ -1415,11 +1425,9 @@ impl Build { }; } })?; - for obj in objs { let (mut cmd, program) = self.create_compile_object_cmd(obj)?; - let token = server.acquire()?; - + let token = tokens.acquire()?; let child = spawn(&mut cmd, &program, print.pipe_writer_cloned()?.unwrap())?; tx.send((cmd, program, KillOnDrop(child), token)) @@ -1430,51 +1438,6 @@ impl Build { return wait_thread.join().expect("wait_thread panics"); - /// Returns a suitable `jobserver::Client` used to coordinate - /// parallelism between build scripts. - fn jobserver() -> &'static jobserver::Client { - static INIT: Once = Once::new(); - static mut JOBSERVER: Option = None; - - fn _assert_sync() {} - _assert_sync::(); - - unsafe { - INIT.call_once(|| { - let server = default_jobserver(); - JOBSERVER = Some(server); - }); - JOBSERVER.as_ref().unwrap() - } - } - - unsafe fn default_jobserver() -> jobserver::Client { - // Try to use the environmental jobserver which Cargo typically - // initializes for us... - if let Some(client) = jobserver::Client::from_env() { - return client; - } - - // ... but if that fails for whatever reason select something - // reasonable and crate a new jobserver. Use `NUM_JOBS` if set (it's - // configured by Cargo) and otherwise just fall back to a - // semi-reasonable number. Note that we could use `num_cpus` here - // but it's an extra dependency that will almost never be used, so - // it's generally not too worth it. - let mut parallelism = 4; - if let Ok(amt) = env::var("NUM_JOBS") { - if let Ok(amt) = amt.parse() { - parallelism = amt; - } - } - - // If we create our own jobserver then be sure to reserve one token - // for ourselves. - let client = jobserver::Client::new(parallelism).expect("failed to create jobserver"); - client.acquire_raw().expect("failed to acquire initial"); - return client; - } - struct KillOnDrop(Child); impl Drop for KillOnDrop { @@ -1484,13 +1447,6 @@ impl Build { child.kill().ok(); } } - - struct JobserverToken(&'static jobserver::Client); - impl Drop for JobserverToken { - fn drop(&mut self) { - let _ = self.0.acquire_raw(); - } - } } #[cfg(not(feature = "parallel"))]