Skip to content

Commit

Permalink
Checkpoint: foo
Browse files Browse the repository at this point in the history
  • Loading branch information
shuwens committed Oct 30, 2024
1 parent 1f096ed commit 1c17c6e
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 79 deletions.
2 changes: 1 addition & 1 deletion examples/boost/http_server_async.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ int main(int argc, char *argv[])
auto const address = net::ip::make_address("127.0.0.1");
auto const port = 2000;
auto const doc_root = std::make_shared<std::string>(".");
auto const threads = std::max<int>(1, 1);
auto const threads = std::max<int>(1, 8);

// The io_context is required for all I/O
net::io_context ioc{threads};
Expand Down
28 changes: 14 additions & 14 deletions meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -272,27 +272,27 @@ executable(
# )

# HTTP server
# executable(
# 'http_async',
# files('examples/boost/http_server_async.cpp'),
# dependencies: [zstore_deps] + [dependency('_spdk')],
# link_args: spdk_link_args,
# install: true,
# )
executable(
'http_async',
files('examples/boost/http_server_async.cpp'),
dependencies: [zstore_deps] + [dependency('_spdk')],
link_args: spdk_link_args,
install: true,
)
# executable(
# 'http_small',
# files('examples/boost/http_server_small.cpp'),
# dependencies: [zstore_deps] + [dependency('_spdk')],
# link_args: spdk_link_args,
# install: true,
# )
# executable(
# 'http_awaitable',
# files('examples/boost/http_server_awaitable.cc'),
# dependencies: [zstore_deps] + [dependency('_spdk')],
# link_args: spdk_link_args,
# install: true,
# )
executable(
'http_awaitable',
files('examples/boost/http_server_awaitable.cc'),
dependencies: [zstore_deps] + [dependency('_spdk')],
link_args: spdk_link_args,
install: true,
)

# Tests
# -----------
Expand Down
13 changes: 6 additions & 7 deletions src/common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ void spdk_nvme_zone_read_wrapper(
uint32_t flags,
std::move_only_function<void(Result<const spdk_nvme_cpl *>)> cb)
{
log_debug("1111: offset {}, size {}", offset, size);
// log_debug("1111: offset {}, size {}", offset, size);
auto cb_heap = new decltype(cb)(std::move(cb));
auto fn = new std::move_only_function<void(void)>([=]() {
int rc = spdk_nvme_ns_cmd_read(
Expand Down Expand Up @@ -307,16 +307,15 @@ auto zoneRead(void *arg1) -> net::awaitable<Result<void>>
ioCtx.size, ioCtx.flags);
if (res_cpl.has_error()) {
// log_error("cpl error status");
co_return outcome::failure(std::errc::io_error);
} else
cpl = res_cpl.value();
}
if (spdk_nvme_cpl_is_error(cpl)) {
log_error("I/O error status: {}",
spdk_nvme_cpl_get_status_string(&cpl->status));
// fprintf(stderr, "I/O failed, aborting run\n");
// assert(0);
// exit(1);
log_debug("Unimplemented: put context back in pool");
// log_error("I/O error status: {}",
// spdk_nvme_cpl_get_status_string(&cpl->status));
// log_debug("Unimplemented: put context back in pool");
co_return outcome::failure(std::errc::io_error);
}

// For read, we swap the read date into the request body
Expand Down
49 changes: 31 additions & 18 deletions src/include/http_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ auto awaitable_on_request(HttpRequest req,
// entry.second_tgt(), entry.third_tgt());
auto [first, _, _] = entry;
auto [tgt, lba, _] = first;
log_debug("Reading from tgt {} lba {}", tgt, lba);
// log_debug("Reading from tgt {} lba {}", tgt, lba);

auto dev1 = zctrl_.GetDevice(tgt);
// auto dev2 = zctrl_.GetDevice(entry.second_tgt());
// auto dev3 = zctrl_.GetDevice(entry.third_tgt());
Expand All @@ -79,24 +80,33 @@ auto awaitable_on_request(HttpRequest req,
// MakeReadRequest(&zctrl_, dev3, entry.third_lba(),
// req).value();

co_await zoneRead(s1);
auto res = co_await zoneRead(s1);
// co_await (zoneRead(s1) && zoneRead(s2) && zoneRead(s3));

// log_debug("1111");

ZstoreObject deserialized_obj;
bool success = ReadBufferToZstoreObject(s1->dataBuffer, s1->size,
deserialized_obj);

// log_debug("1111");
s1->Clear();
zctrl_.mRequestContextPool->ReturnRequestContext(s1);
// s2->Clear();
// zctrl_.mRequestContextPool->ReturnRequestContext(s2);
// s3->Clear();
// zctrl_.mRequestContextPool->ReturnRequestContext(s3);

co_return handle_request(std::move(req));
if (res.has_value()) {
// log_debug("1111");
// }
ZstoreObject deserialized_obj;
bool success = ReadBufferToZstoreObject(
s1->dataBuffer, s1->size, deserialized_obj);

// log_debug("1111");
s1->Clear();
zctrl_.mRequestContextPool->ReturnRequestContext(s1);
// s2->Clear();
// zctrl_.mRequestContextPool->ReturnRequestContext(s2);
// s3->Clear();
// zctrl_.mRequestContextPool->ReturnRequestContext(s3);

co_return handle_request(std::move(req));
} else {
s1->Clear();
zctrl_.mRequestContextPool->ReturnRequestContext(s1);

co_return handle_request(std::move(req));
}
} else {
log_error("Draining or not enough contexts");
}
Expand Down Expand Up @@ -145,7 +155,8 @@ auto awaitable_on_request(HttpRequest req,
auto dev3 = zctrl_.GetDevice(tgt3);

// auto slot = MakeWriteRequest(
// &zctrl_, zctrl_.GetDevice(entry.first_tgt()), req, entry);
// &zctrl_, zctrl_.GetDevice(entry.first_tgt()), req,
// entry);

ZstoreObject original_obj;
original_obj.entry.type = LogEntryType::kData;
Expand Down Expand Up @@ -178,7 +189,8 @@ auto awaitable_on_request(HttpRequest req,
// co_await zoneAppend(s3);
// log_debug("s3");

// co_await (zoneAppend(s1) && zoneAppend(s2) && zoneAppend(s3));
// co_await (zoneAppend(s1) && zoneAppend(s2) &&
// zoneAppend(s3));

if (zctrl_.verbose)
log_debug("6666");
Expand All @@ -195,7 +207,8 @@ auto awaitable_on_request(HttpRequest req,
std::make_pair(tgt2, dev2->GetZoneId()),
std::make_pair(tgt3, dev3->GetZoneId())),
s1->append_lba, 1, 0, 1, 0, 1)
// s1->append_lba, 1, s2->append_lba, 1, s3->append_lba, 1)
// s1->append_lba, 1, s2->append_lba, 1, s3->append_lba,
// 1)
.value();
// update lba in map
auto rc = zctrl_.PutObject(key_hash, new_entry).value();
Expand Down
9 changes: 5 additions & 4 deletions src/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ int main(int argc, char **argv)
opts.name = "Zstore";
opts.mem_size = g_dpdk_mem;
opts.hugepage_single_segments = g_dpdk_mem_single_seg;
opts.core_mask = "0xbf";
opts.core_mask = "0xf";
// opts.core_mask = "0x7"; // b, 7, f
opts.shm_id = -1;
if (spdk_env_init(&opts) < 0) {
Expand All @@ -58,7 +58,6 @@ int main(int argc, char **argv)

log_info("Starting HTTP server with port 2000!\n");

// gZstoreController->mIoc_.run();
// FIXME only tput on Zstore2Dev1
// while (1) {
// auto etime = std::chrono::high_resolution_clock::now();
Expand All @@ -81,8 +80,10 @@ int main(int argc, char **argv)
// }

if (gZstoreController->mKeyExperiment == 1) {
while (1)
sleep(1);
log_info("1111");
gZstoreController->mIoc_.run();
// while (1)
// sleep(1);
} else {
sleep(30);
if (gZstoreController->mKeyExperiment == 2 &&
Expand Down
73 changes: 39 additions & 34 deletions src/zstore_controller.cc
Original file line number Diff line number Diff line change
Expand Up @@ -497,11 +497,11 @@ int ZstoreController::Init(bool object, int key_experiment, int phase)
auto const port = 2000;

// The io_context is required for all I/O
// auto const num_threads = Configuration::GetNumHttpThreads();
// net::io_context ioc{num_threads};
auto const num_threads = 3;
net::io_context ioc{num_threads};

// Spawn a listening port
boost::asio::co_spawn(mIoc_, do_listen(tcp::endpoint{address, port}, *this),
boost::asio::co_spawn(ioc, do_listen(tcp::endpoint{address, port}, *this),
[](std::exception_ptr e) {
if (e)
try {
Expand All @@ -513,39 +513,42 @@ int ZstoreController::Init(bool object, int key_experiment, int phase)
}
});

// std::vector<std::jthread> threads(num_threads);
// for (unsigned i = 0; i < num_threads; ++i) {
// threads[i] = std::jthread([&ioc, i, &threads] {
// // Create a cpu_set_t object representing a set of CPUs.
// // Clear it and mark only CPU i as set.
// cpu_set_t cpuset;
// CPU_ZERO(&cpuset);
// CPU_SET(i + Configuration::GetIoThreadCoreId(), &cpuset);
// std::string name = "zstore_ioc" + std::to_string(i + 4);
// int rc =
// pthread_setname_np(threads[i].native_handle(), name.c_str());
// if (rc != 0) {
// log_error("HTTP server: Error calling pthread_setname: {}",
// rc);
// }
// rc = pthread_setaffinity_np(threads[i].native_handle(),
// sizeof(cpu_set_t), &cpuset);
// if (rc != 0) {
// log_error(
// "HTTP server: Error calling pthread_setaffinity_np: {}",
// rc);
// }
// ioc.run();
// });
// }
std::vector<std::jthread> threads(num_threads);
for (unsigned i = 0; i < num_threads; ++i) {
threads[i] = std::jthread([&ioc, i, &threads] {
// Create a cpu_set_t object representing a set of CPUs.
// Clear it and mark only CPU i as set.
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(i % 3 + Configuration::GetHttpThreadCoreId(), &cpuset);
std::string name =
"zstore_ioc" +
std::to_string(i + Configuration::GetHttpThreadCoreId());
int rc =
pthread_setname_np(threads[i].native_handle(), name.c_str());
if (rc != 0) {
log_error("HTTP server: Error calling pthread_setname: {}", rc);
}
rc = pthread_setaffinity_np(threads[i].native_handle(),
sizeof(cpu_set_t), &cpuset);
if (rc != 0) {
log_error(
"HTTP server: Error calling pthread_setaffinity_np: {}",
rc);
}
log_info("HTTP server: Thread {} on core {}", i,
i % 3 + Configuration::GetHttpThreadCoreId());
ioc.run();
});
}

// This was using SPDK threads as HTTP threads
for (int threadId = 0; threadId < Configuration::GetNumHttpThreads();
++threadId) {
mHttpThread[threadId].group = spdk_nvme_poll_group_create(NULL, NULL);
mHttpThread[threadId].controller = this;
}
initHttpThread();
// for (int threadId = 0; threadId < Configuration::GetNumHttpThreads();
// ++threadId) {
// mHttpThread[threadId].group = spdk_nvme_poll_group_create(NULL,
// NULL); mHttpThread[threadId].controller = this;
// }
// initHttpThread();

log_info("Initialization complete. Launching workers.");

Expand Down Expand Up @@ -578,6 +581,8 @@ int ZstoreController::Init(bool object, int key_experiment, int phase)

log_info("ZstoreController Init finish");

ioc.run();

return rc;
}

Expand Down
4 changes: 3 additions & 1 deletion utils/bench_read.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,6 @@ set -xeuo pipefail
# sudo taskset -c 7,8,9,10,11 ~/tools/wrk/wrk -t16 -c1000 -d30s -s random-reads.lua http://127.0.0.1:2000
# sudo taskset -c 7,8,9,10,11 ~/tools/wrk/wrk -t16 -c1000 -d30s -s random-reads.lua http://127.0.0.1:2000

sudo nice -n -20 taskset -c 8-11 ~/tools/wrk/wrk -t16 -c800 -d30s -s random-reads.lua http://127.0.0.1:2000 -- 100000 false
# sudo nice -n -20 taskset -c 8-11 ~/tools/wrk/wrk -t16 -c800 -d30s -s random-reads.lua http://127.0.0.1:2000 -- 100000 false
# sudo taskset -c 0,4,5 ~/tools/wrk/wrk -t16 -c800 -d30s -s random-reads.lua http://127.0.0.1:2000 -- 100000 false
sudo taskset -c 3,4,5 ~/tools/wrk/wrk -t16 -c800 -d10s -s random-reads.lua http://127.0.0.1:2000 -- 100000 false

0 comments on commit 1c17c6e

Please sign in to comment.