Skip to content

Commit

Permalink
wip: merge
Browse files Browse the repository at this point in the history
  • Loading branch information
edmundnoble committed Jun 19, 2024
5 parents b714bab + 96c1d21 + 22c187d + 8904396 + 0a3cd89 commit c5e5ac6
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 19 deletions.
2 changes: 2 additions & 0 deletions changes/2024-06-19T182508-0400.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Log current cut periodically, instead of when it changes, for more
consistency and less space use.
2 changes: 2 additions & 0 deletions changes/2024-06-19T182544-0400.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Better progress messages for read-only replay, including a time
estimate and smoothed rate calculation.
1 change: 1 addition & 0 deletions changes/2024-06-19T182611-0400.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Speed up read-only replay by avoiding playing empty blocks
2 changes: 2 additions & 0 deletions changes/2024-06-19T182645-0400.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Fix a performance bug in read-only replay which was not using a cache
for module data
6 changes: 3 additions & 3 deletions node/ChainwebNode.hs
Original file line number Diff line number Diff line change
Expand Up @@ -215,9 +215,9 @@ runMonitorLoop actionLabel logger = runForeverThrottled
runCutMonitor :: Logger logger => logger -> CutDb tbl -> IO ()
runCutMonitor logger db = L.withLoggerLabel ("component", "cut-monitor") logger $ \l ->
runMonitorLoop "ChainwebNode.runCutMonitor" l $ do
S.mapM_ (logFunctionJson l Info)
$ S.map (cutToCutHashes Nothing)
$ cutStream db
logFunctionJson l Info . cutToCutHashes Nothing
=<< _cut db
threadDelay 15_000_000

data BlockUpdate = BlockUpdate
{ _blockUpdateBlockHeader :: !(ObjectEncoded BlockHeader)
Expand Down
11 changes: 8 additions & 3 deletions src/Chainweb/Pact/Backend/RelationalCheckpointer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,11 @@ doReadFrom logger v cid sql moduleCacheVar maybeParent doRead = do
Nothing -> genesisHeight v cid
Just parent -> succ . _blockHeight . _parentHeader $ parent

withMVar moduleCacheVar $ \sharedModuleCache -> do
modifyMVar moduleCacheVar $ \sharedModuleCache -> do
bracket
(beginSavepoint sql BatchSavepoint)
(\_ -> abortSavepoint sql BatchSavepoint) $ \() -> do
getEndTxId "doReadFrom" sql maybeParent >>= traverse \startTxId -> do
h <- getEndTxId "doReadFrom" sql maybeParent >>= traverse \startTxId -> do
newDbEnv <- newMVar $ BlockEnv
(mkBlockHandlerEnv v cid currentHeight sql DoNotPersistIntraBlockWrites logger)
(initBlockState defaultModuleCacheLimit startTxId)
Expand All @@ -184,7 +184,12 @@ doReadFrom logger v cid sql moduleCacheVar maybeParent doRead = do
, _cpLookupProcessedTx = \hs ->
runBlockEnv newDbEnv (doLookupSuccessful currentHeight hs)
}
doRead curBlockDbEnv
r <- doRead curBlockDbEnv
finalCache <- _bsModuleCache . _benvBlockState <$> readMVar newDbEnv
return (r, finalCache)
case h of
NoHistory -> return (sharedModuleCache, NoHistory)
Historical (r, finalCache) -> return (finalCache, Historical r)



Expand Down
47 changes: 34 additions & 13 deletions src/Chainweb/Pact/PactService.hs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ import Chainweb.Version
import Chainweb.Version.Guards
import Utils.Logging.Trace
import Chainweb.Counter
import Data.Time.Clock
import Text.Printf
import Data.Time.Format.ISO8601

runPactService
:: Logger logger
Expand Down Expand Up @@ -787,7 +790,7 @@ execReadOnlyReplay lowerBound maybeUpperBound = pactLabel "execReadOnlyReplay" $
withPactState $ \runPact ->
liftIO $ getBranchIncreasing bhdb upperBound (int lowerHeight) $ \blocks -> do
heightRef <- newIORef lowerHeight
withAsync (heightProgress lowerHeight heightRef (logInfo_ logger)) $ \_ -> do
withAsync (heightProgress lowerHeight (_blockHeight upperBound) heightRef (logInfo_ logger)) $ \_ -> do
blocks
& Stream.hoist liftIO
& play bhdb pdb heightRef runPact
Expand Down Expand Up @@ -821,27 +824,45 @@ execReadOnlyReplay lowerBound maybeUpperBound = pactLabel "execReadOnlyReplay" $
liftIO $ writeIORef heightRef (_blockHeight bh)
payload <- liftIO $ fromJuste <$>
lookupPayloadDataWithHeight pdb (Just $ _blockHeight bh) (_blockPayloadHash bh)
void $ execBlock bh (CheckablePayload payload)
let isPayloadEmpty = V.null (_payloadDataTransactions payload)
let isUpgradeBlock = isJust $ _chainwebVersion bhdb ^? versionUpgrades . onChain (_chainId bhdb) . ix (_blockHeight bh)
unless (isPayloadEmpty && not isUpgradeBlock) $
void $ execBlock bh (CheckablePayload payload)
)
validationFailed <- readIORef validationFailedRef
when validationFailed $
throwM $ BlockValidationFailure $ BlockValidationFailureMsg $
J.encodeJsonText ("Prior block validation errors" :: Text)
return r

heightProgress :: BlockHeight -> IORef BlockHeight -> (Text -> IO ()) -> IO ()
heightProgress initialHeight ref logFun = do
r <- newIORef initialHeight
let delaySecs = 20
heightProgress :: BlockHeight -> BlockHeight -> IORef BlockHeight -> (Text -> IO ()) -> IO ()
heightProgress initialHeight endHeight ref logFun = do
heightAndRateRef <- newIORef (initialHeight, 0.0 :: Double)
let delayMicros = 20_000_000
liftIO $ threadDelay (delayMicros `div` 2)
forever $ do
h <- readIORef r
h' <- readIORef ref
writeIORef r h'
liftIO $ threadDelay delayMicros
(lastHeight, oldRate) <- readIORef heightAndRateRef
now' <- getCurrentTime
currentHeight <- readIORef ref
let blocksPerSecond
= 0.8
* oldRate
+ 0.2
* fromIntegral (currentHeight - lastHeight)
/ (fromIntegral delayMicros / 1_000_000)
writeIORef heightAndRateRef (currentHeight, blocksPerSecond)
let est =
flip addUTCTime now'
$ realToFrac @Double @NominalDiffTime
$ fromIntegral @BlockHeight @Double
(endHeight - initialHeight)
/ blocksPerSecond
logFun
$ "processed: " <> sshow (h' - initialHeight)
<> ", current height: " <> sshow h'
<> ", rate: " <> sshow ((h' - h) `div` fromIntegral delaySecs) <> "blocks/sec"
liftIO $ threadDelay (delaySecs * 1_000_000)
$ T.pack $ printf "height: %d | rate: %.1f blocks/sec | est. %s"
(fromIntegral @BlockHeight @Int $ currentHeight - initialHeight)
blocksPerSecond
(formatShow iso8601Format est)

execLocal
:: (Logger logger, CanReadablePayloadCas tbl)
Expand Down

0 comments on commit c5e5ac6

Please sign in to comment.