Skip to content

Commit

Permalink
Allow running on wasm32 targets using fetch() for http requests (#200)
Browse files Browse the repository at this point in the history
  • Loading branch information
swallez authored Aug 27, 2024
1 parent baa023d commit bcade94
Show file tree
Hide file tree
Showing 11 changed files with 1,568 additions and 49 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
/target
target
pkg
**/*.rs.bk

.idea
Expand Down
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 9 additions & 2 deletions elasticsearch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,24 @@ url = "2"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
serde_with = "3"
tokio = { version = "1", default-features = false, features = ["macros", "net", "time", "rt-multi-thread"] }


#tokio = { version = "1", default-features = false, features = ["macros", "net", "time", "rt-multi-thread"] }
void = "1"

[target.'cfg(not(target_arch = "wasm32"))'.dependencies.tokio]
version = "1.0"
default-features = false
features = ["macros", "net", "time", "rt-multi-thread"]

[dev-dependencies]
chrono = { version = "0.4", features = ["serde"] }
clap = { version = "4", features = ["env"]}
failure = "0.1"
futures = "0.3"
http = "1"
axum = "0.7"
hyper = { version = "1", features = ["server", "http1"] }
#hyper = { version = "1", features = ["server", "http1"] }
os_type = "2"
regex="1"
#sysinfo = "0.31"
Expand Down
121 changes: 77 additions & 44 deletions elasticsearch/src/http/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
*/
//! HTTP transport and connection components
#[cfg(all(target_arch = "wasm32", any(feature = "native-tls", feature = "rustls-tls")))]
compile_error!("TLS features are not compatible with the wasm target");

#[cfg(any(feature = "native-tls", feature = "rustls-tls"))]
use crate::auth::ClientCertificate;
#[cfg(any(feature = "native-tls", feature = "rustls-tls"))]
Expand Down Expand Up @@ -130,6 +133,8 @@ fn build_meta() -> String {
meta.push_str(",tls=n");
} else if cfg!(feature = "rustls-tls") {
meta.push_str(",tls=r");
} else if cfg!(target_arch = "wasm32") {
meta.push_str(",tls=w");
}

meta
Expand All @@ -138,15 +143,19 @@ fn build_meta() -> String {
/// Builds a HTTP transport to make API calls to Elasticsearch
pub struct TransportBuilder {
client_builder: reqwest::ClientBuilder,
conn_pool: Box<dyn ConnectionPool>,
conn_pool: Arc<dyn ConnectionPool>,
credentials: Option<Credentials>,
#[cfg(any(feature = "native-tls", feature = "rustls-tls"))]
cert_validation: Option<CertificateValidation>,
#[cfg(not(target_arch = "wasm32"))]
proxy: Option<Url>,
#[cfg(not(target_arch = "wasm32"))]
proxy_credentials: Option<Credentials>,
#[cfg(not(target_arch = "wasm32"))]
disable_proxy: bool,
headers: HeaderMap,
meta_header: bool,
#[cfg(not(target_arch = "wasm32"))]
timeout: Option<Duration>,
}

Expand All @@ -159,15 +168,19 @@ impl TransportBuilder {
{
Self {
client_builder: reqwest::ClientBuilder::new(),
conn_pool: Box::new(conn_pool),
conn_pool: Arc::new(conn_pool),
credentials: None,
#[cfg(any(feature = "native-tls", feature = "rustls-tls"))]
cert_validation: None,
#[cfg(not(target_arch = "wasm32"))]
proxy: None,
#[cfg(not(target_arch = "wasm32"))]
proxy_credentials: None,
#[cfg(not(target_arch = "wasm32"))]
disable_proxy: false,
headers: HeaderMap::new(),
meta_header: true,
#[cfg(not(target_arch = "wasm32"))]
timeout: None,
}
}
Expand All @@ -176,6 +189,7 @@ impl TransportBuilder {
///
/// An optional username and password will be used to set the
/// `Proxy-Authorization` header using Basic Authentication.
#[cfg(not(target_arch = "wasm32"))]
pub fn proxy(mut self, url: Url, username: Option<&str>, password: Option<&str>) -> Self {
self.proxy = Some(url);
if let Some(u) = username {
Expand All @@ -189,6 +203,7 @@ impl TransportBuilder {
/// Whether to disable proxies, including system proxies.
///
/// NOTE: System proxies are enabled by default.
#[cfg(not(target_arch = "wasm32"))]
pub fn disable_proxy(mut self) -> Self {
self.disable_proxy = true;
self
Expand Down Expand Up @@ -241,6 +256,7 @@ impl TransportBuilder {
///
/// The timeout is applied from when the request starts connecting until the response body has finished.
/// Default is no timeout.
#[cfg(not(target_arch = "wasm32"))]
pub fn timeout(mut self, timeout: Duration) -> Self {
self.timeout = Some(timeout);
self
Expand All @@ -254,6 +270,7 @@ impl TransportBuilder {
client_builder = client_builder.default_headers(self.headers);
}

#[cfg(not(target_arch = "wasm32"))]
if let Some(t) = self.timeout {
client_builder = client_builder.timeout(t);
}
Expand Down Expand Up @@ -300,6 +317,7 @@ impl TransportBuilder {
}
}

#[cfg(not(target_arch = "wasm32"))]
if self.disable_proxy {
client_builder = client_builder.no_proxy();
} else if let Some(url) = self.proxy {
Expand All @@ -316,7 +334,7 @@ impl TransportBuilder {
let client = client_builder.build()?;
Ok(Transport {
client,
conn_pool: Arc::new(self.conn_pool),
conn_pool: self.conn_pool,
credentials: self.credentials,
send_meta: self.meta_header,
})
Expand Down Expand Up @@ -363,7 +381,7 @@ impl Connection {
pub struct Transport {
client: reqwest::Client,
credentials: Option<Credentials>,
conn_pool: Arc<Box<dyn ConnectionPool>>,
conn_pool: Arc<dyn ConnectionPool>,
send_meta: bool,
}

Expand Down Expand Up @@ -463,6 +481,7 @@ impl Transport {
headers: HeaderMap,
query_string: Option<&Q>,
body: Option<B>,
#[allow(unused_variables)]
timeout: Option<Duration>,
) -> Result<reqwest::RequestBuilder, Error>
where
Expand All @@ -473,6 +492,7 @@ impl Transport {
let url = connection.url.join(path.trim_start_matches('/'))?;
let mut request_builder = self.client.request(reqwest_method, url);

#[cfg(not(target_arch = "wasm32"))]
if let Some(t) = timeout {
request_builder = request_builder.timeout(t);
}
Expand Down Expand Up @@ -564,6 +584,47 @@ impl Transport {
)?)
}

async fn reseed(&self) {
// Requests will execute against old connection pool during reseed
let connection = self.conn_pool.next();

// Build node info request
let node_request = self.request_builder(
&connection,
Method::Get,
"_nodes/http?filter_path=nodes.*.http",
HeaderMap::default(),
None::<&()>,
None::<()>,
None,
).unwrap();

let scheme = connection.url.scheme();
let resp = node_request.send().await.unwrap();
let json: Value = resp.json().await.unwrap();
let connections: Vec<Connection> = json["nodes"]
.as_object()
.unwrap()
.iter()
.map(|(_, node)| {
let address = node["http"]["publish_address"]
.as_str()
.or_else(|| {
Some(
node["http"]["bound_address"].as_array().unwrap()[0]
.as_str()
.unwrap(),
)
})
.unwrap();
let url = Self::parse_to_url(address, scheme).unwrap();
Connection::new(url)
})
.collect();

self.conn_pool.reseed(connections);
}

/// Creates an asynchronous request that can be awaited
pub async fn send<B, Q>(
&self,
Expand All @@ -578,47 +639,19 @@ impl Transport {
B: Body,
Q: Serialize + ?Sized,
{
// Requests will execute against old connection pool during reseed
if self.conn_pool.reseedable() {
let conn_pool = self.conn_pool.clone();
let connection = conn_pool.next();

// Build node info request
let node_request = self.request_builder(
&connection,
Method::Get,
"_nodes/http?filter_path=nodes.*.http",
headers.clone(),
None::<&Q>,
None::<B>,
timeout,
)?;

tokio::spawn(async move {
let scheme = connection.url.scheme();
let resp = node_request.send().await.unwrap();
let json: Value = resp.json().await.unwrap();
let connections: Vec<Connection> = json["nodes"]
.as_object()
.unwrap()
.iter()
.map(|(_, node)| {
let address = node["http"]["publish_address"]
.as_str()
.or_else(|| {
Some(
node["http"]["bound_address"].as_array().unwrap()[0]
.as_str()
.unwrap(),
)
})
.unwrap();
let url = Self::parse_to_url(address, scheme).unwrap();
Connection::new(url)
})
.collect();
conn_pool.reseed(connections);
});
#[cfg(not(target_arch = "wasm32"))]
{
let transport = self.clone();
tokio::spawn(async move { transport.reseed().await });
}
#[cfg(target_arch = "wasm32")]
{
// Reseed synchronously (i.e. do not spawn a background task) in WASM.
// Running in the background is platform-dependent (web-sys / wasi), we'll
// address this if synchronous reseed is an issue.
self.reseed().await
}
}

let connection = self.conn_pool.next();
Expand Down
4 changes: 3 additions & 1 deletion elasticsearch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,10 +371,12 @@ mod readme {
extern crate dyn_clone;

pub mod auth;
pub mod cert;
pub mod http;
pub mod params;

#[cfg(not(target_arch = "wasm32"))]
pub mod cert;

// GENERATED-BEGIN:namespace-modules
// Generated code - do not edit until the next GENERATED-END marker

Expand Down
3 changes: 3 additions & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
This directory contains standalone examples that need their own independent build configuration.

Other examples can also be found in [`elasticsearch/examples`](../elasticsearch/examples/).
Loading

0 comments on commit bcade94

Please sign in to comment.