Skip to content

Commit

Permalink
wip: new rate limiter
Browse files Browse the repository at this point in the history
  • Loading branch information
edmundnoble committed Nov 12, 2024
1 parent cd7b4dc commit ecc6bdf
Show file tree
Hide file tree
Showing 5 changed files with 348 additions and 27 deletions.
5 changes: 4 additions & 1 deletion chainweb.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,8 @@ library
, Chainweb.Utils.RequestLog
, Chainweb.Utils.Rule
, Chainweb.Utils.Serialization
, Chainweb.Utils.Throttling
, Chainweb.Utils.TokenLimiting
, Chainweb.VerifierPlugin
, Chainweb.VerifierPlugin.Allow
, Chainweb.VerifierPlugin.Hyperlane.Announcement
Expand Down Expand Up @@ -361,6 +363,7 @@ library
, base64-bytestring-kadena == 0.1
, binary >= 0.8
, bytestring >= 0.10.12
, cache >= 0.1.1.2
, case-insensitive >= 1.2
, cassava >= 0.5.1
, chainweb-storage >= 0.1
Expand Down Expand Up @@ -434,7 +437,7 @@ library
, time >= 1.12.2
, tls >=1.9
, tls-session-manager >= 0.0
, token-bucket >= 0.1
, token-limiter >= 0.1
, transformers >= 0.5
, trifecta >= 2.1
, unliftio >= 0.2
Expand Down
62 changes: 40 additions & 22 deletions src/Chainweb/Chainweb.hs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ import P2P.Peer

import qualified Pact.Types.ChainMeta as P
import qualified Pact.Types.Command as P
import qualified Chainweb.Utils.Throttling as Throttling

-- -------------------------------------------------------------------------- --
-- Chainweb Resources
Expand Down Expand Up @@ -718,28 +719,29 @@ runChainweb cw nowServing = do
logg Warn $ "OpenAPI spec validation enabled on service API, make sure this is what you want"
mkValidationMiddleware
else return id

concurrentlies_

-- 1. Start serving Rest API
[ (if tls then serve else servePlain)
$ httpLog
. throttle (_chainwebPutPeerThrottler cw)
. throttle (_chainwebMempoolThrottler cw)
. throttle (_chainwebThrottler cw)
. p2pRequestSizeLimit
. p2pValidationMiddleware

-- 2. Start Clients (with a delay of 500ms)
, threadDelay 500000 >> clients

-- 3. Start serving local API
, threadDelay 500000 >> do
serveServiceApi
$ serviceHttpLog
. serviceRequestSizeLimit
. serviceApiValidationMiddleware
]
Throttling.throttleMiddleware (logFunction $ _chainwebLogger cw) "p2p" p2pThrottleEconomy $ \p2pThrottler ->
Throttling.throttleMiddleware (logFunction $ _chainwebLogger cw) "service" serviceThrottleEconomy $ \serviceThrottler ->

concurrentlies_

-- 1. Start serving Rest API
[ (if tls then serve else servePlain)
$ httpLog
. p2pRequestSizeLimit
. p2pThrottler
. p2pValidationMiddleware

-- 2. Start Clients (with a delay of 500ms)
, threadDelay 500000 >> clients

-- 3. Start serving local API
, threadDelay 500000 >> do
serveServiceApi
$ serviceHttpLog
. serviceRequestSizeLimit
. serviceThrottler
. serviceApiValidationMiddleware
]

where

Expand Down Expand Up @@ -864,6 +866,22 @@ runChainweb cw nowServing = do
setMaxLengthForRequest (\_req -> pure $ Just $ 2 * 1024 * 1024) -- 2MB
defaultRequestSizeLimitSettings

p2pThrottleEconomy = Throttling.ThrottleEconomy
{ Throttling.requestCost = 10
, Throttling.requestBody100ByteCost = 1
, Throttling.responseBody100ByteCost = 2
, Throttling.maxBudget = 35_000
, Throttling.freeRate = 35_000
}

serviceThrottleEconomy = Throttling.ThrottleEconomy
{ Throttling.requestCost = 10
, Throttling.requestBody100ByteCost = 1
, Throttling.responseBody100ByteCost = 2
, Throttling.maxBudget = 50_000
, Throttling.freeRate = 50_000
}

