Skip to content

Commit

Permalink
g3-socket: add initial windows support
Browse files Browse the repository at this point in the history
  • Loading branch information
zh-jq-b committed May 13, 2024
1 parent 0d489a1 commit 2167530
Show file tree
Hide file tree
Showing 15 changed files with 257 additions and 227 deletions.
3 changes: 2 additions & 1 deletion lib/g3-ctl/src/opts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,16 @@
*/

use std::io;
#[cfg(unix)]
use std::path::PathBuf;

use anyhow::anyhow;
use clap::{value_parser, Arg, ArgMatches, Command, ValueHint};
use clap_complete::Shell;
use tokio::io::AsyncWriteExt;
#[cfg(unix)]
use tokio::net::UnixStream;

#[cfg(unix)]
const DEFAULT_TMP_CONTROL_DIR: &str = "/tmp/g3";

const GLOBAL_ARG_COMPLETION: &str = "completion";
Expand Down
28 changes: 13 additions & 15 deletions lib/g3-daemon/src/listen/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

use std::io;
use std::net::{SocketAddr, UdpSocket};
use std::os::fd::{AsRawFd, RawFd};
use std::sync::Arc;
use std::time::Duration;

Expand All @@ -27,6 +26,7 @@ use tokio::runtime::Handle;
use tokio::sync::{broadcast, watch};

use g3_socket::util::native_socket_addr;
use g3_socket::RawSocket;
use g3_types::acl::{AclAction, AclNetworkRule};
use g3_types::net::UdpListenConfig;

Expand Down Expand Up @@ -119,7 +119,7 @@ where
mut self,
listener: Endpoint,
mut listen_addr: SocketAddr,
mut sock_raw_fd: RawFd,
mut sock_raw_fd: RawSocket,
mut server_reload_channel: broadcast::Receiver<ServerReloadCommand>,
mut quic_cfg_receiver: watch::Receiver<C>,
) where
Expand Down Expand Up @@ -172,12 +172,12 @@ where
if let Some(listen_config) = aux_config.take_udp_listen_config() {
self.listen_config = listen_config;
if self.listen_config.address() != listen_addr {
if let Ok((fd, addr)) = self.rebind_socket(&listener) {
sock_raw_fd = fd;
if let Ok((socket, addr)) = self.rebind_socket(&listener) {
sock_raw_fd = socket;
listen_addr = addr;
}
} else {
self.update_socket_opts(sock_raw_fd);
self.update_socket_opts(&sock_raw_fd);
}
}
}
Expand Down Expand Up @@ -282,18 +282,16 @@ where
}
}

