Skip to content

Commit

Permalink
Feat: Added reconnecting
Browse files Browse the repository at this point in the history
  • Loading branch information
bersen66 committed Apr 18, 2024
1 parent b18afbb commit dfaeff3
Show file tree
Hide file tree
Showing 15 changed files with 419 additions and 100 deletions.
11 changes: 10 additions & 1 deletion .github/workflows/lb-ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ on:
branches: [ "main", "ci", "dev" ]

jobs:
build:
test-build:
runs-on: ubuntu-latest

steps:
Expand All @@ -26,3 +26,12 @@ jobs:
- name: Do build
run: bash lbbuild.sh build Release


run-unit-tests:
runs-on: ubuntu-latest
needs: test-build
steps:
- uses: actions/checkout@v3
- name: Run unit tests
run: bash lbbuild.sh test Release

11 changes: 6 additions & 5 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ project(lb2)
message("Building with CMake version: ${CMAKE_VERSION}")
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

# set(CMAKE_CXX_CPPLINT "cpplint")

set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/lb2)
set(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/lb2)
Expand All @@ -18,7 +18,7 @@ file(GLOB lb2_sources
src/lb/*.cpp
src/lb/tcp/*.cpp
)
add_library(lb2 SHARED ${lb2_sources})
add_library(lb2 ${lb2_sources})
target_include_directories(lb2 PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/src)
target_link_libraries(
lb2 PUBLIC
Expand All @@ -44,14 +44,15 @@ add_custom_target(

# Main app
add_executable(lb_app src/main.cpp)
target_link_libraries(lb_app lb2)
target_compile_definitions(lb_app PUBLIC LB_APP_LOGGING)
target_link_libraries(lb_app PUBLIC lb2)
add_dependencies(lb_app copy_configs)

# Setting up unit-tests
enable_testing()
include(GoogleTest)
file(GLOB unit_tests_sources tests/*.cpp)
add_executable(lb_tests ${unit_tests_sources})
target_link_libraries(lb_tests PUBLIC lb2 ctre::ctre GTest::gtest_main)
target_link_libraries(lb_tests PUBLIC lb2 GTest::gtest spdlog::spdlog yaml-cpp::yaml-cpp)
add_dependencies(lb_tests copy_configs)
gtest_discover_tests(lb_tests)
gtest_discover_tests(lb_tests)
27 changes: 21 additions & 6 deletions lbbuild.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
#!/bin/bash

function BuildApp() {
echo "Build type: $BUILD_TYPE"
mkdir -pv build
conan profile detect --force
conan install . --output-folder=build --build=missing --settings=build_type=$BUILD_TYPE
cd build
cmake .. -DCMAKE_BUILD_TYPE=$BUILD_TYPE -DCMAKE_TOOLCHAIN_FILE=conan_toolchain.cmake
cmake --build . -j $(nproc)
}

if ! command -v conan &> /dev/null
then
Expand All @@ -18,13 +27,19 @@ CMD=$1
case $CMD in
build )
BUILD_TYPE="${2:-Debug}"
echo "Build type: $BUILD_TYPE"
mkdir -pv build
conan profile detect --force
conan install . --output-folder=build --build=missing --settings=build_type=$BUILD_TYPE
BuildApp
exit $?
;;
test )
BUILD_TYPE="${2:-Debug}"
BuildApp || exit $?
cd build
cmake .. -DCMAKE_BUILD_TYPE=$BUILD_TYPE -DCMAKE_TOOLCHAIN_FILE=conan_toolchain.cmake
cmake --build . -j $(nproc)
LB_CONFIG="${3:-lb2/configs/config.yaml}"
export LB_CONFIG
ctest --no-compress-output --rerun-failed --output-log-on-failure --output-on-failure
exit $?
;;
* )
;;
esac

85 changes: 60 additions & 25 deletions src/lb/application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,35 @@ void Application::RegisterConnector(tcp::Connector* connector)
connector_ptr = connector;
}

void MakeThisThreadMaximumPriority()
{
struct sched_param param;
param.sched_priority = sched_get_priority_max(SCHED_FIFO);
pthread_setschedparam(pthread_self(), SCHED_FIFO, &param);
}

void ThreadRoutine(boost::asio::io_context& ioc, boost::barrier& barrier)
{
barrier.wait();
MakeThisThreadMaximumPriority();

Application& app = Application::GetInstance();
try {
INFO("Starting io_context");
ioc.run();
INFO("Exiting io_context");
} catch (const std::exception& exc) {
CRITICAL("Exception at io_context: {}", exc.what());
app.SetExitCode(EXIT_FAILURE);
app.Terminate();
} catch (...) {
CRITICAL("Unknown exception at io_context");
app.Terminate();
app.SetExitCode(EXIT_FAILURE);
}
app.SetExitCode(EXIT_SUCCESS);
}

void Application::Start()
{
INFO("Starting app");
Expand All @@ -219,19 +248,10 @@ void Application::Start()
boost::barrier barrier(threads_num);
for (std::size_t i = 0; i + 1 < threads_num; ++i) {
threads.create_thread([this, &barrier](){
barrier.wait();
nice(-19);
INFO("Starting io_context");
io_context.run();
INFO("io_context stopped");
ThreadRoutine(io_context, barrier);
});
}
barrier.wait();
nice(-19);
INFO("Main thread starting io_context");
io_context.run();
INFO("Main thread io_context stopped");
threads.interrupt_all();
ThreadRoutine(io_context, barrier);
threads.join_all();
INFO("Finishing app");
}
Expand All @@ -246,6 +266,31 @@ tcp::Connector& Application::Connector()
return *connector_ptr;
}

void Application::SetExitCode(int code)
{
if (code != EXIT_SUCCESS) {
exit_code = code;
}
}

int Application::GetExitCode() const
{
return exit_code;
}

void ConfigureApplication(lb::Application& app, const std::string& config_path) {
try {
app.LoadConfig(config_path);
spdlog::info("Start configuring logs");
ConfigureLogging(app.Config());
DEBUG("Logging configured");
} catch (const std::exception& exc) {
CRITICAL("Exception while configuring app: {}", exc.what());
CRITICAL("Exit code: {}", EXIT_FAILURE);
std::abort();
}
}

int run(int argc, char** argv)
{
opt::options_description desc("Allowed options");
Expand Down Expand Up @@ -274,20 +319,10 @@ int run(int argc, char** argv)
}
std::string config_path = parsed_options["config"].as<std::string>();
Application& app = Application::GetInstance();
try {
app.LoadConfig(config_path);
spdlog::info("Start configuring logs");
ConfigureLogging(app.Config());
DEBUG("Logging configured");
app.Start();
} catch (const std::exception& exc) {
CRITICAL("Exception: {}", exc.what());
CRITICAL("Exit code: {}", EXIT_FAILURE);
return EXIT_FAILURE;
}

INFO("Exit code: {}", EXIT_SUCCESS);
return EXIT_SUCCESS;
ConfigureApplication(app, config_path);
app.Start();
INFO("Exit code: {}", app.GetExitCode());
return app.GetExitCode();
}

} // namespace lb
11 changes: 10 additions & 1 deletion src/lb/application.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <yaml-cpp/yaml.h>

#include <lb/tcp/connector.hpp>
#include <atomic>

namespace lb {

Expand All @@ -27,8 +28,12 @@ class Application {
tcp::Connector& Connector();

void Terminate();

void SetExitCode(int code);

int GetExitCode() const;
private:
friend int run(int argc, char** argv);
friend void ConfigureApplication(Application& app, const std::string& config_path);
Application();
void LoadConfig(const std::string& config_path);
void RegisterConnector(tcp::Connector* connector);
Expand All @@ -37,8 +42,12 @@ class Application {
boost::asio::io_context io_context;
boost::thread_group threads;
tcp::Connector* connector_ptr; // not owner
std::atomic<int> exit_code = EXIT_SUCCESS;
};

void ConfigureApplication(Application& app, const std::string& config_path);

int run(int argc, char** argv);


} // namespace lb
4 changes: 3 additions & 1 deletion src/lb/logging.hpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
#pragma once


#define SPDLOG_ACTIVE_LEVEL SPDLOG_LEVEL_TRACE
#include <spdlog/spdlog.h>
#include <lb/formatters.hpp>


#define TRACE(...) SPDLOG_LOGGER_TRACE(spdlog::get("multi-sink"), __VA_ARGS__)
#define DEBUG(...) SPDLOG_LOGGER_DEBUG(spdlog::get("multi-sink"), __VA_ARGS__)
#define INFO(...) SPDLOG_LOGGER_INFO(spdlog::get("multi-sink"), __VA_ARGS__)
Expand All @@ -21,4 +23,4 @@
#define SCRITICAL(...) SPDLOG_LOGGER_CRITICAL(spdlog::get("multi-sink"), STR_WITH_STACKTRACE(__VA_ARGS__))

#define EXCEPTION(...) throw std::runtime_error(fmt::format(__VA_ARGS__))
#define STACKTRACE(...) throw std::runtime_error(STR_WITH_STACKTRACE(__VA_ARGS__));
#define STACKTRACE(...) throw std::runtime_error(STR_WITH_STACKTRACE(__VA_ARGS__))
16 changes: 12 additions & 4 deletions src/lb/tcp/connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,22 @@ void Connector::MakeAndRunSession(boost::asio::ip::tcp::socket client_socket)
{
// TODO: selection of backend
DEBUG("In connector");
Backend backend = selector->SelectBackend(client_socket);
Backend backend = selector->SelectBackend(client_socket.remote_endpoint());

if (backend.IsIpEndpoint()) {
DEBUG("Is ip endpoint");
auto server_socket = std::make_shared<boost::asio::ip::tcp::socket>(boost::asio::make_strand(ioc));
auto server_socket = std::make_shared<boost::asio::ip::tcp::socket>(client_socket.get_executor());

server_socket->async_connect(
backend.AsEndpoint(),
[this, server_socket, client_socket=std::move(client_socket)] (const boost::system::error_code& error) mutable
[this, server_socket, client_socket=std::move(client_socket), backend=std::move(backend)] (const boost::system::error_code& error) mutable
{
if (error) {
ERROR("{}", error.message());
if (error == boost::asio::error::connection_refused) {
selector->ExcludeBackend(backend);
MakeAndRunSession(std::move(client_socket));
}
return;
}
auto connection = std::make_shared<Session>(std::move(client_socket), std::move(*server_socket));
Expand All @@ -41,10 +45,14 @@ void Connector::MakeAndRunSession(boost::asio::ip::tcp::socket client_socket)
ResolverQuery resolve_query{url.Hostname(), url.Port()};
resolver.async_resolve(
resolve_query,
[this, server_socket, client_socket=std::move(client_socket)](const boost::system::error_code& error, ResolverResults resolver_results) mutable
[this, server_socket, client_socket=std::move(client_socket), backend=std::move(backend)](const boost::system::error_code& error, ResolverResults resolver_results) mutable
{
if (error) {
ERROR("Resolve error: {}", error.message());
if (error == boost::asio::error::connection_refused) {
selector->ExcludeBackend(backend);
MakeAndRunSession(std::move(client_socket));
}
return;
}
DEBUG("Resolved successfully!");
Expand Down
25 changes: 24 additions & 1 deletion src/lb/tcp/selectors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,19 @@ const Backend::UrlType& Backend::AsUrl() const
return std::get<UrlType>(value);
}

bool Backend::operator==(const Backend& other) const
{
if (IsUrl() && other.IsUrl()) {
return AsUrl() == other.AsUrl();
}

if (IsIpEndpoint() && other.IsIpEndpoint()) {
return AsEndpoint() == other.AsEndpoint();
}

return false;
}

std::ostream& operator<<(std::ostream& out, const Backend& backend)
{
if (backend.IsIpEndpoint()) {
Expand Down Expand Up @@ -114,14 +127,24 @@ void RoundRobinSelector::Configure(const YAML::Node &balancing_node)
}
}

Backend RoundRobinSelector::SelectBackend(const SocketType &client_socket)
Backend RoundRobinSelector::SelectBackend(const EndpointType &notused)
{
boost::mutex::scoped_lock lock(mutex);
counter = (counter + 1) % backends_.size();
DEBUG("Round robin selected: {}", counter);
return backends_[counter];
}

void RoundRobinSelector::ExcludeBackend(const Backend& backend)
{
boost::mutex::scoped_lock lock(mutex);
backends_.erase(std::remove(backends_.begin(), backends_.end(), backend), backends_.end());
INFO("Excluded backend: {}", backend);
if (backends_.empty()) {
EXCEPTION("All backends are excluded!");
}
}

SelectorType RoundRobinSelector::Type() const
{
return SelectorType::ROUND_ROBIN;
Expand Down
8 changes: 6 additions & 2 deletions src/lb/tcp/selectors.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class Backend {
const UrlType& AsUrl() const;
const EndpointType& AsEndpoint() const;

bool operator==(const Backend& other) const;
private:
std::variant<EndpointType, UrlType> value;
};
Expand All @@ -38,7 +39,8 @@ enum class SelectorType {

struct ISelector {
virtual void Configure(const YAML::Node& config) = 0;
virtual Backend SelectBackend(const boost::asio::ip::tcp::socket& client_socket) = 0;
virtual Backend SelectBackend(const boost::asio::ip::tcp::endpoint& client_socket) = 0;
virtual void ExcludeBackend(const Backend& backend) = 0;
virtual SelectorType Type() const = 0;
virtual ~ISelector() = default;
};
Expand All @@ -54,7 +56,9 @@ class RoundRobinSelector final : public ISelector {
public:
void Configure(const YAML::Node& config) override;

Backend SelectBackend(const SocketType& client_socket) override;
Backend SelectBackend(const EndpointType& client_endpoint) override;

void ExcludeBackend(const Backend& backend) override;

SelectorType Type() const override;

Expand Down
Loading

0 comments on commit dfaeff3

Please sign in to comment.