From 2167530d79f0f44fd0b6f19b3f913c80473c7b6a Mon Sep 17 00:00:00 2001 From: Zhang Jingqiang Date: Mon, 13 May 2024 14:38:10 +0800 Subject: [PATCH] g3-socket: add initial windows support --- lib/g3-ctl/src/opts.rs | 3 +- lib/g3-daemon/src/listen/quic.rs | 28 ++++---- lib/g3-daemon/src/listen/tcp.rs | 4 +- lib/g3-daemon/src/server/connection.rs | 14 ++-- lib/g3-fluentd/Cargo.toml | 2 +- lib/g3-socket/Cargo.toml | 4 +- lib/g3-socket/src/guard.rs | 85 ----------------------- lib/g3-socket/src/lib.rs | 5 +- lib/g3-socket/src/raw/mod.rs | 93 ++++++++++++++++++++++++++ lib/g3-socket/src/raw/unix.rs | 49 ++++++++++++++ lib/g3-socket/src/raw/windows.rs | 49 ++++++++++++++ lib/g3-socket/src/sockopt.rs | 18 ----- lib/g3-socket/src/tcp.rs | 66 ++++-------------- lib/g3-socket/src/udp.rs | 63 ++++++----------- lib/g3-types/src/net/proxy/http.rs | 1 + 15 files changed, 257 insertions(+), 227 deletions(-) delete mode 100644 lib/g3-socket/src/guard.rs create mode 100644 lib/g3-socket/src/raw/mod.rs create mode 100644 lib/g3-socket/src/raw/unix.rs create mode 100644 lib/g3-socket/src/raw/windows.rs diff --git a/lib/g3-ctl/src/opts.rs b/lib/g3-ctl/src/opts.rs index 1eb3ce14..46291757 100644 --- a/lib/g3-ctl/src/opts.rs +++ b/lib/g3-ctl/src/opts.rs @@ -15,15 +15,16 @@ */ use std::io; -#[cfg(unix)] use std::path::PathBuf; use anyhow::anyhow; use clap::{value_parser, Arg, ArgMatches, Command, ValueHint}; use clap_complete::Shell; use tokio::io::AsyncWriteExt; +#[cfg(unix)] use tokio::net::UnixStream; +#[cfg(unix)] const DEFAULT_TMP_CONTROL_DIR: &str = "/tmp/g3"; const GLOBAL_ARG_COMPLETION: &str = "completion"; diff --git a/lib/g3-daemon/src/listen/quic.rs b/lib/g3-daemon/src/listen/quic.rs index 73b8feb1..2c3b2293 100644 --- a/lib/g3-daemon/src/listen/quic.rs +++ b/lib/g3-daemon/src/listen/quic.rs @@ -16,7 +16,6 @@ use std::io; use std::net::{SocketAddr, UdpSocket}; -use std::os::fd::{AsRawFd, RawFd}; use std::sync::Arc; use std::time::Duration; @@ -27,6 +26,7 @@ use tokio::runtime::Handle; use tokio::sync::{broadcast, watch}; use g3_socket::util::native_socket_addr; +use g3_socket::RawSocket; use g3_types::acl::{AclAction, AclNetworkRule}; use g3_types::net::UdpListenConfig; @@ -119,7 +119,7 @@ where mut self, listener: Endpoint, mut listen_addr: SocketAddr, - mut sock_raw_fd: RawFd, + mut sock_raw_fd: RawSocket, mut server_reload_channel: broadcast::Receiver, mut quic_cfg_receiver: watch::Receiver, ) where @@ -172,12 +172,12 @@ where if let Some(listen_config) = aux_config.take_udp_listen_config() { self.listen_config = listen_config; if self.listen_config.address() != listen_addr { - if let Ok((fd, addr)) = self.rebind_socket(&listener) { - sock_raw_fd = fd; + if let Ok((socket, addr)) = self.rebind_socket(&listener) { + sock_raw_fd = socket; listen_addr = addr; } } else { - self.update_socket_opts(sock_raw_fd); + self.update_socket_opts(&sock_raw_fd); } } } @@ -282,9 +282,8 @@ where } } - fn update_socket_opts(&self, raw_fd: RawFd) { - if let Err(e) = g3_socket::udp::set_raw_opts(raw_fd, self.listen_config.socket_misc_opts()) - { + fn update_socket_opts(&self, raw_socket: &RawSocket) { + if let Err(e) = raw_socket.set_udp_misc_opts(self.listen_config.socket_misc_opts()) { warn!( "SRT[{}_v{}#{}] update socket misc opts failed: {e}", self.server.name(), @@ -292,8 +291,7 @@ where self.instance_id, ); } - if let Err(e) = g3_socket::udp::set_raw_buf_opts(raw_fd, self.listen_config.socket_buffer()) - { + if let Err(e) = raw_socket.set_buf_opts(self.listen_config.socket_buffer()) { warn!( "SRT[{}_v{}#{}] update socket buf opts failed: {e}", self.server.name(), @@ -303,12 +301,12 @@ where } } - fn rebind_socket(&self, listener: &Endpoint) -> io::Result<(RawFd, SocketAddr)> { + fn rebind_socket(&self, listener: &Endpoint) -> io::Result<(RawSocket, SocketAddr)> { match g3_socket::udp::new_std_bind_listen(&self.listen_config) { Ok(socket) => { - let raw_fd = socket.as_raw_fd(); + let raw_socket = RawSocket::from(&socket); match listener.rebind(socket) { - Ok(_) => Ok((raw_fd, listener.local_addr().unwrap())), + Ok(_) => Ok((raw_socket, listener.local_addr().unwrap())), Err(e) => { warn!( "SRT[{}_v{}#{}] reload rebind {} failed: {e}", @@ -408,7 +406,7 @@ where { let handle = self.get_rt_handle(listen_in_worker); handle.spawn(async move { - let sock_raw_fd = socket.as_raw_fd(); + let raw_socket = RawSocket::from(&socket); // make sure the listen socket associated with the correct reactor match Endpoint::new( Default::default(), @@ -421,7 +419,7 @@ where self.run( endpoint, listen_addr, - sock_raw_fd, + raw_socket, server_reload_channel, quic_cfg_receiver, ) diff --git a/lib/g3-daemon/src/listen/tcp.rs b/lib/g3-daemon/src/listen/tcp.rs index 733c2074..4fb96297 100644 --- a/lib/g3-daemon/src/listen/tcp.rs +++ b/lib/g3-daemon/src/listen/tcp.rs @@ -15,7 +15,6 @@ */ use std::net::SocketAddr; -use std::os::fd::AsRawFd; use std::sync::Arc; use async_trait::async_trait; @@ -26,6 +25,7 @@ use tokio::sync::broadcast; use g3_io_ext::LimitedTcpListener; use g3_socket::util::native_socket_addr; +use g3_socket::RawSocket; use g3_types::net::TcpListenConfig; use crate::listen::ListenStats; @@ -178,7 +178,7 @@ where let server = self.server.clone(); let mut cc_info = ClientConnectionInfo::new(peer_addr, local_addr); - cc_info.set_tcp_raw_fd(stream.as_raw_fd()); + cc_info.set_tcp_raw_socket(RawSocket::from(&stream)); if let Some(worker_id) = self.worker_id { cc_info.set_worker_id(Some(worker_id)); tokio::spawn(async move { diff --git a/lib/g3-daemon/src/server/connection.rs b/lib/g3-daemon/src/server/connection.rs index 85d66db3..98c8b5c4 100644 --- a/lib/g3-daemon/src/server/connection.rs +++ b/lib/g3-daemon/src/server/connection.rs @@ -16,9 +16,9 @@ use std::io; use std::net::{IpAddr, SocketAddr}; -use std::os::fd::RawFd; use g3_io_ext::haproxy::ProxyAddr; +use g3_socket::RawSocket; use g3_types::net::TcpMiscSockOpts; #[derive(Clone, Debug)] @@ -29,7 +29,7 @@ pub struct ClientConnectionInfo { sock_peer_addr: SocketAddr, #[allow(unused)] sock_local_addr: SocketAddr, - tcp_sock_raw_fd: Option, + tcp_raw_socket: Option, } impl ClientConnectionInfo { @@ -40,13 +40,13 @@ impl ClientConnectionInfo { server_addr: local_addr, sock_peer_addr: peer_addr, sock_local_addr: local_addr, - tcp_sock_raw_fd: None, + tcp_raw_socket: None, } } #[inline] - pub fn set_tcp_raw_fd(&mut self, raw_fd: RawFd) { - self.tcp_sock_raw_fd = Some(raw_fd); + pub fn set_tcp_raw_socket(&mut self, raw_fd: RawSocket) { + self.tcp_raw_socket = Some(raw_fd); } #[inline] @@ -105,8 +105,8 @@ impl ClientConnectionInfo { opts: &TcpMiscSockOpts, default_set_nodelay: bool, ) -> io::Result<()> { - if let Some(raw_fd) = self.tcp_sock_raw_fd { - g3_socket::tcp::set_raw_opts(raw_fd, opts, default_set_nodelay) + if let Some(raw_socket) = &self.tcp_raw_socket { + raw_socket.set_tcp_misc_opts(opts, default_set_nodelay) } else { Ok(()) } diff --git a/lib/g3-fluentd/Cargo.toml b/lib/g3-fluentd/Cargo.toml index 9eba7d8e..64943eb8 100644 --- a/lib/g3-fluentd/Cargo.toml +++ b/lib/g3-fluentd/Cargo.toml @@ -17,7 +17,7 @@ flume = { workspace = true, features = ["async"] } rmp.workspace = true rmp-serde.workspace = true serde.workspace = true -tokio = { workspace = true, features = ["rt", "net", "time", "macros"] } +tokio = { workspace = true, features = ["rt", "net", "time", "macros", "io-util"] } tokio-rustls.workspace = true rand.workspace = true digest.workspace = true diff --git a/lib/g3-socket/Cargo.toml b/lib/g3-socket/Cargo.toml index 064f188c..9c71b6b9 100644 --- a/lib/g3-socket/Cargo.toml +++ b/lib/g3-socket/Cargo.toml @@ -8,8 +8,10 @@ rust-version = "1.75.0" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -libc.workspace = true tokio = { workspace = true, features = ["net"] } socket2 = { version = "0.5", features = ["all"] } fastrand.workspace = true g3-types.workspace = true + +[target.'cfg(target_os = "linux")'.dependencies] +libc.workspace = true diff --git a/lib/g3-socket/src/guard.rs b/lib/g3-socket/src/guard.rs deleted file mode 100644 index 22da5e77..00000000 --- a/lib/g3-socket/src/guard.rs +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Copyright 2023 ByteDance and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -use std::ops::{Deref, DerefMut}; -use std::os::fd::{FromRawFd, IntoRawFd, RawFd}; - -pub struct RawFdGuard -where - T: FromRawFd + IntoRawFd, -{ - inner: Option, -} - -impl RawFdGuard -where - T: FromRawFd + IntoRawFd, -{ - pub fn new(fd: RawFd) -> Self { - Self { - inner: unsafe { Some(T::from_raw_fd(fd)) }, - } - } -} - -impl Drop for RawFdGuard -where - T: FromRawFd + IntoRawFd, -{ - fn drop(&mut self) { - if let Some(resource) = self.inner.take() { - let _ = resource.into_raw_fd(); - } - } -} - -impl Deref for RawFdGuard -where - T: FromRawFd + IntoRawFd, -{ - type Target = T; - - fn deref(&self) -> &Self::Target { - // the only way setting inner to None is drop - self.inner.as_ref().unwrap() - } -} - -impl DerefMut for RawFdGuard -where - T: FromRawFd + IntoRawFd, -{ - fn deref_mut(&mut self) -> &mut Self::Target { - // the only way setting inner to None is drop - self.inner.as_mut().unwrap() - } -} - -#[cfg(test)] -mod tests { - use super::RawFdGuard; - use socket2::Socket; - - #[test] - fn not_close_fd() { - let fd = 0; - { - let _socket = RawFdGuard::::new(fd); - // unsafe { libc::close(fd) }; - } - assert!(unsafe { libc::fcntl(fd, libc::F_GETFD) } != -1); - } -} diff --git a/lib/g3-socket/src/lib.rs b/lib/g3-socket/src/lib.rs index e1e60606..b65c19c0 100644 --- a/lib/g3-socket/src/lib.rs +++ b/lib/g3-socket/src/lib.rs @@ -14,9 +14,12 @@ * limitations under the License. */ +#[cfg(target_os = "linux")] mod sockopt; -pub mod guard; +mod raw; +pub use raw::RawSocket; + pub mod tcp; pub mod udp; pub mod util; diff --git a/lib/g3-socket/src/raw/mod.rs b/lib/g3-socket/src/raw/mod.rs new file mode 100644 index 00000000..4f77f6d8 --- /dev/null +++ b/lib/g3-socket/src/raw/mod.rs @@ -0,0 +1,93 @@ +/* + * Copyright 2024 ByteDance and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::io; + +use socket2::Socket; + +use g3_types::net::{SocketBufferConfig, TcpMiscSockOpts, UdpMiscSockOpts}; + +#[cfg(unix)] +mod unix; +#[cfg(windows)] +mod windows; + +#[derive(Debug)] +pub struct RawSocket { + inner: Option, +} + +impl RawSocket { + pub fn set_buf_opts(&self, buf_conf: SocketBufferConfig) -> io::Result<()> { + let Some(socket) = self.inner.as_ref() else { + return Err(io::Error::other("")); + }; + if let Some(size) = buf_conf.recv_size() { + socket.set_recv_buffer_size(size)?; + } + if let Some(size) = buf_conf.send_size() { + socket.set_send_buffer_size(size)?; + } + Ok(()) + } + + pub fn set_tcp_misc_opts( + &self, + misc_opts: &TcpMiscSockOpts, + default_set_nodelay: bool, + ) -> io::Result<()> { + let Some(socket) = self.inner.as_ref() else { + return Err(io::Error::other("")); + }; + if let Some(no_delay) = misc_opts.no_delay { + socket.set_nodelay(no_delay)?; + } else if default_set_nodelay { + socket.set_nodelay(true)?; + } + #[cfg(unix)] + if let Some(mss) = misc_opts.max_segment_size { + socket.set_mss(mss)?; + } + if let Some(ttl) = misc_opts.time_to_live { + socket.set_ttl(ttl)?; + } + if let Some(tos) = misc_opts.type_of_service { + socket.set_tos(tos as u32)?; + } + #[cfg(target_os = "linux")] + if let Some(mark) = misc_opts.netfilter_mark { + socket.set_mark(mark)?; + } + Ok(()) + } + + pub fn set_udp_misc_opts(&self, misc_opts: UdpMiscSockOpts) -> io::Result<()> { + let Some(socket) = self.inner.as_ref() else { + return Err(io::Error::other("")); + }; + if let Some(ttl) = misc_opts.time_to_live { + socket.set_ttl(ttl)?; + } + if let Some(tos) = misc_opts.type_of_service { + socket.set_tos(tos as u32)?; + } + #[cfg(target_os = "linux")] + if let Some(mark) = misc_opts.netfilter_mark { + socket.set_mark(mark)?; + } + Ok(()) + } +} diff --git a/lib/g3-socket/src/raw/unix.rs b/lib/g3-socket/src/raw/unix.rs new file mode 100644 index 00000000..09e543a1 --- /dev/null +++ b/lib/g3-socket/src/raw/unix.rs @@ -0,0 +1,49 @@ +/* + * Copyright 2024 ByteDance and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd}; + +use socket2::Socket; + +use super::RawSocket; + +#[cfg(unix)] +impl Drop for RawSocket { + fn drop(&mut self) { + if let Some(s) = self.inner.take() { + let _ = s.into_raw_fd(); + } + } +} + +impl Clone for RawSocket { + fn clone(&self) -> Self { + if let Some(s) = &self.inner { + Self::from(s) + } else { + RawSocket { inner: None } + } + } +} + +impl From<&T> for RawSocket { + fn from(value: &T) -> Self { + let socket = unsafe { Socket::from_raw_fd(value.as_raw_fd()) }; + RawSocket { + inner: Some(socket), + } + } +} diff --git a/lib/g3-socket/src/raw/windows.rs b/lib/g3-socket/src/raw/windows.rs new file mode 100644 index 00000000..1883da0c --- /dev/null +++ b/lib/g3-socket/src/raw/windows.rs @@ -0,0 +1,49 @@ +/* + * Copyright 2024 ByteDance and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::os::windows::io::{AsRawSocket, FromRawSocket}; + +use socket2::Socket; + +use super::RawSocket; + +#[cfg(unix)] +impl Drop for RawSocket { + fn drop(&mut self) { + if let Some(s) = self.inner.take() { + let _ = s.into_raw_socket(); + } + } +} + +impl Clone for RawSocket { + fn clone(&self) -> Self { + if let Some(s) = &self.inner { + Self::from(s) + } else { + RawSocket { inner: None } + } + } +} + +impl From<&T> for RawSocket { + fn from(value: &T) -> Self { + let socket = unsafe { Socket::from_raw_socket(value.as_raw_socket()) }; + RawSocket { + inner: Some(socket), + } + } +} diff --git a/lib/g3-socket/src/sockopt.rs b/lib/g3-socket/src/sockopt.rs index f831dfb1..bc706d64 100644 --- a/lib/g3-socket/src/sockopt.rs +++ b/lib/g3-socket/src/sockopt.rs @@ -37,19 +37,6 @@ where Ok(()) } -pub(crate) fn set_only_ipv6(fd: c_int, only_ipv6: bool) -> io::Result<()> { - unsafe { - setsockopt( - fd, - libc::IPPROTO_IPV6, - libc::IPV6_V6ONLY, - only_ipv6 as c_int, - )?; - Ok(()) - } -} - -#[cfg(target_os = "linux")] pub(crate) fn set_bind_address_no_port(fd: c_int, enable: bool) -> io::Result<()> { unsafe { setsockopt( @@ -61,8 +48,3 @@ pub(crate) fn set_bind_address_no_port(fd: c_int, enable: bool) -> io::Result<() Ok(()) } } - -#[cfg(not(target_os = "linux"))] -pub(crate) fn set_bind_address_no_port(_fd: c_int, _enable: bool) -> io::Result<()> { - Ok(()) -} diff --git a/lib/g3-socket/src/tcp.rs b/lib/g3-socket/src/tcp.rs index f3171135..7806b642 100644 --- a/lib/g3-socket/src/tcp.rs +++ b/lib/g3-socket/src/tcp.rs @@ -16,22 +16,27 @@ use std::io; use std::net::{IpAddr, SocketAddr}; -use std::os::fd::{AsRawFd, RawFd}; +#[cfg(target_os = "linux")] +use std::os::unix::io::AsRawFd; use socket2::{Domain, SockAddr, Socket, TcpKeepalive, Type}; use tokio::net::{TcpListener, TcpSocket}; use g3_types::net::{TcpKeepAliveConfig, TcpListenConfig, TcpMiscSockOpts}; -use super::guard::RawFdGuard; -use super::sockopt::{set_bind_address_no_port, set_only_ipv6}; +#[cfg(target_os = "linux")] +use super::sockopt::set_bind_address_no_port; use super::util::AddressFamily; +use super::RawSocket; pub fn new_std_listener(config: &TcpListenConfig) -> io::Result { let addr = config.address(); let socket = new_tcp_socket(AddressFamily::from(&addr))?; if addr.port() != 0 { + #[cfg(unix)] socket.set_reuse_port(true)?; + #[cfg(not(unix))] + socket.set_reuse_address(true)?; } if config.is_ipv6only() { socket.set_only_v6(true)?; @@ -66,6 +71,7 @@ pub fn new_std_socket_to( format!("peer_ip {peer_ip} and bind_ip {ip} should be of the same family",), )); } + #[cfg(target_os = "linux")] set_bind_address_no_port(socket.as_raw_fd(), true)?; let addr: SockAddr = SocketAddr::new(ip, 0).into(); socket.bind(&addr)?; @@ -76,51 +82,17 @@ pub fn new_std_socket_to( if let Some(interval) = keepalive.probe_interval() { setting = setting.with_interval(interval); } + #[cfg(unix)] if let Some(count) = keepalive.probe_count() { setting = setting.with_retries(count); } socket.set_tcp_keepalive(&setting)?; } - set_misc_opts(&socket, misc_opts, default_set_nodelay)?; + RawSocket::from(&socket).set_tcp_misc_opts(misc_opts, default_set_nodelay)?; Ok(std::net::TcpStream::from(socket)) } -pub fn set_raw_opts( - fd: RawFd, - misc_opts: &TcpMiscSockOpts, - default_set_nodelay: bool, -) -> io::Result<()> { - let socket = RawFdGuard::::new(fd); - set_misc_opts(&socket, misc_opts, default_set_nodelay) -} - -fn set_misc_opts( - socket: &Socket, - misc_opts: &TcpMiscSockOpts, - default_set_nodelay: bool, -) -> io::Result<()> { - if let Some(no_delay) = misc_opts.no_delay { - socket.set_nodelay(no_delay)?; - } else if default_set_nodelay { - socket.set_nodelay(true)?; - } - if let Some(mss) = misc_opts.max_segment_size { - socket.set_mss(mss)?; - } - if let Some(ttl) = misc_opts.time_to_live { - socket.set_ttl(ttl)?; - } - if let Some(tos) = misc_opts.type_of_service { - socket.set_tos(tos as u32)?; - } - #[cfg(target_os = "linux")] - if let Some(mark) = misc_opts.netfilter_mark { - socket.set_mark(mark)?; - } - Ok(()) -} - -#[cfg(target_os = "macos")] +#[cfg(any(windows, target_os = "macos"))] fn new_tcp_socket(family: AddressFamily) -> io::Result { let socket = Socket::new(Domain::from(family), Type::STREAM, None)?; socket.set_nonblocking(true)?; @@ -140,18 +112,8 @@ fn new_tcp_socket(family: AddressFamily) -> io::Result { } pub fn new_listen_to(config: &TcpListenConfig) -> io::Result { - let addr = config.address(); - let socket = match addr { - SocketAddr::V4(_) => TcpSocket::new_v4()?, - SocketAddr::V6(_) => TcpSocket::new_v6()?, - }; - socket.set_reuseport(true)?; - if config.is_ipv6only() { - let raw_fd = socket.as_raw_fd(); - set_only_ipv6(raw_fd, true)?; - } - socket.bind(addr)?; - socket.listen(config.backlog()) + let socket = new_std_listener(config)?; + TcpListener::from_std(socket) } pub fn new_socket_to( diff --git a/lib/g3-socket/src/udp.rs b/lib/g3-socket/src/udp.rs index 7b8f1018..3a0e5e24 100644 --- a/lib/g3-socket/src/udp.rs +++ b/lib/g3-socket/src/udp.rs @@ -16,15 +16,17 @@ use std::io; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket}; -use std::os::fd::{AsRawFd, RawFd}; +#[cfg(target_os = "linux")] +use std::os::unix::io::AsRawFd; use socket2::{Domain, SockAddr, Socket, Type}; use g3_types::net::{PortRange, SocketBufferConfig, UdpListenConfig, UdpMiscSockOpts}; -use super::guard::RawFdGuard; +#[cfg(target_os = "linux")] use super::sockopt::set_bind_address_no_port; use super::util::AddressFamily; +use super::RawSocket; pub fn new_std_socket_to( peer_addr: SocketAddr, @@ -41,11 +43,12 @@ pub fn new_std_socket_to( format!("peer_addr {peer_addr} and bind_ip {ip} should be of the same family",), )); } + #[cfg(target_os = "linux")] set_bind_address_no_port(socket.as_raw_fd(), true)?; let addr: SockAddr = SocketAddr::new(ip, 0).into(); socket.bind(&addr)?; } - set_misc_opts(&socket, misc_opts)?; + RawSocket::from(&socket).set_udp_misc_opts(misc_opts)?; Ok(UdpSocket::from(socket)) } @@ -59,7 +62,7 @@ pub fn new_std_bind_connect( None => SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0), }; let socket = new_udp_socket(AddressFamily::from(&bind_addr), buf_conf)?; - set_misc_opts(&socket, misc_opts)?; + RawSocket::from(&socket).set_udp_misc_opts(misc_opts)?; let bind_addr = SockAddr::from(bind_addr); socket.bind(&bind_addr)?; let socket = UdpSocket::from(socket); @@ -80,7 +83,7 @@ pub fn new_std_in_range_bind_connect( debug_assert!(port_start < port_end); let socket = new_udp_socket(AddressFamily::from(&bind_ip), buf_conf)?; - set_misc_opts(&socket, misc_opts)?; + RawSocket::from(&socket).set_udp_misc_opts(misc_opts)?; // like what's has been done in dante/sockd/sockd_request.c let tries = port.count().min(10); @@ -125,7 +128,7 @@ pub fn new_std_bind_relay( let socket = new_udp_socket(AddressFamily::from(&bind_addr), buf_conf)?; let bind_addr = SockAddr::from(bind_addr); socket.bind(&bind_addr)?; - set_misc_opts(&socket, misc_opts)?; + RawSocket::from(&socket).set_udp_misc_opts(misc_opts)?; Ok(UdpSocket::from(socket)) } @@ -133,72 +136,44 @@ pub fn new_std_bind_listen(config: &UdpListenConfig) -> io::Result { let addr = config.address(); let socket = new_udp_socket(AddressFamily::from(&addr), config.socket_buffer())?; if addr.port() != 0 { + #[cfg(unix)] socket.set_reuse_port(true)?; + #[cfg(not(unix))] + socket.set_reuse_address(true)?; } if config.is_ipv6only() { socket.set_only_v6(true)?; } let bind_addr = SockAddr::from(addr); socket.bind(&bind_addr)?; - set_misc_opts(&socket, config.socket_misc_opts())?; + RawSocket::from(&socket).set_udp_misc_opts(config.socket_misc_opts())?; Ok(UdpSocket::from(socket)) } pub fn new_std_rebind_listen(config: &UdpListenConfig, addr: SocketAddr) -> io::Result { let socket = new_udp_socket(AddressFamily::from(&addr), config.socket_buffer())?; if addr.port() != 0 { + #[cfg(unix)] socket.set_reuse_port(true)?; + #[cfg(not(unix))] + socket.set_reuse_address(true)?; } if config.is_ipv6only() { socket.set_only_v6(true)?; } let bind_addr = SockAddr::from(addr); socket.bind(&bind_addr)?; - set_misc_opts(&socket, config.socket_misc_opts())?; + RawSocket::from(&socket).set_udp_misc_opts(config.socket_misc_opts())?; Ok(UdpSocket::from(socket)) } -pub fn set_raw_opts(fd: RawFd, misc_opts: UdpMiscSockOpts) -> io::Result<()> { - let socket = RawFdGuard::::new(fd); - set_misc_opts(&socket, misc_opts) -} - -pub fn set_raw_buf_opts(fd: RawFd, buf_conf: SocketBufferConfig) -> io::Result<()> { - let socket = RawFdGuard::::new(fd); - set_buf_opts(&socket, buf_conf) -} - -fn set_misc_opts(socket: &Socket, misc_opts: UdpMiscSockOpts) -> io::Result<()> { - if let Some(ttl) = misc_opts.time_to_live { - socket.set_ttl(ttl)?; - } - if let Some(tos) = misc_opts.type_of_service { - socket.set_tos(tos as u32)?; - } - #[cfg(target_os = "linux")] - if let Some(mark) = misc_opts.netfilter_mark { - socket.set_mark(mark)?; - } - Ok(()) -} - fn new_udp_socket(family: AddressFamily, buf_conf: SocketBufferConfig) -> io::Result { let socket = new_nonblocking_udp_socket(family)?; - set_buf_opts(&socket, buf_conf)?; + RawSocket::from(&socket).set_buf_opts(buf_conf)?; Ok(socket) } -fn set_buf_opts(socket: &Socket, buf_conf: SocketBufferConfig) -> io::Result<()> { - if let Some(size) = buf_conf.recv_size() { - socket.set_recv_buffer_size(size)?; - } - if let Some(size) = buf_conf.send_size() { - socket.set_send_buffer_size(size)?; - } - Ok(()) -} - -#[cfg(target_os = "macos")] +#[cfg(any(windows, target_os = "macos"))] fn new_nonblocking_udp_socket(family: AddressFamily) -> io::Result { let socket = Socket::new(Domain::from(family), Type::DGRAM, None)?; socket.set_nonblocking(true)?; diff --git a/lib/g3-types/src/net/proxy/http.rs b/lib/g3-types/src/net/proxy/http.rs index 108b5af9..bd2a0d5f 100644 --- a/lib/g3-types/src/net/proxy/http.rs +++ b/lib/g3-types/src/net/proxy/http.rs @@ -45,6 +45,7 @@ impl HttpProxy { Ok(HttpProxy { peer, auth, + #[cfg(feature = "openssl")] tls_config: None, }) }