Skip to content

Commit

Permalink
add WorkState sum type that more properly represents PrimedWork state
Browse files Browse the repository at this point in the history
Change-Id: Ib1604f81a9e7f2f8542f28e0c467f417fcd00cc3
  • Loading branch information
chessai authored and edmundnoble committed May 31, 2024
1 parent cafb310 commit 211f536
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 65 deletions.
93 changes: 54 additions & 39 deletions src/Chainweb/Chainweb/MinerResources.hs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ withMiningCoordination logger conf cdb inner
let bh = fromMaybe (genesisBlockHeader v cid) (HM.lookup cid (_cutMap cut))
maybeNewBlock <- _pactNewBlock pact cid miner True (ParentHeader bh)
newBlock <- evaluate $ fromJuste maybeNewBlock
return (cid, Just newBlock)
return (cid, WorkReady newBlock)

m <- newTVarIO initialPw
c503 <- newIORef 0
Expand Down Expand Up @@ -132,6 +132,43 @@ withMiningCoordination logger conf cdb inner

chainLogger cid = addLabel ("chain", toText cid)

-- we assume that this path always exists in PrimedWork and never delete it.
workForMiner :: Miner -> ChainId -> Traversal' PrimedWork WorkState
workForMiner miner cid = _Wrapped' . ix (view minerId miner) . ix cid

-- This is run "forever", raced with awaiting a new block.
periodicallyRefreshPayload :: TVar PrimedWork -> ChainId -> Miner -> IO a
periodicallyRefreshPayload tpw cid ourMiner = do
let delay =
timeSpanToMicros $ _coordinationPayloadRefreshDelay coordConf
threadDelay (fromIntegral @Micros @Int delay)
when (not $ v ^. versionCheats . disablePact) $ do
-- "stale" in the sense of not having all of the transactions
-- that it could. it still has the latest possible parent
mStaleBlockInProgress <- atomically $ do
primed <- readTVar tpw <&> (^?! workForMiner ourMiner cid)
case primed of
WorkReady (NewBlockInProgress bip) -> return (Just bip)
WorkReady (NewBlockPayload {}) ->
error "periodicallyRefreshPayload: encountered NewBlockPayload in PrimedWork, which cannot be refreshed"
WorkAlreadyMined {} -> return Nothing
WorkStale -> return Nothing

forM_ mStaleBlockInProgress $ \staleBlockInProgress -> do
maybeNewBlock <- _pactContinueBlock pact cid staleBlockInProgress
-- if continuing returns Nothing then the parent header
-- isn't available in the checkpointer right now.
-- in that case we just mark the payload as not stale
let newBlock = fromMaybe staleBlockInProgress maybeNewBlock

logFunctionText (chainLogger cid logger) Debug
$ "refreshed block, old and new tx count: "
<> sshow (V.length $ _transactionPairs $ _blockInProgressTransactions staleBlockInProgress, V.length $ _transactionPairs $ _blockInProgressTransactions newBlock)

atomically $ modifyTVar' tpw $
workForMiner ourMiner cid .~ WorkReady (NewBlockInProgress newBlock)
periodicallyRefreshPayload tpw cid ourMiner

-- | THREAD: Keep a live-updated cache of Payloads for specific miners, such
-- that when they request new work, the block can be instantly constructed
-- without interacting with the Pact Queue.
Expand All @@ -146,65 +183,43 @@ withMiningCoordination logger conf cdb inner
pw <- readTVarIO tpw
let
-- we assume that this path always exists in PrimedWork and never delete it.
ourMiner :: Traversal' PrimedWork (Maybe NewBlock)
ourMiner = _Wrapped' . ix (view minerId miner) . ix cid
let !outdatedPayload = pw ^?! ourMiner . _Just
let ParentHeader outdatedParent = newBlockParentHeader outdatedPayload
let
periodicallyRefreshPayload = do
let delay =
timeSpanToMicros $ _coordinationPayloadRefreshDelay coordConf
threadDelay (fromIntegral @Micros @Int delay)
when (not $ v ^. versionCheats . disablePact) $ do
-- "stale" in the sense of not having all of the transactions
-- that it could. it still has the latest possible parent
staleBlockInProgress <- atomically $ do
primed <- readTVar tpw <&> (^?! (ourMiner . _Just))
case primed of
NewBlockInProgress bip -> return bip
NewBlockPayload {} ->
error "periodicallyRefreshPayload: encountered NewBlockPayload in PrimedWork, which cannot be refreshed"
maybeNewBlock <- _pactContinueBlock pact cid staleBlockInProgress
-- if continuing returns Nothing then the parent header
-- isn't available in the checkpointer right now.
-- in that case we just mark the payload as not stale
let newBlock = fromMaybe staleBlockInProgress maybeNewBlock

