Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…wss-common into zmq
  • Loading branch information
divyagayathri-hcl committed Dec 5, 2024
2 parents c19af49 + aa1021f commit bcbb54e
Show file tree
Hide file tree
Showing 7 changed files with 257 additions and 31 deletions.
2 changes: 1 addition & 1 deletion common/c-api/zmqclient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ using namespace swss;
using namespace std;

SWSSZmqClient SWSSZmqClient_new(const char *endpoint) {
SWSSTry(return (SWSSZmqClient) new ZmqClient(endpoint));
SWSSTry(return (SWSSZmqClient) new ZmqClient(endpoint, ""));
}

void SWSSZmqClient_free(SWSSZmqClient zmqc) {
Expand Down
2 changes: 1 addition & 1 deletion common/redispipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ class RedisPipeline {
return;

m_channels.insert(channel);
m_luaPub += "redis.call('PUBLISH', '" + channel + "', 'G');";
m_luaPub += "redis.call('PUBLISH', '" + channel + "', 'G')\n";
m_shaPub = loadRedisScript(m_luaPub);
}

Expand Down
52 changes: 48 additions & 4 deletions common/zmqclient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@ ZmqClient::ZmqClient(const std::string& endpoint, const std::string& vrf)
initialize(endpoint, vrf);
}

ZmqClient::ZmqClient(const std::string& endpoint, uint32_t waitTimeMs) :
m_waitTimeMs(waitTimeMs)
{
// m_waitTimeMs = waitTimeMs;
initialize(endpoint);
}

ZmqClient::~ZmqClient()
{
std::lock_guard<std::mutex> lock(m_socketMutex);
Expand Down Expand Up @@ -55,6 +62,17 @@ void ZmqClient::initialize(const std::string& endpoint, const std::string& vrf)

connect();
}

// Endpoint-only initialization: put the client into a clean, disconnected
// state for `endpoint`, size the send buffer, then call connect().
// Unlike the (endpoint, vrf) overload, this one takes no VRF argument and
// does not assign any VRF member.
void ZmqClient::initialize(const std::string& endpoint)
{
    // Clear the ZMQ handles and the connected flag before connect() runs.
    m_context = nullptr;
    m_socket = nullptr;
    m_connected = false;

    m_endpoint = endpoint;

    // The buffer is sized to MQ_RESPONSE_MAX_COUNT bytes; wait() also uses it
    // as the destination of zmq_recv().
    m_sendbuffer.resize(MQ_RESPONSE_MAX_COUNT);

    connect();
}