-- Request size limit for the P2P API
--
-- NOTE: this may need to have to be adjusted if the p2p limits for batch
Expand Down
12 changes: 8 additions & 4 deletions src/Chainweb/Utils.hs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ import Configuration.Utils hiding (Error, Lens)
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async
import Control.Concurrent.MVar
import Control.Concurrent.TokenBucket
import Control.Concurrent.TokenLimiter
import Control.DeepSeq
import Control.Exception (SomeAsyncException(..), evaluate)
import Control.Lens hiding ((.=))
Expand Down Expand Up @@ -970,9 +970,13 @@ runForeverThrottled
-> IO ()
-> IO ()
runForeverThrottled logfun name burst rate a = mask $ \umask -> do
tokenBucket <- newTokenBucket
let config = defaultLimitConfig
{ maxBucketTokens = fromIntegral burst
, bucketRefillTokensPerSecond = fromIntegral rate
}
tokenBucket <- newRateLimiter config
logfun Debug $ "start " <> name
let runThrottled = tokenBucketWait tokenBucket burst rate >> a
let runThrottled = waitDebit config tokenBucket 1 >> a
go = do
forever (umask runThrottled) `catchAllSynchronous` \e ->
logfun Error $ name <> " failed: " <> sshow e <> ". Restarting ..."
Expand Down Expand Up @@ -1494,4 +1498,4 @@ unsafeHead msg = \case
unsafeTail :: HasCallStack => String -> [a] -> [a]
unsafeTail msg = \case
_ : xs -> xs
[] -> error $ "unsafeTail: empty list: " <> msg
[] -> error $ "unsafeTail: empty list: " <> msg
145 changes: 145 additions & 0 deletions src/Chainweb/Utils/Throttling.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}

module Chainweb.Utils.Throttling
( ThrottleEconomy(..)
, ThrottledException(..)
, throttleMiddleware
) where

import Data.LogMessage
import Data.Text (Text)
import qualified Network.Wai as Wai
import qualified Network.Wai.Internal as Wai.Internal
import Chainweb.Utils.TokenLimiting
import Control.Exception.Safe
import Network.HTTP.Types.Status

Check failure on line 21 in src/Chainweb/Utils/Throttling.hs

View workflow job for this annotation

GitHub Actions / Build (9.8.2, 3.12, macos-latest, true)

The import of ‘Network.HTTP.Types.Status’ is redundant

Check failure on line 21 in src/Chainweb/Utils/Throttling.hs

View workflow job for this annotation

GitHub Actions / Build (9.8.2, 3.12, ubuntu-22.04, false)

The import of ‘Network.HTTP.Types.Status’ is redundant

Check failure on line 21 in src/Chainweb/Utils/Throttling.hs

View workflow job for this annotation

GitHub Actions / Build (9.10.1, 3.12, ubuntu-22.04, false)

The import of ‘Network.HTTP.Types.Status’ is redundant

Check failure on line 21 in src/Chainweb/Utils/Throttling.hs

View workflow job for this annotation

GitHub Actions / Build (9.6.6, 3.12, ubuntu-22.04, false)

The import of ‘Network.HTTP.Types.Status’ is redundant

Check failure on line 21 in src/Chainweb/Utils/Throttling.hs

View workflow job for this annotation

GitHub Actions / Build (9.8.2, 3.12, ubuntu-22.04, true)

The import of ‘Network.HTTP.Types.Status’ is redundant
import qualified Data.ByteString as BS
import qualified Data.Text as T
import Data.Hashable
import Network.Socket (SockAddr(..))
import qualified Data.ByteString.Builder as BSB
import System.IO.Unsafe (unsafeInterleaveIO)
import qualified Data.ByteString.Lazy as LBS

data ThrottleEconomy = ThrottleEconomy
{ requestCost :: Int
, requestBody100ByteCost :: Int
, responseBody100ByteCost :: Int
, maxBudget :: Int
, freeRate :: Int
}

data ThrottledException = ThrottledException Text
deriving (Show, Exception)

hashWithSalt' :: Hashable a => a -> Int -> Int
hashWithSalt' = flip hashWithSalt

