Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…-common into zmq
  • Loading branch information
divyagayathri-hcl committed Nov 5, 2024
2 parents 2bd500b + 877c433 commit 5caad2d
Show file tree
Hide file tree
Showing 7 changed files with 73 additions and 172 deletions.
30 changes: 30 additions & 0 deletions common/zmqclient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,35 +198,65 @@ void ZmqClient::sendMsg(
}

bool ZmqClient::wait(std::string& dbName,

std::string& tableName,

std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>>& kcos,

std::vector<char>& buffer)

{

SWSS_LOG_ENTER();

int rc;

for (int i = 0; true ; ++i)

{

rc = zmq_recv(m_socket, buffer.data(), buffer.size(), 0);

if (rc < 0)

{

if (zmq_errno() == EINTR && i <= MQ_MAX_RETRY)

{

continue;

}

SWSS_LOG_THROW("zmq_recv failed, zmqerrno: %d", zmq_errno());

}

if (rc >= (int)buffer.size())

{

SWSS_LOG_THROW(

"zmq_recv message was truncated (over %d bytes, received %d), increase buffer size, message DROPPED",

(int)buffer.size(), rc);

}

break;

}

buffer.at(rc) = 0; // make sure that we end string with zero before parse

kcos.clear();

BinarySerializer::deserializeBuffer(buffer.data(), buffer.size(), dbName, tableName, kcos);

return true;

}

}
5 changes: 5 additions & 0 deletions common/zmqclient.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ namespace swss {
class ZmqClient
{
public:

ZmqClient(const std::string& endpoint);
ZmqClient(const std::string& endpoint, const std::string& vrf);
~ZmqClient();
Expand All @@ -24,9 +25,13 @@ class ZmqClient
const std::string& tableName,
const std::vector<KeyOpFieldsValuesTuple>& kcos,
std::vector<char>& sendbuffer);

bool wait(std::string& dbName,

std::string& tableName,

std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>>& kcos,

std::vector<char>& buffer);

private:
Expand Down
19 changes: 12 additions & 7 deletions common/zmqproducerstatetable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,18 @@ void ZmqProducerStateTable::send(const std::vector<KeyOpFieldsValuesTuple> &kcos
}
}

bool ZmqProducerStateTable::wait(std::string& dbName,

std::string& tableName,

std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>>& kcos)

{

return m_zmqClient.wait(dbName, tableName, kcos, m_sendbuffer);

}

size_t ZmqProducerStateTable::dbUpdaterQueueSize()
{
if (m_asyncDBUpdater == nullptr)
Expand All @@ -182,11 +194,4 @@ size_t ZmqProducerStateTable::dbUpdaterQueueSize()
return m_asyncDBUpdater->queueSize();
}

bool ZmqProducerStateTable::wait(std::string& dbName,
std::string& tableName,
std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>>& kcos)
{
return m_zmqClient.wait(dbName, tableName, kcos, m_sendbuffer);
}

}
6 changes: 4 additions & 2 deletions common/zmqproducerstatetable.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,15 @@ class ZmqProducerStateTable : public ProducerStateTable
// Batched send that can include both SET and DEL requests.
virtual void send(const std::vector<KeyOpFieldsValuesTuple> &kcos);

size_t dbUpdaterQueueSize();

// This method should only be used if the ZmqClient enables one-to-one sync.

virtual bool wait(std::string& dbName,

std::string& tableName,

std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>>& kcos);

size_t dbUpdaterQueueSize();
private:
void initialize(DBConnector *db, const std::string &tableName, bool dbPersistence);

Expand Down
85 changes: 13 additions & 72 deletions common/zmqserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ using namespace std;
namespace swss {

ZmqServer::ZmqServer(const std::string& endpoint)
: ZmqServer(endpoint, "")
: m_endpoint(endpoint)
{
}

Expand All @@ -34,6 +34,9 @@ ZmqServer::~ZmqServer()
m_mqPollThread->join();

zmq_close(m_socket);

zmq_ctx_destroy(m_context);

}

