Skip to content

Commit

Permalink
poll-io support windows
Browse files Browse the repository at this point in the history
  • Loading branch information
loongs-zhang committed Mar 1, 2025
1 parent d86d2ce commit 0849775
Show file tree
Hide file tree
Showing 16 changed files with 166 additions and 40 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ if [ "${NO_RUN}" != "1" ] && [ "${NO_RUN}" != "true" ]; then
export CARGO_NET_RETRY=5
export CARGO_NET_TIMEOUT=10

cargo install cross --git "https://github.com/cross-rs/cross" --rev "7b79041c9278769eca57fae10c74741f5aa5c14b"
cargo install cross --git "https://github.com/cross-rs/cross" --rev "4090beca3cfffa44371a5bba524de3a578aa46c3"
CARGO=cross

cargo clean
Expand Down
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::slice::{Iter, IterMut};

use mio::Token;
use windows_sys::Win32::System::IO::OVERLAPPED_ENTRY;

Expand Down Expand Up @@ -117,4 +119,30 @@ impl Events {
*status = unsafe { std::mem::zeroed() };
}
}

pub fn iter(&self) -> Iter<'_, Event> {
self.events.iter()
}

pub fn iter_mut(&mut self) -> IterMut<'_, Event> {
self.events.iter_mut()
}
}

impl<'a> IntoIterator for &'a Events {
type Item = &'a Event;
type IntoIter = Iter<'a, Event>;

fn into_iter(self) -> Self::IntoIter {
self.iter()
}
}

