Skip to content

Commit

Permalink
[common] enable redispipeline to only publish after flush (#895)
Browse files Browse the repository at this point in the history
What I did

optimize redispipeline flush performance by remove unnecessary publish commands
add a new parameterbool flushPub in producerstatetable constructor function
to enable/disable batch publish feature
default value of m_flushPub is false, so no impact on existing codes
optimization is effective only explicitly set this option
remove individual publish command from the producerstatetable APIs' lua scripts
add a publish command when the pipeline flushes [if m_flushPub is true]
Why I did it

save TCP traffic and increase fpmsyncd efficiency
It's a feature included in BGP Loading Optimization HLD #1521
  • Loading branch information
a114j0y authored Nov 18, 2024
1 parent 378e828 commit 901f3b4
Show file tree
Hide file tree
Showing 4 changed files with 162 additions and 35 deletions.
93 changes: 58 additions & 35 deletions common/producerstatetable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,39 +14,71 @@ using namespace std;
namespace swss {

ProducerStateTable::ProducerStateTable(DBConnector *db, const string &tableName)
: ProducerStateTable(new RedisPipeline(db, 1), tableName, false)
: ProducerStateTable(new RedisPipeline(db, 1), tableName, false, false)
{
m_pipeowned = true;
}

ProducerStateTable::ProducerStateTable(RedisPipeline *pipeline, const string &tableName, bool buffered)
: ProducerStateTable(pipeline, tableName, buffered, false) {}

ProducerStateTable::ProducerStateTable(RedisPipeline *pipeline, const string &tableName, bool buffered, bool flushPub)
: TableBase(tableName, SonicDBConfig::getSeparator(pipeline->getDBConnector()))
, TableName_KeySet(tableName)
, m_flushPub(flushPub)
, m_buffered(buffered)
, m_pipeowned(false)
, m_tempViewActive(false)
, m_pipe(pipeline)
{
reloadRedisScript();

string luaClear =
"redis.call('DEL', KEYS[1])\n"
"local keys = redis.call('KEYS', KEYS[2] .. '*')\n"
"for i,k in pairs(keys) do\n"
" redis.call('DEL', k)\n"
"end\n"
"redis.call('DEL', KEYS[3])\n";
m_shaClear = m_pipe->loadRedisScript(luaClear);

string luaApplyView = loadLuaScript("producer_state_table_apply_view.lua");
m_shaApplyView = m_pipe->loadRedisScript(luaApplyView);
}

ProducerStateTable::~ProducerStateTable()
{
if (m_pipeowned)
{
delete m_pipe;
}
}

void ProducerStateTable::reloadRedisScript()
{
// Set m_flushPub to remove publish from a single lua string and let pipeline do publish once per flush

// However, if m_buffered is false, follow the original one publish per lua design
// Hence we need to check both m_buffered and m_flushPub, and reload the redis script once setBuffered() changes m_buffered

/* 1. Inform the pipeline of what channel to publish, when flushPub feature is enabled */
if (m_buffered && m_flushPub)
m_pipe->addChannel(getChannelName(m_pipe->getDbId()));

/* 2. Setup lua strings: determine whether to attach luaPub after each lua string */

// num in luaSet and luaDel means number of elements that were added to the key set,
// not including all the elements already present into the set.
string luaSet =
"local added = redis.call('SADD', KEYS[2], ARGV[2])\n"
"for i = 0, #KEYS - 3 do\n"
" redis.call('HSET', KEYS[3 + i], ARGV[3 + i * 2], ARGV[4 + i * 2])\n"
"end\n"
" if added > 0 then \n"
" redis.call('PUBLISH', KEYS[1], ARGV[1])\n"
"end\n";
m_shaSet = m_pipe->loadRedisScript(luaSet);

string luaDel =
"local added = redis.call('SADD', KEYS[2], ARGV[2])\n"
"redis.call('SADD', KEYS[4], ARGV[2])\n"
"redis.call('DEL', KEYS[3])\n"
"if added > 0 then \n"
" redis.call('PUBLISH', KEYS[1], ARGV[1])\n"
"end\n";
m_shaDel = m_pipe->loadRedisScript(luaDel);
"redis.call('DEL', KEYS[3])\n";

string luaBatchedSet =
"local added = 0\n"
Expand All @@ -59,48 +91,39 @@ ProducerStateTable::ProducerStateTable(RedisPipeline *pipeline, const string &ta
" redis.call('HSET', KEYS[3] .. KEYS[4 + i], attr, val)\n"
" end\n"
" idx = idx + tonumber(ARGV[idx]) * 2 + 1\n"
"end\n"
"if added > 0 then \n"
" redis.call('PUBLISH', KEYS[1], ARGV[1])\n"
"end\n";
m_shaBatchedSet = m_pipe->loadRedisScript(luaBatchedSet);

string luaBatchedDel =
"local added = 0\n"
"for i = 0, #KEYS - 5 do\n"
" added = added + redis.call('SADD', KEYS[2], KEYS[5 + i])\n"
" redis.call('SADD', KEYS[3], KEYS[5 + i])\n"
" redis.call('DEL', KEYS[4] .. KEYS[5 + i])\n"
"end\n"
"if added > 0 then \n"
" redis.call('PUBLISH', KEYS[1], ARGV[1])\n"
"end\n";
m_shaBatchedDel = m_pipe->loadRedisScript(luaBatchedDel);

string luaClear =
"redis.call('DEL', KEYS[1])\n"
"local keys = redis.call('KEYS', KEYS[2] .. '*')\n"
"for i,k in pairs(keys) do\n"
" redis.call('DEL', k)\n"
"end\n"
"redis.call('DEL', KEYS[3])\n";
m_shaClear = m_pipe->loadRedisScript(luaClear);

string luaApplyView = loadLuaScript("producer_state_table_apply_view.lua");
m_shaApplyView = m_pipe->loadRedisScript(luaApplyView);
}

ProducerStateTable::~ProducerStateTable()
{
if (m_pipeowned)
if (!m_flushPub || !m_buffered)
{
delete m_pipe;
string luaPub =
"if added > 0 then \n"
" redis.call('PUBLISH', KEYS[1], ARGV[1])\n"
"end\n";
luaSet += luaPub;
luaDel += luaPub;
luaBatchedSet += luaPub;
luaBatchedDel += luaPub;
}

/* 3. load redis script based on the lua string */
m_shaSet = m_pipe->loadRedisScript(luaSet);
m_shaDel = m_pipe->loadRedisScript(luaDel);
m_shaBatchedSet = m_pipe->loadRedisScript(luaBatchedSet);
m_shaBatchedDel = m_pipe->loadRedisScript(luaBatchedDel);
}

void ProducerStateTable::setBuffered(bool buffered)
{
m_buffered = buffered;
reloadRedisScript();
}

void ProducerStateTable::set(const string &key, const vector<FieldValueTuple> &values,
Expand Down
4 changes: 4 additions & 0 deletions common/producerstatetable.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class ProducerStateTable : public TableBase, public TableName_KeySet
public:
ProducerStateTable(DBConnector *db, const std::string &tableName);
ProducerStateTable(RedisPipeline *pipeline, const std::string &tableName, bool buffered = false);
ProducerStateTable(RedisPipeline *pipeline, const std::string &tableName, bool buffered, bool flushPub);
virtual ~ProducerStateTable();

void setBuffered(bool buffered);
Expand Down Expand Up @@ -51,6 +52,7 @@ class ProducerStateTable : public TableBase, public TableName_KeySet

void apply_temp_view();
private:
bool m_flushPub; // publish per piepeline flush intead of per redis script
bool m_buffered;
bool m_pipeowned;
bool m_tempViewActive;
Expand All @@ -62,6 +64,8 @@ class ProducerStateTable : public TableBase, public TableName_KeySet
std::string m_shaClear;
std::string m_shaApplyView;
TableDump m_tempViewState;

void reloadRedisScript(); // redis script may change if m_buffered changes
};

}
44 changes: 44 additions & 0 deletions common/redispipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@

#include <string>
#include <queue>
#include <unordered_set>
#include <functional>
#include <chrono>
#include <iostream>
#include "redisreply.h"
#include "rediscommand.h"
#include "dbconnector.h"
Expand All @@ -22,9 +25,11 @@ class RedisPipeline {
RedisPipeline(const DBConnector *db, size_t sz = 128)
: COMMAND_MAX(sz)
, m_remaining(0)
, m_shaPub("")
{
m_db = db->newConnector(NEWCONNECTOR_TIMEOUT);
initializeOwnerTid();
lastHeartBeat = std::chrono::steady_clock::now();
}

~RedisPipeline() {
Expand Down Expand Up @@ -113,11 +118,19 @@ class RedisPipeline {

void flush()
{
lastHeartBeat = std::chrono::steady_clock::now();

if (m_remaining == 0) {
return;
}

while(m_remaining)
{
// Construct an object to use its dtor, so that resource is released
RedisReply r(pop());
}

publish();
}

size_t size()
Expand Down Expand Up @@ -145,12 +158,43 @@ class RedisPipeline {
m_ownerTid = gettid();
}

void addChannel(std::string channel)
{
if (m_channels.find(channel) != m_channels.end())
return;

m_channels.insert(channel);
m_luaPub += "redis.call('PUBLISH', '" + channel + "', 'G');";
m_shaPub = loadRedisScript(m_luaPub);
}

int getIdleTime(std::chrono::time_point<std::chrono::steady_clock> tcurrent=std::chrono::steady_clock::now())
{
return static_cast<int>(std::chrono::duration_cast<std::chrono::milliseconds>(tcurrent - lastHeartBeat).count());
}

void publish() {
if (m_shaPub.empty()) {
return;
}
RedisCommand cmd;
cmd.format(
"EVALSHA %s 0",
m_shaPub.c_str());
RedisReply r(m_db, cmd);
}

private:
DBConnector *m_db;
std::queue<int> m_expectedTypes;
size_t m_remaining;
long int m_ownerTid;

std::string m_luaPub;
std::string m_shaPub;
std::chrono::time_point<std::chrono::steady_clock> lastHeartBeat; // marks the timestamp of latest pipeline flush being invoked
std::unordered_set<std::string> m_channels;

void mayflush()
{
if (m_remaining >= COMMAND_MAX)
Expand Down
56 changes: 56 additions & 0 deletions tests/redis_piped_state_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -730,3 +730,59 @@ TEST(ConsumerStateTable, async_multitable)

cout << endl << "Done." << endl;
}

TEST(ConsumerStateTable, flushPub)
{
clearDB();

/* Prepare producer */
int index = 0;
string tableName = "UT_REDIS_THREAD_" + to_string(index);
DBConnector db(TEST_DB, 0, true);
RedisPipeline pipeline(&db);
ProducerStateTable p(&pipeline, tableName, false, true);
p.setBuffered(true);

string key = "TheKey";
int maxNumOfFields = 2;

/* Set operation */
{
vector<FieldValueTuple> fields;
for (int j = 0; j < maxNumOfFields; j++)
{
FieldValueTuple t(field(j), value(j));
fields.push_back(t);
}
p.set(key, fields);
}

/* Del operation */
p.del(key);
p.flush();

/* Prepare consumer */
ConsumerStateTable c(&db, tableName);
Select cs;
Selectable *selectcs;
cs.addSelectable(&c);

/* First pop operation */
{
int ret = cs.select(&selectcs);
EXPECT_EQ(ret, Select::OBJECT);
KeyOpFieldsValuesTuple kco;
c.pop(kco);
EXPECT_EQ(kfvKey(kco), key);
EXPECT_EQ(kfvOp(kco), "DEL");

auto fvs = kfvFieldsValues(kco);
EXPECT_EQ(fvs.size(), 0U);
}

/* Second select operation */
{
int ret = cs.select(&selectcs, 1000);
EXPECT_EQ(ret, Select::TIMEOUT);
}
}

0 comments on commit 901f3b4

Please sign in to comment.