Skip to content

Commit

Permalink
Adapt the rest of the codebase to the new MISC database, which can be…
Browse files Browse the repository at this point in the history
… written to independently of ledger state database
  • Loading branch information
marta-lokhova committed Nov 4, 2024
1 parent 657d48a commit fc5f387
Show file tree
Hide file tree
Showing 23 changed files with 287 additions and 202 deletions.
15 changes: 7 additions & 8 deletions src/database/test/DatabaseTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ transactionTest(Application::pointer app)
int a0 = a + 1;
int a1 = a + 2;

auto& session = app->getDatabase().getSession();
auto& session = app->getDatabase().getRawSession();

session << "DROP TABLE IF EXISTS test";
session << "CREATE TABLE test (x INTEGER)";
Expand Down Expand Up @@ -104,7 +104,7 @@ checkMVCCIsolation(Application::pointer app)

int s2r1 = 0, s2r2 = 0, s2r3 = 0, s2r4 = 0;

auto& sess1 = app->getDatabase().getSession();
auto& sess1 = app->getDatabase().getRawSession();

sess1 << "DROP TABLE IF EXISTS test";
sess1 << "CREATE TABLE test (x INTEGER)";
Expand Down Expand Up @@ -217,7 +217,7 @@ TEST_CASE("postgres smoketest", "[db]")
Application::pointer app = createTestApplication(clock, cfg);
int a = 10, b = 0;

auto& session = app->getDatabase().getSession();
auto& session = app->getDatabase().getRawSession();

SECTION("round trip")
{
Expand Down Expand Up @@ -249,7 +249,7 @@ TEST_CASE("postgres smoketest", "[db]")

SECTION("postgres MVCC test")
{
app->getDatabase().getSession() << "drop table if exists test";
app->getDatabase().getRawSession() << "drop table if exists test";
checkMVCCIsolation(app);
}
}
Expand Down Expand Up @@ -279,7 +279,7 @@ TEST_CASE("postgres performance", "[db][pgperf][!hide]")
try
{
Application::pointer app = createTestApplication(clock, cfg);
auto& session = app->getDatabase().getSession();
auto& session = app->getDatabase().getRawSession();

session << "drop table if exists txtest;";
session << "create table txtest (a bigint, b bigint, c bigint, primary "
Expand Down Expand Up @@ -355,7 +355,6 @@ TEST_CASE("schema test", "[db]")
Application::pointer app = createTestApplication(clock, cfg);

auto& db = app->getDatabase();
auto dbv = db.getDBSchemaVersion();
auto av = db.getAppSchemaVersion();
REQUIRE(dbv == av);
auto dbv = db.getMainDBSchemaVersion();
REQUIRE(dbv == MAIN_SCHEMA_VERSION);
}
8 changes: 5 additions & 3 deletions src/herder/HerderImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2057,16 +2057,18 @@ void
HerderImpl::persistUpgrades()
{
ZoneScoped;
releaseAssert(threadIsMain());
auto s = mUpgrades.getParameters().toJson();
mApp.getPersistentState().setState(PersistentState::kLedgerUpgrades, s);
mApp.getPersistentState().setState(PersistentState::kLedgerUpgrades, s,
mApp.getDatabase().getMiscSession());
}

