From de73b26ca4776ce05fb2e01ce0ae6a09be5206fd Mon Sep 17 00:00:00 2001 From: Shuwen Sun Date: Mon, 28 Oct 2024 18:13:38 +0000 Subject: [PATCH] added coawait error handling --- src/common.cc | 80 ++++++++++++++++++++++++++------------------ src/include/common.h | 3 +- src/main.cc | 14 +++++--- 3 files changed, 59 insertions(+), 38 deletions(-) diff --git a/src/common.cc b/src/common.cc index 6a06116..c222dba 100644 --- a/src/common.cc +++ b/src/common.cc @@ -15,16 +15,11 @@ void spdk_nvme_zone_append_wrapper( struct spdk_thread *thread, struct spdk_nvme_ns *ns, struct spdk_nvme_qpair *qpair, void *data, uint64_t offset, uint32_t size, uint32_t flags, - std::move_only_function cb) + std::move_only_function)> cb) { // log_debug("APPEND: offset {}, size {}", offset, size); auto cb_heap = new decltype(cb)(std::move(cb)); auto fn = new std::move_only_function([=]() { - // rc = spdk_nvme_zns_zone_append(entry->nvme.ns, worker->ns_ctx->qpair, - // task->buf, zslba, - // entry->io_size_blocks, io_complete, - // task, 0); - int rc = spdk_nvme_zns_zone_append( ns, qpair, data, offset, size, [](void *arg, const spdk_nvme_cpl *completion) mutable { @@ -33,7 +28,10 @@ void spdk_nvme_zone_append_wrapper( delete cb3; }, (void *)(cb_heap), flags); - assert(rc == 0); + if (rc != 0) { + (*cb_heap)(outcome::failure(std::errc::io_error)); + delete cb_heap; + } }); thread_send_msg( thread, @@ -48,7 +46,7 @@ void spdk_nvme_zone_append_wrapper( auto spdk_nvme_zone_append_async( struct spdk_thread *thread, struct spdk_nvme_ns *ns, struct spdk_nvme_qpair *qpair, void *data, uint64_t offset, uint32_t size, - uint32_t flags) -> net::awaitable + uint32_t flags) -> net::awaitable> { // log_debug("1111"); auto init = [](auto completion_handler, spdk_thread *thread, @@ -59,7 +57,7 @@ auto spdk_nvme_zone_append_async( }; return net::async_initiate( + void(Result)>( init, net::use_awaitable, thread, ns, qpair, data, offset, size, flags); } @@ -68,11 +66,11 @@ void spdk_nvme_zone_append_wrapper_inst( Timer &timer, struct spdk_thread *thread, struct spdk_nvme_ns *ns, struct spdk_nvme_qpair *qpair, void *data, uint64_t offset, uint32_t size, uint32_t flags, - std::move_only_function cb) + std::move_only_function)> cb) { auto cb_heap = new std::pair< - std::move_only_function, - Timer &>(decltype(cb)(std::move(cb)), timer); + std::move_only_function)>, Timer &>( + decltype(cb)(std::move(cb)), timer); auto fn = new std::move_only_function([=, &timer]() { timer.t4 = std::chrono::high_resolution_clock::now(); int rc = spdk_nvme_zns_zone_append( @@ -85,7 +83,10 @@ void spdk_nvme_zone_append_wrapper_inst( delete tuple_; }, (void *)(cb_heap), flags); - // assert(rc == 0); + if (rc != 0) { + (cb_heap->first)(outcome::failure(std::errc::io_error)); + delete cb_heap; + } }); thread_send_msg( thread, @@ -100,7 +101,7 @@ void spdk_nvme_zone_append_wrapper_inst( auto spdk_nvme_zone_append_async_inst( Timer &timer, struct spdk_thread *thread, struct spdk_nvme_ns *ns, struct spdk_nvme_qpair *qpair, void *data, uint64_t offset, uint32_t size, - uint32_t flags) -> net::awaitable + uint32_t flags) -> net::awaitable> { timer.t2 = std::chrono::high_resolution_clock::now(); auto init = [](auto completion_handler, Timer *timer, spdk_thread *thread, @@ -113,7 +114,7 @@ auto spdk_nvme_zone_append_async_inst( }; return net::async_initiate( + void(Result)>( init, net::use_awaitable, &timer, thread, ns, qpair, data, offset, size, flags); } @@ -122,16 +123,16 @@ auto zoneAppend(void *arg1) -> net::awaitable { RequestContext *ctx = reinterpret_cast(arg1); auto ioCtx = ctx->ioContext; - int rc = 0; assert(ctx->ctrl != nullptr); Timer timer; const spdk_nvme_cpl *cpl; if (Configuration::GetSamplingRate() > 0) { timer.t1 = std::chrono::high_resolution_clock::now(); - cpl = co_await spdk_nvme_zone_append_async_inst( + auto res_cpl = co_await spdk_nvme_zone_append_async_inst( timer, ctx->io_thread, ioCtx.ns, ioCtx.qpair, ioCtx.data, ioCtx.offset, ioCtx.size, ioCtx.flags); + cpl = res_cpl.value(); timer.t6 = std::chrono::high_resolution_clock::now(); if (ctx->ctrl->mTotalCounts % Configuration::GetSamplingRate() == 0) @@ -141,10 +142,13 @@ auto zoneAppend(void *arg1) -> net::awaitable tdiff_us(timer.t4, timer.t3), tdiff_us(timer.t5, timer.t4), tdiff_us(timer.t6, timer.t5)); } else { - // log_debug("1111"); - cpl = co_await spdk_nvme_zone_append_async( + auto res_cpl = co_await spdk_nvme_zone_append_async( ctx->io_thread, ioCtx.ns, ioCtx.qpair, ioCtx.data, ioCtx.offset, ioCtx.size, ioCtx.flags); + if (res_cpl.has_error()) { + // log_error("cpl error status"); + } else + cpl = res_cpl.value(); } if (spdk_nvme_cpl_is_error(cpl)) { log_error("I/O error status: {}", @@ -172,7 +176,7 @@ void spdk_nvme_zone_read_wrapper( struct spdk_thread *thread, struct spdk_nvme_ns *ns, struct spdk_nvme_qpair *qpair, void *data, uint64_t offset, uint32_t size, uint32_t flags, - std::move_only_function cb) + std::move_only_function)> cb) { // log_debug("1111: offset {}, size {}", offset, size); auto cb_heap = new decltype(cb)(std::move(cb)); @@ -185,7 +189,10 @@ void spdk_nvme_zone_read_wrapper( delete cb3; }, (void *)(cb_heap), flags); - // assert(rc == 0); + if (rc != 0) { + (*cb_heap)(outcome::failure(std::errc::io_error)); + delete cb_heap; + } }); thread_send_msg( thread, @@ -200,7 +207,7 @@ void spdk_nvme_zone_read_wrapper( auto spdk_nvme_zone_read_async( struct spdk_thread *thread, struct spdk_nvme_ns *ns, struct spdk_nvme_qpair *qpair, void *data, uint64_t offset, uint32_t size, - uint32_t flags) -> net::awaitable + uint32_t flags) -> net::awaitable> { auto init = [](auto completion_handler, spdk_thread *thread, spdk_nvme_ns *ns, spdk_nvme_qpair *qpair, void *data, @@ -210,7 +217,7 @@ auto spdk_nvme_zone_read_async( }; return net::async_initiate( + void(Result)>( init, net::use_awaitable, thread, ns, qpair, data, offset, size, flags); } @@ -219,11 +226,11 @@ void spdk_nvme_zone_read_wrapper_inst( Timer &timer, struct spdk_thread *thread, struct spdk_nvme_ns *ns, struct spdk_nvme_qpair *qpair, void *data, uint64_t offset, uint32_t size, uint32_t flags, - std::move_only_function cb) + std::move_only_function)> cb) { auto cb_heap = new std::pair< - std::move_only_function, - Timer &>(decltype(cb)(std::move(cb)), timer); + std::move_only_function)>, Timer &>( + decltype(cb)(std::move(cb)), timer); auto fn = new std::move_only_function([=, &timer]() { timer.t4 = std::chrono::high_resolution_clock::now(); int rc = spdk_nvme_ns_cmd_read( @@ -236,7 +243,10 @@ void spdk_nvme_zone_read_wrapper_inst( delete tuple_; }, (void *)(cb_heap), flags); - // assert(rc == 0); + if (rc != 0) { + (cb_heap->first)(outcome::failure(std::errc::io_error)); + delete cb_heap; + } }); thread_send_msg( thread, @@ -251,7 +261,7 @@ void spdk_nvme_zone_read_wrapper_inst( auto spdk_nvme_zone_read_async_inst( Timer &timer, struct spdk_thread *thread, struct spdk_nvme_ns *ns, struct spdk_nvme_qpair *qpair, void *data, uint64_t offset, uint32_t size, - uint32_t flags) -> net::awaitable + uint32_t flags) -> net::awaitable> { timer.t2 = std::chrono::high_resolution_clock::now(); auto init = [](auto completion_handler, Timer *timer, spdk_thread *thread, @@ -264,25 +274,25 @@ auto spdk_nvme_zone_read_async_inst( }; return net::async_initiate( + void(Result)>( init, net::use_awaitable, &timer, thread, ns, qpair, data, offset, size, flags); } -auto zoneRead(void *arg1) -> net::awaitable +auto zoneRead(void *arg1) -> net::awaitable> { RequestContext *ctx = reinterpret_cast(arg1); auto ioCtx = ctx->ioContext; - int rc = 0; assert(ctx->ctrl != nullptr); Timer timer; const spdk_nvme_cpl *cpl; if (Configuration::GetSamplingRate() > 0) { timer.t1 = std::chrono::high_resolution_clock::now(); - cpl = co_await spdk_nvme_zone_read_async_inst( + auto res_cpl = co_await spdk_nvme_zone_read_async_inst( timer, ctx->io_thread, ioCtx.ns, ioCtx.qpair, ioCtx.data, ioCtx.offset, ioCtx.size, ioCtx.flags); + cpl = res_cpl.value(); timer.t6 = std::chrono::high_resolution_clock::now(); if (ctx->ctrl->mTotalCounts % Configuration::GetSamplingRate() == 0) @@ -292,9 +302,13 @@ auto zoneRead(void *arg1) -> net::awaitable tdiff_us(timer.t4, timer.t3), tdiff_us(timer.t5, timer.t4), tdiff_us(timer.t6, timer.t5)); } else { - cpl = co_await spdk_nvme_zone_read_async( + auto res_cpl = co_await spdk_nvme_zone_read_async( ctx->io_thread, ioCtx.ns, ioCtx.qpair, ioCtx.data, ioCtx.offset, ioCtx.size, ioCtx.flags); + if (res_cpl.has_error()) { + // log_error("cpl error status"); + } else + cpl = res_cpl.value(); } if (spdk_nvme_cpl_is_error(cpl)) { log_error("I/O error status: {}", diff --git a/src/include/common.h b/src/include/common.h index ba9b788..2692777 100644 --- a/src/include/common.h +++ b/src/include/common.h @@ -35,7 +35,8 @@ int dispatchObjectWorker(void *args); void thread_send_msg(spdk_thread *thread, spdk_msg_fn fn, void *args); // void event_call(uint32_t core_id, spdk_event_fn fn, void *arg1, void *arg2); -auto zoneRead(void *args) -> net::awaitable; +auto zoneRead(void *args) -> net::awaitable>; +// auto zoneAppend(void *args) -> net::awaitable>; auto zoneAppend(void *args) -> net::awaitable; double GetTimestampInUs(); diff --git a/src/main.cc b/src/main.cc index 5bd941a..1dbf78e 100644 --- a/src/main.cc +++ b/src/main.cc @@ -80,10 +80,16 @@ int main(int argc, char **argv) // sleep(1); // } - sleep(30); - if (gZstoreController->mPhase == 2 && gZstoreController->mPhase == 1) { - log_info("Prepare phase is done, dumping all map and bloom filter"); - gZstoreController->DumpAllMap(); + if (gZstoreController->mKeyExperiment == 1) { + while (1) + sleep(1); + } else { + sleep(30); + if (gZstoreController->mKeyExperiment == 2 && + gZstoreController->mPhase == 1) { + log_info("Prepare phase is done, dumping all map and bloom filter"); + gZstoreController->DumpAllMap(); + } } gZstoreController->Drain();