From cbd55235e9b8f5bd1ceb40092cbf53b6e4d15b0d Mon Sep 17 00:00:00 2001 From: Stan Kladko <13399135+kladkogex@users.noreply.github.com> Date: Wed, 6 Apr 2022 15:41:57 +0300 Subject: [PATCH] bug/SKALE-5039 --- chains/Schain.cpp | 31 ++++--- chains/Schain.h | 13 ++- network/Network.cpp | 10 +- oracle/OracleMessageThreadPool.cpp | 43 +++++++++ oracle/OracleMessageThreadPool.h | 38 ++++++++ oracle/OracleResultAssemblyAgent.cpp | 101 +++++++++++++++++++++ oracle/OracleResultAssemblyAgent.h | 54 +++++++++++ pendingqueue/TestMessageGeneratorAgent.cpp | 7 +- 8 files changed, 272 insertions(+), 25 deletions(-) create mode 100644 oracle/OracleMessageThreadPool.cpp create mode 100644 oracle/OracleMessageThreadPool.h create mode 100644 oracle/OracleResultAssemblyAgent.cpp create mode 100644 oracle/OracleResultAssemblyAgent.h diff --git a/chains/Schain.cpp b/chains/Schain.cpp index f1439ca76..0f587f170 100644 --- a/chains/Schain.cpp +++ b/chains/Schain.cpp @@ -85,7 +85,9 @@ #include "node/NodeInfo.h" #include "oracle/OracleServerAgent.h" #include "oracle/OracleClient.h" +#include "oracle/OracleMessageThreadPool.h" #include "oracle/OracleThreadPool.h" +#include "oracle/OracleResultAssemblyAgent.h" #include "pricing/PricingAgent.h" #include "protocols/ProtocolInstance.h" #include "protocols/blockconsensus/BlockConsensusAgent.h" @@ -190,12 +192,8 @@ void Schain::messageThreadProcessingLoop(Schain *_sChain) { CHECK_STATE((uint64_t) m->getMessage()->getBlockId() != 0); try { - if (m->getMessage()->getMsgType() == MSG_ORACLE_REQ_BROADCAST || - m->getMessage()->getMsgType() == MSG_ORACLE_RSP) { - _sChain->getOracleInstance()->routeAndProcessMessage(m); - } else { - _sChain->getBlockConsensusInstance()->routeAndProcessMessage(m); - } + _sChain->getBlockConsensusInstance()->routeAndProcessMessage(m); + } catch (exception &e) { LOG(err, "Exception in Schain::messageThreadProcessingLoop"); SkaleException::logNested(e); @@ -220,7 +218,7 @@ void Schain::startThreads() { if 
(getNode()->isSyncOnlyNode()) { return; } - CHECK_STATE(consensusMessageThreadPool); + CHECK_STATE(consensusMessageThreadPool) this->consensusMessageThreadPool->startService(); } @@ -233,8 +231,10 @@ Schain::Schain(weak_ptr _node, schain_index _schainIndex, const schain_id schainID(_schainID), startTimeMs(0), consensusMessageThreadPool(new SchainMessageThreadPool(this)), + node(_node), schainIndex(_schainIndex) { + lastCommittedBlockTimeStamp = TimeStamp(0, 0); // construct monitoring, timeout and stuck detection agents early @@ -297,6 +297,7 @@ void Schain::constructChildAgents() { MONITOR(__CLASS_NAME__, __FUNCTION__) try { + oracleResultAssemblyAgent = make_shared(*this); pricingAgent = make_shared(*this); catchupClientAgent = make_shared(*this); @@ -567,10 +568,10 @@ void Schain::printBlockLog(const ptr &_block) { if (!getNode()->isSyncOnlyNode()) { output = output + - ":KNWN:" + to_string(pendingTransactionsAgent->getKnownTransactionsSize()) + - ":CONS:" + to_string(ServerConnection::getTotalObjects()) + ":DSDS:" + - to_string(getSchain()->getNode()->getNetwork()->computeTotalDelayedSends()) + - ":SET:" + to_string(CryptoManager::getEcdsaStats()) + + ":KNWN:" + to_string(pendingTransactionsAgent->getKnownTransactionsSize()) + + ":CONS:" + to_string(ServerConnection::getTotalObjects()) + ":DSDS:" + + to_string(getSchain()->getNode()->getNetwork()->computeTotalDelayedSends()) + + ":SET:" + to_string(CryptoManager::getEcdsaStats()) + ":SBT:" + to_string(CryptoManager::getBLSStats()) + ":SEC:" + to_string(CryptoManager::getECDSATotals()) + ":SBC:" + to_string(CryptoManager::getBLSTotals()) + @@ -582,7 +583,7 @@ void Schain::printBlockLog(const ptr &_block) { LOG(info, output); //get malloc stats - static atomic mallocCounter = 0; + static atomic mallocCounter = 0; if (mallocCounter % 1000 == 0) { char *bp = nullptr; size_t size = 0; @@ -954,7 +955,7 @@ void Schain::healthCheck() { BOOST_THROW_EXCEPTION(ExitRequestedException( __CLASS_NAME__ )); } - auto port = 
(getNode()->isSyncOnlyNode()? port_type::CATCHUP : port_type::PROPOSAL); + auto port = (getNode()->isSyncOnlyNode() ? port_type::CATCHUP : port_type::PROPOSAL); auto socket = make_shared( *this, schain_index(i), port); @@ -1224,5 +1225,9 @@ u256 Schain::getRandomForBlockId(block_id _blockId) { ptr Schain::visualizationDataStream = nullptr; +const ptr &Schain::getOracleResultAssemblyAgent() const { + return oracleResultAssemblyAgent; +} + mutex Schain::vdsMutex; diff --git a/chains/Schain.h b/chains/Schain.h index 6f356ac6e..fee30621e 100644 --- a/chains/Schain.h +++ b/chains/Schain.h @@ -49,8 +49,8 @@ class BlockFinalizeDownloader; class BlockFinalizeDownloaderThreadPool; - class SchainMessageThreadPool; +class OracleMessageThreadPool; class TestMessageGeneratorAgent; class ConsensusExtFace; @@ -87,6 +87,7 @@ class TimeStamp; class CryptoManager; class StatusServer; class OracleClient; +class OracleResultAssemblyAgent; class Schain : public Agent { @@ -132,6 +133,8 @@ class Schain : public Agent { ptr< SchainMessageThreadPool > consensusMessageThreadPool; + ptr oracleResultAssemblyAgent; + ptr< IO > io; @@ -168,8 +171,6 @@ class Schain : public Agent { ptr< NodeInfo > thisNodeInfo = nullptr; - void checkForExit(); - void proposeNextBlock(); void processCommittedBlock( const ptr< CommittedBlock >& _block ); @@ -193,6 +194,10 @@ class Schain : public Agent { static void writeToVisualizationStream(string& _s); + + void checkForExit(); + + void addDeadNode(uint64_t _schainIndex, uint64_t timeMs); uint64_t getDeathTimeMs(uint64_t _schainIndex); @@ -268,6 +273,8 @@ class Schain : public Agent { void postMessage( const ptr< MessageEnvelope >& _me ); + const ptr &getOracleResultAssemblyAgent() const; + ptr< PendingTransactionsAgent > getPendingTransactionsAgent() const; ptr< MonitoringAgent > getMonitoringAgent() const; diff --git a/network/Network.cpp b/network/Network.cpp index 8b24f1483..adb2e09e3 100644 --- a/network/Network.cpp +++ b/network/Network.cpp @@ -39,6 +39,7 
@@ #include "protocols/blockconsensus/BlockSignBroadcastMessage.h" #include "thirdparty/json.hpp" #include "thirdparty/lrucache.hpp" +#include "oracle/OracleResultAssemblyAgent.h" #include #include "unordered_set" @@ -204,7 +205,8 @@ void Network::broadcastOracleRequestMessage(const ptrgetSchainIndex())) { sendMessage(it.second, _msg); } else { - getSchain()->postMessage(make_shared(_msg, dstIndex)); + getSchain()->getOracleResultAssemblyAgent()->postMessage( + make_shared(_msg, dstIndex)); } } } catch (...) { @@ -226,7 +228,7 @@ void Network::sendOracleResponseMessage(const ptr &_msg, CHECK_STATE(dstNodeInfo); sendMessage(dstNodeInfo, _msg); } else { - getSchain()->postMessage(make_shared(_msg, _dstIndex)); + getSchain()->getOracleResultAssemblyAgent()->postMessage(make_shared(_msg, _dstIndex)); } } catch (...) { throw_with_nested(InvalidStateException(__FUNCTION__, __CLASS_NAME__)); @@ -263,7 +265,7 @@ void Network::networkReadLoop() { } if (msg->getMsgType() == MSG_ORACLE_REQ_BROADCAST || msg->getMsgType() == MSG_ORACLE_RSP) { - sChain->postMessage(m); + sChain->getOracleResultAssemblyAgent()->postMessage(m); continue; } @@ -326,7 +328,7 @@ void Network::postDeferOrDrop(const ptr &_me) { CHECK_STATE(msg); if (msg->getMsgType() == MSG_ORACLE_REQ_BROADCAST || msg->getMsgType() == MSG_ORACLE_RSP) { - sChain->postMessage(_me); + sChain->getOracleResultAssemblyAgent()->postMessage(_me); } else if (sChain->getBlockConsensusInstance()->shouldPost(msg)) { sChain->postMessage(_me); } else { diff --git a/oracle/OracleMessageThreadPool.cpp b/oracle/OracleMessageThreadPool.cpp new file mode 100644 index 000000000..83ce1e875 --- /dev/null +++ b/oracle/OracleMessageThreadPool.cpp @@ -0,0 +1,43 @@ +/* + Copyright (C) 2018-2019 SKALE Labs + + This file is part of skale-consensus. 
+ + skale-consensus is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + skale-consensus is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with skale-consensus. If not, see <https://www.gnu.org/licenses/>. + + @file OracleMessageThreadPool.cpp + @author Stan Kladko + @date 2018 +*/ + +#include "SkaleCommon.h" +#include "Log.h" + +#include "chains/Schain.h" +#include "blockproposal/pusher/BlockProposalClientAgent.h" +#include "db/BlockProposalDB.h" +#include "pendingqueue/PendingTransactionsAgent.h" + +#include "OracleResultAssemblyAgent.h" +#include "OracleMessageThreadPool.h" + +OracleMessageThreadPool::OracleMessageThreadPool(Agent *_agent) : WorkerThreadPool(NUM_SCHAIN_THREADS, _agent, false) { + +} + +void OracleMessageThreadPool::createThread(uint64_t /*_threadNumber*/) { + LOCK(threadPoolLock) + threadpool.push_back(make_shared(OracleResultAssemblyAgent::messageThreadProcessingLoop, + reinterpret_cast < OracleResultAssemblyAgent * > ( agent ))); +} diff --git a/oracle/OracleMessageThreadPool.h b/oracle/OracleMessageThreadPool.h new file mode 100644 index 000000000..2389fc20a --- /dev/null +++ b/oracle/OracleMessageThreadPool.h @@ -0,0 +1,38 @@ +/* + Copyright (C) 2018-2019 SKALE Labs + + This file is part of skale-consensus. + + skale-consensus is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, or + (at your option) any later version.
+ + skale-consensus is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with skale-consensus. If not, see <https://www.gnu.org/licenses/>. + + @file OracleMessageThreadPool.h + @author Stan Kladko + @date 2018 +*/ + +#pragma once + +class Agent; +class Schain; +class WorkerThreadPool; + + +class OracleMessageThreadPool : public WorkerThreadPool { +public: + + OracleMessageThreadPool(Agent *_agent); + + virtual void createThread(uint64_t _numThreads); +}; + diff --git a/oracle/OracleResultAssemblyAgent.cpp b/oracle/OracleResultAssemblyAgent.cpp new file mode 100644 index 000000000..d8dc20609 --- /dev/null +++ b/oracle/OracleResultAssemblyAgent.cpp @@ -0,0 +1,101 @@ +// +// Created by skale on 06.04.22. +// + +#include "SkaleCommon.h" +#include "Log.h" +#include "chains/Schain.h" +#include "monitoring/LivelinessMonitor.h" +#include "OracleMessageThreadPool.h" +#include "OracleServerAgent.h" +#include "OracleResultAssemblyAgent.h" + +OracleResultAssemblyAgent::OracleResultAssemblyAgent(Schain &_sChain) : Agent(_sChain, true), + oracleMessageThreadPool(new OracleMessageThreadPool(this)){ + try { + logThreadLocal_ = _sChain.getNode()->getLog(); + oracleMessageThreadPool->startService(); + } catch (...)
{ + throw_with_nested(FatalError(__FUNCTION__, __CLASS_NAME__)); + } +} + + +void OracleResultAssemblyAgent::messageThreadProcessingLoop(OracleResultAssemblyAgent*_agent) { + CHECK_ARGUMENT(_agent); + + setThreadName("orclAssemblyLoop", _agent->getSchain()->getNode()->getConsensusEngine()); + + _agent->getSchain()->waitOnGlobalStartBarrier(); + + try { + + logThreadLocal_ = _agent->getSchain()->getNode()->getLog(); + + queue > newQueue; + + while (!_agent->getSchain()->getNode()->isExitRequested()) { + { + unique_lock mlock(_agent->messageMutex); + while (_agent->messageQueue.empty()) { + _agent->messageCond.wait(mlock); + if (_agent->getNode()->isExitRequested()) + return; + } + + newQueue = _agent->messageQueue; + + while (!_agent->messageQueue.empty()) { + if (_agent->getNode()->isExitRequested()) + return; + + _agent->messageQueue.pop(); + } + } + + while (!newQueue.empty()) { + if (_agent->getNode()->isExitRequested()) + return; + + ptr m = newQueue.front(); + CHECK_STATE((uint64_t) m->getMessage()->getBlockId() != 0); + + try { + if (m->getMessage()->getMsgType() == MSG_ORACLE_REQ_BROADCAST || + m->getMessage()->getMsgType() == MSG_ORACLE_RSP) { + _agent->getSchain()->getOracleInstance()->routeAndProcessMessage(m); + } else { + CHECK_STATE(false); + } + } catch (exception &e) { + LOG(err, "Exception in Schain::oracleAssemblylLoop"); + SkaleException::logNested(e); + if (_agent->getNode()->isExitRequested()) + return; + } // catch + + newQueue.pop(); + } + } + + + } catch (FatalError &e) { + SkaleException::logNested(e); + _agent->getSchain()->getNode()->exitOnFatalError(e.what()); + } +} + +void OracleResultAssemblyAgent::postMessage(const ptr &_me) { + CHECK_ARGUMENT(_me); + + MONITOR(__CLASS_NAME__, __FUNCTION__) + + getSchain()->checkForExit(); + + CHECK_STATE((uint64_t) _me->getMessage()->getBlockId() != 0); + { + lock_guard l(messageMutex); + messageQueue.push(_me); + messageCond.notify_all(); + } +} \ No newline at end of file diff --git 
a/oracle/OracleResultAssemblyAgent.h b/oracle/OracleResultAssemblyAgent.h new file mode 100644 index 000000000..30f1cebe5 --- /dev/null +++ b/oracle/OracleResultAssemblyAgent.h @@ -0,0 +1,54 @@ +/* + Copyright (C) 2018-2019 SKALE Labs + + This file is part of skale-consensus. + + skale-consensus is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + skale-consensus is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with skale-consensus. If not, see <https://www.gnu.org/licenses/>. + + @file OracleResultAssemblyAgent.h + @author Stan Kladko + @date 2018 +*/ + +#pragma once + + +#include "utils/Time.h" +#include "protocols/blockconsensus/BlockConsensusAgent.h" +#include "messages/MessageEnvelope.h" +#include "messages/Message.h" + +class OracleAssemblyThreadPool; +class LivelinessMonitor; + +class OracleResultAssemblyAgent : public Agent { + + + queue > messageQueue; + + ptr oracleMessageThreadPool; + + +public: + + queue> &getMessageQueue(); + + OracleResultAssemblyAgent(Schain &_sChain); + + + static void messageThreadProcessingLoop(OracleResultAssemblyAgent *_agent); + + + void postMessage(const ptr &_me); +}; \ No newline at end of file diff --git a/pendingqueue/TestMessageGeneratorAgent.cpp b/pendingqueue/TestMessageGeneratorAgent.cpp index 9470838e6..b9b4e1740 100644 --- a/pendingqueue/TestMessageGeneratorAgent.cpp +++ b/pendingqueue/TestMessageGeneratorAgent.cpp @@ -78,16 +78,13 @@ ConsensusExtFace::transactions_vector TestMessageGeneratorAgent::pendingTransact static uint64_t iterations = 0; // send oracle test once from schain index 1 - if (iterations
== 40) { + if (iterations == 10) { LOG(info, "Sending Oracle test"); getSchain()->getOracleClient()->sendTestRequestGet(); - sleep(1); - getSchain()->getOracleClient()->sendTestRequestGet(); LOG(info, "Sent Oracle test"); } - iterations++; - + iterations++; return result;