bool ZmqClient::isConnected()
{
Expand Down Expand Up @@ -137,7 +155,7 @@ void ZmqClient::sendMsg(
int zmq_err = 0;
int retry_delay = 10;
int rc = 0;
for (int i = 0; i <= MQ_MAX_RETRY; ++i)
for (int i = 0; i <= MQ_MAX_RETRY; ++i)
{
{
// ZMQ socket is not thread safe: http://api.zeromq.org/2-1:zmq
Expand Down Expand Up @@ -202,8 +220,32 @@ bool ZmqClient::wait(std::string& dbName,
std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>>& kcos)
{
SWSS_LOG_ENTER();

zmq_pollitem_t items [1] = { };
items[0].socket = m_socket;
items[0].events = ZMQ_POLLIN;

int rc;
for (int i = 0; true ; ++i)
for (int i = 0; true; ++i)
{
rc = zmq_poll(items, 1, (int)m_waitTimeMs);
if (rc == 0)
{
SWSS_LOG_ERROR("zmq_poll timed out");
return false;
}
if (rc > 0)
{
break;
}
if (zmq_errno() == EINTR && i <= MQ_MAX_RETRY)
{
continue;
}
SWSS_LOG_ERROR("zmq_poll failed, zmqerrno: %d", zmq_errno());
}

for (int i = 0; true; ++i)
{
rc = zmq_recv(m_socket, m_sendbuffer.data(), m_sendbuffer.size(), 0);
if (rc < 0)
Expand All @@ -212,13 +254,15 @@ bool ZmqClient::wait(std::string& dbName,
{
continue;
}
SWSS_LOG_THROW("zmq_recv failed, zmqerrno: %d", zmq_errno());
SWSS_LOG_ERROR("zmq_recv failed, zmqerrno: %d", zmq_errno());
return false;
}
if (rc >= (int)m_sendbuffer.size())
{
SWSS_LOG_THROW(
SWSS_LOG_ERROR(
"zmq_recv message was truncated (over %d bytes, received %d), increase buffer size, message DROPPED",
(int)m_sendbuffer.size(), rc);
return false;
}
break;
}
Expand Down
4 changes: 4 additions & 0 deletions common/zmqclient.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class ZmqClient

ZmqClient(const std::string& endpoint);
ZmqClient(const std::string& endpoint, const std::string& vrf);
ZmqClient(const std::string& endpoint, uint32_t waitTimeMs);
~ZmqClient();

bool isConnected();
Expand All @@ -31,6 +32,7 @@ class ZmqClient

private:
void initialize(const std::string& endpoint, const std::string& vrf);
void initialize(const std::string& endpoint);

std::string m_endpoint;

Expand All @@ -42,6 +44,8 @@ class ZmqClient

bool m_connected;

uint32_t m_waitTimeMs;

std::mutex m_socketMutex;

std::vector<char> m_sendbuffer;
Expand Down
112 changes: 93 additions & 19 deletions common/zmqserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ ZmqServer::ZmqServer(const std::string& endpoint, const std::string& vrf)
m_vrf(vrf),
m_allowZmqPoll(true)
{
connect();
m_buffer.resize(MQ_RESPONSE_MAX_COUNT);
m_runThread = true;
m_mqPollThread = std::make_shared<std::thread>(&ZmqServer::mqPollThread, this);
Expand All @@ -33,6 +34,9 @@ ZmqServer::~ZmqServer()
m_allowZmqPoll = true;
m_runThread = false;
m_mqPollThread->join();

zmq_close(m_socket);
zmq_ctx_destroy(m_context);
}

void ZmqServer::registerMessageHandler(
Expand Down Expand Up @@ -87,37 +91,40 @@ void ZmqServer::handleReceivedData(const char* buffer, const size_t size)
handler->handleReceivedData(kcos);
}

void ZmqServer::mqPollThread()
void ZmqServer::connect()
{
SWSS_LOG_ENTER();
SWSS_LOG_NOTICE("mqPollThread begin");
m_context = zmq_ctx_new();

// Producer/Consumer state table are n:1 mapping, so need use PUSH/PULL pattern http://api.zeromq.org/master:zmq-socket
void* context = zmq_ctx_new();;
void* socket = zmq_socket(context, ZMQ_PULL);
m_socket = zmq_socket(m_context, ZMQ_PULL);

// Increase recv buffer for use all bandwidth: http://api.zeromq.org/4-2:zmq-setsockopt
int high_watermark = MQ_WATERMARK;
zmq_setsockopt(socket, ZMQ_RCVHWM, &high_watermark, sizeof(high_watermark));
zmq_setsockopt(m_socket, ZMQ_RCVHWM, &high_watermark, sizeof(high_watermark));

if (!m_vrf.empty())
{
zmq_setsockopt(socket, ZMQ_BINDTODEVICE, m_vrf.c_str(), m_vrf.length());
}
{
zmq_setsockopt(m_socket, ZMQ_BINDTODEVICE, m_vrf.c_str(), m_vrf.length());
}

int rc = zmq_bind(socket, m_endpoint.c_str());
int rc = zmq_bind(m_socket, m_endpoint.c_str());
if (rc != 0)
{
SWSS_LOG_THROW("zmq_bind failed on endpoint: %s, zmqerrno: %d, message: %s",
SWSS_LOG_THROW("zmq_bind failed on endpoint: %s, zmqerrno: %d",
m_endpoint.c_str(),
zmq_errno(),
strerror(zmq_errno()));
zmq_errno());
}
}

void ZmqServer::mqPollThread()
{
SWSS_LOG_ENTER();
SWSS_LOG_NOTICE("mqPollThread begin");

// zmq_poll will use less CPU
zmq_pollitem_t poll_item;
poll_item.fd = 0;
poll_item.socket = socket;
poll_item.socket = m_socket;
poll_item.events = ZMQ_POLLIN;
poll_item.revents = 0;

Expand All @@ -127,7 +134,7 @@ void ZmqServer::mqPollThread()
m_allowZmqPoll = false;

// receive message
rc = zmq_poll(&poll_item, 1, 1000);
auto rc = zmq_poll(&poll_item, 1, 1000);
if (rc == 0 || !(poll_item.revents & ZMQ_POLLIN))
{
// timeout or other event
Expand All @@ -136,7 +143,7 @@ void ZmqServer::mqPollThread()
}

// receive message
rc = zmq_recv(socket, m_buffer.data(), MQ_RESPONSE_MAX_COUNT, ZMQ_DONTWAIT);
rc = zmq_recv(m_socket, m_buffer.data(), MQ_RESPONSE_MAX_COUNT, ZMQ_DONTWAIT);
if (rc < 0)
{
int zmq_err = zmq_errno();
Expand Down Expand Up @@ -165,10 +172,77 @@ void ZmqServer::mqPollThread()
handleReceivedData(m_buffer.data(), rc);
}

zmq_close(socket);
zmq_ctx_destroy(context);

while (!m_allowZmqPoll)
{
usleep(10);
}
SWSS_LOG_NOTICE("mqPollThread end");
}

void ZmqServer::sendMsg(const std::string& dbName, const std::string& tableName,
const std::vector<swss::KeyOpFieldsValuesTuple>& values)
{
int serializedlen = (int)BinarySerializer::serializeBuffer(
m_buffer.data(),
m_buffer.size(),
dbName,
tableName,
values);
SWSS_LOG_DEBUG("sending: %d", serializedlen);
int zmq_err = 0;
int retry_delay = 10;
int rc = 0;
for (int i = 0; i <= MQ_MAX_RETRY; ++i)
{
rc = zmq_send(m_socket, m_buffer.data(), serializedlen, 0);

if (rc >= 0)
{
m_allowZmqPoll = true;
SWSS_LOG_DEBUG("zmq sent %d bytes", serializedlen);
return;
}
zmq_err = zmq_errno();
// sleep (2 ^ retry time) * 10 ms
retry_delay *= 2;
if (zmq_err == EINTR
|| zmq_err== EFSM)
{
// EINTR: interrupted by signal
// EFSM: socket state not ready
// For example when ZMQ socket still not receive reply message from last sended package.
// There was state machine inside ZMQ socket, when the socket is not in ready to send state, this
// error will happen.
// for more detail, please check: http://api.zeromq.org/2-1:zmq-send
SWSS_LOG_DEBUG("zmq send retry, endpoint: %s, error: %d", m_endpoint.c_str(), zmq_err);

retry_delay = 0;
}
else if (zmq_err == EAGAIN)
{
// EAGAIN: ZMQ is full to need try again
SWSS_LOG_WARN("zmq is full, will retry in %d ms, endpoint: %s, error: %d", retry_delay, m_endpoint.c_str(), zmq_err);
}
else if (zmq_err == ETERM)
{
auto message = "zmq connection break, endpoint: " + m_endpoint + ", error: " + to_string(rc);
SWSS_LOG_ERROR("%s", message.c_str());
throw system_error(make_error_code(errc::connection_reset), message);
}
else
{
// for other error, send failed immediately.
auto message = "zmq send failed, endpoint: " + m_endpoint + ", error: " + to_string(rc);
SWSS_LOG_ERROR("%s", message.c_str());
throw system_error(make_error_code(errc::io_error), message);
}
usleep(retry_delay * 1000);
}

// failed after retry
auto message = "zmq send failed, endpoint: " + m_endpoint + ", zmqerrno: " + to_string(zmq_err) + ":" + zmq_strerror(zmq_err) + ", msg length:" + to_string(serializedlen);
SWSS_LOG_ERROR("%s", message.c_str());
throw system_error(make_error_code(errc::io_error), message);
}

}
11 changes: 11 additions & 0 deletions common/zmqserver.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,12 @@ class ZmqServer
const std::string tableName,
ZmqMessageHandler* handler);

void sendMsg(const std::string& dbName, const std::string& tableName,
const std::vector<swss::KeyOpFieldsValuesTuple>& values);

private:
void connect();

void handleReceivedData(const char* buffer, const size_t size);

void mqPollThread();
Expand All @@ -56,8 +61,14 @@ class ZmqServer

std::string m_vrf;

void* m_context;

void* m_socket;

bool m_allowZmqPoll;

std::vector<char> m_sendbuffer;

std::map<std::string, std::map<std::string, ZmqMessageHandler*>> m_HandlerMap;
};

Expand Down
Loading

0 comments on commit bcbb54e

Please sign in to comment.