Skip to content

Commit

Permalink
Use rayon global thread pool
Browse files Browse the repository at this point in the history
  • Loading branch information
deedy5 committed Jul 26, 2024
1 parent d0121b6 commit e3e1b1d
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 4 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ ahash = "0.8"
indexmap = { version = "2", features = ["serde"] }
tokio = { version = "1", features = ["rt"] }
html2text = "0.12"
rayon = "1"

[profile.release]
codegen-units = 1
Expand Down
25 changes: 21 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use std::str::FromStr;
use std::sync::{Arc, OnceLock};
use std::sync::{mpsc, Arc, OnceLock};
use std::time::Duration;

use ahash::RandomState;
use indexmap::IndexMap;
use pyo3::exceptions;
use pyo3::prelude::*;
use pyo3::types::{PyBytes, PyDict, PyString};
use rayon::{ThreadPool, ThreadPoolBuilder};
use reqwest_impersonate::header::{HeaderMap, HeaderName, HeaderValue, COOKIE};
use reqwest_impersonate::impersonate::Impersonate;
use reqwest_impersonate::multipart;
Expand All @@ -20,6 +21,12 @@ use response::Response;
mod utils;
use utils::{get_encoding_from_content, get_encoding_from_headers, json_dumps, url_encode};

// Rayon global thread pool
fn cpu_pool() -> &'static ThreadPool {
static CPU_POOL: OnceLock<ThreadPool> = OnceLock::new();
CPU_POOL.get_or_init(|| ThreadPoolBuilder::new().build().unwrap())
}

// Tokio global one-thread runtime
fn runtime() -> &'static Runtime {
static RUNTIME: OnceLock<Runtime> = OnceLock::new();
Expand Down Expand Up @@ -397,9 +404,19 @@ impl Client {
Ok((buf, cookies, encoding, headers, status_code, url))
};

// Execute an async future, releasing the Python GIL for concurrency.
// Use Tokio global runtime to block on the future.
let result = py.allow_threads(|| runtime().block_on(future));
// Execute an async future in Python, releasing the GIL for concurrency.
// Uses Rayon's global thread pool and Tokio global runtime to block on the future.
let (tx, rx) = mpsc::sync_channel(1);
py.allow_threads(|| {
cpu_pool().install(|| {
let result = runtime().block_on(future);
_ = tx.send(result);
});
});
let result = rx.recv().map_err(|e| {
PyErr::new::<exceptions::PyException, _>(format!("Error executing future: {}", e))
})?;

let (f_buf, f_cookies, f_encoding, f_headers, f_status_code, f_url) = match result {
Ok(value) => value,
Err(e) => return Err(e),
Expand Down

0 comments on commit e3e1b1d

Please sign in to comment.