Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…wss-common into zmq_new
  • Loading branch information
divyagayathri-hcl committed Nov 20, 2024
2 parents 45cf9e2 + fe30ccd commit 82a41c6
Show file tree
Hide file tree
Showing 32 changed files with 658 additions and 254 deletions.
1 change: 1 addition & 0 deletions common/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ common_libswsscommon_la_SOURCES = \
common/zmqserver.cpp \
common/asyncdbupdater.cpp \
common/redis_table_waiter.cpp \
common/interface.h \
common/c-api/util.cpp \
common/c-api/dbconnector.cpp \
common/c-api/consumerstatetable.cpp \
Expand Down
17 changes: 16 additions & 1 deletion common/asyncdbupdater.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ AsyncDBUpdater::~AsyncDBUpdater()
// notify db update thread exit
m_dbUpdateDataNotifyCv.notify_all();
m_dbUpdateThread->join();
SWSS_LOG_DEBUG("AsyncDBUpdater dtor tableName: %s", m_tableName.c_str());
}

void AsyncDBUpdater::update(std::shared_ptr<KeyOpFieldsValuesTuple> pkco)
Expand Down Expand Up @@ -61,16 +62,30 @@ void AsyncDBUpdater::dbUpdateThread()
std::mutex cvMutex;
std::unique_lock<std::mutex> cvLock(cvMutex);

while (m_runThread)
while (true)
{
size_t count;
count = queueSize();
if (count == 0)
{
// Check if there still data in queue before exit
if (!m_runThread)
{
SWSS_LOG_NOTICE("dbUpdateThread for table: %s is exiting", m_tableName.c_str());
break;
}

// when queue is empty, wait notification, when data come, continue to check queue size again
m_dbUpdateDataNotifyCv.wait(cvLock);
continue;
}
else
{
if (!m_runThread)
{
SWSS_LOG_DEBUG("dbUpdateThread for table: %s still has %d records that need to be sent before exiting", m_tableName.c_str(), (int)count);
}
}

for (size_t ie = 0; ie < count; ie++)
{
Expand Down
11 changes: 11 additions & 0 deletions common/c-api/consumerstatetable.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include <boost/numeric/conversion/cast.hpp>
#include <cstdlib>
#include <cstring>
#include <deque>
Expand All @@ -10,6 +11,7 @@

using namespace swss;
using namespace std;
using boost::numeric_cast;

SWSSConsumerStateTable SWSSConsumerStateTable_new(SWSSDBConnector db, const char *tableName,
const int32_t *p_popBatchSize,
Expand All @@ -32,3 +34,12 @@ SWSSKeyOpFieldValuesArray SWSSConsumerStateTable_pops(SWSSConsumerStateTable tbl
return makeKeyOpFieldValuesArray(vkco);
});
}

uint32_t SWSSConsumerStateTable_getFd(SWSSConsumerStateTable tbl) {
SWSSTry(return numeric_cast<uint32_t>(((ConsumerStateTable *)tbl)->getFd()));
}

SWSSSelectResult SWSSConsumerStateTable_readData(SWSSConsumerStateTable tbl, uint32_t timeout_ms,
uint8_t interrupt_on_signal) {
SWSSTry(return selectOne((ConsumerStateTable *)tbl, timeout_ms, interrupt_on_signal));
}
11 changes: 11 additions & 0 deletions common/c-api/consumerstatetable.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,17 @@ void SWSSConsumerStateTable_free(SWSSConsumerStateTable tbl);
// Result array and all of its members must be freed using free()
SWSSKeyOpFieldValuesArray SWSSConsumerStateTable_pops(SWSSConsumerStateTable tbl);

// Return the underlying fd for polling/selecting on.
// Callers must NOT read/write on fd, it may only be used for epoll or similar.
// After the fd becomes readable, SWSSConsumerStateTable_readData must be used to
// reset the fd and read data into internal data structures.
uint32_t SWSSConsumerStateTable_getFd(SWSSConsumerStateTable tbl);

// Block until data is available to read or until a timeout elapses.
// A timeout of 0 means the call will return immediately.
SWSSSelectResult SWSSConsumerStateTable_readData(SWSSConsumerStateTable tbl, uint32_t timeout_ms,
uint8_t interrupt_on_signal);

#ifdef __cplusplus
}
#endif
Expand Down
29 changes: 19 additions & 10 deletions common/c-api/dbconnector.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include <cstring>
#include <string>
#include <utility>

