From a7d735d368c07aa89568b453f764e1cbd680d24f Mon Sep 17 00:00:00 2001 From: Arpit Saxena Date: Thu, 26 Sep 2024 20:17:29 +0530 Subject: [PATCH 1/2] Maintain pending io_uring ops count for early return on run_once Add a `pending_ops` field to the `InnerLinuxIO` struct, which is incremented for each operation submitted to the ring and decremented when an operation is taken off the completion queue. With this, we can exit from run_once if there are no pending operations. Without this check, run_once would hang indefinitely in that case because of the call to `ring.submit_and_wait(1)`. --- core/io/linux.rs | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/core/io/linux.rs b/core/io/linux.rs index 48342e71b..a23ab5a2f 100644 --- a/core/io/linux.rs +++ b/core/io/linux.rs @@ -3,7 +3,7 @@ use crate::{LimboError, Result}; use libc::{c_short, fcntl, flock, iovec, F_SETLK}; use log::{debug, trace}; use nix::fcntl::{FcntlArg, OFlag}; -use std::cell::RefCell; +use std::cell::{RefCell, RefMut}; use std::fmt; use std::os::unix::io::AsRawFd; use std::rc::Rc; @@ -37,6 +37,7 @@ pub struct InnerLinuxIO { ring: io_uring::IoUring, iovecs: [iovec; MAX_IOVECS], next_iovec: usize, + pending_ops: usize, } impl LinuxIO { @@ -49,6 +50,7 @@ impl LinuxIO { iov_len: 0, }; MAX_IOVECS], next_iovec: 0, + pending_ops: 0, }; Ok(Self { inner: Rc::new(RefCell::new(inner)), @@ -89,10 +91,15 @@ impl IO for LinuxIO { fn run_once(&self) -> Result<()> { trace!("run_once()"); - let mut inner = self.inner.borrow_mut(); - let ring = &mut inner.ring; + let inner = self.inner.borrow_mut(); + let (mut pending_ops, mut ring) = RefMut::map_split(inner, |inner_ref: &mut InnerLinuxIO| (&mut inner_ref.pending_ops, &mut inner_ref.ring)); + if *pending_ops == 0 { + return Ok(()); + } + ring.submit_and_wait(1)?; while let Some(cqe) = ring.completion().next() { + *pending_ops -= 1; let result = cqe.result(); if result < 0 { return Err(LimboError::LinuxIOError(format!( @@ -198,6 +205,7 @@ impl File for LinuxFile { 
.push(&read_e) .expect("submission queue is full"); } + io.pending_ops += 1; Ok(()) } @@ -224,6 +232,7 @@ impl File for LinuxFile { .push(&write) .expect("submission queue is full"); } + io.pending_ops += 1; Ok(()) } } From b7debabd817d60e71f322ceffc7377555f6b9a86 Mon Sep 17 00:00:00 2001 From: Arpit Saxena Date: Sat, 28 Sep 2024 00:10:05 +0530 Subject: [PATCH 2/2] Wrap IoUring to ensure pending_ops is always correctly updated Adds a struct WrappedIOUring which contains an IoUring and a pending_ops field. Entry submission and popping from the queue are done through functions operating on it, which also maintain the pending_ops count. NOTE: This is a bit weird/hacky since in get_completion we create a CompletionQueue and just call its next(). If it were a normal iterator, it would always return the same first item. However, it is a queue posing as an iterator, which makes this work. --- core/io/linux.rs | 73 +++++++++++++++++++++++++++++++----------------- 1 file changed, 47 insertions(+), 26 deletions(-) diff --git a/core/io/linux.rs b/core/io/linux.rs index a23ab5a2f..3542e2870 100644 --- a/core/io/linux.rs +++ b/core/io/linux.rs @@ -3,7 +3,7 @@ use crate::{LimboError, Result}; use libc::{c_short, fcntl, flock, iovec, F_SETLK}; use log::{debug, trace}; use nix::fcntl::{FcntlArg, OFlag}; -use std::cell::{RefCell, RefMut}; +use std::cell::RefCell; use std::fmt; use std::os::unix::io::AsRawFd; use std::rc::Rc; @@ -33,24 +33,27 @@ pub struct LinuxIO { inner: Rc<RefCell<InnerLinuxIO>>, } -pub struct InnerLinuxIO { +struct WrappedIOUring { ring: io_uring::IoUring, + pending_ops: usize, +} + +struct InnerLinuxIO { + ring: WrappedIOUring, iovecs: [iovec; MAX_IOVECS], next_iovec: usize, - pending_ops: usize, } impl LinuxIO { pub fn new() -> Result<Self> { let ring = io_uring::IoUring::new(MAX_IOVECS as u32)?; let inner = InnerLinuxIO { - ring: ring, + ring: WrappedIOUring{ring, pending_ops: 0}, iovecs: [iovec { iov_base: std::ptr::null_mut(), iov_len: 0, }; MAX_IOVECS], next_iovec: 0, - pending_ops: 0, }; 
Ok(Self { inner: Rc::new(RefCell::new(inner)), @@ -68,6 +71,36 @@ impl InnerLinuxIO { } } +impl WrappedIOUring { + fn submit_entry(&mut self, entry: &io_uring::squeue::Entry) { + unsafe { + self.ring.submission() + .push(entry) + .expect("submission queue is full"); + } + self.pending_ops += 1; + } + + fn wait_for_completion(&mut self) -> Result<()> { + self.ring.submit_and_wait(1)?; + Ok(()) + } + + fn get_completion(&mut self) -> Option<io_uring::cqueue::Entry> { + // NOTE: This works because CompletionQueue's next function pops the head of the queue. This is not normal behaviour of iterators + let entry = self.ring.completion().next(); + if entry.is_some() { + // consumed an entry from completion queue, update pending_ops + self.pending_ops -= 1; + } + entry + } + + fn empty(&self) -> bool { + self.pending_ops == 0 + } +} + impl IO for LinuxIO { fn open_file(&self, path: &str) -> Result> { trace!("open_file(path = {})", path); @@ -91,15 +124,15 @@ impl IO for LinuxIO { fn run_once(&self) -> Result<()> { trace!("run_once()"); - let inner = self.inner.borrow_mut(); - let (mut pending_ops, mut ring) = RefMut::map_split(inner, |inner_ref: &mut InnerLinuxIO| (&mut inner_ref.pending_ops, &mut inner_ref.ring)); - if *pending_ops == 0 { - return Ok(()); + let mut inner = self.inner.borrow_mut(); + let ring = &mut inner.ring; + + if ring.empty() { + return Ok(()) } - ring.submit_and_wait(1)?; - while let Some(cqe) = ring.completion().next() { - *pending_ops -= 1; + ring.wait_for_completion()?; + while let Some(cqe) = ring.get_completion() { let result = cqe.result(); if result < 0 { return Err(LimboError::LinuxIOError(format!( @@ -199,13 +232,7 @@ impl File for LinuxFile { .build() .user_data(ptr as u64) }; - let ring = &mut io.ring; - unsafe { - ring.submission() - .push(&read_e) - .expect("submission queue is full"); - } - io.pending_ops += 1; + io.ring.submit_entry(&read_e); Ok(()) } @@ -226,13 +253,7 @@ impl File for LinuxFile { .build() .user_data(ptr as u64) }; - let ring = &mut io.ring; - 
unsafe { - ring.submission() - .push(&write) - .expect("submission queue is full"); - } - io.pending_ops += 1; + io.ring.submit_entry(&write); Ok(()) } }