newtype HashableSockAddr = HashableSockAddr SockAddr
deriving newtype Eq
instance Hashable HashableSockAddr where
hashWithSalt salt (HashableSockAddr sockAddr) = case sockAddr of
SockAddrInet port hostAddr ->
-- constructor tag
hashWithSalt' (1 :: Word)
. hashWithSalt' (fromIntegral port :: Word)
. hashWithSalt' hostAddr
$ salt
SockAddrInet6 port flowInfo hostAddr scopeId ->
hashWithSalt' (2 :: Word)
. hashWithSalt' (fromIntegral port :: Word)
. hashWithSalt' flowInfo
. hashWithSalt' hostAddr
. hashWithSalt' scopeId
$ salt
SockAddrUnix str ->
hashWithSalt' (3 :: Word)
. hashWithSalt' str
$ salt

debitOrDie :: Hashable k => TokenLimitMap k -> (Text, k) -> Int -> IO ()
debitOrDie tokenLimitMap (name, k) cost = do
tryDebit cost k tokenLimitMap >>= \case
True -> return ()
False -> throwIO (ThrottledException name)

throttleMiddleware :: LogFunction -> Text -> ThrottleEconomy -> (Wai.Middleware -> IO r) -> IO r
throttleMiddleware logfun name ThrottleEconomy{..} k =
withTokenLimitMap logfun ("request-throttler-" <> name) limitCachePolicy limitConfig $ \tokenLimitMap -> do
k $ middleware tokenLimitMap
where
middleware tokenLimitMap app request respond = do
debitOrDie' requestCost
meteredRequest <- meterRequest debitOrDie' request
app meteredRequest (meterResponse debitOrDie' respond)
where
host = HashableSockAddr $ Wai.remoteHost request
hostText = T.pack $ show (Wai.remoteHost request)
debitOrDie' = debitOrDie tokenLimitMap (hostText, host)

limitCachePolicy = TokenLimitCachePolicy 30
limitConfig = defaultLimitConfig
{ maxBucketTokens = maxBudget
, initialBucketTokens = maxBudget
, bucketRefillTokensPerSecond = freeRate
}

meterRequest debit request
| requestBody100ByteCost == 0 = return request
| otherwise = case Wai.requestBodyLength request of
Wai.KnownLength requestBodyLen -> do
() <- debit $ (requestBody100ByteCost * fromIntegral requestBodyLen) `div` 100
return request
Wai.ChunkedBody ->
return (Wai.setRequestBodyChunks (getMeteredRequestBodyChunk debit request) request)

getMeteredRequestBodyChunk debit request = do
chunk <- Wai.getRequestBodyChunk request
-- charge *after* receiving a request body chunk
() <- debit $ (requestBody100ByteCost * BS.length chunk) `div` 100
return chunk

-- the only way to match on responses without using internal API is via
-- responseToStream, which converts any response into a streaming response.
-- unfortunately:
-- * all of the responses produced by servant are builder responses,
-- not streaming responses
-- * streaming responses are not supported by http2; we try to use http2
-- (see https://hackage.haskell.org/package/http2-5.3.5/docs/src/Network.HTTP2.Server.Run.html#runIO)
-- * a streaming response body may be less efficient than a builder
-- response body, in particular because it needs to use a chunked
-- encoding
--
meterResponse
:: (Int -> IO ())
-> (Wai.Response -> IO a) -> Wai.Response -> IO a
meterResponse _ respond response
| responseBody100ByteCost == 0 = respond response
meterResponse debit respond (Wai.Internal.ResponseStream status headers responseBody) = do
respond
$ Wai.responseStream status headers
$ meterStreamingResponseBody debit responseBody
meterResponse debit respond (Wai.Internal.ResponseBuilder status headers responseBody) = do
respond
<$> Wai.responseLBS status headers . LBS.fromChunks
=<< meterBuilderResponseBody debit (LBS.toChunks $ BSB.toLazyByteString responseBody)
meterResponse _ _ _ = error "unrecognized response type"

meterStreamingResponseBody debit responseBody send flush = responseBody
(\chunkBSBuilder -> do
let chunkBS = BS.toStrict (BSB.toLazyByteString chunkBSBuilder)
() <- debit $ (responseBody100ByteCost * BS.length chunkBS) `div` 100
-- charger *before* sending a response body chunk
send (BSB.byteString chunkBS)
)
flush
meterBuilderResponseBody debit (chunk:chunks) = unsafeInterleaveIO $ do
() <- debit $ (responseBody100ByteCost * BS.length chunk) `div` 100
(chunk:) <$> meterBuilderResponseBody debit chunks
meterBuilderResponseBody _ [] = return []
Loading

0 comments on commit ecc6bdf

Please sign in to comment.