Skip to content

Commit

Permalink
Support Windows using IOCP (#1)
Browse files Browse the repository at this point in the history
  • Loading branch information
Thalhammer committed Jul 13, 2024
1 parent 3156dc2 commit 52bf1dc
Show file tree
Hide file tree
Showing 22 changed files with 1,289 additions and 581 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/compiler-support.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ jobs:
- { tag: "ubuntu-2004_clang-11", name: "Ubuntu 20.04 Clang 11", cxx: "/usr/bin/clang++-11", cc: "/usr/bin/clang-11", runs-on: "ubuntu-20.04" }
- { tag: "ubuntu-2004_clang-10", name: "Ubuntu 20.04 Clang 10", cxx: "/usr/bin/clang++-10", cc: "/usr/bin/clang-10", runs-on: "ubuntu-20.04" }
- { tag: "ubuntu-2004_gcc-10", name: "Ubuntu 20.04 G++ 10", cxx: "/usr/bin/g++-10", cc: "/usr/bin/gcc-10", runs-on: "ubuntu-20.04" }
#- { tag: "windows-2022_msvc17", name: "Windows Server 2022 MSVC 17", cxx: "", cc: "", runs-on: "windows-2022" }
#- { tag: "windows-2019_msvc16", name: "Windows Server 2019 MSVC 16", cxx: "", cc: "", runs-on: "windows-2019" }
- { tag: "windows-2022_msvc17", name: "Windows Server 2022 MSVC 17", cxx: "", cc: "", runs-on: "windows-2022" }
- { tag: "windows-2019_msvc16", name: "Windows Server 2019 MSVC 16", cxx: "", cc: "", runs-on: "windows-2019" }
- { tag: "macos-12_gcc-12", name: "MacOS 12 G++ 12", cxx: "g++-12", cc: "gcc-12", runs-on: "macos-12" }
#- { tag: "macos-12_gcc-13", name: "MacOS 12 G++ 13", cxx: "g++-13", cc: "gcc-13", runs-on: "macos-12" }
- { tag: "macos-12_gcc-14", name: "MacOS 12 G++ 14", cxx: "g++-14", cc: "gcc-14", runs-on: "macos-12" }
Expand Down
5 changes: 5 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,18 @@ add_library(
${CMAKE_CURRENT_SOURCE_DIR}/src/address.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/dns.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/file.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/io_engine_generic_unix.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/io_engine_select.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/io_engine_uring.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/io_engine_iocp.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/io_engine.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/io_service.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/socket.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/tls.cpp)
target_link_libraries(asyncpp_io PUBLIC asyncpp OpenSSL::SSL Threads::Threads)
if(WIN32)
target_link_libraries(asyncpp_io PUBLIC wsock32 ws2_32 ntdll)
endif()
target_include_directories(asyncpp_io
PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include)
target_compile_features(asyncpp_io PUBLIC cxx_std_20)
Expand Down
12 changes: 6 additions & 6 deletions include/asyncpp/io/address.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ namespace asyncpp::io {
constexpr auto parse_part = [](std::string_view::const_iterator& it, std::string_view::const_iterator end) {
if (it == end || (*it < '0' && *it > '9')) return -1;
int32_t result = 0;
while (*it >= '0' && *it <= '9') {
while (it != end && *it >= '0' && *it <= '9') {
result = (result * 10) + (*it - '0');
it++;
}
Expand Down Expand Up @@ -157,7 +157,7 @@ namespace asyncpp::io {

constexpr std::span<const uint8_t, 16> data() const noexcept { return m_data; }
constexpr std::span<const uint8_t, 4> ipv4_data() const noexcept {
return std::span<const uint8_t, 4>{&m_data[12], &m_data[16]};
return std::span<const uint8_t, 4>{&m_data[12], &m_data[12] + 4};
}

constexpr uint64_t subnet_prefix() const noexcept {
Expand Down Expand Up @@ -189,7 +189,7 @@ namespace asyncpp::io {
}
constexpr ipv4_address mapped_ipv4() const noexcept {
if (!is_ipv4_mapped()) return ipv4_address();
return ipv4_address(std::span<const uint8_t, 4>(&m_data[12], &m_data[16]));
return ipv4_address(std::span<const uint8_t, 4>(&m_data[12], &m_data[12] + 4));
}

std::string to_string(bool full = false) const {
Expand Down Expand Up @@ -249,7 +249,7 @@ namespace asyncpp::io {
auto it = str.begin();
auto part_start = it;
bool is_v4_interop = false;
if (*it == ':') {
if (it != str.end() && *it == ':') {
dcidx = idx++;
it++;
if (it == str.end() || *it != ':') return std::nullopt;
Expand Down Expand Up @@ -508,7 +508,7 @@ namespace std {
template<>
struct hash<asyncpp::io::uds_address> {
size_t operator()(const asyncpp::io::uds_address& x) const noexcept {
size_t res = 0;
size_t res{};
for (auto e : x.data())
res = res ^ (e + 0x9e3779b99e3779b9ull + (res << 6) + (res >> 2));
return res;
Expand All @@ -518,7 +518,7 @@ namespace std {
template<>
struct hash<asyncpp::io::address> {
size_t operator()(const asyncpp::io::address& x) const noexcept {
size_t res;
size_t res{};
switch (x.type()) {
case asyncpp::io::address_type::ipv4: res = std::hash<asyncpp::io::ipv4_address>{}(x.ipv4()); break;
case asyncpp::io::address_type::ipv6: res = std::hash<asyncpp::io::ipv6_address>{}(x.ipv6()); break;
Expand Down
2 changes: 1 addition & 1 deletion include/asyncpp/io/detail/cancel_awaitable.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ namespace asyncpp::io::detail {
bool await_ready() const noexcept { return m_child.await_ready(); }
bool await_suspend(coroutine_handle<> hdl) {
if (m_stop_token.stop_requested()) {
m_child.m_completion.result = -ECANCELED;
m_child.m_completion.result = std::make_error_code(std::errc::operation_canceled);
return false;
}
auto res = m_child.await_suspend(hdl);
Expand Down
66 changes: 59 additions & 7 deletions include/asyncpp/io/detail/io_engine.h
Original file line number Diff line number Diff line change
@@ -1,27 +1,62 @@
#pragma once
#include <asyncpp/io/endpoint.h>

#include <cstddef>
#include <ios>
#include <memory>
#include <system_error>

namespace asyncpp::io::detail {
class io_engine {
public:
#ifndef _WIN32
using file_handle_t = int;
constexpr static file_handle_t invalid_file_handle = -1;
using socket_handle_t = int;
constexpr static socket_handle_t invalid_socket_handle = -1;
#else
using file_handle_t = void*;
constexpr static file_handle_t invalid_file_handle = reinterpret_cast<void*>(static_cast<long long>(-1));
using socket_handle_t = unsigned long long;
constexpr static socket_handle_t invalid_socket_handle = ~static_cast<socket_handle_t>(0);

#endif
enum class fsync_flags { none, datasync };
enum class socket_type { stream, dgram, seqpacket };

struct completion_data {
completion_data(void (*cb)(void*) = nullptr, void* udata = nullptr) noexcept
: callback(cb), userdata(udata) {}

// Private data the engine can use to associate state
alignas(std::max_align_t) std::array<std::byte, 256> engine_state{};

// Info provided by caller
void (*callback)(void*);
void* userdata;
void (*callback)(void*){};
void* userdata{};

// Filled by io_engine
int result;
std::error_code result{};
union {
socket_handle_t result_handle{};
size_t result_size;
};

// Private data the engine can use to associate state
void* engine_state{};
template<typename T>
T* es_init() noexcept {
static_assert(std::is_standard_layout_v<T> && std::is_trivially_copyable_v<T> &&
std::is_trivially_destructible_v<T>);
static_assert(sizeof(T) <= std::tuple_size_v<decltype(engine_state)>);
engine_state.fill(std::byte{});
return new (engine_state.data()) T();
}
template<typename T>
T* es_get() noexcept {
static_assert(std::is_standard_layout_v<T> && std::is_trivially_copyable_v<T> &&
std::is_trivially_destructible_v<T>);
static_assert(sizeof(T) <= std::tuple_size_v<decltype(engine_state)>);
return reinterpret_cast<T*>(engine_state.data());
}
};

public:
Expand All @@ -33,6 +68,18 @@ namespace asyncpp::io::detail {
virtual void wake() = 0;

// Networking api
virtual socket_handle_t socket_create(address_type domain, socket_type type) = 0;
virtual std::pair<socket_handle_t, socket_handle_t> socket_create_connected_pair(address_type domain,
socket_type type) = 0;
virtual void socket_register(socket_handle_t socket) = 0;
virtual void socket_release(socket_handle_t socket) = 0;
virtual void socket_close(socket_handle_t socket) = 0;
virtual void socket_bind(socket_handle_t socket, endpoint ep) = 0;
virtual void socket_listen(socket_handle_t socket, size_t backlog) = 0;
virtual endpoint socket_local_endpoint(socket_handle_t socket) = 0;
virtual endpoint socket_remote_endpoint(socket_handle_t socket) = 0;
virtual void socket_enable_broadcast(socket_handle_t socket, bool enable) = 0;
virtual void socket_shutdown(socket_handle_t socket, bool receive, bool send) = 0;
virtual bool enqueue_connect(socket_handle_t socket, endpoint ep, completion_data* cd) = 0;
virtual bool enqueue_accept(socket_handle_t socket, completion_data* cd) = 0;
virtual bool enqueue_recv(socket_handle_t socket, void* buf, size_t len, completion_data* cd) = 0;
Expand All @@ -43,8 +90,13 @@ namespace asyncpp::io::detail {
completion_data* cd) = 0;

// Filesystem IO
virtual bool enqueue_readv(file_handle_t fd, void* buf, size_t len, off_t offset, completion_data* cd) = 0;
virtual bool enqueue_writev(file_handle_t fd, const void* buf, size_t len, off_t offset,
virtual file_handle_t file_open(const char* filename, std::ios_base::openmode mode) = 0;
virtual void file_register(file_handle_t fd) = 0;
virtual void file_release(file_handle_t fd) = 0;
virtual void file_close(file_handle_t fd) = 0;
virtual uint64_t file_size(file_handle_t fd) = 0;
virtual bool enqueue_readv(file_handle_t fd, void* buf, size_t len, uint64_t offset, completion_data* cd) = 0;
virtual bool enqueue_writev(file_handle_t fd, const void* buf, size_t len, uint64_t offset,
completion_data* cd) = 0;
virtual bool enqueue_fsync(file_handle_t fd, fsync_flags flags, completion_data* cd) = 0;

Expand Down
10 changes: 8 additions & 2 deletions include/asyncpp/io/endpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,12 @@ namespace asyncpp::io {
m_ipv6 = {addr.ipv6(), port};
m_type = address_type::ipv6;
break;
#ifndef _WIN32
case address_type::uds:
m_uds = addr.uds();
m_type = address_type::uds;
break;
#endif
}
}
explicit constexpr endpoint(ipv4_endpoint ep) noexcept : m_ipv4(ep), m_type(address_type::ipv4) {}
Expand All @@ -128,15 +130,19 @@ namespace asyncpp::io {
constexpr ipv4_endpoint ipv4() const noexcept {
switch (m_type) {
case address_type::ipv4: return m_ipv4;
case address_type::ipv6:
case address_type::ipv6: return {};
#ifndef _WIN32
case address_type::uds: return {};
#endif
}
}
constexpr ipv6_endpoint ipv6() const noexcept {
switch (m_type) {
case address_type::ipv4: return {};
case address_type::ipv6: return m_ipv6;
#ifndef _WIN32
case address_type::uds: return {};
#endif
}
}
#ifndef _WIN32
Expand Down Expand Up @@ -213,7 +219,7 @@ namespace std {
template<>
struct hash<asyncpp::io::endpoint> {
size_t operator()(const asyncpp::io::endpoint& x) const noexcept {
size_t res;
size_t res{};
switch (x.type()) {
case asyncpp::io::address_type::ipv4: res = std::hash<asyncpp::io::ipv4_endpoint>{}(x.ipv4()); break;
case asyncpp::io::address_type::ipv6: res = std::hash<asyncpp::io::ipv6_endpoint>{}(x.ipv6()); break;
Expand Down
33 changes: 15 additions & 18 deletions include/asyncpp/io/file.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ namespace asyncpp::io {
detail::io_engine::completion_data m_completion;

public:
constexpr file_read_awaitable(io_engine* engine, io_engine::file_handle_t fd, void* buf, size_t len,
uint64_t offset, std::error_code* ec) noexcept
file_read_awaitable(io_engine* engine, io_engine::file_handle_t fd, void* buf, size_t len, uint64_t offset,
std::error_code* ec) noexcept
: m_engine(engine), m_fd(fd), m_buf(buf), m_len(len), m_offset(offset), m_ec(ec), m_completion{} {}
bool await_ready() const noexcept { return false; }
bool await_suspend(coroutine_handle<> hdl) {
Expand All @@ -112,10 +112,9 @@ namespace asyncpp::io {
return !m_engine->enqueue_readv(m_fd, m_buf, m_len, m_offset, &m_completion);
}
size_t await_resume() {
if (m_completion.result >= 0) return static_cast<size_t>(m_completion.result);
if (m_ec == nullptr)
throw std::system_error(std::error_code(-m_completion.result, std::system_category()));
*m_ec = std::error_code(-m_completion.result, std::system_category());
if (!m_completion.result) return m_completion.result_size;
if (m_ec == nullptr) throw std::system_error(m_completion.result);
*m_ec = m_completion.result;
return 0;
}
};
Expand All @@ -140,8 +139,8 @@ namespace asyncpp::io {
detail::io_engine::completion_data m_completion;

public:
constexpr file_write_awaitable(io_engine* engine, io_engine::file_handle_t fd, const void* buf, size_t len,
uint64_t offset, std::error_code* ec) noexcept
file_write_awaitable(io_engine* engine, io_engine::file_handle_t fd, const void* buf, size_t len,
uint64_t offset, std::error_code* ec) noexcept
: m_engine(engine), m_fd(fd), m_buf(buf), m_len(len), m_offset(offset), m_ec(ec), m_completion{} {}
bool await_ready() const noexcept { return false; }
bool await_suspend(coroutine_handle<> hdl) {
Expand All @@ -150,10 +149,9 @@ namespace asyncpp::io {
return !m_engine->enqueue_writev(m_fd, m_buf, m_len, m_offset, &m_completion);
}
size_t await_resume() {
if (m_completion.result >= 0) return static_cast<size_t>(m_completion.result);
if (m_ec == nullptr)
throw std::system_error(std::error_code(-m_completion.result, std::system_category()));
*m_ec = std::error_code(-m_completion.result, std::system_category());
if (!m_completion.result) return m_completion.result_size;
if (m_ec == nullptr) throw std::system_error(m_completion.result);
*m_ec = m_completion.result;
return 0;
}
};
Expand All @@ -175,7 +173,7 @@ namespace asyncpp::io {
detail::io_engine::completion_data m_completion;

public:
constexpr file_fsync_awaitable(io_engine* engine, io_engine::file_handle_t fd, std::error_code* ec) noexcept
file_fsync_awaitable(io_engine* engine, io_engine::file_handle_t fd, std::error_code* ec) noexcept
: m_engine(engine), m_fd(fd), m_ec(ec), m_completion{} {}
bool await_ready() const noexcept { return false; }
bool await_suspend(coroutine_handle<> hdl) {
Expand All @@ -184,10 +182,9 @@ namespace asyncpp::io {
return !m_engine->enqueue_fsync(m_fd, io_engine::fsync_flags::none, &m_completion);
}
void await_resume() {
if (m_completion.result >= 0) return;
if (m_ec == nullptr)
throw std::system_error(std::error_code(-m_completion.result, std::system_category()));
*m_ec = std::error_code(-m_completion.result, std::system_category());
if (!m_completion.result) return;
if (m_ec == nullptr) throw std::system_error(m_completion.result);
*m_ec = m_completion.result;
}
};
} // namespace detail
Expand Down Expand Up @@ -269,7 +266,7 @@ namespace asyncpp::io {
file(const file&) = delete;
file(file&&) noexcept;
file& operator=(const file&) = delete;
file& operator=(file&&);
file& operator=(file&&) noexcept;
~file();

[[nodiscard]] io_service& service() const noexcept { return *m_io; }
Expand Down
Loading

0 comments on commit 52bf1dc

Please sign in to comment.