diff --git a/common/schema.h b/common/schema.h index e99f2ad8..461ea4d8 100644 --- a/common/schema.h +++ b/common/schema.h @@ -475,11 +475,12 @@ namespace swss { #define CFG_SUPPRESS_ASIC_SDK_HEALTH_EVENT_NAME "SUPPRESS_ASIC_SDK_HEALTH_EVENT" +#define CFG_MEMORY_STATISTICS_TABLE_NAME "MEMORY_STATISTICS" + #define CFG_PAC_PORT_CONFIG_TABLE "PAC_PORT_CONFIG_TABLE" #define CFG_PAC_GLOBAL_CONFIG_TABLE "PAC_GLOBAL_CONFIG_TABLE" #define CFG_PAC_HOSTAPD_GLOBAL_CONFIG_TABLE "HOSTAPD_GLOBAL_CONFIG_TABLE" - /***** STATE DATABASE *****/ #define STATE_SWITCH_CAPABILITY_TABLE_NAME "SWITCH_CAPABILITY" diff --git a/common/zmqclient.cpp b/common/zmqclient.cpp index 3d61ba62..715f157e 100644 --- a/common/zmqclient.cpp +++ b/common/zmqclient.cpp @@ -202,7 +202,7 @@ void ZmqClient::sendMsg( else { // for other error, send failed immediately. - auto message = "zmq send failed, endpoint: " + m_endpoint + ", error: " + to_string(rc); + auto message = "cli: zmq send failed, endpoint: " + m_endpoint + ", error: " + to_string(rc); SWSS_LOG_ERROR("%s", message.c_str()); throw system_error(make_error_code(errc::io_error), message); } @@ -211,7 +211,7 @@ void ZmqClient::sendMsg( } // failed after retry - auto message = "zmq send failed, endpoint: " + m_endpoint + ", zmqerrno: " + to_string(zmq_err) + ":" + zmq_strerror(zmq_err) + ", msg length:" + to_string(serializedlen); + auto message = "cli: zmq send failed, endpoint: " + m_endpoint + ", zmqerrno: " + to_string(zmq_err) + ":" + zmq_strerror(zmq_err) + ", msg length:" + to_string(serializedlen); SWSS_LOG_ERROR("%s", message.c_str()); throw system_error(make_error_code(errc::io_error), message); } @@ -224,17 +224,24 @@ bool ZmqClient::wait(std::string& dbName, /* zmq_pollitem_t items [1] = { }; items[0].socket = m_socket; - items[0].events = ZMQ_POLLIN; + items[0].events = ZMQ_POLLIN; */ + + zmq_pollitem_t poll_item; + poll_item.fd = 0; + poll_item.socket = m_socket; + poll_item.events = ZMQ_POLLIN; + poll_item.revents = 0; int rc; for (int i = 0; true; ++i) { - SWSS_LOG_DEBUG("m_waitTimeMs is : %d", (int)m_waitTimeMs); - rc = zmq_poll(items, 1, (int)m_waitTimeMs); + rc = zmq_poll(&poll_item, 1, 1000); + SWSS_LOG_DEBUG("cli: rc value is : %d", rc); if (rc == 0) { - SWSS_LOG_ERROR("zmq_poll timed out"); + SWSS_LOG_ERROR("zmq_poll timed out: zmqclient wait"); return false; +// continue; } if (rc > 0) { @@ -242,12 +249,12 @@ bool ZmqClient::wait(std::string& dbName, } if (zmq_errno() == EINTR && i <= MQ_MAX_RETRY) { + SWSS_LOG_DEBUG("Checking the 2nd if condition in zmq poll"); continue; } - SWSS_LOG_ERROR("zmq_poll failed, zmqerrno: %d", zmq_errno()); - } */ + SWSS_LOG_ERROR("zmqclient wait : zmq_poll failed, zmqerrno: %d", zmq_errno()); + } - int rc; for (int i = 0; true; ++i) { rc = zmq_recv(m_socket, m_sendbuffer.data(), m_sendbuffer.size(), 0); @@ -255,9 +262,10 @@ bool ZmqClient::wait(std::string& dbName, { if (zmq_errno() == EINTR && i <= MQ_MAX_RETRY) { + SWSS_LOG_DEBUG("Checking the 2nd if condition in zmq receive"); continue; } - SWSS_LOG_ERROR("zmq_recv failed, zmqerrno: %d", zmq_errno()); + SWSS_LOG_ERROR("zmqclient wait : zmq_recv failed, zmqerrno: %d", zmq_errno()); return false; } if (rc >= (int)m_sendbuffer.size()) diff --git a/common/zmqclient.h b/common/zmqclient.h index ebc0bd01..0e835f31 100644 --- a/common/zmqclient.h +++ b/common/zmqclient.h @@ -32,7 +32,7 @@ class ZmqClient private: void initialize(const std::string& endpoint, const std::string& vrf); -// void initialize(const std::string& endpoint); + void initialize(const std::string& endpoint); std::string m_endpoint; diff --git a/common/zmqserver.cpp b/common/zmqserver.cpp index 3235dc5c..66bb5e76 100644 --- a/common/zmqserver.cpp +++ b/common/zmqserver.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -96,6 +97,11 @@ void ZmqServer::mqPollThread() void* context = zmq_ctx_new();; void* socket = zmq_socket(context, ZMQ_PULL); +//divya + int ret_code = zmq_recv(socket, m_buffer.data(), MQ_RESPONSE_MAX_COUNT, ZMQ_DONTWAIT); + SWSS_LOG_DEBUG("mqPollThread:: ret_code value is : %d", ret_code); +//divya + // Increase recv buffer for use all bandwidth: http://api.zeromq.org/4-2:zmq-setsockopt int high_watermark = MQ_WATERMARK; zmq_setsockopt(socket, ZMQ_RCVHWM, &high_watermark, sizeof(high_watermark)); @@ -106,6 +112,7 @@ void ZmqServer::mqPollThread() } int rc = zmq_bind(socket, m_endpoint.c_str()); + SWSS_LOG_DEBUG("115: mqPollThread:: rc value is : %d", rc); if (rc != 0) { SWSS_LOG_THROW("zmq_bind failed on endpoint: %s, zmqerrno: %d, message: %s", @@ -122,12 +129,14 @@ void ZmqServer::mqPollThread() poll_item.revents = 0; SWSS_LOG_NOTICE("bind to zmq endpoint: %s", m_endpoint.c_str()); + SWSS_LOG_DEBUG("m_runThread: %d", m_runThread); while (m_runThread) { m_allowZmqPoll = false; // receive message rc = zmq_poll(&poll_item, 1, 1000); + SWSS_LOG_DEBUG("ZmqServer::mqPollThread: zmq poll: rc value is : %d", rc); if (rc == 0 || !(poll_item.revents & ZMQ_POLLIN)) { // timeout or other event @@ -137,6 +146,7 @@ void ZmqServer::mqPollThread() // receive message rc = zmq_recv(socket, m_buffer.data(), MQ_RESPONSE_MAX_COUNT, ZMQ_DONTWAIT); + SWSS_LOG_DEBUG("ZmqServer::mqPollThread: zmq recv rc value is : %d", rc); if (rc < 0) { int zmq_err = zmq_errno(); @@ -184,10 +194,10 @@ void ZmqServer::sendMsg(const std::string& dbName, const std::string& tableName, int zmq_err = 0; int retry_delay = 10; int rc = 0; - for (int i = 0; i <= MQ_MAX_RETRY; ++i) + for (int i = 0; i <= MQ_MAX_RETRY; ++i) { - rc = zmq_send(m_socket, m_buffer.data(), serializedlen, 0); - SWSS_LOG_DEBUG("rc value is : %d", rc); + rc = zmq_send(m_socket, m_buffer.data(), serializedlen, ZMQ_NOBLOCK); + SWSS_LOG_DEBUG("ser: rc value is : %d", rc); if (rc >= 0) { m_allowZmqPoll = true; @@ -200,7 +210,7 @@ void ZmqServer::sendMsg(const std::string& dbName, const std::string& tableName, SWSS_LOG_DEBUG("zmq_err is : %d", zmq_err); if (zmq_err == EINTR - || zmq_err== EFSM) + || zmq_err == EFSM) { // EINTR: interrupted by signal // EFSM: socket state not ready @@ -228,7 +238,9 @@ void ZmqServer::sendMsg(const std::string& dbName, const std::string& tableName, // for other error, send failed immediately. auto message = "zmq send failed, endpoint: " + m_endpoint + ", error: " + to_string(rc); SWSS_LOG_ERROR("%s", message.c_str()); -// throw system_error(make_error_code(errc::io_error), message); + throw system_error(make_error_code(errc::io_error), message); +// SWSS_LOG_THROW("Else case message is: %s", message.c_str()); +// return; } usleep(retry_delay * 1000); } @@ -237,6 +249,8 @@ void ZmqServer::sendMsg(const std::string& dbName, const std::string& tableName, auto message = "zmq send failed, endpoint: " + m_endpoint + ", zmqerrno: " + to_string(zmq_err) + ":" + zmq_strerror(zmq_err) + ", msg length:" + to_string(serializedlen); SWSS_LOG_ERROR("%s", message.c_str()); throw system_error(make_error_code(errc::io_error), message); +// SWSS_LOG_THROW("Last Error message is %s", message.c_str()); +// return; } } diff --git a/tests/zmq_state_ut.cpp b/tests/zmq_state_ut.cpp index 9026ca86..12dd7cda 100644 --- a/tests/zmq_state_ut.cpp +++ b/tests/zmq_state_ut.cpp @@ -508,14 +508,14 @@ static void zmqConsumerWorker(string tableName, string endpoint, bool dbPersiste static void ZmqWithResponse(bool producerPersistence) { std::string testTableName = "ZMQ_PROD_CONS_UT"; - std::string db_Name = "TEST_DB"; +// std::string db_Name = "TEST_DB"; std::string pushEndpoint = "tcp://localhost:1234"; std::string pullEndpoint = "tcp://*:1234"; // start consumer first, SHM can only have 1 consumer per table. thread *consumerThread = new thread(zmqConsumerWorker, testTableName, pullEndpoint, !producerPersistence); // Wait for the consumer to be ready. - sleep(10); + sleep(1); DBConnector db(TEST_DB, 0, true); ZmqClient client(pushEndpoint); ZmqProducerStateTable p(&db, testTableName, client, true); @@ -526,8 +526,8 @@ static void ZmqWithResponse(bool producerPersistence) for (int i = 0; i < 3; ++i) { p.send(kcos); -// ASSERT_TRUE(p.wait(dbName, tableName, kcos_p)); - ASSERT_TRUE(p.wait(db_Name, testTableName, kcos_p)); + ASSERT_TRUE(p.wait(dbName, tableName, kcos_p)); +// ASSERT_TRUE(p.wait(db_Name, testTableName, kcos_p)); EXPECT_EQ(dbName, TEST_DB); EXPECT_EQ(tableName, testTableName); ASSERT_EQ(kcos_p.size(), 1); @@ -552,7 +552,7 @@ TEST(ZmqWithResponseClientError, test) { std::string testTableName = "ZMQ_PROD_CONS_UT"; std::string pushEndpoint = "tcp://localhost:1234"; - std::string new_dbName = "TEST_DB"; +// std::string new_dbName = "TEST_DB"; DBConnector db(TEST_DB, 0, true); // ZmqClient client(pushEndpoint, 3000); ZmqClient client(pushEndpoint); @@ -563,7 +563,7 @@ TEST(ZmqWithResponseClientError, test) std::string dbName, tableName; p.send(kcos); // Wait will timeout without server reply. -// EXPECT_FALSE(p.wait(dbName, tableName, kcos_p)); - EXPECT_FALSE(p.wait(new_dbName, testTableName, kcos_p)); + EXPECT_FALSE(p.wait(dbName, tableName, kcos_p)); +// EXPECT_FALSE(p.wait(new_dbName, testTableName, kcos_p)); }