diff --git a/common/zmqclient.cpp b/common/zmqclient.cpp index 2f3f6e46..3ed5bcf7 100644 --- a/common/zmqclient.cpp +++ b/common/zmqclient.cpp @@ -198,35 +198,65 @@ void ZmqClient::sendMsg( } bool ZmqClient::wait(std::string& dbName, + std::string& tableName, + std::vector>& kcos, + std::vector& buffer) + { + SWSS_LOG_ENTER(); + int rc; + for (int i = 0; true ; ++i) + { + rc = zmq_recv(m_socket, buffer.data(), buffer.size(), 0); + if (rc < 0) + { + if (zmq_errno() == EINTR && i <= MQ_MAX_RETRY) + { + continue; + } + SWSS_LOG_THROW("zmq_recv failed, zmqerrno: %d", zmq_errno()); + } + if (rc >= (int)buffer.size()) + { + SWSS_LOG_THROW( + "zmq_recv message was truncated (over %d bytes, received %d), increase buffer size, message DROPPED", + (int)buffer.size(), rc); + } + break; + } + buffer.at(rc) = 0; // make sure that we end string with zero before parse + kcos.clear(); + BinarySerializer::deserializeBuffer(buffer.data(), buffer.size(), dbName, tableName, kcos); + return true; + } } diff --git a/common/zmqclient.h b/common/zmqclient.h index ac021461..79b4d766 100644 --- a/common/zmqclient.h +++ b/common/zmqclient.h @@ -12,6 +12,7 @@ namespace swss { class ZmqClient { public: + ZmqClient(const std::string& endpoint); ZmqClient(const std::string& endpoint, const std::string& vrf); ~ZmqClient(); @@ -24,9 +25,13 @@ class ZmqClient const std::string& tableName, const std::vector& kcos, std::vector& sendbuffer); + bool wait(std::string& dbName, + std::string& tableName, + std::vector>& kcos, + std::vector& buffer); private: diff --git a/common/zmqproducerstatetable.cpp b/common/zmqproducerstatetable.cpp index d43e7843..c171163f 100644 --- a/common/zmqproducerstatetable.cpp +++ b/common/zmqproducerstatetable.cpp @@ -171,6 +171,18 @@ void ZmqProducerStateTable::send(const std::vector &kcos } } +bool ZmqProducerStateTable::wait(std::string& dbName, + + std::string& tableName, + + std::vector>& kcos) + +{ + + return m_zmqClient.wait(dbName, tableName, kcos, m_sendbuffer); + +} + size_t ZmqProducerStateTable::dbUpdaterQueueSize() { if (m_asyncDBUpdater == nullptr) @@ -182,11 +194,4 @@ size_t ZmqProducerStateTable::dbUpdaterQueueSize() return m_asyncDBUpdater->queueSize(); } -bool ZmqProducerStateTable::wait(std::string& dbName, - std::string& tableName, - std::vector>& kcos) -{ - return m_zmqClient.wait(dbName, tableName, kcos, m_sendbuffer); -} - } diff --git a/common/zmqproducerstatetable.h b/common/zmqproducerstatetable.h index f498fb95..3c794237 100644 --- a/common/zmqproducerstatetable.h +++ b/common/zmqproducerstatetable.h @@ -37,13 +37,15 @@ class ZmqProducerStateTable : public ProducerStateTable // Batched send that can include both SET and DEL requests. virtual void send(const std::vector &kcos); - size_t dbUpdaterQueueSize(); - // This method should only be used if the ZmqClient enables one-to-one sync. + virtual bool wait(std::string& dbName, + std::string& tableName, + std::vector>& kcos); + size_t dbUpdaterQueueSize(); private: void initialize(DBConnector *db, const std::string &tableName, bool dbPersistence); diff --git a/common/zmqserver.cpp b/common/zmqserver.cpp index 3575131c..dbca01df 100644 --- a/common/zmqserver.cpp +++ b/common/zmqserver.cpp @@ -12,7 +12,7 @@ using namespace std; namespace swss { ZmqServer::ZmqServer(const std::string& endpoint) - : ZmqServer(endpoint, "") + : m_endpoint(endpoint) { } @@ -34,6 +34,9 @@ ZmqServer::~ZmqServer() m_mqPollThread->join(); zmq_close(m_socket); + + zmq_ctx_destroy(m_context); + } void ZmqServer::registerMessageHandler( @@ -109,21 +112,24 @@ void ZmqServer::mqPollThread() int rc = zmq_bind(socket, m_endpoint.c_str()); if (rc != 0) { - SWSS_LOG_THROW("zmq_bind failed on endpoint: %s, zmqerrno: %d", + SWSS_LOG_THROW("zmq_bind failed on endpoint: %s, zmqerrno: %d, message: %s", m_endpoint.c_str(), - zmq_errno()); + zmq_errno(), + strerror(zmq_errno())); } // zmq_poll will use less CPU zmq_pollitem_t poll_item; poll_item.fd = 0; - poll_item.socket = socket; + poll_item.socket = m_socket; poll_item.events = ZMQ_POLLIN; poll_item.revents = 0; SWSS_LOG_NOTICE("bind to zmq endpoint: %s", m_endpoint.c_str()); while (m_runThread) { + m_allowZmqPoll = false; + // receive message rc = zmq_poll(&poll_item, 1, 1000); if (rc == 0 || !(poll_item.revents & ZMQ_POLLIN)) @@ -134,7 +140,7 @@ void ZmqServer::mqPollThread() } // receive message - rc = zmq_recv(socket, m_buffer.data(), MQ_RESPONSE_MAX_COUNT, ZMQ_DONTWAIT); + rc = zmq_recv(m_socket, m_buffer.data(), MQ_RESPONSE_MAX_COUNT, ZMQ_DONTWAIT); if (rc < 0) { int zmq_err = zmq_errno(); @@ -163,75 +169,10 @@ void ZmqServer::mqPollThread() handleReceivedData(m_buffer.data(), rc); } - zmq_close(socket); - zmq_ctx_destroy(context); + zmq_close(m_socket); + zmq_ctx_destroy(m_context); SWSS_LOG_NOTICE("mqPollThread end"); } -void ZmqServer::sendMsg(const std::string& dbName, const std::string& tableName, - const std::vector& values) -{ - int serializedlen = (int)BinarySerializer::serializeBuffer( - m_buffer.data(), - m_buffer.size(), - dbName, - tableName, - values); - - SWSS_LOG_DEBUG("sending: %d", serializedlen); - int zmq_err = 0; - int retry_delay = 10; - int rc = 0; - for (int i = 0; i <= MQ_MAX_RETRY; ++i) - { - rc = zmq_send(m_socket, m_buffer.data(), serializedlen, 0); - if (rc >= 0) - { - m_allowZmqPoll = true; - SWSS_LOG_DEBUG("zmq sent %d bytes", serializedlen); - return; - } - - zmq_err = zmq_errno(); - // sleep (2 ^ retry time) * 10 ms - retry_delay *= 2; - if (zmq_err == EINTR - || zmq_err== EFSM) - { - // EINTR: interrupted by signal - // EFSM: socket state not ready - // For example when ZMQ socket still not receive reply message from last sended package. - // There was state machine inside ZMQ socket, when the socket is not in ready to send state, this - // error will happen. - // for more detail, please check: http://api.zeromq.org/2-1:zmq-send - SWSS_LOG_DEBUG("zmq send retry, endpoint: %s, error: %d", m_endpoint.c_str(), zmq_err); - retry_delay = 0; - } - else if (zmq_err == EAGAIN) - { - // EAGAIN: ZMQ is full to need try again - SWSS_LOG_WARN("zmq is full, will retry in %d ms, endpoint: %s, error: %d", retry_delay, m_endpoint.c_str(), zmq_err); - } - else if (zmq_err == ETERM) - { - auto message = "zmq connection break, endpoint: " + m_endpoint + ", error: " + to_string(rc); - SWSS_LOG_ERROR("%s", message.c_str()); - throw system_error(make_error_code(errc::connection_reset), message); - } - else - { - // for other error, send failed immediately. - auto message = "zmq send failed, endpoint: " + m_endpoint + ", error: " + to_string(rc); - SWSS_LOG_ERROR("%s", message.c_str()); - throw system_error(make_error_code(errc::io_error), message); - } - usleep(retry_delay * 1000); - } - // failed after retry - auto message = "zmq send failed, endpoint: " + m_endpoint + ", zmqerrno: " + to_string(zmq_err) + ":" + zmq_strerror(zmq_err) + ", msg length:" + to_string(serializedlen); - SWSS_LOG_ERROR("%s", message.c_str()); - throw system_error(make_error_code(errc::io_error), message); -} - } diff --git a/common/zmqserver.h b/common/zmqserver.h index c34be3db..d5b36d39 100644 --- a/common/zmqserver.h +++ b/common/zmqserver.h @@ -39,10 +39,10 @@ class ZmqServer const std::string tableName, ZmqMessageHandler* handler); - void sendMsg(const std::string& dbName, const std::string& tableName, - const std::vector& values); - private: + + void connect(); + void handleReceivedData(const char* buffer, const size_t size); void mqPollThread(); @@ -59,6 +59,12 @@ class ZmqServer std::string m_vrf; + void* m_context; + + void* m_socket; + + bool m_allowZmqPoll; + std::map> m_HandlerMap; }; diff --git a/tests/zmq_state_ut.cpp b/tests/zmq_state_ut.cpp index 499b9d23..3817fb52 100644 --- a/tests/zmq_state_ut.cpp +++ b/tests/zmq_state_ut.cpp @@ -403,94 +403,6 @@ static void testBatchMethod(bool producerPersistence) cout << endl << "Done." << endl; } -static bool zmq_done = false; -static void zmqConsumerWorker(string tableName, string endpoint) -{ - cout << "Consumer thread started: " << tableName << endl; - DBConnector db(TEST_DB, 0, true); - ZmqServer server(endpoint); - ZmqConsumerStateTable c(&db, tableName, server); - Select cs; - cs.addSelectable(&c); - //validate received data - Selectable *selectcs; - std::deque vkco; - int ret = 0; - while (!zmq_done) - { - ret = cs.select(&selectcs, 10, true); - if (ret == Select::OBJECT) - { - c.pops(vkco); - std::vector values; - values.push_back(KeyOpFieldsValuesTuple{"k", SET_COMMAND, std::vector{FieldValueTuple{"f", "v"}}}); - server.sendMsg(TEST_DB, tableName, values); - } - } - allDataReceived = true; - cout << "Consumer thread ended: " << tableName << endl; -} -TEST(ZmqOneToOneSync, test) -{ - std::string testTableName = "ZMQ_PROD_CONS_UT"; - std::string pushEndpoint = "tcp://localhost:1234"; - std::string pullEndpoint = "tcp://*:1234"; - // start consumer first, SHM can only have 1 consumer per table. - thread *consumerThread = new thread(zmqConsumerWorker, testTableName, pullEndpoint); - // Wait for the consumer to be ready. - sleep(1); - DBConnector db(TEST_DB, 0, true); - ZmqClient client(pushEndpoint); - ZmqProducerStateTable p(&db, testTableName, client); - std::vector kcos; - kcos.push_back(KeyOpFieldsValuesTuple{"k", SET_COMMAND, std::vector{FieldValueTuple{"f", "v"}}}); - std::vector> kcos_p; - std::string dbName, tableName; - for (int i =0; i < 3; ++i) - { - p.send(kcos); - ASSERT_TRUE(p.wait(dbName, tableName, kcos_p)); - EXPECT_EQ(dbName, TEST_DB); - EXPECT_EQ(tableName, testTableName); - ASSERT_EQ(kcos_p.size(), 1); - EXPECT_EQ(kfvKey(*kcos_p[0]), "k"); - EXPECT_EQ(kfvOp(*kcos_p[0]), SET_COMMAND); - std::vector cos = std::vector{FieldValueTuple{"f", "v"}}; - EXPECT_EQ(kfvFieldsValues(*kcos_p[0]), cos); - } - zmq_done = true; - consumerThread->join(); - delete consumerThread; -} -TEST(ZmqOneToOneSyncClientError, test) -{ - std::string testTableName = "ZMQ_PROD_CONS_UT"; - std::string pushEndpoint = "tcp://localhost:1234"; - DBConnector db(TEST_DB, 0, true); - ZmqClient client(pushEndpoint); - ZmqProducerStateTable p(&db, testTableName, client); - std::vector kcos; - kcos.push_back(KeyOpFieldsValuesTuple{"k", SET_COMMAND, std::vector{}}); - std::vector> kcos_p; - std::string dbName, tableName; - p.send(kcos); - // Wait will timeout without server reply. - EXPECT_FALSE(p.wait(dbName, tableName, kcos_p)); - // Send will return error without server reply. - EXPECT_THROW(p.send(kcos), std::system_error); -} -TEST(ZmqOneToOneSyncServerError, test) -{ - std::string testTableName = "ZMQ_PROD_CONS_UT"; - std::string pullEndpoint = "tcp://*:1234"; - DBConnector db(TEST_DB, 0, true); - ZmqServer server(pullEndpoint); - ZmqConsumerStateTable c(&db, testTableName, server); - std::vector values; - values.push_back(KeyOpFieldsValuesTuple{"k", SET_COMMAND, std::vector{FieldValueTuple{"f", "v"}}}); - // Send will return error without client request. - EXPECT_THROW(server.sendMsg(TEST_DB, testTableName, values), std::system_error); -} TEST(ZmqConsumerStateTable, test) { // test with persist by consumer