Skip to content

Commit

Permalink
Merge branch 'develop' into bug/IS-1083-catchup-snapshot-priority-arc…
Browse files Browse the repository at this point in the history
…hive
  • Loading branch information
olehnikolaiev committed Oct 23, 2024
2 parents 06a445a + 73dc77e commit 3aa9283
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 28 deletions.
29 changes: 21 additions & 8 deletions libskale/SnapshotManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,11 +160,11 @@ void SnapshotManager::restoreSnapshot( unsigned _blockNumber ) {

UnsafeRegion::lock ur_lock;

std::vector< std::string > volumes;
if ( chainParams.nodeInfo.archiveMode && _blockNumber == 0 )
volumes = coreVolumes;
else
std::vector< std::string > volumes = coreVolumes;
#ifdef HISTORIC_STATE
if ( _blockNumber > 0 )
volumes = allVolumes;
#endif

int dummy_counter = 0;
for ( const string& vol : volumes ) {
Expand All @@ -180,6 +180,19 @@ void SnapshotManager::restoreSnapshot( unsigned _blockNumber ) {
batched_io::test_crash_before_commit( "SnapshotManager::doSnapshot" );

} // for

if ( _blockNumber == 0 ) {
#ifdef HISTORIC_STATE
for ( const string& vol : allVolumes ) {
// continue if already present
if ( fs::exists( dataDir / vol ) && 0 == btrfs.present( ( dataDir / vol ).c_str() ) )
continue;

// create if not created yet ( only makes sense for historic nodes and 0 block number )
btrfs.subvolume.create( ( dataDir / vol ).c_str() );
} // for
#endif
}
}

// exceptions:
Expand Down Expand Up @@ -747,11 +760,11 @@ void SnapshotManager::computeSnapshotHash( unsigned _blockNumber, bool is_checki

int dummy_counter = 0;

std::vector< std::string > volumes;
if ( chainParams.nodeInfo.archiveMode && _blockNumber == 0 )
volumes = coreVolumes;
else
std::vector< std::string > volumes = coreVolumes;
#ifdef HISTORIC_STATE
if ( _blockNumber > 0 )
volumes = allVolumes;
#endif

for ( const auto& volume : volumes ) {
int res = btrfs.subvolume.property_set(
Expand Down
7 changes: 3 additions & 4 deletions libweb3jsonrpc/Skale.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ std::string exceptionToErrorMessage();
volatile bool Skale::g_bShutdownViaWeb3Enabled = false;
volatile bool Skale::g_bNodeInstanceShouldShutdown = false;
Skale::list_fn_on_shutdown_t Skale::g_list_fn_on_shutdown;
const uint64_t Skale::SNAPSHOT_DOWNLOAD_MONITOR_THREAD_SLEEP_MS = 10;

Skale::Skale( Client& _client, std::shared_ptr< SharedSpace > _sharedSpace )
: m_client( _client ), m_shared_space( _sharedSpace ) {}
Expand Down Expand Up @@ -211,7 +212,7 @@ nlohmann::json Skale::impl_skale_getSnapshot( const nlohmann::json& joRequest, C
m_client.chainParams().nodeInfo.archiveMode ) ) {
if ( threadExitRequested )
break;
sleep( 10 );
sleep( SNAPSHOT_DOWNLOAD_MONITOR_THREAD_SLEEP_MS );
}

clog( VerbosityInfo, "skale_downloadSnapshotFragmentMonitorThread" )
Expand Down Expand Up @@ -582,8 +583,7 @@ std::string Skale::oracle_checkResult( std::string& receipt ) {
namespace snapshot {

bool download( const std::string& strURLWeb3, unsigned& block_number, const fs::path& saveTo,
fn_progress_t onProgress, bool isBinaryDownload, std::string* pStrErrorDescription,
bool forArchiveNode ) {
fn_progress_t onProgress, bool isBinaryDownload, std::string* pStrErrorDescription ) {
if ( pStrErrorDescription )
pStrErrorDescription->clear();
std::ofstream f;
Expand Down Expand Up @@ -636,7 +636,6 @@ bool download( const std::string& strURLWeb3, unsigned& block_number, const fs::
joIn["method"] = "skale_getSnapshot";
nlohmann::json joParams = nlohmann::json::object();
joParams["blockNumber"] = block_number;
joParams["forArchiveNode"] = forArchiveNode;
joIn["params"] = joParams;
skutils::rest::data_t d = cli.call( joIn );
if ( !d.err_s_.empty() ) {
Expand Down
8 changes: 7 additions & 1 deletion libweb3jsonrpc/Skale.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ class Skale : public dev::rpc::SkaleFace {
typedef std::function< void() > fn_on_shutdown_t;
static void onShutdownInvoke( fn_on_shutdown_t fn );

static uint64_t snapshotDownloadFragmentMonitorThreadTimeout() {
return SNAPSHOT_DOWNLOAD_MONITOR_THREAD_SLEEP_MS;
}

public:
nlohmann::json impl_skale_getSnapshot(
const nlohmann::json& joRequest, dev::eth::Client& client );
Expand All @@ -99,6 +103,8 @@ class Skale : public dev::rpc::SkaleFace {
typedef std::list< fn_on_shutdown_t > list_fn_on_shutdown_t;
static list_fn_on_shutdown_t g_list_fn_on_shutdown;

static const uint64_t SNAPSHOT_DOWNLOAD_MONITOR_THREAD_SLEEP_MS;

dev::eth::Client& m_client;
std::shared_ptr< SharedSpace > m_shared_space;
int currentSnapshotBlockNumber = -1;
Expand All @@ -118,7 +124,7 @@ typedef std::function< bool( size_t idxChunck, size_t cntChunks ) > fn_progress_

extern bool download( const std::string& strURLWeb3, unsigned& block_number, const fs::path& saveTo,
fn_progress_t onProgress, bool isBinaryDownload = true,
std::string* pStrErrorDescription = nullptr, bool forArchiveNode = false );
std::string* pStrErrorDescription = nullptr );

}; // namespace snapshot

Expand Down
27 changes: 12 additions & 15 deletions skaled/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,7 @@ void downloadSnapshot( unsigned block_number, std::shared_ptr< SnapshotManager >
const std::string& strURLWeb3, const ChainParams& chainParams ) {
fs::path saveTo;
try {
clog( VerbosityInfo, "downloadSnapshot" )
<< cc::normal( "Will download snapshot from " ) << cc::u( strURLWeb3 ) << std::endl;
clog( VerbosityInfo, "downloadSnapshot" ) << "Will download snapshot from " << strURLWeb3;

try {
bool isBinaryDownload = true;
Expand All @@ -262,11 +261,10 @@ void downloadSnapshot( unsigned block_number, std::shared_ptr< SnapshotManager >
strURLWeb3, block_number, saveTo,
[&]( size_t idxChunck, size_t cntChunks ) -> bool {
clog( VerbosityInfo, "downloadSnapshot" )
<< cc::normal( "... download progress ... " ) << cc::size10( idxChunck )
<< cc::normal( " of " ) << cc::size10( cntChunks ) << "\r";
<< "... download progress ... " << idxChunck << " of " << cntChunks << "\r";
return true; // continue download
},
isBinaryDownload, &strErrorDescription, chainParams.nodeInfo.archiveMode );
isBinaryDownload, &strErrorDescription );
std::cout << " \r"; // clear
// progress
// line
Expand All @@ -281,14 +279,12 @@ void downloadSnapshot( unsigned block_number, std::shared_ptr< SnapshotManager >
std::throw_with_nested( std::runtime_error( "Exception while downloading snapshot" ) );
}
clog( VerbosityInfo, "downloadSnapshot" )
<< cc::success( "Snapshot download success for block " )
<< cc::u( to_string( block_number ) ) << std::endl;
<< "Snapshot download success for block " << to_string( block_number );
try {
snapshotManager->importDiff( block_number );
} catch ( ... ) {
std::throw_with_nested( std::runtime_error(
cc::fatal( "FATAL:" ) + " " +
cc::error( "Exception while importing downloaded snapshot: " ) ) );
std::throw_with_nested(
std::runtime_error( "FATAL: Exception while importing downloaded snapshot: " ) );
}

/// HACK refactor this piece of code! ///
Expand All @@ -304,7 +300,7 @@ void downloadSnapshot( unsigned block_number, std::shared_ptr< SnapshotManager >
}
if ( db_path.empty() ) {
clog( VerbosityError, "downloadSnapshot" )
<< cc::fatal( "Snapshot downloaded without " + prefix + " db" ) << std::endl;
<< "Snapshot downloaded without " + prefix + " db";
return;
}

Expand All @@ -315,8 +311,7 @@ void downloadSnapshot( unsigned block_number, std::shared_ptr< SnapshotManager >

} catch ( ... ) {
std::throw_with_nested(
std::runtime_error( cc::fatal( "FATAL:" ) + " " +
cc::error( "Exception while processing downloaded snapshot: " ) ) );
std::runtime_error( "FATAL: Exception while processing downloaded snapshot: " ) );
}
if ( !saveTo.empty() )
fs::remove( saveTo );
Expand Down Expand Up @@ -1630,9 +1625,11 @@ int main( int argc, char** argv ) try {
// sleep before send skale_getSnapshot again - will receive error
clog( VerbosityInfo, "main" )
<< std::string( "Will sleep for " )
<< chainParams.sChain.snapshotDownloadInactiveTimeout
<< chainParams.sChain.snapshotDownloadInactiveTimeout +
dev::rpc::Skale::snapshotDownloadFragmentMonitorThreadTimeout()
<< std::string( " seconds before downloading 0 snapshot" );
sleep( chainParams.sChain.snapshotDownloadInactiveTimeout );
sleep( chainParams.sChain.snapshotDownloadInactiveTimeout +
dev::rpc::Skale::snapshotDownloadFragmentMonitorThreadTimeout() );

downloadAndProccessSnapshot(
snapshotManager, chainParams, urlToDownloadSnapshotFrom, false );
Expand Down

0 comments on commit 3aa9283

Please sign in to comment.