Skip to content

Commit

Permalink
Merge branch 'master' into zmq
Browse files Browse the repository at this point in the history
  • Loading branch information
divyagayathri-hcl authored Nov 6, 2024
2 parents 5caad2d + e812954 commit 4710209
Show file tree
Hide file tree
Showing 31 changed files with 1,525 additions and 22 deletions.
12 changes: 11 additions & 1 deletion common/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,17 @@ common_libswsscommon_la_SOURCES = \
common/zmqclient.cpp \
common/zmqserver.cpp \
common/asyncdbupdater.cpp \
common/redis_table_waiter.cpp
common/redis_table_waiter.cpp \
common/c-api/util.cpp \
common/c-api/dbconnector.cpp \
common/c-api/consumerstatetable.cpp \
common/c-api/producerstatetable.cpp \
common/c-api/subscriberstatetable.cpp \
common/c-api/zmqclient.cpp \
common/c-api/zmqserver.cpp \
common/c-api/zmqconsumerstatetable.cpp \
common/c-api/zmqproducerstatetable.cpp \
common/performancetimer.cpp

common_libswsscommon_la_CXXFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(LIBNL_CFLAGS) $(CODE_COVERAGE_CXXFLAGS)
common_libswsscommon_la_CPPFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(LIBNL_CPPFLAGS) $(CODE_COVERAGE_CPPFLAGS)
Expand Down
26 changes: 24 additions & 2 deletions common/binaryserializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
#define __BINARY_SERIALIZER__

#include "common/armhelper.h"
#include "common/rediscommand.h"
#include "common/table.h"

#include <string>

