From 793cac9ab4f0d3874ed4d7a555950468d407544c Mon Sep 17 00:00:00 2001
From: Edmund Noble
Date: Tue, 9 Apr 2024 22:14:09 -0400
Subject: [PATCH] feature: miner update stream reports refreshed blocks

Github PR: https://github.com/kadena-io/chainweb-node/pull/1907
Change-Id: I16a7fc2fed6c5e09e73cbcb704c81da41b7ffa3b
---
 src/Chainweb/Chainweb/MinerResources.hs |  11 +-
 src/Chainweb/Miner/RestAPI/Server.hs    | 139 ++++++++++++++++++++----
 src/Chainweb/WebPactExecutionService.hs |   5 +-
 3 files changed, 126 insertions(+), 29 deletions(-)

diff --git a/src/Chainweb/Chainweb/MinerResources.hs b/src/Chainweb/Chainweb/MinerResources.hs
index cb7ac96b5c..ce5696d7be 100644
--- a/src/Chainweb/Chainweb/MinerResources.hs
+++ b/src/Chainweb/Chainweb/MinerResources.hs
@@ -130,6 +130,8 @@ withMiningCoordination logger conf cdb inner
     !miners = S.toList (_coordinationMiners coordConf)
         <> [ _nodeMiner inNodeConf | _nodeMiningEnabled inNodeConf ]
 
+    chainLogger cid = addLabel ("chain", toText cid)
+
     -- | THREAD: Keep a live-updated cache of Payloads for specific miners, such
     -- that when they request new work, the block can be instantly constructed
     -- without interacting with the Pact Queue.
@@ -137,7 +139,7 @@ withMiningCoordination logger conf cdb inner
     primeWork :: TVar PrimedWork -> ChainId -> IO ()
     primeWork tpw cid =
         forConcurrently_ miners $ \miner ->
-            runForever (logFunction logger) "primeWork" (go miner)
+            runForever (logFunction (chainLogger cid logger)) "primeWork" (go miner)
         where
         go :: Miner -> IO ()
         go miner = do
@@ -168,9 +170,8 @@ withMiningCoordination logger conf cdb inner
                         -- in that case we just mark the payload as not stale
                         let newBlock = fromMaybe staleBlockInProgress maybeNewBlock
 
-                        logFunctionText logger Info
-                            $ "refreshed block on chain " <> sshow cid
-                            <> ", old and new tx count "
+                        logFunctionText (chainLogger cid logger) Debug
+                            $ "refreshed block, old and new tx count: "
                             <> sshow (V.length $ _transactionPairs $ _blockInProgressTransactions staleBlockInProgress, V.length $ _transactionPairs $ _blockInProgressTransactions newBlock)
 
                         atomically $ modifyTVar' tpw $
@@ -204,7 +205,7 @@ withMiningCoordination logger conf cdb inner
         -- Chainweb.Pact.PactService.Checkpointer.findLatestValidBlockHeader'
         then return $
             NewBlockPayload ph emptyPayload
-        else trace (logFunction logger)
+        else trace (logFunction (chainLogger cid logger))
             "Chainweb.Chainweb.MinerResources.withMiningCoordination.newBlock"
             () 1 (_pactNewBlock pact cid m True)
 
