Skip to content

Commit

Permalink
feat!: make http client pooling
Browse files Browse the repository at this point in the history
  • Loading branch information
hmqgg committed Sep 26, 2024
1 parent 182432f commit c8b5371
Show file tree
Hide file tree
Showing 18 changed files with 348 additions and 377 deletions.
613 changes: 277 additions & 336 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ reqwest = { version = "^0.11", features = ["json", "blocking"] }
isahc = "1.7.2"
async-recursion = "1.0.4"
regex = "1.10.2"
once_cell = "1.19.0"

[dev-dependencies]
fake = { version = "2.4", features = ["uuid", "chrono"] }
Expand Down
2 changes: 1 addition & 1 deletion src/tigergraph/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ pub struct IdAllocationResult {
}

pub async fn id_allocation(payload: &IdAllocation) -> Result<IdAllocationResult, Error> {
let http_client = make_client();
let http_client = make_client().await.unwrap();
let id_allocation_url = format!("{}:{}", C.tdb.host.trim_end_matches(":9000"), "9002");
let uri: http::Uri = format!("{}/id_allocation/allocation", id_allocation_url,)
.parse()
Expand Down
2 changes: 1 addition & 1 deletion src/upstream/aggregation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ async fn fetch_connections_by_platform_identity(
platform: &Platform,
identity: &str,
) -> Result<TargetProcessedList, Error> {
let client = make_client();
let client = make_client().await.unwrap();
let mut page = 1;

let mut next_targets: TargetProcessedList = Vec::new();
Expand Down
4 changes: 2 additions & 2 deletions src/upstream/clusters/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ pub struct Metadata {
}

async fn get_clusters_by_address(address: &str) -> Result<Vec<Metadata>, Error> {
let client = make_client();
let client = make_client().await.unwrap();
let uri: http::Uri = format!(
"{}/get_name?address={}",
C.upstream.clusters_api.url.clone(),
Expand Down Expand Up @@ -535,7 +535,7 @@ async fn get_clusters_by_address(address: &str) -> Result<Vec<Metadata>, Error>
}

async fn get_address_by_clusters(name: &str) -> Result<Vec<Metadata>, Error> {
let client = make_client();
let client = make_client().await.unwrap();
let uri: http::Uri = format!(
"{}/get_address?name={}",
C.upstream.clusters_api.url.clone(),
Expand Down
14 changes: 7 additions & 7 deletions src/upstream/dotbit/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ async fn query_by_handle(
};
let json_params = serde_json::to_vec(&params)?;

let client = make_client();
let client = make_client().await.unwrap();
let req = Request::builder()
.method(Method::POST)
.uri(C.upstream.dotbit_service.url.clone())
Expand Down Expand Up @@ -268,7 +268,7 @@ async fn query_by_wallet(platform: &Platform, address: &str) -> Result<Vec<Accou
};
let json_params = serde_json::to_vec(&params)?;

let client = make_client();
let client = make_client().await.unwrap();
let req = Request::builder()
.method(Method::POST)
.uri(C.upstream.dotbit_service.url.clone())
Expand Down Expand Up @@ -315,7 +315,7 @@ async fn query_reverse_record(platform: &Platform, identity: &str) -> Result<Acc
};
let json_params = serde_json::to_vec(&params)?;

let client = make_client();
let client = make_client().await.unwrap();
let req = Request::builder()
.method(Method::POST)
.uri(C.upstream.dotbit_service.url.clone())
Expand Down Expand Up @@ -617,7 +617,7 @@ async fn fetch_and_save_account_info(
};
let json_params = serde_json::to_vec(&params)?;

let client = make_client();
let client = make_client().await.unwrap();
let req = Request::builder()
.method(Method::POST)
.uri(C.upstream.dotbit_service.url.clone())
Expand Down Expand Up @@ -741,7 +741,7 @@ async fn fetch_reverse_record(
};
let json_params = serde_json::to_vec(&params)?;

let client = make_client();
let client = make_client().await.unwrap();
let req = Request::builder()
.method(Method::POST)
.uri(C.upstream.dotbit_service.url.clone())
Expand Down Expand Up @@ -827,7 +827,7 @@ async fn fetch_account_list_by_addrs(
};
let json_params = serde_json::to_vec(&params)?;

let client = make_client();
let client = make_client().await.unwrap();
let req = Request::builder()
.method(Method::POST)
.uri(C.upstream.dotbit_service.url.clone())
Expand Down Expand Up @@ -1204,7 +1204,7 @@ async fn search_account_detail(name: &str) -> Result<Option<AccountDetail>, Erro
};
let json_params = serde_json::to_string(&params).map_err(|err| Error::JSONParseError(err))?;

let client = make_client();
let client = make_client().await.unwrap();
let req = Request::builder()
.method(Method::POST)
.uri(C.upstream.dotbit_service.register_api.clone())
Expand Down
2 changes: 1 addition & 1 deletion src/upstream/ens_reverse/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ impl Fetcher for ENSReverseLookup {
}

pub async fn fetch_record(wallet: &str) -> Result<Response, Error> {
let client = make_client();
let client = make_client().await.unwrap();
let url: http::Uri = format!("{}{}", C.upstream.ens_reverse.url, wallet)
.parse()
.map_err(|err: http::uri::InvalidUri| {
Expand Down
6 changes: 3 additions & 3 deletions src/upstream/farcaster/warpcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ pub struct Verification {
}

async fn user_by_username(username: &str) -> Result<Option<User>, Error> {
let client = make_client();
let client = make_client().await.unwrap();
let uri: http::Uri = format!(
"{}/v2/user-by-username?username={}",
C.upstream.warpcast_api.url, username
Expand Down Expand Up @@ -523,7 +523,7 @@ async fn user_by_verification(address: &str) -> Result<Option<User>, Error> {
return Ok(None);
}

let client = make_client();
let client = make_client().await.unwrap();
let uri: http::Uri = format!(
"{}/v2/user-by-verification?address={}",
C.upstream.warpcast_api.url, address
Expand Down Expand Up @@ -587,7 +587,7 @@ async fn user_by_verification(address: &str) -> Result<Option<User>, Error> {
}

async fn get_verifications(fid: i64) -> Result<Option<Vec<Verification>>, Error> {
let client = make_client();
let client = make_client().await.unwrap();
let uri: http::Uri = format!(
"{}/v2/verifications?fid={}",
C.upstream.warpcast_api.url, fid
Expand Down
2 changes: 1 addition & 1 deletion src/upstream/firefly/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ async fn search_records(
platform: &Platform,
identity: &str,
) -> Result<Vec<AggregationRecord>, Error> {
let client = make_client();
let client = make_client().await.unwrap();
let uri: http::Uri = format!(
"{}/aggregation/search?platform={}&identity={}",
C.upstream.aggregation_service.url.clone(),
Expand Down
4 changes: 2 additions & 2 deletions src/upstream/genome/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,7 @@ async fn fetch_address_by_domain(
}

async fn get_name(address: &str) -> Result<Vec<Metadata>, Error> {
let client = make_client();
let client = make_client().await.unwrap();
let uri: http::Uri = format!(
"{}/get_name?tld=gno&address={}",
C.upstream.genome_api.url.clone(),
Expand Down Expand Up @@ -622,7 +622,7 @@ async fn get_name(address: &str) -> Result<Vec<Metadata>, Error> {
}

async fn get_address(domain: &str) -> Result<Vec<Metadata>, Error> {
let client = make_client();
let client = make_client().await.unwrap();
let uri: http::Uri = format!(
"{}/get_address?tld=gno&domain={}",
C.upstream.genome_api.url.clone(),
Expand Down
2 changes: 1 addition & 1 deletion src/upstream/keybase/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ async fn fetch_connections_by_platform_identity(
platform: &Platform,
identity: &str,
) -> Result<TargetProcessedList, Error> {
let client = make_client();
let client = make_client().await.unwrap();
let uri: http::Uri = match format!(
"{}?{}={}&fields=proofs_summary",
C.upstream.keybase_service.url, platform, identity
Expand Down
2 changes: 1 addition & 1 deletion src/upstream/opensea/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ async fn search_opensea_account(
platform: &Platform,
identity: &str,
) -> Result<Vec<SnsRecord>, Error> {
let client = make_client();
let client = make_client().await.unwrap();
let uri: http::Uri = format!(
"{}/aggregation/opensea_account?platform={}&identity={}",
C.upstream.aggregation_service.url.clone(),
Expand Down
4 changes: 2 additions & 2 deletions src/upstream/proof_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ async fn batch_fetch_connections(
platform: &Platform,
identity: &str,
) -> Result<(TargetProcessedList, EdgeList), Error> {
let client = make_client();
let client = make_client().await.unwrap();

let uri: http::Uri = format!(
"{}/v1/proof?exact=true&platform={}&identity={}",
Expand Down Expand Up @@ -259,7 +259,7 @@ async fn fetch_connections_by_platform_identity(
platform: &Platform,
identity: &str,
) -> Result<TargetProcessedList, Error> {
let client = make_client();
let client = make_client().await.unwrap();

let uri: http::Uri = format!(
"{}/v1/proof?exact=true&platform={}&identity={}",
Expand Down
2 changes: 1 addition & 1 deletion src/upstream/rss3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ impl Fetcher for Rss3 {
}

async fn batch_fetch_nfts(target: &Target) -> Result<(TargetProcessedList, EdgeList), Error> {
let client = make_client();
let client = make_client().await.unwrap();
let address = target.identity()?.to_lowercase();
let mut current_cursor = String::from("");

Expand Down
4 changes: 2 additions & 2 deletions src/upstream/space_id/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ async fn fetch_address_by_domain(

/// Resolve Names: https://docs.space.id/developer-guide/web3-name-sdk/sid-api#resolve-names
async fn get_address(domain: &str) -> Result<String, Error> {
let client = make_client();
let client = make_client().await.unwrap();
let uri: http::Uri = format!(
"{}/v1/getAddress?tld=bnb&domain={}",
C.upstream.spaceid_api.url.clone(),
Expand Down Expand Up @@ -490,7 +490,7 @@ async fn get_address(domain: &str) -> Result<String, Error> {

/// Reverse Resolve Names: https://docs.space.id/developer-guide/web3-name-sdk/sid-api#reverse-resolve-names
async fn get_name(address: &str) -> Result<Option<String>, Error> {
let client = make_client();
let client = make_client().await.unwrap();
let uri: http::Uri = format!(
"{}/v1/getName?tld=bnb&address={}",
C.upstream.spaceid_api.url.clone(),
Expand Down
2 changes: 1 addition & 1 deletion src/upstream/sybil_list/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ async fn save_item(

/// Trigger a refetch from github.
pub async fn prefetch() -> Result<(), Error> {
let client = make_client();
let client = make_client().await.unwrap();
let uri: http::Uri = (C.upstream.sybil_service.url).parse().unwrap();

let req = hyper::Request::builder()
Expand Down
12 changes: 6 additions & 6 deletions src/upstream/unstoppable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ async fn fetch_domain_by_owner(
owners: &str,
next: Option<String>,
) -> Result<GetDomainByOwnerResp, Error> {
let client = make_client();
let client = make_client().await.unwrap();
// curl --request GET "https://api.unstoppabledomains.com/resolve/owners/0x50b6a9ba0b1ca77ce67c22b30afc0a5bbbdb5a18/domains"
let uri: http::Uri = if next.is_none() {
format!(
Expand Down Expand Up @@ -444,7 +444,7 @@ async fn fetch_domain_by_owner(
}

async fn fetch_owner_by_domain(domains: &str) -> Result<DomainResponse, Error> {
let client = make_client();
let client = make_client().await.unwrap();
let uri: http::Uri = format!(
"{}/resolve/domains/{}",
C.upstream.unstoppable_api.url, domains
Expand Down Expand Up @@ -491,7 +491,7 @@ async fn fetch_owner_by_domain(domains: &str) -> Result<DomainResponse, Error> {
/// Do not use `fetch_domain` query
#[allow(dead_code)]
async fn fetch_domain(owners: &str, page: &str) -> Result<RecordsForOwnerResponse, Error> {
let client = make_client();
let client = make_client().await.unwrap();
let uri: http::Uri = if page.is_empty() {
format!(
"{}/domains?owners={}",
Expand Down Expand Up @@ -551,7 +551,7 @@ async fn fetch_domain(owners: &str, page: &str) -> Result<RecordsForOwnerRespons
}

async fn fetch_reverse(owner: &str) -> Result<ReverseResponse, Error> {
let client = make_client();
let client = make_client().await.unwrap();
// https://api.unstoppabledomains.com/resolve/reverse/{owner}
let reverse_uri: http::Uri = format!(
"{}/resolve/reverse/{}",
Expand Down Expand Up @@ -715,7 +715,7 @@ async fn fetch_domains_by_account(
}

async fn fetch_owner(domains: &str) -> Result<DomainResponse, Error> {
let client = make_client();
let client = make_client().await.unwrap();
let uri: http::Uri = format!("{}/domains/{}", C.upstream.unstoppable_api.url, domains)
.parse()
.map_err(|_err: InvalidUri| Error::ParamError(format!("Uri format Error {}", _err)))?;
Expand Down Expand Up @@ -1040,7 +1040,7 @@ struct DomainInfo {

// https://api.unstoppabledomains.com/api/domain/search/internal?q=0xbillys
async fn domain_search(name: &str) -> Result<Vec<Exact>, Error> {
let client = make_client();
let client = make_client().await.unwrap();
let encoded_name = urlencoding::encode(name);
let uri: http::Uri = format!(
"{}/api/domain/search/internal?q={}",
Expand Down
47 changes: 38 additions & 9 deletions src/util/mod.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,27 @@
#[cfg(test)]
mod tests;

use std::{collections::HashSet, hash::Hash};
use std::{collections::HashSet, hash::Hash, sync::Arc};

use crate::error::Error;
use chrono::{DateTime, NaiveDateTime};
use http::Response;
use deadpool::managed::{Manager, Object, Pool, RecycleResult};
use http::{Response, StatusCode};
use hyper::{body::HttpBody as _, client::HttpConnector, Body, Client, Request};
use hyper_tls::HttpsConnector;
use once_cell::sync::Lazy;
use serde::{Deserialize, Deserializer, Serialize, Serializer};

const DEFAULT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);

static CLIENT_POOL: Lazy<Arc<ClientPool>> = Lazy::new(|| {
let manager = ClientManager{};
let pool = Pool::builder(manager)
.build()
.unwrap();
Arc::new(pool)
});

/// Returns current UNIX timestamp (unit: second).
pub fn timestamp() -> i64 {
naive_now().and_utc().timestamp()
Expand Down Expand Up @@ -58,13 +68,32 @@ pub fn utc_to_naive(s: String) -> Result<NaiveDateTime, Error> {
Ok(dt.naive_utc())
}

pub fn make_client() -> Client<HttpsConnector<HttpConnector>> {
let https = HttpsConnector::new();
// let mut http = HttpConnector::new();
// http.set_connect_timeout(Some(std::time::Duration::from_secs(5)));
// let https = HttpsConnector::new_with_connector(http);
pub struct ClientManager;

#[async_trait::async_trait]
impl Manager for ClientManager {
type Type = Client<HttpsConnector<HttpConnector>>;
type Error = Error;

// make_client() implementation moved here
async fn create(&self) -> Result<Client<HttpsConnector<HttpConnector>>, Error> {
let https = HttpsConnector::new();
Ok(Client::builder().build::<_, hyper::Body>(https))
}

async fn recycle(&self, _: &mut Client<HttpsConnector<HttpConnector>>) -> RecycleResult<Error> {
Ok(())
}
}

type ClientPool = Pool<ClientManager>;

Client::builder().build::<_, hyper::Body>(https)
pub async fn make_client() -> Result<Object<ClientManager>, Error> {
let pool = CLIENT_POOL.clone();
let client = pool.get().await.map_err(|_| {
Error::General("Failed to acquire client from pool".to_string(), StatusCode::INTERNAL_SERVER_ERROR)
})?;
Ok(client)
}

pub fn make_http_client() -> Client<HttpConnector> {
Expand All @@ -76,7 +105,7 @@ pub fn make_http_client() -> Client<HttpConnector> {

/// If timeout is None, default timeout is 5 seconds.
pub async fn request_with_timeout(
client: &Client<HttpsConnector<HttpConnector>>,
client: &Object<ClientManager>,
req: Request<Body>,
timeout: Option<std::time::Duration>,
) -> Result<Response<Body>, Error> {
Expand Down

0 comments on commit c8b5371

Please sign in to comment.