void ZmqServer::registerMessageHandler(
Expand Down Expand Up @@ -109,21 +112,24 @@ void ZmqServer::mqPollThread()
int rc = zmq_bind(socket, m_endpoint.c_str());
if (rc != 0)
{
SWSS_LOG_THROW("zmq_bind failed on endpoint: %s, zmqerrno: %d",
SWSS_LOG_THROW("zmq_bind failed on endpoint: %s, zmqerrno: %d, message: %s",
m_endpoint.c_str(),
zmq_errno());
zmq_errno(),
strerror(zmq_errno()));
}

// zmq_poll will use less CPU
zmq_pollitem_t poll_item;
poll_item.fd = 0;
poll_item.socket = socket;
poll_item.socket = m_socket;
poll_item.events = ZMQ_POLLIN;
poll_item.revents = 0;

SWSS_LOG_NOTICE("bind to zmq endpoint: %s", m_endpoint.c_str());
while (m_runThread)
{
m_allowZmqPoll = false;

// receive message
rc = zmq_poll(&poll_item, 1, 1000);
if (rc == 0 || !(poll_item.revents & ZMQ_POLLIN))
Expand All @@ -134,7 +140,7 @@ void ZmqServer::mqPollThread()
}

// receive message
rc = zmq_recv(socket, m_buffer.data(), MQ_RESPONSE_MAX_COUNT, ZMQ_DONTWAIT);
rc = zmq_recv(m_socket, m_buffer.data(), MQ_RESPONSE_MAX_COUNT, ZMQ_DONTWAIT);
if (rc < 0)
{
int zmq_err = zmq_errno();
Expand Down Expand Up @@ -163,75 +169,10 @@ void ZmqServer::mqPollThread()
handleReceivedData(m_buffer.data(), rc);
}

zmq_close(socket);
zmq_ctx_destroy(context);
zmq_close(m_socket);
zmq_ctx_destroy(m_context);

SWSS_LOG_NOTICE("mqPollThread end");
}

void ZmqServer::sendMsg(const std::string& dbName, const std::string& tableName,
const std::vector<swss::KeyOpFieldsValuesTuple>& values)
{
int serializedlen = (int)BinarySerializer::serializeBuffer(
m_buffer.data(),
m_buffer.size(),
dbName,
tableName,
values);

SWSS_LOG_DEBUG("sending: %d", serializedlen);
int zmq_err = 0;
int retry_delay = 10;
int rc = 0;
for (int i = 0; i <= MQ_MAX_RETRY; ++i)
{
rc = zmq_send(m_socket, m_buffer.data(), serializedlen, 0);
if (rc >= 0)
{
m_allowZmqPoll = true;
SWSS_LOG_DEBUG("zmq sent %d bytes", serializedlen);
return;
}

zmq_err = zmq_errno();
// sleep (2 ^ retry time) * 10 ms
retry_delay *= 2;
if (zmq_err == EINTR
|| zmq_err== EFSM)
{
// EINTR: interrupted by signal
// EFSM: socket state not ready
// For example when ZMQ socket still not receive reply message from last sended package.
// There was state machine inside ZMQ socket, when the socket is not in ready to send state, this
// error will happen.
// for more detail, please check: http://api.zeromq.org/2-1:zmq-send
SWSS_LOG_DEBUG("zmq send retry, endpoint: %s, error: %d", m_endpoint.c_str(), zmq_err);
retry_delay = 0;
}
else if (zmq_err == EAGAIN)
{
// EAGAIN: ZMQ is full to need try again
SWSS_LOG_WARN("zmq is full, will retry in %d ms, endpoint: %s, error: %d", retry_delay, m_endpoint.c_str(), zmq_err);
}
else if (zmq_err == ETERM)
{
auto message = "zmq connection break, endpoint: " + m_endpoint + ", error: " + to_string(rc);
SWSS_LOG_ERROR("%s", message.c_str());
throw system_error(make_error_code(errc::connection_reset), message);
}
else
{
// for other error, send failed immediately.
auto message = "zmq send failed, endpoint: " + m_endpoint + ", error: " + to_string(rc);
SWSS_LOG_ERROR("%s", message.c_str());
throw system_error(make_error_code(errc::io_error), message);
}
usleep(retry_delay * 1000);
}
// failed after retry
auto message = "zmq send failed, endpoint: " + m_endpoint + ", zmqerrno: " + to_string(zmq_err) + ":" + zmq_strerror(zmq_err) + ", msg length:" + to_string(serializedlen);
SWSS_LOG_ERROR("%s", message.c_str());
throw system_error(make_error_code(errc::io_error), message);
}

}
12 changes: 9 additions & 3 deletions common/zmqserver.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ class ZmqServer
const std::string tableName,
ZmqMessageHandler* handler);

void sendMsg(const std::string& dbName, const std::string& tableName,
const std::vector<swss::KeyOpFieldsValuesTuple>& values);

private:

void connect();

void handleReceivedData(const char* buffer, const size_t size);

void mqPollThread();
Expand All @@ -59,6 +59,12 @@ class ZmqServer

std::string m_vrf;

void* m_context;

void* m_socket;

bool m_allowZmqPoll;

std::map<std::string, std::map<std::string, ZmqMessageHandler*>> m_HandlerMap;
};

