Skip to content

Commit

Permalink
added coawait error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
shuwens committed Oct 28, 2024
1 parent 9ef7ef9 commit de73b26
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 38 deletions.
80 changes: 47 additions & 33 deletions src/common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,11 @@ void spdk_nvme_zone_append_wrapper(
struct spdk_thread *thread, struct spdk_nvme_ns *ns,
struct spdk_nvme_qpair *qpair, void *data, uint64_t offset, uint32_t size,
uint32_t flags,
std::move_only_function<void(const spdk_nvme_cpl *completion)> cb)
std::move_only_function<void(Result<const spdk_nvme_cpl *>)> cb)
{
// log_debug("APPEND: offset {}, size {}", offset, size);
auto cb_heap = new decltype(cb)(std::move(cb));
auto fn = new std::move_only_function<void(void)>([=]() {
// rc = spdk_nvme_zns_zone_append(entry->nvme.ns, worker->ns_ctx->qpair,
// task->buf, zslba,
// entry->io_size_blocks, io_complete,
// task, 0);

int rc = spdk_nvme_zns_zone_append(
ns, qpair, data, offset, size,
[](void *arg, const spdk_nvme_cpl *completion) mutable {
Expand All @@ -33,7 +28,10 @@ void spdk_nvme_zone_append_wrapper(
delete cb3;
},
(void *)(cb_heap), flags);
assert(rc == 0);
if (rc != 0) {
(*cb_heap)(outcome::failure(std::errc::io_error));
delete cb_heap;
}
});
thread_send_msg(
thread,
Expand All @@ -48,7 +46,7 @@ void spdk_nvme_zone_append_wrapper(
auto spdk_nvme_zone_append_async(
struct spdk_thread *thread, struct spdk_nvme_ns *ns,
struct spdk_nvme_qpair *qpair, void *data, uint64_t offset, uint32_t size,
uint32_t flags) -> net::awaitable<const spdk_nvme_cpl *>
uint32_t flags) -> net::awaitable<Result<const spdk_nvme_cpl *>>
{
// log_debug("1111");
auto init = [](auto completion_handler, spdk_thread *thread,
Expand All @@ -59,7 +57,7 @@ auto spdk_nvme_zone_append_async(
};

return net::async_initiate<decltype(net::use_awaitable),
void(const spdk_nvme_cpl *)>(
void(Result<const spdk_nvme_cpl *>)>(
init, net::use_awaitable, thread, ns, qpair, data, offset, size, flags);
}

Expand All @@ -68,11 +66,11 @@ void spdk_nvme_zone_append_wrapper_inst(
Timer &timer, struct spdk_thread *thread, struct spdk_nvme_ns *ns,
struct spdk_nvme_qpair *qpair, void *data, uint64_t offset, uint32_t size,
uint32_t flags,
std::move_only_function<void(const spdk_nvme_cpl *completion)> cb)
std::move_only_function<void(Result<const spdk_nvme_cpl *>)> cb)
{
auto cb_heap = new std::pair<
std::move_only_function<void(const spdk_nvme_cpl *completion)>,
Timer &>(decltype(cb)(std::move(cb)), timer);
std::move_only_function<void(Result<const spdk_nvme_cpl *>)>, Timer &>(
decltype(cb)(std::move(cb)), timer);
auto fn = new std::move_only_function<void(void)>([=, &timer]() {
timer.t4 = std::chrono::high_resolution_clock::now();
int rc = spdk_nvme_zns_zone_append(
Expand All @@ -85,7 +83,10 @@ void spdk_nvme_zone_append_wrapper_inst(
delete tuple_;
},
(void *)(cb_heap), flags);
// assert(rc == 0);
if (rc != 0) {
(cb_heap->first)(outcome::failure(std::errc::io_error));
delete cb_heap;
}
});
thread_send_msg(
thread,
Expand All @@ -100,7 +101,7 @@ void spdk_nvme_zone_append_wrapper_inst(
auto spdk_nvme_zone_append_async_inst(
Timer &timer, struct spdk_thread *thread, struct spdk_nvme_ns *ns,
struct spdk_nvme_qpair *qpair, void *data, uint64_t offset, uint32_t size,
uint32_t flags) -> net::awaitable<const spdk_nvme_cpl *>
uint32_t flags) -> net::awaitable<Result<const spdk_nvme_cpl *>>
{
timer.t2 = std::chrono::high_resolution_clock::now();
auto init = [](auto completion_handler, Timer *timer, spdk_thread *thread,
Expand All @@ -113,7 +114,7 @@ auto spdk_nvme_zone_append_async_inst(
};

return net::async_initiate<decltype(net::use_awaitable),
void(const spdk_nvme_cpl *)>(
void(Result<const spdk_nvme_cpl *>)>(
init, net::use_awaitable, &timer, thread, ns, qpair, data, offset, size,
flags);
}
Expand All @@ -122,16 +123,16 @@ auto zoneAppend(void *arg1) -> net::awaitable<void>
{
RequestContext *ctx = reinterpret_cast<RequestContext *>(arg1);
auto ioCtx = ctx->ioContext;
int rc = 0;
assert(ctx->ctrl != nullptr);

Timer timer;
const spdk_nvme_cpl *cpl;
if (Configuration::GetSamplingRate() > 0) {
timer.t1 = std::chrono::high_resolution_clock::now();
cpl = co_await spdk_nvme_zone_append_async_inst(
auto res_cpl = co_await spdk_nvme_zone_append_async_inst(
timer, ctx->io_thread, ioCtx.ns, ioCtx.qpair, ioCtx.data,
ioCtx.offset, ioCtx.size, ioCtx.flags);
cpl = res_cpl.value();
timer.t6 = std::chrono::high_resolution_clock::now();

if (ctx->ctrl->mTotalCounts % Configuration::GetSamplingRate() == 0)
Expand All @@ -141,10 +142,13 @@ auto zoneAppend(void *arg1) -> net::awaitable<void>
tdiff_us(timer.t4, timer.t3), tdiff_us(timer.t5, timer.t4),
tdiff_us(timer.t6, timer.t5));
} else {
// log_debug("1111");
cpl = co_await spdk_nvme_zone_append_async(
auto res_cpl = co_await spdk_nvme_zone_append_async(
ctx->io_thread, ioCtx.ns, ioCtx.qpair, ioCtx.data, ioCtx.offset,
ioCtx.size, ioCtx.flags);
if (res_cpl.has_error()) {
// log_error("cpl error status");
} else
cpl = res_cpl.value();
}
if (spdk_nvme_cpl_is_error(cpl)) {
log_error("I/O error status: {}",
Expand Down Expand Up @@ -172,7 +176,7 @@ void spdk_nvme_zone_read_wrapper(
struct spdk_thread *thread, struct spdk_nvme_ns *ns,
struct spdk_nvme_qpair *qpair, void *data, uint64_t offset, uint32_t size,
uint32_t flags,
std::move_only_function<void(const spdk_nvme_cpl *completion)> cb)
std::move_only_function<void(Result<const spdk_nvme_cpl *>)> cb)
{
// log_debug("1111: offset {}, size {}", offset, size);
auto cb_heap = new decltype(cb)(std::move(cb));
Expand All @@ -185,7 +189,10 @@ void spdk_nvme_zone_read_wrapper(
delete cb3;
},
(void *)(cb_heap), flags);
// assert(rc == 0);
if (rc != 0) {
(*cb_heap)(outcome::failure(std::errc::io_error));
delete cb_heap;
}
});
thread_send_msg(
thread,
Expand All @@ -200,7 +207,7 @@ void spdk_nvme_zone_read_wrapper(
auto spdk_nvme_zone_read_async(
struct spdk_thread *thread, struct spdk_nvme_ns *ns,
struct spdk_nvme_qpair *qpair, void *data, uint64_t offset, uint32_t size,
uint32_t flags) -> net::awaitable<const spdk_nvme_cpl *>
uint32_t flags) -> net::awaitable<Result<const spdk_nvme_cpl *>>
{
auto init = [](auto completion_handler, spdk_thread *thread,
spdk_nvme_ns *ns, spdk_nvme_qpair *qpair, void *data,
Expand All @@ -210,7 +217,7 @@ auto spdk_nvme_zone_read_async(
};

return net::async_initiate<decltype(net::use_awaitable),
void(const spdk_nvme_cpl *)>(
void(Result<const spdk_nvme_cpl *>)>(
init, net::use_awaitable, thread, ns, qpair, data, offset, size, flags);
}

Expand All @@ -219,11 +226,11 @@ void spdk_nvme_zone_read_wrapper_inst(
Timer &timer, struct spdk_thread *thread, struct spdk_nvme_ns *ns,
struct spdk_nvme_qpair *qpair, void *data, uint64_t offset, uint32_t size,
uint32_t flags,
std::move_only_function<void(const spdk_nvme_cpl *completion)> cb)
std::move_only_function<void(Result<const spdk_nvme_cpl *>)> cb)
{
auto cb_heap = new std::pair<
std::move_only_function<void(const spdk_nvme_cpl *completion)>,
Timer &>(decltype(cb)(std::move(cb)), timer);
std::move_only_function<void(Result<const spdk_nvme_cpl *>)>, Timer &>(
decltype(cb)(std::move(cb)), timer);
auto fn = new std::move_only_function<void(void)>([=, &timer]() {
timer.t4 = std::chrono::high_resolution_clock::now();
int rc = spdk_nvme_ns_cmd_read(
Expand All @@ -236,7 +243,10 @@ void spdk_nvme_zone_read_wrapper_inst(
delete tuple_;
},
(void *)(cb_heap), flags);
// assert(rc == 0);
if (rc != 0) {
(cb_heap->first)(outcome::failure(std::errc::io_error));
delete cb_heap;
}
});
thread_send_msg(
thread,
Expand All @@ -251,7 +261,7 @@ void spdk_nvme_zone_read_wrapper_inst(
auto spdk_nvme_zone_read_async_inst(
Timer &timer, struct spdk_thread *thread, struct spdk_nvme_ns *ns,
struct spdk_nvme_qpair *qpair, void *data, uint64_t offset, uint32_t size,
uint32_t flags) -> net::awaitable<const spdk_nvme_cpl *>
uint32_t flags) -> net::awaitable<Result<const spdk_nvme_cpl *>>
{
timer.t2 = std::chrono::high_resolution_clock::now();
auto init = [](auto completion_handler, Timer *timer, spdk_thread *thread,
Expand All @@ -264,25 +274,25 @@ auto spdk_nvme_zone_read_async_inst(
};

return net::async_initiate<decltype(net::use_awaitable),
void(const spdk_nvme_cpl *)>(
void(Result<const spdk_nvme_cpl *>)>(
init, net::use_awaitable, &timer, thread, ns, qpair, data, offset, size,
flags);
}

auto zoneRead(void *arg1) -> net::awaitable<void>
auto zoneRead(void *arg1) -> net::awaitable<Result<void>>
{
RequestContext *ctx = reinterpret_cast<RequestContext *>(arg1);
auto ioCtx = ctx->ioContext;
int rc = 0;
assert(ctx->ctrl != nullptr);

Timer timer;
const spdk_nvme_cpl *cpl;
if (Configuration::GetSamplingRate() > 0) {
timer.t1 = std::chrono::high_resolution_clock::now();
cpl = co_await spdk_nvme_zone_read_async_inst(
auto res_cpl = co_await spdk_nvme_zone_read_async_inst(
timer, ctx->io_thread, ioCtx.ns, ioCtx.qpair, ioCtx.data,
ioCtx.offset, ioCtx.size, ioCtx.flags);
cpl = res_cpl.value();
timer.t6 = std::chrono::high_resolution_clock::now();

if (ctx->ctrl->mTotalCounts % Configuration::GetSamplingRate() == 0)
Expand All @@ -292,9 +302,13 @@ auto zoneRead(void *arg1) -> net::awaitable<void>
tdiff_us(timer.t4, timer.t3), tdiff_us(timer.t5, timer.t4),
tdiff_us(timer.t6, timer.t5));
} else {
cpl = co_await spdk_nvme_zone_read_async(
auto res_cpl = co_await spdk_nvme_zone_read_async(
ctx->io_thread, ioCtx.ns, ioCtx.qpair, ioCtx.data, ioCtx.offset,
ioCtx.size, ioCtx.flags);
if (res_cpl.has_error()) {
// log_error("cpl error status");
} else
cpl = res_cpl.value();
}
if (spdk_nvme_cpl_is_error(cpl)) {
log_error("I/O error status: {}",
Expand Down
3 changes: 2 additions & 1 deletion src/include/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ int dispatchObjectWorker(void *args);
void thread_send_msg(spdk_thread *thread, spdk_msg_fn fn, void *args);
// void event_call(uint32_t core_id, spdk_event_fn fn, void *arg1, void *arg2);

auto zoneRead(void *args) -> net::awaitable<void>;
auto zoneRead(void *args) -> net::awaitable<Result<void>>;
// auto zoneAppend(void *args) -> net::awaitable<Result<void>>;
auto zoneAppend(void *args) -> net::awaitable<void>;

double GetTimestampInUs();
Expand Down
14 changes: 10 additions & 4 deletions src/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,16 @@ int main(int argc, char **argv)
// sleep(1);
// }

sleep(30);
if (gZstoreController->mPhase == 2 && gZstoreController->mPhase == 1) {
log_info("Prepare phase is done, dumping all map and bloom filter");
gZstoreController->DumpAllMap();
if (gZstoreController->mKeyExperiment == 1) {
while (1)
sleep(1);
} else {
sleep(30);
if (gZstoreController->mKeyExperiment == 2 &&
gZstoreController->mPhase == 1) {
log_info("Prepare phase is done, dumping all map and bloom filter");
gZstoreController->DumpAllMap();
}
}

gZstoreController->Drain();
Expand Down

0 comments on commit de73b26

Please sign in to comment.