Skip to content

Commit

Permalink
update expire check for ip locate response
Browse files Browse the repository at this point in the history
  • Loading branch information
zh-jq-b committed May 9, 2024
1 parent a9644a5 commit d9be405
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 60 deletions.
6 changes: 3 additions & 3 deletions g3proxy/doc/configuration/values/geoip.rst
Original file line number Diff line number Diff line change
Expand Up @@ -122,23 +122,23 @@ The keys are:

Set the timeout for the cache runtime to wait response from the query runtime.

**default**: 400ms
**default**: 100ms

.. _conf_value_ip_locate_service_default_expire_ttl:

* default_expire_ttl

**optional**, **type**: u32

Set the default expire ttl for the ip locate service.
Set the default expire ttl for the response.

**default**: 10

* maximum_expire_ttl

**optional**, **type**: u32

Set the maximum expire ttl for the ip locate service.
Set the maximum expire ttl for the response.

**default**: 300

Expand Down
4 changes: 2 additions & 2 deletions g3proxy/doc/protocol/helper/ip_locate.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ The peer service should listen on a UDP port, which may be IPv4 or IPv6 based, w
Each UDP packet from our side to the peer service will contains exactly one request. And each UDP packet from the peer
service should contains exactly one response.

The peer service can also push location and expire ttl responses directly to our side without any prior request.
The peer service can also push location response directly to our side without any prior request.

Both the request and the response are structured data and should be encoded in `msgpack`_ format.

Expand Down Expand Up @@ -55,7 +55,7 @@ ttl

**optional**, **id**: 3, **type**: u32

Set the expire ttl of the peer service.
Set the expire ttl of the response.

If not set, the :ref:`default expire ttl <conf_value_ip_locate_service_default_expire_ttl>` config will
take effect.
43 changes: 0 additions & 43 deletions g3proxy/src/config/geoip.rs

This file was deleted.

36 changes: 25 additions & 11 deletions lib/g3-ip-locate/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use std::net::IpAddr;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;

use ahash::AHashMap;
use ip_network_table::IpNetworkTable;
Expand All @@ -32,14 +31,18 @@ use g3_geoip::IpLocation;

use super::{CacheQueryRequest, IpLocateServiceConfig, IpLocationCacheResponse};

struct CacheValue {
valid_before: Instant,
location: Arc<IpLocation>,
}

pub(crate) struct IpLocationCacheRuntime {
request_batch_handle_count: usize,
cache: IpNetworkTable<Arc<IpLocation>>,
cache: IpNetworkTable<CacheValue>,
doing: AHashMap<IpAddr, Vec<CacheQueryRequest>>,
req_receiver: mpsc::UnboundedReceiver<CacheQueryRequest>,
rsp_receiver: mpsc::UnboundedReceiver<(Option<IpAddr>, IpLocationCacheResponse)>,
query_sender: mpsc::UnboundedSender<IpAddr>,
expire_at: Instant,
}

impl IpLocationCacheRuntime {
Expand All @@ -56,12 +59,10 @@ impl IpLocationCacheRuntime {
req_receiver,
rsp_receiver,
query_sender,
expire_at: Instant::now(),
}
}

fn handle_rsp(&mut self, ip: Option<IpAddr>, mut rsp: IpLocationCacheResponse) {
self.expire_at = rsp.expire_at;
if let Some(location) = rsp.value.take() {
let net = location.network_addr();
let location = Arc::new(location);
Expand All @@ -75,7 +76,22 @@ impl IpLocationCacheRuntime {
}

// also allow push if no doing ip found
self.cache.insert(net, location);
self.cache.insert(
net,
CacheValue {
valid_before: rsp.expire_at,
location,
},
);
} else if let Some(ip) = ip {
// if no new value found, just use the old expired value
if let Some((_net, v)) = self.cache.longest_match(ip) {
if let Some(vec) = self.doing.remove(&ip) {
for req in vec.into_iter() {
let _ = req.notifier.send(v.location.clone());
}
}
}
}
}

Expand All @@ -88,12 +104,10 @@ impl IpLocationCacheRuntime {

fn handle_req(&mut self, req: CacheQueryRequest) {
if let Some((_net, v)) = self.cache.longest_match(req.ip) {
let _ = req.notifier.send(v.clone());
if self.expire_at.elapsed() != Duration::ZERO {
// trigger query again if expired
self.send_req(req.ip);
if v.valid_before >= Instant::now() {
let _ = req.notifier.send(v.location.clone());
return;
}
return;
}

match self.doing.entry(req.ip) {
Expand Down
2 changes: 1 addition & 1 deletion lib/g3-ip-locate/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl Default for IpLocateServiceConfig {
cache_request_timeout: Duration::from_millis(800),
query_peer_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 2888),
query_socket_buffer: SocketBufferConfig::default(),
query_wait_timeout: Duration::from_millis(400),
query_wait_timeout: Duration::from_millis(100),
default_expire_ttl: 10,
maximum_expire_ttl: 300,
}
Expand Down

0 comments on commit d9be405

Please sign in to comment.