void
HerderImpl::restoreUpgrades()
{
ZoneScoped;
std::string s =
mApp.getPersistentState().getState(PersistentState::kLedgerUpgrades);
std::string s = mApp.getPersistentState().getState(
PersistentState::kLedgerUpgrades, mApp.getDatabase().getMiscSession());
if (!s.empty())
{
Upgrades::UpgradeParameters p;
Expand Down
1 change: 0 additions & 1 deletion src/herder/HerderPersistence.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,5 @@ class HerderPersistence
static void dropAll(Database& db);
static void deleteOldEntries(Database& db, uint32_t ledgerSeq,
uint32_t count);
static void createQuorumTrackingTable(soci::session& sess);
};
}
76 changes: 39 additions & 37 deletions src/herder/HerderPersistenceImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,13 @@ HerderPersistenceImpl::saveSCPHistory(uint32_t seq,

auto usedQSets = UnorderedMap<Hash, SCPQuorumSetPtr>{};
auto& db = mApp.getDatabase();
auto& sess = db.getMiscSession();

soci::transaction txscope(db.getSession());
soci::transaction txscope(sess.session());

{
auto prepClean = db.getPreparedStatement(
"DELETE FROM scphistory WHERE ledgerseq =:l");
"DELETE FROM scphistory WHERE ledgerseq =:l", sess);

auto& st = prepClean.statement();
st.exchange(soci::use(seq));
Expand Down Expand Up @@ -92,7 +93,8 @@ HerderPersistenceImpl::saveSCPHistory(uint32_t seq,
auto prepEnv =
db.getPreparedStatement("INSERT INTO scphistory "
"(nodeid, ledgerseq, envelope) VALUES "
"(:n, :l, :e)");
"(:n, :l, :e)",
sess);
auto& st = prepEnv.statement();
st.exchange(soci::use(nodeIDs, "n"));
st.exchange(soci::use(seqs, "l"));
Expand Down Expand Up @@ -124,7 +126,7 @@ HerderPersistenceImpl::saveSCPHistory(uint32_t seq,
std::string qSetHHex(binToHex(qSetH));

auto prep = db.getPreparedStatement(
"UPDATE quoruminfo SET qsethash = :h WHERE nodeid = :id");
"UPDATE quoruminfo SET qsethash = :h WHERE nodeid = :id", sess);
auto& st = prep.statement();
st.exchange(soci::use(qSetHHex));
st.exchange(soci::use(nodeIDStrKey));
Expand All @@ -136,7 +138,8 @@ HerderPersistenceImpl::saveSCPHistory(uint32_t seq,
if (st.get_affected_rows() != 1)
{
auto prepI = db.getPreparedStatement(
"INSERT INTO quoruminfo (nodeid, qsethash) VALUES (:id, :h)");
"INSERT INTO quoruminfo (nodeid, qsethash) VALUES (:id, :h)",
sess);
auto& stI = prepI.statement();
stI.exchange(soci::use(nodeIDStrKey));
stI.exchange(soci::use(qSetHHex));
Expand All @@ -158,7 +161,7 @@ HerderPersistenceImpl::saveSCPHistory(uint32_t seq,

uint32_t lastSeenSeq;
auto prepSelQSet = db.getPreparedStatement(
"SELECT lastledgerseq FROM scpquorums WHERE qsethash = :h");
"SELECT lastledgerseq FROM scpquorums WHERE qsethash = :h", sess);
auto& stSel = prepSelQSet.statement();
stSel.exchange(soci::into(lastSeenSeq));
stSel.exchange(soci::use(qSetH));
Expand All @@ -177,7 +180,8 @@ HerderPersistenceImpl::saveSCPHistory(uint32_t seq,

auto prepUpQSet = db.getPreparedStatement(
"UPDATE scpquorums SET "
"lastledgerseq = :l WHERE qsethash = :h");
"lastledgerseq = :l WHERE qsethash = :h",
sess);

auto& stUp = prepUpQSet.statement();
stUp.exchange(soci::use(seq));
Expand All @@ -202,7 +206,8 @@ HerderPersistenceImpl::saveSCPHistory(uint32_t seq,
auto prepInsQSet = db.getPreparedStatement(
"INSERT INTO scpquorums "
"(qsethash, lastledgerseq, qset) VALUES "
"(:h, :l, :v);");
"(:h, :l, :v);",
sess);

auto& stIns = prepInsQSet.statement();
stIns.exchange(soci::use(qSetH));
Expand Down Expand Up @@ -372,48 +377,45 @@ void
HerderPersistence::dropAll(Database& db)
{
ZoneScoped;
db.getSession() << "DROP TABLE IF EXISTS scphistory";
db.getRawMiscSession() << "DROP TABLE IF EXISTS scphistory";

db.getSession() << "DROP TABLE IF EXISTS scpquorums";
db.getRawMiscSession() << "DROP TABLE IF EXISTS scpquorums";

db.getSession() << "CREATE TABLE scphistory ("
"nodeid CHARACTER(56) NOT NULL,"
"ledgerseq INT NOT NULL CHECK (ledgerseq >= 0),"
"envelope TEXT NOT NULL"
")";
db.getRawMiscSession() << "CREATE TABLE scphistory ("
"nodeid CHARACTER(56) NOT NULL,"
"ledgerseq INT NOT NULL CHECK (ledgerseq >= 0),"
"envelope TEXT NOT NULL"
")";

db.getSession() << "CREATE INDEX scpenvsbyseq ON scphistory(ledgerseq)";
db.getRawMiscSession()
<< "CREATE INDEX scpenvsbyseq ON scphistory(ledgerseq)";

db.getSession() << "CREATE TABLE scpquorums ("
"qsethash CHARACTER(64) NOT NULL,"
"lastledgerseq INT NOT NULL CHECK (lastledgerseq >= 0),"
"qset TEXT NOT NULL,"
"PRIMARY KEY (qsethash)"
")";
db.getRawMiscSession()
<< "CREATE TABLE scpquorums ("
"qsethash CHARACTER(64) NOT NULL,"
"lastledgerseq INT NOT NULL CHECK (lastledgerseq >= 0),"
"qset TEXT NOT NULL,"
"PRIMARY KEY (qsethash)"
")";

db.getSession()
db.getRawMiscSession()
<< "CREATE INDEX scpquorumsbyseq ON scpquorums(lastledgerseq)";

db.getSession() << "DROP TABLE IF EXISTS quoruminfo";
}

void
HerderPersistence::createQuorumTrackingTable(soci::session& sess)
{
sess << "CREATE TABLE quoruminfo ("
"nodeid CHARACTER(56) NOT NULL,"
"qsethash CHARACTER(64) NOT NULL,"
"PRIMARY KEY (nodeid))";
db.getRawMiscSession() << "DROP TABLE IF EXISTS quoruminfo";
db.getRawMiscSession() << "CREATE TABLE quoruminfo ("
"nodeid CHARACTER(56) NOT NULL,"
"qsethash CHARACTER(64) NOT NULL,"
"PRIMARY KEY (nodeid))";
}

void
HerderPersistence::deleteOldEntries(Database& db, uint32_t ledgerSeq,
uint32_t count)
{
ZoneScoped;
DatabaseUtils::deleteOldEntriesHelper(db.getSession(), ledgerSeq, count,
"scphistory", "ledgerseq");
DatabaseUtils::deleteOldEntriesHelper(db.getSession(), ledgerSeq, count,
"scpquorums", "lastledgerseq");
DatabaseUtils::deleteOldEntriesHelper(db.getRawMiscSession(), ledgerSeq,
count, "scphistory", "ledgerseq");
DatabaseUtils::deleteOldEntriesHelper(db.getRawMiscSession(), ledgerSeq,
count, "scpquorums", "lastledgerseq");
}
}
5 changes: 3 additions & 2 deletions src/herder/PendingEnvelopes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -745,7 +745,8 @@ PendingEnvelopes::getQSet(Hash const& hash)
else
{
auto& db = mApp.getDatabase();
qset = HerderPersistence::getQuorumSet(db, db.getSession(), hash);
qset =
HerderPersistence::getQuorumSet(db, db.getRawMiscSession(), hash);
}
if (qset)
{
Expand Down Expand Up @@ -814,7 +815,7 @@ PendingEnvelopes::rebuildQuorumTrackerState()
// see if we had some information for that node
auto& db = mApp.getDatabase();
auto h = HerderPersistence::getNodeQuorumSet(
db, db.getSession(), id);
db, db.getRawMiscSession(), id);
if (h)
{
res = getQSet(*h);
Expand Down
20 changes: 10 additions & 10 deletions src/herder/Upgrades.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -690,22 +690,22 @@ Upgrades::timeForUpgrade(uint64_t time) const
void
Upgrades::dropAll(Database& db)
{
db.getSession() << "DROP TABLE IF EXISTS upgradehistory";
db.getSession() << "CREATE TABLE upgradehistory ("
"ledgerseq INT NOT NULL CHECK (ledgerseq >= 0), "
"upgradeindex INT NOT NULL, "
"upgrade TEXT NOT NULL, "
"changes TEXT NOT NULL, "
"PRIMARY KEY (ledgerseq, upgradeindex)"
")";
db.getSession()
db.getRawSession() << "DROP TABLE IF EXISTS upgradehistory";
db.getRawSession() << "CREATE TABLE upgradehistory ("
"ledgerseq INT NOT NULL CHECK (ledgerseq >= 0), "
"upgradeindex INT NOT NULL, "
"upgrade TEXT NOT NULL, "
"changes TEXT NOT NULL, "
"PRIMARY KEY (ledgerseq, upgradeindex)"
")";
db.getRawSession()
<< "CREATE INDEX upgradehistbyseq ON upgradehistory (ledgerseq);";
}

void
Upgrades::dropSupportUpgradeHistory(Database& db)
{
db.getSession() << "DROP TABLE IF EXISTS upgradehistory";
db.getRawSession() << "DROP TABLE IF EXISTS upgradehistory";
}

static void
Expand Down
11 changes: 6 additions & 5 deletions src/herder/test/HerderTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ TEST_CASE_VERSIONS("standalone", "[herder][acceptance]")
app->getCommandHandler().manualCmd("setcursor?id=A1&cursor=1");
app->getCommandHandler().manualCmd("maintenance?queue=true");
auto& db = app->getDatabase();
auto& sess = db.getSession();
auto& sess = db.getRawSession();

app->getCommandHandler().manualCmd("setcursor?id=A2&cursor=3");
app->getCommandHandler().manualCmd("maintenance?queue=true");
Expand Down Expand Up @@ -3395,15 +3395,15 @@ TEST_CASE("overlay parallel processing")
// soroban traffic
currLoadGenCount = loadGenDone.count();
auto secondLoadGenCount = secondLoadGenDone.count();
uint32_t const classicTxCount = 200;
uint32_t const txCount = 100;
// Generate Soroban txs from one node
loadGen.generateLoad(GeneratedLoadConfig::txLoad(
LoadGenMode::SOROBAN_UPLOAD, 50,
/* nTxs */ 500, desiredTxRate, /* offset */ 0));
/* nTxs */ txCount, desiredTxRate, /* offset */ 0));
// Generate classic txs from another node (with offset to prevent
// overlapping accounts)
secondLoadGen.generateLoad(GeneratedLoadConfig::txLoad(
LoadGenMode::PAY, 50, classicTxCount, desiredTxRate,
LoadGenMode::PAY, 50, txCount, desiredTxRate,
/* offset */ 50));

