Skip to content

Commit

Permalink
Merge 'Maintain pending io_uring ops count for early return on run_on…
Browse files Browse the repository at this point in the history
…ce' from Arpit Saxena

Add a `pending_ops` field to `InnerLinuxIO` struct which is incremented
for each operation submitted to the ring and decremented when they are
taken off the completion queue. With this, we can exit from run_once if
there are no pending operations. Otherwise, in that case, it would hang
indefinitely due to call of `ring.submit_and_wait(1)`

Closes #349
  • Loading branch information
penberg committed Sep 27, 2024
2 parents 34e6973 + b7debab commit b8dffbf
Showing 1 changed file with 46 additions and 16 deletions.
62 changes: 46 additions & 16 deletions core/io/linux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,13 @@ pub struct LinuxIO {
inner: Rc<RefCell<InnerLinuxIO>>,
}

pub struct InnerLinuxIO {
struct WrappedIOUring {
ring: io_uring::IoUring,
pending_ops: usize,
}

struct InnerLinuxIO {
ring: WrappedIOUring,
iovecs: [iovec; MAX_IOVECS],
next_iovec: usize,
}
Expand All @@ -43,7 +48,7 @@ impl LinuxIO {
pub fn new() -> Result<Self> {
let ring = io_uring::IoUring::new(MAX_IOVECS as u32)?;
let inner = InnerLinuxIO {
ring: ring,
ring: WrappedIOUring{ring, pending_ops: 0},
iovecs: [iovec {
iov_base: std::ptr::null_mut(),
iov_len: 0,
Expand All @@ -66,6 +71,36 @@ impl InnerLinuxIO {
}
}

impl WrappedIOUring {
fn submit_entry(&mut self, entry: &io_uring::squeue::Entry) {
unsafe {
self.ring.submission()
.push(entry)
.expect("submission queue is full");
}
self.pending_ops += 1;
}

fn wait_for_completion(&mut self) -> Result<()> {
self.ring.submit_and_wait(1)?;
Ok(())
}

fn get_completion(&mut self) -> Option<io_uring::cqueue::Entry> {
// NOTE: This works because CompletionQueue's next function pops the head of the queue. This is not normal behaviour of iterators
let entry = self.ring.completion().next();
if entry.is_some() {
// consumed an entry from completion queue, update pending_ops
self.pending_ops -= 1;
}
entry
}

fn empty(&self) -> bool {
self.pending_ops == 0
}
}

impl IO for LinuxIO {
fn open_file(&self, path: &str) -> Result<Rc<dyn File>> {
trace!("open_file(path = {})", path);
Expand All @@ -91,8 +126,13 @@ impl IO for LinuxIO {
trace!("run_once()");
let mut inner = self.inner.borrow_mut();
let ring = &mut inner.ring;
ring.submit_and_wait(1)?;
while let Some(cqe) = ring.completion().next() {

if ring.empty() {
return Ok(())
}

ring.wait_for_completion()?;
while let Some(cqe) = ring.get_completion() {
let result = cqe.result();
if result < 0 {
return Err(LimboError::LinuxIOError(format!(
Expand Down Expand Up @@ -192,12 +232,7 @@ impl File for LinuxFile {
.build()
.user_data(ptr as u64)
};
let ring = &mut io.ring;
unsafe {
ring.submission()
.push(&read_e)
.expect("submission queue is full");
}
io.ring.submit_entry(&read_e);
Ok(())
}

Expand All @@ -218,12 +253,7 @@ impl File for LinuxFile {
.build()
.user_data(ptr as u64)
};
let ring = &mut io.ring;
unsafe {
ring.submission()
.push(&write)
.expect("submission queue is full");
}
io.ring.submit_entry(&write);
Ok(())
}
}
Expand Down

0 comments on commit b8dffbf

Please sign in to comment.