From e716228678f11cd9029c45bf4e8500a4bd5f5f5f Mon Sep 17 00:00:00 2001 From: Roman Siromakha Date: Fri, 15 Nov 2019 21:37:55 +0100 Subject: [PATCH] Support thread safety customization for connection_pool Use mutex customization provided by resource_pool. --- include/ozo/connection_pool.h | 33 ++-- include/ozo/core/thread_safety.h | 25 +++ include/ozo/detail/connection_pool.h | 28 ++++ include/ozo/detail/stub_mutex.h | 12 ++ include/ozo/impl/connection_pool.h | 8 +- tests/CMakeLists.txt | 1 + .../connection_pool_integration.cpp | 150 ++++++++++++++++++ 7 files changed, 241 insertions(+), 16 deletions(-) create mode 100644 include/ozo/core/thread_safety.h create mode 100644 include/ozo/detail/connection_pool.h create mode 100644 include/ozo/detail/stub_mutex.h create mode 100644 tests/integration/connection_pool_integration.cpp diff --git a/include/ozo/connection_pool.h b/include/ozo/connection_pool.h index 96e1e4c51..f93f2aadb 100644 --- a/include/ozo/connection_pool.h +++ b/include/ozo/connection_pool.h @@ -4,8 +4,8 @@ #include #include #include - -#include +#include +#include namespace ozo { @@ -258,9 +258,9 @@ class pooled_connection { template struct is_connection> : std::true_type {}; -template -struct connection_traits> : - connection_traits::value_type> {}; +template +struct connection_traits> : + connection_traits::value_type> {}; /** * @brief Connection pool implementation @@ -281,6 +281,8 @@ struct connection_traits> : * `connection_pool` models `ConnectionSource` concept itself using underlying `ConnectionSource`. * * @tparam Source --- underlying `ConnectionSource` which is being used to create connection to a database. + * @tparam ThreadSafety --- admissibility to use in multithreaded environment without additional synchronization. + * Thread safe by default. * * ###Example * @@ -295,28 +297,30 @@ struct connection_traits> : * @ingroup group-connection-types * @models{ConnectionSource} */ -template +template > class connection_pool { static_assert(ConnectionSource, "should model ConnectionSource concept"); public: using connection_rep_type = ozo::connection_rep>::oid_map_type>; - using impl_type = yamail::resource_pool::async::pool; + using impl_type = detail::get_connection_pool_impl_t; /** * Construct a new connection pool object * * @param source --- `ConnectionSource` object which is being used to create connection to a database. * @param config --- pool configuration. + * @param thread_safety -- admissibility to use in multithreaded environment without additional synchronization. + * Thread safe by default (`ozo::thread_safety`). */ - connection_pool(Source source, const connection_pool_config& config) + connection_pool(Source source, const connection_pool_config& config, const ThreadSafety& /*thread_safety*/ = ThreadSafety{}) : impl_(config.capacity, config.queue_capacity, config.idle_timeout), source_(std::move(source)) {} /** * Type of connection depends on connection type of Source. The definition is used to model `ConnectionSource` */ - using connection_type = std::shared_ptr>; + using connection_type = std::shared_ptr>>; /** * Get connection is bound to the given `io_context` object. @@ -387,14 +391,19 @@ constexpr auto ConnectionPool = is_connection_pool>::value; * * @param source --- connection source object which is being used to create connection to a database. * @param config --- pool configuration. + * @param thread_safety --- admissibility to use in multithreaded environment without additional synchronization. + * Thread safe by default (`ozo::thread_safety`). + * * @return `ozo::connection_pool` object. * @ingroup group-connection-functions * @relates ozo::connection_pool */ -template -auto make_connection_pool(ConnectionSource&& source, const connection_pool_config& config) { +template +auto make_connection_pool(ConnectionSource&& source, const connection_pool_config& config, + const ThreadSafety& thread_safety = ThreadSafety{}) { static_assert(ozo::ConnectionSource, "source should model ConnectionSource concept"); - return connection_pool>{std::forward(source), config}; + return connection_pool, std::decay_t>{ + std::forward(source), config, thread_safety}; } } // namespace ozo diff --git a/include/ozo/core/thread_safety.h b/include/ozo/core/thread_safety.h new file mode 100644 index 000000000..99398d566 --- /dev/null +++ b/include/ozo/core/thread_safety.h @@ -0,0 +1,25 @@ +#pragma once + +#include + +namespace ozo { + +/** + * @brief defines admissibility to use in multithreaded environment without additional synchronization + * + * Represents binary state. True enables memory sychronization for underlying types to support + * safe access in multithreaded environment. False disables memory synchronization. + * @ingroup group-core-types + * + * @tparam enabled --- binary state. + */ +template +struct thread_safety : std::bool_constant { + constexpr auto operator !() const noexcept { + return thread_safety{}; + } +}; + +constexpr thread_safety thread_safe; + +} // namespace ozo diff --git a/include/ozo/detail/connection_pool.h b/include/ozo/detail/connection_pool.h new file mode 100644 index 000000000..b5329a49d --- /dev/null +++ b/include/ozo/detail/connection_pool.h @@ -0,0 +1,28 @@ +#pragma once + +#include +#include + +#include + +#include + +namespace ozo::detail { + +template +struct get_connection_pool_impl; + +template +struct get_connection_pool_impl> { + using type = yamail::resource_pool::async::pool; +}; + +template +struct get_connection_pool_impl> { + using type = yamail::resource_pool::async::pool; +}; + +template +using get_connection_pool_impl_t = typename get_connection_pool_impl>::type; + +} // namespace ozo::detail diff --git a/include/ozo/detail/stub_mutex.h b/include/ozo/detail/stub_mutex.h new file mode 100644 index 000000000..ba8aac51a --- /dev/null +++ b/include/ozo/detail/stub_mutex.h @@ -0,0 +1,12 @@ +#pragma once + +namespace ozo::detail { + +struct stub_mutex { + stub_mutex() = default; + stub_mutex(const stub_mutex&) = delete; + constexpr void lock() const noexcept {} + constexpr void unlock() const noexcept {} +}; + +} // namespace ozo::detail diff --git a/include/ozo/impl/connection_pool.h b/include/ozo/impl/connection_pool.h index be3fd5351..73ac17bcc 100644 --- a/include/ozo/impl/connection_pool.h +++ b/include/ozo/impl/connection_pool.h @@ -99,17 +99,17 @@ auto wrap_pooled_connection_handler(const Executor& ex, Source&& source, TimeCon namespace ozo { -template -struct unwrap_impl> { +template +struct unwrap_impl> { template static constexpr decltype(auto) apply(T&& handle) { return *handle; } }; -template +template template -void connection_pool::operator ()(io_context& io, TimeConstraint t, Handler&& handler) { +void connection_pool::operator ()(io_context& io, TimeConstraint t, Handler&& handler) { static_assert(ozo::TimeConstraint, "should model TimeConstraint concept"); impl_.get_auto_recycle( io, diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index d0372397f..3141832ce 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -71,6 +71,7 @@ if(OZO_BUILD_PG_TESTS) integration/retry_integration.cpp integration/cancel_integration.cpp integration/role_based_integration.cpp + integration/connection_pool_integration.cpp ) add_definitions(-DOZO_PG_TEST_CONNINFO="${OZO_PG_TEST_CONNINFO}") endif() diff --git a/tests/integration/connection_pool_integration.cpp b/tests/integration/connection_pool_integration.cpp new file mode 100644 index 000000000..97523c6f6 --- /dev/null +++ b/tests/integration/connection_pool_integration.cpp @@ -0,0 +1,150 @@ +#include +#include +#include +#include +#include + +#include + +#include +#include + +#include +#include + +namespace { + +namespace asio = boost::asio; + +using namespace testing; + +TEST(connection_pool_integration, get_connection_twice_should_get_the_same) { + using namespace ozo::literals; + using namespace std::chrono_literals; + + ozo::io_context io; + ozo::connection_info conn_info(OZO_PG_TEST_CONNINFO); + ozo::connection_pool_config config; + config.capacity = 1; + config.queue_capacity = 0; + ozo::connection_pool pool(conn_info, config, !ozo::thread_safe); + + asio::spawn(io, [&] (asio::yield_context yield) { + const auto get_pg_backend_pid = [&] (int& pg_backend_pid) { + ozo::rows_of result; + ozo::error_code ec; + const auto conn = ozo::request( + pool[io], + "SELECT pg_backend_pid()"_SQL, + ozo::deadline(1s), + ozo::into(result), + yield[ec] + ); + + ASSERT_FALSE(ec) << ec.message(); + ASSERT_FALSE(ozo::is_null_recursive(conn)); + ASSERT_EQ(1u, result.size()); + + pg_backend_pid = std::get<0>(result[0]); + }; + + int pg_backend_pid1 = 0; + get_pg_backend_pid(pg_backend_pid1); + int pg_backend_pid2 = 0; + get_pg_backend_pid(pg_backend_pid2); + + ASSERT_NE(pg_backend_pid1, 0); + EXPECT_EQ(pg_backend_pid1, pg_backend_pid2); + }); + + io.run(); +} + +TEST(connection_pool_integration, request_should_wait_until_connection_is_available) { + using namespace ozo::literals; + using namespace std::chrono_literals; + + ozo::io_context io; + ozo::connection_info conn_info(OZO_PG_TEST_CONNINFO); + ozo::connection_pool_config config; + config.capacity = 1; + config.queue_capacity = 1; + ozo::connection_pool pool(conn_info, config, !ozo::thread_safe); + std::array pg_backend_pids {{0, 0}}; + + for (auto& pg_backend_pid : pg_backend_pids) { + asio::spawn(io, [&, pg_backend_pid = &pg_backend_pid] (asio::yield_context yield) { + ozo::rows_of result; + ozo::error_code ec; + const auto conn = ozo::request( + pool[io], + "SELECT pg_backend_pid()"_SQL, + ozo::deadline(1s), + ozo::into(result), + yield[ec] + ); + + ASSERT_FALSE(ec) << ec.message(); + ASSERT_FALSE(ozo::is_null_recursive(conn)); + ASSERT_EQ(1u, result.size()); + + *pg_backend_pid = std::get<0>(result[0]); + }); + } + + io.run(); + + ASSERT_NE(pg_backend_pids[0], 0); + EXPECT_EQ(pg_backend_pids[0], pg_backend_pids[1]); +} + +TEST(connection_pool_integration, should_serve_concurrent_requests) { + using namespace ozo::literals; + using namespace std::chrono_literals; + + std::vector ios(3); + ozo::connection_info conn_info(OZO_PG_TEST_CONNINFO); + ozo::connection_pool_config config; + config.capacity = 1; + config.queue_capacity = 2; + ozo::connection_pool pool(conn_info, config); + std::vector> futures; + + for (auto& io : ios) { + futures.emplace_back(std::async(std::launch::async, [&, io_ptr = &io] { + auto& io = *io_ptr; + auto guard = boost::asio::make_work_guard(io); + int pg_backend_pid = 0; + + asio::spawn(io, [&] (asio::yield_context yield) { + ozo::rows_of result; + ozo::error_code ec; + const auto conn = ozo::request( + pool[io], + "SELECT pg_backend_pid()"_SQL, + ozo::deadline(1s), + ozo::into(result), + yield[ec] + ); + + ASSERT_FALSE(ec) << ec.message(); + ASSERT_FALSE(ozo::is_null_recursive(conn)); + ASSERT_EQ(1u, result.size()); + + pg_backend_pid = std::get<0>(result[0]); + guard.reset(); + }); + + io.run(); + + return pg_backend_pid; + })); + } + + std::vector results; + std::transform(futures.begin(), futures.end(), std::back_inserter(results), [] (auto& v) { return v.get(); }); + ASSERT_NE(results.front(), 0); + EXPECT_THAT(results, Each(results.front())); +} + +} // namespace