logFunctionText (chainLogger cid logger) Debug
$ "refreshed block, old and new tx count: "
<> sshow (V.length $ _transactionPairs $ _blockInProgressTransactions staleBlockInProgress, V.length $ _transactionPairs $ _blockInProgressTransactions newBlock)

atomically $ modifyTVar' tpw $
ourMiner .~ Just (NewBlockInProgress newBlock)
periodicallyRefreshPayload
ourMiner :: Traversal' PrimedWork WorkState
ourMiner = workForMiner miner cid
let !outdatedPayload = pw ^?! ourMiner
let outdatedParentHash = case outdatedPayload of
WorkReady outdatedBlock -> _blockHash (_parentHeader (newBlockParentHeader outdatedBlock))
WorkAlreadyMined outdatedBlockHash -> outdatedBlockHash
WorkStale -> error "primeWork loop: Invariant Violation: Stale work should be an impossibility"

newParent <- either ParentHeader id <$> race
-- wait for a block different from what we've got primed work for
(awaitNewBlock cdb cid outdatedParent)
(awaitNewBlock cdb cid outdatedParentHash)
-- in the meantime, periodically refresh the payload to make sure
-- it has all of the transactions it can have
periodicallyRefreshPayload
(periodicallyRefreshPayload tpw cid miner)

-- Temporarily block this chain from being considered for queries
atomically $ modifyTVar' tpw (ourMiner .~ Nothing)
atomically $ modifyTVar' tpw (ourMiner .~ WorkStale)

-- Get a payload for the new block
maybeNewBlock <- _pactNewBlock pact cid miner True newParent
`onException` do
logFunctionText (chainLogger cid logger) Error
"creating new payload failed, resetting to old payload"
atomically $ modifyTVar' tpw (ourMiner .~ Just outdatedPayload)
-- We couldn't produce a new payload, but we don't want it to stay stale
atomically $ modifyTVar' tpw (ourMiner .~ outdatedPayload)
approximateThreadDelay 1_000_000


case maybeNewBlock of
Nothing -> do
logFunctionText (chainLogger cid logger) Warn
"current block is not in the checkpointer; halting primed work loop temporarily"
approximateThreadDelay 1_000_000
atomically $ modifyTVar' tpw (ourMiner .~ Just outdatedPayload)
-- We couldn't produce a new payload, but we don't want it
-- to stay stale
atomically $ modifyTVar' tpw (ourMiner .~ outdatedPayload)
Just newBlock ->
atomically $ modifyTVar' tpw (ourMiner .~ Just newBlock)
atomically $ modifyTVar' tpw (ourMiner .~ WorkReady newBlock)

pact :: PactExecutionService
pact = _webPactExecutionService $ view cutDbPactService cdb
Expand Down
10 changes: 5 additions & 5 deletions src/Chainweb/CutDB.hs
Original file line number Diff line number Diff line change
Expand Up @@ -348,16 +348,16 @@ awaitNewCutByChainId cdb cid c = atomically $ awaitNewCutByChainIdStm cdb cid c

-- | As in `awaitNewCut`, but only updates when the header at the specified
-- `ChainId` has changed, and only returns that new header.
awaitNewBlock :: CutDb tbl -> ChainId -> BlockHeader -> IO BlockHeader
awaitNewBlock cdb cid bh = atomically $ awaitNewBlockStm cdb cid bh
awaitNewBlock :: CutDb tbl -> ChainId -> BlockHash -> IO BlockHeader
awaitNewBlock cdb cid bHash = atomically $ awaitNewBlockStm cdb cid bHash

-- | As in `awaitNewCut`, but only updates when the header at the specified
-- `ChainId` has changed, and only returns that new header.
awaitNewBlockStm :: CutDb tbl -> ChainId -> BlockHeader -> STM BlockHeader
awaitNewBlockStm cdb cid bh = do
awaitNewBlockStm :: CutDb tbl -> ChainId -> BlockHash -> STM BlockHeader
awaitNewBlockStm cdb cid bHash = do
c <- _cutStm cdb
case HM.lookup cid (_cutMap c) of
Just bh' | _blockHash bh' /= _blockHash bh -> return bh'
Just bh' | _blockHash bh' /= bHash -> return bh'
_ -> retry

-- | As in `awaitNewCut`, but only updates when the specified `ChainId` has
Expand Down
38 changes: 28 additions & 10 deletions src/Chainweb/Miner/Coordinator.hs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ module Chainweb.Miner.Coordinator
, PrevTime(..)
, ChainChoice(..)
, PrimedWork(..)
, WorkState(..)
, MiningCoordination(..)
, NoAsscociatedPayload(..)

