diff --git a/examples/boost/http_server_async.cpp b/examples/boost/http_server_async.cpp index eb41fff..d05eafb 100644 --- a/examples/boost/http_server_async.cpp +++ b/examples/boost/http_server_async.cpp @@ -419,7 +419,7 @@ int main(int argc, char *argv[]) auto const address = net::ip::make_address("127.0.0.1"); auto const port = 2000; auto const doc_root = std::make_shared("."); - auto const threads = std::max(1, 1); + auto const threads = std::max(1, 8); // The io_context is required for all I/O net::io_context ioc{threads}; diff --git a/meson.build b/meson.build index 9bb4415..06727e0 100644 --- a/meson.build +++ b/meson.build @@ -272,13 +272,13 @@ executable( # ) # HTTP server -# executable( -# 'http_async', -# files('examples/boost/http_server_async.cpp'), -# dependencies: [zstore_deps] + [dependency('_spdk')], -# link_args: spdk_link_args, -# install: true, -# ) +executable( + 'http_async', + files('examples/boost/http_server_async.cpp'), + dependencies: [zstore_deps] + [dependency('_spdk')], + link_args: spdk_link_args, + install: true, +) # executable( # 'http_small', # files('examples/boost/http_server_small.cpp'), @@ -286,13 +286,13 @@ executable( # link_args: spdk_link_args, # install: true, # ) -# executable( -# 'http_awaitable', -# files('examples/boost/http_server_awaitable.cc'), -# dependencies: [zstore_deps] + [dependency('_spdk')], -# link_args: spdk_link_args, -# install: true, -# ) +executable( + 'http_awaitable', + files('examples/boost/http_server_awaitable.cc'), + dependencies: [zstore_deps] + [dependency('_spdk')], + link_args: spdk_link_args, + install: true, +) # Tests # ----------- diff --git a/src/common.cc b/src/common.cc index 26ff57d..046759e 100644 --- a/src/common.cc +++ b/src/common.cc @@ -178,7 +178,7 @@ void spdk_nvme_zone_read_wrapper( uint32_t flags, std::move_only_function)> cb) { - log_debug("1111: offset {}, size {}", offset, size); + // log_debug("1111: offset {}, size {}", offset, size); auto cb_heap = new decltype(cb)(std::move(cb)); auto fn = new std::move_only_function([=]() { int rc = spdk_nvme_ns_cmd_read( @@ -307,16 +307,15 @@ auto zoneRead(void *arg1) -> net::awaitable> ioCtx.size, ioCtx.flags); if (res_cpl.has_error()) { // log_error("cpl error status"); + co_return outcome::failure(std::errc::io_error); } else cpl = res_cpl.value(); } if (spdk_nvme_cpl_is_error(cpl)) { - log_error("I/O error status: {}", - spdk_nvme_cpl_get_status_string(&cpl->status)); - // fprintf(stderr, "I/O failed, aborting run\n"); - // assert(0); - // exit(1); - log_debug("Unimplemented: put context back in pool"); + // log_error("I/O error status: {}", + // spdk_nvme_cpl_get_status_string(&cpl->status)); + // log_debug("Unimplemented: put context back in pool"); + co_return outcome::failure(std::errc::io_error); } // For read, we swap the read date into the request body diff --git a/src/include/http_server.h b/src/include/http_server.h index 883fe15..db44d55 100644 --- a/src/include/http_server.h +++ b/src/include/http_server.h @@ -66,7 +66,8 @@ auto awaitable_on_request(HttpRequest req, // entry.second_tgt(), entry.third_tgt()); auto [first, _, _] = entry; auto [tgt, lba, _] = first; - log_debug("Reading from tgt {} lba {}", tgt, lba); + // log_debug("Reading from tgt {} lba {}", tgt, lba); + auto dev1 = zctrl_.GetDevice(tgt); // auto dev2 = zctrl_.GetDevice(entry.second_tgt()); // auto dev3 = zctrl_.GetDevice(entry.third_tgt()); @@ -79,24 +80,33 @@ auto awaitable_on_request(HttpRequest req, // MakeReadRequest(&zctrl_, dev3, entry.third_lba(), // req).value(); - co_await zoneRead(s1); + auto res = co_await zoneRead(s1); // co_await (zoneRead(s1) && zoneRead(s2) && zoneRead(s3)); // log_debug("1111"); - ZstoreObject deserialized_obj; - bool success = ReadBufferToZstoreObject(s1->dataBuffer, s1->size, - deserialized_obj); - - // log_debug("1111"); - s1->Clear(); - zctrl_.mRequestContextPool->ReturnRequestContext(s1); - // s2->Clear(); - // zctrl_.mRequestContextPool->ReturnRequestContext(s2); - // s3->Clear(); - // zctrl_.mRequestContextPool->ReturnRequestContext(s3); - - co_return handle_request(std::move(req)); + if (res.has_value()) { + // log_debug("1111"); + // } + ZstoreObject deserialized_obj; + bool success = ReadBufferToZstoreObject( + s1->dataBuffer, s1->size, deserialized_obj); + + // log_debug("1111"); + s1->Clear(); + zctrl_.mRequestContextPool->ReturnRequestContext(s1); + // s2->Clear(); + // zctrl_.mRequestContextPool->ReturnRequestContext(s2); + // s3->Clear(); + // zctrl_.mRequestContextPool->ReturnRequestContext(s3); + + co_return handle_request(std::move(req)); + } else { + s1->Clear(); + zctrl_.mRequestContextPool->ReturnRequestContext(s1); + + co_return handle_request(std::move(req)); + } } else { log_error("Draining or not enough contexts"); } @@ -145,7 +155,8 @@ auto awaitable_on_request(HttpRequest req, auto dev3 = zctrl_.GetDevice(tgt3); // auto slot = MakeWriteRequest( - // &zctrl_, zctrl_.GetDevice(entry.first_tgt()), req, entry); + // &zctrl_, zctrl_.GetDevice(entry.first_tgt()), req, + // entry); ZstoreObject original_obj; original_obj.entry.type = LogEntryType::kData; @@ -178,7 +189,8 @@ auto awaitable_on_request(HttpRequest req, // co_await zoneAppend(s3); // log_debug("s3"); - // co_await (zoneAppend(s1) && zoneAppend(s2) && zoneAppend(s3)); + // co_await (zoneAppend(s1) && zoneAppend(s2) && + // zoneAppend(s3)); if (zctrl_.verbose) log_debug("6666"); @@ -195,7 +207,8 @@ auto awaitable_on_request(HttpRequest req, std::make_pair(tgt2, dev2->GetZoneId()), std::make_pair(tgt3, dev3->GetZoneId())), s1->append_lba, 1, 0, 1, 0, 1) - // s1->append_lba, 1, s2->append_lba, 1, s3->append_lba, 1) + // s1->append_lba, 1, s2->append_lba, 1, s3->append_lba, + // 1) .value(); // update lba in map auto rc = zctrl_.PutObject(key_hash, new_entry).value(); diff --git a/src/main.cc b/src/main.cc index 1dbf78e..ce9fd36 100644 --- a/src/main.cc +++ b/src/main.cc @@ -39,7 +39,7 @@ int main(int argc, char **argv) opts.name = "Zstore"; opts.mem_size = g_dpdk_mem; opts.hugepage_single_segments = g_dpdk_mem_single_seg; - opts.core_mask = "0xbf"; + opts.core_mask = "0xf"; // opts.core_mask = "0x7"; // b, 7, f opts.shm_id = -1; if (spdk_env_init(&opts) < 0) { @@ -58,7 +58,6 @@ int main(int argc, char **argv) log_info("Starting HTTP server with port 2000!\n"); - // gZstoreController->mIoc_.run(); // FIXME only tput on Zstore2Dev1 // while (1) { // auto etime = std::chrono::high_resolution_clock::now(); @@ -81,8 +80,10 @@ int main(int argc, char **argv) // } if (gZstoreController->mKeyExperiment == 1) { - while (1) - sleep(1); + log_info("1111"); + gZstoreController->mIoc_.run(); + // while (1) + // sleep(1); } else { sleep(30); if (gZstoreController->mKeyExperiment == 2 && diff --git a/src/zstore_controller.cc b/src/zstore_controller.cc index 7103089..2bba03e 100644 --- a/src/zstore_controller.cc +++ b/src/zstore_controller.cc @@ -497,11 +497,11 @@ int ZstoreController::Init(bool object, int key_experiment, int phase) auto const port = 2000; // The io_context is required for all I/O - // auto const num_threads = Configuration::GetNumHttpThreads(); - // net::io_context ioc{num_threads}; + auto const num_threads = 3; + net::io_context ioc{num_threads}; // Spawn a listening port - boost::asio::co_spawn(mIoc_, do_listen(tcp::endpoint{address, port}, *this), + boost::asio::co_spawn(ioc, do_listen(tcp::endpoint{address, port}, *this), [](std::exception_ptr e) { if (e) try { @@ -513,39 +513,42 @@ int ZstoreController::Init(bool object, int key_experiment, int phase) } }); - // std::vector threads(num_threads); - // for (unsigned i = 0; i < num_threads; ++i) { - // threads[i] = std::jthread([&ioc, i, &threads] { - // // Create a cpu_set_t object representing a set of CPUs. - // // Clear it and mark only CPU i as set. - // cpu_set_t cpuset; - // CPU_ZERO(&cpuset); - // CPU_SET(i + Configuration::GetIoThreadCoreId(), &cpuset); - // std::string name = "zstore_ioc" + std::to_string(i + 4); - // int rc = - // pthread_setname_np(threads[i].native_handle(), name.c_str()); - // if (rc != 0) { - // log_error("HTTP server: Error calling pthread_setname: {}", - // rc); - // } - // rc = pthread_setaffinity_np(threads[i].native_handle(), - // sizeof(cpu_set_t), &cpuset); - // if (rc != 0) { - // log_error( - // "HTTP server: Error calling pthread_setaffinity_np: {}", - // rc); - // } - // ioc.run(); - // }); - // } + std::vector threads(num_threads); + for (unsigned i = 0; i < num_threads; ++i) { + threads[i] = std::jthread([&ioc, i, &threads] { + // Create a cpu_set_t object representing a set of CPUs. + // Clear it and mark only CPU i as set. + cpu_set_t cpuset; + CPU_ZERO(&cpuset); + CPU_SET(i % 3 + Configuration::GetHttpThreadCoreId(), &cpuset); + std::string name = + "zstore_ioc" + + std::to_string(i + Configuration::GetHttpThreadCoreId()); + int rc = + pthread_setname_np(threads[i].native_handle(), name.c_str()); + if (rc != 0) { + log_error("HTTP server: Error calling pthread_setname: {}", rc); + } + rc = pthread_setaffinity_np(threads[i].native_handle(), + sizeof(cpu_set_t), &cpuset); + if (rc != 0) { + log_error( + "HTTP server: Error calling pthread_setaffinity_np: {}", + rc); + } + log_info("HTTP server: Thread {} on core {}", i, + i % 3 + Configuration::GetHttpThreadCoreId()); + ioc.run(); + }); + } // This was using SPDK threads as HTTP threads - for (int threadId = 0; threadId < Configuration::GetNumHttpThreads(); - ++threadId) { - mHttpThread[threadId].group = spdk_nvme_poll_group_create(NULL, NULL); - mHttpThread[threadId].controller = this; - } - initHttpThread(); + // for (int threadId = 0; threadId < Configuration::GetNumHttpThreads(); + // ++threadId) { + // mHttpThread[threadId].group = spdk_nvme_poll_group_create(NULL, + // NULL); mHttpThread[threadId].controller = this; + // } + // initHttpThread(); log_info("Initialization complete. Launching workers."); @@ -578,6 +581,8 @@ int ZstoreController::Init(bool object, int key_experiment, int phase) log_info("ZstoreController Init finish"); + ioc.run(); + return rc; } diff --git a/utils/bench_read.sh b/utils/bench_read.sh index 5af68f5..3f27098 100755 --- a/utils/bench_read.sh +++ b/utils/bench_read.sh @@ -10,4 +10,6 @@ set -xeuo pipefail # sudo taskset -c 7,8,9,10,11 ~/tools/wrk/wrk -t16 -c1000 -d30s -s random-reads.lua http://127.0.0.1:2000 # sudo taskset -c 7,8,9,10,11 ~/tools/wrk/wrk -t16 -c1000 -d30s -s random-reads.lua http://127.0.0.1:2000 -sudo nice -n -20 taskset -c 8-11 ~/tools/wrk/wrk -t16 -c800 -d30s -s random-reads.lua http://127.0.0.1:2000 -- 100000 false +# sudo nice -n -20 taskset -c 8-11 ~/tools/wrk/wrk -t16 -c800 -d30s -s random-reads.lua http://127.0.0.1:2000 -- 100000 false +# sudo taskset -c 0,4,5 ~/tools/wrk/wrk -t16 -c800 -d30s -s random-reads.lua http://127.0.0.1:2000 -- 100000 false +sudo taskset -c 3,4,5 ~/tools/wrk/wrk -t16 -c800 -d10s -s random-reads.lua http://127.0.0.1:2000 -- 100000 false