From 17822f14800c94eaee677b9eb593c82893ac1f45 Mon Sep 17 00:00:00 2001 From: divyagayathri-hcl Date: Tue, 1 Oct 2024 14:43:29 -0700 Subject: [PATCH] ZMQ lib change. --- common/zmqclient.cpp | 32 +++++++++++ common/zmqclient.h | 5 ++ common/zmqproducerstatetable.cpp | 7 +++ common/zmqproducerstatetable.h | 6 ++ common/zmqserver.cpp | 65 ++++++++++++++++++++++ common/zmqserver.h | 3 + tests/zmq_state_ut.cpp | 94 ++++++++++++++++++++++++++++++++ 7 files changed, 212 insertions(+) diff --git a/common/zmqclient.cpp b/common/zmqclient.cpp index 0225d437..2f3f6e46 100644 --- a/common/zmqclient.cpp +++ b/common/zmqclient.cpp @@ -197,4 +197,36 @@ void ZmqClient::sendMsg( throw system_error(make_error_code(errc::io_error), message); } +bool ZmqClient::wait(std::string& dbName, + std::string& tableName, + std::vector>& kcos, + std::vector& buffer) +{ + SWSS_LOG_ENTER(); + int rc; + for (int i = 0; true ; ++i) + { + rc = zmq_recv(m_socket, buffer.data(), buffer.size(), 0); + if (rc < 0) + { + if (zmq_errno() == EINTR && i <= MQ_MAX_RETRY) + { + continue; + } + SWSS_LOG_THROW("zmq_recv failed, zmqerrno: %d", zmq_errno()); + } + if (rc >= (int)buffer.size()) + { + SWSS_LOG_THROW( + "zmq_recv message was truncated (over %d bytes, received %d), increase buffer size, message DROPPED", + (int)buffer.size(), rc); + } + break; + } + buffer.at(rc) = 0; // make sure that we end string with zero before parse + kcos.clear(); + BinarySerializer::deserializeBuffer(buffer.data(), buffer.size(), dbName, tableName, kcos); + return true; +} + } diff --git a/common/zmqclient.h b/common/zmqclient.h index 313e6573..ac021461 100644 --- a/common/zmqclient.h +++ b/common/zmqclient.h @@ -24,6 +24,11 @@ class ZmqClient const std::string& tableName, const std::vector& kcos, std::vector& sendbuffer); + bool wait(std::string& dbName, + std::string& tableName, + std::vector>& kcos, + std::vector& buffer); + private: void initialize(const std::string& endpoint, const std::string& vrf); diff --git a/common/zmqproducerstatetable.cpp b/common/zmqproducerstatetable.cpp index ec9396b3..d43e7843 100644 --- a/common/zmqproducerstatetable.cpp +++ b/common/zmqproducerstatetable.cpp @@ -182,4 +182,11 @@ size_t ZmqProducerStateTable::dbUpdaterQueueSize() return m_asyncDBUpdater->queueSize(); } +bool ZmqProducerStateTable::wait(std::string& dbName, + std::string& tableName, + std::vector>& kcos) +{ + return m_zmqClient.wait(dbName, tableName, kcos, m_sendbuffer); +} + } diff --git a/common/zmqproducerstatetable.h b/common/zmqproducerstatetable.h index 74910782..f498fb95 100644 --- a/common/zmqproducerstatetable.h +++ b/common/zmqproducerstatetable.h @@ -38,6 +38,12 @@ class ZmqProducerStateTable : public ProducerStateTable virtual void send(const std::vector &kcos); size_t dbUpdaterQueueSize(); + + // This method should only be used if the ZmqClient enables one-to-one sync. + virtual bool wait(std::string& dbName, + std::string& tableName, + std::vector>& kcos); + private: void initialize(DBConnector *db, const std::string &tableName, bool dbPersistence); diff --git a/common/zmqserver.cpp b/common/zmqserver.cpp index 4800b9ba..b74418ed 100644 --- a/common/zmqserver.cpp +++ b/common/zmqserver.cpp @@ -166,4 +166,69 @@ void ZmqServer::mqPollThread() SWSS_LOG_NOTICE("mqPollThread end"); } +void ZmqServer::sendMsg(const std::string& dbName, const std::string& tableName, + const std::vector& values) +{ + int serializedlen = (int)BinarySerializer::serializeBuffer( + m_buffer.data(), + m_buffer.size(), + dbName, + tableName, + values); + + SWSS_LOG_DEBUG("sending: %d", serializedlen); + int zmq_err = 0; + int retry_delay = 10; + int rc = 0; + for (int i = 0; i <= MQ_MAX_RETRY; ++i) + { + rc = zmq_send(m_socket, m_buffer.data(), serializedlen, 0); + if (rc >= 0) + { + m_allowZmqPoll = true; + SWSS_LOG_DEBUG("zmq sent %d bytes", serializedlen); + return; + } + + zmq_err = zmq_errno(); + // sleep (2 ^ retry time) * 10 ms + retry_delay *= 2; + if (zmq_err == EINTR + || zmq_err== EFSM) + { + // EINTR: interrupted by signal + // EFSM: socket state not ready + // For example when ZMQ socket still not receive reply message from last sended package. + // There was state machine inside ZMQ socket, when the socket is not in ready to send state, this + // error will happen. + // for more detail, please check: http://api.zeromq.org/2-1:zmq-send + SWSS_LOG_DEBUG("zmq send retry, endpoint: %s, error: %d", m_endpoint.c_str(), zmq_err); + retry_delay = 0; + } + else if (zmq_err == EAGAIN) + { + // EAGAIN: ZMQ is full to need try again + SWSS_LOG_WARN("zmq is full, will retry in %d ms, endpoint: %s, error: %d", retry_delay, m_endpoint.c_str(), zmq_err); + } + else if (zmq_err == ETERM) + { + auto message = "zmq connection break, endpoint: " + m_endpoint + ", error: " + to_string(rc); + SWSS_LOG_ERROR("%s", message.c_str()); + throw system_error(make_error_code(errc::connection_reset), message); + } + else + { + // for other error, send failed immediately. + auto message = "zmq send failed, endpoint: " + m_endpoint + ", error: " + to_string(rc); + SWSS_LOG_ERROR("%s", message.c_str()); + throw system_error(make_error_code(errc::io_error), message); + } + usleep(retry_delay * 1000); + } + // failed after retry + auto message = "zmq send failed, endpoint: " + m_endpoint + ", zmqerrno: " + to_string(zmq_err) + ":" + zmq_strerror(zmq_err) + ", msg length:" + to_string(serializedlen); + SWSS_LOG_ERROR("%s", message.c_str()); + throw system_error(make_error_code(errc::io_error), message); +} + } diff --git a/common/zmqserver.h b/common/zmqserver.h index 8afe18d7..c34be3db 100644 --- a/common/zmqserver.h +++ b/common/zmqserver.h @@ -39,6 +39,9 @@ class ZmqServer const std::string tableName, ZmqMessageHandler* handler); + void sendMsg(const std::string& dbName, const std::string& tableName, + const std::vector& values); + private: void handleReceivedData(const char* buffer, const size_t size); diff --git a/tests/zmq_state_ut.cpp b/tests/zmq_state_ut.cpp index 4818b7fd..499b9d23 100644 --- a/tests/zmq_state_ut.cpp +++ b/tests/zmq_state_ut.cpp @@ -288,6 +288,9 @@ static void testMethod(bool producerPersistence) // start consumer first, SHM can only have 1 consumer per table. thread *consumerThread = new thread(consumerWorker, testTableName, pullEndpoint, !producerPersistence); + // Wait for the consumer to start. + sleep(1); + cout << "Starting " << NUMBER_OF_THREADS << " producers" << endl; /* Starting the producer before the producer */ for (int i = 0; i < NUMBER_OF_THREADS; i++) @@ -351,6 +354,9 @@ static void testBatchMethod(bool producerPersistence) // start consumer first, SHM can only have 1 consumer per table. thread *consumerThread = new thread(consumerWorker, testTableName, pullEndpoint, !producerPersistence); + // Wait for the consumer to start. + sleep(1); + cout << "Starting " << NUMBER_OF_THREADS << " producers" << endl; /* Starting the producer before the producer */ for (int i = 0; i < NUMBER_OF_THREADS; i++) @@ -397,6 +403,94 @@ static void testBatchMethod(bool producerPersistence) cout << endl << "Done." << endl; } +static bool zmq_done = false; +static void zmqConsumerWorker(string tableName, string endpoint) +{ + cout << "Consumer thread started: " << tableName << endl; + DBConnector db(TEST_DB, 0, true); + ZmqServer server(endpoint); + ZmqConsumerStateTable c(&db, tableName, server); + Select cs; + cs.addSelectable(&c); + //validate received data + Selectable *selectcs; + std::deque vkco; + int ret = 0; + while (!zmq_done) + { + ret = cs.select(&selectcs, 10, true); + if (ret == Select::OBJECT) + { + c.pops(vkco); + std::vector values; + values.push_back(KeyOpFieldsValuesTuple{"k", SET_COMMAND, std::vector{FieldValueTuple{"f", "v"}}}); + server.sendMsg(TEST_DB, tableName, values); + } + } + allDataReceived = true; + cout << "Consumer thread ended: " << tableName << endl; +} +TEST(ZmqOneToOneSync, test) +{ + std::string testTableName = "ZMQ_PROD_CONS_UT"; + std::string pushEndpoint = "tcp://localhost:1234"; + std::string pullEndpoint = "tcp://*:1234"; + // start consumer first, SHM can only have 1 consumer per table. + thread *consumerThread = new thread(zmqConsumerWorker, testTableName, pullEndpoint); + // Wait for the consumer to be ready. + sleep(1); + DBConnector db(TEST_DB, 0, true); + ZmqClient client(pushEndpoint); + ZmqProducerStateTable p(&db, testTableName, client); + std::vector kcos; + kcos.push_back(KeyOpFieldsValuesTuple{"k", SET_COMMAND, std::vector{FieldValueTuple{"f", "v"}}}); + std::vector> kcos_p; + std::string dbName, tableName; + for (int i =0; i < 3; ++i) + { + p.send(kcos); + ASSERT_TRUE(p.wait(dbName, tableName, kcos_p)); + EXPECT_EQ(dbName, TEST_DB); + EXPECT_EQ(tableName, testTableName); + ASSERT_EQ(kcos_p.size(), 1); + EXPECT_EQ(kfvKey(*kcos_p[0]), "k"); + EXPECT_EQ(kfvOp(*kcos_p[0]), SET_COMMAND); + std::vector cos = std::vector{FieldValueTuple{"f", "v"}}; + EXPECT_EQ(kfvFieldsValues(*kcos_p[0]), cos); + } + zmq_done = true; + consumerThread->join(); + delete consumerThread; +} +TEST(ZmqOneToOneSyncClientError, test) +{ + std::string testTableName = "ZMQ_PROD_CONS_UT"; + std::string pushEndpoint = "tcp://localhost:1234"; + DBConnector db(TEST_DB, 0, true); + ZmqClient client(pushEndpoint); + ZmqProducerStateTable p(&db, testTableName, client); + std::vector kcos; + kcos.push_back(KeyOpFieldsValuesTuple{"k", SET_COMMAND, std::vector{}}); + std::vector> kcos_p; + std::string dbName, tableName; + p.send(kcos); + // Wait will timeout without server reply. + EXPECT_FALSE(p.wait(dbName, tableName, kcos_p)); + // Send will return error without server reply. + EXPECT_THROW(p.send(kcos), std::system_error); +} +TEST(ZmqOneToOneSyncServerError, test) +{ + std::string testTableName = "ZMQ_PROD_CONS_UT"; + std::string pullEndpoint = "tcp://*:1234"; + DBConnector db(TEST_DB, 0, true); + ZmqServer server(pullEndpoint); + ZmqConsumerStateTable c(&db, testTableName, server); + std::vector values; + values.push_back(KeyOpFieldsValuesTuple{"k", SET_COMMAND, std::vector{FieldValueTuple{"f", "v"}}}); + // Send will return error without client request. + EXPECT_THROW(server.sendMsg(TEST_DB, testTableName, values), std::system_error); +} TEST(ZmqConsumerStateTable, test) { // test with persist by consumer