Skip to content

Commit

Permalink
Merge pull request #48 from cyberway/47-skip-sended-events
Browse files Browse the repository at this point in the history
Skip events which was send already #47
  • Loading branch information
afalaleev authored May 3, 2020
2 parents 188798e + 60d8b0b commit 249ce6e
Showing 1 changed file with 167 additions and 11 deletions.
178 changes: 167 additions & 11 deletions notifier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <tuple>
#include <vector>
#include <mutex>
#include <functional>

#include <boost/asio.hpp>
#include <boost/property_tree/ptree.hpp>
Expand Down Expand Up @@ -139,6 +140,109 @@ static natsStatus send_nats_message(stanConnection* sc, stanConnOptions* connOpt
return s;
}


typedef std::function<bool(stanConnection*, stanSubscription*, const char*, stanMsg*)> onMsgFunction;

const uint64_t SEQ_LAST = ~(uint64_t)0;
const uint64_t SEQ_ALL = (uint64_t)0;

natsStatus getMessagesFromNats(stanConnection *sc, const char *channel, uint64_t deliverSeq, onMsgFunction onMsg, std::optional<int64_t> timeout = {}) {
stanSubOptions *subOpts = NULL;
stanSubscription *sub = NULL;
natsStatus s;
struct Status {
onMsgFunction onMsg;
bool cont;
} status{onMsg, true};

s = stanSubOptions_Create(&subOpts);

if (s == NATS_OK) {
if (deliverSeq == SEQ_LAST) {
s = stanSubOptions_StartWithLastReceived(subOpts);
} else if (deliverSeq == SEQ_ALL) {
s = stanSubOptions_DeliverAllAvailable(subOpts);
} else {
s = stanSubOptions_StartAtSequence(subOpts, deliverSeq);
}
}

if (s == NATS_OK)
s = stanSubOptions_SetMaxInflight(subOpts, 4*1024);

if (s == NATS_OK)
s = stanConnection_Subscribe(&sub, sc, channel, [](stanConnection *sc, stanSubscription *sub, const char *channel, stanMsg *msg, void *closure) {
Status *st = static_cast<Status*>(closure);
if (st->onMsg(sc, sub, channel, msg) == false) {
st->cont = false;
}
stanMsg_Destroy(msg);
}, &status, subOpts);

stanSubOptions_Destroy(subOpts);

int64_t start = nats_Now();

if (s == NATS_OK) {
while (!done && status.cont) {
if (timeout.has_value()) {
elapsed = nats_Now() - start;
if (elapsed > timeout) break;
}
nats_Sleep(15);
}
}

if (sub != NULL)
s = stanSubscription_Close(sub);

return s;
}

struct BlockId {
unsigned number = 0;
std::string id = {};
};

natsStatus getLastCommitBlock(stanConnection *sc, BlockId &commitBlockId, BlockId &lastBlockId) {
uint64_t lastSequence = 0;
natsStatus s;

s = getMessagesFromNats(sc, "Blocks", SEQ_LAST, [&](stanConnection *sc, stanSubscription *sub, const char *channel, stanMsg *msg)->bool {
lastSequence = stanMsg_GetSequence(msg);
return false;
}, 1000);
if (s != NATS_OK) return s;

while(commitBlockId.number == 0 && lastSequence > 0) {
uint64_t startSequence = (lastSequence < 100) ? 0 : lastSequence-100;
s = getMessagesFromNats(sc, "Blocks", startSequence, [&](stanConnection *sc, stanSubscription *sub, const char *channel, stanMsg *msg)->bool {
namespace pt = boost::property_tree;
uint64_t sequence = stanMsg_GetSequence(msg);
std::string data(stanMsg_GetData(msg), stanMsg_GetDataLength(msg));
std::stringstream ss(data);
pt::ptree root;

try {
pt::read_json(ss, root);
std::string msg_type = root.get<std::string>("msg_type");
unsigned block_num = (msg_type == "AcceptTrx") ? 0 : root.get<unsigned>("block_num");
std::string id = root.get<std::string>("id");
if ((msg_type == "CommitBlock" || msg_type == "AcceptBlock") && (lastBlockId.number < block_num)) lastBlockId = {block_num, id};
if ((msg_type == "CommitBlock") && (commitBlockId.number < block_num)) commitBlockId = {block_num, id};
} catch (const pt::ptree_error& err) {}

return sequence < lastSequence;
}, 10*1000);
if (s != NATS_OK) return s;

lastSequence = startSequence;
}

return NATS_OK;
}


