diff --git a/Cargo.lock b/Cargo.lock index d6e9368c3..c981da2ac 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1128,6 +1128,7 @@ dependencies = [ "ip_network", "ip_network_table", "once_cell", + "smol_str", "zip", ] @@ -1240,6 +1241,24 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "g3-ip-locate" +version = "0.1.0" +dependencies = [ + "ahash", + "anyhow", + "g3-geoip", + "g3-msgpack", + "g3-socket", + "g3-types", + "ip_network", + "ip_network_table", + "log", + "rmpv", + "tokio", + "tokio-util", +] + [[package]] name = "g3-journal" version = "0.2.0" @@ -1284,7 +1303,9 @@ dependencies = [ "anyhow", "atoi", "chrono", + "g3-geoip", "g3-types", + "ip_network", "rmpv", "rustls", "rustls-pemfile", diff --git a/Cargo.toml b/Cargo.toml index 246ae9cda..dbf86b859 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,6 +33,7 @@ members = [ "lib/g3-tls-cert", "lib/g3-slog-types", "lib/g3-geoip", + "lib/g3-ip-locate", "lib/g3-openssl", "g3bench", "g3fcgen", @@ -219,6 +220,7 @@ g3-stdlog = { version = "0.1", path = "lib/g3-stdlog" } g3-syslog = { version = "0.6", path = "lib/g3-syslog" } g3-slog-types = { version = "0.1", path = "lib/g3-slog-types" } g3-geoip = { version = "0.1", path = "lib/g3-geoip" } +g3-ip-locate = { version = "0.1", path = "lib/g3-ip-locate" } g3-tls-cert = { version = "0.5", path = "lib/g3-tls-cert" } g3-types = { version = "0.4", path = "lib/g3-types" } g3-xcrypt = { version = "0.1", path = "lib/g3-xcrypt" } diff --git a/lib/g3-geoip/Cargo.toml b/lib/g3-geoip/Cargo.toml index f5433bfc1..6c512f49d 100644 --- a/lib/g3-geoip/Cargo.toml +++ b/lib/g3-geoip/Cargo.toml @@ -12,6 +12,7 @@ arc-swap.workspace = true once_cell.workspace = true ip_network.workspace = true ip_network_table.workspace = true +smol_str.workspace = true csv = "1.2" flate2.workspace = true zip = { workspace = true, features = ["deflate"] } diff --git a/lib/g3-geoip/src/lib.rs b/lib/g3-geoip/src/lib.rs index 7f48714c6..6e63a370b 100644 --- a/lib/g3-geoip/src/lib.rs +++ b/lib/g3-geoip/src/lib.rs @@ -14,14 +14,15 @@ * limitations under the License. */ -extern crate core; - mod continent; pub use continent::{Continent, ContinentCode}; mod country; pub use country::IsoCountryCode; +mod location; +pub use location::{IpLocation, IpLocationBuilder}; + mod record; pub use record::{GeoIpAsnRecord, GeoIpCountryRecord}; diff --git a/lib/g3-geoip/src/location.rs b/lib/g3-geoip/src/location.rs new file mode 100644 index 000000000..91107ab0a --- /dev/null +++ b/lib/g3-geoip/src/location.rs @@ -0,0 +1,116 @@ +/* + * Copyright 2024 ByteDance and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use anyhow::anyhow; +use ip_network::IpNetwork; +use smol_str::SmolStr; + +use super::{ContinentCode, IsoCountryCode}; + +#[derive(Default)] +pub struct IpLocationBuilder { + net: Option, + country: Option, + continent: Option, + as_number: Option, + isp_name: Option, + isp_domain: Option, +} + +impl IpLocationBuilder { + pub fn set_network(&mut self, net: IpNetwork) { + self.net = Some(net); + } + + pub fn set_country(&mut self, country: IsoCountryCode) { + self.country = Some(country); + } + + pub fn set_continent(&mut self, continent: ContinentCode) { + self.continent = Some(continent); + } + + pub fn set_as_number(&mut self, number: u32) { + self.as_number = Some(number); + } + + pub fn set_isp_name(&mut self, name: String) { + self.isp_name = Some(name.into()); + } + + pub fn set_isp_domain(&mut self, domain: String) { + self.isp_domain = Some(domain.into()); + } + + pub fn build(mut self) -> anyhow::Result { + let net = self + .net + .take() + .ok_or(anyhow!("network address is not set"))?; + let continent = self + .continent + .or_else(|| self.country.map(|c| c.continent())); + Ok(IpLocation { + net, + country: self.country, + continent, + as_number: self.as_number, + isp_name: self.isp_name, + isp_domain: self.isp_domain, + }) + } +} + +pub struct IpLocation { + net: IpNetwork, + country: Option, + continent: Option, + as_number: Option, + isp_name: Option, + isp_domain: Option, +} + +impl IpLocation { + #[inline] + pub fn network_addr(&self) -> IpNetwork { + self.net + } + + #[inline] + pub fn country(&self) -> Option { + self.country + } + + #[inline] + pub fn continent(&self) -> Option { + self.continent + } + + #[inline] + pub fn network_asn(&self) -> Option { + self.as_number + } + + #[inline] + pub fn isp_name(&self) -> Option<&str> { + self.isp_name.as_deref() + } + + #[inline] + pub fn isp_domain(&self) -> Option<&str> { + self.isp_domain.as_deref() + } +} diff --git a/lib/g3-ip-locate/Cargo.toml b/lib/g3-ip-locate/Cargo.toml new file mode 100644 index 000000000..2e9d767d8 --- /dev/null +++ b/lib/g3-ip-locate/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "g3-ip-locate" +version = "0.1.0" +license.workspace = true +edition.workspace = true + +[dependencies] +anyhow.workspace = true +log.workspace = true +ip_network.workspace = true +ip_network_table.workspace = true +tokio = { workspace = true, features = ["sync", "net", "rt"] } +tokio-util = { workspace = true, features = ["time"] } +ahash.workspace = true +rmpv.workspace = true +g3-types.workspace = true +g3-geoip.workspace = true +g3-msgpack = { workspace = true, features = ["geoip"] } +g3-socket.workspace = true diff --git a/lib/g3-ip-locate/src/cache.rs b/lib/g3-ip-locate/src/cache.rs new file mode 100644 index 000000000..14e94c9ea --- /dev/null +++ b/lib/g3-ip-locate/src/cache.rs @@ -0,0 +1,140 @@ +/* + * Copyright 2024 ByteDance and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::collections::hash_map; +use std::future::Future; +use std::io; +use std::net::IpAddr; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; +use std::time::Duration; + +use ahash::AHashMap; +use ip_network_table::IpNetworkTable; +use tokio::sync::mpsc; +use tokio::time::Instant; + +use g3_geoip::IpLocation; + +use super::{CacheQueryRequest, IpLocationCacheResponse, IpLocationServiceConfig}; + +pub(crate) struct IpLocationCacheRuntime { + request_batch_handle_count: usize, + cache: IpNetworkTable>, + doing: AHashMap>, + req_receiver: mpsc::UnboundedReceiver, + rsp_receiver: mpsc::UnboundedReceiver<(Option, IpLocationCacheResponse)>, + query_sender: mpsc::UnboundedSender, + expire_at: Instant, +} + +impl IpLocationCacheRuntime { + pub(crate) fn new( + config: &IpLocationServiceConfig, + req_receiver: mpsc::UnboundedReceiver, + rsp_receiver: mpsc::UnboundedReceiver<(Option, IpLocationCacheResponse)>, + query_sender: mpsc::UnboundedSender, + ) -> Self { + IpLocationCacheRuntime { + request_batch_handle_count: config.cache_request_batch_count, + cache: IpNetworkTable::new(), + doing: AHashMap::new(), + req_receiver, + rsp_receiver, + query_sender, + expire_at: Instant::now(), + } + } + + fn handle_rsp(&mut self, ip: Option, mut rsp: IpLocationCacheResponse) { + self.expire_at = rsp.expire_at; + if let Some(location) = rsp.value.take() { + let net = location.network_addr(); + let location = Arc::new(location); + + if let Some(ip) = ip { + if let Some(vec) = self.doing.remove(&ip) { + for req in vec.into_iter() { + let _ = req.notifier.send(location.clone()); + } + } + } + + // also allow push if no doing ip found + self.cache.insert(net, location); + } + } + + fn send_req(&mut self, ip: IpAddr) { + if self.query_sender.send(ip).is_err() { + // the query runtime should not close before the cache runtime + unreachable!() + } + } + + fn handle_req(&mut self, req: CacheQueryRequest) { + if let Some((_net, v)) = self.cache.longest_match(req.ip) { + let _ = req.notifier.send(v.clone()); + if self.expire_at.elapsed() != Duration::ZERO { + // trigger query again if expired + self.send_req(req.ip); + } + return; + } + + match self.doing.entry(req.ip) { + hash_map::Entry::Occupied(mut o) => { + o.get_mut().push(req); + } + hash_map::Entry::Vacant(v) => { + let ip = req.ip; + v.insert(vec![req]); + self.send_req(ip); + } + } + } + + fn poll_loop(&mut self, cx: &mut Context<'_>) -> Poll> { + loop { + // handle rsp + loop { + match self.rsp_receiver.poll_recv(cx) { + Poll::Pending => break, + Poll::Ready(None) => unreachable!(), // unreachable as we have kept a sender + Poll::Ready(Some((ip, rsp))) => self.handle_rsp(ip, rsp), + } + } + + // handle req + for _ in 1..self.request_batch_handle_count { + match self.req_receiver.poll_recv(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(None) => return Poll::Ready(Ok(())), + Poll::Ready(Some(req)) => self.handle_req(req), + } + } + } + } +} + +impl Future for IpLocationCacheRuntime { + type Output = io::Result<()>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + (*self).poll_loop(cx) + } +} diff --git a/lib/g3-ip-locate/src/config.rs b/lib/g3-ip-locate/src/config.rs new file mode 100644 index 000000000..aafc8af6c --- /dev/null +++ b/lib/g3-ip-locate/src/config.rs @@ -0,0 +1,109 @@ +/* + * Copyright 2024 ByteDance and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use std::time::Duration; + +use anyhow::anyhow; +use tokio::net::UdpSocket; + +use g3_types::net::SocketBufferConfig; + +use super::{IpLocationQueryRuntime, IpLocationServiceHandle}; + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct IpLocationServiceConfig { + pub(crate) cache_request_batch_count: usize, + pub(crate) cache_request_timeout: Duration, + pub(crate) query_peer_addr: SocketAddr, + pub(crate) query_socket_buffer: SocketBufferConfig, + pub(crate) query_wait_timeout: Duration, + pub(crate) default_expire_ttl: u32, + pub(crate) maximum_expire_ttl: u32, +} + +impl Default for IpLocationServiceConfig { + fn default() -> Self { + IpLocationServiceConfig { + cache_request_batch_count: 10, + cache_request_timeout: Duration::from_millis(800), + query_peer_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 2888), + query_socket_buffer: SocketBufferConfig::default(), + query_wait_timeout: Duration::from_millis(400), + default_expire_ttl: 10, + maximum_expire_ttl: 300, + } + } +} + +impl IpLocationServiceConfig { + pub fn set_cache_request_batch_count(&mut self, count: usize) { + self.cache_request_batch_count = count; + } + + pub fn set_cache_request_timeout(&mut self, time: Duration) { + self.cache_request_timeout = time; + } + + pub fn set_query_peer_addr(&mut self, addr: SocketAddr) { + self.query_peer_addr = addr; + } + + pub fn set_query_socket_buffer(&mut self, config: SocketBufferConfig) { + self.query_socket_buffer = config; + } + + pub fn set_query_wait_timeout(&mut self, time: Duration) { + self.query_wait_timeout = time; + } + + pub fn set_default_expire_ttl(&mut self, ttl: u32) { + self.default_expire_ttl = ttl; + } + + pub fn set_maximum_expire_ttl(&mut self, ttl: u32) { + self.maximum_expire_ttl = ttl; + } + + pub fn spawn_cert_agent(&self) -> anyhow::Result { + use anyhow::Context; + + let (socket, _addr) = g3_socket::udp::new_std_bind_connect( + None, + self.query_socket_buffer, + Default::default(), + ) + .context("failed to setup udp socket")?; + socket.connect(self.query_peer_addr).map_err(|e| { + anyhow!( + "failed to connect to peer address {}: {e:?}", + self.query_peer_addr + ) + })?; + let socket = UdpSocket::from_std(socket).context("failed to setup udp socket")?; + + let (cache_runtime, cache_handle, query_handle) = super::spawn_ip_location_cache(self); + let query_runtime = IpLocationQueryRuntime::new(self, socket, query_handle); + + tokio::spawn(query_runtime); + tokio::spawn(cache_runtime); + + Ok(IpLocationServiceHandle::new( + cache_handle, + self.cache_request_timeout, + )) + } +} diff --git a/lib/g3-ip-locate/src/handle.rs b/lib/g3-ip-locate/src/handle.rs new file mode 100644 index 000000000..1af21d05b --- /dev/null +++ b/lib/g3-ip-locate/src/handle.rs @@ -0,0 +1,132 @@ +/* + * Copyright 2024 ByteDance and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::collections::hash_map; +use std::net::IpAddr; +use std::sync::Arc; +use std::task::{Context, Poll}; +use std::time::Duration; + +use ahash::AHashMap; +use tokio::sync::{mpsc, oneshot}; +use tokio_util::time::{delay_queue, DelayQueue}; + +use g3_geoip::IpLocation; + +use super::{CacheQueryRequest, IpLocationCacheResponse}; + +pub struct IpLocationServiceHandle { + cache_handle: IpLocationCacheHandle, + request_timeout: Duration, +} + +impl IpLocationServiceHandle { + pub(crate) fn new(cache_handle: IpLocationCacheHandle, request_timeout: Duration) -> Self { + IpLocationServiceHandle { + cache_handle, + request_timeout, + } + } + + pub async fn fetch(&self, ip: IpAddr) -> Option> { + self.cache_handle.fetch(ip, self.request_timeout).await + } +} + +pub(crate) struct IpLocationCacheHandle { + req_sender: mpsc::UnboundedSender, +} + +impl IpLocationCacheHandle { + pub(crate) fn new(req_sender: mpsc::UnboundedSender) -> Self { + IpLocationCacheHandle { req_sender } + } + + async fn fetch(&self, ip: IpAddr, timeout: Duration) -> Option> { + let (rsp_sender, rsp_receiver) = oneshot::channel(); + let req = CacheQueryRequest { + ip, + notifier: rsp_sender, + }; + self.req_sender.send(req).ok()?; + + match tokio::time::timeout(timeout, rsp_receiver).await { + Ok(Ok(r)) => Some(r), + Ok(Err(_)) => None, // recv error + Err(_) => None, // timeout + } + } +} + +pub(super) struct IpLocationQueryHandle { + req_receiver: mpsc::UnboundedReceiver, + rsp_sender: mpsc::UnboundedSender<(Option, IpLocationCacheResponse)>, + doing_cache: AHashMap, + doing_timeout_queue: DelayQueue, +} + +impl IpLocationQueryHandle { + pub(super) fn new( + req_receiver: mpsc::UnboundedReceiver, + rsp_sender: mpsc::UnboundedSender<(Option, IpLocationCacheResponse)>, + ) -> Self { + IpLocationQueryHandle { + req_receiver, + rsp_sender, + doing_cache: AHashMap::new(), + doing_timeout_queue: DelayQueue::new(), + } + } + + pub(super) fn poll_recv_req(&mut self, cx: &mut Context<'_>) -> Poll> { + self.req_receiver.poll_recv(cx) + } + + pub(super) fn poll_query_expired(&mut self, cx: &mut Context<'_>) -> Poll> { + match self.doing_timeout_queue.poll_expired(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(None) => Poll::Ready(None), + Poll::Ready(Some(e)) => Poll::Ready(Some(e.into_inner())), + } + } + + pub(super) fn send_rsp_data( + &mut self, + ip: Option, + data: IpLocationCacheResponse, + expired: bool, + ) { + if let Some(ip) = ip { + if let Some(timeout_key) = self.doing_cache.remove(&ip) { + if !expired { + self.doing_timeout_queue.remove(&timeout_key); + } + } + } + let _ = self.rsp_sender.send((ip, data)); + } + + pub(super) fn should_send_raw_query(&mut self, ip: IpAddr, query_wait: Duration) -> bool { + match self.doing_cache.entry(ip) { + hash_map::Entry::Occupied(_) => false, + hash_map::Entry::Vacant(cv) => { + let timeout_key = self.doing_timeout_queue.insert(ip, query_wait); + cv.insert(timeout_key); + true + } + } + } +} diff --git a/lib/g3-ip-locate/src/lib.rs b/lib/g3-ip-locate/src/lib.rs new file mode 100644 index 000000000..4970e4897 --- /dev/null +++ b/lib/g3-ip-locate/src/lib.rs @@ -0,0 +1,97 @@ +/* + * Copyright 2024 ByteDance and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::net::IpAddr; +use std::sync::Arc; +use std::time::Duration; + +use tokio::sync::{mpsc, oneshot}; +use tokio::time::Instant; + +use g3_geoip::IpLocation; + +mod config; +pub use config::IpLocationServiceConfig; + +mod handle; +pub use handle::IpLocationServiceHandle; +use handle::{IpLocationCacheHandle, IpLocationQueryHandle}; + +mod cache; +use cache::IpLocationCacheRuntime; + +mod query; +use query::IpLocationQueryRuntime; + +mod protocol; +pub use protocol::{request_key, request_key_id, response_key, response_key_id}; + +mod request; + +mod response; +use response::Response; + +struct CacheQueryRequest { + ip: IpAddr, + notifier: oneshot::Sender>, +} + +struct IpLocationCacheResponse { + value: Option, + expire_at: Instant, +} + +impl IpLocationCacheResponse { + fn new(location: IpLocation, ttl: u32) -> Self { + let now = Instant::now(); + let expire_at = now + .checked_add(Duration::from_secs(ttl as u64)) + .unwrap_or(now); + IpLocationCacheResponse { + value: Some(location), + expire_at, + } + } + + fn empty(protective_ttl: u32) -> Self { + let now = Instant::now(); + let expire_at = now + .checked_add(Duration::from_secs(protective_ttl as u64)) + .unwrap_or(now); + IpLocationCacheResponse { + value: None, + expire_at, + } + } +} + +fn spawn_ip_location_cache( + config: &IpLocationServiceConfig, +) -> ( + IpLocationCacheRuntime, + IpLocationCacheHandle, + IpLocationQueryHandle, +) { + let (rsp_sender, rsp_receiver) = mpsc::unbounded_channel(); + let (query_sender, query_receiver) = mpsc::unbounded_channel(); + let (req_sender, req_receiver) = mpsc::unbounded_channel(); + + let cache_runtime = + IpLocationCacheRuntime::new(config, req_receiver, rsp_receiver, query_sender); + let cache_handle = IpLocationCacheHandle::new(req_sender); + let query_handle = IpLocationQueryHandle::new(query_receiver, rsp_sender); + (cache_runtime, cache_handle, query_handle) +} diff --git a/lib/g3-ip-locate/src/protocol.rs b/lib/g3-ip-locate/src/protocol.rs new file mode 100644 index 000000000..c95841e1a --- /dev/null +++ b/lib/g3-ip-locate/src/protocol.rs @@ -0,0 +1,35 @@ +/* + * Copyright 2024 ByteDance and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +pub mod request_key { + pub const IP: &str = "ip"; +} + +pub mod request_key_id { + pub const IP: u64 = 1; +} + +pub mod response_key { + pub const IP: &str = "ip"; + pub const TTL: &str = "ttl"; + pub const LOCATION: &str = "location"; +} + +pub mod response_key_id { + pub const IP: u64 = 1; + pub const TTL: u64 = 2; + pub const LOCATION: u64 = 3; +} diff --git a/lib/g3-ip-locate/src/query.rs b/lib/g3-ip-locate/src/query.rs new file mode 100644 index 000000000..109af183d --- /dev/null +++ b/lib/g3-ip-locate/src/query.rs @@ -0,0 +1,166 @@ +/* + * Copyright 2024 ByteDance and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::collections::VecDeque; +use std::future::Future; +use std::io; +use std::net::IpAddr; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::Duration; + +use anyhow::anyhow; +use log::warn; +use tokio::io::ReadBuf; +use tokio::net::UdpSocket; + +use super::{IpLocationCacheResponse, IpLocationQueryHandle, IpLocationServiceConfig, Response}; + +pub(crate) struct IpLocationQueryRuntime { + socket: UdpSocket, + query_handle: IpLocationQueryHandle, + read_buffer: Box<[u8]>, + write_queue: VecDeque, + default_expire_ttl: u32, + maximum_expire_ttl: u32, + query_wait: Duration, +} + +impl IpLocationQueryRuntime { + pub(crate) fn new( + config: &IpLocationServiceConfig, + socket: UdpSocket, + query_handle: IpLocationQueryHandle, + ) -> Self { + IpLocationQueryRuntime { + socket, + query_handle, + read_buffer: vec![0u8; 16384].into_boxed_slice(), + write_queue: VecDeque::new(), + default_expire_ttl: config.default_expire_ttl, + maximum_expire_ttl: config.maximum_expire_ttl, + query_wait: config.query_wait_timeout, + } + } + + fn send_empty_result(&mut self, ip: IpAddr, ttl: u32, expired: bool) { + let result = IpLocationCacheResponse::empty(ttl); + self.query_handle.send_rsp_data(Some(ip), result, expired); + } + + fn send_expire_ttl(&mut self, ttl: u32) { + let result = IpLocationCacheResponse::empty(ttl); + self.query_handle.send_rsp_data(None, result, false); + } + + fn handle_req(&mut self, ip: IpAddr) { + if self.query_handle.should_send_raw_query(ip, self.query_wait) { + // TODO encode request + self.write_queue.push_back(ip); + } + } + + fn handle_rsp(&mut self, len: usize) { + let mut buf = &self.read_buffer[..len]; + match rmpv::decode::read_value_ref(&mut buf) + .map_err(|e| anyhow!("invalid msgpack response data: {e}")) + .and_then(|v| Response::parse(v)) + .map(|r| r.into_parts()) + { + Ok((ip, location, ttl)) => { + let ttl = ttl + .unwrap_or(self.default_expire_ttl) + .min(self.maximum_expire_ttl); + + if let Some(location) = location { + let result = IpLocationCacheResponse::new(location, ttl); + self.query_handle.send_rsp_data(ip, result, false); + } else if let Some(ip) = ip { + self.send_empty_result(ip, ttl, false); + } else { + self.send_expire_ttl(ttl); + } + } + Err(e) => { + warn!("parse cert generator rsp error: {e:?}"); + } + } + } + + fn poll_loop(&mut self, cx: &mut Context<'_>) -> Poll> { + loop { + // handle rsp + let mut buf = ReadBuf::new(&mut self.read_buffer); + match self.socket.poll_recv(cx, &mut buf) { + Poll::Pending => {} + Poll::Ready(Err(e)) => { + warn!("socket recv error: {e:?}"); + return Poll::Ready(Err(e)); + } + Poll::Ready(Ok(_)) => { + let len = buf.filled().len(); + if len > 0 { + self.handle_rsp(len); + } + } + } + + // send req from write queue + while let Some(ip) = self.write_queue.pop_front() { + let r = match ip { + IpAddr::V4(v4) => self.socket.poll_send(cx, &v4.octets()), + IpAddr::V6(v6) => self.socket.poll_send(cx, &v6.octets()), + }; + match r { + Poll::Pending => { + self.write_queue.push_front(ip); + break; + } + Poll::Ready(Ok(_)) => {} + Poll::Ready(Err(_)) => { + self.send_empty_result(ip, self.default_expire_ttl, false) + } + } + } + + // handle timeout + loop { + match self.query_handle.poll_query_expired(cx) { + Poll::Pending => break, + Poll::Ready(None) => break, + Poll::Ready(Some(ip)) => { + self.send_empty_result(ip, self.default_expire_ttl, true) + } + } + } + + // handle req + match self.query_handle.poll_recv_req(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(None) => return Poll::Ready(Ok(())), + Poll::Ready(Some(req)) => self.handle_req(req), + } + } + } +} + +impl Future for IpLocationQueryRuntime { + type Output = io::Result<()>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + (*self).poll_loop(cx) + } +} diff --git a/lib/g3-ip-locate/src/request.rs b/lib/g3-ip-locate/src/request.rs new file mode 100644 index 000000000..867e1a183 --- /dev/null +++ b/lib/g3-ip-locate/src/request.rs @@ -0,0 +1,15 @@ +/* + * Copyright 2024 ByteDance and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ diff --git a/lib/g3-ip-locate/src/response.rs b/lib/g3-ip-locate/src/response.rs new file mode 100644 index 000000000..6af0a40bd --- /dev/null +++ b/lib/g3-ip-locate/src/response.rs @@ -0,0 +1,100 @@ +/* + * Copyright 2024 ByteDance and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::net::IpAddr; + +use anyhow::{anyhow, Context}; +use rmpv::ValueRef; + +use g3_geoip::IpLocation; + +use super::{response_key, response_key_id}; + +#[derive(Default)] +pub(super) struct Response { + ip: Option, + location: Option, + ttl: Option, +} + +impl Response { + fn set(&mut self, k: ValueRef, v: ValueRef) -> anyhow::Result<()> { + match k { + ValueRef::String(s) => { + let key = s + .as_str() + .ok_or_else(|| anyhow!("invalid string key {k}"))?; + match g3_msgpack::key::normalize(key).as_str() { + response_key::IP => { + let ip = g3_msgpack::value::as_ipaddr(&v) + .context(format!("invalid ip address value for key {key}"))?; + self.ip = Some(ip); + } + response_key::TTL => { + let ttl = g3_msgpack::value::as_u32(&v) + .context(format!("invalid u32 value for key {key}"))?; + self.ttl = Some(ttl); + } + response_key::LOCATION => { + let location = g3_msgpack::value::as_ip_location(&v) + .context(format!("invalid ip location value for key {key}"))?; + self.location = Some(location); + } + _ => {} // ignore unknown keys + } + } + ValueRef::Integer(i) => { + let key_id = i.as_u64().ok_or_else(|| anyhow!("invalid u64 key {k}"))?; + match key_id { + response_key_id::IP => { + let ip = g3_msgpack::value::as_ipaddr(&v) + .context(format!("invalid ip address value for key id {key_id}"))?; + self.ip = Some(ip); + } + response_key_id::TTL => { + let ttl = g3_msgpack::value::as_u32(&v) + .context(format!("invalid u32 value for key id {key_id}"))?; + self.ttl = Some(ttl); + } + response_key_id::LOCATION => { + let location = g3_msgpack::value::as_ip_location(&v) + .context(format!("invalid ip location value for key id {key_id}"))?; + self.location = Some(location); + } + _ => {} // ignore unknown keys + } + } + _ => return Err(anyhow!("unsupported key type: {k}")), + } + Ok(()) + } + + pub(super) fn parse(v: ValueRef) -> anyhow::Result { + if let ValueRef::Map(map) = v { + let mut response = Response::default(); + for (k, v) in map { + response.set(k, v)?; + } + Ok(response) + } else { + Err(anyhow!("the response data type should be 'map'")) + } + } + + pub(super) fn into_parts(self) -> (Option, Option, Option) { + (self.ip, self.location, self.ttl) + } +} diff --git a/lib/g3-msgpack/Cargo.toml b/lib/g3-msgpack/Cargo.toml index b38855f76..26e7ab976 100644 --- a/lib/g3-msgpack/Cargo.toml +++ b/lib/g3-msgpack/Cargo.toml @@ -16,9 +16,12 @@ chrono = { workspace = true, features = ["std"] } rustls = { workspace = true, optional = true } rustls-pemfile = { workspace = true, optional = true } openssl = { workspace = true, optional = true } +ip_network = { workspace = true, optional = true } g3-types.workspace = true +g3-geoip = { workspace = true, optional = true } [features] default = [] rustls = ["g3-types/rustls", "dep:rustls", "dep:rustls-pemfile"] openssl = ["g3-types/openssl", "dep:openssl"] +geoip = ["dep:g3-geoip", "dep:ip_network"] diff --git a/lib/g3-msgpack/src/value/geoip.rs b/lib/g3-msgpack/src/value/geoip.rs new file mode 100644 index 000000000..152e179d3 --- /dev/null +++ b/lib/g3-msgpack/src/value/geoip.rs @@ -0,0 +1,86 @@ +/* + * Copyright 2024 ByteDance and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::str::FromStr; + +use anyhow::{anyhow, Context}; +use rmpv::ValueRef; + +use g3_geoip::{ContinentCode, IpLocation, IpLocationBuilder, IsoCountryCode}; + +pub fn as_iso_country_code(value: &ValueRef) -> anyhow::Result { + let s = crate::value::as_string(value) + .context("msgpack 'string' value type is expected for iso country code")?; + let country = IsoCountryCode::from_str(&s).map_err(|_| anyhow!("invalid iso country code"))?; + Ok(country) +} + +pub fn as_continent_code(value: &ValueRef) -> anyhow::Result { + let s = crate::value::as_string(value) + .context("msgpack 'string' value type is expected for continent code")?; + let country = ContinentCode::from_str(&s).map_err(|_| anyhow!("invalid continent code"))?; + Ok(country) +} + +pub fn as_ip_location(value: &ValueRef) -> anyhow::Result { + if let ValueRef::Map(map) = value { + let mut builder = IpLocationBuilder::default(); + + for (k, v) in map { + let k = + crate::value::as_string(k).context("key of the map is not a valid string value")?; + match crate::key::normalize(&k).as_str() { + "network" | "net" => { + let net = crate::value::as_ip_network(v) + .context(format!("invalid ip network value for key {k}"))?; + builder.set_network(net); + } + "country" => { + let country = as_iso_country_code(v) + .context(format!("invalid iso country code value for key {k}"))?; + builder.set_country(country); + } + "continent" => { + let continent = as_continent_code(v) + .context(format!("invalid continent code value for key {k}"))?; + builder.set_continent(continent); + } + "as_number" | "asn" => { + let asn = crate::value::as_u32(v) + .context(format!("invalid u32 value for key {k}"))?; + builder.set_as_number(asn); + } + "isp_name" => { + let name = crate::value::as_string(v) + .context(format!("invalid string value for key {k}"))?; + builder.set_isp_name(name); + } + "isp_domain" => { + let domain = crate::value::as_string(v) + .context(format!("invalid string value for key {k}"))?; + builder.set_isp_domain(domain); + } + _ => return Err(anyhow!("invalid key {k}")), + } + } + + builder.build() + } else { + Err(anyhow!( + "msgpack value type for 'ip location' should be 'map'" + )) + } +} diff --git a/lib/g3-msgpack/src/value/mod.rs b/lib/g3-msgpack/src/value/mod.rs index 08a422210..807ab525b 100644 --- a/lib/g3-msgpack/src/value/mod.rs +++ b/lib/g3-msgpack/src/value/mod.rs @@ -16,23 +16,29 @@ mod datetime; mod metrics; +mod net; mod primary; mod tls; mod uuid; -#[cfg(feature = "openssl")] -mod openssl; - -#[cfg(feature = "rustls")] -mod rustls; - pub use self::uuid::as_uuid; pub use datetime::as_rfc3339_datetime; pub use metrics::{as_metrics_name, as_weighted_metrics_name}; +pub use net::*; pub use primary::{as_f64, as_string, as_u32, as_weighted_name_string}; pub use tls::{as_tls_cert_usage, as_tls_service_type}; +#[cfg(feature = "openssl")] +mod openssl; +#[cfg(feature = "openssl")] pub use openssl::{as_openssl_certificate, as_openssl_certificates, as_openssl_private_key}; +#[cfg(feature = "rustls")] +mod rustls; #[cfg(feature = "rustls")] pub use self::rustls::{as_rustls_certificates, as_rustls_private_key}; + +#[cfg(feature = "geoip")] +mod geoip; +#[cfg(feature = "geoip")] +pub use geoip::{as_continent_code, as_ip_location, as_iso_country_code}; diff --git a/lib/g3-msgpack/src/value/net/base.rs b/lib/g3-msgpack/src/value/net/base.rs new file mode 100644 index 000000000..3f6775dd5 --- /dev/null +++ b/lib/g3-msgpack/src/value/net/base.rs @@ -0,0 +1,64 @@ +/* + * Copyright 2024 ByteDance and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::net::IpAddr; +use std::str::FromStr; + +use anyhow::anyhow; +#[cfg(feature = "geoip")] +use ip_network::IpNetwork; +use rmpv::ValueRef; + +pub fn as_ipaddr(value: &ValueRef) -> anyhow::Result { + match value { + ValueRef::String(s) => { + let s = s + .as_str() + .ok_or(anyhow!("invalid utf-8 ip address string value"))?; + let ip = IpAddr::from_str(s).map_err(|e| anyhow!("invalid ip address: {e}"))?; + Ok(ip) + } + _ => Err(anyhow!( + "msgpack value type for 'IpAddr' should be 'string'" + )), + } +} + +#[cfg(feature = "geoip")] +pub fn as_ip_network(value: &ValueRef) -> anyhow::Result { + if let ValueRef::String(s) = value { + let s = s + .as_str() + .ok_or(anyhow!("invalid utf-8 ip network string value"))?; + let net = match IpNetwork::from_str(s) { + Ok(net) => net, + Err(_) => match IpAddr::from_str(s) { + Ok(IpAddr::V4(ip4)) => IpNetwork::new(ip4, 32) + .map_err(|_| anyhow!("failed to add ipv4 address: internal error"))?, + Ok(IpAddr::V6(ip6)) => IpNetwork::new(ip6, 128) + .map_err(|_| anyhow!("failed to add ipv6 address: internal error"))?, + Err(_) => { + return Err(anyhow!("invalid network or ip string: {s}")); + } + }, + }; + Ok(net) + } else { + Err(anyhow!( + "yaml value type for 'IpNetwork' should be 'string'" + )) + } +} diff --git a/lib/g3-msgpack/src/value/net/mod.rs b/lib/g3-msgpack/src/value/net/mod.rs new file mode 100644 index 000000000..fe2785e14 --- /dev/null +++ b/lib/g3-msgpack/src/value/net/mod.rs @@ -0,0 +1,22 @@ +/* + * Copyright 2024 ByteDance and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +mod base; + +pub use base::as_ipaddr; + +#[cfg(feature = "geoip")] +pub use base::as_ip_network; diff --git a/lib/g3-yaml/src/value/geoip.rs b/lib/g3-yaml/src/value/geoip.rs index 9219afaaf..cc0ed3fe2 100644 --- a/lib/g3-yaml/src/value/geoip.rs +++ b/lib/g3-yaml/src/value/geoip.rs @@ -14,9 +14,9 @@ * limitations under the License. */ -use anyhow::anyhow; use std::str::FromStr; +use anyhow::anyhow; use yaml_rust::Yaml; use g3_geoip::{ContinentCode, IsoCountryCode};