Expand All @@ -59,7 +60,6 @@ import qualified Data.HashMap.Strict as HM
import Data.IORef
import Data.List(sort)
import qualified Data.Map.Strict as M
import Data.Maybe
import qualified Data.Text as T
import qualified Data.Vector as V

Expand All @@ -71,6 +71,7 @@ import System.LogLevel (LogLevel(..))
-- internal modules

import Chainweb.BlockCreationTime
import Chainweb.BlockHash (BlockHash)
import Chainweb.BlockHeader
import Chainweb.Cut hiding (join)
import Chainweb.Cut.Create
Expand Down Expand Up @@ -132,14 +133,26 @@ data MiningCoordination logger tbl = MiningCoordination
-- made as often as desired, without clogging the Pact queue.
--
newtype PrimedWork =
PrimedWork (HM.HashMap MinerId (HM.HashMap ChainId (Maybe NewBlock)))
PrimedWork (HM.HashMap MinerId (HM.HashMap ChainId WorkState))
deriving newtype (Semigroup, Monoid)
deriving stock Generic
deriving anyclass (Wrapped)

resetPrimed :: MinerId -> ChainId -> PrimedWork -> PrimedWork
resetPrimed mid cid (PrimedWork pw) = PrimedWork
$! HM.adjust (HM.adjust (\_ -> Nothing) cid) mid pw
data WorkState
= WorkReady NewBlock
-- ^ We have work ready for the miner
| WorkAlreadyMined BlockHash
-- ^ A block with this parent has already been mined and submitted to the
-- cut pipeline - we don't want to mine it again.
| WorkStale
-- ^ No work has been produced yet with the latest parent block on this
-- chain.
deriving stock (Show)

isWorkReady :: WorkState -> Bool
isWorkReady = \case
WorkReady {} -> True
_ -> False

-- | Data shared between the mining threads represented by `newWork` and
-- `publish`.
Expand Down Expand Up @@ -202,20 +215,23 @@ newWork logFun choice eminer@(Miner mid _) hdb pact tpw c = do
mpw <- atomically $ do
PrimedWork pw <- readTVar tpw
mpw <- maybe retry return (HM.lookup mid pw)
guard (any isJust mpw)
guard (any isWorkReady mpw)
return mpw
let mr = T2
<$> HM.lookup cid mpw
<*> getCutExtension c cid

case mr of
Just (T2 Nothing _) -> do
Just (T2 WorkStale _) -> do
logFun @T.Text Debug $ "newWork: chain " <> sshow cid <> " has stale work"
newWork logFun Anything eminer hdb pact tpw c
Just (T2 (WorkAlreadyMined _) _) -> do
logFun @T.Text Debug $ "newWork: chain " <> sshow cid <> " has a payload that was already mined"
newWork logFun Anything eminer hdb pact tpw c
Nothing -> do
logFun @T.Text Debug $ "newWork: chain " <> sshow cid <> " not mineable"
newWork logFun Anything eminer hdb pact tpw c
Just (T2 (Just newBlock) extension) -> do
Just (T2 (WorkReady newBlock) extension) -> do
let ParentHeader primedParent = newBlockParentHeader newBlock
if _blockHash primedParent == _blockHash (_parentHeader (_cutExtensionParent extension))
then do
Expand Down Expand Up @@ -264,7 +280,9 @@ publish lf cdb pwVar miner pwo s = do
Right (bh, Just ch) -> do

-- reset the primed payload for this cut extension
atomically $ modifyTVar pwVar $ resetPrimed miner (_chainId bh)
atomically $ modifyTVar pwVar $ \(PrimedWork pw) ->
PrimedWork $! HM.adjust (HM.insert (_chainId bh) (WorkAlreadyMined (_blockParent bh))) miner pw

addCutHashes cdb ch

let bytes = sum . fmap (BS.length . _transactionBytes . fst) $
Expand Down Expand Up @@ -338,7 +356,7 @@ work mr mcid m = do
"no chains have primed work"
| otherwise ->
"all chains with primed work may be stalled. chains with primed payloads: "
<> sshow (sort [cid | (cid, Just _) <- HM.toList mpw])
<> sshow (sort [cid | (cid, WorkReady _) <- HM.toList mpw])
)

