From 22405e6cfbb957e53e870d17707cc1eaa38d3893 Mon Sep 17 00:00:00 2001 From: Marcelo Zimbres Date: Sat, 16 Dec 2023 20:56:22 +0100 Subject: [PATCH] Accepts as valid responses to staged requests. Before these changes the request had to be marked as written in order to interpret incoming responses as belonging to that request. On fast networks however, like on localhost and underload the responses might arrive before the write operation completed. --- README.md | 9 ++ .../boost/redis/detail/connection_base.hpp | 131 +++++++++--------- include/boost/redis/request.hpp | 28 ++-- test/test_conn_exec.cpp | 39 +++++- test/test_conn_exec_retry.cpp | 4 +- test/test_conn_quit.cpp | 2 +- test/test_conn_reconnect.cpp | 2 +- 7 files changed, 128 insertions(+), 87 deletions(-) diff --git a/README.md b/README.md index 69335443..436aa687 100644 --- a/README.md +++ b/README.md @@ -676,6 +676,15 @@ https://lists.boost.org/Archives/boost/2023/01/253944.php. ## Changelog +### Boost 1.85 + +* Fixes [issue 170](https://github.com/boostorg/redis/issues/170). + Under load and on low-latency networks it is possible to start + receiving responses before the write operation completed and while + the request is still marked as staged and not written. This messes + up with the heuristics that classifies responses as unsolicied or + not. + ### Boost 1.84 (First release in Boost) * Deprecates the `async_receive` overload that takes a response. Users diff --git a/include/boost/redis/detail/connection_base.hpp b/include/boost/redis/detail/connection_base.hpp index a954c0c8..3e9a461a 100644 --- a/include/boost/redis/detail/connection_base.hpp +++ b/include/boost/redis/detail/connection_base.hpp @@ -113,7 +113,7 @@ struct exec_op { asio::coroutine coro{}; template - void operator()(Self& self , system::error_code ec = {}) + void operator()(Self& self , system::error_code ec = {}, std::size_t = 0) { BOOST_ASIO_CORO_REENTER (coro) { @@ -130,7 +130,6 @@ struct exec_op { EXEC_OP_WAIT: BOOST_ASIO_CORO_YIELD info_->async_wait(std::move(self)); - BOOST_ASSERT(ec == asio::error::operation_aborted); if (info_->ec_) { self.complete(info_->ec_, 0); @@ -140,18 +139,18 @@ struct exec_op { if (info_->stop_requested()) { // Don't have to call remove_request as it has already // been by cancel(exec). - return self.complete(ec, 0); + return self.complete(asio::error::operation_aborted, 0); } if (is_cancelled(self)) { - if (info_->is_written()) { + if (!info_->is_waiting()) { using c_t = asio::cancellation_type; auto const c = self.get_cancellation_state().cancelled(); if ((c & c_t::terminal) != c_t::none) { // Cancellation requires closing the connection // otherwise it stays in inconsistent state. conn_->cancel(operation::run); - return self.complete(ec, 0); + return self.complete(asio::error::operation_aborted, 0); } else { // Can't implement other cancelation types, ignoring. self.get_cancellation_state().clear(); @@ -163,7 +162,7 @@ struct exec_op { } else { // Cancelation can be honored. conn_->remove_request(info_); - self.complete(ec, 0); + self.complete(asio::error::operation_aborted, 0); return; } } @@ -516,6 +515,7 @@ class connection_base { using runner_type = runner; using adapter_type = std::function const&, system::error_code&)>; using receiver_adapter_type = std::function const&, system::error_code&)>; + using exec_notifier_type = receive_channel_type; auto use_ssl() const noexcept { return runner_.get_config().use_ssl;} @@ -527,10 +527,10 @@ class connection_base { { BOOST_ASSERT(ptr != nullptr); - if (ptr->is_written()) { - return !ptr->req_->get_config().cancel_if_unresponded; - } else { + if (ptr->is_waiting()) { return !ptr->req_->get_config().cancel_on_connection_lost; + } else { + return !ptr->req_->get_config().cancel_if_unresponded; } }; @@ -544,7 +544,7 @@ class connection_base { reqs_.erase(point, std::end(reqs_)); std::for_each(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) { - return ptr->reset_status(); + return ptr->mark_waiting(); }); return ret; @@ -555,7 +555,7 @@ class connection_base { auto f = [](auto const& ptr) { BOOST_ASSERT(ptr != nullptr); - return ptr->is_written(); + return !ptr->is_waiting(); }; auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), f); @@ -615,25 +615,15 @@ class connection_base { using node_type = resp3::basic_node; using wrapped_adapter_type = std::function; - enum class action - { - stop, - proceed, - none, - }; - explicit req_info(request const& req, adapter_type adapter, executor_type ex) - : timer_{ex} - , action_{action::none} + : notifier_{ex, 1} , req_{&req} , adapter_{} , expected_responses_{req.get_expected_responses()} - , status_{status::none} + , status_{status::waiting} , ec_{{}} , read_size_{0} { - timer_.expires_at((std::chrono::steady_clock::time_point::max)()); - adapter_ = [this, adapter](node_type const& nd, system::error_code& ec) { auto const i = req_->get_expected_responses() - expected_responses_; @@ -643,18 +633,16 @@ class connection_base { auto proceed() { - timer_.cancel(); - action_ = action::proceed; + notifier_.try_send(std::error_code{}, 0); } void stop() { - timer_.cancel(); - action_ = action::stop; + notifier_.close(); } - [[nodiscard]] auto is_waiting_write() const noexcept - { return !is_written() && !is_staged(); } + [[nodiscard]] auto is_waiting() const noexcept + { return status_ == status::waiting; } [[nodiscard]] auto is_written() const noexcept { return status_ == status::written; } @@ -668,27 +656,26 @@ class connection_base { void mark_staged() noexcept { status_ = status::staged; } - void reset_status() noexcept - { status_ = status::none; } + void mark_waiting() noexcept + { status_ = status::waiting; } [[nodiscard]] auto stop_requested() const noexcept - { return action_ == action::stop;} + { return !notifier_.is_open();} template auto async_wait(CompletionToken token) { - return timer_.async_wait(std::move(token)); + return notifier_.async_receive(std::move(token)); } //private: enum class status - { none + { waiting , staged , written }; - timer_type timer_; - action action_; + exec_notifier_type notifier_; request const* req_; wrapped_adapter_type adapter_; @@ -716,7 +703,7 @@ class connection_base { void cancel_push_requests() { auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) { - return !(ptr->is_staged() && ptr->req_->get_expected_responses() == 0); + return !(ptr->is_staged() && ptr->req_->get_expected_responses() == 0); }); std::for_each(point, std::end(reqs_), [](auto const& ptr) { @@ -737,7 +724,7 @@ class connection_base { if (info->req_->has_hello_priority()) { auto rend = std::partition_point(std::rbegin(reqs_), std::rend(reqs_), [](auto const& e) { - return e->is_waiting_write(); + return e->is_waiting(); }); std::rotate(std::rbegin(reqs_), std::rbegin(reqs_) + 1, rend); @@ -781,7 +768,7 @@ class connection_base { // Coalesces the requests and marks them staged. After a // successful write staged requests will be marked as written. auto const point = std::partition_point(std::cbegin(reqs_), std::cend(reqs_), [](auto const& ri) { - return !ri->is_waiting_write(); + return !ri->is_waiting(); }); std::for_each(point, std::cend(reqs_), [this](auto const& ri) { @@ -798,7 +785,14 @@ class connection_base { bool is_waiting_response() const noexcept { - return !std::empty(reqs_) && reqs_.front()->is_written(); + if (std::empty(reqs_)) + return false; + + // Under load and on low-latency networks we might start + // receiving responses before the write operation completed and + // the request is still maked as staged and not written. See + // https://github.com/boostorg/redis/issues/170 + return !reqs_.front()->is_waiting(); } void close() @@ -814,36 +808,39 @@ class connection_base { auto is_next_push() { - // We handle unsolicited events in the following way - // - // 1. Its resp3 type is a push. - // - // 2. A non-push type is received with an empty requests - // queue. I have noticed this is possible (e.g. -MISCONF). - // I expect them to have type push so we can distinguish - // them from responses to commands, but it is a - // simple-error. If we are lucky enough to receive them - // when the command queue is empty we can treat them as - // server pushes, otherwise it is impossible to handle - // them properly - // - // 3. The request does not expect any response but we got - // one. This may happen if for example, subscribe with - // wrong syntax. - // - // Useful links: + BOOST_ASSERT(!read_buffer_.empty()); + + // Useful links to understand the heuristics below. // // - https://github.com/redis/redis/issues/11784 // - https://github.com/redis/redis/issues/6426 - // - - BOOST_ASSERT(!read_buffer_.empty()); - - return - (resp3::to_type(read_buffer_.front()) == resp3::type::push) - || reqs_.empty() - || (!reqs_.empty() && reqs_.front()->expected_responses_ == 0) - || !is_waiting_response(); // Added to deal with MONITOR. + // - https://github.com/boostorg/redis/issues/170 + + // The message's resp3 type is a push. + if (resp3::to_type(read_buffer_.front()) == resp3::type::push) + return true; + + // This is non-push type and the requests queue is empty. I have + // noticed this is possible, for example with -MISCONF. I don't + // know why they are not sent with a push type so we can + // distinguish them from responses to commands. If we are lucky + // enough to receive them when the command queue is empty they + // can be treated as server pushes, otherwise it is impossible + // to handle them properly + if (reqs_.empty()) + return true; + + // The request does not expect any response but we got one. This + // may happen if for example, subscribe with wrong syntax. + if (reqs_.front()->expected_responses_ == 0) + return true; + + // Added to deal with MONITOR and also to fix PR170 which + // happens under load and on low-latency networks, where we + // might start receiving responses before the write operation + // completed and the request is still maked as staged and not + // written. + return reqs_.front()->is_waiting(); } auto get_suggested_buffer_growth() const noexcept diff --git a/include/boost/redis/request.hpp b/include/boost/redis/request.hpp index ebc94a22..0e62e0a9 100644 --- a/include/boost/redis/request.hpp +++ b/include/boost/redis/request.hpp @@ -47,31 +47,31 @@ class request { public: /// Request configuration options. struct config { - /** \brief If `true` - * `boost::redis::connection::async_exec` will complete with error if the - * connection is lost. Affects only requests that haven't been - * sent yet. + /** \brief If `true` calls to `connection::async_exec` will + * complete with error if the connection is lost while the + * request hasn't been sent yet. */ bool cancel_on_connection_lost = true; - /** \brief If `true` the request will complete with - * boost::redis::error::not_connected if `async_exec` is called before - * the connection with Redis was established. + /** \brief If `true` `connection::async_exec` will complete with + * `boost::redis::error::not_connected` if the call happens + * before the connection with Redis was established. */ bool cancel_if_not_connected = false; - /** \brief If `false` `boost::redis::connection::async_exec` will not + /** \brief If `false` `connection::async_exec` will not * automatically cancel this request if the connection is lost. * Affects only requests that have been written to the socket - * but remained unresponded when `boost::redis::connection::async_run` - * completed. + * but remained unresponded when + * `boost::redis::connection::async_run` completed. */ bool cancel_if_unresponded = true; - /** \brief If this request has a `HELLO` command and this flag is - * `true`, the `boost::redis::connection` will move it to the front of - * the queue of awaiting requests. This makes it possible to - * send `HELLO` and authenticate before other commands are sent. + /** \brief If this request has a `HELLO` command and this flag + * is `true`, the `boost::redis::connection` will move it to the + * front of the queue of awaiting requests. This makes it + * possible to send `HELLO` and authenticate before other + * commands are sent. */ bool hello_with_priority = true; }; diff --git a/test/test_conn_exec.cpp b/test/test_conn_exec.cpp index 5e135cac..c3f04134 100644 --- a/test/test_conn_exec.cpp +++ b/test/test_conn_exec.cpp @@ -6,6 +6,7 @@ #include #include +#include #define BOOST_TEST_MODULE conn-exec #include #include @@ -17,12 +18,13 @@ // container. namespace net = boost::asio; +using boost::redis::config; using boost::redis::connection; -using boost::redis::request; -using boost::redis::response; using boost::redis::generic_response; using boost::redis::ignore; using boost::redis::operation; +using boost::redis::request; +using boost::redis::response; // Sends three requests where one of them has a hello with a priority // set, which means it should be executed first. @@ -153,3 +155,36 @@ BOOST_AUTO_TEST_CASE(correct_database) BOOST_CHECK_EQUAL(cfg.database_index.value(), index); } +BOOST_AUTO_TEST_CASE(large_number_of_concurrent_requests_issue_170) +{ + // See https://github.com/boostorg/redis/issues/170 + + std::string payload; + payload.resize(1024); + std::fill(std::begin(payload), std::end(payload), 'A'); + + net::io_context ioc; + auto conn = std::make_shared(ioc); + + auto cfg = make_test_config(); + cfg.health_check_interval = std::chrono::seconds(0); + conn->async_run(cfg, {}, net::detached); + + int counter = 0; + int const repeat = 8000; + + for (int i = 0; i < repeat; ++i) { + auto req = std::make_shared(); + req->push("PING", payload); + conn->async_exec(*req, ignore, [req, &counter, conn](auto ec, auto) { + BOOST_TEST(!ec); + if (++counter == repeat) + conn->cancel(); + }); + } + + ioc.run(); + + BOOST_CHECK_EQUAL(counter, repeat); +} + diff --git a/test/test_conn_exec_retry.cpp b/test/test_conn_exec_retry.cpp index c464b342..99f68c39 100644 --- a/test/test_conn_exec_retry.cpp +++ b/test/test_conn_exec_retry.cpp @@ -57,12 +57,12 @@ BOOST_AUTO_TEST_CASE(request_retry_false) auto c2 = [&](auto ec, auto){ std::cout << "c2" << std::endl; - BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled); + BOOST_CHECK_EQUAL(ec, boost::asio::error::operation_aborted); }; auto c1 = [&](auto ec, auto){ std::cout << "c1" << std::endl; - BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled); + BOOST_CHECK_EQUAL(ec, boost::asio::error::operation_aborted); }; auto c0 = [&](auto ec, auto){ diff --git a/test/test_conn_quit.cpp b/test/test_conn_quit.cpp index fcd580b9..9d5dd2f3 100644 --- a/test/test_conn_quit.cpp +++ b/test/test_conn_quit.cpp @@ -61,7 +61,7 @@ BOOST_AUTO_TEST_CASE(test_async_run_exits) auto c3 = [](auto ec, auto) { std::clog << "c3: " << ec.message() << std::endl; - BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled); + BOOST_CHECK_EQUAL(ec, boost::asio::error::operation_aborted); }; auto c2 = [&](auto ec, auto) diff --git a/test/test_conn_reconnect.cpp b/test/test_conn_reconnect.cpp index 5b45a127..d4605d5a 100644 --- a/test/test_conn_reconnect.cpp +++ b/test/test_conn_reconnect.cpp @@ -99,7 +99,7 @@ auto async_test_reconnect_timeout() -> net::awaitable std::cout << "ccc" << std::endl; - BOOST_CHECK_EQUAL(ec1, boost::system::errc::errc_t::operation_canceled); + BOOST_CHECK_EQUAL(ec1, boost::asio::error::operation_aborted); } BOOST_AUTO_TEST_CASE(test_reconnect_and_idle)