Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Use channels to maintain job tokens & reuse the implicit token without dropping it first #878

Merged
merged 12 commits into from
Oct 19, 2023
Merged
139 changes: 139 additions & 0 deletions src/job_token.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
use jobserver::{Acquired, Client, HelperThread};
use std::{
env,
mem::MaybeUninit,
sync::{
mpsc::{self, Receiver, Sender},
Once,
},
};

/// One unit of parallelism handed out by [`JobTokenServer`].
pub(crate) struct JobToken {
    /// `Some` holds a token obtained from the jobserver; `None` stands for the
    /// implicit token of this process. Either value may be put into the queue.
    token: Option<Acquired>,
    /// Sending half of the queue that `token` is given back to when this value is
    /// dropped. It is optional because a token may need to be released straight
    /// away instead of being stored for reuse - see [`Self::forget()`].
    pool: Option<Sender<Option<Result<Acquired, crate::Error>>>>,
}

impl Drop for JobToken {
    /// Gives the token back to its pool, if a pool is still attached.
    fn drop(&mut self) {
        let pool = match self.pool.as_ref() {
            Some(pool) => pool,
            // No pool attached: nothing is re-queued, the fields are simply dropped.
            None => return,
        };
        // The acquisition of this token has succeeded, so it is always re-queued
        // wrapped in `Ok`. A failed send is deliberately ignored.
        let returned = self.token.take().map(Ok);
        let _ = pool.send(returned);
    }
}

impl JobToken {
    /// Detach this token from its pool so that dropping it does not put it back
    /// into the queue.
    ///
    /// The token is then released sooner for other processes to use, which is the
    /// right thing to do once it is known that no more tokens will be acquired.
    pub(crate) fn forget(&mut self) {
        self.pool = None;
    }
}

/// A thin wrapper around jobserver's `Client`.
///
/// Using jobserver's `Client` directly would work, but the implicit token assigned
/// to this build script should be reused as well. This struct takes care of that
/// and hands out tokens without exposing whether a given one is the implicit token
/// or a token from the jobserver.
///
/// Tokens are also kept around for reuse rather than given up, for the case where
/// another token is going to be requested after the current one is freed.
pub(crate) struct JobTokenServer {
    /// Helper thread used to request fresh tokens from the jobserver.
    helper: HelperThread,
    /// Sending half of the token queue; a clone is stored in every `JobToken`.
    tx: Sender<Option<Result<Acquired, crate::Error>>>,
    /// Receiving half of the token queue, read by `acquire`.
    rx: Receiver<Option<Result<Acquired, crate::Error>>>,
}

impl JobTokenServer {
    /// Returns the global token server (see `jobserver()`).
    pub(crate) fn new() -> &'static Self {
        jobserver()
    }

    /// Builds a token server on top of `client`.
    ///
    /// The queue is seeded with the implicit token (`None`), and `client` is turned
    /// into a helper thread that pushes every acquisition result into the queue.
    ///
    /// # Errors
    /// Returns an error if the helper thread cannot be created.
    fn new_inner(client: Client) -> Result<Self, crate::Error> {
        let (tx, rx) = mpsc::channel();
        // Push the implicit token. Since JobTokens only give back what they got,
        // there should be at most one global implicit token in the wild.
        tx.send(None)
            .expect("token queue receiver is still alive in this scope");
        let pool = tx.clone();
        let helper = client.into_helper_thread(move |acq| {
            // Forward both successes and failures; `acquire` surfaces the error.
            let _ = pool.send(Some(acq.map_err(|e| e.into())));
        })?;
        Ok(Self { helper, tx, rx })
    }

    /// Obtains a token, reusing a queued one when available and otherwise
    /// requesting a fresh one from the jobserver and blocking until one arrives.
    ///
    /// The returned [`JobToken`] is attached to this server's queue.
    ///
    /// # Errors
    /// Returns the error carried by the queue entry if the acquisition performed
    /// by the helper thread failed.
    pub(crate) fn acquire(&self) -> Result<JobToken, crate::Error> {
        let token = match self.rx.try_recv() {
            // Opportunistically check if there's a token that can be reused.
            Ok(token) => token,
            Err(_) => {
                // Cold path, request a token and block.
                self.helper.request_token();
                self.rx
                    .recv()
                    .expect("token queue cannot disconnect while `self.tx` is alive")
            }
        };
        Ok(JobToken {
            // `None` (the implicit token) passes through; `Some(Err(_))` is returned.
            token: token.transpose()?,
            pool: Some(self.tx.clone()),
        })
    }
}