Expand All @@ -11,6 +13,26 @@ namespace swss {

class BinarySerializer {
public:
static size_t serializedSize(const string &dbName, const string &tableName,
const vector<KeyOpFieldsValuesTuple> &kcos) {
size_t n = 0;
n += dbName.size() + sizeof(size_t);
n += tableName.size() + sizeof(size_t);

for (const KeyOpFieldsValuesTuple &kco : kcos) {
const vector<FieldValueTuple> &fvs = kfvFieldsValues(kco);
n += kfvKey(kco).size() + sizeof(size_t);
n += to_string(fvs.size()).size() + sizeof(size_t);

for (const FieldValueTuple &fv : fvs) {
n += fvField(fv).size() + sizeof(size_t);
n += fvValue(fv).size() + sizeof(size_t);
}
}

return n + sizeof(size_t);
}

static size_t serializeBuffer(
const char* buffer,
const size_t size,
Expand Down Expand Up @@ -192,8 +214,8 @@ class BinarySerializer {
{
if ((size_t)(m_current_position - m_buffer + datalen + sizeof(size_t)) > m_buffer_size)
{
SWSS_LOG_THROW("There are not enough buffer for binary serializer to serialize,\
key count: %zu, data length %zu, buffer size: %zu",
SWSS_LOG_THROW("There are not enough buffer for binary serializer to serialize,\n"
" key count: %zu, data length %zu, buffer size: %zu",
m_kvp_count,
datalen,
m_buffer_size);
Expand Down
34 changes: 34 additions & 0 deletions common/c-api/consumerstatetable.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#include <cstdlib>
#include <cstring>
#include <deque>

#include "../consumerstatetable.h"
#include "../dbconnector.h"
#include "../table.h"
#include "consumerstatetable.h"
#include "util.h"

using namespace swss;
using namespace std;

SWSSConsumerStateTable SWSSConsumerStateTable_new(SWSSDBConnector db, const char *tableName,
const int32_t *p_popBatchSize,
const int32_t *p_pri) {
int popBatchSize = p_popBatchSize ? numeric_cast<int>(*p_popBatchSize)
: TableConsumable::DEFAULT_POP_BATCH_SIZE;
int pri = p_pri ? numeric_cast<int>(*p_pri) : 0;
SWSSTry(return (SWSSConsumerStateTable) new ConsumerStateTable(
(DBConnector *)db, string(tableName), popBatchSize, pri));
}

void SWSSConsumerStateTable_free(SWSSConsumerStateTable tbl) {
SWSSTry(delete (ConsumerStateTable *)tbl);
}

SWSSKeyOpFieldValuesArray SWSSConsumerStateTable_pops(SWSSConsumerStateTable tbl) {
SWSSTry({
deque<KeyOpFieldsValuesTuple> vkco;
((ConsumerStateTable *)tbl)->pops(vkco);
return makeKeyOpFieldValuesArray(vkco);
});
}
28 changes: 28 additions & 0 deletions common/c-api/consumerstatetable.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#ifndef SWSS_COMMON_C_API_CONSUMERSTATETABLE_H
#define SWSS_COMMON_C_API_CONSUMERSTATETABLE_H

#include "dbconnector.h"
#include "util.h"

#ifdef __cplusplus
extern "C" {
#endif

#include <stdint.h>

typedef struct SWSSConsumerStateTableOpaque *SWSSConsumerStateTable;

// Pass NULL for popBatchSize and/or pri to use the default values
SWSSConsumerStateTable SWSSConsumerStateTable_new(SWSSDBConnector db, const char *tableName,
const int32_t *popBatchSize, const int32_t *pri);

void SWSSConsumerStateTable_free(SWSSConsumerStateTable tbl);

// Result array and all of its members must be freed using free()
SWSSKeyOpFieldValuesArray SWSSConsumerStateTable_pops(SWSSConsumerStateTable tbl);

#ifdef __cplusplus
}
#endif

#endif
84 changes: 84 additions & 0 deletions common/c-api/dbconnector.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
#include <cstring>
#include <string>

#include "../dbconnector.h"
#include "dbconnector.h"
#include "util.h"

using namespace swss;
using namespace std;

void SWSSSonicDBConfig_initialize(const char *path) {
SWSSTry(SonicDBConfig::initialize(path));
}

void SWSSSonicDBConfig_initializeGlobalConfig(const char *path) {
SWSSTry(SonicDBConfig::initializeGlobalConfig(path));
}

SWSSDBConnector SWSSDBConnector_new_tcp(int32_t dbId, const char *hostname, uint16_t port,
uint32_t timeout) {
SWSSTry(return (SWSSDBConnector) new DBConnector(dbId, string(hostname), port, timeout));
}

SWSSDBConnector SWSSDBConnector_new_unix(int32_t dbId, const char *sock_path, uint32_t timeout) {
SWSSTry(return (SWSSDBConnector) new DBConnector(dbId, string(sock_path), timeout));
}

SWSSDBConnector SWSSDBConnector_new_named(const char *dbName, uint32_t timeout_ms, uint8_t isTcpConn) {
SWSSTry(return (SWSSDBConnector) new DBConnector(string(dbName), timeout_ms, isTcpConn));
}

void SWSSDBConnector_free(SWSSDBConnector db) {
delete (DBConnector *)db;
}

int8_t SWSSDBConnector_del(SWSSDBConnector db, const char *key) {
SWSSTry(return ((DBConnector *)db)->del(string(key)) ? 1 : 0);
}

void SWSSDBConnector_set(SWSSDBConnector db, const char *key, const char *value) {
SWSSTry(((DBConnector *)db)->set(string(key), string(value)));
}

char *SWSSDBConnector_get(SWSSDBConnector db, const char *key) {
SWSSTry({
shared_ptr<string> s = ((DBConnector *)db)->get(string(key));
return s ? strdup(s->c_str()) : nullptr;
});
}

int8_t SWSSDBConnector_exists(SWSSDBConnector db, const char *key) {
SWSSTry(return ((DBConnector *)db)->exists(string(key)) ? 1 : 0);
}

int8_t SWSSDBConnector_hdel(SWSSDBConnector db, const char *key, const char *field) {
SWSSTry(return ((DBConnector *)db)->hdel(string(key), string(field)) ? 1 : 0);
}

void SWSSDBConnector_hset(SWSSDBConnector db, const char *key, const char *field,
const char *value) {
SWSSTry(((DBConnector *)db)->hset(string(key), string(field), string(value)));
}

char *SWSSDBConnector_hget(SWSSDBConnector db, const char *key, const char *field) {
SWSSTry({
shared_ptr<string> s = ((DBConnector *)db)->hget(string(key), string(field));
return s ? strdup(s->c_str()) : nullptr;
});
}

SWSSFieldValueArray SWSSDBConnector_hgetall(SWSSDBConnector db, const char *key) {
SWSSTry({
auto map = ((DBConnector *)db)->hgetall(key);
return makeFieldValueArray(map);
});
}

int8_t SWSSDBConnector_hexists(SWSSDBConnector db, const char *key, const char *field) {
SWSSTry(return ((DBConnector *)db)->hexists(string(key), string(field)) ? 1 : 0);
}

int8_t SWSSDBConnector_flushdb(SWSSDBConnector db) {
SWSSTry(return ((DBConnector *)db)->flushdb() ? 1 : 0);
}
101 changes: 101 additions & 0 deletions common/c-api/dbconnector.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
#ifndef SWSS_COMMON_C_API_DBCONNECTOR_H
#define SWSS_COMMON_C_API_DBCONNECTOR_H

#include "util.h"
#ifdef __cplusplus
extern "C" {
#endif

#include <stdint.h>

void SWSSSonicDBConfig_initialize(const char *path);

void SWSSSonicDBConfig_initializeGlobalConfig(const char *path);

typedef struct SWSSDBConnectorOpaque *SWSSDBConnector;

// Pass 0 to timeout for infinity
SWSSDBConnector SWSSDBConnector_new_tcp(int32_t dbId, const char *hostname, uint16_t port,
uint32_t timeout_ms);

// Pass 0 to timeout for infinity
SWSSDBConnector SWSSDBConnector_new_unix(int32_t dbId, const char *sock_path, uint32_t timeout_ms);

// Pass 0 to timeout for infinity
SWSSDBConnector SWSSDBConnector_new_named(const char *dbName, uint32_t timeout_ms, uint8_t isTcpConn);

void SWSSDBConnector_free(SWSSDBConnector db);

// Returns 0 when key doesn't exist, 1 when key was deleted
int8_t SWSSDBConnector_del(SWSSDBConnector db, const char *key);

void SWSSDBConnector_set(SWSSDBConnector db, const char *key, const char *value);

// Returns NULL if key doesn't exist.
// Result must be freed using free()
char *SWSSDBConnector_get(SWSSDBConnector db, const char *key);

// Returns 0 for false, 1 for true
int8_t SWSSDBConnector_exists(SWSSDBConnector db, const char *key);

// Returns 0 when key or field doesn't exist, 1 when field was deleted
int8_t SWSSDBConnector_hdel(SWSSDBConnector db, const char *key, const char *field);

void SWSSDBConnector_hset(SWSSDBConnector db, const char *key, const char *field,
const char *value);

// Returns NULL if key or field doesn't exist.
// Result must be freed using free()
char *SWSSDBConnector_hget(SWSSDBConnector db, const char *key, const char *field);

// Returns an empty map when the key doesn't exist.
// Result array and all of its elements must be freed using free()
SWSSFieldValueArray SWSSDBConnector_hgetall(SWSSDBConnector db, const char *key);

// Returns 0 when key or field doesn't exist, 1 when field exists
int8_t SWSSDBConnector_hexists(SWSSDBConnector db, const char *key, const char *field);

// std::vector<std::string> keys(const std::string &key);

// std::pair<int, std::vector<std::string>> scan(int cursor = 0, const char
// *match = "", uint32_t count = 10);

// template<typename InputIterator>
// void hmset(const std::string &key, InputIterator start, InputIterator stop);

// void hmset(const std::unordered_map<std::string,
// std::vector<std::pair<std::string, std::string>>>& multiHash);

// std::shared_ptr<std::string> get(const std::string &key);

// std::shared_ptr<std::string> hget(const std::string &key, const std::string
// &field);

// int64_t incr(const std::string &key);

// int64_t decr(const std::string &key);

// int64_t rpush(const std::string &list, const std::string &item);

// std::shared_ptr<std::string> blpop(const std::string &list, int timeout);

// void subscribe(const std::string &pattern);

// void psubscribe(const std::string &pattern);

// void punsubscribe(const std::string &pattern);

// int64_t publish(const std::string &channel, const std::string &message);

// void config_set(const std::string &key, const std::string &value);

// Returns 1 on success, 0 on failure
int8_t SWSSDBConnector_flushdb(SWSSDBConnector db);

// std::map<std::string, std::map<std::string, std::map<std::string,
// std::string>>> getall();
#ifdef __cplusplus
}
#endif

#endif
53 changes: 53 additions & 0 deletions common/c-api/producerstatetable.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
#include <cstring>
#include <string>

#include "../dbconnector.h"
#include "../producerstatetable.h"
#include "dbconnector.h"
#include "producerstatetable.h"
#include "util.h"

using namespace swss;
using namespace std;

SWSSProducerStateTable SWSSProducerStateTable_new(SWSSDBConnector db, const char *tableName) {
SWSSTry(return (SWSSProducerStateTable) new ProducerStateTable((DBConnector *)db,
string(tableName)));
}

void SWSSProducerStateTable_free(SWSSProducerStateTable tbl) {
SWSSTry(delete ((ProducerStateTable *)tbl));
}

void SWSSProducerStateTable_setBuffered(SWSSProducerStateTable tbl, uint8_t buffered) {
SWSSTry(((ProducerStateTable *)tbl)->setBuffered((bool)buffered))
}

void SWSSProducerStateTable_set(SWSSProducerStateTable tbl, const char *key,
SWSSFieldValueArray values) {
SWSSTry(((ProducerStateTable *)tbl)->set(string(key), takeFieldValueArray(values)));
}

void SWSSProducerStateTable_del(SWSSProducerStateTable tbl, const char *key) {
SWSSTry(((ProducerStateTable *)tbl)->del(string(key)));
}

void SWSSProducerStateTable_flush(SWSSProducerStateTable tbl) {
SWSSTry(((ProducerStateTable *)tbl)->flush());
}

int64_t SWSSProducerStateTable_count(SWSSProducerStateTable tbl) {
SWSSTry(return ((ProducerStateTable *)tbl)->count());
}

void SWSSProducerStateTable_clear(SWSSProducerStateTable tbl) {
SWSSTry(((ProducerStateTable *)tbl)->clear());
}

void SWSSProducerStateTable_create_temp_view(SWSSProducerStateTable tbl) {
SWSSTry(((ProducerStateTable *)tbl)->create_temp_view());
}

void SWSSProducerStateTable_apply_temp_view(SWSSProducerStateTable tbl) {
SWSSTry(((ProducerStateTable *)tbl)->apply_temp_view());
}
Loading

0 comments on commit 4710209

Please sign in to comment.