Skip to content

Commit

Permalink
Support thread safety customization for connection_pool
Browse files Browse the repository at this point in the history
Use mutex customization provided by resource_pool.
  • Loading branch information
elsid committed Apr 6, 2020
1 parent bca7524 commit e716228
Show file tree
Hide file tree
Showing 7 changed files with 241 additions and 16 deletions.
33 changes: 21 additions & 12 deletions include/ozo/connection_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
#include <ozo/transaction_status.h>
#include <ozo/asio.h>
#include <ozo/connector.h>

#include <yamail/resource_pool/async/pool.hpp>
#include <ozo/core/thread_safety.h>
#include <ozo/detail/connection_pool.h>

namespace ozo {

Expand Down Expand Up @@ -258,9 +258,9 @@ class pooled_connection {
template <typename ...Ts>
struct is_connection<pooled_connection<Ts...>> : std::true_type {};

template <typename Pool>
struct connection_traits<yamail::resource_pool::handle<Pool>> :
connection_traits<typename yamail::resource_pool::handle<Pool>::value_type> {};
template <typename T>
struct connection_traits<yamail::resource_pool::handle<T>> :
connection_traits<typename yamail::resource_pool::handle<T>::value_type> {};

/**
* @brief Connection pool implementation
Expand All @@ -281,6 +281,8 @@ struct connection_traits<yamail::resource_pool::handle<Pool>> :
* `connection_pool` models `ConnectionSource` concept itself using underlying `ConnectionSource`.
*
* @tparam Source --- underlying `ConnectionSource` which is being used to create connection to a database.
* @tparam ThreadSafety --- admissibility to use in multithreaded environment without additional synchronization.
* Thread safe by default.
*
* ###Example
*
Expand All @@ -295,28 +297,30 @@ struct connection_traits<yamail::resource_pool::handle<Pool>> :
* @ingroup group-connection-types
* @models{ConnectionSource}
*/
template <typename Source>
template <typename Source, typename ThreadSafety = std::decay_t<decltype(thread_safe)>>
class connection_pool {
static_assert(ConnectionSource<Source>, "should model ConnectionSource concept");

public:
using connection_rep_type = ozo::connection_rep<typename ozo::unwrap_type<ozo::connection_type<Source>>::oid_map_type>;

using impl_type = yamail::resource_pool::async::pool<connection_rep_type>;
using impl_type = detail::get_connection_pool_impl_t<connection_rep_type, ThreadSafety>;
/**
* Construct a new connection pool object
*
* @param source --- `ConnectionSource` object which is being used to create connection to a database.
* @param config --- pool configuration.
* @param thread_safety -- admissibility to use in multithreaded environment without additional synchronization.
* Thread safe by default (`ozo::thread_safety<true>`).
*/
connection_pool(Source source, const connection_pool_config& config)
connection_pool(Source source, const connection_pool_config& config, const ThreadSafety& /*thread_safety*/ = ThreadSafety{})
: impl_(config.capacity, config.queue_capacity, config.idle_timeout),
source_(std::move(source)) {}

/**
* Type of connection depends on connection type of Source. The definition is used to model `ConnectionSource`
*/
using connection_type = std::shared_ptr<pooled_connection<typename impl_type::handle>>;
using connection_type = std::shared_ptr<pooled_connection<yamail::resource_pool::handle<connection_rep_type>>>;

/**
* Get connection is bound to the given `io_context` object.
Expand Down Expand Up @@ -387,14 +391,19 @@ constexpr auto ConnectionPool = is_connection_pool<std::decay_t<T>>::value;
*
* @param source --- connection source object which is being used to create connection to a database.
* @param config --- pool configuration.
* @param thread_safety --- admissibility to use in multithreaded environment without additional synchronization.
* Thread safe by default (`ozo::thread_safety<true>`).
*
* @return `ozo::connection_pool` object.
* @ingroup group-connection-functions
* @relates ozo::connection_pool
*/
template <typename ConnectionSource>
auto make_connection_pool(ConnectionSource&& source, const connection_pool_config& config) {
template <typename ConnectionSource, typename ThreadSafety = decltype(thread_safe)>
auto make_connection_pool(ConnectionSource&& source, const connection_pool_config& config,
const ThreadSafety& thread_safety = ThreadSafety{}) {
static_assert(ozo::ConnectionSource<ConnectionSource>, "source should model ConnectionSource concept");
return connection_pool<std::decay_t<ConnectionSource>>{std::forward<ConnectionSource>(source), config};
return connection_pool<std::decay_t<ConnectionSource>, std::decay_t<ThreadSafety>>{
std::forward<ConnectionSource>(source), config, thread_safety};
}

} // namespace ozo
Expand Down
25 changes: 25 additions & 0 deletions include/ozo/core/thread_safety.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#pragma once

#include <type_traits>

namespace ozo {

/**
* @brief defines admissibility to use in multithreaded environment without additional synchronization
*
* Represents binary state. True enables memory sychronization for underlying types to support
* safe access in multithreaded environment. False disables memory synchronization.
* @ingroup group-core-types
*
* @tparam enabled --- binary state.
*/
template <bool enabled>
struct thread_safety : std::bool_constant<enabled> {
constexpr auto operator !() const noexcept {
return thread_safety<!enabled>{};
}
};

constexpr thread_safety<true> thread_safe;

} // namespace ozo
28 changes: 28 additions & 0 deletions include/ozo/detail/connection_pool.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#pragma once

#include <ozo/core/thread_safety.h>
#include <ozo/detail/stub_mutex.h>

#include <yamail/resource_pool/async/pool.hpp>

#include <mutex>

namespace ozo::detail {

template <typename ConnectionRepType, typename ThreadSafety>
struct get_connection_pool_impl;

template <typename ConnectionRepType>
struct get_connection_pool_impl<ConnectionRepType, thread_safety<true>> {
using type = yamail::resource_pool::async::pool<ConnectionRepType, std::mutex>;
};

template <typename ConnectionRepType>
struct get_connection_pool_impl<ConnectionRepType, thread_safety<false>> {
using type = yamail::resource_pool::async::pool<ConnectionRepType, stub_mutex>;
};

template <typename ConnectionRepType, typename ThreadSafety>
using get_connection_pool_impl_t = typename get_connection_pool_impl<ConnectionRepType, std::decay_t<ThreadSafety>>::type;

} // namespace ozo::detail
12 changes: 12 additions & 0 deletions include/ozo/detail/stub_mutex.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#pragma once

namespace ozo::detail {

struct stub_mutex {
stub_mutex() = default;
stub_mutex(const stub_mutex&) = delete;
constexpr void lock() const noexcept {}
constexpr void unlock() const noexcept {}
};

} // namespace ozo::detail
8 changes: 4 additions & 4 deletions include/ozo/impl/connection_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,17 +99,17 @@ auto wrap_pooled_connection_handler(const Executor& ex, Source&& source, TimeCon

namespace ozo {

template <typename Pool>
struct unwrap_impl<yamail::resource_pool::handle<Pool>> {
template <typename V>
struct unwrap_impl<yamail::resource_pool::handle<V>> {
template <typename T>
static constexpr decltype(auto) apply(T&& handle) {
return *handle;
}
};

template <typename Source>
template <typename Source, typename ThreadSafety>
template <typename TimeConstraint, typename Handler>
void connection_pool<Source>::operator ()(io_context& io, TimeConstraint t, Handler&& handler) {
void connection_pool<Source, ThreadSafety>::operator ()(io_context& io, TimeConstraint t, Handler&& handler) {
static_assert(ozo::TimeConstraint<TimeConstraint>, "should model TimeConstraint concept");
impl_.get_auto_recycle(
io,
Expand Down
1 change: 1 addition & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ if(OZO_BUILD_PG_TESTS)
integration/retry_integration.cpp
integration/cancel_integration.cpp
integration/role_based_integration.cpp
integration/connection_pool_integration.cpp
)
add_definitions(-DOZO_PG_TEST_CONNINFO="${OZO_PG_TEST_CONNINFO}")
endif()
Expand Down
150 changes: 150 additions & 0 deletions tests/integration/connection_pool_integration.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
#include <ozo/connection_info.h>
#include <ozo/connection_pool.h>
#include <ozo/query_builder.h>
#include <ozo/request.h>
#include <ozo/shortcuts.h>

#include <boost/asio/spawn.hpp>

#include <algorithm>
#include <future>

#include <gtest/gtest.h>
#include <gmock/gmock.h>

namespace {

namespace asio = boost::asio;

using namespace testing;

TEST(connection_pool_integration, get_connection_twice_should_get_the_same) {
using namespace ozo::literals;
using namespace std::chrono_literals;

ozo::io_context io;
ozo::connection_info conn_info(OZO_PG_TEST_CONNINFO);
ozo::connection_pool_config config;
config.capacity = 1;
config.queue_capacity = 0;
ozo::connection_pool pool(conn_info, config, !ozo::thread_safe);

asio::spawn(io, [&] (asio::yield_context yield) {
const auto get_pg_backend_pid = [&] (int& pg_backend_pid) {
ozo::rows_of<int> result;
ozo::error_code ec;
const auto conn = ozo::request(
pool[io],
"SELECT pg_backend_pid()"_SQL,
ozo::deadline(1s),
ozo::into(result),
yield[ec]
);

ASSERT_FALSE(ec) << ec.message();
ASSERT_FALSE(ozo::is_null_recursive(conn));
ASSERT_EQ(1u, result.size());

pg_backend_pid = std::get<0>(result[0]);
};

int pg_backend_pid1 = 0;
get_pg_backend_pid(pg_backend_pid1);
int pg_backend_pid2 = 0;
get_pg_backend_pid(pg_backend_pid2);

ASSERT_NE(pg_backend_pid1, 0);
EXPECT_EQ(pg_backend_pid1, pg_backend_pid2);
});

io.run();
}

TEST(connection_pool_integration, request_should_wait_until_connection_is_available) {
using namespace ozo::literals;
using namespace std::chrono_literals;

ozo::io_context io;
ozo::connection_info conn_info(OZO_PG_TEST_CONNINFO);
ozo::connection_pool_config config;
config.capacity = 1;
config.queue_capacity = 1;
ozo::connection_pool pool(conn_info, config, !ozo::thread_safe);
std::array<int, 2> pg_backend_pids {{0, 0}};

for (auto& pg_backend_pid : pg_backend_pids) {
asio::spawn(io, [&, pg_backend_pid = &pg_backend_pid] (asio::yield_context yield) {
ozo::rows_of<int> result;
ozo::error_code ec;
const auto conn = ozo::request(
pool[io],
"SELECT pg_backend_pid()"_SQL,
ozo::deadline(1s),
ozo::into(result),
yield[ec]
);

ASSERT_FALSE(ec) << ec.message();
ASSERT_FALSE(ozo::is_null_recursive(conn));
ASSERT_EQ(1u, result.size());

*pg_backend_pid = std::get<0>(result[0]);
});
}

io.run();

ASSERT_NE(pg_backend_pids[0], 0);
EXPECT_EQ(pg_backend_pids[0], pg_backend_pids[1]);
}

TEST(connection_pool_integration, should_serve_concurrent_requests) {
using namespace ozo::literals;
using namespace std::chrono_literals;

std::vector<ozo::io_context> ios(3);
ozo::connection_info conn_info(OZO_PG_TEST_CONNINFO);
ozo::connection_pool_config config;
config.capacity = 1;
config.queue_capacity = 2;
ozo::connection_pool pool(conn_info, config);
std::vector<std::future<int>> futures;

for (auto& io : ios) {
futures.emplace_back(std::async(std::launch::async, [&, io_ptr = &io] {
auto& io = *io_ptr;
auto guard = boost::asio::make_work_guard(io);
int pg_backend_pid = 0;

asio::spawn(io, [&] (asio::yield_context yield) {
ozo::rows_of<int> result;
ozo::error_code ec;
const auto conn = ozo::request(
pool[io],
"SELECT pg_backend_pid()"_SQL,
ozo::deadline(1s),
ozo::into(result),
yield[ec]
);

ASSERT_FALSE(ec) << ec.message();
ASSERT_FALSE(ozo::is_null_recursive(conn));
ASSERT_EQ(1u, result.size());

pg_backend_pid = std::get<0>(result[0]);
guard.reset();
});

io.run();

return pg_backend_pid;
}));
}

std::vector<int> results;
std::transform(futures.begin(), futures.end(), std::back_inserter(results), [] (auto& v) { return v.get(); });
ASSERT_NE(results.front(), 0);
EXPECT_THAT(results, Each(results.front()));
}

} // namespace

0 comments on commit e716228

Please sign in to comment.