impl<'a> IntoIterator for &'a mut Events {
type Item = &'a mut Event;
type IntoIter = IterMut<'a, Event>;

fn into_iter(self) -> Self::IntoIter {
self.iter_mut()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ mod waker;
pub use core::*;
use std::{
collections::VecDeque,
os::windows::prelude::RawSocket,
os::windows::prelude::{AsRawHandle, RawHandle, RawSocket},
pin::Pin,
sync::{
atomic::{AtomicBool, Ordering},
Expand Down Expand Up @@ -229,7 +229,7 @@ impl Poller {
const POLL_GROUP__MAX_GROUP_SIZE: usize = 32;

let mut afd_group = self.afd.lock().unwrap();
if afd_group.len() == 0 {
if afd_group.is_empty() {
self._alloc_afd_group(&mut afd_group)?;
} else {
// + 1 reference in Vec
Expand Down Expand Up @@ -286,6 +286,12 @@ impl Drop for Poller {
}
}

impl AsRawHandle for Poller {
fn as_raw_handle(&self) -> RawHandle {
self.cp.as_raw_handle()
}
}

pub fn from_overlapped(ptr: *mut OVERLAPPED) -> Pin<Arc<Mutex<SockState>>> {
let sock_ptr: *const Mutex<SockState> = ptr as *const _;
unsafe { Pin::new_unchecked(Arc::from_raw(sock_ptr)) }
Expand Down
File renamed without changes.
File renamed without changes.
23 changes: 9 additions & 14 deletions monoio/src/driver/legacy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@ use super::{
};
use crate::utils::slab::Slab;

#[allow(missing_docs, unreachable_pub, dead_code, unused_imports)]
#[cfg(windows)]
pub(super) mod iocp;

#[cfg(feature = "sync")]
mod waker;
#[cfg(feature = "sync")]
Expand All @@ -32,9 +28,9 @@ pub(crate) struct LegacyInner {
#[cfg(unix)]
poll: mio::Poll,
#[cfg(windows)]
events: iocp::Events,
events: crate::driver::iocp::Events,
#[cfg(windows)]
poll: iocp::Poller,
poll: crate::driver::iocp::Poller,

#[cfg(feature = "sync")]
shared_waker: std::sync::Arc<waker::EventWaker>,
Expand Down Expand Up @@ -69,18 +65,17 @@ impl LegacyDriver {
#[cfg(unix)]
let poll = mio::Poll::new()?;
#[cfg(windows)]
let poll = iocp::Poller::new()?;
let poll = crate::driver::iocp::Poller::new()?;

#[cfg(all(unix, feature = "sync"))]
let shared_waker = std::sync::Arc::new(waker::EventWaker::new(mio::Waker::new(
poll.registry(),
TOKEN_WAKEUP,
)?));
#[cfg(all(windows, feature = "sync"))]
let shared_waker = std::sync::Arc::new(waker::EventWaker::new(iocp::Waker::new(
&poll,
TOKEN_WAKEUP,
)?));
let shared_waker = std::sync::Arc::new(waker::EventWaker::new(
crate::driver::iocp::Waker::new(&poll, TOKEN_WAKEUP)?,
));
#[cfg(feature = "sync")]
let (waker_sender, waker_receiver) = flume::unbounded::<std::task::Waker>();
#[cfg(feature = "sync")]
Expand All @@ -93,7 +88,7 @@ impl LegacyDriver {
#[cfg(unix)]
poll,
#[cfg(windows)]
events: iocp::Events::with_capacity(entries as usize),
events: crate::driver::iocp::Events::with_capacity(entries as usize),
#[cfg(windows)]
poll,
#[cfg(feature = "sync")]
Expand Down Expand Up @@ -178,7 +173,7 @@ impl LegacyDriver {
#[cfg(windows)]
pub(crate) fn register(
this: &Rc<UnsafeCell<LegacyInner>>,
state: &mut iocp::SocketState,
state: &mut crate::driver::iocp::SocketState,
interest: mio::Interest,
) -> io::Result<usize> {
let inner = unsafe { &mut *this.get() };
Expand All @@ -198,7 +193,7 @@ impl LegacyDriver {
pub(crate) fn deregister(
this: &Rc<UnsafeCell<LegacyInner>>,
token: usize,
state: &mut iocp::SocketState,
state: &mut crate::driver::iocp::SocketState,
) -> io::Result<()> {
let inner = unsafe { &mut *this.get() };

Expand Down
4 changes: 2 additions & 2 deletions monoio/src/driver/legacy/waker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::driver::unpark::Unpark;
pub(crate) struct EventWaker {
// raw waker
#[cfg(windows)]
waker: super::iocp::Waker,
waker: crate::driver::iocp::Waker,
#[cfg(unix)]
waker: mio::Waker,
// Atomic awake status
Expand All @@ -20,7 +20,7 @@ impl EventWaker {
}

#[cfg(windows)]
pub(crate) fn new(waker: super::iocp::Waker) -> Self {
pub(crate) fn new(waker: crate::driver::iocp::Waker) -> Self {
Self {
waker,
awake: std::sync::atomic::AtomicBool::new(true),
Expand Down
6 changes: 5 additions & 1 deletion monoio/src/driver/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/// Monoio Driver.
#[allow(dead_code)]
pub(crate) mod op;
#[cfg(all(feature = "poll-io", unix))]
#[cfg(feature = "poll-io")]
pub(crate) mod poll;
#[cfg(any(feature = "legacy", feature = "poll-io"))]
pub(crate) mod ready;
Expand All @@ -17,6 +17,10 @@ mod legacy;
#[cfg(all(target_os = "linux", feature = "iouring"))]
mod uring;

#[allow(missing_docs, unreachable_pub, dead_code, unused_imports)]
#[cfg(all(windows, any(feature = "legacy", feature = "poll-io")))]
pub(crate) mod iocp;

mod util;

use std::{
Expand Down
99 changes: 88 additions & 11 deletions monoio/src/driver/poll.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,42 @@
use std::{io, task::Context, time::Duration};
use std::{
io,
ops::{Deref, DerefMut},
task::Context,
time::Duration,
};

#[cfg(unix)]
use mio::{event::Source, Events};
use mio::{Interest, Token};

use super::{op::MaybeFd, ready::Direction, scheduled_io::ScheduledIo};
#[cfg(windows)]
use crate::driver::iocp::{Events, Poller, SocketState};
use crate::{driver::op::CompletionMeta, utils::slab::Slab};

/// Poller with io dispatch.
// TODO: replace legacy impl with this Poll.
pub(crate) struct Poll {
pub(crate) io_dispatch: Slab<ScheduledIo>,
#[cfg(unix)]
poll: mio::Poll,
events: mio::Events,
#[cfg(unix)]
events: Events,
#[cfg(windows)]
poll: Poller,
#[cfg(windows)]
events: Events,
}

impl Poll {
#[inline]
pub(crate) fn with_capacity(capacity: usize) -> io::Result<Self> {
Ok(Self {
io_dispatch: Slab::new(),
#[cfg(unix)]
poll: mio::Poll::new()?,
events: mio::Events::with_capacity(capacity),
#[cfg(windows)]
poll: Poller::new()?,
events: Events::with_capacity(capacity),
})
}

Expand All @@ -41,14 +60,15 @@ impl Poll {
Ok(())
}

#[cfg(unix)]
pub(crate) fn register(
&mut self,
source: &mut impl mio::event::Source,
interest: mio::Interest,
source: &mut impl Source,
interest: Interest,
) -> io::Result<usize> {
let token = self.io_dispatch.insert(ScheduledIo::new());
let registry = self.poll.registry();
match registry.register(source, mio::Token(token), interest) {
match registry.register(source, Token(token), interest) {
Ok(_) => Ok(token),
Err(e) => {
self.io_dispatch.remove(token);
Expand All @@ -57,11 +77,24 @@ impl Poll {
}
}

pub(crate) fn deregister(
#[cfg(windows)]
pub(crate) fn register(
&mut self,
source: &mut impl mio::event::Source,
token: usize,
) -> io::Result<()> {
source: &mut SocketState,
interest: Interest,
) -> io::Result<usize> {
let token = self.io_dispatch.insert(ScheduledIo::new());
match self.poll.register(source, Token(token), interest) {
Ok(_) => Ok(token),
Err(e) => {
self.io_dispatch.remove(token);
Err(e)
}
}
}

#[cfg(unix)]
pub(crate) fn deregister(&mut self, source: &mut impl Source, token: usize) -> io::Result<()> {
match self.poll.registry().deregister(source) {
Ok(_) => {
self.io_dispatch.remove(token);
Expand All @@ -71,6 +104,18 @@ impl Poll {
}
}

#[cfg(windows)]
pub(crate) fn deregister(&mut self, source: &mut SocketState, token: usize) -> io::Result<()> {
match self.poll.deregister(source) {
Ok(_) => {
self.io_dispatch.remove(token);
Ok(())
}
Err(e) => Err(e),
}
}

#[allow(dead_code)]
#[inline]
pub(crate) fn poll_syscall(
&mut self,
Expand Down Expand Up @@ -107,3 +152,35 @@ impl std::os::fd::AsRawFd for Poll {
self.poll.as_raw_fd()
}
}

#[cfg(unix)]
impl Deref for Poll {
type Target = mio::Poll;

fn deref(&self) -> &Self::Target {
&self.poll
}
}

#[cfg(windows)]
impl std::os::windows::io::AsRawHandle for Poll {
#[inline]
fn as_raw_handle(&self) -> std::os::windows::io::RawHandle {
self.poll.as_raw_handle()
}
}

#[cfg(windows)]
impl Deref for Poll {
type Target = Poller;

fn deref(&self) -> &Self::Target {
&self.poll
}
}

impl DerefMut for Poll {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.poll
}
}
2 changes: 1 addition & 1 deletion monoio/src/driver/ready.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl Ready {
pub(crate) const WRITE_ALL: Ready = Ready(WRITABLE | WRITE_CLOSED | WRITE_CANCELED);

#[cfg(windows)]
pub(crate) fn from_mio(event: &super::legacy::iocp::Event) -> Ready {
pub(crate) fn from_mio(event: &crate::driver::iocp::Event) -> Ready {
let mut ready = Ready::EMPTY;

if event.is_readable() {
Expand Down
4 changes: 2 additions & 2 deletions monoio/src/driver/shared_fd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ use std::os::windows::io::{
};
use std::{cell::UnsafeCell, io, rc::Rc};

#[cfg(windows)]
use super::legacy::iocp::SocketState as RawFd;
use super::CURRENT;
#[cfg(windows)]
use crate::driver::iocp::SocketState as RawFd;

// Tracks in-flight operations on a file descriptor. Ensures all in-flight
// operations complete before submitting the close.
Expand Down
9 changes: 6 additions & 3 deletions monoio/src/fs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ pub async fn write<P: AsRef<Path>, C: IoBuf>(path: P, contents: C) -> (io::Resul
/// ```
#[cfg(feature = "unlinkat")]
pub async fn remove_file<P: AsRef<Path>>(path: P) -> io::Result<()> {
Op::unlink(path)?.await.meta.result?;
crate::driver::op::Op::unlink(path)?.await.meta.result?;
Ok(())
}

Expand Down Expand Up @@ -250,7 +250,7 @@ pub async fn remove_file<P: AsRef<Path>>(path: P) -> io::Result<()> {
/// ```
#[cfg(feature = "unlinkat")]
pub async fn remove_dir<P: AsRef<Path>>(path: P) -> io::Result<()> {
Op::rmdir(path)?.await.meta.result?;
crate::driver::op::Op::rmdir(path)?.await.meta.result?;
Ok(())
}

Expand Down Expand Up @@ -281,6 +281,9 @@ pub async fn remove_dir<P: AsRef<Path>>(path: P) -> io::Result<()> {
/// ```
#[cfg(feature = "renameat")]
pub async fn rename<P: AsRef<Path>, Q: AsRef<Path>>(from: P, to: Q) -> io::Result<()> {
Op::rename(from.as_ref(), to.as_ref())?.await.meta.result?;
crate::driver::op::Op::rename(from.as_ref(), to.as_ref())?
.await
.meta
.result?;
Ok(())
}
Loading

0 comments on commit 0849775

Please sign in to comment.