/// Returns the `JobTokenServer` used to coordinate parallelism between build
/// scripts.
///
/// A single global server is used so that only one implicit job token exists in
/// the wild: several separate servers would each assume that they control the
/// implicit token. As it stands, every caller of `jobserver` may receive the
/// implicit job token, and there will be at most one of them in the wild.
///
/// # Panics
/// Panics with "Job server initialization failed" if the server cannot be built
/// on the first call.
fn jobserver() -> &'static JobTokenServer {
    static INIT: Once = Once::new();
    static mut JOBSERVER: MaybeUninit<JobTokenServer> = MaybeUninit::uninit();

    // Compile-time check that `jobserver::Client` is `Sync`.
    // NOTE(review): this does not cover `JobTokenServer` itself, which also holds
    // an `mpsc::Receiver`; confirm that handing out `&'static JobTokenServer` to
    // several threads is sound.
    fn _assert_sync<T: Sync>() {}
    _assert_sync::<jobserver::Client>();

    unsafe {
        // SAFETY: `call_once` runs the initializer at most once, and it has
        // completed before `call_once` returns, so `JOBSERVER` is written exactly
        // once and is initialized by the time it is read below.
        INIT.call_once(|| {
            let client = default_jobserver();
            let server =
                JobTokenServer::new_inner(client).expect("Job server initialization failed");
            JOBSERVER = MaybeUninit::new(server);
        });
        // Poor man's assume_init_ref, as that'd require a MSRV of 1.55.
        &*JOBSERVER.as_ptr()
    }
}

/// Picks the jobserver client for this process.
///
/// The jobserver inherited through the environment is used when there is one.
/// Otherwise a new jobserver is created, sized by `NUM_JOBS` when that variable
/// holds a positive integer and by a fallback of 4 otherwise, and one of its
/// tokens is acquired up front for this process.
///
/// # Safety
/// NOTE(review): presumably `unsafe` because `jobserver::Client::from_env` is;
/// callers must uphold whatever that function requires - confirm against the
/// `jobserver` crate documentation.
///
/// # Panics
/// Panics if a new jobserver cannot be created or its initial token cannot be
/// acquired.
unsafe fn default_jobserver() -> jobserver::Client {
    // Try to use the environmental jobserver which Cargo typically
    // initializes for us...
    if let Some(client) = jobserver::Client::from_env() {
        return client;
    }

    // ... but if that fails for whatever reason select something
    // reasonable and create a new jobserver. Use `NUM_JOBS` if set (it's
    // configured by Cargo) and otherwise just fall back to a
    // semi-reasonable number. Note that we could use `num_cpus` here
    // but it's an extra dependency that will almost never be used, so
    // it's generally not too worth it.
    //
    // A value of 0 is treated like an unparsable one: a jobserver created with
    // no tokens would leave nothing for the `acquire_raw` call below to obtain.
    let parallelism = env::var("NUM_JOBS")
        .ok()
        .and_then(|amt| amt.parse::<usize>().ok())
        .filter(|&amt| amt > 0)
        .unwrap_or(4);

    // If we create our own jobserver then be sure to reserve one token
    // for ourselves.
    let client = jobserver::Client::new(parallelism).expect("failed to create jobserver");
    client.acquire_raw().expect("failed to acquire initial");
    client
}
94 changes: 25 additions & 69 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,9 @@ use std::process::{Child, Command, Stdio};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

#[cfg(feature = "parallel")]
mod job_token;
mod os_pipe;

