Skip to content

Commit

Permalink
made a start on fleshing out the processing of the operation. #30
Browse files Browse the repository at this point in the history
  • Loading branch information
JackKelly committed Jan 31, 2024
1 parent 649f8a0 commit 7ab6b44
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 6 deletions.
12 changes: 8 additions & 4 deletions src/io_uring_local.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use bytes::Bytes;
use object_store::{path::Path, Result};
use std::sync::mpsc::channel;
use std::sync::Arc;
use std::thread;
use url::Url;

use crate::io_uring_thread::WorkerThread;
use crate::io_uring_thread::{WorkerThread, worker_thread_func};
use crate::operation_future::{Operation, OperationFuture};

#[derive(Debug)]
Expand Down Expand Up @@ -33,14 +35,16 @@ impl Default for IoUringLocal {
impl IoUringLocal {
/// Create new filesystem storage with no prefix
pub fn new() -> Self {
// TODO: Set up thread and Sender!
let (tx, rx) = channel();
let thread_handle = thread::spawn(move || worker_thread_func(rx));

Self {
config: Arc::new(Config {
root: Url::parse("file:///").unwrap(),
}),
worker_thread: WorkerThread {
handle: todo!(),
sender: todo!(),
handle: thread_handle,
sender: tx,
},
}
}
Expand Down
85 changes: 83 additions & 2 deletions src/io_uring_thread.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,93 @@
use std::{
sync::{mpsc::Sender, Arc},
sync::{mpsc::{Receiver, Sender}, Arc},
thread::JoinHandle,
};
use io_uring::{cqueue, opcode, types, IoUring};

use crate::operation_future::SharedState;
use crate::operation_future::{Operation, SharedState};

#[derive(Debug)]
pub(crate) struct WorkerThread {
pub(crate) handle: JoinHandle<()>,
pub(crate) sender: Sender<Arc<SharedState>>, // Channel to send ops to the worker thread
}

pub(crate) fn worker_thread_func(rx: Receiver<Arc<SharedState>>) {
const CQ_RING_SIZE: u32 = 16; // TODO: This should be user-configurable.
let mut ring = IoUring::new(CQ_RING_SIZE).unwrap();
let mut n_tasks_in_flight_in_io_uring: u32 = 0;

loop {
// Keep io_uring's submission queue topped up:
// `try_iter` "will return None if there are no more pending values or if the channel has
// hung up. The iterator will never panic! or block"
for shared_state in rx.try_iter() {
if n_tasks_in_flight_in_io_uring >= CQ_RING_SIZE {
break;
}

let op = shared_state.get_operation();
let entry = op.prepare_io_uring_entry();
println!("Submitting op={:?}", op);
submit_to_io_uring(entry, &mut ring);

// Increment counters
n_tasks_in_flight_in_io_uring += 1;
}

ring.submit_and_wait(1).unwrap(); // TODO: Handle error!

println!("After ring.submit_and_wait");

// Spawn tasks to the Rayon ThreadPool to process data:
for cqe in ring.completion() {
n_tasks_in_flight_in_io_uring -= 1;
todo!(); // TODO: Get the associated Future and `set_result_and_wake()`
}
}
}

fn submit_operation_to_io_uring(op: Operation, ring: &mut IoUring) {
// TODO: Open file using io_uring. See issue #1
let fd = fs::OpenOptions::new()
.read(true)
.custom_flags(libc::O_DIRECT)
.open(task)
.unwrap();

// Save information about this task in an OperationDescriptor on the heap,
// so the processing thread can access this information later.
// Later, we'll get a raw pointer to this OperationDescriptor, and pass this raw pointer
// through to the worker thread, via io_uring's `user_data` (which is what `user_data`
// is mostly intended for, according to the `io_uring` docs). We get a raw pointer by calling
// `into_raw()`, which consumes the OperationDescriptor but doesn't de-allocated it, which is exactly
// what we want. We want ownership of the OperationDescriptor to "tunnel through" io_uring.
// Rust will guarantee that we can't touch the buffer until it re-emerges from io_uring.
// And we want Rayon's worker thread (that processes the CQE) to decide whether
// to drop the buffer (after moving data elsewhere) or keep the buffer
// (if we're passing the buffer back to the user).
let mut op_descriptor = Box::new(OperationDescriptor {
// TODO: Allocate the correct sized buffer for the task.
// Or don't allocate at all, if the user has already allocated.
buf: vec![0u8; 1024],
path: task.clone(),
task_i,
fd,
});

// Note that the developer needs to ensure
// that the entry pushed into submission queue is valid (e.g. fd, buffer).
let read_e = opcode::Read::new(
types::Fd(op_descriptor.fd.as_raw_fd()),
op_descriptor.buf.as_mut_ptr(),
op_descriptor.buf.len() as _,
)
.build()
.user_data(Box::into_raw(op_descriptor) as u64);

unsafe {
ring.submission()
.push(&read_e)
.expect("submission queue full")
}
}
18 changes: 18 additions & 0 deletions src/operation_future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,24 @@ pub(crate) enum Operation {
Get { location: Path },
}

pub(crate) enum PreparedEntry {
Get{
operation: Operation,
entry: Entry,
buffer: &[u8],
},
}

impl Operation {
pub(crate) fn prepare_io_uring_entry(&self) -> PreparedEntry {
match self {
Get => {
todo!();
}
}
}
}

#[derive(Debug)]
pub(crate) struct OperationFuture {
pub(crate) shared_state: Arc<SharedState>,
Expand Down

0 comments on commit 7ab6b44

Please sign in to comment.