Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…wss-common into zmq
  • Loading branch information
divyagayathri-hcl committed Dec 7, 2024
2 parents bbbe998 + fb6ce44 commit ea87661
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 24 deletions.
3 changes: 2 additions & 1 deletion common/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -475,11 +475,12 @@ namespace swss {

#define CFG_SUPPRESS_ASIC_SDK_HEALTH_EVENT_NAME "SUPPRESS_ASIC_SDK_HEALTH_EVENT"

#define CFG_MEMORY_STATISTICS_TABLE_NAME "MEMORY_STATISTICS"

#define CFG_PAC_PORT_CONFIG_TABLE "PAC_PORT_CONFIG_TABLE"
#define CFG_PAC_GLOBAL_CONFIG_TABLE "PAC_GLOBAL_CONFIG_TABLE"
#define CFG_PAC_HOSTAPD_GLOBAL_CONFIG_TABLE "HOSTAPD_GLOBAL_CONFIG_TABLE"


/***** STATE DATABASE *****/

#define STATE_SWITCH_CAPABILITY_TABLE_NAME "SWITCH_CAPABILITY"
Expand Down
28 changes: 18 additions & 10 deletions common/zmqclient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ void ZmqClient::sendMsg(
else
{
// for other error, send failed immediately.
auto message = "zmq send failed, endpoint: " + m_endpoint + ", error: " + to_string(rc);
auto message = "cli: zmq send failed, endpoint: " + m_endpoint + ", error: " + to_string(rc);
SWSS_LOG_ERROR("%s", message.c_str());
throw system_error(make_error_code(errc::io_error), message);
}
Expand All @@ -211,7 +211,7 @@ void ZmqClient::sendMsg(
}

// failed after retry
auto message = "zmq send failed, endpoint: " + m_endpoint + ", zmqerrno: " + to_string(zmq_err) + ":" + zmq_strerror(zmq_err) + ", msg length:" + to_string(serializedlen);
auto message = "cli: zmq send failed, endpoint: " + m_endpoint + ", zmqerrno: " + to_string(zmq_err) + ":" + zmq_strerror(zmq_err) + ", msg length:" + to_string(serializedlen);
SWSS_LOG_ERROR("%s", message.c_str());
throw system_error(make_error_code(errc::io_error), message);
}
Expand All @@ -224,40 +224,48 @@ bool ZmqClient::wait(std::string& dbName,

/* zmq_pollitem_t items [1] = { };
items[0].socket = m_socket;
items[0].events = ZMQ_POLLIN;
items[0].events = ZMQ_POLLIN; */

zmq_pollitem_t poll_item;
poll_item.fd = 0;
poll_item.socket = m_socket;
poll_item.events = ZMQ_POLLIN;
poll_item.revents = 0;

int rc;
for (int i = 0; true; ++i)
{
SWSS_LOG_DEBUG("m_waitTimeMs is : %d", (int)m_waitTimeMs);
rc = zmq_poll(items, 1, (int)m_waitTimeMs);
rc = zmq_poll(&poll_item, 1, 1000);
SWSS_LOG_DEBUG("cli: rc value is : %d", rc);
if (rc == 0)
{
SWSS_LOG_ERROR("zmq_poll timed out");
SWSS_LOG_ERROR("zmq_poll timed out: zmqclient wait");
return false;
// continue;
}
if (rc > 0)
{
break;
}
if (zmq_errno() == EINTR && i <= MQ_MAX_RETRY)
{
SWSS_LOG_DEBUG("Checking the 2nd if condition in zmq poll");
continue;
}
SWSS_LOG_ERROR("zmq_poll failed, zmqerrno: %d", zmq_errno());
} */
SWSS_LOG_ERROR("zmqclient wait : zmq_poll failed, zmqerrno: %d", zmq_errno());
}

int rc;
for (int i = 0; true; ++i)
{
rc = zmq_recv(m_socket, m_sendbuffer.data(), m_sendbuffer.size(), 0);
if (rc < 0)
{
if (zmq_errno() == EINTR && i <= MQ_MAX_RETRY)
{
SWSS_LOG_DEBUG("Checking the 2nd if condition in zmq receive");
continue;
}
SWSS_LOG_ERROR("zmq_recv failed, zmqerrno: %d", zmq_errno());
SWSS_LOG_ERROR("zmqclient wait : zmq_recv failed, zmqerrno: %d", zmq_errno());
return false;
}
if (rc >= (int)m_sendbuffer.size())
Expand Down
2 changes: 1 addition & 1 deletion common/zmqclient.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class ZmqClient

private:
void initialize(const std::string& endpoint, const std::string& vrf);
// void initialize(const std::string& endpoint);
void initialize(const std::string& endpoint);

std::string m_endpoint;

Expand Down
24 changes: 19 additions & 5 deletions common/zmqserver.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include <unistd.h>
#include <string>
#include <deque>
#include <limits>
Expand Down Expand Up @@ -96,6 +97,11 @@ void ZmqServer::mqPollThread()
void* context = zmq_ctx_new();;
void* socket = zmq_socket(context, ZMQ_PULL);

//divya
int ret_code = zmq_recv(socket, m_buffer.data(), MQ_RESPONSE_MAX_COUNT, ZMQ_DONTWAIT);
SWSS_LOG_DEBUG("mqPollThread:: ret_code value is : %d", ret_code);
//divya

// Increase recv buffer for use all bandwidth: http://api.zeromq.org/4-2:zmq-setsockopt
int high_watermark = MQ_WATERMARK;
zmq_setsockopt(socket, ZMQ_RCVHWM, &high_watermark, sizeof(high_watermark));
Expand All @@ -106,6 +112,7 @@ void ZmqServer::mqPollThread()
}

int rc = zmq_bind(socket, m_endpoint.c_str());
SWSS_LOG_DEBUG("115: mqPollThread:: rc value is : %d", rc);
if (rc != 0)
{
SWSS_LOG_THROW("zmq_bind failed on endpoint: %s, zmqerrno: %d, message: %s",
Expand All @@ -122,12 +129,14 @@ void ZmqServer::mqPollThread()
poll_item.revents = 0;

SWSS_LOG_NOTICE("bind to zmq endpoint: %s", m_endpoint.c_str());
SWSS_LOG_DEBUG("m_runThread: %d", m_runThread);
while (m_runThread)
{
m_allowZmqPoll = false;

// receive message
rc = zmq_poll(&poll_item, 1, 1000);
SWSS_LOG_DEBUG("ZmqServer::mqPollThread: zmq poll: rc value is : %d", rc);
if (rc == 0 || !(poll_item.revents & ZMQ_POLLIN))
{
// timeout or other event
Expand All @@ -137,6 +146,7 @@ void ZmqServer::mqPollThread()

// receive message
rc = zmq_recv(socket, m_buffer.data(), MQ_RESPONSE_MAX_COUNT, ZMQ_DONTWAIT);
SWSS_LOG_DEBUG("ZmqServer::mqPollThread: zmq recv rc value is : %d", rc);
if (rc < 0)
{
int zmq_err = zmq_errno();
Expand Down Expand Up @@ -184,10 +194,10 @@ void ZmqServer::sendMsg(const std::string& dbName, const std::string& tableName,
int zmq_err = 0;
int retry_delay = 10;
int rc = 0;
for (int i = 0; i <= MQ_MAX_RETRY; ++i)
for (int i = 0; i <= MQ_MAX_RETRY; ++i)
{
rc = zmq_send(m_socket, m_buffer.data(), serializedlen, 0);
SWSS_LOG_DEBUG("rc value is : %d", rc);
rc = zmq_send(m_socket, m_buffer.data(), serializedlen, ZMQ_NOBLOCK);
SWSS_LOG_DEBUG("ser: rc value is : %d", rc);
if (rc >= 0)
{
m_allowZmqPoll = true;
Expand All @@ -200,7 +210,7 @@ void ZmqServer::sendMsg(const std::string& dbName, const std::string& tableName,
SWSS_LOG_DEBUG("zmq_err is : %d", zmq_err);

if (zmq_err == EINTR
|| zmq_err== EFSM)
|| zmq_err == EFSM)
{
// EINTR: interrupted by signal
// EFSM: socket state not ready
Expand Down Expand Up @@ -228,7 +238,9 @@ void ZmqServer::sendMsg(const std::string& dbName, const std::string& tableName,
// for other error, send failed immediately.
auto message = "zmq send failed, endpoint: " + m_endpoint + ", error: " + to_string(rc);
SWSS_LOG_ERROR("%s", message.c_str());
// throw system_error(make_error_code(errc::io_error), message);
throw system_error(make_error_code(errc::io_error), message);
// SWSS_LOG_THROW("Else case message is: %s", message.c_str());
// return;
}
usleep(retry_delay * 1000);
}
Expand All @@ -237,6 +249,8 @@ void ZmqServer::sendMsg(const std::string& dbName, const std::string& tableName,
auto message = "zmq send failed, endpoint: " + m_endpoint + ", zmqerrno: " + to_string(zmq_err) + ":" + zmq_strerror(zmq_err) + ", msg length:" + to_string(serializedlen);
SWSS_LOG_ERROR("%s", message.c_str());
throw system_error(make_error_code(errc::io_error), message);
// SWSS_LOG_THROW("Last Error message is %s", message.c_str());
// return;
}

}
14 changes: 7 additions & 7 deletions tests/zmq_state_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -508,14 +508,14 @@ static void zmqConsumerWorker(string tableName, string endpoint, bool dbPersiste
static void ZmqWithResponse(bool producerPersistence)
{
std::string testTableName = "ZMQ_PROD_CONS_UT";
std::string db_Name = "TEST_DB";
// std::string db_Name = "TEST_DB";
std::string pushEndpoint = "tcp://localhost:1234";
std::string pullEndpoint = "tcp://*:1234";
// start consumer first, SHM can only have 1 consumer per table.
thread *consumerThread = new thread(zmqConsumerWorker, testTableName, pullEndpoint, !producerPersistence);

// Wait for the consumer to be ready.
sleep(10);
sleep(1);
DBConnector db(TEST_DB, 0, true);
ZmqClient client(pushEndpoint);
ZmqProducerStateTable p(&db, testTableName, client, true);
Expand All @@ -526,8 +526,8 @@ static void ZmqWithResponse(bool producerPersistence)
for (int i = 0; i < 3; ++i)
{
p.send(kcos);
// ASSERT_TRUE(p.wait(dbName, tableName, kcos_p));
ASSERT_TRUE(p.wait(db_Name, testTableName, kcos_p));
ASSERT_TRUE(p.wait(dbName, tableName, kcos_p));
// ASSERT_TRUE(p.wait(db_Name, testTableName, kcos_p));
EXPECT_EQ(dbName, TEST_DB);
EXPECT_EQ(tableName, testTableName);
ASSERT_EQ(kcos_p.size(), 1);
Expand All @@ -552,7 +552,7 @@ TEST(ZmqWithResponseClientError, test)
{
std::string testTableName = "ZMQ_PROD_CONS_UT";
std::string pushEndpoint = "tcp://localhost:1234";
std::string new_dbName = "TEST_DB";
// std::string new_dbName = "TEST_DB";
DBConnector db(TEST_DB, 0, true);
// ZmqClient client(pushEndpoint, 3000);
ZmqClient client(pushEndpoint);
Expand All @@ -563,7 +563,7 @@ TEST(ZmqWithResponseClientError, test)
std::string dbName, tableName;
p.send(kcos);
// Wait will timeout without server reply.
// EXPECT_FALSE(p.wait(dbName, tableName, kcos_p));
EXPECT_FALSE(p.wait(new_dbName, testTableName, kcos_p));
EXPECT_FALSE(p.wait(dbName, tableName, kcos_p));
// EXPECT_FALSE(p.wait(new_dbName, testTableName, kcos_p));
}

0 comments on commit ea87661

Please sign in to comment.