Skip to content

Commit

Permalink
add rocksdb compaction to compact-resume test
Browse files Browse the repository at this point in the history
Change-Id: I1a55ff53d99cd6093d1e397dc56820969d75b20e
  • Loading branch information
chessai committed Jul 2, 2024
1 parent addfb35 commit 1f5b7ad
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 26 deletions.
30 changes: 13 additions & 17 deletions src/Chainweb/Pact/Backend/CompactionInMemory.hs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ module Chainweb.Pact.Backend.CompactionInMemory
( main
, withDefaultLogger
, compactPactState
, trimRocksDb
)
where

Expand Down Expand Up @@ -59,6 +60,7 @@ import Control.Monad.IO.Class (MonadIO(liftIO))
import Control.Monad.Trans.Control (MonadBaseControl, liftBaseOp)
import Data.ByteString (ByteString)
import Data.Function (fix, (&))
import Data.Int (Int64)
import Data.LogMessage (SomeLogMessage, logText)
import Data.LruCache (LruCache)
import Data.LruCache qualified as Lru
Expand Down Expand Up @@ -263,16 +265,12 @@ compactPactState logger v targetBlockHeight srcDb targetDb = do

-- Copy over TransactionIndex.
--
-- TODO: Should we compact this based on the RocksDB 'minBlockHeight'?
-- We compact this based on the RocksDB 'minBlockHeight'.
-- /poll and SPV rely on having this table synchronised with RocksDB.
--
-- We should let users have the option to keep this around, as an
-- advanced option, and document those APIs which need it.
--
-- It isn't (currently) compacted so the amount of data is quite large
-- This, SYS:Pacts, and coin_coin-table were all used as benchmarks
-- for optimisations.
--
-- Maybe consider
-- https://tableplus.com/blog/2018/07/sqlite-how-to-copy-table-to-another-database.html
do
Expand Down Expand Up @@ -303,11 +301,15 @@ compactPactState logger v targetBlockHeight srcDb targetDb = do
-- This is faster than creating the indices before
createCheckpointerIndexes targetDb logger

-- Grab the endingtxid for determining latest state at the
-- target height
endingTxId <- getEndingTxId srcDb targetBlockHeight

-- Compact all user tables
log LL.Info "Starting user tables"
getLatestPactTableNamesAt srcDb targetBlockHeight
& S.mapM_ (\tblname -> do
compactTable logger srcDb targetDb (fromUtf8 tblname) targetBlockHeight
compactTable logger srcDb targetDb (fromUtf8 tblname) endingTxId
)

log LL.Info "Compaction done"
Expand Down Expand Up @@ -341,7 +343,7 @@ compact cfg = do
let cids = allChains cfg.chainwebVersion

-- Get the target blockheight.
targetBlockHeight <- withDefaultLogger LL.Error $ \logger -> do
targetBlockHeight <- withDefaultLogger LL.Debug $ \logger -> do
logFunctionText logger LL.Info "doin it!"

threadDelay 5_000_000
Expand All @@ -358,6 +360,8 @@ compact cfg = do
createDirectoryIfMissing True (pactDir cfg.toDir)
createDirectoryIfMissing True (rocksDir cfg.toDir)

logFunctionText logger LL.Debug $ "targetBlockHeight: " <> sshow targetBlockHeight

pure targetBlockHeight

-- Trim RocksDB here
Expand All @@ -380,9 +384,9 @@ compactTable :: (Logger logger)
-> Database -- ^ source database (where we get the active pact state)
-> Database -- ^ target database (where we put the compacted state, + use as a rowkey cache)
-> Text -- ^ the table we are compacting
-> BlockHeight -- ^ target blockheight
-> Int64 -- ^ target blockheight
-> IO ()
compactTable logger srcDb targetDb tblname targetBlockHeight = do
compactTable logger srcDb targetDb tblname endingTxId = do
let log = logFunctionText logger
let tblnameUtf8 = toUtf8 tblname

Expand All @@ -409,14 +413,6 @@ compactTable logger srcDb targetDb tblname targetBlockHeight = do
log LL.Info $ "Creating table indices for " <> tblname
createUserTableIndex targetDb tblnameUtf8

-- Grab the endingtxid for determining latest state at the
-- target height
--
-- TODO: move this out, doesn't need to be called for every table.
--
-- bh: 400, endingtxId: 10 (exclusive)
endingTxId <- getEndingTxId srcDb targetBlockHeight

-- | Get the active pact state (a 'Stream' of 'PactRow').
-- Uses an LruCache of RowKeys - the LRU Cache internally just stores
-- hashes.
Expand Down
20 changes: 16 additions & 4 deletions test/Chainweb/Test/MultiNode.hs
Original file line number Diff line number Diff line change
Expand Up @@ -517,11 +517,12 @@ compactAndResumeTest :: ()
-> ChainwebVersion
-> Natural
-> RocksDb
-> RocksDb
-> FilePath
-> FilePath
-> (String -> IO ())
-> IO ()
compactAndResumeTest logLevel v n rdb srcPactDir targetPactDir step = do
compactAndResumeTest logLevel v n srcRocksDb targetRocksDb srcPactDir targetPactDir step = do
let logFun = step . T.unpack
let logger = genericLogger logLevel logFun

