From 89ef49418474742bdd5c59292c4c20b8a8015d89 Mon Sep 17 00:00:00 2001 From: Evgenii Akentev Date: Mon, 4 Mar 2024 21:23:17 +0400 Subject: [PATCH] Process PactQueue concurrently. Signed-off-by: Evgenii Akentev Change-Id: Ia007a565d75c625ddb243534a546d57584ec8e7d --- bench/Chainweb/Pact/Backend/ForkingBench.hs | 46 ++- chainweb.cabal | 1 + src/Chainweb/Pact/PactService.hs | 290 ++++++++++-------- src/Chainweb/Pact/Service/PactInProcApi.hs | 9 +- src/Chainweb/Pact/Service/PactQueue.hs | 50 ++- .../Chainweb/Test/Pact/PactSingleChainTest.hs | 6 +- test/Chainweb/Test/Pact/TTL.hs | 6 +- test/Chainweb/Test/Pact/Utils.hs | 6 +- 8 files changed, 255 insertions(+), 159 deletions(-) diff --git a/bench/Chainweb/Pact/Backend/ForkingBench.hs b/bench/Chainweb/Pact/Backend/ForkingBench.hs index 7bfaae6a7c..84a11cc6e4 100644 --- a/bench/Chainweb/Pact/Backend/ForkingBench.hs +++ b/bench/Chainweb/Pact/Backend/ForkingBench.hs @@ -54,6 +54,7 @@ import GHC.Generics hiding (from, to) import System.Environment import System.LogLevel import System.Random +import System.Directory import Text.Printf @@ -87,7 +88,7 @@ import Chainweb.Pact.Backend.Compaction qualified as C import Chainweb.Pact.Backend.Types import Chainweb.Pact.Backend.Utils import Chainweb.Pact.PactService -import Chainweb.Pact.Service.BlockValidation +import Chainweb.Pact.Service.BlockValidation as BlockValidation import Chainweb.Pact.Service.PactQueue import Chainweb.Pact.Service.Types import Chainweb.Pact.Types @@ -142,6 +143,7 @@ bench :: RocksDb -> C.Benchmark bench rdb = C.bgroup "PactService" $ [ forkingBench , doubleForkingBench + , queueBench ] ++ map (oneBlock defBenchConfig) [1, 10, 50, 100] ++ map (oneBlock validateCfg) [0, 1, 10, 50, 100] ++ map (oneBlock compactCfg) [0, 1, 10, 50, 100] @@ -157,6 +159,21 @@ bench rdb = C.bgroup "PactService" $ let (T3 _ join1 _) = mainLineBlocks !! 5 void $ playLine pdb bhdb 5 join1 pactQueue nonceCounter + queueBench = withResources rdb 10 Quiet DontCompact + $ \mainLineBlocks pdb bhdb nonceCounter pactQueue _ -> + C.bench "queueBench" $ C.whnfIO $ do + let (T3 _ join1 _) = mainLineBlocks !! 5 + + race_ (void $ playLine pdb bhdb 100 join1 pactQueue nonceCounter) $ do + meta <- makeMeta cid + (_, _, cmds) <- unzip3 <$> createCoinAccounts testVer meta (take 30000 names) + let + txs = case traverse validateCommand cmds of + Left _ -> [] + Right !txs' -> txs' + + forM_ txs (\tx -> BlockValidation.local Nothing Nothing Nothing tx pactQueue) + doubleForkingBench = withResources rdb 10 Quiet DontCompact $ \mainLineBlocks pdb bhdb nonceCounter pactQueue _ -> C.bench "doubleForkingBench" $ C.whnfIO $ do @@ -263,6 +280,7 @@ data Resources , nonceCounter :: !(IORef Word64) , txPerBlock :: !(IORef Int) , sqlEnv :: !SQLiteEnv + , sqlEnv2 :: !SQLiteEnv } type RunPactService = @@ -293,10 +311,11 @@ withResources rdb trunkLength logLevel compact f = C.envWithCleanup create destr coinAccounts <- newMVar mempty nonceCounter <- newIORef 1 txPerBlock <- newIORef 10 - sqlEnv <- openSQLiteConnection "" {- temporary SQLite db -} chainwebBenchPragmas + sqlEnv <- openSQLiteConnection "/tmp/chainweb-bench.db" {- temporary SQLite db -} chainwebBenchPragmas + sqlEnv2 <- openSQLiteConnection "/tmp/chainweb-bench.db" {- temporary SQLite db -} chainwebBenchPragmas mp <- testMemPoolAccess txPerBlock coinAccounts pactService <- - startPact testVer logger blockHeaderDb payloadDb mp sqlEnv + startPact testVer logger blockHeaderDb payloadDb mp (sqlEnv, sqlEnv2) mainTrunkBlocks <- playLine payloadDb blockHeaderDb trunkLength genesisBlock (snd pactService) nonceCounter when (compact == DoCompact) $ do @@ -310,6 +329,8 @@ withResources rdb trunkLength logLevel compact f = C.envWithCleanup create destr destroy (NoopNFData (Resources {..})) = do stopPact pactService stopSqliteDb sqlEnv + stopSqliteDb sqlEnv2 + removeFile "/tmp/chainweb-bench.db" pactQueueSize = 2000 @@ -320,7 +341,6 @@ withResources rdb trunkLength logLevel compact f = C.envWithCleanup create destr a <- async $ runPactService version cid l reqQ mempool bhdb pdb sqlEnv testPactServiceConfig { _pactBlockGasLimit = 180_000 } - return (a, reqQ) stopPact (a, _) = cancel a @@ -328,8 +348,7 @@ withResources rdb trunkLength logLevel compact f = C.envWithCleanup create destr chainwebBenchPragmas = [ "synchronous = NORMAL" , "journal_mode = WAL" - , "locking_mode = EXCLUSIVE" - -- this is different from the prodcution database that uses @NORMAL@ + , "locking_mode = NORMAL" , "temp_store = MEMORY" , "auto_vacuum = NONE" , "page_size = 1024" @@ -371,7 +390,7 @@ testMemPoolAccess txsPerBlock accounts = do getTestBlock mVarAccounts txOrigTime validate bHeight hash | bHeight == 1 = do meta <- setTime txOrigTime <$> makeMeta cid - (as, kss, cmds) <- unzip3 . toList <$> createCoinAccounts testVer meta + (as, kss, cmds) <- unzip3 <$> createCoinAccounts testVer meta twoNames case traverse validateCommand cmds of Left err -> throwM $ userError err Right !r -> do @@ -470,15 +489,20 @@ stockKey s = do stockKeyFile :: ByteString stockKeyFile = $(embedFile "pact/genesis/devnet/keys.yaml") -createCoinAccounts :: ChainwebVersion -> PublicMeta -> IO (NonEmpty (Account, NonEmpty (DynKeyPair, [SigCapability]), Command Text)) -createCoinAccounts v meta = traverse (go <*> createCoinAccount v meta) names +createCoinAccounts :: ChainwebVersion -> PublicMeta -> [String] -> IO [(Account, NonEmpty (DynKeyPair, [SigCapability]), Command Text)] +createCoinAccounts v meta names' = traverse (go <*> createCoinAccount v meta) names' where go a m = do (b,c) <- m return (Account a,b,c) -names :: NonEmpty String -names = NEL.map safeCapitalize . NEL.fromList $ Prelude.take 2 $ words "mary elizabeth patricia jennifer linda barbara margaret susan dorothy jessica james john robert michael william david richard joseph charles thomas" +twoNames :: [String] +twoNames = take 2 names + +names :: [String] +names = map safeCapitalize $ names' ++ [(n ++ show x) | n <- names', x <- [0 :: Int ..1000]] + where + names' = words "mary elizabeth patricia jennifer linda barbara margaret susan dorothy jessica james john robert michael william david richard joseph charles thomas" formatB16PubKey :: DynKeyPair -> Text formatB16PubKey = \case diff --git a/chainweb.cabal b/chainweb.cabal index 8db6a6a0c7..2f1d032899 100644 --- a/chainweb.cabal +++ b/chainweb.cabal @@ -827,6 +827,7 @@ benchmark bench , containers >= 0.5 , criterion , deepseq >= 1.4 + , directory >= 1.3 , exceptions >= 0.8 , file-embed >= 0.0 , lens >= 4.17 diff --git a/src/Chainweb/Pact/PactService.hs b/src/Chainweb/Pact/PactService.hs index 797d119b54..490b7fa2ae 100644 --- a/src/Chainweb/Pact/PactService.hs +++ b/src/Chainweb/Pact/PactService.hs @@ -99,7 +99,7 @@ import Chainweb.Pact.Backend.RelationalCheckpointer (withProdRelationalCheckpoin import Chainweb.Pact.Backend.Types import Chainweb.Pact.PactService.ExecBlock import Chainweb.Pact.PactService.Checkpointer -import Chainweb.Pact.Service.PactQueue (PactQueue, getNextRequest) +import Chainweb.Pact.Service.PactQueue (PactQueue, getNextWriteRequest, getNextReadRequest) import Chainweb.Pact.Service.Types import Chainweb.Pact.SPV import Chainweb.Pact.TransactionExec @@ -126,52 +126,37 @@ runPactService -> MemPoolAccess -> BlockHeaderDb -> PayloadDb tbl - -> SQLiteEnv + -> (SQLiteEnv, SQLiteEnv) -> PactServiceConfig -> IO () -runPactService ver cid chainwebLogger reqQ mempoolAccess bhDb pdb sqlenv config = - void $ withPactService ver cid chainwebLogger bhDb pdb sqlenv config $ do +runPactService ver cid chainwebLogger reqQ mempoolAccess bhDb pdb (sqlenv, sqlenv2) config = do + (pst, pse) <- mkPactService ver cid chainwebLogger bhDb pdb sqlenv config + + void $ runPactServiceM pst pse $ do initialPayloadState mempoolAccess ver cid - serviceRequests mempoolAccess reqQ -withPactService + (pst', pse') <- mkPactService ver cid chainwebLogger bhDb pdb sqlenv2 config + + race_ + (void $ runPactServiceM pst pse $ serviceWriteRequests mempoolAccess reqQ) + (void $ runPactServiceM pst' pse' $ serviceReadRequests mempoolAccess reqQ) + +mkPactService :: (Logger logger, CanReadablePayloadCas tbl) => ChainwebVersion -> ChainId -> logger -> BlockHeaderDb -> PayloadDb tbl - -> SQLiteEnv + -> SQLiteEnv -> PactServiceConfig - -> PactServiceM logger tbl a - -> IO (T2 a PactServiceState) -withPactService ver cid chainwebLogger bhDb pdb sqlenv config act = - withProdRelationalCheckpointer checkpointerLogger initialBlockState sqlenv ver cid $ \checkpointer -> do - let !rs = readRewards - let !pse = PactServiceEnv - { _psMempoolAccess = Nothing - , _psCheckpointer = checkpointer - , _psPdb = pdb - , _psBlockHeaderDb = bhDb - , _psGasModel = getGasModel - , _psMinerRewards = rs - , _psReorgLimit = _pactReorgLimit config - , _psPreInsertCheckTimeout = _pactPreInsertCheckTimeout config - , _psOnFatalError = defaultOnFatalError (logFunctionText chainwebLogger) - , _psVersion = ver - , _psAllowReadsInLocal = _pactAllowReadsInLocal config - , _psLogger = pactServiceLogger - , _psGasLogger = gasLogger <$ guard (_pactLogGas config) - , _psBlockGasLimit = _pactBlockGasLimit config - , _psEnableLocalTimeout = _pactEnableLocalTimeout config - } - !pst = PactServiceState mempty - + -> IO (PactServiceState, PactServiceEnv logger tbl) +mkPactService ver cid chainwebLogger bhDb pdb sqlenvs config = + withProdRelationalCheckpointer checkpointerLogger initialBlockState sqlenvs ver cid $ \checkpointer -> do when (_pactFullHistoryRequired config) $ do mEarliestBlock <- _cpGetEarliestBlock (_cpReadCp checkpointer) case mEarliestBlock of - Nothing -> do - pure () + Nothing -> pure () Just (earliestBlockHeight, _) -> do let gHeight = genesisHeight ver cid when (gHeight /= earliestBlockHeight) $ do @@ -189,20 +174,57 @@ withPactService ver cid chainwebLogger bhDb pdb sqlenv config act = logError_ chainwebLogger (J.encodeText msg) throwM e - runPactServiceM pst pse $ do + let !rs = readRewards + let !pse = PactServiceEnv + { _psMempoolAccess = Nothing + , _psCheckpointer = checkpointer + , _psPdb = pdb + , _psBlockHeaderDb = bhDb + , _psGasModel = getGasModel + , _psMinerRewards = rs + , _psReorgLimit = _pactReorgLimit config + , _psPreInsertCheckTimeout = _pactPreInsertCheckTimeout config + , _psOnFatalError = defaultOnFatalError (logFunctionText chainwebLogger) + , _psVersion = ver + , _psAllowReadsInLocal = _pactAllowReadsInLocal config + , _psLogger = pactServiceLogger + , _psGasLogger = gasLogger <$ guard (_pactLogGas config) + , _psBlockGasLimit = _pactBlockGasLimit config + , _psEnableLocalTimeout = _pactEnableLocalTimeout config + } + !pst = PactServiceState mempty + + void $ runPactServiceM pst pse $ -- If the latest header that is stored in the checkpointer was on an -- orphaned fork, there is no way to recover it in the call of -- 'initalPayloadState.readContracts'. We therefore rewind to the latest -- avaliable header in the block header database. -- exitOnRewindLimitExceeded $ initializeLatestBlock (_pactUnlimitedInitialRewind config) - act + + return (pst, pse) where initialBlockState = initBlockState (_pactModuleCacheLimit config) $ genesisHeight ver cid pactServiceLogger = setComponent "pact" chainwebLogger checkpointerLogger = addLabel ("sub-component", "checkpointer") pactServiceLogger gasLogger = addLabel ("transaction", "GasLogs") pactServiceLogger +withPactService + :: Logger logger + => CanReadablePayloadCas tbl + => ChainwebVersion + -> ChainId + -> logger + -> BlockHeaderDb + -> PayloadDb tbl + -> SQLiteEnv + -> PactServiceConfig + -> PactServiceM logger tbl a + -> IO (T2 a PactServiceState) +withPactService ver cid chainwebLogger bhDb pdb sqlenv config act = do + (pst, pse) <- mkPactService ver cid chainwebLogger bhDb pdb sqlenv config + runPactServiceM pst pse act + initializeLatestBlock :: (Logger logger) => CanReadablePayloadCas tbl => Bool -> PactServiceM logger tbl () initializeLatestBlock unlimitedRewind = findLatestValidBlockHeader' >>= \case Nothing -> return () @@ -282,36 +304,24 @@ lookupBlockHeader bhash ctx = do throwM $ BlockHeaderLookupFailure $ "failed lookup of parent header in " <> ctx <> ": " <> sshow e --- | Loop forever, serving Pact execution requests and reponses from the queues -serviceRequests +-- | Loop forever, serving Pact execution Write-requests +serviceWriteRequests :: forall logger tbl. (Logger logger, CanReadablePayloadCas tbl) => MemPoolAccess -> PactQueue -> PactServiceM logger tbl () -serviceRequests memPoolAccess reqQ = do - logInfo "Starting service" - go `finally` logInfo "Stopping service" +serviceWriteRequests memPoolAccess reqQ = do + logInfo "Starting Write-requests handling service" + go `finally` logInfo "Stopping Write-requests handling service" where go = do PactServiceEnv{_psLogger} <- ask - logDebug "serviceRequests: wait" - msg <- liftIO $ getNextRequest reqQ + logDebug "serviceWriteRequests: wait" + msg <- liftIO $ getNextWriteRequest reqQ requestId <- liftIO $ UUID.toText <$> UUID.nextRandom let logFn = logFunction $ addLabel ("pact-request-id", requestId) _psLogger - logDebug $ "serviceRequests: " <> sshow msg + logDebug $ "serviceWriteRequests: " <> sshow msg case msg of - CloseMsg -> return () - LocalMsg (LocalReq localRequest preflight sigVerify rewindDepth localResultVar) -> do - trace logFn "Chainweb.Pact.PactService.execLocal" () 0 $ - tryOne "execLocal" localResultVar $ - execLocal localRequest preflight sigVerify rewindDepth - go - NewBlockMsg NewBlockReq {..} -> do - trace logFn "Chainweb.Pact.PactService.execNewBlock" - () 1 $ - tryOne "execNewBlock" _newResultVar $ - execNewBlock memPoolAccess _newMiner - go ValidateBlockMsg ValidateBlockReq {..} -> do tryOne "execValidateBlock" _valResultVar $ fmap fst $ trace' logFn "Chainweb.Pact.PactService.execValidateBlock" @@ -319,6 +329,44 @@ serviceRequests memPoolAccess reqQ = do (\(_, g) -> fromIntegral g) (execValidateBlock memPoolAccess _valBlockHeader _valPayloadData) go + SyncToBlockMsg SyncToBlockReq {..} -> do + trace logFn "Chainweb.Pact.PactService.execSyncToBlock" _syncToBlockHeader 1 $ + tryOne "syncToBlockBlock" _syncToResultVar $ + execSyncToBlock _syncToBlockHeader + go + NewBlockMsg NewBlockReq {..} -> do + trace logFn "Chainweb.Pact.PactService.execNewBlock" + () 1 $ + tryOne "execNewBlock" _newResultVar $ + execNewBlock memPoolAccess _newMiner + go + _ -> error $ "impossible: unexpected request " ++ show msg + +-- | Loop forever, serving Pact execution Read-requests +serviceReadRequests + :: forall logger tbl. (Logger logger, CanReadablePayloadCas tbl) + => MemPoolAccess + -> PactQueue + -> PactServiceM logger tbl () +serviceReadRequests _ reqQ = do + logInfo "Starting read-requests handling service" + go `finally` (logInfo "Stopping read-requests handling service") + where + go = do + logDebug "serviceReadRequests: wait" + msg <- liftIO $ getNextReadRequest reqQ + requestId <- liftIO $ UUID.toText <$> UUID.nextRandom + PactServiceEnv{_psLogger} <- ask + let logFn = logFunction $ addLabel ("pact-request-id", requestId) _psLogger + logDebug $ "serviceReadRequests: " <> sshow msg + case msg of + CloseMsg -> return () + + LocalMsg (LocalReq localRequest preflight sigVerify rewindDepth localResultVar) -> do + trace logFn "Chainweb.Pact.PactService.execLocal" () 0 $ + tryOne "execLocal" localResultVar $ + execLocal localRequest preflight sigVerify rewindDepth + go LookupPactTxsMsg (LookupPactTxsReq confDepth txHashes resultVar) -> do trace logFn "Chainweb.Pact.PactService.execLookupPactTxs" () (length txHashes) $ @@ -341,81 +389,79 @@ serviceRequests memPoolAccess reqQ = do tryOne "execHistoricalLookup" resultVar $ execHistoricalLookup bh d k go - SyncToBlockMsg SyncToBlockReq {..} -> do - trace logFn "Chainweb.Pact.PactService.execSyncToBlock" _syncToBlockHeader 1 $ - tryOne "syncToBlockBlock" _syncToResultVar $ - execSyncToBlock _syncToBlockHeader - go ReadOnlyReplayMsg ReadOnlyReplayReq {..} -> do trace logFn "Chainweb.Pact.PactService.execReadOnlyReplay" (_readOnlyReplayLowerBound, _readOnlyReplayUpperBound) 1 $ tryOne "readOnlyReplayBlock" _readOnlyReplayResultVar $ execReadOnlyReplay _readOnlyReplayLowerBound _readOnlyReplayUpperBound go + _ -> error $ "impossible: unexpected request " ++ show msg +tryOne + :: forall logger tbl a. (Logger logger, CanReadablePayloadCas tbl) + => Text + -> MVar (Either PactException a) + -> PactServiceM logger tbl a + -> PactServiceM logger tbl () +tryOne which mvar = tryOne' which mvar Right + +tryOne' + :: (Logger logger, CanReadablePayloadCas tbl) + => Text + -> MVar (Either PactException b) + -> (a -> Either PactException b) + -> PactServiceM logger tbl a + -> PactServiceM logger tbl () +tryOne' which mvar post m = + (evalPactOnThread (post <$> m) >>= (liftIO . putMVar mvar)) + `catches` + [ Handler $ \(e :: SomeAsyncException) -> do + logWarn $ T.concat + [ "Received asynchronous exception running pact service (" + , which + , "): " + , sshow e + ] + liftIO $ do + void $ tryPutMVar mvar $! toPactInternalError e + throwM e + , Handler $ \(e :: SomeException) -> do + logError $ mconcat + [ "Received exception running pact service (" + , which + , "): " + , sshow e + ] + liftIO $ do + void $ tryPutMVar mvar $! toPactInternalError e + ] + where toPactInternalError e = Left $ PactInternalError $ T.pack $ show e - tryOne - :: Text - -> MVar (Either PactException a) - -> PactServiceM logger tbl a - -> PactServiceM logger tbl () - tryOne which mvar = tryOne' which mvar Right - - tryOne' - :: Text - -> MVar (Either PactException b) - -> (a -> Either PactException b) - -> PactServiceM logger tbl a - -> PactServiceM logger tbl () - tryOne' which mvar post m = - (evalPactOnThread (post <$> m) >>= (liftIO . putMVar mvar)) - `catches` - [ Handler $ \(e :: SomeAsyncException) -> do - logWarn $ T.concat - [ "Received asynchronous exception running pact service (" - , which - , "): " - , sshow e - ] - liftIO $ do - void $ tryPutMVar mvar $! toPactInternalError e - throwM e - , Handler $ \(e :: SomeException) -> do - logError $ mconcat - [ "Received exception running pact service (" - , which - , "): " - , sshow e - ] - liftIO $ do - void $ tryPutMVar mvar $! toPactInternalError e - ] - where - -- Pact turns AsyncExceptions into textual exceptions within - -- PactInternalError. So there is no easy way for us to distinguish - -- whether an exception originates from within pact or from the outside. - -- - -- A common strategy to deal with this is to run the computation (pact) - -- on a "hidden" internal thread. Lifting `forkIO` into a state - -- monad is generally not thread-safe. It is fine to do here, since - -- there is no concurrency. We use a thread here only to shield the - -- computation from external exceptions. - -- - -- This solution isn't bullet-proof and only meant as a temporary fix. A - -- proper solution is to fix pact, to handle asynchronous exceptions - -- gracefully. - -- - -- No mask is needed here. Asynchronous exceptions are handled - -- by the outer handlers and cause an abort. So no state is lost. - -- - evalPactOnThread :: PactServiceM logger tbl a -> PactServiceM logger tbl a - evalPactOnThread act = do - e <- ask - s <- get - T2 r s' <- liftIO $ - withAsync (runPactServiceM s e act) wait - put $! s' - return $! r + -- Pact turns AsyncExceptions into textual exceptions within + -- PactInternalError. So there is no easy way for us to distinguish + -- whether an exception originates from within pact or from the outside. + -- + -- A common strategy to deal with this is to run the computation (pact) + -- on a "hidden" internal thread. Lifting `forkIO` into a state + -- monad is generally not thread-safe. It is fine to do here, since + -- there is no concurrency. We use a thread here only to shield the + -- computation from external exceptions. + -- + -- This solution isn't bullet-proof and only meant as a temporary fix. A + -- proper solution is to fix pact, to handle asynchronous exceptions + -- gracefully. + -- + -- No mask is needed here. Asynchronous exceptions are handled + -- by the outer handlers and cause an abort. So no state is lost. + -- + evalPactOnThread :: PactServiceM logger tbl a -> PactServiceM logger tbl a + evalPactOnThread act = do + e <- ask + s <- get + T2 r s' <- liftIO $ + withAsync (runPactServiceM s e act) wait + put $! s' + return $! r execNewBlock diff --git a/src/Chainweb/Pact/Service/PactInProcApi.hs b/src/Chainweb/Pact/Service/PactInProcApi.hs index 926c7f1a0b..ca7d8d7e36 100644 --- a/src/Chainweb/Pact/Service/PactInProcApi.hs +++ b/src/Chainweb/Pact/Service/PactInProcApi.hs @@ -68,7 +68,8 @@ withPactService -> IO a withPactService ver cid logger mpc bhdb pdb pactDbDir config action = withSqliteDb cid logger pactDbDir (_pactResetDb config) $ \sqlenv -> - withPactService' ver cid logger mpa bhdb pdb sqlenv config action + withSqliteDb cid logger pactDbDir (_pactResetDb config) $ \sqlenv2 -> + withPactService' ver cid logger mpa bhdb pdb (sqlenv, sqlenv2) config action where mpa = pactMemPoolAccess mpc $ addLabel ("sub-component", "MempoolAccess") logger @@ -84,18 +85,18 @@ withPactService' -> MemPoolAccess -> BlockHeaderDb -> PayloadDb tbl - -> SQLiteEnv + -> (SQLiteEnv, SQLiteEnv) -> PactServiceConfig -> (PactQueue -> IO a) -> IO a -withPactService' ver cid logger memPoolAccess bhDb pdb sqlenv config action = do +withPactService' ver cid logger memPoolAccess bhDb pdb sqlenvs config action = do reqQ <- newPactQueue (_pactQueueSize config) race (concurrently_ (monitor reqQ) (server reqQ)) (action reqQ) >>= \case Left () -> error "Chainweb.Pact.Service.PactInProcApi: pact service terminated unexpectedly" Right a -> return a where server reqQ = runForever logg "pact-service" - $ PS.runPactService ver cid logger reqQ memPoolAccess bhDb pdb sqlenv config + $ PS.runPactService ver cid logger reqQ memPoolAccess bhDb pdb sqlenvs config logg = logFunction logger monitor = runPactServiceQueueMonitor $ addLabel ("sub-component", "PactQueue") logger diff --git a/src/Chainweb/Pact/Service/PactQueue.hs b/src/Chainweb/Pact/Service/PactQueue.hs index b294aff5c4..3f7dc85052 100644 --- a/src/Chainweb/Pact/Service/PactQueue.hs +++ b/src/Chainweb/Pact/Service/PactQueue.hs @@ -17,7 +17,8 @@ -- module Chainweb.Pact.Service.PactQueue ( addRequest -, getNextRequest +, getNextWriteRequest +, getNextReadRequest , getPactQueueStats , newPactQueue , resetPactQueueStats @@ -51,9 +52,10 @@ import Chainweb.Utils -- other requests. -- data PactQueue = PactQueue - { _pactQueueValidateBlock :: !(TBQueue (T2 RequestMsg (Time Micros))) - , _pactQueueNewBlock :: !(TBQueue (T2 RequestMsg (Time Micros))) - , _pactQueueOtherMsg :: !(TBQueue (T2 RequestMsg (Time Micros))) + { _pactQueueWriteRequests :: !(TBQueue (T2 RequestMsg (Time Micros))) + -- NewBlock requests are Read-requests as well but prioritize them with their own queue + , _pactQueueNewBlockRequests :: !(TBQueue (T2 RequestMsg (Time Micros))) + , _pactQueueReadRequests :: !(TBQueue (T2 RequestMsg (Time Micros))) , _pactQueuePactQueueValidateBlockMsgCounters :: !(IORef PactQueueCounters) , _pactQueuePactQueueNewBlockMsgCounters :: !(IORef PactQueueCounters) , _pactQueuePactQueueOtherMsgCounters :: !(IORef PactQueueCounters) @@ -84,18 +86,18 @@ addRequest q msg = do atomically $ writeTBQueue priority (T2 msg entranceTime) where priority = case msg of - ValidateBlockMsg {} -> _pactQueueValidateBlock q - NewBlockMsg {} -> _pactQueueNewBlock q - _ -> _pactQueueOtherMsg q - --- | Get the next available request from the Pact execution queue + -- Write-requests + ValidateBlockMsg {} -> _pactQueueWriteRequests q + SyncToBlockMsg {} -> _pactQueueWriteRequests q + -- Read-requests + NewBlockMsg {} -> _pactQueueNewBlockRequests q + _ -> _pactQueueReadRequests q + +-- | Get the next available Write-request from the Pact execution queue -- -getNextRequest :: PactQueue -> IO RequestMsg -getNextRequest q = do - T2 req entranceTime <- atomically - $ tryReadTBQueueOrRetry (_pactQueueValidateBlock q) - <|> tryReadTBQueueOrRetry (_pactQueueNewBlock q) - <|> tryReadTBQueueOrRetry (_pactQueueOtherMsg q) +getNextWriteRequest :: PactQueue -> IO RequestMsg +getNextWriteRequest q = do + T2 req entranceTime <- atomically $ tryReadTBQueueOrRetry (_pactQueueWriteRequests q) <|> tryReadTBQueueOrRetry (_pactQueueNewBlockRequests q) requestTime <- diff <$> getCurrentTimeIntegral <*> pure entranceTime updatePactQueueCounters (counters req q) requestTime return req @@ -105,7 +107,23 @@ getNextRequest q = do Just msg -> return msg counters ValidateBlockMsg{} = _pactQueuePactQueueValidateBlockMsgCounters - counters NewBlockMsg{} = _pactQueuePactQueueNewBlockMsgCounters + counters NewBlockMsg{} = _pactQueuePactQueueNewBlockMsgCounters + counters _ = _pactQueuePactQueueOtherMsgCounters + +-- | Get the next available Read-request from the Pact execution queue +getNextReadRequest :: PactQueue -> IO RequestMsg +getNextReadRequest q = do + T2 req entranceTime <- atomically $ tryReadTBQueueOrRetry (_pactQueueReadRequests q) + requestTime <- diff <$> getCurrentTimeIntegral <*> pure entranceTime + updatePactQueueCounters (counters req q) requestTime + return req + where + tryReadTBQueueOrRetry = tryReadTBQueue >=> \case + Nothing -> retry + Just msg -> return msg + + counters ValidateBlockMsg{} = error "getNextReadRequest.counters.impossible" + counters NewBlockMsg{} = error "getNextReadRequest.counters.impossible" counters _ = _pactQueuePactQueueOtherMsgCounters -- -------------------------------------------------------------------------- -- diff --git a/test/Chainweb/Test/Pact/PactSingleChainTest.hs b/test/Chainweb/Test/Pact/PactSingleChainTest.hs index 7433cfc8b7..2dae6b490e 100644 --- a/test/Chainweb/Test/Pact/PactSingleChainTest.hs +++ b/test/Chainweb/Test/Pact/PactSingleChainTest.hs @@ -272,11 +272,12 @@ rosettaFailsWithoutFullHistory rdb = blockDb <- mkTestBlockDb testVersion rdb bhDb <- getWebBlockHeaderDb (_bdbWebBlockHeaderDb blockDb) cid sqlEnv <- sqlEnvIO + sqlEnv2 <- sqlEnvIO mempool <- fmap snd dm let payloadDb = _bdbPayloadDb blockDb let cfg = testPactServiceConfig { _pactFullHistoryRequired = True } let logger = genericLogger System.LogLevel.Error (\_ -> return ()) - e <- try $ runPactService testVersion cid logger pactQueue mempool bhDb payloadDb sqlEnv cfg + e <- try $ runPactService testVersion cid logger pactQueue mempool bhDb payloadDb (sqlEnv, sqlEnv2) cfg case e of Left (FullHistoryRequired {}) -> do pure () @@ -980,6 +981,7 @@ compactionSetup pat rdb pactCfg f = bhDb <- getWebBlockHeaderDb (_bdbWebBlockHeaderDb blockDb) cid let payloadDb = _bdbPayloadDb blockDb sqlEnv <- sqlEnvIO + sqlEnv2 <- sqlEnvIO (mempoolRef, mempool) <- do (ref, nonRef) <- dm pure (pure ref, nonRef) @@ -987,7 +989,7 @@ compactionSetup pat rdb pactCfg f = let logger = genericLogger System.LogLevel.Error (\_ -> return ()) - void $ forkIO $ runPactService testVersion cid logger pactQueue mempool bhDb payloadDb sqlEnv pactCfg + void $ forkIO $ runPactService testVersion cid logger pactQueue mempool bhDb payloadDb (sqlEnv,sqlEnv2) pactCfg setOneShotMempool mempoolRef goldenMemPool diff --git a/test/Chainweb/Test/Pact/TTL.hs b/test/Chainweb/Test/Pact/TTL.hs index 79b2ffec87..3487929d12 100644 --- a/test/Chainweb/Test/Pact/TTL.hs +++ b/test/Chainweb/Test/Pact/TTL.hs @@ -242,10 +242,12 @@ doValidateBlock -> IO () doValidateBlock ctxIO header payload = do ctx <- ctxIO - _mv' <- validateBlock header (payloadWithOutputsToPayloadData payload) $ _ctxQueue ctx + mv <- validateBlock header (payloadWithOutputsToPayloadData payload) $ _ctxQueue ctx + void $ assertNotLeft =<< takeMVar mv + addNewPayload (_ctxPdb ctx) (_blockHeight header) payload unsafeInsertBlockHeaderDb (_ctxBdb ctx) header - -- FIXME FIXME FIXME: do at least some checks? + -- FIXME FIXME: do at least some checks? -- -------------------------------------------------------------------------- -- -- Misc Utils diff --git a/test/Chainweb/Test/Pact/Utils.hs b/test/Chainweb/Test/Pact/Utils.hs index 93e84da61d..418aa5c0b6 100644 --- a/test/Chainweb/Test/Pact/Utils.hs +++ b/test/Chainweb/Test/Pact/Utils.hs @@ -912,11 +912,12 @@ withPactTestBlockDb' version cid rdb sqlEnvIO mempoolIO pactConfig f = reqQ <- newPactQueue 2000 bdb <- bdbio sqlEnv <- sqlEnvIO + sqlEnv2 <- sqlEnvIO mempool <- mempoolIO bhdb <- getWebBlockHeaderDb (_bdbWebBlockHeaderDb bdb) cid let pdb = _bdbPayloadDb bdb a <- async $ runForever (\_ _ -> return ()) "Chainweb.Test.Pact.Utils.withPactTestBlockDb" $ - runPactService version cid logger reqQ mempool bhdb pdb sqlEnv pactConfig + runPactService version cid logger reqQ mempool bhdb pdb (sqlEnv, sqlEnv2) pactConfig return (a, (sqlEnv,reqQ,bdb)) stopPact (a, _) = cancel a @@ -971,8 +972,9 @@ withPactTestBlockDb version cid rdb mempoolIO pactConfig f = bhdb <- getWebBlockHeaderDb (_bdbWebBlockHeaderDb bdb) cid let pdb = _bdbPayloadDb bdb sqlEnv <- startSqliteDb cid logger dir False + sqlEnv2 <- startSqliteDb cid logger dir False a <- async $ runForever (\_ _ -> return ()) "Chainweb.Test.Pact.Utils.withPactTestBlockDb" $ - runPactService version cid logger reqQ mempool bhdb pdb sqlEnv pactConfig + runPactService version cid logger reqQ mempool bhdb pdb (sqlEnv, sqlEnv2) pactConfig return (a, (sqlEnv,reqQ,bdb)) stopPact (a, (sqlEnv, _, _)) = cancel a >> stopSqliteDb sqlEnv