diff --git a/notifier.cpp b/notifier.cpp index 862d2e3..a1e37c9 100644 --- a/notifier.cpp +++ b/notifier.cpp @@ -10,6 +10,7 @@ #include #include #include +#include #include #include @@ -139,6 +140,109 @@ static natsStatus send_nats_message(stanConnection* sc, stanConnOptions* connOpt return s; } + +typedef std::function onMsgFunction; + +const uint64_t SEQ_LAST = ~(uint64_t)0; +const uint64_t SEQ_ALL = (uint64_t)0; + +natsStatus getMessagesFromNats(stanConnection *sc, const char *channel, uint64_t deliverSeq, onMsgFunction onMsg, std::optional timeout = {}) { + stanSubOptions *subOpts = NULL; + stanSubscription *sub = NULL; + natsStatus s; + struct Status { + onMsgFunction onMsg; + bool cont; + } status{onMsg, true}; + + s = stanSubOptions_Create(&subOpts); + + if (s == NATS_OK) { + if (deliverSeq == SEQ_LAST) { + s = stanSubOptions_StartWithLastReceived(subOpts); + } else if (deliverSeq == SEQ_ALL) { + s = stanSubOptions_DeliverAllAvailable(subOpts); + } else { + s = stanSubOptions_StartAtSequence(subOpts, deliverSeq); + } + } + + if (s == NATS_OK) + s = stanSubOptions_SetMaxInflight(subOpts, 4*1024); + + if (s == NATS_OK) + s = stanConnection_Subscribe(&sub, sc, channel, [](stanConnection *sc, stanSubscription *sub, const char *channel, stanMsg *msg, void *closure) { + Status *st = static_cast(closure); + if (st->onMsg(sc, sub, channel, msg) == false) { + st->cont = false; + } + stanMsg_Destroy(msg); + }, &status, subOpts); + + stanSubOptions_Destroy(subOpts); + + int64_t start = nats_Now(); + + if (s == NATS_OK) { + while (!done && status.cont) { + if (timeout.has_value()) { + elapsed = nats_Now() - start; + if (elapsed > timeout) break; + } + nats_Sleep(15); + } + } + + if (sub != NULL) + s = stanSubscription_Close(sub); + + return s; +} + +struct BlockId { + unsigned number = 0; + std::string id = {}; +}; + +natsStatus getLastCommitBlock(stanConnection *sc, BlockId &commitBlockId, BlockId &lastBlockId) { + uint64_t lastSequence = 0; + natsStatus s; + + s = getMessagesFromNats(sc, "Blocks", SEQ_LAST, [&](stanConnection *sc, stanSubscription *sub, const char *channel, stanMsg *msg)->bool { + lastSequence = stanMsg_GetSequence(msg); + return false; + }, 1000); + if (s != NATS_OK) return s; + + while(commitBlockId.number == 0 && lastSequence > 0) { + uint64_t startSequence = (lastSequence < 100) ? 0 : lastSequence-100; + s = getMessagesFromNats(sc, "Blocks", startSequence, [&](stanConnection *sc, stanSubscription *sub, const char *channel, stanMsg *msg)->bool { + namespace pt = boost::property_tree; + uint64_t sequence = stanMsg_GetSequence(msg); + std::string data(stanMsg_GetData(msg), stanMsg_GetDataLength(msg)); + std::stringstream ss(data); + pt::ptree root; + + try { + pt::read_json(ss, root); + std::string msg_type = root.get("msg_type"); + unsigned block_num = (msg_type == "AcceptTrx") ? 0 : root.get("block_num"); + std::string id = root.get("id"); + if ((msg_type == "CommitBlock" || msg_type == "AcceptBlock") && (lastBlockId.number < block_num)) lastBlockId = {block_num, id}; + if ((msg_type == "CommitBlock") && (commitBlockId.number < block_num)) commitBlockId = {block_num, id}; + } catch (const pt::ptree_error& err) {} + + return sequence < lastSequence; + }, 10*1000); + if (s != NATS_OK) return s; + + lastSequence = startSequence; + } + + return NATS_OK; +} + + int main(int argc, char** argv) { opts = parseArgs(argc, argv, ""); std::cerr << "Sending socket messages" << std::endl; @@ -177,6 +281,18 @@ int main(int argc, char** argv) { } std::remove(backup_file.c_str()); + BlockId commitBlockId, lastBlockId; + { + natsStatus s2; + std::cout << "Receve last & commit block from nats" << std::endl; + s2 = getLastCommitBlock(sc, commitBlockId, lastBlockId); + if (s2 != NATS_OK) { + return 2; + } + std::cout << "CommitBlock: " << commitBlockId.number << ", " << commitBlockId.id << std::endl; + std::cout << " LastBlock: " << lastBlockId.number << ", " << lastBlockId.id << std::endl; + } + try { socket_stream.connect(ep); if (socket_stream.native_non_blocking()) { @@ -204,23 +320,63 @@ int main(int argc, char** argv) { std::istream data_stream(&socket_buf); std::getline(data_stream, msg.data); -// try { -// // json validating -// std::stringstream local_stream; -// local_stream << msg.data; -// boost::property_tree::ptree pt; -// boost::property_tree::read_json(local_stream, pt); -// } catch (...) { -// std::cerr << "Data error: " << msg.data << std::endl; -// throw; -// } - if (print) { std::cout << msg.data << std::endl; } msg.index = msg_index++; msg.subject = get_subject(msg.data); + + if (lastBlockId.number || commitBlockId.number) { + if (msg.subject != "Blocks") { + std::cerr << "Invalid message subject '" << msg.subject << "' for synchronize mode" << std::endl; + break; + } + + namespace pt = boost::property_tree; + try { + std::stringstream ss(msg.data); + pt::ptree root; + pt::read_json(ss, root); + std::string msg_type = root.get("msg_type"); + std::string id = root.get("id"); + unsigned block_num = (msg_type == "AcceptTrx") ? commitBlockId.number : root.get("block_num"); + + if (block_num < commitBlockId.number) continue; + + if (commitBlockId.number && msg_type == "CommitBlock") { + if (block_num == commitBlockId.number) { + if (id != commitBlockId.id) { + std::cerr << "Invalid last commit block in synchronize mode. Expect: " << commitBlockId.id << ", get: " << id << std::endl; + break; + } else { + std::cerr << "Found last commit block in synchronize mode: " << commitBlockId.id << std::endl; + commitBlockId = BlockId(); // Disable check for commit block + continue; + } + } else if (block_num == commitBlockId.number + 1) { + std::cerr << "Continue with next commit block in synchronize mode: " << id << std::endl; + commitBlockId = BlockId(); // Disable check for commit block + } else { + std::cerr << "Invalid commit block_num in synchronize mode. Expect: " << commitBlockId.number << ", get: " << block_num << std::endl; + break; + } + } + + if (lastBlockId.number) { + if (block_num > lastBlockId.number + 1) { + std::cerr << "Too large block_num in message for synchronize mode. Last: " << lastBlockId.number << ", current: " << block_num << std::endl; + break; + } else lastBlockId = BlockId(); + } + + } catch (const pt::ptree_error &err) { + std::cerr << "Invalid json in synchronize mode: " << err.what() << std::endl; + std::cerr << msg.data << std::endl; + break; + } + } + std::pair result; { std::lock_guard guard(msgs_mutex);