#include "../dbconnector.h"
#include "dbconnector.h"
Expand Down Expand Up @@ -37,14 +38,14 @@ int8_t SWSSDBConnector_del(SWSSDBConnector db, const char *key) {
SWSSTry(return ((DBConnector *)db)->del(string(key)) ? 1 : 0);
}

void SWSSDBConnector_set(SWSSDBConnector db, const char *key, const char *value) {
SWSSTry(((DBConnector *)db)->set(string(key), string(value)));
void SWSSDBConnector_set(SWSSDBConnector db, const char *key, SWSSStrRef value) {
SWSSTry(((DBConnector *)db)->set(string(key), takeStrRef(value)));
}

char *SWSSDBConnector_get(SWSSDBConnector db, const char *key) {
SWSSString SWSSDBConnector_get(SWSSDBConnector db, const char *key) {
SWSSTry({
shared_ptr<string> s = ((DBConnector *)db)->get(string(key));
return s ? strdup(s->c_str()) : nullptr;
return s ? makeString(move(*s)) : nullptr;
});
}

Expand All @@ -57,21 +58,29 @@ int8_t SWSSDBConnector_hdel(SWSSDBConnector db, const char *key, const char *fie
}

void SWSSDBConnector_hset(SWSSDBConnector db, const char *key, const char *field,
const char *value) {
SWSSTry(((DBConnector *)db)->hset(string(key), string(field), string(value)));
SWSSStrRef value) {
SWSSTry(((DBConnector *)db)->hset(string(key), string(field), takeStrRef(value)));
}

char *SWSSDBConnector_hget(SWSSDBConnector db, const char *key, const char *field) {
SWSSString SWSSDBConnector_hget(SWSSDBConnector db, const char *key, const char *field) {
SWSSTry({
shared_ptr<string> s = ((DBConnector *)db)->hget(string(key), string(field));
return s ? strdup(s->c_str()) : nullptr;
return s ? makeString(move(*s)) : nullptr;
});
}

SWSSFieldValueArray SWSSDBConnector_hgetall(SWSSDBConnector db, const char *key) {
SWSSTry({
auto map = ((DBConnector *)db)->hgetall(key);
return makeFieldValueArray(map);
auto map = ((DBConnector *)db)->hgetall(string(key));

// We can't move keys out of the map, we have to copy them, until C++17 map::extract so we
// copy them here into a vector to avoid needing an overload on makeFieldValueArray
vector<pair<string, string>> pairs;
pairs.reserve(map.size());
for (auto &pair : map)
pairs.push_back(make_pair(pair.first, move(pair.second)));

return makeFieldValueArray(std::move(pairs));
});
}

Expand Down
57 changes: 10 additions & 47 deletions common/c-api/dbconnector.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,71 +29,34 @@ void SWSSDBConnector_free(SWSSDBConnector db);
// Returns 0 when key doesn't exist, 1 when key was deleted
int8_t SWSSDBConnector_del(SWSSDBConnector db, const char *key);

void SWSSDBConnector_set(SWSSDBConnector db, const char *key, const char *value);
void SWSSDBConnector_set(SWSSDBConnector db, const char *key, SWSSStrRef value);

// Returns NULL if key doesn't exist.
// Result must be freed using free()
char *SWSSDBConnector_get(SWSSDBConnector db, const char *key);
// Returns NULL if key doesn't exist
// Result must be freed using SWSSString_free()
SWSSString SWSSDBConnector_get(SWSSDBConnector db, const char *key);

// Returns 0 for false, 1 for true
int8_t SWSSDBConnector_exists(SWSSDBConnector db, const char *key);

// Returns 0 when key or field doesn't exist, 1 when field was deleted
int8_t SWSSDBConnector_hdel(SWSSDBConnector db, const char *key, const char *field);

void SWSSDBConnector_hset(SWSSDBConnector db, const char *key, const char *field,
const char *value);
void SWSSDBConnector_hset(SWSSDBConnector db, const char *key, const char *field, SWSSStrRef value);

// Returns NULL if key or field doesn't exist.
// Result must be freed using free()
char *SWSSDBConnector_hget(SWSSDBConnector db, const char *key, const char *field);
// Returns NULL if key or field doesn't exist
// Result must be freed using SWSSString_free()
SWSSString SWSSDBConnector_hget(SWSSDBConnector db, const char *key, const char *field);

// Returns an empty map when the key doesn't exist.
// Result array and all of its elements must be freed using free()
// Returns an empty map when the key doesn't exist
// Result array and all of its elements must be freed using appropriate free functions
SWSSFieldValueArray SWSSDBConnector_hgetall(SWSSDBConnector db, const char *key);

// Returns 0 when key or field doesn't exist, 1 when field exists
int8_t SWSSDBConnector_hexists(SWSSDBConnector db, const char *key, const char *field);

// std::vector<std::string> keys(const std::string &key);

// std::pair<int, std::vector<std::string>> scan(int cursor = 0, const char
// *match = "", uint32_t count = 10);

// template<typename InputIterator>
// void hmset(const std::string &key, InputIterator start, InputIterator stop);

// void hmset(const std::unordered_map<std::string,
// std::vector<std::pair<std::string, std::string>>>& multiHash);

// std::shared_ptr<std::string> get(const std::string &key);

// std::shared_ptr<std::string> hget(const std::string &key, const std::string
// &field);

// int64_t incr(const std::string &key);

// int64_t decr(const std::string &key);

// int64_t rpush(const std::string &list, const std::string &item);

// std::shared_ptr<std::string> blpop(const std::string &list, int timeout);

// void subscribe(const std::string &pattern);

// void psubscribe(const std::string &pattern);

// void punsubscribe(const std::string &pattern);

// int64_t publish(const std::string &channel, const std::string &message);

// void config_set(const std::string &key, const std::string &value);

// Returns 1 on success, 0 on failure
int8_t SWSSDBConnector_flushdb(SWSSDBConnector db);

// std::map<std::string, std::map<std::string, std::map<std::string,
// std::string>>> getall();
#ifdef __cplusplus
}
#endif
Expand Down
2 changes: 1 addition & 1 deletion common/c-api/producerstatetable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ void SWSSProducerStateTable_setBuffered(SWSSProducerStateTable tbl, uint8_t buff

void SWSSProducerStateTable_set(SWSSProducerStateTable tbl, const char *key,
SWSSFieldValueArray values) {
SWSSTry(((ProducerStateTable *)tbl)->set(string(key), takeFieldValueArray(values)));
SWSSTry(((ProducerStateTable *)tbl)->set(string(key), takeFieldValueArray(std::move(values))));
}

void SWSSProducerStateTable_del(SWSSProducerStateTable tbl, const char *key) {
Expand Down
5 changes: 0 additions & 5 deletions common/c-api/producerstatetable.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,6 @@ void SWSSProducerStateTable_set(SWSSProducerStateTable tbl, const char *key, SWS

void SWSSProducerStateTable_del(SWSSProducerStateTable tbl, const char *key);

// Batched version of set() and del().
// virtual void set(const std::vector<KeyOpFieldsValuesTuple>& values);

// virtual void del(const std::vector<std::string>& keys);

void SWSSProducerStateTable_flush(SWSSProducerStateTable tbl);

int64_t SWSSProducerStateTable_count(SWSSProducerStateTable tbl);
Expand Down
19 changes: 7 additions & 12 deletions common/c-api/subscriberstatetable.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include <boost/numeric/conversion/cast.hpp>
#include <cstdlib>
#include <cstring>
#include <deque>
Expand All @@ -11,6 +12,7 @@

using namespace swss;
using namespace std;
using boost::numeric_cast;

SWSSSubscriberStateTable SWSSSubscriberStateTable_new(SWSSDBConnector db, const char *tableName,
const int32_t *p_popBatchSize,
Expand All @@ -34,19 +36,12 @@ SWSSKeyOpFieldValuesArray SWSSSubscriberStateTable_pops(SWSSSubscriberStateTable
});
}

uint8_t SWSSSubscriberStateTable_hasData(SWSSSubscriberStateTable tbl) {
SWSSTry(return ((SubscriberStateTable *)tbl)->hasData() ? 1 : 0);
}

uint8_t SWSSSubscriberStateTable_hasCachedData(SWSSSubscriberStateTable tbl) {
SWSSTry(return ((SubscriberStateTable *)tbl)->hasCachedData() ? 1 : 0);
}

uint8_t SWSSSubscriberStateTable_initializedWithData(SWSSSubscriberStateTable tbl) {
SWSSTry(return ((SubscriberStateTable *)tbl)->initializedWithData() ? 1 : 0);
uint32_t SWSSSubscriberStateTable_getFd(SWSSSubscriberStateTable tbl) {
SWSSTry(return numeric_cast<uint32_t>(((SubscriberStateTable *)tbl)->getFd()));
}

SWSSSelectResult SWSSSubscriberStateTable_readData(SWSSSubscriberStateTable tbl,
uint32_t timeout_ms) {
SWSSTry(return selectOne((SubscriberStateTable *)tbl, timeout_ms));
uint32_t timeout_ms,
uint8_t interrupt_on_signal) {
SWSSTry(return selectOne((SubscriberStateTable *)tbl, timeout_ms, interrupt_on_signal));
}
16 changes: 7 additions & 9 deletions common/c-api/subscriberstatetable.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,17 @@ void SWSSSubscriberStateTable_free(SWSSSubscriberStateTable tbl);
// Result array and all of its members must be freed using free()
SWSSKeyOpFieldValuesArray SWSSSubscriberStateTable_pops(SWSSSubscriberStateTable tbl);

// Returns 0 for false, 1 for true
uint8_t SWSSSubscriberStateTable_hasData(SWSSSubscriberStateTable tbl);

// Returns 0 for false, 1 for true
uint8_t SWSSSubscriberStateTable_hasCachedData(SWSSSubscriberStateTable tbl);

// Returns 0 for false, 1 for true
uint8_t SWSSSubscriberStateTable_initializedWithData(SWSSSubscriberStateTable tbl);
// Return the underlying fd for polling/selecting on.
// Callers must NOT read/write on fd, it may only be used for epoll or similar.
// After the fd becomes readable, SWSSSubscriberStateTable_readData must be used to
// reset the fd and read data into internal data structures.
uint32_t SWSSSubscriberStateTable_getFd(SWSSSubscriberStateTable tbl);

// Block until data is available to read or until a timeout elapses.
// A timeout of 0 means the call will return immediately.
SWSSSelectResult SWSSSubscriberStateTable_readData(SWSSSubscriberStateTable tbl,
uint32_t timeout_ms);
uint32_t timeout_ms,
uint8_t interrupt_on_sugnal);

#ifdef __cplusplus
}
Expand Down
30 changes: 30 additions & 0 deletions common/c-api/util.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,33 @@
#include "util.h"

using namespace swss;

bool swss::cApiTestingDisableAbort = false;

SWSSString SWSSString_new(const char *data, uint64_t length) {
SWSSTry(return makeString(std::string(data, numeric_cast<std::string::size_type>(length))));
}

SWSSString SWSSString_new_c_str(const char *c_str) {
SWSSTry(return makeString(std::string(c_str)));
}

const char *SWSSStrRef_c_str(SWSSStrRef s) {
SWSSTry(return ((std::string *)s)->c_str());
}

uint64_t SWSSStrRef_length(SWSSStrRef s) {
SWSSTry(return ((std::string *)s)->length());
}

void SWSSString_free(SWSSString s) {
SWSSTry(delete (std::string *)s);
}

void SWSSFieldValueArray_free(SWSSFieldValueArray arr) {
SWSSTry(delete[] arr.data);
}

void SWSSKeyOpFieldValuesArray_free(SWSSKeyOpFieldValuesArray kfvs) {
SWSSTry(delete[] kfvs.data);
}
Loading

0 comments on commit 82a41c6

Please sign in to comment.