fn update_socket_opts(&self, raw_fd: RawFd) {
if let Err(e) = g3_socket::udp::set_raw_opts(raw_fd, self.listen_config.socket_misc_opts())
{
fn update_socket_opts(&self, raw_socket: &RawSocket) {
if let Err(e) = raw_socket.set_udp_misc_opts(self.listen_config.socket_misc_opts()) {
warn!(
"SRT[{}_v{}#{}] update socket misc opts failed: {e}",
self.server.name(),
self.server_version,
self.instance_id,
);
}
if let Err(e) = g3_socket::udp::set_raw_buf_opts(raw_fd, self.listen_config.socket_buffer())
{
if let Err(e) = raw_socket.set_buf_opts(self.listen_config.socket_buffer()) {
warn!(
"SRT[{}_v{}#{}] update socket buf opts failed: {e}",
self.server.name(),
Expand All @@ -303,12 +301,12 @@ where
}
}

fn rebind_socket(&self, listener: &Endpoint) -> io::Result<(RawFd, SocketAddr)> {
fn rebind_socket(&self, listener: &Endpoint) -> io::Result<(RawSocket, SocketAddr)> {
match g3_socket::udp::new_std_bind_listen(&self.listen_config) {
Ok(socket) => {
let raw_fd = socket.as_raw_fd();
let raw_socket = RawSocket::from(&socket);
match listener.rebind(socket) {
Ok(_) => Ok((raw_fd, listener.local_addr().unwrap())),
Ok(_) => Ok((raw_socket, listener.local_addr().unwrap())),
Err(e) => {
warn!(
"SRT[{}_v{}#{}] reload rebind {} failed: {e}",
Expand Down Expand Up @@ -408,7 +406,7 @@ where
{
let handle = self.get_rt_handle(listen_in_worker);
handle.spawn(async move {
let sock_raw_fd = socket.as_raw_fd();
let raw_socket = RawSocket::from(&socket);
// make sure the listen socket associated with the correct reactor
match Endpoint::new(
Default::default(),
Expand All @@ -421,7 +419,7 @@ where
self.run(
endpoint,
listen_addr,
sock_raw_fd,
raw_socket,
server_reload_channel,
quic_cfg_receiver,
)
Expand Down
4 changes: 2 additions & 2 deletions lib/g3-daemon/src/listen/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/

use std::net::SocketAddr;
use std::os::fd::AsRawFd;
use std::sync::Arc;

use async_trait::async_trait;
Expand All @@ -26,6 +25,7 @@ use tokio::sync::broadcast;

use g3_io_ext::LimitedTcpListener;
use g3_socket::util::native_socket_addr;
use g3_socket::RawSocket;
use g3_types::net::TcpListenConfig;

use crate::listen::ListenStats;
Expand Down Expand Up @@ -178,7 +178,7 @@ where
let server = self.server.clone();

let mut cc_info = ClientConnectionInfo::new(peer_addr, local_addr);
cc_info.set_tcp_raw_fd(stream.as_raw_fd());
cc_info.set_tcp_raw_socket(RawSocket::from(&stream));
if let Some(worker_id) = self.worker_id {
cc_info.set_worker_id(Some(worker_id));
tokio::spawn(async move {
Expand Down
14 changes: 7 additions & 7 deletions lib/g3-daemon/src/server/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@

use std::io;
use std::net::{IpAddr, SocketAddr};
use std::os::fd::RawFd;

use g3_io_ext::haproxy::ProxyAddr;
use g3_socket::RawSocket;
use g3_types::net::TcpMiscSockOpts;

#[derive(Clone, Debug)]
Expand All @@ -29,7 +29,7 @@ pub struct ClientConnectionInfo {
sock_peer_addr: SocketAddr,
#[allow(unused)]
sock_local_addr: SocketAddr,
tcp_sock_raw_fd: Option<RawFd>,
tcp_raw_socket: Option<RawSocket>,
}

impl ClientConnectionInfo {
Expand All @@ -40,13 +40,13 @@ impl ClientConnectionInfo {
server_addr: local_addr,
sock_peer_addr: peer_addr,
sock_local_addr: local_addr,
tcp_sock_raw_fd: None,
tcp_raw_socket: None,
}
}

#[inline]
pub fn set_tcp_raw_fd(&mut self, raw_fd: RawFd) {
self.tcp_sock_raw_fd = Some(raw_fd);
pub fn set_tcp_raw_socket(&mut self, raw_fd: RawSocket) {
self.tcp_raw_socket = Some(raw_fd);
}

#[inline]
Expand Down Expand Up @@ -105,8 +105,8 @@ impl ClientConnectionInfo {
opts: &TcpMiscSockOpts,
default_set_nodelay: bool,
) -> io::Result<()> {
if let Some(raw_fd) = self.tcp_sock_raw_fd {
g3_socket::tcp::set_raw_opts(raw_fd, opts, default_set_nodelay)
if let Some(raw_socket) = &self.tcp_raw_socket {
raw_socket.set_tcp_misc_opts(opts, default_set_nodelay)
} else {
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion lib/g3-fluentd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ flume = { workspace = true, features = ["async"] }
rmp.workspace = true
rmp-serde.workspace = true
serde.workspace = true
tokio = { workspace = true, features = ["rt", "net", "time", "macros"] }
tokio = { workspace = true, features = ["rt", "net", "time", "macros", "io-util"] }
tokio-rustls.workspace = true
rand.workspace = true
digest.workspace = true
Expand Down
4 changes: 3 additions & 1 deletion lib/g3-socket/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ rust-version = "1.75.0"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
libc.workspace = true
tokio = { workspace = true, features = ["net"] }
socket2 = { version = "0.5", features = ["all"] }
fastrand.workspace = true
g3-types.workspace = true

[target.'cfg(target_os = "linux")'.dependencies]
libc.workspace = true
85 changes: 0 additions & 85 deletions lib/g3-socket/src/guard.rs

This file was deleted.

5 changes: 4 additions & 1 deletion lib/g3-socket/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@
* limitations under the License.
*/

#[cfg(target_os = "linux")]
mod sockopt;

pub mod guard;
mod raw;
pub use raw::RawSocket;

pub mod tcp;
pub mod udp;
pub mod util;
Loading

0 comments on commit 2167530

Please sign in to comment.