diff --git a/folly/experimental/io/IoUring.cpp b/folly/experimental/io/IoUring.cpp index 6274f8a81a8..9b68fdec466 100644 --- a/folly/experimental/io/IoUring.cpp +++ b/folly/experimental/io/IoUring.cpp @@ -113,7 +113,8 @@ void toStream(std::ostream& os, const struct io_uring_sqe& sqe) { namespace folly { -IoUringOp::IoUringOp(NotificationCallback cb) : AsyncBaseOp(std::move(cb)) {} +IoUringOp::IoUringOp(NotificationCallback cb, Options options) + : AsyncBaseOp(std::move(cb)), options_(options) {} void IoUringOp::reset(NotificationCallback cb) { CHECK_NE(state_, State::PENDING); @@ -128,49 +129,49 @@ void IoUringOp::pread(int fd, void* buf, size_t size, off_t start) { init(); iov_[0].iov_base = buf; iov_[0].iov_len = size; - io_uring_prep_readv(&sqe_, fd, iov_, 1, start); - io_uring_sqe_set_data(&sqe_, this); + io_uring_prep_readv(&sqe_.sqe, fd, iov_, 1, start); + io_uring_sqe_set_data(&sqe_.sqe, this); } void IoUringOp::preadv(int fd, const iovec* iov, int iovcnt, off_t start) { init(); - io_uring_prep_readv(&sqe_, fd, iov, iovcnt, start); - io_uring_sqe_set_data(&sqe_, this); + io_uring_prep_readv(&sqe_.sqe, fd, iov, iovcnt, start); + io_uring_sqe_set_data(&sqe_.sqe, this); } void IoUringOp::pread( int fd, void* buf, size_t size, off_t start, int buf_index) { init(); - io_uring_prep_read_fixed(&sqe_, fd, buf, size, start, buf_index); - io_uring_sqe_set_data(&sqe_, this); + io_uring_prep_read_fixed(&sqe_.sqe, fd, buf, size, start, buf_index); + io_uring_sqe_set_data(&sqe_.sqe, this); } void IoUringOp::pwrite(int fd, const void* buf, size_t size, off_t start) { init(); iov_[0].iov_base = const_cast(buf); iov_[0].iov_len = size; - io_uring_prep_writev(&sqe_, fd, iov_, 1, start); - io_uring_sqe_set_data(&sqe_, this); + io_uring_prep_writev(&sqe_.sqe, fd, iov_, 1, start); + io_uring_sqe_set_data(&sqe_.sqe, this); } void IoUringOp::pwritev(int fd, const iovec* iov, int iovcnt, off_t start) { init(); - io_uring_prep_writev(&sqe_, fd, iov, iovcnt, start); - io_uring_sqe_set_data(&sqe_, this); + io_uring_prep_writev(&sqe_.sqe, fd, iov, iovcnt, start); + io_uring_sqe_set_data(&sqe_.sqe, this); } void IoUringOp::pwrite( int fd, const void* buf, size_t size, off_t start, int buf_index) { init(); - io_uring_prep_write_fixed(&sqe_, fd, buf, size, start, buf_index); - io_uring_sqe_set_data(&sqe_, this); + io_uring_prep_write_fixed(&sqe_.sqe, fd, buf, size, start, buf_index); + io_uring_sqe_set_data(&sqe_.sqe, this); } void IoUringOp::toStream(std::ostream& os) const { - os << "{" << state_ << ", "; + os << "{" << state_ << ", [" << getSqeSize() << "], "; if (state_ != AsyncBaseOp::State::UNINITIALIZED) { - ::toStream(os, sqe_); + ::toStream(os, sqe_.sqe); } if (state_ == AsyncBaseOp::State::COMPLETED) { @@ -189,12 +190,25 @@ std::ostream& operator<<(std::ostream& os, const IoUringOp& op) { return os; } -IoUring::IoUring(size_t capacity, PollMode pollMode, size_t maxSubmit) +IoUring::IoUring( + size_t capacity, + PollMode pollMode, + size_t maxSubmit, + IoUringOp::Options options) : AsyncBase(capacity, pollMode), - maxSubmit_((maxSubmit <= capacity) ? maxSubmit : capacity) { + maxSubmit_((maxSubmit <= capacity) ? maxSubmit : capacity), + options_(options) { ::memset(&ioRing_, 0, sizeof(ioRing_)); ::memset(¶ms_, 0, sizeof(params_)); + if (options_.sqe128) { + params_.flags |= IORING_SETUP_SQE128; + } + + if (options.cqe32) { + params_.flags |= IORING_SETUP_CQE32; + } + params_.flags |= IORING_SETUP_CQSIZE; params_.cq_entries = roundUpToNextPowerOfTwo(capacity_); } @@ -259,13 +273,18 @@ int IoUring::submitOne(AsyncBase::Op* op) { return -1; } + // we require same options for both the IoUringOp and the IoUring instance + if (iop->getOptions() != getOptions()) { + return -1; + } + SharedMutex::WriteHolder lk(submitMutex_); auto* sqe = io_uring_get_sqe(&ioRing_); if (!sqe) { return -1; } - *sqe = iop->getSqe(); + ::memcpy(sqe, &iop->getSqe(), iop->getSqeSize()); return io_uring_submit(&ioRing_); } @@ -280,12 +299,16 @@ int IoUring::submitRange(Range ops) { continue; } + if (iop->getOptions() != getOptions()) { + continue; + } + auto* sqe = io_uring_get_sqe(&ioRing_); if (!sqe) { break; } - *sqe = iop->getSqe(); + ::memcpy(sqe, &iop->getSqe(), iop->getSqeSize()); ++num; if (num % maxSubmit_ == 0 || (i + 1 == ops.size())) { auto ret = io_uring_submit(&ioRing_); @@ -315,6 +338,7 @@ Range IoUring::doWait( Op* op = reinterpret_cast(io_uring_cqe_get_data(cqe)); CHECK(op); auto res = cqe->res; + op->setCqe(cqe); io_uring_cqe_seen(&ioRing_, cqe); decrementPending(); switch (type) { diff --git a/folly/experimental/io/IoUring.h b/folly/experimental/io/IoUring.h index 515e5d10f35..9d197cbbe1e 100644 --- a/folly/experimental/io/IoUring.h +++ b/folly/experimental/io/IoUring.h @@ -37,7 +37,19 @@ class IoUringOp : public AsyncBaseOp { friend std::ostream& operator<<(std::ostream& stream, const IoUringOp& o); public: - explicit IoUringOp(NotificationCallback cb = NotificationCallback()); + struct Options { + Options() : sqe128(false), cqe32(false) {} + bool sqe128; + bool cqe32; + + bool operator==(const Options& options) const { + return sqe128 == options.sqe128 && cqe32 == options.cqe32; + } + }; + + IoUringOp( + NotificationCallback cb = NotificationCallback(), + Options options = Options()); IoUringOp(const IoUringOp&) = delete; IoUringOp& operator=(const IoUringOp&) = delete; ~IoUringOp() override; @@ -66,10 +78,47 @@ class IoUringOp : public AsyncBaseOp { void toStream(std::ostream& os) const override; - const struct io_uring_sqe& getSqe() const { return sqe_; } + void initBase() { init(); } + + struct io_uring_sqe& getSqe() { + return sqe_.sqe; + } + + size_t getSqeSize() const { + return options_.sqe128 ? 128 : sizeof(struct io_uring_sqe); + } + + const struct io_uring_cqe& getCqe() const { + return *reinterpret_cast(&cqe_); + } + + size_t getCqeSize() const { + return options_.cqe32 ? 32 : sizeof(struct io_uring_cqe); + } + + void setCqe(const struct io_uring_cqe* cqe) { + ::memcpy(&cqe_, cqe, getCqeSize()); + } + + const Options& getOptions() const { return options_; } private: - struct io_uring_sqe sqe_; + Options options_; + + // we use unions with the largest size to avoid + // indidual allocations for the sqe/cqe + union { + struct io_uring_sqe sqe; + uint8_t data[128]; + } sqe_; + + // we have to use a union here because of -Wgnu-variable-sized-type-not-at-end + //__u64 big_cqe[]; + union { + __u64 user_data; // first member from from io_uring_cqe + uint8_t data[32]; + } cqe_; + struct iovec iov_[1]; }; @@ -88,13 +137,18 @@ class IoUring : public AsyncBase { * The default IORING_MAX_ENTRIES value is usually 32K. */ explicit IoUring( - size_t capacity, PollMode pollMode = NOT_POLLABLE, size_t maxSubmit = 1); + size_t capacity, + PollMode pollMode = NOT_POLLABLE, + size_t maxSubmit = 1, + IoUringOp::Options options = IoUringOp::Options()); IoUring(const IoUring&) = delete; IoUring& operator=(const IoUring&) = delete; ~IoUring() override; static bool isAvailable(); + const IoUringOp::Options& getOptions() const { return options_; } + int register_buffers(const struct iovec* iovecs, unsigned int nr_iovecs); int unregister_buffers(); @@ -113,6 +167,7 @@ class IoUring : public AsyncBase { std::vector& result) override; size_t maxSubmit_; + IoUringOp::Options options_; struct io_uring_params params_; struct io_uring ioRing_; SharedMutex submitMutex_;