simulation->crankUntil(
Expand Down Expand Up @@ -5465,7 +5465,8 @@ TEST_CASE("SCP message capture from previous ledger", "[herder]")
// Prepare query
auto& db = node->getDatabase();
auto prep = db.getPreparedStatement(
"SELECT envelope FROM scphistory WHERE ledgerseq = :l");
"SELECT envelope FROM scphistory WHERE ledgerseq = :l",
db.getMiscSession());
auto& st = prep.statement();
st.exchange(soci::use(ledgerNum));
std::string envStr;
Expand Down
17 changes: 9 additions & 8 deletions src/history/HistoryManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ static std::string kSQLCreateStatement =
void
HistoryManager::dropAll(Database& db)
{
db.getSession() << "DROP TABLE IF EXISTS publishqueue;";
soci::statement st = db.getSession().prepare << kSQLCreateStatement;
db.getRawSession() << "DROP TABLE IF EXISTS publishqueue;";
soci::statement st = db.getRawSession().prepare << kSQLCreateStatement;
st.execute(true);
}

Expand Down Expand Up @@ -149,7 +149,8 @@ HistoryManagerImpl::dropSQLBasedPublish()
// Migrate all the existing queued checkpoints to the new format
{
std::string state;
auto prep = db.getPreparedStatement("SELECT state FROM publishqueue;");
auto prep =
db.getPreparedStatement("SELECT state FROM publishqueue;", sess);
auto& st = prep.statement();
st.exchange(soci::into(state));
st.define_and_bind();
Expand All @@ -170,9 +171,9 @@ HistoryManagerImpl::dropSQLBasedPublish()
for (auto const& checkpoint : checkpointLedgers)
{
auto begin = firstLedgerInCheckpointContaining(checkpoint);
populateCheckpointFilesFromDB(mApp, sess, begin, freq,
populateCheckpointFilesFromDB(mApp, sess.session(), begin, freq,
mCheckpointBuilder);
LedgerHeaderUtils::copyToStream(db, sess, begin, freq,
LedgerHeaderUtils::copyToStream(db, sess.session(), begin, freq,
mCheckpointBuilder);
// Checkpoints in publish queue are complete, so we can finalize them
mCheckpointBuilder.checkpointComplete(checkpoint);
Expand All @@ -184,17 +185,17 @@ HistoryManagerImpl::dropSQLBasedPublish()
{
// Then, reconstruct any partial checkpoints that haven't yet been
// queued
populateCheckpointFilesFromDB(mApp, sess,
populateCheckpointFilesFromDB(mApp, sess.session(),
firstLedgerInCheckpointContaining(lcl),
freq, mCheckpointBuilder);
LedgerHeaderUtils::copyToStream(db, sess,
LedgerHeaderUtils::copyToStream(db, sess.session(),
firstLedgerInCheckpointContaining(lcl),
freq, mCheckpointBuilder);
}
db.clearPreparedStatementCache();

// Now it's safe to drop obsolete SQL tables
sess << "DROP TABLE IF EXISTS publishqueue;";
sess.session() << "DROP TABLE IF EXISTS publishqueue;";
dropSupportTxHistory(db);
dropSupportTxSetHistory(db);
}
Expand Down
9 changes: 5 additions & 4 deletions src/history/StateSnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,11 @@ StateSnapshot::writeSCPMessages() const
{
ZoneScoped;
std::unique_ptr<soci::session> snapSess(
mApp.getDatabase().canUsePool()
? std::make_unique<soci::session>(mApp.getDatabase().getPool())
: nullptr);
soci::session& sess(snapSess ? *snapSess : mApp.getDatabase().getSession());
(mApp.getDatabase().canUsePool()
? std::make_unique<soci::session>(mApp.getDatabase().getMiscPool())
: nullptr));
soci::session& sess(snapSess ? *snapSess
: mApp.getDatabase().getRawMiscSession());
soci::transaction tx(sess);

// The current "history block" is stored in _four_ files, one just ledger
Expand Down
Loading

0 comments on commit fc5f387

Please sign in to comment.