Skip to content

Commit

Permalink
add WorkState sum type that more properly represents PrimedWork state
Browse files Browse the repository at this point in the history
Change-Id: Ib1604f81a9e7f2f8542f28e0c467f417fcd00cc3
  • Loading branch information
chessai authored and edmundnoble committed Jun 17, 2024
1 parent 05620a9 commit 65514e6
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 71 deletions.
87 changes: 50 additions & 37 deletions src/Chainweb/Chainweb/MinerResources.hs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ withMiningCoordination logger conf cdb inner
forM cids $ \cid -> do
let bh = fromMaybe (genesisBlockHeader v cid) (HM.lookup cid (_cutMap cut))
newBlock <- throwIfNoHistory =<< getPayload cid miner (ParentHeader bh)
return (cid, Just newBlock)
return (cid, WorkReady newBlock)

m <- newTVarIO initialPw
c503 <- newIORef 0
Expand Down Expand Up @@ -132,6 +132,43 @@ withMiningCoordination logger conf cdb inner

chainLogger cid = addLabel ("chain", toText cid)

-- Traversal focusing on the 'WorkState' stored for the given miner on the
-- given chain. The entry at this path is assumed to always be present in
-- 'PrimedWork'; it is never deleted.
workForMiner :: Miner -> ChainId -> Traversal' PrimedWork WorkState
workForMiner miner cid = _Wrapped' . ix (miner ^. minerId) . ix cid

-- Refresh loop for the primed payload of one miner on one chain. It never
-- returns on its own ('forever'); the caller runs it as one side of a 'race'
-- and relies on it being cancelled.
--
-- Each iteration sleeps for the configured payload refresh delay and then,
-- unless Pact is disabled through the version cheats, asks Pact to continue
-- the block that is currently primed, so that the payload picks up any
-- transactions that have become available in the meantime.
periodicallyRefreshPayload :: TVar PrimedWork -> ChainId -> Miner -> IO a
periodicallyRefreshPayload tpw cid ourMiner = forever $ do
    let delay =
            timeSpanToMicros $ _coordinationPayloadRefreshDelay coordConf
    threadDelay (fromIntegral @Micros @Int delay)
    when (not $ v ^. versionCheats . disablePact) $ do
        -- "stale" in the sense of not having all of the transactions
        -- that it could. it still has the latest possible parent
        mContinuableBlockInProgress <- atomically $ do
            primed <- readTVar tpw <&> (^?! workForMiner ourMiner cid)
            case primed of
                WorkReady (NewBlockInProgress bip) -> return (Just bip)
                WorkReady (NewBlockPayload {}) ->
                    error "periodicallyRefreshPayload: encountered NewBlockPayload in PrimedWork, which cannot be refreshed"
                -- nothing to refresh in these states; try again after the
                -- next delay
                WorkAlreadyMined {} -> return Nothing
                WorkStale -> return Nothing

        forM_ mContinuableBlockInProgress $ \continuableBlockInProgress -> do
            maybeNewBlock <- _pactContinueBlock pact cid continuableBlockInProgress
            -- if continuing returns NoHistory then the parent header
            -- isn't available in the checkpointer right now.
            -- in that case we keep the block we already had.
            let newBlock = case maybeNewBlock of
                    NoHistory -> continuableBlockInProgress
                    Historical b -> b

            let txCount = V.length . _transactionPairs . _blockInProgressTransactions
            logFunctionText (chainLogger cid logger) Debug
                $ "refreshed block, old and new tx count: "
                <> sshow (txCount continuableBlockInProgress, txCount newBlock)

            -- The call into Pact above runs outside of STM, so the slot may
            -- have changed since it was read (e.g. it may have been marked
            -- as already mined). Only write the refreshed block back if the
            -- slot still holds a refreshable block; otherwise we would
            -- resurrect work that must not be handed out again.
            atomically $ do
                current <- readTVar tpw <&> (^? workForMiner ourMiner cid)
                case current of
                    Just (WorkReady (NewBlockInProgress _)) ->
                        modifyTVar' tpw $
                            workForMiner ourMiner cid .~ WorkReady (NewBlockInProgress newBlock)
                    _ -> return ()

