Skip to content

Commit

Permalink
Add VRF support to ZMQ server/client (#920)
Browse files Browse the repository at this point in the history
  • Loading branch information
liuh-80 authored Sep 30, 2024
1 parent 24979b0 commit 898aa5d
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 5 deletions.
15 changes: 13 additions & 2 deletions common/zmqclient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,13 @@ using namespace std;
namespace swss {

ZmqClient::ZmqClient(const std::string& endpoint)
:ZmqClient(endpoint, "")
{
initialize(endpoint);
}

ZmqClient::ZmqClient(const std::string& endpoint, const std::string& vrf)
{
initialize(endpoint, vrf);
}

ZmqClient::~ZmqClient()
Expand All @@ -39,12 +44,13 @@ ZmqClient::~ZmqClient()
}
}

void ZmqClient::initialize(const std::string& endpoint)
void ZmqClient::initialize(const std::string& endpoint, const std::string& vrf)
{
m_connected = false;
m_endpoint = endpoint;
m_context = nullptr;
m_socket = nullptr;
m_vrf = vrf;

connect();
}
Expand Down Expand Up @@ -89,6 +95,11 @@ void ZmqClient::connect()
int high_watermark = MQ_WATERMARK;
zmq_setsockopt(m_socket, ZMQ_SNDHWM, &high_watermark, sizeof(high_watermark));

if (!m_vrf.empty())
{
zmq_setsockopt(m_socket, ZMQ_BINDTODEVICE, m_vrf.c_str(), m_vrf.length());
}

SWSS_LOG_NOTICE("connect to zmq endpoint: %s", m_endpoint.c_str());
int rc = zmq_connect(m_socket, m_endpoint.c_str());
if (rc != 0)
Expand Down
6 changes: 4 additions & 2 deletions common/zmqclient.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class ZmqClient
{
public:
ZmqClient(const std::string& endpoint);
ZmqClient(const std::string& endpoint, const std::string& vrf);
~ZmqClient();

bool isConnected();
Expand All @@ -24,11 +25,12 @@ class ZmqClient
const std::vector<KeyOpFieldsValuesTuple>& kcos,
std::vector<char>& sendbuffer);
private:
void initialize(const std::string& endpoint);

void initialize(const std::string& endpoint, const std::string& vrf);

std::string m_endpoint;

std::string m_vrf;

void* m_context;

void* m_socket;
Expand Down
13 changes: 12 additions & 1 deletion common/zmqserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,13 @@ using namespace std;
namespace swss {

ZmqServer::ZmqServer(const std::string& endpoint)
: m_endpoint(endpoint)
: ZmqServer(endpoint, "")
{
}

ZmqServer::ZmqServer(const std::string& endpoint, const std::string& vrf)
: m_endpoint(endpoint),
m_vrf(vrf)
{
m_buffer.resize(MQ_RESPONSE_MAX_COUNT);
m_runThread = true;
Expand Down Expand Up @@ -92,6 +98,11 @@ void ZmqServer::mqPollThread()
int high_watermark = MQ_WATERMARK;
zmq_setsockopt(socket, ZMQ_RCVHWM, &high_watermark, sizeof(high_watermark));

if (!m_vrf.empty())
{
zmq_setsockopt(socket, ZMQ_BINDTODEVICE, m_vrf.c_str(), m_vrf.length());
}

int rc = zmq_bind(socket, m_endpoint.c_str());
if (rc != 0)
{
Expand Down
3 changes: 3 additions & 0 deletions common/zmqserver.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class ZmqServer
static constexpr int DEFAULT_POP_BATCH_SIZE = 128;

ZmqServer(const std::string& endpoint);
ZmqServer(const std::string& endpoint, const std::string& vrf);
~ZmqServer();

void registerMessageHandler(
Expand All @@ -53,6 +54,8 @@ class ZmqServer

std::string m_endpoint;

std::string m_vrf;

std::map<std::string, std::map<std::string, ZmqMessageHandler*>> m_HandlerMap;
};

Expand Down

0 comments on commit 898aa5d

Please sign in to comment.