Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ZMQ lib change. #925

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
94 changes: 90 additions & 4 deletions common/zmqclient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,23 @@ using namespace std;
namespace swss {

ZmqClient::ZmqClient(const std::string& endpoint)
:ZmqClient(endpoint, "")
//:ZmqClient(endpoint, "")
{
initialize(endpoint);
}

ZmqClient::ZmqClient(const std::string& endpoint, const std::string& vrf)
{
initialize(endpoint, vrf);
}

/*ZmqClient::ZmqClient(const std::string& endpoint, uint32_t waitTimeMs) :
m_waitTimeMs(waitTimeMs)
{
// m_waitTimeMs = waitTimeMs;
initialize(endpoint);
}*/

ZmqClient::~ZmqClient()
{
std::lock_guard<std::mutex> lock(m_socketMutex);
Expand Down Expand Up @@ -55,6 +63,17 @@ void ZmqClient::initialize(const std::string& endpoint, const std::string& vrf)

connect();
}

void ZmqClient::initialize(const std::string& endpoint)
{
m_connected = false;
m_endpoint = endpoint;
m_context = nullptr;
m_socket = nullptr;
m_sendbuffer.resize(MQ_RESPONSE_MAX_COUNT);

connect();
}

bool ZmqClient::isConnected()
{
Expand Down Expand Up @@ -137,7 +156,7 @@ void ZmqClient::sendMsg(
int zmq_err = 0;
int retry_delay = 10;
int rc = 0;
for (int i = 0; i <= MQ_MAX_RETRY; ++i)
for (int i = 0; i <= MQ_MAX_RETRY; ++i)
{
{
// ZMQ socket is not thread safe: http://api.zeromq.org/2-1:zmq
Expand Down Expand Up @@ -183,7 +202,7 @@ void ZmqClient::sendMsg(
else
{
// for other error, send failed immediately.
auto message = "zmq send failed, endpoint: " + m_endpoint + ", error: " + to_string(rc);
auto message = "cli: zmq send failed, endpoint: " + m_endpoint + ", error: " + to_string(rc);
SWSS_LOG_ERROR("%s", message.c_str());
throw system_error(make_error_code(errc::io_error), message);
}
Expand All @@ -192,9 +211,76 @@ void ZmqClient::sendMsg(
}

// failed after retry
auto message = "zmq send failed, endpoint: " + m_endpoint + ", zmqerrno: " + to_string(zmq_err) + ":" + zmq_strerror(zmq_err) + ", msg length:" + to_string(serializedlen);
auto message = "cli: zmq send failed, endpoint: " + m_endpoint + ", zmqerrno: " + to_string(zmq_err) + ":" + zmq_strerror(zmq_err) + ", msg length:" + to_string(serializedlen);
SWSS_LOG_ERROR("%s", message.c_str());
throw system_error(make_error_code(errc::io_error), message);
}

bool ZmqClient::wait(std::string& dbName,
std::string& tableName,
std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>>& kcos)
{
SWSS_LOG_ENTER();

/* zmq_pollitem_t items [1] = { };
items[0].socket = m_socket;
items[0].events = ZMQ_POLLIN; */

zmq_pollitem_t poll_item;
poll_item.fd = 0;
poll_item.socket = m_socket;
poll_item.events = ZMQ_POLLIN;
poll_item.revents = 0;

int rc;
for (int i = 0; true; ++i)
{
rc = zmq_poll(&poll_item, 1, 1000);
SWSS_LOG_DEBUG("cli: rc value is : %d", rc);
if (rc == 0)
{
SWSS_LOG_ERROR("zmq_poll timed out: zmqclient wait");
return false;
// continue;
}
if (rc > 0)
{
break;
}
if (zmq_errno() == EINTR && i <= MQ_MAX_RETRY)
{
SWSS_LOG_DEBUG("Checking the 2nd if condition in zmq poll");
continue;
}
SWSS_LOG_ERROR("zmqclient wait : zmq_poll failed, zmqerrno: %d", zmq_errno());
}

for (int i = 0; true; ++i)
{
rc = zmq_recv(m_socket, m_sendbuffer.data(), m_sendbuffer.size(), 0);
if (rc < 0)
{
if (zmq_errno() == EINTR && i <= MQ_MAX_RETRY)
{
SWSS_LOG_DEBUG("Checking the 2nd if condition in zmq receive");
continue;
}
SWSS_LOG_ERROR("zmqclient wait : zmq_recv failed, zmqerrno: %d", zmq_errno());
return false;
}
if (rc >= (int)m_sendbuffer.size())
{
SWSS_LOG_ERROR(
"zmq_recv message was truncated (over %d bytes, received %d), increase buffer size, message DROPPED",
(int)m_sendbuffer.size(), rc);
// return false;
}
break;
}
m_sendbuffer.at(rc) = 0; // make sure that we end string with zero before parse
kcos.clear();
BinarySerializer::deserializeBuffer(m_sendbuffer.data(), m_sendbuffer.size(), dbName, tableName, kcos);
return true;
}

}
10 changes: 10 additions & 0 deletions common/zmqclient.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ namespace swss {
class ZmqClient
{
public:

ZmqClient(const std::string& endpoint);
ZmqClient(const std::string& endpoint, const std::string& vrf);
// ZmqClient(const std::string& endpoint, uint32_t waitTimeMs);
~ZmqClient();

bool isConnected();
Expand All @@ -23,8 +25,14 @@ class ZmqClient
void sendMsg(const std::string& dbName,
const std::string& tableName,
const std::vector<KeyOpFieldsValuesTuple>& kcos);

bool wait(std::string& dbName,
std::string& tableName,
std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>>& kcos);

private:
void initialize(const std::string& endpoint, const std::string& vrf);
void initialize(const std::string& endpoint);

std::string m_endpoint;

Expand All @@ -36,6 +44,8 @@ class ZmqClient

bool m_connected;

// uint32_t m_waitTimeMs;

std::mutex m_socketMutex;

std::vector<char> m_sendbuffer;
Expand Down
7 changes: 7 additions & 0 deletions common/zmqproducerstatetable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,13 @@ void ZmqProducerStateTable::send(const std::vector<KeyOpFieldsValuesTuple> &kcos
}
}

bool ZmqProducerStateTable::wait(std::string& dbName,
std::string& tableName,
std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>>& kcos)
{
return m_zmqClient.wait(dbName, tableName, kcos);
}

size_t ZmqProducerStateTable::dbUpdaterQueueSize()
{
if (m_asyncDBUpdater == nullptr)
Expand Down
4 changes: 4 additions & 0 deletions common/zmqproducerstatetable.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ class ZmqProducerStateTable : public ProducerStateTable
// Batched send that can include both SET and DEL requests.
virtual void send(const std::vector<KeyOpFieldsValuesTuple> &kcos);

virtual bool wait(std::string& dbName,
std::string& tableName,
std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>>& kcos);

size_t dbUpdaterQueueSize();
private:
void initialize(DBConnector *db, const std::string &tableName, bool dbPersistence);
Expand Down
88 changes: 87 additions & 1 deletion common/zmqserver.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include <unistd.h>
#include <string>
#include <deque>
#include <limits>
Expand All @@ -18,7 +19,8 @@ ZmqServer::ZmqServer(const std::string& endpoint)

ZmqServer::ZmqServer(const std::string& endpoint, const std::string& vrf)
: m_endpoint(endpoint),
m_vrf(vrf)
m_vrf(vrf),
m_allowZmqPoll(true)
{
m_buffer.resize(MQ_RESPONSE_MAX_COUNT);
m_runThread = true;
Expand All @@ -29,6 +31,7 @@ ZmqServer::ZmqServer(const std::string& endpoint, const std::string& vrf)

ZmqServer::~ZmqServer()
{
m_allowZmqPoll = true;
m_runThread = false;
m_mqPollThread->join();
}
Expand Down Expand Up @@ -94,6 +97,11 @@ void ZmqServer::mqPollThread()
void* context = zmq_ctx_new();;
void* socket = zmq_socket(context, ZMQ_PULL);

//divya
int ret_code = zmq_recv(socket, m_buffer.data(), MQ_RESPONSE_MAX_COUNT, ZMQ_DONTWAIT);
SWSS_LOG_DEBUG("mqPollThread:: ret_code value is : %d", ret_code);
//divya

// Increase recv buffer for use all bandwidth: http://api.zeromq.org/4-2:zmq-setsockopt
int high_watermark = MQ_WATERMARK;
zmq_setsockopt(socket, ZMQ_RCVHWM, &high_watermark, sizeof(high_watermark));
Expand All @@ -104,6 +112,7 @@ void ZmqServer::mqPollThread()
}

int rc = zmq_bind(socket, m_endpoint.c_str());
SWSS_LOG_DEBUG("115: mqPollThread:: rc value is : %d", rc);
if (rc != 0)
{
SWSS_LOG_THROW("zmq_bind failed on endpoint: %s, zmqerrno: %d, message: %s",
Expand All @@ -120,10 +129,14 @@ void ZmqServer::mqPollThread()
poll_item.revents = 0;

SWSS_LOG_NOTICE("bind to zmq endpoint: %s", m_endpoint.c_str());
SWSS_LOG_DEBUG("m_runThread: %d", m_runThread);
while (m_runThread)
{
m_allowZmqPoll = false;

// receive message
rc = zmq_poll(&poll_item, 1, 1000);
SWSS_LOG_DEBUG("ZmqServer::mqPollThread: zmq poll: rc value is : %d", rc);
if (rc == 0 || !(poll_item.revents & ZMQ_POLLIN))
{
// timeout or other event
Expand All @@ -133,6 +146,7 @@ void ZmqServer::mqPollThread()

// receive message
rc = zmq_recv(socket, m_buffer.data(), MQ_RESPONSE_MAX_COUNT, ZMQ_DONTWAIT);
SWSS_LOG_DEBUG("ZmqServer::mqPollThread: zmq recv rc value is : %d", rc);
if (rc < 0)
{
int zmq_err = zmq_errno();
Expand Down Expand Up @@ -167,4 +181,76 @@ void ZmqServer::mqPollThread()
SWSS_LOG_NOTICE("mqPollThread end");
}

void ZmqServer::sendMsg(const std::string& dbName, const std::string& tableName,
const std::vector<swss::KeyOpFieldsValuesTuple>& values)
{
int serializedlen = (int)BinarySerializer::serializeBuffer(
m_buffer.data(),
m_buffer.size(),
dbName,
tableName,
values);
SWSS_LOG_DEBUG("sending: %d", serializedlen);
int zmq_err = 0;
int retry_delay = 10;
int rc = 0;
for (int i = 0; i <= MQ_MAX_RETRY; ++i)
{
rc = zmq_send(m_socket, m_buffer.data(), serializedlen, ZMQ_NOBLOCK);
SWSS_LOG_DEBUG("ser: rc value is : %d", rc);
if (rc >= 0)
{
m_allowZmqPoll = true;
SWSS_LOG_DEBUG("zmq sent %d bytes", serializedlen);
return;
}
zmq_err = zmq_errno();
// sleep (2 ^ retry time) * 10 ms
retry_delay *= 2;
SWSS_LOG_DEBUG("zmq_err is : %d", zmq_err);

if (zmq_err == EINTR
|| zmq_err == EFSM)
{
// EINTR: interrupted by signal
// EFSM: socket state not ready
// For example when ZMQ socket still not receive reply message from last sended package.
// There was state machine inside ZMQ socket, when the socket is not in ready to send state, this
// error will happen.
// for more detail, please check: http://api.zeromq.org/2-1:zmq-send
SWSS_LOG_DEBUG("zmq send retry, endpoint: %s, error: %d", m_endpoint.c_str(), zmq_err);

retry_delay = 0;
}
else if (zmq_err == EAGAIN)
{
// EAGAIN: ZMQ is full to need try again
SWSS_LOG_WARN("zmq is full, will retry in %d ms, endpoint: %s, error: %d", retry_delay, m_endpoint.c_str(), zmq_err);
}
else if (zmq_err == ETERM)
{
auto message = "zmq connection break, endpoint: " + m_endpoint + ", error: " + to_string(rc);
SWSS_LOG_ERROR("%s", message.c_str());
throw system_error(make_error_code(errc::connection_reset), message);
}
else
{
// for other error, send failed immediately.
auto message = "zmq send failed, endpoint: " + m_endpoint + ", error: " + to_string(rc);
SWSS_LOG_ERROR("%s", message.c_str());
throw system_error(make_error_code(errc::io_error), message);
// SWSS_LOG_THROW("Else case message is: %s", message.c_str());
// return;
}
usleep(retry_delay * 1000);
}

// failed after retry
auto message = "zmq send failed, endpoint: " + m_endpoint + ", zmqerrno: " + to_string(zmq_err) + ":" + zmq_strerror(zmq_err) + ", msg length:" + to_string(serializedlen);
SWSS_LOG_ERROR("%s", message.c_str());
throw system_error(make_error_code(errc::io_error), message);
// SWSS_LOG_THROW("Last Error message is %s", message.c_str());
// return;
}

}
10 changes: 10 additions & 0 deletions common/zmqserver.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,11 @@ class ZmqServer
const std::string tableName,
ZmqMessageHandler* handler);

void sendMsg(const std::string& dbName, const std::string& tableName,
const std::vector<swss::KeyOpFieldsValuesTuple>& values);

private:

void handleReceivedData(const char* buffer, const size_t size);

void mqPollThread();
Expand All @@ -56,6 +60,12 @@ class ZmqServer

std::string m_vrf;

void* m_socket;

bool m_allowZmqPoll;

// std::vector<char> m_sendbuffer;

std::map<std::string, std::map<std::string, ZmqMessageHandler*>> m_HandlerMap;
};

Expand Down
Loading
Loading