Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Use channels to maintain job tokens & reuse the implicit token without dropping it first #878

Merged
merged 12 commits into from
Oct 19, 2023
Merged
53 changes: 52 additions & 1 deletion src/job_token.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
use jobserver::{Acquired, Client, HelperThread};
use std::sync::mpsc::{self, Receiver, Sender};
use std::{
env,
sync::{
mpsc::{self, Receiver, Sender},
Once,
},
};

pub(crate) struct JobToken {
/// The token can either be a fresh token obtained from the jobserver or - if `token` is None - an implicit token for this process.
Expand Down Expand Up @@ -68,3 +74,48 @@ impl JobTokenServer {
}
}
}

/// Returns a suitable `jobserver::Client` used to coordinate
/// parallelism between build scripts.
pub(super) fn jobserver() -> jobserver::Client {
osiewicz marked this conversation as resolved.
Show resolved Hide resolved
static INIT: Once = Once::new();
static mut JOBSERVER: Option<jobserver::Client> = None;

fn _assert_sync<T: Sync>() {}
_assert_sync::<jobserver::Client>();

unsafe {
INIT.call_once(|| {
let server = default_jobserver();
JOBSERVER = Some(server);
});
JOBSERVER.clone().unwrap()
}
}

unsafe fn default_jobserver() -> jobserver::Client {
// Try to use the environmental jobserver which Cargo typically
// initializes for us...
if let Some(client) = jobserver::Client::from_env() {
return client;
}

// ... but if that fails for whatever reason select something
// reasonable and crate a new jobserver. Use `NUM_JOBS` if set (it's
// configured by Cargo) and otherwise just fall back to a
// semi-reasonable number. Note that we could use `num_cpus` here
// but it's an extra dependency that will almost never be used, so
// it's generally not too worth it.
let mut parallelism = 4;
if let Ok(amt) = env::var("NUM_JOBS") {
if let Ok(amt) = amt.parse() {
parallelism = amt;
}
}

// If we create our own jobserver then be sure to reserve one token
// for ourselves.
let client = jobserver::Client::new(parallelism).expect("failed to create jobserver");
client.acquire_raw().expect("failed to acquire initial");
return client;
}
49 changes: 2 additions & 47 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1294,7 +1294,7 @@ impl Build {

#[cfg(feature = "parallel")]
fn compile_objects(&self, objs: &[Object], print: &PrintThread) -> Result<(), Error> {
use std::sync::{mpsc, Once};
use std::sync::mpsc;

if objs.len() <= 1 {
for obj in objs {
Expand All @@ -1306,7 +1306,7 @@ impl Build {
}

// Limit our parallelism globally with a jobserver.
let server = jobserver();
let server = job_token::jobserver();
// Reacquire our process's token on drop

// When compiling objects in parallel we do a few dirty tricks to speed
Expand Down Expand Up @@ -1440,51 +1440,6 @@ impl Build {

return wait_thread.join().expect("wait_thread panics");

/// Returns a suitable `jobserver::Client` used to coordinate
/// parallelism between build scripts.
fn jobserver() -> jobserver::Client {
static INIT: Once = Once::new();
static mut JOBSERVER: Option<jobserver::Client> = None;

fn _assert_sync<T: Sync>() {}
_assert_sync::<jobserver::Client>();

unsafe {
INIT.call_once(|| {
let server = default_jobserver();
JOBSERVER = Some(server);
});
JOBSERVER.clone().unwrap()
}
}

unsafe fn default_jobserver() -> jobserver::Client {
// Try to use the environmental jobserver which Cargo typically
// initializes for us...
if let Some(client) = jobserver::Client::from_env() {
return client;
}

// ... but if that fails for whatever reason select something
// reasonable and crate a new jobserver. Use `NUM_JOBS` if set (it's
// configured by Cargo) and otherwise just fall back to a
// semi-reasonable number. Note that we could use `num_cpus` here
// but it's an extra dependency that will almost never be used, so
// it's generally not too worth it.
let mut parallelism = 4;
if let Ok(amt) = env::var("NUM_JOBS") {
if let Ok(amt) = amt.parse() {
parallelism = amt;
}
}

// If we create our own jobserver then be sure to reserve one token
// for ourselves.
let client = jobserver::Client::new(parallelism).expect("failed to create jobserver");
client.acquire_raw().expect("failed to acquire initial");
return client;
}

struct KillOnDrop(Child);

impl Drop for KillOnDrop {
Expand Down