diff --git a/src/Chainweb/Miner/RestAPI/Server.hs b/src/Chainweb/Miner/RestAPI/Server.hs
index ee14a92cbe..3152147943 100644
--- a/src/Chainweb/Miner/RestAPI/Server.hs
+++ b/src/Chainweb/Miner/RestAPI/Server.hs
@@ -1,5 +1,6 @@
 {-# LANGUAGE BangPatterns #-}
 {-# LANGUAGE DataKinds #-}
+{-# LANGUAGE DerivingStrategies #-}
 {-# LANGUAGE FlexibleContexts #-}
 {-# LANGUAGE KindSignatures #-}
 {-# LANGUAGE LambdaCase #-}
@@ -25,18 +26,21 @@ module Chainweb.Miner.RestAPI.Server
 
 import Control.Concurrent.STM.TVar
     (TVar, readTVar, readTVarIO, registerDelay)
+import Control.Lens
 import Control.Monad (when, unless)
 import Control.Monad.Catch (bracket, try, catches)
 import qualified Control.Monad.Catch as E
 import Control.Monad.Except (throwError)
 import Control.Monad.IO.Class (liftIO)
-import Control.Monad.STM (atomically)
+import Control.Monad.STM
 
 import Data.Binary.Builder (fromByteString)
+import qualified Data.HashMap.Strict as HM
 import Data.IORef (IORef, atomicModifyIORef', newIORef, readIORef, writeIORef)
 import qualified Data.Map.Strict as M
 import Data.Proxy (Proxy(..))
 import qualified Data.Set as S
+import qualified Data.Vector as V
 
 import Network.HTTP.Types.Status
 import Network.Wai (responseLBS)
@@ -45,23 +49,26 @@ import Network.Wai.EventSource (ServerEvent(..), eventSourceAppIO)
 import Servant.API
 import Servant.Server
 
+import System.LogLevel
 import System.Random
 
 -- internal modules
 
-import Chainweb.Cut (Cut)
+import Chainweb.BlockHeader
 import Chainweb.Cut.Create
-import Chainweb.CutDB (awaitNewCutByChainIdStm, _cut)
-import Chainweb.Logger (Logger)
+import Chainweb.Logger
 import Chainweb.Miner.Config
 import Chainweb.Miner.Coordinator
 import Chainweb.Miner.Core
 import Chainweb.Miner.Pact
 import Chainweb.Miner.RestAPI (MiningApi)
+import Chainweb.Pact.Service.Types(BlockInProgress(..), Transactions(..))
+import Chainweb.Payload
 import Chainweb.RestAPI.Utils
-import Chainweb.Utils (EncodingException(..))
+import Chainweb.Utils
 import Chainweb.Utils.Serialization
 import Chainweb.Version
+import Chainweb.WebPactExecutionService
 
 -- -------------------------------------------------------------------------- --
 -- Work Handler
@@ -114,14 +121,33 @@ solvedHandler mr (HeaderBytes bytes) = do
 -- -------------------------------------------------------------------------- --
 -- Updates Handler
 
+-- | Whether the work is outdated and should be thrown out immediately,
+-- or there's just fresher work available and the old work is still valid.
+data WorkChange = WorkOutdated | WorkRefreshed | WorkRegressed
+    deriving stock (Show, Eq)
+
 updatesHandler
     :: Logger l
     => MiningCoordination l tbl
     -> ChainBytes
     -> Tagged Handler Application
 updatesHandler mr (ChainBytes cbytes) = Tagged $ \req resp -> withLimit resp $ do
-    cid <- runGetS decodeChainId cbytes
-    cv <- _cut (_coordCutDb mr) >>= newIORef
+    watchedChain <- runGetS decodeChainId cbytes
+    (watchedMiner, blockOnChain) <- atomically $ do
+        PrimedWork pw <- readTVar (_coordPrimedWork mr)
+        -- we check if `watchedMiner` has new work. we ignore
+        -- all other miner IDs, because we don't know which miner
+        -- is asking for updates, and if we watched all of them at once
+        -- we'd send too many messages.
+        -- this is deliberately partial, primed work will always have
+        -- at least one miner or that's an error
+        let (watchedMiner, minerBlocks) = HM.toList pw ^?! _head
+        -- and that miner will always have this chain(?)
+        -- if this chain doesn't exist yet just wait
+        blockOnChain <- maybe retry return
+            $ minerBlocks ^? ix watchedChain
+        return (watchedMiner, blockOnChain)
+    blockOnChainRef <- newIORef blockOnChain
 
     -- An update stream is closed after @timeout@ seconds. We add some jitter to
     -- availablility of streams is uniformily distributed over time and not
@@ -130,36 +156,105 @@ updatesHandler mr (ChainBytes cbytes) = Tagged $ \req resp -> withLimit resp $ d
     jitter <- randomRIO @Double (0.9, 1.1)
     timer <- registerDelay (round $ jitter * realToFrac timeout * 1_000_000)
 
-    eventSourceAppIO (go timer cid cv) req resp
+    eventSourceAppIO (go timer watchedChain watchedMiner blockOnChainRef) req resp
   where
     timeout = _coordinationUpdateStreamTimeout $ _coordConf mr
 
-    -- | A nearly empty `ServerEvent` that signals the discovery of a new
-    -- `Cut`. Currently there is no need to actually send any information over
-    -- to the caller.
+    -- | A nearly empty `ServerEvent` that signals the work on this chain has
+    -- changed, and how.
     --
-    f :: ServerEvent
-    f = ServerEvent (Just $ fromByteString "New Cut") Nothing []
+    eventForWorkChangeType :: WorkChange -> ServerEvent
+    eventForWorkChangeType WorkOutdated = ServerEvent (Just $ fromByteString "New Cut") Nothing []
+    eventForWorkChangeType WorkRefreshed = ServerEvent (Just $ fromByteString "Refreshed Block") Nothing []
+    -- this is only different from WorkRefreshed for logging
+    eventForWorkChangeType WorkRegressed = ServerEvent (Just $ fromByteString "Refreshed Block") Nothing []
 
-    go :: TVar Bool -> ChainId -> IORef Cut -> IO ServerEvent
-    go timer cid cv = do
-        c <- readIORef cv
+    go :: TVar Bool -> ChainId -> MinerId -> IORef (Maybe NewBlock) -> IO ServerEvent
+    go timer watchedChain watchedMiner blockOnChainRef = do
+        lastBlockOnChain <- readIORef blockOnChainRef
 
         -- await either a timeout or a new event
-        maybeCut <- atomically $ do
+        maybeNewBlock <- atomically $ do
             t <- readTVar timer
             if t
                 then return Nothing
-                else Just <$> awaitNewCutByChainIdStm (_coordCutDb mr) cid c
+                else Just <$> awaitNewPrimedWork watchedChain watchedMiner lastBlockOnChain
 
-        case maybeCut of
+        case maybeNewBlock of
             Nothing -> return CloseEvent
-            Just c' -> do
-                writeIORef cv $! c'
-                return f
+            Just (workChange, currentBlockOnChain) -> do
+                writeIORef blockOnChainRef currentBlockOnChain
+                logFunctionText logger Debug $
+                    "sent update to miner on chain " <> toText watchedChain <> ": " <> sshow workChange
+                when (workChange == WorkRegressed) $
+                    logFunctionText logger Warn $
+                        "miner block regressed: " <> sshow currentBlockOnChain
+                return (eventForWorkChangeType workChange)
+      where
+        logger = addLabel ("chain", toText watchedChain) (_coordLogger mr)
 
     count = _coordUpdateStreamCount mr
 
+    awaitNewPrimedWork watchedChain watchedMiner lastBlockOnChain = do
+        PrimedWork pw <- readTVar (_coordPrimedWork mr)
+        let currentBlockOnChain = pw ^?! ix watchedMiner . ix watchedChain
+        case (lastBlockOnChain, currentBlockOnChain) of
+
+            -- we just got new PrimedWork after it was outdated;
+            -- should only happen in case of a race, where the miner
+            -- subscribes for updates and its work becomes stale before
+            -- it receives an update
+            (Nothing, Just _) -> return (WorkOutdated, currentBlockOnChain)
+
+            -- we just lost our PrimedWork because it's outdated,
+            -- miner should grab new work.
+            (Just _, Nothing) -> return (WorkOutdated, currentBlockOnChain)
+
+            -- there was no work, and that hasn't changed.
+            (Nothing, Nothing) -> retry
+
+            (Just (NewBlockInProgress lastBip), Just (NewBlockInProgress currentBip))
+                | ParentHeader lastPh <- _blockInProgressParentHeader lastBip
+                , ParentHeader currentPh <- _blockInProgressParentHeader currentBip
+                , lastPh /= currentPh ->
+                    -- we've got a new block on a new parent, we must've missed
+                    -- the update where the old block became outdated.
+                    -- miner should restart
+                    return (WorkOutdated, currentBlockOnChain)
+
+                | lastTlen <- V.length (_transactionPairs $ _blockInProgressTransactions lastBip)
+                , currentTlen <- V.length (_transactionPairs $ _blockInProgressTransactions currentBip)
+                , lastTlen /= currentTlen ->
+                    if lastTlen > currentTlen
+                    then
+                        -- our refreshed block somehow has fewer transactions
+                        -- (tx count shrank) on the same parent header; bizarre, flag it
+                        return (WorkRegressed, currentBlockOnChain)
+                    else
+                        -- we've got a block that's been extended with new transactions
+                        -- miner should restart
+                        return (WorkRefreshed, currentBlockOnChain)
+
+                -- no apparent change
+                | otherwise -> retry
+            (Just (NewBlockPayload lastPh lastPwo), Just (NewBlockPayload currentPh currentPwo))
+                | lastPh /= currentPh ->
+                    -- we've got a new block on a new parent, we must've missed
+                    -- the update where the old block became outdated.
+                    -- miner should restart.
+                    return (WorkOutdated, currentBlockOnChain)
+
+                | _payloadWithOutputsPayloadHash lastPwo /= _payloadWithOutputsPayloadHash currentPwo ->
+                    -- this should be impossible because NewBlockPayload is for
+                    -- when Pact is off so blocks can't be refreshed, but we've got
+                    -- a different block with the same parent, so the miner should restart.
+                    return (WorkRefreshed, currentBlockOnChain)
+
+                -- no apparent change
+                | otherwise -> retry
+            (Just _, Just _) ->
+                error "awaitNewPrimedWork: impossible: NewBlockInProgress replaced by a NewBlockPayload"
+
     withLimit resp inner = bracket
         (atomicModifyIORef' count $ \x -> (x - 1, x - 1))
         (const $ atomicModifyIORef' count $ \x -> (x + 1, ()))
diff --git a/src/Chainweb/WebPactExecutionService.hs b/src/Chainweb/WebPactExecutionService.hs
index 9a96c8b7ae..31161c77a4 100644
--- a/src/Chainweb/WebPactExecutionService.hs
+++ b/src/Chainweb/WebPactExecutionService.hs
@@ -47,8 +47,9 @@ import Pact.Types.RowData (RowData)
 -- PactExecutionService
 
 data NewBlock
-  = NewBlockInProgress !BlockInProgress
-  | NewBlockPayload !ParentHeader !PayloadWithOutputs
+    = NewBlockInProgress !BlockInProgress
+    | NewBlockPayload !ParentHeader !PayloadWithOutputs
+    deriving Show
 
 newBlockToPayloadWithOutputs :: NewBlock -> PayloadWithOutputs
 newBlockToPayloadWithOutputs (NewBlockInProgress bip)