add cancellation support to io_uring_context
* `open_listening_socket`
* `async_read_only_file`
* `async_write_only_file`
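
A minimal sketch of what this enables, assuming unifex's public CPOs (open_file_read_only, async_read_some_at, stop_when, schedule_at, sync_wait) and header layout as of this commit; the file path, buffer size, and timeout are illustrative. The stop_when() trigger requests stop on the read's receiver, which with this commit cancels the pending SQE via IORING_OP_ASYNC_CANCEL instead of letting the read run to completion:

#include <unifex/linux/io_uring_context.hpp>
#include <unifex/file_concepts.hpp>
#include <unifex/inplace_stop_token.hpp>
#include <unifex/scheduler_concepts.hpp>
#include <unifex/span.hpp>
#include <unifex/stop_when.hpp>
#include <unifex/sync_wait.hpp>

#include <chrono>
#include <thread>

int main() {
  using namespace std::chrono_literals;

  unifex::linuxos::io_uring_context ctx;
  unifex::inplace_stop_source stopSource;
  std::thread ioThread{[&] { ctx.run(stopSource.get_token()); }};
  auto sched = ctx.get_scheduler();

  char buffer[1024];
  // Start an async read, but give up after 500ms: the timer trigger makes
  // stop_when() request stop on the read, which now submits an
  // IORING_OP_ASYNC_CANCEL for the pending IORING_OP_READV.
  auto file = unifex::open_file_read_only(sched, "/tmp/example");  // illustrative path
  unifex::sync_wait(unifex::stop_when(
      unifex::async_read_some_at(
          file, 0,
          unifex::as_writable_bytes(unifex::span{buffer, sizeof(buffer)})),
      unifex::schedule_at(sched, unifex::now(sched) + 500ms)));

  stopSource.request_stop();  // unblocks ctx.run()
  ioThread.join();
}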
janondrusek committed Oct 18, 2023
1 parent 5ce9aeb commit ed0a3b5
Showing 2 changed files with 382 additions and 6 deletions.
243 changes: 237 additions & 6 deletions include/unifex/linux/io_uring_context.hpp
@@ -441,7 +441,8 @@ class io_uring_context::read_sender {

void start_io() noexcept {
UNIFEX_ASSERT(context_.is_running_on_io_thread());

stopCallback_.construct(
get_stop_token(receiver_), cancel_callback{*this});
auto populateSqe = [this](io_uring_sqe & sqe) noexcept {
sqe.opcode = IORING_OP_READV;
sqe.fd = fd_;
@@ -460,9 +461,57 @@ class io_uring_context::read_sender {
}
}

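// Cancellation handshake: refCount_ starts at 1, owned by the in-flight
// read. request_stop() raises it 1 -> 2 to claim the right to submit an
// IORING_OP_ASYNC_CANCEL; whichever side later brings the count to 0 in
// on_read_complete() delivers the final completion to the receiver.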
void request_stop() noexcept {
if (char expected = 1; !refCount_.compare_exchange_strong(expected, 2, std::memory_order_relaxed)) {
// lost race with on_read_complete
UNIFEX_ASSERT(expected == 0);
return;
}
if (context_.is_running_on_io_thread()) {
request_stop_local();
} else {
request_stop_remote();
}
}

void request_stop_local() noexcept {
UNIFEX_ASSERT(context_.is_running_on_io_thread());
auto populateSqe = [this](io_uring_sqe & sqe) noexcept {
sqe.opcode = IORING_OP_ASYNC_CANCEL;
sqe.fd = -1;
sqe.off = 0;
auto op = reinterpret_cast<std::uintptr_t>(
static_cast<completion_base*>(this));
// sqe.addr is the user_data to look for and cancel
sqe.addr = op;
sqe.len = 0;
auto cop = reinterpret_cast<std::uintptr_t>(
static_cast<completion_base*>(&cop_));
sqe.user_data = cop;
cop_.execute_ = &cancel_operation::on_stop_complete;
};

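// If the submission queue is currently full, park the cancel request on
// the pending-IO queue; on_schedule_stop_complete() re-enters
// request_stop_local() once the context can accept SQEs again.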
if (!context_.try_submit_io(populateSqe)) {
cop_.execute_ = &cancel_operation::on_schedule_stop_complete;
context_.schedule_pending_io(&cop_);
}
}

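// The stop callback may fire on an arbitrary thread; schedule_remote()
// marshals the cancel submission back onto the io thread, where
// on_schedule_stop_complete() re-enters request_stop_local().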
void request_stop_remote() noexcept {
cop_.execute_ = &cancel_operation::on_schedule_stop_complete;
context_.schedule_remote(&cop_);
}

static void on_read_complete(operation_base* op) noexcept {
auto& self = *static_cast<operation*>(op);
if (self.refCount_.fetch_sub(1, std::memory_order_acq_rel) != 1) {
// a stop request is in flight; the cancel path will complete the op
return;
}
self.stopCallback_.destruct();
if (get_stop_token(self.receiver_).stop_requested()) {
unifex::set_done(std::move(self.receiver_));
} else if (self.result_ >= 0) {
if constexpr (noexcept(unifex::set_value(std::move(self.receiver_), ssize_t(self.result_)))) {
unifex::set_value(std::move(self.receiver_), ssize_t(self.result_));
} else {
@@ -481,11 +530,39 @@ class io_uring_context::read_sender {
}
}

struct cancel_operation final : completion_base {
operation& op_;

explicit cancel_operation(operation& op) noexcept : op_(op) {}
// a separate completion_base is needed: the intrusive completion list
// breaks if the same operation is enqueued twice, so `on_stop_complete`
// delegates back to the parent operation
static void on_stop_complete(operation_base* op) noexcept {
operation::on_read_complete(&static_cast<cancel_operation*>(op)->op_);
}

static void on_schedule_stop_complete(operation_base* op) noexcept {
static_cast<cancel_operation*>(op)->op_.request_stop_local();
}
};

struct cancel_callback final {
operation& op_;

void operator()() noexcept {
op_.request_stop();
}
};

io_uring_context& context_;
int fd_;
offset_t offset_;
iovec buffer_[1];
Receiver receiver_;
manual_lifetime<typename stop_token_type_t<
Receiver>::template callback_type<cancel_callback>>
stopCallback_;
std::atomic_char refCount_{1};
cancel_operation cop_{*this};
};

public:
@@ -555,7 +632,8 @@ class io_uring_context::write_sender {

void start_io() noexcept {
UNIFEX_ASSERT(context_.is_running_on_io_thread());

stopCallback_.construct(
get_stop_token(receiver_), cancel_callback{*this});
auto populateSqe = [this](io_uring_sqe & sqe) noexcept {
sqe.opcode = IORING_OP_WRITEV;
sqe.fd = fd_;
@@ -574,9 +652,57 @@ class io_uring_context::write_sender {
}
}

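// Same cancellation handshake as read_sender above: CAS refCount_ 1 -> 2
// to claim cancellation, then cancel the in-flight IORING_OP_WRITEV by
// its user_data.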
void request_stop() noexcept {
if (char expected = 1; !refCount_.compare_exchange_strong(expected, 2, std::memory_order_relaxed)) {
// lost race with on_write_complete
UNIFEX_ASSERT(expected == 0);
return;
}
if (context_.is_running_on_io_thread()) {
request_stop_local();
} else {
request_stop_remote();
}
}

void request_stop_local() noexcept {
UNIFEX_ASSERT(context_.is_running_on_io_thread());
auto populateSqe = [this](io_uring_sqe & sqe) noexcept {
sqe.opcode = IORING_OP_ASYNC_CANCEL;
sqe.fd = -1;
sqe.off = 0;
auto op = reinterpret_cast<std::uintptr_t>(
static_cast<completion_base*>(this));
// sqe.addr is the user_data to look for and cancel
sqe.addr = op;
sqe.len = 0;
auto cop = reinterpret_cast<std::uintptr_t>(
static_cast<completion_base*>(&cop_));
sqe.user_data = cop;
cop_.execute_ = &cancel_operation::on_stop_complete;
};

if (!context_.try_submit_io(populateSqe)) {
cop_.execute_ = &cancel_operation::on_schedule_stop_complete;
context_.schedule_pending_io(&cop_);
}
}

void request_stop_remote() noexcept {
cop_.execute_ = &cancel_operation::on_schedule_stop_complete;
context_.schedule_remote(&cop_);
}

static void on_write_complete(operation_base* op) noexcept {
auto& self = *static_cast<operation*>(op);
if (self.refCount_.fetch_sub(1, std::memory_order_acq_rel) != 1) {
// a stop request is in flight; the cancel path will complete the op
return;
}
self.stopCallback_.destruct();
if (get_stop_token(self.receiver_).stop_requested()) {
unifex::set_done(std::move(self.receiver_));
} else if (self.result_ >= 0) {
if constexpr (noexcept(unifex::set_value(std::move(self.receiver_), ssize_t(self.result_)))) {
unifex::set_value(std::move(self.receiver_), ssize_t(self.result_));
} else {
@@ -595,11 +721,39 @@ class io_uring_context::write_sender {
}
}

struct cancel_operation final : completion_base {
operation& op_;

explicit cancel_operation(operation& op) noexcept : op_(op) {}
// a separate completion_base is needed: the intrusive completion list
// breaks if the same operation is enqueued twice, so `on_stop_complete`
// delegates back to the parent operation
static void on_stop_complete(operation_base* op) noexcept {
operation::on_write_complete(&static_cast<cancel_operation*>(op)->op_);
}

static void on_schedule_stop_complete(operation_base* op) noexcept {
static_cast<cancel_operation*>(op)->op_.request_stop_local();
}
};

struct cancel_callback final {
operation& op_;

void operator()() noexcept {
op_.request_stop();
}
};

io_uring_context& context_;
int fd_;
offset_t offset_;
iovec buffer_[1];
Receiver receiver_;
manual_lifetime<typename stop_token_type_t<
Receiver>::template callback_type<cancel_callback>>
stopCallback_;
std::atomic_char refCount_{1};
cancel_operation cop_{*this};
};

public:
@@ -989,7 +1143,8 @@ class io_uring_context::accept_sender {

void start_io() noexcept {
UNIFEX_ASSERT(context_.is_running_on_io_thread());

stopCallback_.construct(
get_stop_token(receiver_), cancel_callback{*this});
auto populateSqe = [this](io_uring_sqe& sqe) noexcept {
sqe.opcode = IORING_OP_ACCEPT;
sqe.accept_flags = SOCK_NONBLOCK;
@@ -1007,9 +1162,57 @@ class io_uring_context::accept_sender {
}
}

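// Accept uses the identical handshake; cancelling the pending
// IORING_OP_ACCEPT makes on_accept() observe the stop request and call
// set_done() instead of constructing an async_read_write_file.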
void request_stop() noexcept {
if (char expected = 1; !refCount_.compare_exchange_strong(expected, 2, std::memory_order_relaxed)) {
// lost race with on_accept
UNIFEX_ASSERT(expected == 0);
return;
}
if (context_.is_running_on_io_thread()) {
request_stop_local();
} else {
request_stop_remote();
}
}

void request_stop_local() noexcept {
UNIFEX_ASSERT(context_.is_running_on_io_thread());
auto populateSqe = [this](io_uring_sqe & sqe) noexcept {
sqe.opcode = IORING_OP_ASYNC_CANCEL;
sqe.fd = -1;
sqe.off = 0;
auto op = reinterpret_cast<std::uintptr_t>(
static_cast<completion_base*>(this));
// sqe.addr is the user_data to look for and cancel
sqe.addr = op;
sqe.len = 0;
auto cop = reinterpret_cast<std::uintptr_t>(
static_cast<completion_base*>(&cop_));
sqe.user_data = cop;
cop_.execute_ = &cancel_operation::on_stop_complete;
};

if (!context_.try_submit_io(populateSqe)) {
cop_.execute_ = &cancel_operation::on_schedule_stop_complete;
context_.schedule_pending_io(&cop_);
}
}

void request_stop_remote() noexcept {
cop_.execute_ = &cancel_operation::on_schedule_stop_complete;
context_.schedule_remote(&cop_);
}

static void on_accept(operation_base* op) noexcept {
auto& self = *static_cast<operation*>(op);
if (self.refCount_.fetch_sub(1, std::memory_order_acq_rel) != 1) {
// a stop request is in flight; the cancel path will complete the op
return;
}
self.stopCallback_.destruct();
if (get_stop_token(self.receiver_).stop_requested()) {
unifex::set_done(std::move(self.receiver_));
} else if (self.result_ >= 0) {
if constexpr (noexcept(unifex::set_value(
std::move(self.receiver_), async_read_write_file{self.context_, self.result_}))) {
unifex::set_value(std::move(self.receiver_), async_read_write_file{self.context_, self.result_});
@@ -1031,9 +1234,37 @@ class io_uring_context::accept_sender {
}
}

struct cancel_operation final : completion_base {
operation& op_;

explicit cancel_operation(operation& op) noexcept : op_(op) {}
// a separate completion_base is needed: the intrusive completion list
// breaks if the same operation is enqueued twice, so `on_stop_complete`
// delegates back to the parent operation
static void on_stop_complete(operation_base* op) noexcept {
operation::on_accept(&static_cast<cancel_operation*>(op)->op_);
}

static void on_schedule_stop_complete(operation_base* op) noexcept {
static_cast<cancel_operation*>(op)->op_.request_stop_local();
}
};

struct cancel_callback final {
operation& op_;

void operator()() noexcept {
op_.request_stop();
}
};

io_uring_context& context_;
int fd_;
Receiver receiver_;
manual_lifetime<typename stop_token_type_t<
Receiver>::template callback_type<cancel_callback>>
stopCallback_;
std::atomic_char refCount_{1};
cancel_operation cop_{*this};
};

public: