Skip to content

Commit

Permalink
Add support for IORING_SETUP_SQE128, IORING_SETUP_CQE32
Browse files Browse the repository at this point in the history
Summary: Add support for IORING_SETUP_SQE128, IORING_SETUP_CQE32

Reviewed By: jaesoo-fb

Differential Revision: D49178424

fbshipit-source-id: df08ccef2c11ed5e225c917660a2a718094b8212
  • Loading branch information
Dan Melnic authored and facebook-github-bot committed Sep 13, 2023
1 parent 1385306 commit d85a0f6
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 23 deletions.
62 changes: 43 additions & 19 deletions folly/experimental/io/IoUring.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ void toStream(std::ostream& os, const struct io_uring_sqe& sqe) {

namespace folly {

// Constructs an op, forwarding the notification callback to AsyncBaseOp.
// The two-argument form additionally stores the SQE/CQE size options in
// options_.
IoUringOp::IoUringOp(NotificationCallback cb) : AsyncBaseOp(std::move(cb)) {}
IoUringOp::IoUringOp(NotificationCallback cb, Options options)
: AsyncBaseOp(std::move(cb)), options_(options) {}

void IoUringOp::reset(NotificationCallback cb) {
CHECK_NE(state_, State::PENDING);
Expand All @@ -128,49 +129,49 @@ void IoUringOp::pread(int fd, void* buf, size_t size, off_t start) {
init();
iov_[0].iov_base = buf;
iov_[0].iov_len = size;
io_uring_prep_readv(&sqe_, fd, iov_, 1, start);
io_uring_sqe_set_data(&sqe_, this);
io_uring_prep_readv(&sqe_.sqe, fd, iov_, 1, start);
io_uring_sqe_set_data(&sqe_.sqe, this);
}

// Prepares a vectored read from `fd` at offset `start` into the caller's
// `iovcnt` iovec entries. The iovec array is passed through as-is (it is not
// copied into iov_), and `this` is attached to the SQE as user data.
void IoUringOp::preadv(int fd, const iovec* iov, int iovcnt, off_t start) {
init();
io_uring_prep_readv(&sqe_, fd, iov, iovcnt, start);
io_uring_sqe_set_data(&sqe_, this);
io_uring_prep_readv(&sqe_.sqe, fd, iov, iovcnt, start);
io_uring_sqe_set_data(&sqe_.sqe, this);
}

// Prepares a read of `size` bytes from `fd` at offset `start` into `buf` via
// io_uring_prep_read_fixed, passing `buf_index` to select the buffer
// (presumably one registered through IoUring::register_buffers - confirm).
// `this` is attached to the SQE as user data.
void IoUringOp::pread(
int fd, void* buf, size_t size, off_t start, int buf_index) {
init();
io_uring_prep_read_fixed(&sqe_, fd, buf, size, start, buf_index);
io_uring_sqe_set_data(&sqe_, this);
io_uring_prep_read_fixed(&sqe_.sqe, fd, buf, size, start, buf_index);
io_uring_sqe_set_data(&sqe_.sqe, this);
}

// Prepares a write of `size` bytes from `buf` to `fd` at offset `start`.
// The buffer is described by the op-owned single-entry iov_ array and issued
// as a one-element writev; `this` is attached to the SQE as user data.
void IoUringOp::pwrite(int fd, const void* buf, size_t size, off_t start) {
init();
// iovec::iov_base is a non-const void*, hence the const_cast.
iov_[0].iov_base = const_cast<void*>(buf);
iov_[0].iov_len = size;
io_uring_prep_writev(&sqe_, fd, iov_, 1, start);
io_uring_sqe_set_data(&sqe_, this);
io_uring_prep_writev(&sqe_.sqe, fd, iov_, 1, start);
io_uring_sqe_set_data(&sqe_.sqe, this);
}

// Prepares a vectored write to `fd` at offset `start` from the caller's
// `iovcnt` iovec entries. The iovec array is passed through as-is (it is not
// copied into iov_), and `this` is attached to the SQE as user data.
void IoUringOp::pwritev(int fd, const iovec* iov, int iovcnt, off_t start) {
init();
io_uring_prep_writev(&sqe_, fd, iov, iovcnt, start);
io_uring_sqe_set_data(&sqe_, this);
io_uring_prep_writev(&sqe_.sqe, fd, iov, iovcnt, start);
io_uring_sqe_set_data(&sqe_.sqe, this);
}

// Prepares a write of `size` bytes from `buf` to `fd` at offset `start` via
// io_uring_prep_write_fixed, passing `buf_index` to select the buffer
// (presumably one registered through IoUring::register_buffers - confirm).
// `this` is attached to the SQE as user data.
void IoUringOp::pwrite(
int fd, const void* buf, size_t size, off_t start, int buf_index) {
init();
io_uring_prep_write_fixed(&sqe_, fd, buf, size, start, buf_index);
io_uring_sqe_set_data(&sqe_, this);
io_uring_prep_write_fixed(&sqe_.sqe, fd, buf, size, start, buf_index);
io_uring_sqe_set_data(&sqe_.sqe, this);
}

void IoUringOp::toStream(std::ostream& os) const {
os << "{" << state_ << ", ";
os << "{" << state_ << ", [" << getSqeSize() << "], ";

if (state_ != AsyncBaseOp::State::UNINITIALIZED) {
::toStream(os, sqe_);
::toStream(os, sqe_.sqe);
}

if (state_ == AsyncBaseOp::State::COMPLETED) {
Expand All @@ -189,12 +190,25 @@ std::ostream& operator<<(std::ostream& os, const IoUringOp& op) {
return os;
}

// Builds the ring configuration; nothing in this constructor body creates
// the ring itself.
//   capacity  - forwarded to AsyncBase; also the upper bound for maxSubmit_.
//   pollMode  - forwarded to AsyncBase.
//   maxSubmit - clamped to `capacity` and stored in maxSubmit_.
//   options   - stored in options_; selects the IORING_SETUP_SQE128 and
//               IORING_SETUP_CQE32 setup flags.
IoUring::IoUring(size_t capacity, PollMode pollMode, size_t maxSubmit)
IoUring::IoUring(
size_t capacity,
PollMode pollMode,
size_t maxSubmit,
IoUringOp::Options options)
: AsyncBase(capacity, pollMode),
maxSubmit_((maxSubmit <= capacity) ? maxSubmit : capacity) {
maxSubmit_((maxSubmit <= capacity) ? maxSubmit : capacity),
options_(options) {
// Start from zeroed ring state and setup parameters.
::memset(&ioRing_, 0, sizeof(ioRing_));
::memset(&params_, 0, sizeof(params_));

if (options_.sqe128) {
params_.flags |= IORING_SETUP_SQE128;
}

// NOTE(review): this reads the `options` parameter while the SQE128 check
// above reads the `options_` member. options_ is initialized from options,
// so both see the same value; using options_ in both places would be more
// consistent.
if (options.cqe32) {
params_.flags |= IORING_SETUP_CQE32;
}

// Request an explicit completion-queue size: capacity_ rounded up to the
// next power of two.
params_.flags |= IORING_SETUP_CQSIZE;
params_.cq_entries = roundUpToNextPowerOfTwo(capacity_);
}
Expand Down Expand Up @@ -259,13 +273,18 @@ int IoUring::submitOne(AsyncBase::Op* op) {
return -1;
}

// we require same options for both the IoUringOp and the IoUring instance
if (iop->getOptions() != getOptions()) {
return -1;
}

SharedMutex::WriteHolder lk(submitMutex_);
auto* sqe = io_uring_get_sqe(&ioRing_);
if (!sqe) {
return -1;
}

*sqe = iop->getSqe();
::memcpy(sqe, &iop->getSqe(), iop->getSqeSize());

return io_uring_submit(&ioRing_);
}
Expand All @@ -280,12 +299,16 @@ int IoUring::submitRange(Range<AsyncBase::Op**> ops) {
continue;
}

if (iop->getOptions() != getOptions()) {
continue;
}

auto* sqe = io_uring_get_sqe(&ioRing_);
if (!sqe) {
break;
}

*sqe = iop->getSqe();
::memcpy(sqe, &iop->getSqe(), iop->getSqeSize());
++num;
if (num % maxSubmit_ == 0 || (i + 1 == ops.size())) {
auto ret = io_uring_submit(&ioRing_);
Expand Down Expand Up @@ -315,6 +338,7 @@ Range<AsyncBase::Op**> IoUring::doWait(
Op* op = reinterpret_cast<Op*>(io_uring_cqe_get_data(cqe));
CHECK(op);
auto res = cqe->res;
op->setCqe(cqe);
io_uring_cqe_seen(&ioRing_, cqe);
decrementPending();
switch (type) {
Expand Down
63 changes: 59 additions & 4 deletions folly/experimental/io/IoUring.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,19 @@ class IoUringOp : public AsyncBaseOp {
friend std::ostream& operator<<(std::ostream& stream, const IoUringOp& o);

public:
explicit IoUringOp(NotificationCallback cb = NotificationCallback());
// Entry-size options shared by an IoUringOp and the IoUring it is submitted
// to. Both flags default to false.
//   sqe128 - when set, IoUring's constructor adds IORING_SETUP_SQE128 and
//            getSqeSize() reports 128 bytes.
//   cqe32  - when set, IoUring's constructor adds IORING_SETUP_CQE32 and
//            getCqeSize() reports 32 bytes.
struct Options {
  Options() : sqe128(false), cqe32(false) {}
  bool sqe128;
  bool cqe32;

  // Two option sets are equal when both flags match.
  bool operator==(const Options& options) const {
    return sqe128 == options.sqe128 && cqe32 == options.cqe32;
  }

  // The submit paths compare with `!=`. Before C++20 the compiler does not
  // derive `!=` from `==`, so it has to be provided explicitly.
  bool operator!=(const Options& options) const {
    return !(*this == options);
  }
};

IoUringOp(
NotificationCallback cb = NotificationCallback(),
Options options = Options());
IoUringOp(const IoUringOp&) = delete;
IoUringOp& operator=(const IoUringOp&) = delete;
~IoUringOp() override;
Expand Down Expand Up @@ -66,10 +78,47 @@ class IoUringOp : public AsyncBaseOp {

void toStream(std::ostream& os) const override;

const struct io_uring_sqe& getSqe() const { return sqe_; }
// Public wrapper that simply forwards to init().
void initBase() {
  init();
}

// Mutable access to the submission entry stored at the start of the sqe_
// union.
struct io_uring_sqe& getSqe() {
  struct io_uring_sqe& entry = sqe_.sqe;
  return entry;
}

// Number of bytes that make up this op's submission entry: 128 when the
// sqe128 option is set, otherwise the size of a plain io_uring_sqe.
size_t getSqeSize() const {
  if (options_.sqe128) {
    return 128;
  }
  return sizeof(struct io_uring_sqe);
}

// Read-only view of the completion entry bytes held in cqe_, reinterpreted
// as an io_uring_cqe.
const struct io_uring_cqe& getCqe() const {
  const auto* completion =
      reinterpret_cast<const struct io_uring_cqe*>(&cqe_);
  return *completion;
}

// Number of bytes that make up this op's completion entry: 32 when the
// cqe32 option is set, otherwise the size of a plain io_uring_cqe.
size_t getCqeSize() const {
  if (options_.cqe32) {
    return 32;
  }
  return sizeof(struct io_uring_cqe);
}

// Copies getCqeSize() bytes from the given completion entry into cqe_.
void setCqe(const struct io_uring_cqe* cqe) {
  const size_t bytesToCopy = getCqeSize();
  ::memcpy(&cqe_, cqe, bytesToCopy);
}

// Options this op was constructed with.
const Options& getOptions() const {
  return options_;
}

private:
struct io_uring_sqe sqe_;
Options options_;

// we use unions with the largest size to avoid
// individual allocations for the sqe/cqe
// data[128] matches the size getSqeSize() reports when sqe128 is set.
union {
struct io_uring_sqe sqe;
uint8_t data[128];
} sqe_;

// we have to use a union here because of -Wgnu-variable-sized-type-not-at-end
//__u64 big_cqe[];
// data[32] matches the size getCqeSize() reports when cqe32 is set.
union {
__u64 user_data; // first member from io_uring_cqe
uint8_t data[32];
} cqe_;

struct iovec iov_[1];
};

Expand All @@ -88,13 +137,18 @@ class IoUring : public AsyncBase {
* The default IORING_MAX_ENTRIES value is usually 32K.
*/
explicit IoUring(
size_t capacity, PollMode pollMode = NOT_POLLABLE, size_t maxSubmit = 1);
size_t capacity,
PollMode pollMode = NOT_POLLABLE,
size_t maxSubmit = 1,
IoUringOp::Options options = IoUringOp::Options());
IoUring(const IoUring&) = delete;
IoUring& operator=(const IoUring&) = delete;
~IoUring() override;

static bool isAvailable();

// Options this ring was constructed with; submitted ops must carry equal
// options.
const IoUringOp::Options& getOptions() const {
  return options_;
}

int register_buffers(const struct iovec* iovecs, unsigned int nr_iovecs);

int unregister_buffers();
Expand All @@ -113,6 +167,7 @@ class IoUring : public AsyncBase {
std::vector<AsyncBase::Op*>& result) override;

size_t maxSubmit_;
IoUringOp::Options options_;
struct io_uring_params params_;
struct io_uring ioRing_;
SharedMutex submitMutex_;
Expand Down

0 comments on commit d85a0f6

Please sign in to comment.