Skip to content

Commit

Permalink
Merge pull request #3 from appsignal/nix_010
Browse files Browse the repository at this point in the history
Upgrade nix
  • Loading branch information
thijsc authored Feb 4, 2025
2 parents 76af5fe + cd25024 commit a06be05
Show file tree
Hide file tree
Showing 10 changed files with 94 additions and 76 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ exclude = [

[dependencies]
log = "*"
nix = "0.9.0"
nix = "0.23"
libc = "*"
bytes = "0.3.0"
net2 = { version = "*", default-features = false }
Expand Down
2 changes: 1 addition & 1 deletion src/net/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl UdpSocket {
}

pub fn recv_from(&self, buf: &mut [u8])
-> io::Result<Option<(usize, SocketAddr)>> {
-> io::Result<Option<(usize, Option<SocketAddr>)>> {
self.sys.recv_from(buf)
}

Expand Down
20 changes: 10 additions & 10 deletions src/sys/unix/epoll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,27 +98,27 @@ fn ioevent_to_epoll(interest: EventSet, opts: PollOpt) -> EpollFlags {
let mut kind = EpollFlags::empty();

if interest.is_readable() {
kind.insert(EPOLLIN);
kind.insert(EpollFlags::EPOLLIN);
}

if interest.is_writable() {
kind.insert(EPOLLOUT);
kind.insert(EpollFlags::EPOLLOUT);
}

if interest.is_hup() {
kind.insert(EPOLLRDHUP);
kind.insert(EpollFlags::EPOLLRDHUP);
}

if opts.is_edge() {
kind.insert(EPOLLET);
kind.insert(EpollFlags::EPOLLET);
}

if opts.is_oneshot() {
kind.insert(EPOLLONESHOT);
kind.insert(EpollFlags::EPOLLONESHOT);
}

if opts.is_level() {
kind.remove(EPOLLET);
kind.remove(EpollFlags::EPOLLET);
}

kind
Expand Down Expand Up @@ -151,20 +151,20 @@ impl Events {
let epoll = self.events[idx].events();
let mut kind = EventSet::none();

if epoll.contains(EPOLLIN) {
if epoll.contains(EpollFlags::EPOLLIN) {
kind = kind | EventSet::readable();
}

if epoll.contains(EPOLLOUT) {
if epoll.contains(EpollFlags::EPOLLOUT) {
kind = kind | EventSet::writable();
}

// EPOLLHUP - Usually means a socket error happened
if epoll.contains(EPOLLERR) {
if epoll.contains(EpollFlags::EPOLLERR) {
kind = kind | EventSet::error();
}

if epoll.contains(EPOLLRDHUP) | epoll.contains(EPOLLHUP) {
if epoll.contains(EpollFlags::EPOLLRDHUP) | epoll.contains(EpollFlags::EPOLLHUP) {
kind = kind | EventSet::hup();
}

Expand Down
23 changes: 11 additions & 12 deletions src/sys/unix/kqueue.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use {io, EventSet, PollOpt, Token};
use event::IoEvent;
use nix::sys::event::{EventFilter, EventFlag, FilterFlag, KEvent, kqueue, kevent, kevent_ts};
use nix::sys::event::{EV_ADD, EV_CLEAR, EV_DELETE, EV_DISABLE, EV_ENABLE, EV_EOF, EV_ERROR, EV_ONESHOT};
use libc::{timespec, time_t, c_long};
use std::{fmt, slice};
use std::os::unix::io::RawFd;
Expand Down Expand Up @@ -74,27 +73,27 @@ impl Selector {
}

pub fn deregister(&mut self, fd: RawFd) -> io::Result<()> {
self.ev_push(fd, 0, EventFilter::EVFILT_READ, EV_DELETE);
self.ev_push(fd, 0, EventFilter::EVFILT_WRITE, EV_DELETE);
self.ev_push(fd, 0, EventFilter::EVFILT_READ, EventFlag::EV_DELETE);
self.ev_push(fd, 0, EventFilter::EVFILT_WRITE, EventFlag::EV_DELETE);

self.flush_changes()
}

fn ev_register(&mut self, fd: RawFd, token: usize, filter: EventFilter, enable: bool, opts: PollOpt) {
let mut flags = EV_ADD;
let mut flags = EventFlag::EV_ADD;

if enable {
flags = flags | EV_ENABLE;
flags = flags | EventFlag::EV_ENABLE;
} else {
flags = flags | EV_DISABLE;
flags = flags | EventFlag::EV_DISABLE;
}

if opts.contains(PollOpt::edge()) {
flags = flags | EV_CLEAR;
flags = flags | EventFlag::EV_CLEAR;
}

if opts.contains(PollOpt::oneshot()) {
flags = flags | EV_ONESHOT;
flags = flags | EventFlag::EV_ONESHOT;
}

self.ev_push(fd, token, filter, flags);
Expand Down Expand Up @@ -163,17 +162,17 @@ impl Events {

}

if e.flags().contains(EV_ERROR) {
if e.flags().contains(EventFlag::EV_ERROR) {
self.events[idx].kind.insert(EventSet::error());
}

if e.filter() == EventFilter::EVFILT_READ {
if e.filter() == Ok(EventFilter::EVFILT_READ) {
self.events[idx].kind.insert(EventSet::readable());
} else if e.filter() == EventFilter::EVFILT_WRITE {
} else if e.filter() == Ok(EventFilter::EVFILT_WRITE) {
self.events[idx].kind.insert(EventSet::writable());
}

if e.flags().contains(EV_EOF) {
if e.flags().contains(EventFlag::EV_EOF) {
self.events[idx].kind.insert(EventSet::hup());

// When the read end of the socket is closed, EV_EOF is set on
Expand Down
53 changes: 31 additions & 22 deletions src/sys/unix/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,51 +30,60 @@ pub use self::udp::UdpSocket;
pub use self::uds::UnixSocket;

pub fn pipe() -> ::io::Result<(Io, Io)> {
use nix::fcntl::{O_NONBLOCK, O_CLOEXEC};
use nix::unistd::pipe2;
// Create pipe without flags
let (rd, wr) = nix::pipe().map_err(from_nix_error)?;

let (rd, wr) = pipe2(O_NONBLOCK | O_CLOEXEC)
.map_err(from_nix_error)?;
// Set O_CLOEXEC and O_NONBLOCK manually
for &fd in &[rd, wr] {
let flags = nix::OFlag::from_bits_truncate(nix::fcntl(fd, nix::FcntlArg::F_GETFL).map_err(from_nix_error)?);
nix::fcntl(fd, nix::FcntlArg::F_SETFL(flags | nix::OFlag::O_CLOEXEC | nix::OFlag::O_NONBLOCK)).map_err(from_nix_error)?;
}

Ok((Io::from_raw_fd(rd), Io::from_raw_fd(wr)))
}

pub fn from_nix_error(err: ::nix::Error) -> ::io::Error {
match err {
nix::Error::Sys(errno) => {
::io::Error::from_raw_os_error(errno as i32)
},
_ => {
::io::Error::new(::io::ErrorKind::Other, err)
}
nix::Error::EINVAL
| nix::Error::EAGAIN
| nix::Error::EINTR
| nix::Error::EIO
| nix::Error::ECONNRESET
| nix::Error::EADDRINUSE
| nix::Error::EADDRNOTAVAIL
| nix::Error::EPIPE
| nix::Error::ETIMEDOUT
| nix::Error::EINPROGRESS
| nix::Error::ECONNREFUSED
| nix::Error::ENOENT => std::io::Error::from_raw_os_error(err as i32),
_ => std::io::Error::new(std::io::ErrorKind::Other, err),
}
}

mod nix {
pub use nix::Error;
pub use nix::libc::c_int;
pub use nix::errno::EINPROGRESS;
pub use nix::{cmsg_space, Error};
pub use nix::libc::{c_int, linger};
pub use nix::fcntl::{fcntl, FcntlArg, OFlag};
pub use nix::sys::socket::MsgFlags;
pub use nix::errno::Errno;
pub use nix::sys::socket::{
sockopt,
AddressFamily,
SockAddr,
SockFlag,
SockType,
InetAddr,
IpMembershipRequest,
Ipv6MembershipRequest,
Ipv4Addr,
Ipv6Addr,
ControlMessage,
CmsgSpace,
MSG_DONTWAIT,
SOCK_NONBLOCK,
SOCK_CLOEXEC,
accept4,
ControlMessageOwned,
accept,
bind,
connect,
getsockname,
getsockopt,
ip_mreq,
ipv6_mreq,
linger,
listen,
recvfrom,
recvmsg,
Expand All @@ -85,5 +94,5 @@ mod nix {
};
pub use nix::sys::time::TimeVal;
pub use nix::sys::uio::IoVec;
pub use nix::unistd::dup;
pub use nix::unistd::{dup, pipe};
}
46 changes: 28 additions & 18 deletions src/sys/unix/net.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,31 @@
use {io};
use sys::unix::{nix, Io};
use std::net::SocketAddr;
use std::os::unix::io::{AsRawFd, RawFd};
use sys::unix::{nix, Io};

pub fn socket(family: nix::AddressFamily, ty: nix::SockType, nonblock: bool) -> io::Result<RawFd> {
let opts = if nonblock {
nix::SOCK_NONBLOCK | nix::SOCK_CLOEXEC
} else {
nix::SOCK_CLOEXEC
};
// Create the socket
let sock_fd = nix::socket(family, ty, nix::SockFlag::empty(), None)
.map_err(super::from_nix_error)?;

nix::socket(family, ty, opts, 0)
.map_err(super::from_nix_error)
if nonblock {
// Set the socket to nonblocking mode using fcntl
let flags = nix::OFlag::from_bits_truncate(
nix::fcntl(sock_fd, nix::FcntlArg::F_GETFL).map_err(super::from_nix_error)?
);
nix::fcntl(sock_fd, nix:: FcntlArg::F_SETFL(flags | nix::OFlag::O_NONBLOCK))
.map_err(super::from_nix_error)?;
}

Ok(sock_fd)
}

pub fn connect(io: &Io, addr: &nix::SockAddr) -> io::Result<bool> {
match nix::connect(io.as_raw_fd(), addr) {
Ok(_) => Ok(true),
Err(e) => {
match e {
nix::Error::Sys(nix::EINPROGRESS) => Ok(false),
nix::Errno::EINPROGRESS => Ok(false),
_ => Err(super::from_nix_error(e))
}
}
Expand All @@ -37,27 +43,31 @@ pub fn listen(io: &Io, backlog: usize) -> io::Result<()> {
}

pub fn accept(io: &Io, nonblock: bool) -> io::Result<RawFd> {
let opts = if nonblock {
nix::SOCK_NONBLOCK | nix::SOCK_CLOEXEC
} else {
nix::SOCK_CLOEXEC
};
let sock_fd = nix::accept(io.as_raw_fd()).map_err(super::from_nix_error)?;

nix::accept4(io.as_raw_fd(), opts)
.map_err(super::from_nix_error)
if nonblock {
// Set the socket to nonblocking mode using fcntl
let flags = nix::OFlag::from_bits_truncate(
nix::fcntl(sock_fd, nix::FcntlArg::F_GETFL).map_err(super::from_nix_error)?
);
nix::fcntl(sock_fd, nix::FcntlArg::F_SETFL(flags | nix::OFlag::O_NONBLOCK))
.map_err(super::from_nix_error)?;
}

Ok(sock_fd)
}

// UDP & UDS
#[inline]
pub fn recvfrom(io: &Io, buf: &mut [u8]) -> io::Result<(usize, nix::SockAddr)> {
pub fn recvfrom(io: &Io, buf: &mut [u8]) -> io::Result<(usize, Option<nix::SockAddr>)> {
nix::recvfrom(io.as_raw_fd(), buf)
.map_err(super::from_nix_error)
}

// UDP & UDS
#[inline]
pub fn sendto(io: &Io, buf: &[u8], target: &nix::SockAddr) -> io::Result<usize> {
nix::sendto(io.as_raw_fd(), buf, target, nix::MSG_DONTWAIT)
nix::sendto(io.as_raw_fd(), buf, target, nix::MsgFlags::MSG_DONTWAIT)
.map_err(super::from_nix_error)
}

Expand Down
5 changes: 3 additions & 2 deletions src/sys/unix/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ use std::os::unix::io::{RawFd, FromRawFd, AsRawFd};

use libc;
use net2::TcpStreamExt;
use nix::fcntl::fcntl;
use nix::fcntl::FcntlArg::F_SETFL;
use nix::fcntl::{fcntl, O_NONBLOCK};
use nix::fcntl::OFlag;

use {io, Evented, EventSet, PollOpt, Selector, Token};
use sys::unix::eventedfd::EventedFd;
Expand All @@ -25,7 +26,7 @@ pub struct TcpListener {
}

fn set_nonblock(s: &dyn AsRawFd) -> io::Result<()> {
fcntl(s.as_raw_fd(), F_SETFL(O_NONBLOCK)).map_err(super::from_nix_error)
fcntl(s.as_raw_fd(), F_SETFL(OFlag::O_NONBLOCK)).map_err(super::from_nix_error)
.map(|_| ())
}

Expand Down
13 changes: 6 additions & 7 deletions src/sys/unix/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,9 @@ impl UdpSocket {
.map_non_block()
}

pub fn recv_from(&self, buf: &mut [u8])
-> io::Result<Option<(usize, SocketAddr)>> {
pub fn recv_from(&self, buf: &mut [u8]) -> io::Result<Option<(usize, Option<SocketAddr>)>> {
net::recvfrom(&self.io, buf)
.map(|(cnt, addr)| (cnt, net::to_std_addr(addr)))
.map(|(cnt, addr_opt)| (cnt, addr_opt.map(net::to_std_addr)))
.map_non_block()
}

Expand All @@ -79,15 +78,15 @@ impl UdpSocket {
match *multi {
IpAddr::V4(ref addr) => {
// Create the request
let req = nix::ip_mreq::new(nix::Ipv4Addr::from_std(addr), None);
let req = nix::IpMembershipRequest::new(nix::Ipv4Addr::from_std(addr), None);

// Set the socket option
nix::setsockopt(self.as_raw_fd(), nix::sockopt::IpAddMembership, &req)
.map_err(super::from_nix_error)
}
IpAddr::V6(ref addr) => {
// Create the request
let req = nix::ipv6_mreq::new(nix::Ipv6Addr::from_std(addr));
let req = nix::Ipv6MembershipRequest::new(nix::Ipv6Addr::from_std(addr));

// Set the socket option
nix::setsockopt(self.as_raw_fd(), nix::sockopt::Ipv6AddMembership, &req)
Expand All @@ -100,15 +99,15 @@ impl UdpSocket {
match *multi {
IpAddr::V4(ref addr) => {
// Create the request
let req = nix::ip_mreq::new(nix::Ipv4Addr::from_std(addr), None);
let req = nix::IpMembershipRequest::new(nix::Ipv4Addr::from_std(addr), None);

// Set the socket option
nix::setsockopt(self.as_raw_fd(), nix::sockopt::IpDropMembership, &req)
.map_err(super::from_nix_error)
}
IpAddr::V6(ref addr) => {
// Create the request
let req = nix::ipv6_mreq::new(nix::Ipv6Addr::from_std(addr));
let req = nix::Ipv6MembershipRequest::new(nix::Ipv6Addr::from_std(addr));

// Set the socket option
nix::setsockopt(self.as_raw_fd(), nix::sockopt::Ipv6DropMembership, &req)
Expand Down
4 changes: 2 additions & 2 deletions src/sys/unix/uds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,12 @@ impl UnixSocket {

pub fn read_recv_fd(&mut self, buf: &mut [u8]) -> io::Result<(usize, Option<RawFd>)> {
let iov = [nix::IoVec::from_mut_slice(buf)];
let mut cmsgspace: nix::CmsgSpace<[RawFd; 1]> = nix::CmsgSpace::new();
let mut cmsgspace = nix::cmsg_space!([RawFd; 1]);
let msg = nix::recvmsg(self.io.as_raw_fd(), &iov, Some(&mut cmsgspace), MsgFlags::empty())
.map_err(super::from_nix_error)?;
let mut fd = None;
for cmsg in msg.cmsgs() {
if let nix::ControlMessage::ScmRights(fds) = cmsg {
if let nix::ControlMessageOwned::ScmRights(fds) = cmsg {
// statically, there is room for at most one fd
if fds.len() == 1 {
fd = Some(fds[0]);
Expand Down
Loading

0 comments on commit a06be05

Please sign in to comment.