Expand Down
88 changes: 0 additions & 88 deletions tests/zmq_state_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -403,94 +403,6 @@ static void testBatchMethod(bool producerPersistence)
cout << endl << "Done." << endl;
}

static bool zmq_done = false;
static void zmqConsumerWorker(string tableName, string endpoint)
{
cout << "Consumer thread started: " << tableName << endl;
DBConnector db(TEST_DB, 0, true);
ZmqServer server(endpoint);
ZmqConsumerStateTable c(&db, tableName, server);
Select cs;
cs.addSelectable(&c);
//validate received data
Selectable *selectcs;
std::deque<KeyOpFieldsValuesTuple> vkco;
int ret = 0;
while (!zmq_done)
{
ret = cs.select(&selectcs, 10, true);
if (ret == Select::OBJECT)
{
c.pops(vkco);
std::vector<swss::KeyOpFieldsValuesTuple> values;
values.push_back(KeyOpFieldsValuesTuple{"k", SET_COMMAND, std::vector<FieldValueTuple>{FieldValueTuple{"f", "v"}}});
server.sendMsg(TEST_DB, tableName, values);
}
}
allDataReceived = true;
cout << "Consumer thread ended: " << tableName << endl;
}
TEST(ZmqOneToOneSync, test)
{
std::string testTableName = "ZMQ_PROD_CONS_UT";
std::string pushEndpoint = "tcp://localhost:1234";
std::string pullEndpoint = "tcp://*:1234";
// start consumer first, SHM can only have 1 consumer per table.
thread *consumerThread = new thread(zmqConsumerWorker, testTableName, pullEndpoint);
// Wait for the consumer to be ready.
sleep(1);
DBConnector db(TEST_DB, 0, true);
ZmqClient client(pushEndpoint);
ZmqProducerStateTable p(&db, testTableName, client);
std::vector<KeyOpFieldsValuesTuple> kcos;
kcos.push_back(KeyOpFieldsValuesTuple{"k", SET_COMMAND, std::vector<FieldValueTuple>{FieldValueTuple{"f", "v"}}});
std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>> kcos_p;
std::string dbName, tableName;
for (int i =0; i < 3; ++i)
{
p.send(kcos);
ASSERT_TRUE(p.wait(dbName, tableName, kcos_p));
EXPECT_EQ(dbName, TEST_DB);
EXPECT_EQ(tableName, testTableName);
ASSERT_EQ(kcos_p.size(), 1);
EXPECT_EQ(kfvKey(*kcos_p[0]), "k");
EXPECT_EQ(kfvOp(*kcos_p[0]), SET_COMMAND);
std::vector<FieldValueTuple> cos = std::vector<FieldValueTuple>{FieldValueTuple{"f", "v"}};
EXPECT_EQ(kfvFieldsValues(*kcos_p[0]), cos);
}
zmq_done = true;
consumerThread->join();
delete consumerThread;
}
TEST(ZmqOneToOneSyncClientError, test)
{
std::string testTableName = "ZMQ_PROD_CONS_UT";
std::string pushEndpoint = "tcp://localhost:1234";
DBConnector db(TEST_DB, 0, true);
ZmqClient client(pushEndpoint);
ZmqProducerStateTable p(&db, testTableName, client);
std::vector<KeyOpFieldsValuesTuple> kcos;
kcos.push_back(KeyOpFieldsValuesTuple{"k", SET_COMMAND, std::vector<FieldValueTuple>{}});
std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>> kcos_p;
std::string dbName, tableName;
p.send(kcos);
// Wait will timeout without server reply.
EXPECT_FALSE(p.wait(dbName, tableName, kcos_p));
// Send will return error without server reply.
EXPECT_THROW(p.send(kcos), std::system_error);
}
TEST(ZmqOneToOneSyncServerError, test)
{
std::string testTableName = "ZMQ_PROD_CONS_UT";
std::string pullEndpoint = "tcp://*:1234";
DBConnector db(TEST_DB, 0, true);
ZmqServer server(pullEndpoint);
ZmqConsumerStateTable c(&db, testTableName, server);
std::vector<swss::KeyOpFieldsValuesTuple> values;
values.push_back(KeyOpFieldsValuesTuple{"k", SET_COMMAND, std::vector<FieldValueTuple>{FieldValueTuple{"f", "v"}}});
// Send will return error without client request.
EXPECT_THROW(server.sendMsg(TEST_DB, testTableName, values), std::system_error);
}
TEST(ZmqConsumerStateTable, test)
{
// test with persist by consumer
Expand Down

0 comments on commit 5caad2d

Please sign in to comment.