Skip to content

Commit

Permalink
Merge pull request #501 from jagerman/libcurl-multi
Browse files Browse the repository at this point in the history
Refactor onion proxy requests to use curl multi mode
  • Loading branch information
jagerman authored Jul 22, 2024
2 parents a30d6b8 + 75eb69b commit 8616387
Show file tree
Hide file tree
Showing 16 changed files with 369 additions and 95 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ endif()
cmake_minimum_required(VERSION 3.13)

project(oxenss
VERSION 2.7.0
VERSION 2.8.0
LANGUAGES CXX C)

set(CMAKE_CXX_STANDARD 20)
Expand Down
6 changes: 2 additions & 4 deletions external/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,8 @@ macro(system_or_submodule BIGNAME smallname pkgconf subdir)
endif()
endmacro()

set(OXEN_LOGGING_SOURCE_ROOT "${PROJECT_SOURCE_DIR}/oxenss" CACHE INTERNAL "")

system_or_submodule(OXENC oxenc liboxenc>=1.0.10 oxenc)

set(OXEN_LOGGING_SOURCE_ROOT "${OXEN_LOGGING_SOURCE_ROOT};${PROJECT_SOURCE_DIR}" CACHE INTERNAL "")

set(LIBQUIC_BUILD_TESTS OFF CACHE BOOL "")
system_or_submodule(OXENQUIC quic liboxenquic>=1.1.0 oxen-libquic)

Expand All @@ -81,6 +77,8 @@ endif()
if(NOT TARGET oxen::logging)
add_subdirectory(oxen-logging)
endif()
oxen_logging_add_source_dir("${PROJECT_SOURCE_DIR}")
oxen_logging_add_source_dir("${PROJECT_SOURCE_DIR}/oxenss")


# uSockets doesn't really have a proper build system (just a very simple Makefile) so build it
Expand Down
2 changes: 1 addition & 1 deletion external/oxen-libquic
2 changes: 1 addition & 1 deletion external/oxen-logging
Submodule oxen-logging updated 1 files
+4 −1 CMakeLists.txt
1 change: 1 addition & 0 deletions oxenss/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ endif()
add_subdirectory(common)
add_subdirectory(crypto)
add_subdirectory(daemon)
add_subdirectory(http)
add_subdirectory(logging)
add_subdirectory(rpc)
add_subdirectory(server)
Expand Down
7 changes: 6 additions & 1 deletion oxenss/daemon/oxen-storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,14 +166,18 @@ int main(int argc, char* argv[]) {
ssl_dh,
{me.pubkey_legacy, private_key}};

