Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: make http client pooling #169

Merged
merged 2 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
613 changes: 277 additions & 336 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ reqwest = { version = "^0.11", features = ["json", "blocking"] }
isahc = "1.7.2"
async-recursion = "1.0.4"
regex = "1.10.2"
once_cell = "1.19.0"

[dev-dependencies]
fake = { version = "2.4", features = ["uuid", "chrono"] }
Expand Down
2 changes: 1 addition & 1 deletion src/tigergraph/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ pub struct IdAllocationResult {
}

pub async fn id_allocation(payload: &IdAllocation) -> Result<IdAllocationResult, Error> {
let http_client = make_client();
let http_client = make_client().await.unwrap();
let id_allocation_url = format!("{}:{}", C.tdb.host.trim_end_matches(":9000"), "9002");
let uri: http::Uri = format!("{}/id_allocation/allocation", id_allocation_url,)
.parse()
Expand Down
2 changes: 1 addition & 1 deletion src/upstream/aggregation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ async fn fetch_connections_by_platform_identity(
platform: &Platform,
identity: &str,
) -> Result<TargetProcessedList, Error> {
let client = make_client();
let client = make_client().await.unwrap();
let mut page = 1;

let mut next_targets: TargetProcessedList = Vec::new();
Expand Down
4 changes: 2 additions & 2 deletions src/upstream/clusters/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ pub struct Metadata {
}

async fn get_clusters_by_address(address: &str) -> Result<Vec<Metadata>, Error> {
let client = make_client();
let client = make_client().await.unwrap();
let uri: http::Uri = format!(
"{}/get_name?address={}",
C.upstream.clusters_api.url.clone(),
Expand Down Expand Up @@ -535,7 +535,7 @@ async fn get_clusters_by_address(address: &str) -> Result<Vec<Metadata>, Error>
}

async fn get_address_by_clusters(name: &str) -> Result<Vec<Metadata>, Error> {
let client = make_client();
let client = make_client().await.unwrap();
let uri: http::Uri = format!(
"{}/get_address?name={}",
C.upstream.clusters_api.url.clone(),
Expand Down
14 changes: 7 additions & 7 deletions src/upstream/dotbit/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ async fn query_by_handle(
};
let json_params = serde_json::to_vec(&params)?;

let client = make_client();
let client = make_client().await.unwrap();
let req = Request::builder()
.method(Method::POST)
.uri(C.upstream.dotbit_service.url.clone())
Expand Down Expand Up @@ -268,7 +268,7 @@ async fn query_by_wallet(platform: &Platform, address: &str) -> Result<Vec<Accou
};
let json_params = serde_json::to_vec(&params)?;

let client = make_client();
let client = make_client().await.unwrap();
let req = Request::builder()
.method(Method::POST)
.uri(C.upstream.dotbit_service.url.clone())
Expand Down Expand Up @@ -315,7 +315,7 @@ async fn query_reverse_record(platform: &Platform, identity: &str) -> Result<Acc
};
let json_params = serde_json::to_vec(&params)?;

let client = make_client();
let client = make_client().await.unwrap();
let req = Request::builder()
.method(Method::POST)
.uri(C.upstream.dotbit_service.url.clone())
Expand Down Expand Up @@ -617,7 +617,7 @@ async fn fetch_and_save_account_info(
};
let json_params = serde_json::to_vec(&params)?;

let client = make_client();
let client = make_client().await.unwrap();
let req = Request::builder()
.method(Method::POST)
.uri(C.upstream.dotbit_service.url.clone())
Expand Down Expand Up @@ -741,7 +741,7 @@ async fn fetch_reverse_record(
};
let json_params = serde_json::to_vec(&params)?;

let client = make_client();
let client = make_client().await.unwrap();
let req = Request::builder()
.method(Method::POST)
.uri(C.upstream.dotbit_service.url.clone())
Expand Down Expand Up @@ -827,7 +827,7 @@ async fn fetch_account_list_by_addrs(
};
let json_params = serde_json::to_vec(&params)?;

let client = make_client();
let client = make_client().await.unwrap();
let req = Request::builder()
.method(Method::POST)
.uri(C.upstream.dotbit_service.url.clone())
Expand Down Expand Up @@ -1204,7 +1204,7 @@ async fn search_account_detail(name: &str) -> Result<Option<AccountDetail>, Erro
};
let json_params = serde_json::to_string(&params).map_err(|err| Error::JSONParseError(err))?;

let client = make_client();
let client = make_client().await.unwrap();
let req = Request::builder()
.method(Method::POST)
.uri(C.upstream.dotbit_service.register_api.clone())
Expand Down
2 changes: 1 addition & 1 deletion src/upstream/ens_reverse/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ impl Fetcher for ENSReverseLookup {
}

pub async fn fetch_record(wallet: &str) -> Result<Response, Error> {
let client = make_client();
let client = make_client().await.unwrap();
let url: http::Uri = format!("{}{}", C.upstream.ens_reverse.url, wallet)
.parse()
.map_err(|err: http::uri::InvalidUri| {
Expand Down
6 changes: 3 additions & 3 deletions src/upstream/farcaster/warpcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ pub struct Verification {
}

async fn user_by_username(username: &str) -> Result<Option<User>, Error> {
let client = make_client();
let client = make_client().await.unwrap();
let uri: http::Uri = format!(
"{}/v2/user-by-username?username={}",
C.upstream.warpcast_api.url, username
Expand Down Expand Up @@ -523,7 +523,7 @@ async fn user_by_verification(address: &str) -> Result<Option<User>, Error> {
return Ok(None);
}

let client = make_client();
let client = make_client().await.unwrap();
let uri: http::Uri = format!(
"{}/v2/user-by-verification?address={}",
C.upstream.warpcast_api.url, address
Expand Down Expand Up @@ -587,7 +587,7 @@ async fn user_by_verification(address: &str) -> Result<Option<User>, Error> {
}

async fn get_verifications(fid: i64) -> Result<Option<Vec<Verification>>, Error> {
let client = make_client();
let client = make_client().await.unwrap();
let uri: http::Uri = format!(
"{}/v2/verifications?fid={}",
C.upstream.warpcast_api.url, fid
Expand Down
2 changes: 1 addition & 1 deletion src/upstream/firefly/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ async fn search_records(
platform: &Platform,
identity: &str,
) -> Result<Vec<AggregationRecord>, Error> {
let client = make_client();
let client = make_client().await.unwrap();
let uri: http::Uri = format!(
"{}/aggregation/search?platform={}&identity={}",
C.upstream.aggregation_service.url.clone(),
Expand Down
4 changes: 2 additions & 2 deletions src/upstream/genome/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,7 @@ async fn fetch_address_by_domain(
}

async fn get_name(address: &str) -> Result<Vec<Metadata>, Error> {
let client = make_client();
let client = make_client().await.unwrap();
let uri: http::Uri = format!(
"{}/get_name?tld=gno&address={}",
C.upstream.genome_api.url.clone(),
Expand Down Expand Up @@ -622,7 +622,7 @@ async fn get_name(address: &str) -> Result<Vec<Metadata>, Error> {
}

async fn get_address(domain: &str) -> Result<Vec<Metadata>, Error> {
let client = make_client();
let client = make_client().await.unwrap();
let uri: http::Uri = format!(
"{}/get_address?tld=gno&domain={}",
C.upstream.genome_api.url.clone(),
Expand Down
2 changes: 1 addition & 1 deletion src/upstream/keybase/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ async fn fetch_connections_by_platform_identity(
platform: &Platform,
identity: &str,
) -> Result<TargetProcessedList, Error> {
let client = make_client();
let client = make_client().await.unwrap();
let uri: http::Uri = match format!(
"{}?{}={}&fields=proofs_summary",
C.upstream.keybase_service.url, platform, identity
Expand Down
2 changes: 1 addition & 1 deletion src/upstream/opensea/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ async fn search_opensea_account(
platform: &Platform,
identity: &str,
) -> Result<Vec<SnsRecord>, Error> {
let client = make_client();
let client = make_client().await.unwrap();
let uri: http::Uri = format!(
"{}/aggregation/opensea_account?platform={}&identity={}",
C.upstream.aggregation_service.url.clone(),
Expand Down
4 changes: 2 additions & 2 deletions src/upstream/proof_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ async fn batch_fetch_connections(
platform: &Platform,
identity: &str,
) -> Result<(TargetProcessedList, EdgeList), Error> {
let client = make_client();
let client = make_client().await.unwrap();

let uri: http::Uri = format!(
"{}/v1/proof?exact=true&platform={}&identity={}",
Expand Down Expand Up @@ -259,7 +259,7 @@ async fn fetch_connections_by_platform_identity(
platform: &Platform,
identity: &str,
) -> Result<TargetProcessedList, Error> {
let client = make_client();
let client = make_client().await.unwrap();

let uri: http::Uri = format!(
"{}/v1/proof?exact=true&platform={}&identity={}",
Expand Down
2 changes: 1 addition & 1 deletion src/upstream/rss3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ impl Fetcher for Rss3 {
}

async fn batch_fetch_nfts(target: &Target) -> Result<(TargetProcessedList, EdgeList), Error> {
let client = make_client();
let client = make_client().await.unwrap();
let address = target.identity()?.to_lowercase();
let mut current_cursor = String::from("");

Expand Down
4 changes: 2 additions & 2 deletions src/upstream/space_id/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ async fn fetch_address_by_domain(

/// Resolve Names: https://docs.space.id/developer-guide/web3-name-sdk/sid-api#resolve-names
async fn get_address(domain: &str) -> Result<String, Error> {
let client = make_client();
let client = make_client().await.unwrap();
let uri: http::Uri = format!(
"{}/v1/getAddress?tld=bnb&domain={}",
C.upstream.spaceid_api.url.clone(),
Expand Down Expand Up @@ -490,7 +490,7 @@ async fn get_address(domain: &str) -> Result<String, Error> {

/// Reverse Resolve Names: https://docs.space.id/developer-guide/web3-name-sdk/sid-api#reverse-resolve-names
async fn get_name(address: &str) -> Result<Option<String>, Error> {
let client = make_client();
let client = make_client().await.unwrap();
let uri: http::Uri = format!(
"{}/v1/getName?tld=bnb&address={}",
C.upstream.spaceid_api.url.clone(),
Expand Down
2 changes: 1 addition & 1 deletion src/upstream/sybil_list/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ async fn save_item(

/// Trigger a refetch from github.
pub async fn prefetch() -> Result<(), Error> {
let client = make_client();
let client = make_client().await.unwrap();
let uri: http::Uri = (C.upstream.sybil_service.url).parse().unwrap();

let req = hyper::Request::builder()
Expand Down
12 changes: 6 additions & 6 deletions src/upstream/unstoppable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ async fn fetch_domain_by_owner(
owners: &str,
next: Option<String>,
) -> Result<GetDomainByOwnerResp, Error> {
let client = make_client();
let client = make_client().await.unwrap();
// curl --request GET "https://api.unstoppabledomains.com/resolve/owners/0x50b6a9ba0b1ca77ce67c22b30afc0a5bbbdb5a18/domains"
let uri: http::Uri = if next.is_none() {
format!(
Expand Down Expand Up @@ -444,7 +444,7 @@ async fn fetch_domain_by_owner(
}

async fn fetch_owner_by_domain(domains: &str) -> Result<DomainResponse, Error> {
let client = make_client();
let client = make_client().await.unwrap();
let uri: http::Uri = format!(
"{}/resolve/domains/{}",
C.upstream.unstoppable_api.url, domains
Expand Down Expand Up @@ -491,7 +491,7 @@ async fn fetch_owner_by_domain(domains: &str) -> Result<DomainResponse, Error> {
/// Do not use `fetch_domain` query
#[allow(dead_code)]
async fn fetch_domain(owners: &str, page: &str) -> Result<RecordsForOwnerResponse, Error> {
let client = make_client();
let client = make_client().await.unwrap();
let uri: http::Uri = if page.is_empty() {
format!(
"{}/domains?owners={}",
Expand Down Expand Up @@ -551,7 +551,7 @@ async fn fetch_domain(owners: &str, page: &str) -> Result<RecordsForOwnerRespons
}

async fn fetch_reverse(owner: &str) -> Result<ReverseResponse, Error> {
let client = make_client();
let client = make_client().await.unwrap();
// https://api.unstoppabledomains.com/resolve/reverse/{owner}
let reverse_uri: http::Uri = format!(
"{}/resolve/reverse/{}",
Expand Down Expand Up @@ -715,7 +715,7 @@ async fn fetch_domains_by_account(
}

async fn fetch_owner(domains: &str) -> Result<DomainResponse, Error> {
let client = make_client();
let client = make_client().await.unwrap();
let uri: http::Uri = format!("{}/domains/{}", C.upstream.unstoppable_api.url, domains)
.parse()
.map_err(|_err: InvalidUri| Error::ParamError(format!("Uri format Error {}", _err)))?;
Expand Down Expand Up @@ -1040,7 +1040,7 @@ struct DomainInfo {

// https://api.unstoppabledomains.com/api/domain/search/internal?q=0xbillys
async fn domain_search(name: &str) -> Result<Vec<Exact>, Error> {
let client = make_client();
let client = make_client().await.unwrap();
let encoded_name = urlencoding::encode(name);
let uri: http::Uri = format!(
"{}/api/domain/search/internal?q={}",
Expand Down
47 changes: 38 additions & 9 deletions src/util/mod.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,27 @@
#[cfg(test)]
mod tests;

use std::{collections::HashSet, hash::Hash};
use std::{collections::HashSet, hash::Hash, sync::Arc};

use crate::error::Error;
use chrono::{DateTime, NaiveDateTime};
use http::Response;
use deadpool::managed::{Manager, Object, Pool, RecycleResult};
use http::{Response, StatusCode};
use hyper::{body::HttpBody as _, client::HttpConnector, Body, Client, Request};
use hyper_tls::HttpsConnector;
use once_cell::sync::Lazy;
use serde::{Deserialize, Deserializer, Serialize, Serializer};

const DEFAULT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);

static CLIENT_POOL: Lazy<Arc<ClientPool>> = Lazy::new(|| {
let manager = ClientManager{};
let pool = Pool::builder(manager)
.build()
.unwrap();
Arc::new(pool)
});

/// Returns current UNIX timestamp (unit: second).
pub fn timestamp() -> i64 {
naive_now().and_utc().timestamp()
Expand Down Expand Up @@ -58,13 +68,32 @@ pub fn utc_to_naive(s: String) -> Result<NaiveDateTime, Error> {
Ok(dt.naive_utc())
}

pub fn make_client() -> Client<HttpsConnector<HttpConnector>> {
let https = HttpsConnector::new();
// let mut http = HttpConnector::new();
// http.set_connect_timeout(Some(std::time::Duration::from_secs(5)));
// let https = HttpsConnector::new_with_connector(http);
pub struct ClientManager;

#[async_trait::async_trait]
impl Manager for ClientManager {
type Type = Client<HttpsConnector<HttpConnector>>;
type Error = Error;

// make_client() implementation moved here
async fn create(&self) -> Result<Client<HttpsConnector<HttpConnector>>, Error> {
let https = HttpsConnector::new();
Ok(Client::builder().build::<_, hyper::Body>(https))
}

async fn recycle(&self, _: &mut Client<HttpsConnector<HttpConnector>>) -> RecycleResult<Error> {
Ok(())
}
}

type ClientPool = Pool<ClientManager>;

Client::builder().build::<_, hyper::Body>(https)
pub async fn make_client() -> Result<Object<ClientManager>, Error> {
let pool = CLIENT_POOL.clone();
let client = pool.get().await.map_err(|_| {
Error::General("Failed to acquire client from pool".to_string(), StatusCode::INTERNAL_SERVER_ERROR)
})?;
Ok(client)
}

pub fn make_http_client() -> Client<HttpConnector> {
Expand All @@ -76,7 +105,7 @@ pub fn make_http_client() -> Client<HttpConnector> {

/// If timeout is None, default timeout is 5 seconds.
pub async fn request_with_timeout(
client: &Client<HttpsConnector<HttpConnector>>,
client: &Object<ClientManager>,
req: Request<Body>,
timeout: Option<std::time::Duration>,
) -> Result<Response<Body>, Error> {
Expand Down