-- | THREAD: Keep a live-updated cache of Payloads for specific miners, such
-- that when they request new work, the block can be instantly constructed
-- without interacting with the Pact Queue.
Expand All @@ -146,57 +183,33 @@ withMiningCoordination logger conf cdb inner
pw <- readTVarIO tpw
let
-- we assume that this path always exists in PrimedWork and never delete it.
ourMiner :: Traversal' PrimedWork (Maybe NewBlock)
ourMiner = _Wrapped' . ix (view minerId miner) . ix cid
let !outdatedPayload = pw ^?! ourMiner . _Just
let ParentHeader outdatedParent = newBlockParentHeader outdatedPayload
let
periodicallyRefreshPayload = do
let delay =
timeSpanToMicros $ _coordinationPayloadRefreshDelay coordConf
threadDelay (fromIntegral @Micros @Int delay)
when (not $ v ^. versionCheats . disablePact) $ do
continuableBlockInProgress <- atomically $ do
primed <- readTVar tpw <&> (^?! (ourMiner . _Just))
case primed of
NewBlockInProgress bip -> return bip
NewBlockPayload {} ->
error "periodicallyRefreshPayload: encountered NewBlockPayload in PrimedWork, which cannot be refreshed"
maybeNewBlock <- _pactContinueBlock pact cid continuableBlockInProgress
-- if continuing returns Nothing then the parent header
-- isn't available in the checkpointer right now.
-- in that case we just mark the payload as not stale
let newBlock = case maybeNewBlock of
NoHistory -> continuableBlockInProgress
Historical b -> b

logFunctionText (chainLogger cid logger) Debug
$ "refreshed block, old and new tx count: "
<> sshow (V.length $ _transactionPairs $ _blockInProgressTransactions continuableBlockInProgress, V.length $ _transactionPairs $ _blockInProgressTransactions newBlock)

atomically $ modifyTVar' tpw $
ourMiner .~ Just (NewBlockInProgress newBlock)
periodicallyRefreshPayload
ourMiner :: Traversal' PrimedWork WorkState
ourMiner = workForMiner miner cid
let !outdatedPayload = fromJuste $ pw ^? ourMiner
let outdatedParentHash = case outdatedPayload of
WorkReady outdatedBlock -> _blockHash (_parentHeader (newBlockParentHeader outdatedBlock))
WorkAlreadyMined outdatedBlockHash -> outdatedBlockHash
WorkStale -> error "primeWork loop: Invariant Violation: Stale work should be an impossibility"

newParent <- either ParentHeader id <$> race
-- wait for a block different from what we've got primed work for
(awaitNewBlock cdb cid outdatedParent)
(awaitNewBlock cdb cid outdatedParentHash)
-- in the meantime, periodically refresh the payload to make sure
-- it has all of the transactions it can have
periodicallyRefreshPayload
(periodicallyRefreshPayload tpw cid miner)

-- Temporarily block this chain from being considered for queries
atomically $ modifyTVar' tpw (ourMiner .~ Nothing)
atomically $ modifyTVar' tpw (ourMiner .~ WorkStale)

-- Get a payload for the new block
getPayload cid miner newParent >>= \case
NoHistory -> do
logFunctionText (addLabel ("chain", toText cid) logger) Warn
"current block is not in the checkpointer; halting primed work loop temporarily"
approximateThreadDelay 1_000_000
atomically $ modifyTVar' tpw (ourMiner .~ Just outdatedPayload)
atomically $ modifyTVar' tpw (ourMiner .~ outdatedPayload)
Historical newBlock ->
atomically $ modifyTVar' tpw (ourMiner .~ Just newBlock)
atomically $ modifyTVar' tpw (ourMiner .~ WorkReady newBlock)

getPayload :: ChainId -> Miner -> ParentHeader -> IO (Historical NewBlock)
getPayload cid m ph =
Expand Down
10 changes: 5 additions & 5 deletions src/Chainweb/CutDB.hs
Original file line number Diff line number Diff line change
Expand Up @@ -349,16 +349,16 @@ awaitNewCutByChainId cdb cid c = atomically $ awaitNewCutByChainIdStm cdb cid c

-- | As in `awaitNewCut`, but only updates when the header at the specified
-- `ChainId` has changed, and only returns that new header.
awaitNewBlock :: CutDb tbl -> ChainId -> BlockHeader -> IO BlockHeader
awaitNewBlock cdb cid bh = atomically $ awaitNewBlockStm cdb cid bh
awaitNewBlock :: CutDb tbl -> ChainId -> BlockHash -> IO BlockHeader
awaitNewBlock cdb cid bHash = atomically $ awaitNewBlockStm cdb cid bHash

-- | As in `awaitNewCut`, but only updates when the header at the specified
-- `ChainId` has changed, and only returns that new header.
awaitNewBlockStm :: CutDb tbl -> ChainId -> BlockHeader -> STM BlockHeader
awaitNewBlockStm cdb cid bh = do
awaitNewBlockStm :: CutDb tbl -> ChainId -> BlockHash -> STM BlockHeader
awaitNewBlockStm cdb cid bHash = do
c <- _cutStm cdb
case HM.lookup cid (_cutMap c) of
Just bh' | _blockHash bh' /= _blockHash bh -> return bh'
Just bh' | _blockHash bh' /= bHash -> return bh'
_ -> retry