auto quic = std::make_shared<server::QUIC>(
auto quic = std::make_unique<server::QUIC>(
service_node,
request_handler,
rate_limiter,
oxen::quic::Address{options.ip, options.omq_quic_port},
private_key_ed25519);
service_node.register_mq_server(quic.get());

auto http_client = std::make_shared<http::Client>(&quic->net());
service_node.set_http_client(http_client);
request_handler.set_http_client(http_client);

oxenmq_server.init(
&service_node,
&request_handler,
Expand Down Expand Up @@ -202,6 +206,7 @@ int main(int argc, char* argv[]) {
std::this_thread::sleep_for(100ms);

log::warning(logcat, "Received signal {}; shutting down...", signalled.load());
http_client.reset(); // Kills outgoing requests and prevents new ones
service_node.shutdown();
log::info(logcat, "Stopping https server");
https_server.shutdown(true);
Expand Down
20 changes: 20 additions & 0 deletions oxenss/http/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@

if(NOT TARGET libevent::core)
add_library(libevent_core INTERFACE)
pkg_check_modules(LIBEVENT_core libevent_core>=2.1 IMPORTED_TARGET REQUIRED)
target_link_libraries(libevent_core INTERFACE PkgConfig::LIBEVENT_core)
add_library(libevent::core ALIAS libevent_core)
endif()

add_library(http STATIC
http_client.cpp)

target_link_libraries(http
PRIVATE
version
oxen::logging
libevent::core
PUBLIC
cpr::cpr
quic::quic
)
206 changes: 206 additions & 0 deletions oxenss/http/http_client.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
#include "http_client.h"
#include <curl/curl.h>
#include <chrono>
#include <oxen/log.hpp>
#include <cpr/ssl_options.h>
#include <oxenss/version.h>

namespace oxenss::http {

namespace log = oxen::log;

auto logcat = log::Cat("http");

static void curl_perform(int fd, short event, void* void_ctx);

struct curl_context {
Client& client;
curl_socket_t sockfd;
event* evt;

curl_context(Client& client, curl_socket_t fd) :
client{client},
sockfd{fd},
evt{event_new(client.loop->loop().get(), sockfd, 0, Client::curl_perform_c, this)} {}
~curl_context() {
event_del(evt);
event_free(evt);
}
};

void Client::curl_perform_c(int /*fd*/, short event, void* cctx) {
int running_handles;
int flags = 0;
auto* ctx = static_cast<curl_context*>(cctx);
auto& client = ctx->client;

if (event & EV_READ)
flags |= CURL_CSELECT_IN;
if (event & EV_WRITE)
flags |= CURL_CSELECT_OUT;

curl_multi_socket_action(client.curl_multi, ctx->sockfd, flags, &running_handles);
// Can't use `ctx` anymore because it might have been destroyed during the above call (typically
// because the socket is no longer being polled).

client.check_multi_info();
}

void Client::on_timeout_c(evutil_socket_t /*fd*/, short /*events*/, void* arg) {
auto& client = *static_cast<Client*>(arg);
int running_handles;
curl_multi_socket_action(client.curl_multi, CURL_SOCKET_TIMEOUT, 0, &running_handles);
client.check_multi_info();
}

int Client::start_timeout_c(CURLM* /*multi*/, long timeout_ms, void* userp) {
auto& client = *static_cast<Client*>(userp);
evtimer_del(client.ev_timeout);
if (timeout_ms >= 0) {
timeval tv;
tv.tv_sec = timeout_ms / 1000;
tv.tv_usec = (timeout_ms % 1000) * 1000;
if (timeout_ms == 0)
tv.tv_usec = 1; /* 0 means call socket_action asap */
evtimer_add(client.ev_timeout, &tv);
}
return 0;
}

int Client::handle_socket_c(
CURL* /*easy*/, curl_socket_t s, int action, void* userp, void* socketp) {
auto& client = *static_cast<Client*>(userp);
auto* curl_ctx = static_cast<curl_context*>(socketp);
int events = 0;

switch (action) {
case CURL_POLL_IN:
case CURL_POLL_OUT:
case CURL_POLL_INOUT:
if (!curl_ctx) {
curl_ctx = new curl_context{client, s};
curl_multi_assign(client.curl_multi, s, curl_ctx);
}

if (action != CURL_POLL_IN)
events |= EV_WRITE;
if (action != CURL_POLL_OUT)
events |= EV_READ;

events |= EV_PERSIST;

event_del(curl_ctx->evt);
event_assign(
curl_ctx->evt,
client.loop->loop().get(),
curl_ctx->sockfd,
events,
Client::curl_perform_c,
curl_ctx);
event_add(curl_ctx->evt, NULL);

break;
case CURL_POLL_REMOVE:
if (curl_ctx) {
curl_multi_assign(client.curl_multi, s, nullptr);
delete curl_ctx;
}
break;
default: log::error(logcat, "Unexpected socket action {} from libcurl", action);
}

return 0;
}

void Client::check_multi_info() {
int pending;
while (CURLMsg* message = curl_multi_info_read(curl_multi, &pending)) {
if (message->msg == CURLMSG_DONE) {
cpr::Session* raw_sess;
curl_easy_getinfo(message->easy_handle, CURLINFO_PRIVATE, &raw_sess);
assert(raw_sess);
auto session = raw_sess->shared_from_this();
assert(session);
auto resp = session->Complete(message->data.result);
auto it = active_reqs.find(session);
assert(it != active_reqs.end());
if (it->second) {
try {
it->second(std::move(resp));
} catch (const std::exception& e) {
log::error(logcat, "HTTP response handler raised exception: {}", e.what());
}
}
active_reqs.erase(it);
curl_multi_remove_handle(curl_multi, message->easy_handle);
break;
} else {
log::warning(
logcat,
"Unexpected/unhandled curl-multi message type: {}",
static_cast<int>(message->msg));
}
}
}

Client::Client(oxen::quic::Network* loop_) :
loop{std::move(loop_)},
ev_timeout{evtimer_new(loop->loop().get(), Client::on_timeout_c, this)} {
assert(loop);
curl_multi = curl_multi_init();
curl_multi_setopt(curl_multi, CURLMOPT_SOCKETDATA, this);
curl_multi_setopt(curl_multi, CURLMOPT_SOCKETFUNCTION, Client::handle_socket_c);
curl_multi_setopt(curl_multi, CURLMOPT_TIMERDATA, this);
curl_multi_setopt(curl_multi, CURLMOPT_TIMERFUNCTION, Client::start_timeout_c);
}

Client::~Client() {
loop->call_get([this] {
alive.reset();
for (auto& [session, cb] : active_reqs)
curl_multi_remove_handle(curl_multi, session->GetCurlHolder()->handle);
active_reqs.clear();
curl_multi_cleanup(curl_multi);
event_free(ev_timeout);
});
}

void Client::post(
response_callback cb,
std::string url,
std::string payload,
std::chrono::milliseconds timeout,
std::optional<std::string> host_override,
bool https_disable_validation) {
auto sess = std::make_shared<cpr::Session>();
sess->SetUrl(std::move(url));
cpr::Header header{
{"User-Agent", fmt::format("Oxen Storage Server/{}", STORAGE_SERVER_VERSION_STRING)},
{"Content-Type", "application/octet-stream"}};
if (host_override)
header["Host"] = *host_override;
sess->SetHeader(std::move(header));
sess->SetTimeout(timeout);
auto ssl_opts = cpr::Ssl(cpr::ssl::TLSv1_2{}); // TLSv1_2 means "1.2 or later"
if (https_disable_validation) {
ssl_opts.SetOption(cpr::ssl::VerifyHost{false});
ssl_opts.SetOption(cpr::ssl::VerifyPeer{false});
ssl_opts.SetOption(cpr::ssl::VerifyStatus{false});
}
sess->SetSslOptions(std::move(ssl_opts));
sess->SetRedirect(cpr::Redirect{0L});
sess->SetBody(std::move(payload));
curl_easy_setopt(sess->GetCurlHolder()->handle, CURLOPT_PRIVATE, sess.get());
sess->PreparePost();
loop->call([this,
alive = std::weak_ptr{alive},
sess = std::move(sess),
cb = std::move(cb)]() mutable {
if (alive.expired())
return; // this got destroyed before we got into the call
curl_multi_add_handle(curl_multi, sess->GetCurlHolder()->handle);
active_reqs.emplace(std::move(sess), std::move(cb));
});
}

} // namespace oxenss::http
67 changes: 67 additions & 0 deletions oxenss/http/http_client.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
#pragma once

#include <chrono>
#include <functional>
#include <memory>
#include <string>
#include <unordered_map>

#include <cpr/response.h>
#include <cpr/session.h>
#include <curl/curl.h>
#include <oxen/quic/network.hpp>

namespace oxenss::http {

/// Async client for making outbound storage server HTTP post requests.
class Client {
public:
using response_callback = std::function<void(cpr::Response r)>;

// Starts a new client, attaching itself to the event loop and ready for requests.
explicit Client(oxen::quic::Network* loop);

// Non-copyable, non-movable
Client(const Client&) = delete;
Client(Client&&) = delete;
Client& operator=(const Client&) = delete;
Client& operator=(Client&&) = delete;

// Kills all current requests and shuts down. Callbacks on pending requests are *not* invoked.
~Client();

// Initiates a new POST request. When the request complete (or times out) `cb` will be invoked
// with the cpr::Response object. Note that cb is invoked inside the event loop context, so it
// should try to be fast and definitely not do anything blocking.
void post(
response_callback cb,
std::string url,
std::string payload,
std::chrono::milliseconds timeout,
std::optional<std::string> host_override = std::nullopt,
bool https_disable_validation = false);

private:
// FIXME: in future (dev, as of writing) versions of libquic we should a shared_ptr<Loop>
// instead of this raw Network pointer, but currently Network doesn't give us a public interface
// to accessing its Loop, so we whole the bigger Network here instead. (It is still named
// "loop", though, because we really only want a loop and Network has proxy functions for all
// the Loop functions we use that should work fine without needing other changes when this gets
// fixed in the future).
oxen::quic::Network* loop;
event* ev_timeout;
std::shared_ptr<const bool> alive = std::make_shared<bool>(true);
CURLM* curl_multi;
std::unordered_map<std::shared_ptr<cpr::Session>, response_callback> active_reqs;

friend struct curl_context;

static void curl_perform_c(int fd, short event, void* cctx);
static void on_timeout_c(evutil_socket_t fd, short events, void* arg);
static int start_timeout_c(CURLM* multi, long timeout_ms, void* userp);
static int handle_socket_c(CURL* easy, curl_socket_t s, int action, void* self, void* socketp);

void check_multi_info();
};

} // namespace oxenss::http
1 change: 1 addition & 0 deletions oxenss/rpc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ target_link_libraries(rpc
crypto
server
snode
http
utils
logging
version
Expand Down
Loading

0 comments on commit 8616387

Please sign in to comment.