Skip to content

Commit

Permalink
Merge pull request #607 from skalenetwork/bug/SKALE-5039-oracle-reque…
Browse files Browse the repository at this point in the history
…st-stuck

bug/SKALE-5039
  • Loading branch information
kladkogex authored Apr 6, 2022
2 parents d72f15c + cbd5523 commit 9e3521e
Show file tree
Hide file tree
Showing 8 changed files with 272 additions and 25 deletions.
31 changes: 18 additions & 13 deletions chains/Schain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@
#include "node/NodeInfo.h"
#include "oracle/OracleServerAgent.h"
#include "oracle/OracleClient.h"
#include "oracle/OracleMessageThreadPool.h"
#include "oracle/OracleThreadPool.h"
#include "oracle/OracleResultAssemblyAgent.h"
#include "pricing/PricingAgent.h"
#include "protocols/ProtocolInstance.h"
#include "protocols/blockconsensus/BlockConsensusAgent.h"
Expand Down Expand Up @@ -190,12 +192,8 @@ void Schain::messageThreadProcessingLoop(Schain *_sChain) {
CHECK_STATE((uint64_t) m->getMessage()->getBlockId() != 0);

try {
if (m->getMessage()->getMsgType() == MSG_ORACLE_REQ_BROADCAST ||
m->getMessage()->getMsgType() == MSG_ORACLE_RSP) {
_sChain->getOracleInstance()->routeAndProcessMessage(m);
} else {
_sChain->getBlockConsensusInstance()->routeAndProcessMessage(m);
}
_sChain->getBlockConsensusInstance()->routeAndProcessMessage(m);

} catch (exception &e) {
LOG(err, "Exception in Schain::messageThreadProcessingLoop");
SkaleException::logNested(e);
Expand All @@ -220,7 +218,7 @@ void Schain::startThreads() {
if (getNode()->isSyncOnlyNode()) {
return;
}
CHECK_STATE(consensusMessageThreadPool);
CHECK_STATE(consensusMessageThreadPool)
this->consensusMessageThreadPool->startService();
}

Expand All @@ -233,8 +231,10 @@ Schain::Schain(weak_ptr<Node> _node, schain_index _schainIndex, const schain_id
schainID(_schainID),
startTimeMs(0),
consensusMessageThreadPool(new SchainMessageThreadPool(this)),

node(_node),
schainIndex(_schainIndex) {

lastCommittedBlockTimeStamp = TimeStamp(0, 0);

// construct monitoring, timeout and stuck detection agents early
Expand Down Expand Up @@ -297,6 +297,7 @@ void Schain::constructChildAgents() {
MONITOR(__CLASS_NAME__, __FUNCTION__)

try {
oracleResultAssemblyAgent = make_shared<OracleResultAssemblyAgent>(*this);
pricingAgent = make_shared<PricingAgent>(*this);
catchupClientAgent = make_shared<CatchupClientAgent>(*this);

Expand Down Expand Up @@ -567,10 +568,10 @@ void Schain::printBlockLog(const ptr<CommittedBlock> &_block) {

if (!getNode()->isSyncOnlyNode()) {
output = output +
":KNWN:" + to_string(pendingTransactionsAgent->getKnownTransactionsSize()) +
":CONS:" + to_string(ServerConnection::getTotalObjects()) + ":DSDS:" +
to_string(getSchain()->getNode()->getNetwork()->computeTotalDelayedSends()) +
":SET:" + to_string(CryptoManager::getEcdsaStats()) +
":KNWN:" + to_string(pendingTransactionsAgent->getKnownTransactionsSize()) +
":CONS:" + to_string(ServerConnection::getTotalObjects()) + ":DSDS:" +
to_string(getSchain()->getNode()->getNetwork()->computeTotalDelayedSends()) +
":SET:" + to_string(CryptoManager::getEcdsaStats()) +
":SBT:" + to_string(CryptoManager::getBLSStats()) +
":SEC:" + to_string(CryptoManager::getECDSATotals()) +
":SBC:" + to_string(CryptoManager::getBLSTotals()) +
Expand All @@ -582,7 +583,7 @@ void Schain::printBlockLog(const ptr<CommittedBlock> &_block) {
LOG(info, output);

//get malloc stats
static atomic <uint64_t> mallocCounter = 0;
static atomic<uint64_t> mallocCounter = 0;
if (mallocCounter % 1000 == 0) {
char *bp = nullptr;
size_t size = 0;
Expand Down Expand Up @@ -954,7 +955,7 @@ void Schain::healthCheck() {
BOOST_THROW_EXCEPTION(ExitRequestedException( __CLASS_NAME__ ));
}

auto port = (getNode()->isSyncOnlyNode()? port_type::CATCHUP : port_type::PROPOSAL);
auto port = (getNode()->isSyncOnlyNode() ? port_type::CATCHUP : port_type::PROPOSAL);

auto socket = make_shared<ClientSocket>(
*this, schain_index(i), port);
Expand Down Expand Up @@ -1224,5 +1225,9 @@ u256 Schain::getRandomForBlockId(block_id _blockId) {

ptr<ofstream> Schain::visualizationDataStream = nullptr;

/**
 * Accessor for the oracle result assembly agent held by this chain.
 *
 * @return const reference to the member pointer. The member is assigned in
 *         constructChildAgents(); presumably it is empty before that call —
 *         callers should verify.
 */
const ptr<OracleResultAssemblyAgent> &Schain::getOracleResultAssemblyAgent() const {
    return this->oracleResultAssemblyAgent;
}


mutex Schain::vdsMutex;
13 changes: 10 additions & 3 deletions chains/Schain.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ class BlockFinalizeDownloader;
class BlockFinalizeDownloaderThreadPool;



class SchainMessageThreadPool;
class OracleMessageThreadPool;

class TestMessageGeneratorAgent;
class ConsensusExtFace;
Expand Down Expand Up @@ -87,6 +87,7 @@ class TimeStamp;
class CryptoManager;
class StatusServer;
class OracleClient;
class OracleResultAssemblyAgent;

class Schain : public Agent {

Expand Down Expand Up @@ -132,6 +133,8 @@ class Schain : public Agent {
ptr< SchainMessageThreadPool > consensusMessageThreadPool;


ptr<OracleResultAssemblyAgent> oracleResultAssemblyAgent;


ptr< IO > io;

Expand Down Expand Up @@ -168,8 +171,6 @@ class Schain : public Agent {

ptr< NodeInfo > thisNodeInfo = nullptr;

void checkForExit();

void proposeNextBlock();

void processCommittedBlock( const ptr< CommittedBlock >& _block );
Expand All @@ -193,6 +194,10 @@ class Schain : public Agent {

static void writeToVisualizationStream(string& _s);


void checkForExit();


void addDeadNode(uint64_t _schainIndex, uint64_t timeMs);

uint64_t getDeathTimeMs(uint64_t _schainIndex);
Expand Down Expand Up @@ -268,6 +273,8 @@ class Schain : public Agent {

void postMessage( const ptr< MessageEnvelope >& _me );

const ptr<OracleResultAssemblyAgent> &getOracleResultAssemblyAgent() const;

ptr< PendingTransactionsAgent > getPendingTransactionsAgent() const;

ptr< MonitoringAgent > getMonitoringAgent() const;
Expand Down
10 changes: 6 additions & 4 deletions network/Network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "protocols/blockconsensus/BlockSignBroadcastMessage.h"
#include "thirdparty/json.hpp"
#include "thirdparty/lrucache.hpp"
#include "oracle/OracleResultAssemblyAgent.h"
#include <db/MsgDB.h>

#include "unordered_set"
Expand Down Expand Up @@ -204,7 +205,8 @@ void Network::broadcastOracleRequestMessage(const ptr<OracleRequestBroadcastMess
if (dstIndex != (getSchain()->getSchainIndex())) {
sendMessage(it.second, _msg);
} else {
getSchain()->postMessage(make_shared<NetworkMessageEnvelope>(_msg, dstIndex));
getSchain()->getOracleResultAssemblyAgent()->postMessage(
make_shared<NetworkMessageEnvelope>(_msg, dstIndex));
}
}
} catch (...) {
Expand All @@ -226,7 +228,7 @@ void Network::sendOracleResponseMessage(const ptr<OracleResponseMessage> &_msg,
CHECK_STATE(dstNodeInfo);
sendMessage(dstNodeInfo, _msg);
} else {
getSchain()->postMessage(make_shared<NetworkMessageEnvelope>(_msg, _dstIndex));
getSchain()->getOracleResultAssemblyAgent()->postMessage(make_shared<NetworkMessageEnvelope>(_msg, _dstIndex));
}
} catch (...) {
throw_with_nested(InvalidStateException(__FUNCTION__, __CLASS_NAME__));
Expand Down Expand Up @@ -263,7 +265,7 @@ void Network::networkReadLoop() {
}

if (msg->getMsgType() == MSG_ORACLE_REQ_BROADCAST || msg->getMsgType() == MSG_ORACLE_RSP) {
sChain->postMessage(m);
sChain->getOracleResultAssemblyAgent()->postMessage(m);
continue;
}

Expand Down Expand Up @@ -326,7 +328,7 @@ void Network::postDeferOrDrop(const ptr<NetworkMessageEnvelope> &_me) {
CHECK_STATE(msg);

if (msg->getMsgType() == MSG_ORACLE_REQ_BROADCAST || msg->getMsgType() == MSG_ORACLE_RSP) {
sChain->postMessage(_me);
sChain->getOracleResultAssemblyAgent()->postMessage(_me);
} else if (sChain->getBlockConsensusInstance()->shouldPost(msg)) {
sChain->postMessage(_me);
} else {
Expand Down
43 changes: 43 additions & 0 deletions oracle/OracleMessageThreadPool.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
Copyright (C) 2018-2019 SKALE Labs
This file is part of skale-consensus.
skale-consensus is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
skale-consensus is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with skale-consensus. If not, see <https://www.gnu.org/licenses/>.
@file OracleMessageThreadPool.cpp
@author Stan Kladko
@date 2018
*/

#include "SkaleCommon.h"
#include "Log.h"

#include "chains/Schain.h"
#include "blockproposal/pusher/BlockProposalClientAgent.h"
#include "db/BlockProposalDB.h"
#include "pendingqueue/PendingTransactionsAgent.h"

#include "OracleResultAssemblyAgent.h"
#include "OracleMessageThreadPool.h"

/**
 * Creates the oracle message thread pool for the given agent.
 *
 * Forwards NUM_SCHAIN_THREADS, the agent pointer and `false` to the
 * WorkerThreadPool base constructor.
 * NOTE(review): the meaning of the trailing `false` flag is defined by
 * WorkerThreadPool and is not visible here — confirm against that class.
 *
 * @param _agent agent whose messages the pool's threads will process
 */
OracleMessageThreadPool::OracleMessageThreadPool(Agent *_agent)
    : WorkerThreadPool(NUM_SCHAIN_THREADS, _agent, false) {}

/**
 * Spawns one worker thread running
 * OracleResultAssemblyAgent::messageThreadProcessingLoop and appends it to
 * the pool's thread list while holding threadPoolLock.
 *
 * The thread index argument is intentionally unused: every worker runs the
 * same loop on the same agent.
 */
void OracleMessageThreadPool::createThread(uint64_t /*_threadNumber*/) {
    LOCK(threadPoolLock)
    // static_cast is the correct cast for a base-to-derived pointer conversion:
    // unlike reinterpret_cast it applies any base-subobject offset and is
    // rejected at compile time if the two types are unrelated.
    threadpool.push_back(make_shared<thread>(OracleResultAssemblyAgent::messageThreadProcessingLoop,
                                             static_cast<OracleResultAssemblyAgent *>(agent)));
}
38 changes: 38 additions & 0 deletions oracle/OracleMessageThreadPool.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
Copyright (C) 2018-2019 SKALE Labs
This file is part of skale-consensus.
skale-consensus is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
skale-consensus is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with skale-consensus. If not, see <https://www.gnu.org/licenses/>.
@file OracleMessageThreadPool.h
@author Stan Kladko
@date 2018
*/

#pragma once

class Agent;
class Schain;
class WorkerThreadPool;


/**
 * Worker thread pool used for oracle message processing.
 *
 * Each thread created by createThread() runs
 * OracleResultAssemblyAgent::messageThreadProcessingLoop (see the .cpp file).
 */
class OracleMessageThreadPool : public WorkerThreadPool {
public:

    /**
     * @param _agent agent handed to every worker thread of this pool.
     *
     * Marked explicit so that an Agent* is never silently converted into a
     * thread pool.
     */
    explicit OracleMessageThreadPool(Agent *_agent);

    /**
     * Creates a single worker thread.
     *
     * @param _threadNumber index of the thread being created (the current
     *                      implementation ignores it).
     */
    virtual void createThread(uint64_t _threadNumber);
};

101 changes: 101 additions & 0 deletions oracle/OracleResultAssemblyAgent.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
//
// Created by skale on 06.04.22.
//

#include "SkaleCommon.h"
#include "Log.h"
#include "chains/Schain.h"
#include "monitoring/LivelinessMonitor.h"
#include "OracleMessageThreadPool.h"
#include "OracleServerAgent.h"
#include "OracleResultAssemblyAgent.h"

/**
 * Builds the oracle result assembly agent for the given chain, creates its
 * OracleMessageThreadPool and starts the pool's service.
 *
 * Any exception raised while doing so is rethrown nested inside a FatalError.
 *
 * NOTE(review): the pool receives `this` and is started while the constructor
 * is still running — confirm that the worker threads cannot observe a
 * partially constructed agent.
 *
 * @param _sChain chain this agent belongs to
 */
OracleResultAssemblyAgent::OracleResultAssemblyAgent(Schain &_sChain) : Agent(_sChain, true),
oracleMessageThreadPool(new OracleMessageThreadPool(this)){
try {
// use the node's log for the constructing thread
logThreadLocal_ = _sChain.getNode()->getLog();
oracleMessageThreadPool->startService();
} catch (...) {
throw_with_nested(FatalError(__FUNCTION__, __CLASS_NAME__));
}
}


/**
 * Entry point of an oracle worker thread.
 *
 * Names the thread, waits on the chain's global start barrier and then, until
 * the node requests exit, repeatedly:
 *   1. blocks on messageCond until messageQueue is non-empty,
 *   2. takes the whole pending batch out of messageQueue under messageMutex,
 *   3. routes every message of the batch to the oracle instance outside the
 *      lock.
 *
 * Only MSG_ORACLE_REQ_BROADCAST and MSG_ORACLE_RSP messages are accepted; any
 * other type fails a state check. Exceptions derived from `exception` that are
 * raised while processing a single message are logged and the loop moves on to
 * the next message. A FatalError that reaches the outer handler is logged and
 * passed to Node::exitOnFatalError().
 *
 * @param _agent agent whose queue this thread drains; must not be null
 */
void OracleResultAssemblyAgent::messageThreadProcessingLoop(OracleResultAssemblyAgent*_agent) {
    CHECK_ARGUMENT(_agent);

    setThreadName("orclAssemblyLoop", _agent->getSchain()->getNode()->getConsensusEngine());

    _agent->getSchain()->waitOnGlobalStartBarrier();

    try {

        logThreadLocal_ = _agent->getSchain()->getNode()->getLog();

        queue<ptr<MessageEnvelope> > newQueue;

        while (!_agent->getSchain()->getNode()->isExitRequested()) {
            {
                unique_lock<mutex> mlock(_agent->messageMutex);
                while (_agent->messageQueue.empty()) {
                    _agent->messageCond.wait(mlock);
                    if (_agent->getNode()->isExitRequested())
                        return;
                }

                // newQueue is always empty at this point (it starts empty and the
                // processing loop below only finishes once it is drained), so a
                // swap moves the pending batch out and leaves the shared queue
                // empty in O(1). This replaces a deep copy followed by popping
                // every element while the mutex was held.
                newQueue.swap(_agent->messageQueue);
            }

            // Process the batch without holding the mutex so producers are not blocked.
            while (!newQueue.empty()) {
                if (_agent->getNode()->isExitRequested())
                    return;

                ptr<MessageEnvelope> m = newQueue.front();
                CHECK_STATE((uint64_t) m->getMessage()->getBlockId() != 0);

                try {
                    if (m->getMessage()->getMsgType() == MSG_ORACLE_REQ_BROADCAST ||
                        m->getMessage()->getMsgType() == MSG_ORACLE_RSP) {
                        _agent->getSchain()->getOracleInstance()->routeAndProcessMessage(m);
                    } else {
                        // only oracle messages may be posted to this agent
                        CHECK_STATE(false);
                    }
                } catch (exception &e) {
                    LOG(err, "Exception in OracleResultAssemblyAgent::messageThreadProcessingLoop");
                    SkaleException::logNested(e);
                    if (_agent->getNode()->isExitRequested())
                        return;
                } // catch

                newQueue.pop();
            }
        }


    } catch (FatalError &e) {
        SkaleException::logNested(e);
        _agent->getSchain()->getNode()->exitOnFatalError(e.what());
    }
}

/**
 * Enqueues a message envelope for processing by the oracle worker threads.
 *
 * The envelope is pushed onto messageQueue under messageMutex and all threads
 * waiting on messageCond are notified.
 *
 * @param _me envelope to enqueue; must be non-null and its message must have
 *            a non-zero block id
 */
void OracleResultAssemblyAgent::postMessage(const ptr<MessageEnvelope> &_me) {
CHECK_ARGUMENT(_me);

MONITOR(__CLASS_NAME__, __FUNCTION__)

// NOTE(review): presumably aborts the call (by throwing) when the node is
// shutting down — confirm against Schain::checkForExit()
getSchain()->checkForExit();

CHECK_STATE((uint64_t) _me->getMessage()->getBlockId() != 0);
{
// the scope limits how long the queue mutex is held
lock_guard<mutex> l(messageMutex);
messageQueue.push(_me);
// wake every waiting worker; each re-checks the queue under the mutex
messageCond.notify_all();
}
}
Loading

0 comments on commit 9e3521e

Please sign in to comment.