Skip to content

Commit

Permalink
add netif outbound resolver
Browse files Browse the repository at this point in the history
  • Loading branch information
bdbai committed Apr 20, 2024
1 parent 11e1191 commit 2e9b3a5
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 10 deletions.
3 changes: 2 additions & 1 deletion ytflow-bin-shared/src/edit/gen/plugins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,8 @@ impl PluginType {
}),
PluginType::Netif => cbor!(NetifFactory {
family_preference: FamilyPreference::Ipv4Only,
selection: SelectionMode::Manual("eth0".into())
selection: SelectionMode::Manual("eth0".into()),
outbound_resolver: None,
}),
}
.unwrap(),
Expand Down
42 changes: 36 additions & 6 deletions ytflow/src/config/plugin/netif.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,27 @@ use crate::config::*;
use crate::plugin::netif;

#[derive(Serialize, Deserialize)]
pub struct NetifFactory {
pub struct NetifFactory<'a> {
pub family_preference: netif::FamilyPreference,
#[serde(flatten)]
pub selection: netif::SelectionMode,
pub outbound_resolver: Option<&'a str>,
}

impl NetifFactory {
pub(in super::super) fn parse(plugin: &Plugin) -> ConfigResult<ParsedPlugin<'static, Self>> {
impl<'de> NetifFactory<'de> {
pub(in super::super) fn parse(plugin: &'de Plugin) -> ConfigResult<ParsedPlugin<'de, Self>> {
let Plugin { name, param, .. } = plugin;
let config: Self = parse_param(name, param)?;
Ok(ParsedPlugin {
requires: config
.outbound_resolver
.iter()
.map(|r| Descriptor {
descriptor: *r,
r#type: AccessPointType::RESOLVER,
})
.collect(),
factory: config,
requires: vec![],
provides: vec![
Descriptor {
descriptor: name.to_string() + ".tcp",
Expand All @@ -37,10 +45,32 @@ impl NetifFactory {
}
}

impl Factory for NetifFactory {
impl<'a> Factory for NetifFactory<'a> {
#[cfg(feature = "plugins")]
fn load(&mut self, plugin_name: String, set: &mut PartialPluginSet) -> LoadResult<()> {
let netif = netif::NetifSelector::new(self.selection.clone(), self.family_preference);
use crate::plugin::null::Null;

let mut err = None;
let netif =
netif::NetifSelector::new(self.selection.clone(), self.family_preference, |weak| {
set.stream_outbounds
.insert(plugin_name.clone() + ".tcp", weak.clone());
set.datagram_outbounds
.insert(plugin_name.clone() + ".udp", weak.clone());
set.resolver
.insert(plugin_name.clone() + ".resolver", weak.clone());

self.outbound_resolver.map(|outbound_resolver| {
set.get_or_create_resolver(plugin_name.clone(), outbound_resolver)
.unwrap_or_else(|e| {
err = Some(e);
Arc::downgrade(&(Arc::new(Null) as _))
})
})
});
if let Some(err) = err {
set.errors.push(err);
}
set.control_hub.create_plugin_control(
plugin_name.clone(),
"netif",
Expand Down
25 changes: 22 additions & 3 deletions ytflow/src/plugin/netif/selector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,22 @@ pub struct NetifSelector {
pub(super) cached_netif: ArcSwap<sys::Netif>,
provider: sys::NetifProvider,
resolver: sys::Resolver,
outbound_resolver: Option<Weak<dyn Resolver>>,
me: Weak<Self>,
}

impl NetifSelector {
pub fn new(selection: SelectionMode, prefer: FamilyPreference) -> Arc<Self> {
pub fn new(
selection: SelectionMode,
prefer: FamilyPreference,
create_outbound_resolver: impl FnOnce(&Weak<Self>) -> Option<Weak<dyn Resolver>>,
) -> Arc<Self> {
let dummy_netif = sys::Netif {
name: String::from("dummy_netif_awaiting_change"),
..sys::Netif::default()
};
Arc::<Self>::new_cyclic(|this| {
let outbound_resolver = create_outbound_resolver(this);
let this = this.clone();
let provider = sys::NetifProvider::new({
let this = this.clone();
Expand All @@ -35,6 +41,7 @@ impl NetifSelector {
cached_netif: ArcSwap::new(Arc::new(dummy_netif)),
provider,
resolver: sys::Resolver::new(this.clone()),
outbound_resolver,
me: this,
}
})
Expand Down Expand Up @@ -72,9 +79,15 @@ impl StreamOutboundFactory for NetifSelector {
) -> FlowResult<(Box<dyn Stream>, Buffer)> {
let preference = self.selection.load().1;
let netif = self.cached_netif.load();
let resolver = self
.outbound_resolver
.as_ref()
.map(|r| r.upgrade().ok_or(FlowError::NoOutbound))
.transpose()?
.unwrap_or_else(|| self.me.upgrade().unwrap());
crate::plugin::socket::dial_stream(
context,
self.me.upgrade().unwrap(),
resolver,
// A workaround for E0308 "one type is more general than the other"
// https://github.com/rust-lang/rust/issues/70263
Some(|s: &mut _| sys::bind_socket_v4(&netif, s)).filter(|_| {
Expand All @@ -100,9 +113,15 @@ impl DatagramSessionFactory for NetifSelector {
async fn bind(&self, context: Box<FlowContext>) -> FlowResult<Box<dyn DatagramSession>> {
let preference = self.selection.load().1;
let netif = self.cached_netif.load_full();
let resolver = self
.outbound_resolver
.as_ref()
.map(|r| r.upgrade().ok_or(FlowError::NoOutbound))
.transpose()?
.unwrap_or_else(|| self.me.upgrade().unwrap());
crate::plugin::socket::dial_datagram_session(
&context,
self.me.upgrade().unwrap(),
resolver,
// A workaround for E0308 "one type is more general than the other"
// https://github.com/rust-lang/rust/issues/70263
Some({
Expand Down

0 comments on commit 2e9b3a5

Please sign in to comment.