-- | As in `awaitNewCut`, but only updates when the specified `ChainId` has
Expand Down
38 changes: 28 additions & 10 deletions src/Chainweb/Miner/Coordinator.hs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ module Chainweb.Miner.Coordinator
, PrevTime(..)
, ChainChoice(..)
, PrimedWork(..)
, WorkState(..)
, MiningCoordination(..)
, NoAsscociatedPayload(..)

Expand All @@ -59,7 +60,6 @@ import qualified Data.HashMap.Strict as HM
import Data.IORef
import Data.List(sort)
import qualified Data.Map.Strict as M
import Data.Maybe
import qualified Data.Text as T
import qualified Data.Vector as V

Expand All @@ -71,6 +71,7 @@ import System.LogLevel (LogLevel(..))
-- internal modules

import Chainweb.BlockCreationTime
import Chainweb.BlockHash (BlockHash)
import Chainweb.BlockHeader
import Chainweb.Cut hiding (join)
import Chainweb.Cut.Create
Expand Down Expand Up @@ -132,14 +133,26 @@ data MiningCoordination logger tbl = MiningCoordination
-- made as often as desired, without clogging the Pact queue.
--
newtype PrimedWork =
PrimedWork (HM.HashMap MinerId (HM.HashMap ChainId (Maybe NewBlock)))
PrimedWork (HM.HashMap MinerId (HM.HashMap ChainId WorkState))
deriving newtype (Semigroup, Monoid)
deriving stock Generic
deriving anyclass (Wrapped)

resetPrimed :: MinerId -> ChainId -> PrimedWork -> PrimedWork
resetPrimed mid cid (PrimedWork pw) = PrimedWork
$! HM.adjust (HM.adjust (\_ -> Nothing) cid) mid pw
-- | The state of the primed work that 'PrimedWork' holds for one miner on
-- one chain.
data WorkState
= WorkReady NewBlock
-- ^ We have work ready for the miner
| WorkAlreadyMined BlockHash
-- ^ A block with this parent has already been mined and submitted to the
-- cut pipeline - we don't want to mine it again. The stored hash is the
-- hash of that parent block.
| WorkStale
-- ^ No work has been produced yet with the latest parent block on this
-- chain.
deriving stock (Show)

-- | 'True' exactly when the state is 'WorkReady', i.e. there is a block that
-- can be handed to the miner; 'False' for every other state.
isWorkReady :: WorkState -> Bool
isWorkReady (WorkReady _) = True
isWorkReady (WorkAlreadyMined _) = False
isWorkReady WorkStale = False

-- | Data shared between the mining threads represented by `newWork` and
-- `publish`.
Expand Down Expand Up @@ -202,20 +215,23 @@ newWork logFun choice eminer@(Miner mid _) hdb pact tpw c = do
mpw <- atomically $ do
PrimedWork pw <- readTVar tpw
mpw <- maybe retry return (HM.lookup mid pw)
guard (any isJust mpw)
guard (any isWorkReady mpw)
return mpw
let mr = T2
<$> HM.lookup cid mpw
<*> getCutExtension c cid

case mr of
Just (T2 Nothing _) -> do
Just (T2 WorkStale _) -> do
logFun @T.Text Debug $ "newWork: chain " <> sshow cid <> " has stale work"
newWork logFun Anything eminer hdb pact tpw c
Just (T2 (WorkAlreadyMined _) _) -> do
logFun @T.Text Debug $ "newWork: chain " <> sshow cid <> " has a payload that was already mined"
newWork logFun Anything eminer hdb pact tpw c
Nothing -> do
logFun @T.Text Debug $ "newWork: chain " <> sshow cid <> " not mineable"
newWork logFun Anything eminer hdb pact tpw c
Just (T2 (Just newBlock) extension) -> do
Just (T2 (WorkReady newBlock) extension) -> do
let ParentHeader primedParent = newBlockParentHeader newBlock
if _blockHash primedParent == _blockHash (_parentHeader (_cutExtensionParent extension))
then do
Expand Down Expand Up @@ -264,7 +280,9 @@ publish lf cdb pwVar miner pwo s = do
Right (bh, Just ch) -> do

-- reset the primed payload for this cut extension
atomically $ modifyTVar pwVar $ resetPrimed miner (_chainId bh)
atomically $ modifyTVar pwVar $ \(PrimedWork pw) ->
PrimedWork $! HM.adjust (HM.insert (_chainId bh) (WorkAlreadyMined (_blockParent bh))) miner pw

addCutHashes cdb ch

let bytes = sum . fmap (BS.length . _transactionBytes . fst) $
Expand Down Expand Up @@ -338,7 +356,7 @@ work mr mcid m = do
"no chains have primed work"
| otherwise ->
"all chains with primed work may be stalled. chains with primed payloads: "
<> sshow (sort [cid | (cid, Just _) <- HM.toList mpw])
<> sshow (sort [cid | (cid, WorkReady _) <- HM.toList mpw])
)