logDelays n'
Expand Down
27 changes: 17 additions & 10 deletions src/Chainweb/Miner/RestAPI/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,12 @@ updatesHandler mr (ChainBytes cbytes) = Tagged $ \req resp -> withLimit resp $ d
let (watchedMiner, minerBlocks) = HM.toList pw ^?! _head
-- and that miner will always have this chain(?)
-- if this chain doesn't exist yet just wait
blockOnChain <- maybe retry return
$ minerBlocks ^? ix watchedChain
blockOnChain <- do
case minerBlocks ^? ix watchedChain of
Just (WorkReady newBlock) -> return newBlock
_ -> retry
return (watchedMiner, blockOnChain)
blockOnChainRef <- newIORef blockOnChain
blockOnChainRef <- newIORef (WorkReady blockOnChain)

-- An update stream is closed after @timeout@ seconds. We add some jitter to
-- availablility of streams is uniformily distributed over time and not
Expand All @@ -169,7 +171,7 @@ updatesHandler mr (ChainBytes cbytes) = Tagged $ \req resp -> withLimit resp $ d
-- this is only different from WorkRefreshed for logging
eventForWorkChangeType WorkRegressed = ServerEvent (Just $ fromByteString "Refreshed Block") Nothing []

go :: TVar Bool -> ChainId -> MinerId -> IORef (Maybe NewBlock) -> IO ServerEvent
go :: TVar Bool -> ChainId -> MinerId -> IORef WorkState -> IO ServerEvent
go timer watchedChain watchedMiner blockOnChainRef = do
lastBlockOnChain <- readIORef blockOnChainRef

Expand Down Expand Up @@ -204,16 +206,21 @@ updatesHandler mr (ChainBytes cbytes) = Tagged $ \req resp -> withLimit resp $ d
-- should only happen in case of a race, where the miner
-- subscribes for updates and its work becomes stale before
-- it receives an update
(Nothing, Just _) -> return (WorkOutdated, currentBlockOnChain)
(WorkStale, WorkReady _) -> return (WorkOutdated, currentBlockOnChain)
(WorkStale, WorkAlreadyMined _) -> return (WorkOutdated, currentBlockOnChain)
(WorkAlreadyMined _, WorkReady _) -> return (WorkOutdated, currentBlockOnChain)
(WorkAlreadyMined _, WorkStale) -> return (WorkOutdated, currentBlockOnChain)

-- we just lost our PrimedWork because it's outdated,
-- miner should grab new work.
(Just _, Nothing) -> return (WorkOutdated, currentBlockOnChain)
(WorkReady _, WorkStale) -> return (WorkOutdated, currentBlockOnChain)
(WorkReady _, WorkAlreadyMined _) -> return (WorkOutdated, currentBlockOnChain)

-- there was no work, and that hasn't changed.
(Nothing, Nothing) -> retry
(WorkStale, WorkStale) -> retry
(WorkAlreadyMined _, WorkAlreadyMined _) -> retry

(Just (NewBlockInProgress lastBip), Just (NewBlockInProgress currentBip))
(WorkReady (NewBlockInProgress lastBip), WorkReady (NewBlockInProgress currentBip))
| ParentHeader lastPh <- _blockInProgressParentHeader lastBip
, ParentHeader currentPh <- _blockInProgressParentHeader currentBip
, lastPh /= currentPh ->
Expand All @@ -237,7 +244,7 @@ updatesHandler mr (ChainBytes cbytes) = Tagged $ \req resp -> withLimit resp $ d

-- no apparent change
| otherwise -> retry
(Just (NewBlockPayload lastPh lastPwo), Just (NewBlockPayload currentPh currentPwo))
(WorkReady (NewBlockPayload lastPh lastPwo), WorkReady (NewBlockPayload currentPh currentPwo))
| lastPh /= currentPh ->
-- we've got a new block on a new parent, we must've missed
-- the update where the old block became outdated.
Expand All @@ -252,7 +259,7 @@ updatesHandler mr (ChainBytes cbytes) = Tagged $ \req resp -> withLimit resp $ d

-- no apparent change
| otherwise -> retry
(Just _, Just _) ->
(WorkReady _, WorkReady _) ->
error "awaitNewPrimedWork: impossible: NewBlockInProgress replaced by a NewBlockPayload"

withLimit resp inner = bracket
Expand Down
2 changes: 1 addition & 1 deletion src/Chainweb/Pact/RestAPI/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ listenHandler logger cdb cid pact mem (ListenerRequest key) = do
then do
pure Nothing
else do
Just <$!> CutDB.awaitNewBlockStm cdb cid lastBlockHeader
Just <$!> CutDB.awaitNewBlockStm cdb cid (_blockHash lastBlockHeader)

-- TODO: make configurable
defaultTimeout = 180 * 1000000 -- two minutes
Expand Down

0 comments on commit 211f536

Please sign in to comment.