Skip to content

Commit

Permalink
Merge pull request #878 from osiewicz/no_drops_on_implicit_job_token
Browse files Browse the repository at this point in the history
feat: Use channels to maintain job tokens & reuse the implicit token without dropping it first
  • Loading branch information
NobodyXu authored Oct 19, 2023
2 parents 2447a2b + 65ab371 commit 4f9866e
Show file tree
Hide file tree
Showing 2 changed files with 164 additions and 69 deletions.
139 changes: 139 additions & 0 deletions src/job_token.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
use jobserver::{Acquired, Client, HelperThread};
use std::{
env,
mem::MaybeUninit,
sync::{
mpsc::{self, Receiver, Sender},
Once,
},
};

pub(crate) struct JobToken {
/// The token can either be a fresh token obtained from the jobserver or - if `token` is None - an implicit token for this process.
/// Both are valid values to put into queue.
token: Option<Acquired>,
/// A pool to which `token` should be returned. `pool` is optional, as one might want to release a token straight away instead
/// of storing it back in the pool - see [`Self::forget()`] function for that.
pool: Option<Sender<Option<Result<Acquired, crate::Error>>>>,
}

impl Drop for JobToken {
fn drop(&mut self) {
if let Some(pool) = &self.pool {
// Always send back an Ok() variant as we know that the acquisition for this token has succeeded.
let _ = pool.send(self.token.take().map(|token| Ok(token)));
}
}
}

impl JobToken {
/// Ensure that this token is not put back into queue once it's dropped.
/// This also leads to releasing it sooner for other processes to use,
/// which is a correct thing to do once it is known that there won't be
/// any more token acquisitions.
pub(crate) fn forget(&mut self) {
self.pool.take();
}
}

/// A thin wrapper around jobserver's Client.
/// It would be perfectly fine to just use jobserver's Client, but we also want to reuse
/// our own implicit token assigned for this build script. This struct manages that and
/// gives out tokens without exposing whether they're implicit tokens or tokens from jobserver.
/// Furthermore, instead of giving up job tokens, it keeps them around
/// for reuse if we know we're going to request another token after freeing the current one.
pub(crate) struct JobTokenServer {
helper: HelperThread,
tx: Sender<Option<Result<Acquired, crate::Error>>>,
rx: Receiver<Option<Result<Acquired, crate::Error>>>,
}

impl JobTokenServer {
pub(crate) fn new() -> &'static Self {
jobserver()
}
fn new_inner(client: Client) -> Result<Self, crate::Error> {
let (tx, rx) = mpsc::channel();
// Push the implicit token. Since JobTokens only give back what they got,
// there should be at most one global implicit token in the wild.
tx.send(None).unwrap();
let pool = tx.clone();
let helper = client.into_helper_thread(move |acq| {
let _ = pool.send(Some(acq.map_err(|e| e.into())));
})?;
Ok(Self { helper, tx, rx })
}

pub(crate) fn acquire(&self) -> Result<JobToken, crate::Error> {
let token = if let Ok(token) = self.rx.try_recv() {
// Opportunistically check if there's a token that can be reused.
token
} else {
// Cold path, request a token and block
self.helper.request_token();
self.rx.recv().unwrap()
};
let token = if let Some(token) = token {
Some(token?)
} else {
None
};
Ok(JobToken {
token,
pool: Some(self.tx.clone()),
})
}
}

/// Returns a suitable `JobTokenServer` used to coordinate
/// parallelism between build scripts. A global `JobTokenServer` is used as this ensures
/// that only one implicit job token is used in the wild.
/// Having multiple separate job token servers would lead to each of them assuming that they have control
/// over the implicit job token.
/// As it stands, each caller of `jobserver` can receive an implicit job token and there will be at most
/// one implicit job token in the wild.
fn jobserver() -> &'static JobTokenServer {
static INIT: Once = Once::new();
static mut JOBSERVER: MaybeUninit<JobTokenServer> = MaybeUninit::uninit();

fn _assert_sync<T: Sync>() {}
_assert_sync::<jobserver::Client>();

unsafe {
INIT.call_once(|| {
let server = default_jobserver();
JOBSERVER = MaybeUninit::new(
JobTokenServer::new_inner(server).expect("Job server initialization failed"),
);
});
// Poor man's assume_init_ref, as that'd require a MSRV of 1.55.
&*JOBSERVER.as_ptr()
}
}

unsafe fn default_jobserver() -> jobserver::Client {
// Try to use the environmental jobserver which Cargo typically
// initializes for us...
if let Some(client) = jobserver::Client::from_env() {
return client;
}

// ... but if that fails for whatever reason select something
// reasonable and crate a new jobserver. Use `NUM_JOBS` if set (it's
// configured by Cargo) and otherwise just fall back to a
// semi-reasonable number. Note that we could use `num_cpus` here
// but it's an extra dependency that will almost never be used, so
// it's generally not too worth it.
let mut parallelism = 4;
if let Ok(amt) = env::var("NUM_JOBS") {
if let Ok(amt) = amt.parse() {
parallelism = amt;
}
}

// If we create our own jobserver then be sure to reserve one token
// for ourselves.
let client = jobserver::Client::new(parallelism).expect("failed to create jobserver");
client.acquire_raw().expect("failed to acquire initial");
return client;
}
94 changes: 25 additions & 69 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,9 @@ use std::process::{Child, Command, Stdio};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

#[cfg(feature = "parallel")]
mod job_token;
mod os_pipe;