logDelays n'
Expand Down
31 changes: 13 additions & 18 deletions src/Chainweb/Miner/RestAPI/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,12 @@ updatesHandler mr (ChainBytes cbytes) = Tagged $ \req resp -> withLimit resp $ d
let (watchedMiner, minerBlocks) = HM.toList pw ^?! _head
-- and that miner will always have this chain(?)
-- if this chain doesn't exist yet just wait
blockOnChain <- maybe retry return
$ minerBlocks ^? ix watchedChain
blockOnChain <- do
case minerBlocks ^? ix watchedChain of
Just (WorkReady newBlock) -> return newBlock
_ -> retry
return (watchedMiner, blockOnChain)
blockOnChainRef <- newIORef blockOnChain
blockOnChainRef <- newIORef (WorkReady blockOnChain)

-- An update stream is closed after @timeout@ seconds. We add some jitter so that the
-- availability of streams is uniformly distributed over time and not
Expand All @@ -169,7 +171,7 @@ updatesHandler mr (ChainBytes cbytes) = Tagged $ \req resp -> withLimit resp $ d
-- this is only different from WorkRefreshed for logging
eventForWorkChangeType WorkRegressed = ServerEvent (Just $ fromByteString "Refreshed Block") Nothing []

go :: TVar Bool -> ChainId -> MinerId -> IORef (Maybe NewBlock) -> IO ServerEvent
go :: TVar Bool -> ChainId -> MinerId -> IORef WorkState -> IO ServerEvent
go timer watchedChain watchedMiner blockOnChainRef = do
lastBlockOnChain <- readIORef blockOnChainRef

Expand Down Expand Up @@ -200,20 +202,11 @@ updatesHandler mr (ChainBytes cbytes) = Tagged $ \req resp -> withLimit resp $ d
let currentBlockOnChain = pw ^?! ix watchedMiner . ix watchedChain
case (lastBlockOnChain, currentBlockOnChain) of

-- we just got new PrimedWork after it was outdated;
-- should only happen in case of a race, where the miner
-- subscribes for updates and its work becomes stale before
-- it receives an update
(Nothing, Just _) -> return (WorkOutdated, currentBlockOnChain)

-- we just lost our PrimedWork because it's outdated,
-- miner should grab new work.
(Just _, Nothing) -> return (WorkOutdated, currentBlockOnChain)

-- there was no work, and that hasn't changed.
(Nothing, Nothing) -> retry
(WorkStale, WorkStale) -> retry
(WorkAlreadyMined _, WorkAlreadyMined _) -> retry

(Just (NewBlockInProgress lastBip), Just (NewBlockInProgress currentBip))
(WorkReady (NewBlockInProgress lastBip), WorkReady (NewBlockInProgress currentBip))
| ParentHeader lastPh <- _blockInProgressParentHeader lastBip
, ParentHeader currentPh <- _blockInProgressParentHeader currentBip
, lastPh /= currentPh ->
Expand All @@ -237,7 +230,7 @@ updatesHandler mr (ChainBytes cbytes) = Tagged $ \req resp -> withLimit resp $ d

-- no apparent change
| otherwise -> retry
(Just (NewBlockPayload lastPh lastPwo), Just (NewBlockPayload currentPh currentPwo))
(WorkReady (NewBlockPayload lastPh lastPwo), WorkReady (NewBlockPayload currentPh currentPwo))
| lastPh /= currentPh ->
-- we've got a new block on a new parent, we must've missed
-- the update where the old block became outdated.
Expand All @@ -252,9 +245,11 @@ updatesHandler mr (ChainBytes cbytes) = Tagged $ \req resp -> withLimit resp $ d

-- no apparent change
| otherwise -> retry
(Just _, Just _) ->
(WorkReady _, WorkReady _) ->
error "awaitNewPrimedWork: impossible: NewBlockInProgress replaced by a NewBlockPayload"

_ -> return (WorkOutdated, currentBlockOnChain)

withLimit resp inner = bracket
(atomicModifyIORef' count $ \x -> (x - 1, x - 1))
(const $ atomicModifyIORef' count $ \x -> (x + 1, ()))
Expand Down
2 changes: 1 addition & 1 deletion src/Chainweb/Pact/RestAPI/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ listenHandler logger cdb cid pact mem (ListenerRequest key) = do
then do
pure Nothing
else do
Just <$!> CutDB.awaitNewBlockStm cdb cid lastBlockHeader
Just <$!> CutDB.awaitNewBlockStm cdb cid (_blockHash lastBlockHeader)

-- TODO: make configurable
defaultTimeout = 180 * 1000000 -- 180 seconds, i.e. three minutes (not two), assuming the unit is microseconds -- TODO confirm unit
Expand Down

0 comments on commit 65514e6

Please sign in to comment.