From 0849775347d5fa513674268537f7fc3678e90d3b Mon Sep 17 00:00:00 2001 From: dragon-zhang Date: Sat, 1 Mar 2025 08:08:23 +0800 Subject: [PATCH] poll-io support windows --- .github/workflows/ci.sh | 2 +- monoio/src/driver/{legacy => }/iocp/afd.rs | 0 monoio/src/driver/{legacy => }/iocp/core.rs | 0 monoio/src/driver/{legacy => }/iocp/event.rs | 28 ++++++ monoio/src/driver/{legacy => }/iocp/mod.rs | 10 +- monoio/src/driver/{legacy => }/iocp/state.rs | 0 monoio/src/driver/{legacy => }/iocp/waker.rs | 0 monoio/src/driver/legacy/mod.rs | 23 ++--- monoio/src/driver/legacy/waker.rs | 4 +- monoio/src/driver/mod.rs | 6 +- monoio/src/driver/poll.rs | 99 +++++++++++++++++--- monoio/src/driver/ready.rs | 2 +- monoio/src/driver/shared_fd.rs | 4 +- monoio/src/fs/mod.rs | 9 +- monoio/src/utils/ctrlc.rs | 14 +++ monoio/tests/ctrlc_legacy.rs | 5 +- 16 files changed, 166 insertions(+), 40 deletions(-) rename monoio/src/driver/{legacy => }/iocp/afd.rs (100%) rename monoio/src/driver/{legacy => }/iocp/core.rs (100%) rename monoio/src/driver/{legacy => }/iocp/event.rs (83%) rename monoio/src/driver/{legacy => }/iocp/mod.rs (97%) rename monoio/src/driver/{legacy => }/iocp/state.rs (100%) rename monoio/src/driver/{legacy => }/iocp/waker.rs (100%) diff --git a/.github/workflows/ci.sh b/.github/workflows/ci.sh index ab0228b0..4ddcbac6 100644 --- a/.github/workflows/ci.sh +++ b/.github/workflows/ci.sh @@ -9,7 +9,7 @@ if [ "${NO_RUN}" != "1" ] && [ "${NO_RUN}" != "true" ]; then export CARGO_NET_RETRY=5 export CARGO_NET_TIMEOUT=10 - cargo install cross --git "https://github.com/cross-rs/cross" --rev "7b79041c9278769eca57fae10c74741f5aa5c14b" + cargo install cross --git "https://github.com/cross-rs/cross" --rev "4090beca3cfffa44371a5bba524de3a578aa46c3" CARGO=cross cargo clean diff --git a/monoio/src/driver/legacy/iocp/afd.rs b/monoio/src/driver/iocp/afd.rs similarity index 100% rename from monoio/src/driver/legacy/iocp/afd.rs rename to monoio/src/driver/iocp/afd.rs diff --git a/monoio/src/driver/legacy/iocp/core.rs b/monoio/src/driver/iocp/core.rs similarity index 100% rename from monoio/src/driver/legacy/iocp/core.rs rename to monoio/src/driver/iocp/core.rs diff --git a/monoio/src/driver/legacy/iocp/event.rs b/monoio/src/driver/iocp/event.rs similarity index 83% rename from monoio/src/driver/legacy/iocp/event.rs rename to monoio/src/driver/iocp/event.rs index 0f962ff8..b7f0e045 100644 --- a/monoio/src/driver/legacy/iocp/event.rs +++ b/monoio/src/driver/iocp/event.rs @@ -1,3 +1,5 @@ +use std::slice::{Iter, IterMut}; + use mio::Token; use windows_sys::Win32::System::IO::OVERLAPPED_ENTRY; @@ -117,4 +119,30 @@ impl Events { *status = unsafe { std::mem::zeroed() }; } } + + pub fn iter(&self) -> Iter<'_, Event> { + self.events.iter() + } + + pub fn iter_mut(&mut self) -> IterMut<'_, Event> { + self.events.iter_mut() + } +} + +impl<'a> IntoIterator for &'a Events { + type Item = &'a Event; + type IntoIter = Iter<'a, Event>; + + fn into_iter(self) -> Self::IntoIter { + self.iter() + } +} + +impl<'a> IntoIterator for &'a mut Events { + type Item = &'a mut Event; + type IntoIter = IterMut<'a, Event>; + + fn into_iter(self) -> Self::IntoIter { + self.iter_mut() + } } diff --git a/monoio/src/driver/legacy/iocp/mod.rs b/monoio/src/driver/iocp/mod.rs similarity index 97% rename from monoio/src/driver/legacy/iocp/mod.rs rename to monoio/src/driver/iocp/mod.rs index 3aad2057..58b8c146 100644 --- a/monoio/src/driver/legacy/iocp/mod.rs +++ b/monoio/src/driver/iocp/mod.rs @@ -7,7 +7,7 @@ mod waker; pub use core::*; use std::{ collections::VecDeque, - os::windows::prelude::RawSocket, + os::windows::prelude::{AsRawHandle, RawHandle, RawSocket}, pin::Pin, sync::{ atomic::{AtomicBool, Ordering}, @@ -229,7 +229,7 @@ impl Poller { const POLL_GROUP__MAX_GROUP_SIZE: usize = 32; let mut afd_group = self.afd.lock().unwrap(); - if afd_group.len() == 0 { + if afd_group.is_empty() { self._alloc_afd_group(&mut afd_group)?; } else { // + 1 reference in Vec @@ -286,6 +286,12 @@ impl Drop for Poller { } } +impl AsRawHandle for Poller { + fn as_raw_handle(&self) -> RawHandle { + self.cp.as_raw_handle() + } +} + pub fn from_overlapped(ptr: *mut OVERLAPPED) -> Pin>> { let sock_ptr: *const Mutex = ptr as *const _; unsafe { Pin::new_unchecked(Arc::from_raw(sock_ptr)) } diff --git a/monoio/src/driver/legacy/iocp/state.rs b/monoio/src/driver/iocp/state.rs similarity index 100% rename from monoio/src/driver/legacy/iocp/state.rs rename to monoio/src/driver/iocp/state.rs diff --git a/monoio/src/driver/legacy/iocp/waker.rs b/monoio/src/driver/iocp/waker.rs similarity index 100% rename from monoio/src/driver/legacy/iocp/waker.rs rename to monoio/src/driver/iocp/waker.rs diff --git a/monoio/src/driver/legacy/mod.rs b/monoio/src/driver/legacy/mod.rs index 9d0f796e..91b7a908 100644 --- a/monoio/src/driver/legacy/mod.rs +++ b/monoio/src/driver/legacy/mod.rs @@ -16,10 +16,6 @@ use super::{ }; use crate::utils::slab::Slab; -#[allow(missing_docs, unreachable_pub, dead_code, unused_imports)] -#[cfg(windows)] -pub(super) mod iocp; - #[cfg(feature = "sync")] mod waker; #[cfg(feature = "sync")] @@ -32,9 +28,9 @@ pub(crate) struct LegacyInner { #[cfg(unix)] poll: mio::Poll, #[cfg(windows)] - events: iocp::Events, + events: crate::driver::iocp::Events, #[cfg(windows)] - poll: iocp::Poller, + poll: crate::driver::iocp::Poller, #[cfg(feature = "sync")] shared_waker: std::sync::Arc, @@ -69,7 +65,7 @@ impl LegacyDriver { #[cfg(unix)] let poll = mio::Poll::new()?; #[cfg(windows)] - let poll = iocp::Poller::new()?; + let poll = crate::driver::iocp::Poller::new()?; #[cfg(all(unix, feature = "sync"))] let shared_waker = std::sync::Arc::new(waker::EventWaker::new(mio::Waker::new( @@ -77,10 +73,9 @@ impl LegacyDriver { TOKEN_WAKEUP, )?)); #[cfg(all(windows, feature = "sync"))] - let shared_waker = std::sync::Arc::new(waker::EventWaker::new(iocp::Waker::new( - &poll, - TOKEN_WAKEUP, - )?)); + let shared_waker = std::sync::Arc::new(waker::EventWaker::new( + crate::driver::iocp::Waker::new(&poll, TOKEN_WAKEUP)?, + )); #[cfg(feature = "sync")] let (waker_sender, waker_receiver) = flume::unbounded::(); #[cfg(feature = "sync")] @@ -93,7 +88,7 @@ impl LegacyDriver { #[cfg(unix)] poll, #[cfg(windows)] - events: iocp::Events::with_capacity(entries as usize), + events: crate::driver::iocp::Events::with_capacity(entries as usize), #[cfg(windows)] poll, #[cfg(feature = "sync")] @@ -178,7 +173,7 @@ impl LegacyDriver { #[cfg(windows)] pub(crate) fn register( this: &Rc>, - state: &mut iocp::SocketState, + state: &mut crate::driver::iocp::SocketState, interest: mio::Interest, ) -> io::Result { let inner = unsafe { &mut *this.get() }; @@ -198,7 +193,7 @@ impl LegacyDriver { pub(crate) fn deregister( this: &Rc>, token: usize, - state: &mut iocp::SocketState, + state: &mut crate::driver::iocp::SocketState, ) -> io::Result<()> { let inner = unsafe { &mut *this.get() }; diff --git a/monoio/src/driver/legacy/waker.rs b/monoio/src/driver/legacy/waker.rs index 40290e96..4cb6d7ea 100644 --- a/monoio/src/driver/legacy/waker.rs +++ b/monoio/src/driver/legacy/waker.rs @@ -3,7 +3,7 @@ use crate::driver::unpark::Unpark; pub(crate) struct EventWaker { // raw waker #[cfg(windows)] - waker: super::iocp::Waker, + waker: crate::driver::iocp::Waker, #[cfg(unix)] waker: mio::Waker, // Atomic awake status @@ -20,7 +20,7 @@ impl EventWaker { } #[cfg(windows)] - pub(crate) fn new(waker: super::iocp::Waker) -> Self { + pub(crate) fn new(waker: crate::driver::iocp::Waker) -> Self { Self { waker, awake: std::sync::atomic::AtomicBool::new(true), diff --git a/monoio/src/driver/mod.rs b/monoio/src/driver/mod.rs index b05d0769..443abff5 100644 --- a/monoio/src/driver/mod.rs +++ b/monoio/src/driver/mod.rs @@ -1,7 +1,7 @@ /// Monoio Driver. #[allow(dead_code)] pub(crate) mod op; -#[cfg(all(feature = "poll-io", unix))] +#[cfg(feature = "poll-io")] pub(crate) mod poll; #[cfg(any(feature = "legacy", feature = "poll-io"))] pub(crate) mod ready; @@ -17,6 +17,10 @@ mod legacy; #[cfg(all(target_os = "linux", feature = "iouring"))] mod uring; +#[allow(missing_docs, unreachable_pub, dead_code, unused_imports)] +#[cfg(all(windows, any(feature = "legacy", feature = "poll-io")))] +pub(crate) mod iocp; + mod util; use std::{ diff --git a/monoio/src/driver/poll.rs b/monoio/src/driver/poll.rs index b918f95b..2bdfb91f 100644 --- a/monoio/src/driver/poll.rs +++ b/monoio/src/driver/poll.rs @@ -1,14 +1,30 @@ -use std::{io, task::Context, time::Duration}; +use std::{ + io, + ops::{Deref, DerefMut}, + task::Context, + time::Duration, +}; + +#[cfg(unix)] +use mio::{event::Source, Events}; +use mio::{Interest, Token}; use super::{op::MaybeFd, ready::Direction, scheduled_io::ScheduledIo}; +#[cfg(windows)] +use crate::driver::iocp::{Events, Poller, SocketState}; use crate::{driver::op::CompletionMeta, utils::slab::Slab}; /// Poller with io dispatch. -// TODO: replace legacy impl with this Poll. pub(crate) struct Poll { pub(crate) io_dispatch: Slab, + #[cfg(unix)] poll: mio::Poll, - events: mio::Events, + #[cfg(unix)] + events: Events, + #[cfg(windows)] + poll: Poller, + #[cfg(windows)] + events: Events, } impl Poll { @@ -16,8 +32,11 @@ impl Poll { pub(crate) fn with_capacity(capacity: usize) -> io::Result { Ok(Self { io_dispatch: Slab::new(), + #[cfg(unix)] poll: mio::Poll::new()?, - events: mio::Events::with_capacity(capacity), + #[cfg(windows)] + poll: Poller::new()?, + events: Events::with_capacity(capacity), }) } @@ -41,14 +60,15 @@ impl Poll { Ok(()) } + #[cfg(unix)] pub(crate) fn register( &mut self, - source: &mut impl mio::event::Source, - interest: mio::Interest, + source: &mut impl Source, + interest: Interest, ) -> io::Result { let token = self.io_dispatch.insert(ScheduledIo::new()); let registry = self.poll.registry(); - match registry.register(source, mio::Token(token), interest) { + match registry.register(source, Token(token), interest) { Ok(_) => Ok(token), Err(e) => { self.io_dispatch.remove(token); @@ -57,11 +77,24 @@ impl Poll { } } - pub(crate) fn deregister( + #[cfg(windows)] + pub(crate) fn register( &mut self, - source: &mut impl mio::event::Source, - token: usize, - ) -> io::Result<()> { + source: &mut SocketState, + interest: Interest, + ) -> io::Result { + let token = self.io_dispatch.insert(ScheduledIo::new()); + match self.poll.register(source, Token(token), interest) { + Ok(_) => Ok(token), + Err(e) => { + self.io_dispatch.remove(token); + Err(e) + } + } + } + + #[cfg(unix)] + pub(crate) fn deregister(&mut self, source: &mut impl Source, token: usize) -> io::Result<()> { match self.poll.registry().deregister(source) { Ok(_) => { self.io_dispatch.remove(token); @@ -71,6 +104,18 @@ impl Poll { } } + #[cfg(windows)] + pub(crate) fn deregister(&mut self, source: &mut SocketState, token: usize) -> io::Result<()> { + match self.poll.deregister(source) { + Ok(_) => { + self.io_dispatch.remove(token); + Ok(()) + } + Err(e) => Err(e), + } + } + + #[allow(dead_code)] #[inline] pub(crate) fn poll_syscall( &mut self, @@ -107,3 +152,35 @@ impl std::os::fd::AsRawFd for Poll { self.poll.as_raw_fd() } } + +#[cfg(unix)] +impl Deref for Poll { + type Target = mio::Poll; + + fn deref(&self) -> &Self::Target { + &self.poll + } +} + +#[cfg(windows)] +impl std::os::windows::io::AsRawHandle for Poll { + #[inline] + fn as_raw_handle(&self) -> std::os::windows::io::RawHandle { + self.poll.as_raw_handle() + } +} + +#[cfg(windows)] +impl Deref for Poll { + type Target = Poller; + + fn deref(&self) -> &Self::Target { + &self.poll + } +} + +impl DerefMut for Poll { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.poll + } +} diff --git a/monoio/src/driver/ready.rs b/monoio/src/driver/ready.rs index df25b8a3..7d067fa5 100644 --- a/monoio/src/driver/ready.rs +++ b/monoio/src/driver/ready.rs @@ -46,7 +46,7 @@ impl Ready { pub(crate) const WRITE_ALL: Ready = Ready(WRITABLE | WRITE_CLOSED | WRITE_CANCELED); #[cfg(windows)] - pub(crate) fn from_mio(event: &super::legacy::iocp::Event) -> Ready { + pub(crate) fn from_mio(event: &crate::driver::iocp::Event) -> Ready { let mut ready = Ready::EMPTY; if event.is_readable() { diff --git a/monoio/src/driver/shared_fd.rs b/monoio/src/driver/shared_fd.rs index 0a57b799..677d0b7a 100644 --- a/monoio/src/driver/shared_fd.rs +++ b/monoio/src/driver/shared_fd.rs @@ -6,9 +6,9 @@ use std::os::windows::io::{ }; use std::{cell::UnsafeCell, io, rc::Rc}; -#[cfg(windows)] -use super::legacy::iocp::SocketState as RawFd; use super::CURRENT; +#[cfg(windows)] +use crate::driver::iocp::SocketState as RawFd; // Tracks in-flight operations on a file descriptor. Ensures all in-flight // operations complete before submitting the close. diff --git a/monoio/src/fs/mod.rs b/monoio/src/fs/mod.rs index bbe6641c..01de6002 100644 --- a/monoio/src/fs/mod.rs +++ b/monoio/src/fs/mod.rs @@ -219,7 +219,7 @@ pub async fn write, C: IoBuf>(path: P, contents: C) -> (io::Resul /// ``` #[cfg(feature = "unlinkat")] pub async fn remove_file>(path: P) -> io::Result<()> { - Op::unlink(path)?.await.meta.result?; + crate::driver::op::Op::unlink(path)?.await.meta.result?; Ok(()) } @@ -250,7 +250,7 @@ pub async fn remove_file>(path: P) -> io::Result<()> { /// ``` #[cfg(feature = "unlinkat")] pub async fn remove_dir>(path: P) -> io::Result<()> { - Op::rmdir(path)?.await.meta.result?; + crate::driver::op::Op::rmdir(path)?.await.meta.result?; Ok(()) } @@ -281,6 +281,9 @@ pub async fn remove_dir>(path: P) -> io::Result<()> { /// ``` #[cfg(feature = "renameat")] pub async fn rename, Q: AsRef>(from: P, to: Q) -> io::Result<()> { - Op::rename(from.as_ref(), to.as_ref())?.await.meta.result?; + crate::driver::op::Op::rename(from.as_ref(), to.as_ref())? + .await + .meta + .result?; Ok(()) } diff --git a/monoio/src/utils/ctrlc.rs b/monoio/src/utils/ctrlc.rs index f72b7f21..0e9446c0 100644 --- a/monoio/src/utils/ctrlc.rs +++ b/monoio/src/utils/ctrlc.rs @@ -61,4 +61,18 @@ impl CtrlC { _private: PhantomData, }) } + + /// Ctrl+C to current progress. + pub fn ctrlc() { + let pid = std::process::id() as _; + unsafe { + #[cfg(unix)] + libc::kill(pid, libc::SIGINT); + #[cfg(windows)] + windows_sys::Win32::System::Console::GenerateConsoleCtrlEvent( + windows_sys::Win32::System::Console::CTRL_C_EVENT, + pid, + ); + }; + } } diff --git a/monoio/tests/ctrlc_legacy.rs b/monoio/tests/ctrlc_legacy.rs index 2a0fd53b..ee001624 100644 --- a/monoio/tests/ctrlc_legacy.rs +++ b/monoio/tests/ctrlc_legacy.rs @@ -1,13 +1,12 @@ #[cfg(feature = "signal")] #[monoio::test(driver = "legacy")] async fn test_ctrlc_legacy() { - use libc::{getpid, kill, SIGINT}; use monoio::utils::CtrlC; let c = CtrlC::new().unwrap(); - std::thread::spawn(|| unsafe { + std::thread::spawn(|| { std::thread::sleep(std::time::Duration::from_millis(500)); - kill(getpid(), SIGINT); + CtrlC::ctrlc(); }); c.await;