From 65514e66b22536b3edf3d8c2c32b7cf839f8d7f4 Mon Sep 17 00:00:00 2001 From: chessai Date: Wed, 22 May 2024 13:20:05 -0500 Subject: [PATCH] add WorkState sum type that more properly represents PrimedWork state Change-Id: Ib1604f81a9e7f2f8542f28e0c467f417fcd00cc3 --- src/Chainweb/Chainweb/MinerResources.hs | 87 ++++++++++++++----------- src/Chainweb/CutDB.hs | 10 +-- src/Chainweb/Miner/Coordinator.hs | 38 ++++++++--- src/Chainweb/Miner/RestAPI/Server.hs | 31 ++++----- src/Chainweb/Pact/RestAPI/Server.hs | 2 +- 5 files changed, 97 insertions(+), 71 deletions(-) diff --git a/src/Chainweb/Chainweb/MinerResources.hs b/src/Chainweb/Chainweb/MinerResources.hs index 84a152059e..b911939bc8 100644 --- a/src/Chainweb/Chainweb/MinerResources.hs +++ b/src/Chainweb/Chainweb/MinerResources.hs @@ -99,7 +99,7 @@ withMiningCoordination logger conf cdb inner forM cids $ \cid -> do let bh = fromMaybe (genesisBlockHeader v cid) (HM.lookup cid (_cutMap cut)) newBlock <- throwIfNoHistory =<< getPayload cid miner (ParentHeader bh) - return (cid, Just newBlock) + return (cid, WorkReady newBlock) m <- newTVarIO initialPw c503 <- newIORef 0 @@ -132,6 +132,43 @@ withMiningCoordination logger conf cdb inner chainLogger cid = addLabel ("chain", toText cid) + -- we assume that this path always exists in PrimedWork and never delete it. + workForMiner :: Miner -> ChainId -> Traversal' PrimedWork WorkState + workForMiner miner cid = _Wrapped' . ix (view minerId miner) . ix cid + + periodicallyRefreshPayload :: TVar PrimedWork -> ChainId -> Miner -> IO a + periodicallyRefreshPayload tpw cid ourMiner = forever $ do + let delay = + timeSpanToMicros $ _coordinationPayloadRefreshDelay coordConf + threadDelay (fromIntegral @Micros @Int delay) + when (not $ v ^. versionCheats . disablePact) $ do + -- "stale" in the sense of not having all of the transactions + -- that it could. it still has the latest possible parent + mContinuableBlockInProgress <- atomically $ do + primed <- readTVar tpw <&> (^?! workForMiner ourMiner cid) + case primed of + WorkReady (NewBlockInProgress bip) -> return (Just bip) + WorkReady (NewBlockPayload {}) -> + error "periodicallyRefreshPayload: encountered NewBlockPayload in PrimedWork, which cannot be refreshed" + WorkAlreadyMined {} -> return Nothing + WorkStale -> return Nothing + + forM_ mContinuableBlockInProgress $ \continuableBlockInProgress -> do + maybeNewBlock <- _pactContinueBlock pact cid continuableBlockInProgress + -- if continuing returns NoHistory then the parent header + -- isn't available in the checkpointer right now. + -- in that case we just mark the payload as not stale. + let newBlock = case maybeNewBlock of + NoHistory -> continuableBlockInProgress + Historical b -> b + + logFunctionText (chainLogger cid logger) Debug + $ "refreshed block, old and new tx count: " + <> sshow (V.length $ _transactionPairs $ _blockInProgressTransactions continuableBlockInProgress, V.length $ _transactionPairs $ _blockInProgressTransactions newBlock) + + atomically $ modifyTVar' tpw $ + workForMiner ourMiner cid .~ WorkReady (NewBlockInProgress newBlock) + -- | THREAD: Keep a live-updated cache of Payloads for specific miners, such -- that when they request new work, the block can be instantly constructed -- without interacting with the Pact Queue. @@ -146,47 +183,23 @@ withMiningCoordination logger conf cdb inner pw <- readTVarIO tpw let -- we assume that this path always exists in PrimedWork and never delete it. - ourMiner :: Traversal' PrimedWork (Maybe NewBlock) - ourMiner = _Wrapped' . ix (view minerId miner) . ix cid - let !outdatedPayload = pw ^?! ourMiner . _Just - let ParentHeader outdatedParent = newBlockParentHeader outdatedPayload - let - periodicallyRefreshPayload = do - let delay = - timeSpanToMicros $ _coordinationPayloadRefreshDelay coordConf - threadDelay (fromIntegral @Micros @Int delay) - when (not $ v ^. versionCheats . disablePact) $ do - continuableBlockInProgress <- atomically $ do - primed <- readTVar tpw <&> (^?! (ourMiner . _Just)) - case primed of - NewBlockInProgress bip -> return bip - NewBlockPayload {} -> - error "periodicallyRefreshPayload: encountered NewBlockPayload in PrimedWork, which cannot be refreshed" - maybeNewBlock <- _pactContinueBlock pact cid continuableBlockInProgress - -- if continuing returns Nothing then the parent header - -- isn't available in the checkpointer right now. - -- in that case we just mark the payload as not stale - let newBlock = case maybeNewBlock of - NoHistory -> continuableBlockInProgress - Historical b -> b - - logFunctionText (chainLogger cid logger) Debug - $ "refreshed block, old and new tx count: " - <> sshow (V.length $ _transactionPairs $ _blockInProgressTransactions continuableBlockInProgress, V.length $ _transactionPairs $ _blockInProgressTransactions newBlock) - - atomically $ modifyTVar' tpw $ - ourMiner .~ Just (NewBlockInProgress newBlock) - periodicallyRefreshPayload + ourMiner :: Traversal' PrimedWork WorkState + ourMiner = workForMiner miner cid + let !outdatedPayload = fromJuste $ pw ^? ourMiner + let outdatedParentHash = case outdatedPayload of + WorkReady outdatedBlock -> _blockHash (_parentHeader (newBlockParentHeader outdatedBlock)) + WorkAlreadyMined outdatedBlockHash -> outdatedBlockHash + WorkStale -> error "primeWork loop: Invariant Violation: Stale work should be an impossibility" newParent <- either ParentHeader id <$> race -- wait for a block different from what we've got primed work for - (awaitNewBlock cdb cid outdatedParent) + (awaitNewBlock cdb cid outdatedParentHash) -- in the meantime, periodically refresh the payload to make sure -- it has all of the transactions it can have - periodicallyRefreshPayload + (periodicallyRefreshPayload tpw cid miner) -- Temporarily block this chain from being considered for queries - atomically $ modifyTVar' tpw (ourMiner .~ Nothing) + atomically $ modifyTVar' tpw (ourMiner .~ WorkStale) -- Get a payload for the new block getPayload cid miner newParent >>= \case @@ -194,9 +207,9 @@ withMiningCoordination logger conf cdb inner logFunctionText (addLabel ("chain", toText cid) logger) Warn "current block is not in the checkpointer; halting primed work loop temporarily" approximateThreadDelay 1_000_000 - atomically $ modifyTVar' tpw (ourMiner .~ Just outdatedPayload) + atomically $ modifyTVar' tpw (ourMiner .~ outdatedPayload) Historical newBlock -> - atomically $ modifyTVar' tpw (ourMiner .~ Just newBlock) + atomically $ modifyTVar' tpw (ourMiner .~ WorkReady newBlock) getPayload :: ChainId -> Miner -> ParentHeader -> IO (Historical NewBlock) getPayload cid m ph = diff --git a/src/Chainweb/CutDB.hs b/src/Chainweb/CutDB.hs index 1bdc2b032c..d958fbb8e9 100644 --- a/src/Chainweb/CutDB.hs +++ b/src/Chainweb/CutDB.hs @@ -349,16 +349,16 @@ awaitNewCutByChainId cdb cid c = atomically $ awaitNewCutByChainIdStm cdb cid c -- | As in `awaitNewCut`, but only updates when the header at the specified -- `ChainId` has changed, and only returns that new header. -awaitNewBlock :: CutDb tbl -> ChainId -> BlockHeader -> IO BlockHeader -awaitNewBlock cdb cid bh = atomically $ awaitNewBlockStm cdb cid bh +awaitNewBlock :: CutDb tbl -> ChainId -> BlockHash -> IO BlockHeader +awaitNewBlock cdb cid bHash = atomically $ awaitNewBlockStm cdb cid bHash -- | As in `awaitNewCut`, but only updates when the header at the specified -- `ChainId` has changed, and only returns that new header. -awaitNewBlockStm :: CutDb tbl -> ChainId -> BlockHeader -> STM BlockHeader -awaitNewBlockStm cdb cid bh = do +awaitNewBlockStm :: CutDb tbl -> ChainId -> BlockHash -> STM BlockHeader +awaitNewBlockStm cdb cid bHash = do c <- _cutStm cdb case HM.lookup cid (_cutMap c) of - Just bh' | _blockHash bh' /= _blockHash bh -> return bh' + Just bh' | _blockHash bh' /= bHash -> return bh' _ -> retry -- | As in `awaitNewCut`, but only updates when the specified `ChainId` has diff --git a/src/Chainweb/Miner/Coordinator.hs b/src/Chainweb/Miner/Coordinator.hs index d2a82a0dbf..d97b31bab8 100644 --- a/src/Chainweb/Miner/Coordinator.hs +++ b/src/Chainweb/Miner/Coordinator.hs @@ -33,6 +33,7 @@ module Chainweb.Miner.Coordinator , PrevTime(..) , ChainChoice(..) , PrimedWork(..) +, WorkState(..) , MiningCoordination(..) , NoAsscociatedPayload(..) @@ -59,7 +60,6 @@ import qualified Data.HashMap.Strict as HM import Data.IORef import Data.List(sort) import qualified Data.Map.Strict as M -import Data.Maybe import qualified Data.Text as T import qualified Data.Vector as V @@ -71,6 +71,7 @@ import System.LogLevel (LogLevel(..)) -- internal modules import Chainweb.BlockCreationTime +import Chainweb.BlockHash (BlockHash) import Chainweb.BlockHeader import Chainweb.Cut hiding (join) import Chainweb.Cut.Create @@ -132,14 +133,26 @@ data MiningCoordination logger tbl = MiningCoordination -- made as often as desired, without clogging the Pact queue. -- newtype PrimedWork = - PrimedWork (HM.HashMap MinerId (HM.HashMap ChainId (Maybe NewBlock))) + PrimedWork (HM.HashMap MinerId (HM.HashMap ChainId WorkState)) deriving newtype (Semigroup, Monoid) deriving stock Generic deriving anyclass (Wrapped) -resetPrimed :: MinerId -> ChainId -> PrimedWork -> PrimedWork -resetPrimed mid cid (PrimedWork pw) = PrimedWork - $! HM.adjust (HM.adjust (\_ -> Nothing) cid) mid pw +data WorkState + = WorkReady NewBlock + -- ^ We have work ready for the miner + | WorkAlreadyMined BlockHash + -- ^ A block with this parent has already been mined and submitted to the + -- cut pipeline - we don't want to mine it again. + | WorkStale + -- ^ No work has been produced yet with the latest parent block on this + -- chain. + deriving stock (Show) + +isWorkReady :: WorkState -> Bool +isWorkReady = \case + WorkReady {} -> True + _ -> False -- | Data shared between the mining threads represented by `newWork` and -- `publish`. @@ -202,20 +215,23 @@ newWork logFun choice eminer@(Miner mid _) hdb pact tpw c = do mpw <- atomically $ do PrimedWork pw <- readTVar tpw mpw <- maybe retry return (HM.lookup mid pw) - guard (any isJust mpw) + guard (any isWorkReady mpw) return mpw let mr = T2 <$> HM.lookup cid mpw <*> getCutExtension c cid case mr of - Just (T2 Nothing _) -> do + Just (T2 WorkStale _) -> do logFun @T.Text Debug $ "newWork: chain " <> sshow cid <> " has stale work" newWork logFun Anything eminer hdb pact tpw c + Just (T2 (WorkAlreadyMined _) _) -> do + logFun @T.Text Debug $ "newWork: chain " <> sshow cid <> " has a payload that was already mined" + newWork logFun Anything eminer hdb pact tpw c Nothing -> do logFun @T.Text Debug $ "newWork: chain " <> sshow cid <> " not mineable" newWork logFun Anything eminer hdb pact tpw c - Just (T2 (Just newBlock) extension) -> do + Just (T2 (WorkReady newBlock) extension) -> do let ParentHeader primedParent = newBlockParentHeader newBlock if _blockHash primedParent == _blockHash (_parentHeader (_cutExtensionParent extension)) then do @@ -264,7 +280,9 @@ publish lf cdb pwVar miner pwo s = do Right (bh, Just ch) -> do -- reset the primed payload for this cut extension - atomically $ modifyTVar pwVar $ resetPrimed miner (_chainId bh) + atomically $ modifyTVar pwVar $ \(PrimedWork pw) -> + PrimedWork $! HM.adjust (HM.insert (_chainId bh) (WorkAlreadyMined (_blockParent bh))) miner pw + addCutHashes cdb ch let bytes = sum . fmap (BS.length . _transactionBytes . fst) $ @@ -338,7 +356,7 @@ work mr mcid m = do "no chains have primed work" | otherwise -> "all chains with primed work may be stalled. chains with primed payloads: " - <> sshow (sort [cid | (cid, Just _) <- HM.toList mpw]) + <> sshow (sort [cid | (cid, WorkReady _) <- HM.toList mpw]) ) logDelays n' diff --git a/src/Chainweb/Miner/RestAPI/Server.hs b/src/Chainweb/Miner/RestAPI/Server.hs index 20a0be5c38..e2d6698bf5 100644 --- a/src/Chainweb/Miner/RestAPI/Server.hs +++ b/src/Chainweb/Miner/RestAPI/Server.hs @@ -144,10 +144,12 @@ updatesHandler mr (ChainBytes cbytes) = Tagged $ \req resp -> withLimit resp $ d let (watchedMiner, minerBlocks) = HM.toList pw ^?! _head -- and that miner will always have this chain(?) -- if this chain doesn't exist yet just wait - blockOnChain <- maybe retry return - $ minerBlocks ^? ix watchedChain + blockOnChain <- do + case minerBlocks ^? ix watchedChain of + Just (WorkReady newBlock) -> return newBlock + _ -> retry return (watchedMiner, blockOnChain) - blockOnChainRef <- newIORef blockOnChain + blockOnChainRef <- newIORef (WorkReady blockOnChain) -- An update stream is closed after @timeout@ seconds. We add some jitter to -- availablility of streams is uniformily distributed over time and not @@ -169,7 +171,7 @@ updatesHandler mr (ChainBytes cbytes) = Tagged $ \req resp -> withLimit resp $ d -- this is only different from WorkRefreshed for logging eventForWorkChangeType WorkRegressed = ServerEvent (Just $ fromByteString "Refreshed Block") Nothing [] - go :: TVar Bool -> ChainId -> MinerId -> IORef (Maybe NewBlock) -> IO ServerEvent + go :: TVar Bool -> ChainId -> MinerId -> IORef WorkState -> IO ServerEvent go timer watchedChain watchedMiner blockOnChainRef = do lastBlockOnChain <- readIORef blockOnChainRef @@ -200,20 +202,11 @@ updatesHandler mr (ChainBytes cbytes) = Tagged $ \req resp -> withLimit resp $ d let currentBlockOnChain = pw ^?! ix watchedMiner . ix watchedChain case (lastBlockOnChain, currentBlockOnChain) of - -- we just got new PrimedWork after it was outdated; - -- should only happen in case of a race, where the miner - -- subscribes for updates and its work becomes stale before - -- it receives an update - (Nothing, Just _) -> return (WorkOutdated, currentBlockOnChain) - - -- we just lost our PrimedWork because it's outdated, - -- miner should grab new work. - (Just _, Nothing) -> return (WorkOutdated, currentBlockOnChain) - -- there was no work, and that hasn't changed. - (Nothing, Nothing) -> retry + (WorkStale, WorkStale) -> retry + (WorkAlreadyMined _, WorkAlreadyMined _) -> retry - (Just (NewBlockInProgress lastBip), Just (NewBlockInProgress currentBip)) + (WorkReady (NewBlockInProgress lastBip), WorkReady (NewBlockInProgress currentBip)) | ParentHeader lastPh <- _blockInProgressParentHeader lastBip , ParentHeader currentPh <- _blockInProgressParentHeader currentBip , lastPh /= currentPh -> @@ -237,7 +230,7 @@ updatesHandler mr (ChainBytes cbytes) = Tagged $ \req resp -> withLimit resp $ d -- no apparent change | otherwise -> retry - (Just (NewBlockPayload lastPh lastPwo), Just (NewBlockPayload currentPh currentPwo)) + (WorkReady (NewBlockPayload lastPh lastPwo), WorkReady (NewBlockPayload currentPh currentPwo)) | lastPh /= currentPh -> -- we've got a new block on a new parent, we must've missed -- the update where the old block became outdated. @@ -252,9 +245,11 @@ updatesHandler mr (ChainBytes cbytes) = Tagged $ \req resp -> withLimit resp $ d -- no apparent change | otherwise -> retry - (Just _, Just _) -> + (WorkReady _, WorkReady _) -> error "awaitNewPrimedWork: impossible: NewBlockInProgress replaced by a NewBlockPayload" + _ -> return (WorkOutdated, currentBlockOnChain) + withLimit resp inner = bracket (atomicModifyIORef' count $ \x -> (x - 1, x - 1)) (const $ atomicModifyIORef' count $ \x -> (x + 1, ())) diff --git a/src/Chainweb/Pact/RestAPI/Server.hs b/src/Chainweb/Pact/RestAPI/Server.hs index 72035d2b6c..f690dec32b 100644 --- a/src/Chainweb/Pact/RestAPI/Server.hs +++ b/src/Chainweb/Pact/RestAPI/Server.hs @@ -354,7 +354,7 @@ listenHandler logger cdb cid pact mem (ListenerRequest key) = do then do pure Nothing else do - Just <$!> CutDB.awaitNewBlockStm cdb cid lastBlockHeader + Just <$!> CutDB.awaitNewBlockStm cdb cid (_blockHash lastBlockHeader) -- TODO: make configurable defaultTimeout = 180 * 1000000 -- two minutes