Skip to content

Commit

Permalink
Use rayon global thread pool
Browse files Browse the repository at this point in the history
  • Loading branch information
deedy5 committed Jul 27, 2024
1 parent ca57cad commit cfacbf6
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 4 deletions.
46 changes: 46 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ ahash = "0.8"
indexmap = { version = "2", features = ["serde"] }
tokio = { version = "1", features = ["rt"] }
html2text = "0.12"
rayon = "1"

[profile.release]
codegen-units = 1
Expand Down
25 changes: 21 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use std::str::FromStr;
use std::sync::{Arc, OnceLock};
use std::sync::{mpsc, Arc, OnceLock};
use std::time::Duration;

use ahash::RandomState;
use indexmap::IndexMap;
use pyo3::exceptions;
use pyo3::prelude::*;
use pyo3::types::{PyBytes, PyDict, PyString};
use rayon::{ThreadPool, ThreadPoolBuilder};
use rquest::header::{HeaderMap, HeaderName, HeaderValue, COOKIE};
use rquest::impersonate::Impersonate;
use rquest::multipart;
Expand All @@ -20,6 +21,12 @@ use response::Response;
mod utils;
use utils::{get_encoding_from_content, get_encoding_from_headers, json_dumps, url_encode};

/// Returns the process-wide Rayon thread pool, building it on first use.
///
/// The pool is created lazily with `ThreadPoolBuilder`'s default settings and
/// cached in a `OnceLock`, so every caller shares the same instance.
///
/// # Panics
///
/// Panics if the thread pool cannot be built on the first call.
fn cpu_pool() -> &'static ThreadPool {
    static CPU_POOL: OnceLock<ThreadPool> = OnceLock::new();
    CPU_POOL.get_or_init(|| {
        ThreadPoolBuilder::new()
            .build()
            // Without a pool no request can be executed, so fail loudly with context.
            .expect("failed to build the shared Rayon thread pool")
    })
}

// Global single-threaded Tokio runtime
fn runtime() -> &'static Runtime {
static RUNTIME: OnceLock<Runtime> = OnceLock::new();
Expand Down Expand Up @@ -397,9 +404,19 @@ impl Client {
Ok((buf, cookies, encoding, headers, status_code, url))
};

// Execute an async future, releasing the Python GIL for concurrency.
// Use Tokio global runtime to block on the future.
let result = py.allow_threads(|| runtime().block_on(future));
// Run the async future to completion with the Python GIL released, so other Python threads can run concurrently.
// The blocking call is dispatched onto the shared Rayon pool (`cpu_pool`), where the global Tokio runtime blocks on the future.
let (tx, rx) = mpsc::sync_channel(1);
py.allow_threads(|| {
cpu_pool().install(|| {
let result = runtime().block_on(future);
_ = tx.send(result);
});
});
let result = rx.recv().map_err(|e| {
PyErr::new::<exceptions::PyException, _>(format!("Error executing future: {}", e))
})?;

let (f_buf, f_cookies, f_encoding, f_headers, f_status_code, f_url) = match result {
Ok(value) => value,
Err(e) => return Err(e),
Expand Down

0 comments on commit cfacbf6

Please sign in to comment.