Skip to content

Commit

Permalink
add new g3-ip-locate lib crate
Browse files Browse the repository at this point in the history
  • Loading branch information
zh-jq-b committed May 9, 2024
1 parent 89ffd1e commit 1775599
Show file tree
Hide file tree
Showing 20 changed files with 1,144 additions and 9 deletions.
21 changes: 21 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ members = [
"lib/g3-tls-cert",
"lib/g3-slog-types",
"lib/g3-geoip",
"lib/g3-ip-locate",
"lib/g3-openssl",
"g3bench",
"g3fcgen",
Expand Down Expand Up @@ -219,6 +220,7 @@ g3-stdlog = { version = "0.1", path = "lib/g3-stdlog" }
g3-syslog = { version = "0.6", path = "lib/g3-syslog" }
g3-slog-types = { version = "0.1", path = "lib/g3-slog-types" }
g3-geoip = { version = "0.1", path = "lib/g3-geoip" }
g3-ip-locate = { version = "0.1", path = "lib/g3-ip-locate" }
g3-tls-cert = { version = "0.5", path = "lib/g3-tls-cert" }
g3-types = { version = "0.4", path = "lib/g3-types" }
g3-xcrypt = { version = "0.1", path = "lib/g3-xcrypt" }
Expand Down
1 change: 1 addition & 0 deletions lib/g3-geoip/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ arc-swap.workspace = true
once_cell.workspace = true
ip_network.workspace = true
ip_network_table.workspace = true
smol_str.workspace = true
csv = "1.2"
flate2.workspace = true
zip = { workspace = true, features = ["deflate"] }
5 changes: 3 additions & 2 deletions lib/g3-geoip/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@
* limitations under the License.
*/

extern crate core;

mod continent;
pub use continent::{Continent, ContinentCode};

mod country;
pub use country::IsoCountryCode;

mod location;
pub use location::{IpLocation, IpLocationBuilder};

mod record;
pub use record::{GeoIpAsnRecord, GeoIpCountryRecord};

Expand Down
116 changes: 116 additions & 0 deletions lib/g3-geoip/src/location.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright 2024 ByteDance and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

use anyhow::anyhow;
use ip_network::IpNetwork;
use smol_str::SmolStr;

use super::{ContinentCode, IsoCountryCode};

/// Incrementally collects the fields of an [`IpLocation`].
///
/// Every field starts out as `None` (via `Default`) and is filled in through
/// the `set_*` methods; `build()` then turns the collected values into an
/// [`IpLocation`], failing if no network has been set.
#[derive(Default)]
pub struct IpLocationBuilder {
/// Network the record applies to; the only field `build()` requires.
net: Option<IpNetwork>,
/// ISO country code, if known.
country: Option<IsoCountryCode>,
/// Continent code; when unset, `build()` derives it from `country` if that is set.
continent: Option<ContinentCode>,
/// Autonomous system number, if known.
as_number: Option<u32>,
/// ISP name, if known.
isp_name: Option<SmolStr>,
/// ISP domain, if known.
isp_domain: Option<SmolStr>,
}

impl IpLocationBuilder {
pub fn set_network(&mut self, net: IpNetwork) {
self.net = Some(net);
}

pub fn set_country(&mut self, country: IsoCountryCode) {
self.country = Some(country);
}

pub fn set_continent(&mut self, continent: ContinentCode) {
self.continent = Some(continent);
}

pub fn set_as_number(&mut self, number: u32) {
self.as_number = Some(number);
}

pub fn set_isp_name(&mut self, name: String) {
self.isp_name = Some(name.into());
}

pub fn set_isp_domain(&mut self, domain: String) {
self.isp_domain = Some(domain.into());
}

pub fn build(mut self) -> anyhow::Result<IpLocation> {
let net = self
.net
.take()
.ok_or(anyhow!("network address is not set"))?;
let continent = self
.continent
.or_else(|| self.country.map(|c| c.continent()));
Ok(IpLocation {
net,
country: self.country,
continent,
as_number: self.as_number,
isp_name: self.isp_name,
isp_domain: self.isp_domain,
})
}
}

/// Location information attached to an IP network.
///
/// Only the network itself is mandatory; every other attribute is optional.
pub struct IpLocation {
/// Network this record applies to.
net: IpNetwork,
/// ISO country code, if known.
country: Option<IsoCountryCode>,
/// Continent code, if known.
continent: Option<ContinentCode>,
/// Autonomous system number, if known.
as_number: Option<u32>,
/// ISP name, if known.
isp_name: Option<SmolStr>,
/// ISP domain, if known.
isp_domain: Option<SmolStr>,
}

impl IpLocation {
#[inline]
pub fn network_addr(&self) -> IpNetwork {
self.net
}

#[inline]
pub fn country(&self) -> Option<IsoCountryCode> {
self.country
}

#[inline]
pub fn continent(&self) -> Option<ContinentCode> {
self.continent
}

#[inline]
pub fn network_asn(&self) -> Option<u32> {
self.as_number
}

#[inline]
pub fn isp_name(&self) -> Option<&str> {
self.isp_name.as_deref()
}

#[inline]
pub fn isp_domain(&self) -> Option<&str> {
self.isp_domain.as_deref()
}
}
19 changes: 19 additions & 0 deletions lib/g3-ip-locate/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
[package]
name = "g3-ip-locate"
version = "0.1.0"
license.workspace = true
edition.workspace = true

[dependencies]
anyhow.workspace = true
log.workspace = true
ip_network.workspace = true
ip_network_table.workspace = true
tokio = { workspace = true, features = ["sync", "net", "rt"] }
tokio-util = { workspace = true, features = ["time"] }
ahash.workspace = true
rmpv.workspace = true
g3-types.workspace = true
g3-geoip.workspace = true
g3-msgpack = { workspace = true, features = ["geoip"] }
g3-socket.workspace = true
140 changes: 140 additions & 0 deletions lib/g3-ip-locate/src/cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Copyright 2024 ByteDance and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

use std::collections::hash_map;
use std::future::Future;
use std::io;
use std::net::IpAddr;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;

use ahash::AHashMap;
use ip_network_table::IpNetworkTable;
use tokio::sync::mpsc;
use tokio::time::Instant;

use g3_geoip::IpLocation;

use super::{CacheQueryRequest, IpLocationCacheResponse, IpLocationServiceConfig};

/// Future that serves IP location lookups from an in-memory cache and
/// forwards cache misses to the query runtime.
pub(crate) struct IpLocationCacheRuntime {
/// Bound on how many requests are taken from `req_receiver` before
/// responses are polled again.
request_batch_handle_count: usize,
/// Cached location records, looked up by longest-prefix match on the IP.
cache: IpNetworkTable<Arc<IpLocation>>,
/// Requests waiting for an in-flight query, grouped by the queried IP.
doing: AHashMap<IpAddr, Vec<CacheQueryRequest>>,
/// Incoming lookup requests.
req_receiver: mpsc::UnboundedReceiver<CacheQueryRequest>,
/// Incoming responses, each paired with the IP it answers (if any).
rsp_receiver: mpsc::UnboundedReceiver<(Option<IpAddr>, IpLocationCacheResponse)>,
/// Outgoing channel for IPs that need to be queried.
query_sender: mpsc::UnboundedSender<IpAddr>,
/// Expiry instant copied from the most recent response; cache hits served
/// after this instant also trigger a refresh query.
expire_at: Instant,
}

impl IpLocationCacheRuntime {
    /// Creates a cache runtime with an empty cache.
    ///
    /// * `config` - supplies the request batch size (`cache_request_batch_count`).
    /// * `req_receiver` - incoming lookup requests.
    /// * `rsp_receiver` - incoming query responses, paired with the queried IP if any.
    /// * `query_sender` - outgoing channel for IPs that have to be queried.
    ///
    /// `expire_at` starts at "now", so it counts as expired as soon as any
    /// time has elapsed.
    pub(crate) fn new(
        config: &IpLocationServiceConfig,
        req_receiver: mpsc::UnboundedReceiver<CacheQueryRequest>,
        rsp_receiver: mpsc::UnboundedReceiver<(Option<IpAddr>, IpLocationCacheResponse)>,
        query_sender: mpsc::UnboundedSender<IpAddr>,
    ) -> Self {
        IpLocationCacheRuntime {
            request_batch_handle_count: config.cache_request_batch_count,
            cache: IpNetworkTable::new(),
            doing: AHashMap::new(),
            req_receiver,
            rsp_receiver,
            query_sender,
            expire_at: Instant::now(),
        }
    }

    /// Processes one response from the query side.
    ///
    /// The pending entry for `ip` is always cleared. When the response
    /// carries a location, every waiter is notified and the record is cached
    /// under its network. When it carries none, the waiters are dropped so
    /// they stop waiting and a later request for the same IP can trigger a
    /// fresh query.
    fn handle_rsp(&mut self, ip: Option<IpAddr>, mut rsp: IpLocationCacheResponse) {
        self.expire_at = rsp.expire_at;

        // Take the waiters out unconditionally: leaving them in `doing` after
        // an empty response would park them forever and suppress any further
        // query for this IP.
        let waiters = ip.and_then(|ip| self.doing.remove(&ip));

        if let Some(location) = rsp.value.take() {
            let net = location.network_addr();
            let location = Arc::new(location);

            for req in waiters.into_iter().flatten() {
                // the requester may have gone away already, which is fine
                let _ = req.notifier.send(location.clone());
            }

            // also allow push if no doing ip found
            self.cache.insert(net, location);
        }
        // NOTE(review): on an empty response the waiters are dropped here,
        // which presumably signals "no result" to the receiving side of
        // `notifier` - confirm against the notifier's channel type.
    }

    /// Forwards `ip` to the query runtime.
    ///
    /// # Panics
    ///
    /// Panics if the query channel is closed.
    fn send_req(&mut self, ip: IpAddr) {
        if self.query_sender.send(ip).is_err() {
            // the query runtime should not close before the cache runtime
            unreachable!()
        }
    }

    /// Serves a single lookup request.
    ///
    /// A cache hit is answered immediately; if the cache has expired a
    /// refresh query is sent as well. A miss is queued in `doing`, and only
    /// the first waiter for an IP triggers a query.
    fn handle_req(&mut self, req: CacheQueryRequest) {
        if let Some((_net, v)) = self.cache.longest_match(req.ip) {
            let _ = req.notifier.send(v.clone());
            // `elapsed()` is non-zero only once `expire_at` lies in the past
            if self.expire_at.elapsed() != Duration::ZERO {
                // trigger query again if expired
                // NOTE(review): these refresh queries are not tracked in
                // `doing`, so each expired hit sends one.
                self.send_req(req.ip);
            }
            return;
        }

        match self.doing.entry(req.ip) {
            hash_map::Entry::Occupied(mut o) => {
                // a query for this IP is already in flight, just wait for it
                o.get_mut().push(req);
            }
            hash_map::Entry::Vacant(v) => {
                let ip = req.ip;
                v.insert(vec![req]);
                self.send_req(ip);
            }
        }
    }

    /// Drives the runtime: drains all ready responses, then handles up to one
    /// batch of requests, and repeats.
    ///
    /// Returns `Poll::Pending` when no request is ready and
    /// `Poll::Ready(Ok(()))` once the request channel is closed.
    fn poll_loop(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        // Always poll the request channel at least once per round: with an
        // empty batch the loop below could never return `Pending` and would
        // spin forever.
        let batch_size = self.request_batch_handle_count.max(1);

        loop {
            // handle rsp
            loop {
                match self.rsp_receiver.poll_recv(cx) {
                    Poll::Pending => break,
                    Poll::Ready(None) => unreachable!(), // unreachable as we have kept a sender
                    Poll::Ready(Some((ip, rsp))) => self.handle_rsp(ip, rsp),
                }
            }

            // handle req, at most `batch_size` of them before checking for
            // responses again
            for _ in 0..batch_size {
                match self.req_receiver.poll_recv(cx) {
                    Poll::Pending => return Poll::Pending,
                    Poll::Ready(None) => return Poll::Ready(Ok(())),
                    Poll::Ready(Some(req)) => self.handle_req(req),
                }
            }
        }
    }
}

impl Future for IpLocationCacheRuntime {
    type Output = io::Result<()>;

    /// Delegates to `poll_loop` on the unpinned runtime.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let runtime = self.get_mut();
        runtime.poll_loop(cx)
    }
}
Loading

0 comments on commit 1775599

Please sign in to comment.