int main(int argc, char** argv) {
opts = parseArgs(argc, argv, "");
std::cerr << "Sending socket messages" << std::endl;
Expand Down Expand Up @@ -177,6 +281,18 @@ int main(int argc, char** argv) {
}
std::remove(backup_file.c_str());

BlockId commitBlockId, lastBlockId;
{
natsStatus s2;
std::cout << "Receve last & commit block from nats" << std::endl;
s2 = getLastCommitBlock(sc, commitBlockId, lastBlockId);
if (s2 != NATS_OK) {
return 2;
}
std::cout << "CommitBlock: " << commitBlockId.number << ", " << commitBlockId.id << std::endl;
std::cout << " LastBlock: " << lastBlockId.number << ", " << lastBlockId.id << std::endl;
}

try {
socket_stream.connect(ep);
if (socket_stream.native_non_blocking()) {
Expand Down Expand Up @@ -204,23 +320,63 @@ int main(int argc, char** argv) {
std::istream data_stream(&socket_buf);
std::getline(data_stream, msg.data);

// try {
// // json validating
// std::stringstream local_stream;
// local_stream << msg.data;
// boost::property_tree::ptree pt;
// boost::property_tree::read_json(local_stream, pt);
// } catch (...) {
// std::cerr << "Data error: " << msg.data << std::endl;
// throw;
// }

if (print) {
std::cout << msg.data << std::endl;
}

msg.index = msg_index++;
msg.subject = get_subject(msg.data);

if (lastBlockId.number || commitBlockId.number) {
if (msg.subject != "Blocks") {
std::cerr << "Invalid message subject '" << msg.subject << "' for synchronize mode" << std::endl;
break;
}

namespace pt = boost::property_tree;
try {
std::stringstream ss(msg.data);
pt::ptree root;
pt::read_json(ss, root);
std::string msg_type = root.get<std::string>("msg_type");
std::string id = root.get<std::string>("id");
unsigned block_num = (msg_type == "AcceptTrx") ? commitBlockId.number : root.get<unsigned>("block_num");

if (block_num < commitBlockId.number) continue;

if (commitBlockId.number && msg_type == "CommitBlock") {
if (block_num == commitBlockId.number) {
if (id != commitBlockId.id) {
std::cerr << "Invalid last commit block in synchronize mode. Expect: " << commitBlockId.id << ", get: " << id << std::endl;
break;
} else {
std::cerr << "Found last commit block in synchronize mode: " << commitBlockId.id << std::endl;
commitBlockId = BlockId(); // Disable check for commit block
continue;
}
} else if (block_num == commitBlockId.number + 1) {
std::cerr << "Continue with next commit block in synchronize mode: " << id << std::endl;
commitBlockId = BlockId(); // Disable check for commit block
} else {
std::cerr << "Invalid commit block_num in synchronize mode. Expect: " << commitBlockId.number << ", get: " << block_num << std::endl;
break;
}
}

if (lastBlockId.number) {
if (block_num > lastBlockId.number + 1) {
std::cerr << "Too large block_num in message for synchronize mode. Last: " << lastBlockId.number << ", current: " << block_num << std::endl;
break;
} else lastBlockId = BlockId();
}

} catch (const pt::ptree_error &err) {
std::cerr << "Invalid json in synchronize mode: " << err.what() << std::endl;
std::cerr << msg.data << std::endl;
break;
}
}

std::pair<message_map::iterator, bool> result;
{
std::lock_guard<std::mutex> guard(msgs_mutex);
Expand Down

0 comments on commit 249ce6e

Please sign in to comment.