feat: Use channels to maintain job tokens & reuse the implicit token without dropping it first #878

Merged on Oct 19, 2023 (12 commits)
70 changes: 70 additions & 0 deletions src/job_token.rs
@@ -0,0 +1,70 @@
use jobserver::{Acquired, Client, HelperThread};
use std::sync::mpsc::{self, Receiver, Sender};

pub(crate) struct JobToken {
/// The token can either be a fresh token obtained from the jobserver or - if `token` is None - an implicit token for this process.
/// Both are valid values to put into queue.
token: Option<Acquired>,
pool: Sender<Option<Acquired>>,
should_return_to_queue: bool,
}

impl Drop for JobToken {
fn drop(&mut self) {
if self.should_return_to_queue {
let _ = self.pool.send(self.token.take());
}
}
}

impl JobToken {
/// Ensure that this token is not put back into queue once it's dropped.
/// This also leads to releasing it sooner for other processes to use,
/// which is a correct thing to do once it is known that there won't be
/// any more token acquisitions.
pub(crate) fn forget(&mut self) {
self.should_return_to_queue = false;
}
}

/// A thin wrapper around jobserver's Client.
/// It would be perfectly fine to just use jobserver's Client, but we also want to reuse
/// our own implicit token assigned for this build script. This struct manages that and
/// gives out tokens without exposing whether they're implicit tokens or tokens from jobserver.
/// Furthermore, instead of giving up job tokens, it keeps them around
/// for reuse if we know we're going to request another token after freeing the current one.
pub(crate) struct JobTokenServer {
helper: HelperThread,
tx: Sender<Option<Acquired>>,
rx: Receiver<Option<Acquired>>,
}

impl JobTokenServer {
pub(crate) fn new(client: Client) -> Result<Self, crate::Error> {
let (tx, rx) = mpsc::channel();
// Push the implicit token. Since JobTokens only give back what they got,
// there should be at most one global implicit token in the wild.
tx.send(None).unwrap();
let pool = tx.clone();
let helper = client.into_helper_thread(move |acq| {
let _ = pool.send(Some(acq.unwrap()));
})?;
Ok(Self { helper, tx, rx })
}

pub(crate) fn acquire(&mut self) -> JobToken {
let token = if let Ok(token) = self.rx.try_recv() {
// Opportunistically check if there's a token that can be reused.
token
} else {
// Cold path, request a token and block
self.helper.request_token();
self.rx.recv().unwrap()
};
JobToken {
token,
pool: self.tx.clone(),
should_return_to_queue: true,
}
}
}
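
Review note: the token life cycle in src/job_token.rs can be tried out without the jobserver crate. The sketch below is a simplified model under stated assumptions, not the merged code: `FakeAcquired`, `Token`, `TokenPool` and `id` are hypothetical names, and the slow path mints a fake token directly instead of asking a helper thread and blocking on `recv`.

```rust
use std::sync::mpsc::{self, Receiver, Sender};

/// Hypothetical stand-in for `jobserver::Acquired`.
#[derive(Debug, PartialEq)]
pub struct FakeAcquired(pub u32);

/// `None` models the implicit token, `Some(_)` a token from the jobserver.
pub struct Token {
    token: Option<FakeAcquired>,
    pool: Sender<Option<FakeAcquired>>,
    should_return_to_queue: bool,
}

impl Token {
    /// Id of the wrapped fake token, or `None` for the implicit token.
    pub fn id(&self) -> Option<u32> {
        self.token.as_ref().map(|t| t.0)
    }

    /// Do not hand the token back to the pool when it is dropped.
    pub fn forget(&mut self) {
        self.should_return_to_queue = false;
    }
}

impl Drop for Token {
    fn drop(&mut self) {
        if self.should_return_to_queue {
            // A send error only means the pool is gone, so it is ignored.
            let _ = self.pool.send(self.token.take());
        }
    }
}

pub struct TokenPool {
    tx: Sender<Option<FakeAcquired>>,
    rx: Receiver<Option<FakeAcquired>>,
    next_id: u32,
}

impl TokenPool {
    pub fn new() -> Self {
        let (tx, rx) = mpsc::channel();
        // Seed the queue with the implicit token. `rx` is alive, so this cannot fail.
        tx.send(None).unwrap();
        Self { tx, rx, next_id: 0 }
    }

    pub fn acquire(&mut self) -> Token {
        let token = match self.rx.try_recv() {
            // Fast path: reuse a token that an earlier job gave back.
            Ok(token) => token,
            // Slow path (assumption): mint a fake token instead of
            // requesting one from a helper thread.
            Err(_) => {
                self.next_id += 1;
                Some(FakeAcquired(self.next_id))
            }
        };
        Token {
            token,
            pool: self.tx.clone(),
            should_return_to_queue: true,
        }
    }
}
```

In this model the first `acquire` yields the implicit token, a dropped token is handed to the next `acquire`, and a token that was `forget`-ed before being dropped never reappears in the queue.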
49 changes: 26 additions & 23 deletions src/lib.rs
@@ -66,8 +66,9 @@ use std::process::{Child, Command, Stdio};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

#[cfg(feature = "parallel")]
mod job_token;
mod os_pipe;

// These modules are all glue to support reading the MSVC version from
// the registry and from COM interfaces
#[cfg(windows)]
@@ -1304,14 +1305,9 @@ impl Build {
return Ok(());
}

// Limit our parallelism globally with a jobserver. Start off by
// releasing our own token for this process so we can have a bit of an
// easier to write loop below. If this fails, though, then we're likely
// on Windows with the main implicit token, so we just have a bit extra
// parallelism for a bit and don't reacquire later.
// Limit our parallelism globally with a jobserver.
let server = jobserver();
// Reacquire our process's token on drop
let _reacquire = server.release_raw().ok().map(|_| JobserverToken(server));

// When compiling objects in parallel we do a few dirty tricks to speed
// things up:
@@ -1332,7 +1328,7 @@ impl Build {
// acquire the appropriate tokens. Once all objects have been compiled
// we wait on all the processes and propagate the results of compilation.

let (tx, rx) = mpsc::channel::<(_, String, KillOnDrop, _)>();
let (tx, rx) = mpsc::channel::<(_, String, KillOnDrop, crate::job_token::JobToken)>();

// Since jobserver::Client::acquire can block, waiting
// must be done in parallel so that acquire won't block forever.
@@ -1345,7 +1341,13 @@ impl Build {

loop {
let mut has_made_progress = false;

// If the other end of the pipe is already disconnected, then we're not gonna get any new jobs,
// so it doesn't make sense to reuse the tokens; in fact,
// releasing them as soon as possible (once we know that the other end is disconnected) is beneficial.
// Imagine that the last file built takes an hour to finish; in this scenario,
// by not releasing the tokens before that last file is done we would effectively block other processes from
// starting sooner - even though we only need one token for that last file, not N others that were acquired.
let mut is_disconnected = false;
// Reading new pending tasks
loop {
match rx.try_recv() {
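
Review note: the `is_disconnected` flag relies on `try_recv` distinguishing an empty queue from a closed one. A minimal sketch of that drain loop using only the standard library follows; `drain_pending` is a hypothetical helper name, not a function in this PR.

```rust
use std::sync::mpsc::{Receiver, TryRecvError};

/// Moves every item currently queued on `rx` into `pending`.
/// Returns `true` once all senders are gone and the queue is empty,
/// `false` if the queue is merely empty for now.
pub fn drain_pending<T>(rx: &Receiver<T>, pending: &mut Vec<T>) -> bool {
    loop {
        match rx.try_recv() {
            Ok(item) => pending.push(item),
            // Nothing queued right now, but a sender may still send more.
            Err(TryRecvError::Empty) => return false,
            // Every sender was dropped: no new work will ever arrive.
            Err(TryRecvError::Disconnected) => return true,
        }
    }
}
```

Items that were sent before the last sender was dropped are still delivered first; `Disconnected` is only reported after the buffer has been emptied.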
@@ -1361,23 +1363,32 @@ impl Build {
Ok(())
};
}
Err(mpsc::TryRecvError::Disconnected) => {
is_disconnected = true;
break;
}
_ => break,
}
}

// Try waiting on them.
pendings.retain_mut(|(cmd, program, child, _)| {
pendings.retain_mut(|(cmd, program, child, token)| {
match try_wait_on_child(cmd, program, &mut child.0, &mut stdout) {
Ok(Some(())) => {
// Task done, remove the entry
if is_disconnected {
token.forget();
}
has_made_progress = true;
false
}
Ok(None) => true, // Task still not finished, keep the entry
Err(err) => {
// Task fail, remove the entry.
has_made_progress = true;

if is_disconnected {
token.forget();
}
// Since we can only return one error, log the error to make
// sure users always see all the compilation failures.
let _ = writeln!(stdout, "cargo:warning={}", err);
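
Review note: the effect of calling `token.forget()` inside `retain_mut` can be observed with a small model. Everything here is hypothetical (`Token` carries no real jobserver token, and `Task`, `reap` and `demo` are illustration-only names); the point is only that a finished task's token goes back to the pool unless the job channel is already disconnected.

```rust
use std::sync::mpsc::{self, Sender};

/// Hypothetical token: sends `()` back to the pool on drop unless forgotten.
pub struct Token {
    pool: Sender<()>,
    returns: bool,
}

impl Token {
    pub fn forget(&mut self) {
        self.returns = false;
    }
}

impl Drop for Token {
    fn drop(&mut self) {
        if self.returns {
            let _ = self.pool.send(());
        }
    }
}

/// Hypothetical pending task; `finished` replaces polling a child process.
pub struct Task {
    pub finished: bool,
    pub token: Token,
}

/// Removes finished tasks and reports whether any were removed.
pub fn reap(tasks: &mut Vec<Task>, is_disconnected: bool) -> bool {
    let mut has_made_progress = false;
    tasks.retain_mut(|task| {
        if !task.finished {
            return true; // still running, keep the entry
        }
        if is_disconnected {
            // No further jobs will be queued, so the token is not reused.
            task.token.forget();
        }
        has_made_progress = true;
        false // removed here, which drops the task and its token
    });
    has_made_progress
}

/// Returns (tokens handed back to the pool, tasks still pending).
pub fn demo(is_disconnected: bool) -> (usize, usize) {
    let (tx, rx) = mpsc::channel();
    let mut tasks: Vec<Task> = [true, false, true]
        .iter()
        .map(|&finished| Task {
            finished,
            token: Token { pool: tx.clone(), returns: true },
        })
        .collect();
    reap(&mut tasks, is_disconnected);
    (rx.try_iter().count(), tasks.len())
}
```

With two finished tasks out of three, `demo(false)` sees both tokens come back for reuse, while `demo(true)` sees none because they were forgotten before being dropped.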
@@ -1415,11 +1426,10 @@ impl Build {
};
}
})?;

let mut tokens = crate::job_token::JobTokenServer::new(server)?;
for obj in objs {
let (mut cmd, program) = self.create_compile_object_cmd(obj)?;
let token = server.acquire()?;

let token = tokens.acquire();
let child = spawn(&mut cmd, &program, print.pipe_writer_cloned()?.unwrap())?;

tx.send((cmd, program, KillOnDrop(child), token))
@@ -1432,7 +1442,7 @@ impl Build {

/// Returns a suitable `jobserver::Client` used to coordinate
/// parallelism between build scripts.
fn jobserver() -> &'static jobserver::Client {
fn jobserver() -> jobserver::Client {
static INIT: Once = Once::new();
static mut JOBSERVER: Option<jobserver::Client> = None;

@@ -1444,7 +1454,7 @@ impl Build {
let server = default_jobserver();
JOBSERVER = Some(server);
});
JOBSERVER.as_ref().unwrap()
JOBSERVER.clone().unwrap()
}
}
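
Review note: `jobserver()` now returns an owned `Client` cloned from the global, since `into_helper_thread` takes the client by value. The same "initialise once, hand out clones" shape can be sketched with `std::sync::OnceLock` (Rust 1.70+) in place of the `static mut` plus `Once` used in this diff. This is a swapped-in technique for illustration, not what the PR does, and `FakeClient`, `same_as` and `shared_client` are hypothetical names.

```rust
use std::sync::{Arc, OnceLock};

/// Hypothetical stand-in for `jobserver::Client`. It is assumed to be
/// cheap to clone because it only wraps an `Arc`.
#[derive(Clone)]
pub struct FakeClient {
    inner: Arc<u32>,
}

impl FakeClient {
    /// True when both handles point at the same underlying state.
    pub fn same_as(&self, other: &FakeClient) -> bool {
        Arc::ptr_eq(&self.inner, &other.inner)
    }
}

/// Returns an owned handle to a lazily initialised global client.
pub fn shared_client() -> FakeClient {
    static CLIENT: OnceLock<FakeClient> = OnceLock::new();
    CLIENT
        .get_or_init(|| FakeClient { inner: Arc::new(4) })
        .clone()
}
```

Every call returns a separate handle that can be moved into a consumer, while all handles still share the single global instance.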

@@ -1484,13 +1494,6 @@ impl Build {
child.kill().ok();
}
}

struct JobserverToken(&'static jobserver::Client);
impl Drop for JobserverToken {
fn drop(&mut self) {
let _ = self.0.acquire_raw();
}
}
}

#[cfg(not(feature = "parallel"))]