// These modules are all glue to support reading the MSVC version from
// the registry and from COM interfaces
#[cfg(windows)]
Expand Down Expand Up @@ -1294,7 +1295,7 @@ impl Build {

#[cfg(feature = "parallel")]
fn compile_objects(&self, objs: &[Object], print: &PrintThread) -> Result<(), Error> {
use std::sync::{mpsc, Once};
use std::sync::mpsc;

if objs.len() <= 1 {
for obj in objs {
Expand All @@ -1305,14 +1306,8 @@ impl Build {
return Ok(());
}

// Limit our parallelism globally with a jobserver. Start off by
// releasing our own token for this process so we can have a bit of an
// easier to write loop below. If this fails, though, then we're likely
// on Windows with the main implicit token, so we just have a bit extra
// parallelism for a bit and don't reacquire later.
let server = jobserver();
// Reacquire our process's token on drop
let _reacquire = server.release_raw().ok().map(|_| JobserverToken(server));
// Limit our parallelism globally with a jobserver.
let tokens = crate::job_token::JobTokenServer::new();

// When compiling objects in parallel we do a few dirty tricks to speed
// things up:
Expand All @@ -1333,7 +1328,7 @@ impl Build {
// acquire the appropriate tokens, Once all objects have been compiled
// we wait on all the processes and propagate the results of compilation.

let (tx, rx) = mpsc::channel::<(_, String, KillOnDrop, _)>();
let (tx, rx) = mpsc::channel::<(_, String, KillOnDrop, crate::job_token::JobToken)>();

// Since jobserver::Client::acquire can block, waiting
// must be done in parallel so that acquire won't block forever.
Expand All @@ -1346,7 +1341,13 @@ impl Build {

loop {
let mut has_made_progress = false;

// If the other end of the pipe is already disconnected, then we're not gonna get any new jobs,
// so it doesn't make sense to reuse the tokens; in fact,
// releasing them as soon as possible (once we know that the other end is disconnected) is beneficial.
// Imagine that the last file built takes an hour to finish; in this scenario,
// by not releasing the tokens before that last file is done we would effectively block other processes from
// starting sooner - even though we only need one token for that last file, not N others that were acquired.
let mut is_disconnected = false;
// Reading new pending tasks
loop {
match rx.try_recv() {
Expand All @@ -1362,23 +1363,32 @@ impl Build {
Ok(())
};
}
Err(mpsc::TryRecvError::Disconnected) => {
is_disconnected = true;
break;
}
_ => break,
}
}

// Try waiting on them.
pendings.retain_mut(|(cmd, program, child, _)| {
pendings.retain_mut(|(cmd, program, child, token)| {
match try_wait_on_child(cmd, program, &mut child.0, &mut stdout) {
Ok(Some(())) => {
// Task done, remove the entry
if is_disconnected {
token.forget();
}
has_made_progress = true;
false
}
Ok(None) => true, // Task still not finished, keep the entry
Err(err) => {
// Task fail, remove the entry.
has_made_progress = true;

if is_disconnected {
token.forget();
}
// Since we can only return one error, log the error to make
// sure users always see all the compilation failures.
let _ = writeln!(stdout, "cargo:warning={}", err);
Expand Down Expand Up @@ -1416,11 +1426,9 @@ impl Build {
};
}
})?;

for obj in objs {
let (mut cmd, program) = self.create_compile_object_cmd(obj)?;
let token = server.acquire()?;

let token = tokens.acquire()?;
let child = spawn(&mut cmd, &program, print.pipe_writer_cloned()?.unwrap())?;

tx.send((cmd, program, KillOnDrop(child), token))
Expand All @@ -1431,51 +1439,6 @@ impl Build {

return wait_thread.join().expect("wait_thread panics");

/// Returns a suitable `jobserver::Client` used to coordinate
/// parallelism between build scripts.
fn jobserver() -> &'static jobserver::Client {
static INIT: Once = Once::new();
static mut JOBSERVER: Option<jobserver::Client> = None;

fn _assert_sync<T: Sync>() {}
_assert_sync::<jobserver::Client>();

unsafe {
INIT.call_once(|| {
let server = default_jobserver();
JOBSERVER = Some(server);
});
JOBSERVER.as_ref().unwrap()
}
}

unsafe fn default_jobserver() -> jobserver::Client {
// Try to use the environmental jobserver which Cargo typically
// initializes for us...
if let Some(client) = jobserver::Client::from_env() {
return client;
}

// ... but if that fails for whatever reason select something
// reasonable and crate a new jobserver. Use `NUM_JOBS` if set (it's
// configured by Cargo) and otherwise just fall back to a
// semi-reasonable number. Note that we could use `num_cpus` here
// but it's an extra dependency that will almost never be used, so
// it's generally not too worth it.
let mut parallelism = 4;
if let Ok(amt) = env::var("NUM_JOBS") {
if let Ok(amt) = amt.parse() {
parallelism = amt;
}
}

// If we create our own jobserver then be sure to reserve one token
// for ourselves.
let client = jobserver::Client::new(parallelism).expect("failed to create jobserver");
client.acquire_raw().expect("failed to acquire initial");
return client;
}

struct KillOnDrop(Child);

impl Drop for KillOnDrop {
Expand All @@ -1485,13 +1448,6 @@ impl Build {
child.kill().ok();
}
}

struct JobserverToken(&'static jobserver::Client);
impl Drop for JobserverToken {
fn drop(&mut self) {
let _ = self.0.acquire_raw();
}
}
}

#[cfg(not(feature = "parallel"))]
Expand Down

0 comments on commit 4f9866e

Please sign in to comment.