Expand All @@ -533,21 +534,32 @@ compactAndResumeTest logLevel v n rdb srcPactDir targetPactDir step = do
stateVar <- newMVar (emptyConsensusState v)
let ct :: Int -> StartedChainweb logger -> IO ()
ct = harvestConsensusState logger stateVar
runNodesForSeconds logLevel logFun (multiConfig v n) n 60 rdb srcPactDir ct
runNodesForSeconds logLevel logFun (multiConfig v n) n 10 srcRocksDb srcPactDir ct
Just stats1 <- consensusStateSummary <$> swapMVar stateVar (emptyConsensusState v)
assertGe "average block count before compaction" (Actual $ _statBlockCount stats1) (Expected 50)
logFun $ sshow stats1

logFun "phase 2... compacting"
forM_ [0 .. int @_ @Int n - 1] $ \nid -> do

logFun "phase 2.1...compacting pact state"
forM_ [0 .. int @_ @Word n - 1] $ \nid -> do
forM_ (allChains v) $ \cid -> do
let logger' = addLabel ("nodeId", sshow nid) $ addLabel ("chainId", chainIdToText cid) logger
withSqliteDb cid logger' (srcPactDir </> show nid) False $ \srcDb -> do
withSqliteDb cid logger' (targetPactDir </> show nid) False $ \targetDb -> do
sigmaCompact v srcDb targetDb (BlockHeight 25)

logFun "phase 2.2...compacting RocksDB"
forM_ [0 .. int @_ @Word n - 1] $ \nid -> do
let srcRdb = srcRocksDb { _rocksDbNamespace = T.encodeUtf8 (toText nid) }
let tgtRdb = targetRocksDb { _rocksDbNamespace = T.encodeUtf8 (toText nid) }
-- We must keep every payload including and after the compaction target,
-- hence 'maxBound'.
Sigma.trimRocksDb (addLabel ("nodeId", sshow nid) logger) v (allChains v) 20 maxBound srcRdb tgtRdb

--namespacedNodeRocksDb = rdb { _rocksDbNamespace = T.encodeUtf8 $ toText nid }
logFun "phase 3... restarting nodes and ensuring progress"
runNodesForSeconds logLevel logFun (multiConfig v n) { _configFullHistoricPactState = False } n 60 rdb targetPactDir ct
runNodesForSeconds logLevel logFun (multiConfig v n) { _configFullHistoricPactState = False } n 10 targetRocksDb targetPactDir ct
Just stats2 <- consensusStateSummary <$> swapMVar stateVar (emptyConsensusState v)
-- We ensure that we've gotten to at least 1.5x the previous block count
assertGe "average block count post-compaction" (Actual $ _statBlockCount stats2) (Expected (3 * _statBlockCount stats1 `div` 2))
Expand Down
11 changes: 6 additions & 5 deletions test/SlowTests.hs
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,18 @@ main :: IO ()
main = defaultMain suite

loglevel :: LogLevel
loglevel = Warn
loglevel = Info --Warn

-- note that because these tests run in parallel they must all use distinct rocksdb and sqlite dirs.
suite :: TestTree
suite = independentSequentialTestGroup "ChainwebSlowTests"
[ testCaseSteps "compact-resume" $ \step ->
withTempRocksDb "compact-resume-test-rocks" $ \rdb ->
withTempRocksDb "compact-resume-test-rocks-src" $ \srcRocksDb ->
withTempRocksDb "compact-resume-test-rocks-target" $ \targetRocksDb ->
withSystemTempDirectory "compact-resume-test-pact-src" $ \srcPactDbDir ->
withSystemTempDirectory "compact-resume-test-pact-target" $ \targetPactDbDir -> do
Chainweb.Test.MultiNode.compactAndResumeTest loglevel (fastForkingCpmTestVersion pairChainGraph) 6 rdb srcPactDbDir targetPactDbDir step
, testCaseSteps "compact-live-node" $ \step ->
Chainweb.Test.MultiNode.compactAndResumeTest loglevel (fastForkingCpmTestVersion pairChainGraph) 6 srcRocksDb targetRocksDb srcPactDbDir targetPactDbDir step
{-, testCaseSteps "compact-live-node" $ \step ->
withTempRocksDb "pact-import-test-rocks" $ \rdb ->
withSystemTempDirectory "pact-import-test-pact-src" $ \srcPactDbDir ->
withSystemTempDirectory "pact-import-test-pact-target" $ \targetPactDbDir -> do
Expand All @@ -61,5 +62,5 @@ suite = independentSequentialTestGroup "ChainwebSlowTests"
Chainweb.Test.MultiNode.pactImportTest loglevel (fastForkingCpmTestVersion twentyChainGraph) 1 rdb pactDbDir step
, testGroup "Network.X05.SelfSigned.Test"
[ Network.X509.SelfSigned.Test.tests
]
]-}
]

0 comments on commit 1f5b7ad

Please sign in to comment.