// These modules are all glue to support reading the MSVC version from
// the registry and from COM interfaces
#[cfg(windows)]
Expand Down Expand Up @@ -1293,7 +1294,7 @@ impl Build {

#[cfg(feature = "parallel")]
fn compile_objects(&self, objs: &[Object], print: &PrintThread) -> Result<(), Error> {
use std::sync::{mpsc, Once};
use std::sync::mpsc;

if objs.len() <= 1 {
for obj in objs {
Expand All @@ -1304,14 +1305,8 @@ impl Build {
return Ok(());
}

// Limit our parallelism globally with a jobserver. Start off by
// releasing our own token for this process so we can have a bit of an
// easier to write loop below. If this fails, though, then we're likely
// on Windows with the main implicit token, so we just have a bit extra
// parallelism for a bit and don't reacquire later.
let server = jobserver();
// Reacquire our process's token on drop
let _reacquire = server.release_raw().ok().map(|_| JobserverToken(server));
// Limit our parallelism globally with a jobserver.
let tokens = crate::job_token::JobTokenServer::new();

// When compiling objects in parallel we do a few dirty tricks to speed
// things up:
Expand All @@ -1332,7 +1327,7 @@ impl Build {
// acquire the appropriate tokens, Once all objects have been compiled
// we wait on all the processes and propagate the results of compilation.

let (tx, rx) = mpsc::channel::<(_, String, KillOnDrop, _)>();
let (tx, rx) = mpsc::channel::<(_, String, KillOnDrop, crate::job_token::JobToken)>();

// Since jobserver::Client::acquire can block, waiting
// must be done in parallel so that acquire won't block forever.
Expand All @@ -1345,7 +1340,13 @@ impl Build {

loop {
let mut has_made_progress = false;

// If the other end of the pipe is already disconnected, then we're not gonna get any new jobs,
// so it doesn't make sense to reuse the tokens; in fact,
// releasing them as soon as possible (once we know that the other end is disconnected) is beneficial.
// Imagine that the last file built takes an hour to finish; in this scenario,
// by not releasing the tokens before that last file is done we would effectively block other processes from
// starting sooner - even though we only need one token for that last file, not N others that were acquired.
let mut is_disconnected = false;
// Reading new pending tasks
loop {
match rx.try_recv() {
Expand All @@ -1361,23 +1362,32 @@ impl Build {
Ok(())
};
}
Err(mpsc::TryRecvError::Disconnected) => {
is_disconnected = true;
break;
}
_ => break,
}
}

// Try waiting on them.
pendings.retain_mut(|(cmd, program, child, _)| {
pendings.retain_mut(|(cmd, program, child, token)| {
match try_wait_on_child(cmd, program, &mut child.0, &mut stdout) {
Ok(Some(())) => {
// Task done, remove the entry
if is_disconnected {
token.forget();
}
has_made_progress = true;
false
}
Ok(None) => true, // Task still not finished, keep the entry
Err(err) => {
// Task fail, remove the entry.
has_made_progress = true;

if is_disconnected {
token.forget();
}
// Since we can only return one error, log the error to make
// sure users always see all the compilation failures.
let _ = writeln!(stdout, "cargo:warning={}", err);
Expand Down Expand Up @@ -1415,11 +1425,9 @@ impl Build {
};
}
})?;

for obj in objs {
let (mut cmd, program) = self.create_compile_object_cmd(obj)?;
let token = server.acquire()?;

let token = tokens.acquire()?;
let child = spawn(&mut cmd, &program, print.pipe_writer_cloned()?.unwrap())?;

tx.send((cmd, program, KillOnDrop(child), token))
Expand All @@ -1430,51 +1438,6 @@ impl Build {

return wait_thread.join().expect("wait_thread panics");

/// Returns a suitable `jobserver::Client` used to coordinate
/// parallelism between build scripts.
///
/// The client is created by `default_jobserver()` on the first call (guarded by
/// `INIT`); every call returns a reference to that same stored client.
fn jobserver() -> &'static jobserver::Client {
static INIT: Once = Once::new();
static mut JOBSERVER: Option<jobserver::Client> = None;

// Compile-time check that `jobserver::Client` is `Sync`.
fn _assert_sync<T: Sync>() {}
_assert_sync::<jobserver::Client>();

unsafe {
INIT.call_once(|| {
let server = default_jobserver();
JOBSERVER = Some(server);
});
// `call_once` has returned, so the initializer above has already stored `Some`.
JOBSERVER.as_ref().unwrap()
}
}

/// Picks the jobserver client for this process: the one inherited through the
/// environment when available, otherwise a newly created one from which one
/// token is acquired up front.
unsafe fn default_jobserver() -> jobserver::Client {
// Try to use the environmental jobserver which Cargo typically
// initializes for us...
if let Some(client) = jobserver::Client::from_env() {
return client;
}

// ... but if that fails for whatever reason select something
// reasonable and create a new jobserver. Use `NUM_JOBS` if set (it's
// configured by Cargo) and otherwise just fall back to a
// semi-reasonable number. Note that we could use `num_cpus` here
// but it's an extra dependency that will almost never be used, so
// it's generally not too worth it.
let mut parallelism = 4;
if let Ok(amt) = env::var("NUM_JOBS") {
// An unparsable value is ignored and the fallback above is kept.
if let Ok(amt) = amt.parse() {
parallelism = amt;
}
}

// If we create our own jobserver then be sure to reserve one token
// for ourselves.
let client = jobserver::Client::new(parallelism).expect("failed to create jobserver");
client.acquire_raw().expect("failed to acquire initial");
return client;
}

struct KillOnDrop(Child);

impl Drop for KillOnDrop {
Expand All @@ -1484,13 +1447,6 @@ impl Build {
child.kill().ok();
}
}

/// Guard around the global jobserver client that calls `acquire_raw` on it
/// when dropped.
struct JobserverToken(&'static jobserver::Client);
impl Drop for JobserverToken {
fn drop(&mut self) {
// Best effort: the result of the acquisition is deliberately ignored.
let _ = self.0.acquire_raw();
}
}
}

#[cfg(not(feature = "parallel"))]
Expand Down