diff --git a/src/rmq/rmqa/rmqa_rabbitcontext.cpp b/src/rmq/rmqa/rmqa_rabbitcontext.cpp index 8568378..441a2a3 100644 --- a/src/rmq/rmqa/rmqa_rabbitcontext.cpp +++ b/src/rmq/rmqa/rmqa_rabbitcontext.cpp @@ -60,6 +60,19 @@ bsl::shared_ptr RabbitContext::createVHostConnection( .managedPtr())); } +bsl::shared_ptr RabbitContext::createVHostConnection( + const bsl::string& userDefinedName, + const bsl::shared_ptr& endpoint, + const bsl::shared_ptr& credentials, + const rmqt::ErrorCallback& errorCallback) +{ + return bsl::shared_ptr( + new VHost(d_impl + ->createVHostConnection( + userDefinedName, endpoint, credentials, errorCallback) + .managedPtr())); +} + bsl::shared_ptr RabbitContext::createVHostConnection(const bsl::string& userDefinedName, const rmqt::VHostInfo& vhostInfo) diff --git a/src/rmq/rmqa/rmqa_rabbitcontext.h b/src/rmq/rmqa/rmqa_rabbitcontext.h index 33a0e84..c987132 100644 --- a/src/rmq/rmqa/rmqa_rabbitcontext.h +++ b/src/rmq/rmqa/rmqa_rabbitcontext.h @@ -80,6 +80,12 @@ class RabbitContext { const bsl::shared_ptr& endpoint, const bsl::shared_ptr& credentials); + bsl::shared_ptr + createVHostConnection(const bsl::string& userDefinedName, + const bsl::shared_ptr& endpoint, + const bsl::shared_ptr& credentials, + const rmqt::ErrorCallback& errorCallback); + /// \brief Connect to a RabbitMQ broker /// /// \param vhostInfo identifies the broker to connect to & authentication diff --git a/src/rmq/rmqa/rmqa_rabbitcontextimpl.cpp b/src/rmq/rmqa/rmqa_rabbitcontextimpl.cpp index aa918dd..19aaed0 100644 --- a/src/rmq/rmqa/rmqa_rabbitcontextimpl.cpp +++ b/src/rmq/rmqa/rmqa_rabbitcontextimpl.cpp @@ -168,7 +168,6 @@ RabbitContextImpl::RabbitContextImpl( d_eventLoop->resolver( options.shuffleConnectionEndpoints().value_or(false)), d_eventLoop->timerFactory(), - d_onError, metricPublisher, d_connectionMonitor, options.clientProperties(), @@ -281,6 +280,29 @@ bsl::shared_ptr RabbitContextImpl::createVHostConnection( bdlf::PlaceHolders::_1)); } +bsl::shared_ptr RabbitContextImpl::createVHostConnection( + const bsl::string& userDefinedName, + const bsl::shared_ptr& endpoint, + const bsl::shared_ptr& credentials, + const rmqt::ErrorCallback& errorCallback) +{ + // Override the error callback provided in RabbitContext + // by a per vhost error callback + d_onError = bdlf::BindUtil::bind(&handleErrorCbOnEventLoop, + bsl::ref(d_threadPool), + errorCallback, + bdlf::PlaceHolders::_1, + bdlf::PlaceHolders::_2); + + return bsl::make_shared( + bdlf::BindUtil::bind(&RabbitContextImpl::createNewConnection, + this, + userDefinedName, + endpoint, + credentials, + bdlf::PlaceHolders::_1)); +} + bsl::shared_ptr RabbitContextImpl::createVHostConnection(const bsl::string& userDefinedName, const rmqt::VHostInfo& vhostInfo) @@ -333,7 +355,7 @@ rmqt::Future RabbitContextImpl::createNewConnection( } bsl::shared_ptr amqpConn = - d_connectionFactory->create(endpoint, credentials, name); + d_connectionFactory->create(endpoint, credentials, d_onError, name); // The cancel function is given `amqpConn` which is what keeps it alive // until the shared_ptr is retrieved in diff --git a/src/rmq/rmqa/rmqa_rabbitcontextimpl.h b/src/rmq/rmqa/rmqa_rabbitcontextimpl.h index f9271be..1b2babc 100644 --- a/src/rmq/rmqa/rmqa_rabbitcontextimpl.h +++ b/src/rmq/rmqa/rmqa_rabbitcontextimpl.h @@ -54,6 +54,12 @@ class RabbitContextImpl : public rmqp::RabbitContext { const bsl::shared_ptr& credentials) BSLS_KEYWORD_OVERRIDE; + bsl::shared_ptr createVHostConnection( + const bsl::string& userDefinedName, + const bsl::shared_ptr& endpoint, + const bsl::shared_ptr& credentials, + const rmqt::ErrorCallback& errorCallback) BSLS_KEYWORD_OVERRIDE; + bsl::shared_ptr createVHostConnection( const bsl::string& userDefinedName, const rmqt::VHostInfo& endpoint) BSLS_KEYWORD_OVERRIDE; diff --git a/src/rmq/rmqamqp/rmqamqp_connection.cpp b/src/rmq/rmqamqp/rmqamqp_connection.cpp index b13efcd..d962141 100644 --- a/src/rmq/rmqamqp/rmqamqp_connection.cpp +++ b/src/rmq/rmqamqp/rmqamqp_connection.cpp @@ -983,13 +983,11 @@ bsl::string Connection::connectionDebugName() const Connection::Factory::Factory( const bsl::shared_ptr& resolver, const bsl::shared_ptr& timerFactory, - const rmqt::ErrorCallback& errorCb, const bsl::shared_ptr& metricPublisher, const bsl::shared_ptr& connectionMonitor, const rmqt::FieldTable& clientProperties, const bsl::optional& connectionErrorThreshold) -: d_errorCb(errorCb) -, d_clientProperties(clientProperties) +: d_clientProperties(clientProperties) , d_metricPublisher(metricPublisher) , d_resolver(resolver) , d_timerFactory(timerFactory) @@ -1001,11 +999,12 @@ Connection::Factory::Factory( bsl::shared_ptr Connection::Factory::create( const bsl::shared_ptr& endpoint, const bsl::shared_ptr& credentials, + const rmqt::ErrorCallback& errorCallback, const bsl::string& name) { bsl::shared_ptr result(new rmqamqp::Connection( d_resolver, - newRetryHandler(), + newRetryHandler(errorCallback), newHeartBeatManager(), d_timerFactory, newChannelFactory(), @@ -1020,18 +1019,19 @@ bsl::shared_ptr Connection::Factory::create( return result; } -bsl::shared_ptr Connection::Factory::newRetryHandler() +bsl::shared_ptr +Connection::Factory::newRetryHandler(const rmqt::ErrorCallback& errorCallback) { return d_connectionErrorThreshold ? bsl::shared_ptr( bsl::make_shared( d_timerFactory, - d_errorCb, + errorCallback, bsl::make_shared(), *d_connectionErrorThreshold)) : bsl::make_shared( d_timerFactory, - d_errorCb, + errorCallback, bsl::make_shared()); } diff --git a/src/rmq/rmqamqp/rmqamqp_connection.h b/src/rmq/rmqamqp/rmqamqp_connection.h index 49d84e6..56ce901 100644 --- a/src/rmq/rmqamqp/rmqamqp_connection.h +++ b/src/rmq/rmqamqp/rmqamqp_connection.h @@ -310,7 +310,6 @@ class Connection::Factory { Factory(const bsl::shared_ptr& resolver, const bsl::shared_ptr& timerFactory, - const rmqt::ErrorCallback& errorCb, const bsl::shared_ptr& metricPublisher, const bsl::shared_ptr& connectionMonitor, const rmqt::FieldTable& clientProperties, @@ -321,10 +320,12 @@ class Connection::Factory { virtual bsl::shared_ptr create(const bsl::shared_ptr& endpoint, const bsl::shared_ptr& credentials, + const rmqt::ErrorCallback& errorCallback, const bsl::string& name = ""); protected: - virtual bsl::shared_ptr newRetryHandler(); + virtual bsl::shared_ptr + newRetryHandler(const rmqt::ErrorCallback& errorCallback); virtual bsl::shared_ptr newHeartBeatManager(); virtual bsl::shared_ptr newChannelFactory(); @@ -332,7 +333,6 @@ class Connection::Factory { Factory(const Factory&) BSLS_KEYWORD_DELETED; Factory& operator=(const Factory&) BSLS_KEYWORD_DELETED; - const rmqt::ErrorCallback d_errorCb; const rmqt::FieldTable d_clientProperties; const bsl::shared_ptr d_metricPublisher; const bsl::shared_ptr d_resolver; diff --git a/src/rmq/rmqp/rmqp_rabbitcontext.h b/src/rmq/rmqp/rmqp_rabbitcontext.h index 8820dc7..c83f2ac 100644 --- a/src/rmq/rmqp/rmqp_rabbitcontext.h +++ b/src/rmq/rmqp/rmqp_rabbitcontext.h @@ -55,6 +55,12 @@ class RabbitContext { const bsl::shared_ptr& endpoint, const bsl::shared_ptr& credentials) = 0; + virtual bsl::shared_ptr + createVHostConnection(const bsl::string& userDefinedName, + const bsl::shared_ptr& endpoint, + const bsl::shared_ptr& credentials, + const rmqt::ErrorCallback& errorCallback) = 0; + /// \brief Connect to a RabbitMQ broker /// /// \param vhostInfo identifies the broker to connect to & authentication diff --git a/src/rmqtestmocks/rmqtestmocks_mockrabbitcontext.h b/src/rmqtestmocks/rmqtestmocks_mockrabbitcontext.h index 3e1e778..6b734e8 100644 --- a/src/rmqtestmocks/rmqtestmocks_mockrabbitcontext.h +++ b/src/rmqtestmocks/rmqtestmocks_mockrabbitcontext.h @@ -54,6 +54,13 @@ class MockRabbitContext : public rmqp::RabbitContext { const bsl::shared_ptr& endpoint, const bsl::shared_ptr& credentials)); + MOCK_METHOD4(createVHostConnection, + bsl::shared_ptr( + const bsl::string& userDefinedName, + const bsl::shared_ptr& endpoint, + const bsl::shared_ptr& credentials, + const rmqt::ErrorCallback& errorCallback)); + MOCK_METHOD2( createVHostConnection, bsl::shared_ptr(const bsl::string& userDefinedName, diff --git a/src/tests/rmqamqp/rmqamqp_connection.t.cpp b/src/tests/rmqamqp/rmqamqp_connection.t.cpp index 2c99b0b..2a465a6 100644 --- a/src/tests/rmqamqp/rmqamqp_connection.t.cpp +++ b/src/tests/rmqamqp/rmqamqp_connection.t.cpp @@ -322,7 +322,6 @@ class ConnectionFactory : public rmqamqp::Connection::Factory { ConnectionFactory( const bsl::shared_ptr& resolver, const bsl::shared_ptr& timerFactory, - const rmqt::ErrorCallback& errorCb, const bsl::shared_ptr& metricPublisher, const rmqt::FieldTable& clientProperties, const bsl::shared_ptr& retryHandler, @@ -330,7 +329,6 @@ class ConnectionFactory : public rmqamqp::Connection::Factory { const bsl::shared_ptr& channelFactory) : rmqamqp::Connection::Factory(resolver, timerFactory, - errorCb, metricPublisher, bsl::make_shared(), clientProperties, @@ -340,7 +338,8 @@ class ConnectionFactory : public rmqamqp::Connection::Factory { , d_channelFactory(channelFactory) { } - bsl::shared_ptr newRetryHandler() BSLS_KEYWORD_OVERRIDE + bsl::shared_ptr newRetryHandler( + const rmqt::ErrorCallback& errorCallback) BSLS_KEYWORD_OVERRIDE { return d_retryHandler; } @@ -406,7 +405,6 @@ class ConnectionTests : public ::testing::Test { , d_clientProperties(generateDefaultClientProperties()) , d_factory(bsl::make_shared(d_resolver, d_timerFactory, - d_errorCallback, d_metricPublisher, d_clientProperties, d_retryHandler, @@ -636,7 +634,7 @@ class ConnectionTests : public ::testing::Test { createAndStartConnection(const bsl::string& name = "test-connection") { bsl::shared_ptr conn = - d_factory->create(d_endpoint, d_credentials, name); + d_factory->create(d_endpoint, d_credentials, d_errorCallback, name); conn->startFirstConnection(d_onConnectCb); return conn; @@ -683,7 +681,6 @@ TEST_F(ConnectionTests, ClientProperties) rmqt::FieldValue(bsl::string("BAR")); // Add one more d_factory = bsl::make_shared(d_resolver, d_timerFactory, - d_errorCallback, d_metricPublisher, overriddenClientProperties, d_retryHandler, @@ -721,7 +718,6 @@ TEST_F(ConnectionTests, ClientPropertiesCantOverrideReservedOnes) bsl::string("Should get overriden by library")); // Add one more d_factory = bsl::make_shared(d_resolver, d_timerFactory, - d_errorCallback, d_metricPublisher, overriddenClientProperties, d_retryHandler,