Skip to content

Commit 9f41267

Browse files
committed
wip: new rate limiter
1 parent 27bc1ee commit 9f41267

File tree

5 files changed

+348
-27
lines changed

5 files changed

+348
-27
lines changed

chainweb.cabal

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,8 @@ library
249249
, Chainweb.Utils.RequestLog
250250
, Chainweb.Utils.Rule
251251
, Chainweb.Utils.Serialization
252+
, Chainweb.Utils.Throttling
253+
, Chainweb.Utils.TokenLimiting
252254
, Chainweb.VerifierPlugin
253255
, Chainweb.VerifierPlugin.Allow
254256
, Chainweb.VerifierPlugin.Hyperlane.Announcement
@@ -361,6 +363,7 @@ library
361363
, base64-bytestring-kadena == 0.1
362364
, binary >= 0.8
363365
, bytestring >= 0.10.12
366+
, cache >= 0.1.1.2
364367
, case-insensitive >= 1.2
365368
, cassava >= 0.5.1
366369
, chainweb-storage >= 0.1
@@ -434,7 +437,7 @@ library
434437
, time >= 1.12.2
435438
, tls >=1.9
436439
, tls-session-manager >= 0.0
437-
, token-bucket >= 0.1
440+
, token-limiter >= 0.1
438441
, transformers >= 0.5
439442
, trifecta >= 2.1
440443
, unliftio >= 0.2

src/Chainweb/Chainweb.hs

Lines changed: 40 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ import P2P.Peer
185185

186186
import qualified Pact.Types.ChainMeta as P
187187
import qualified Pact.Types.Command as P
188+
import qualified Chainweb.Utils.Throttling as Throttling
188189

189190
-- -------------------------------------------------------------------------- --
190191
-- Chainweb Resources
@@ -718,28 +719,29 @@ runChainweb cw nowServing = do
718719
logg Warn $ "OpenAPI spec validation enabled on service API, make sure this is what you want"
719720
mkValidationMiddleware
720721
else return id
721-
722-
concurrentlies_
723-
724-
-- 1. Start serving Rest API
725-
[ (if tls then serve else servePlain)
726-
$ httpLog
727-
. throttle (_chainwebPutPeerThrottler cw)
728-
. throttle (_chainwebMempoolThrottler cw)
729-
. throttle (_chainwebThrottler cw)
730-
. p2pRequestSizeLimit
731-
. p2pValidationMiddleware
732-
733-
-- 2. Start Clients (with a delay of 500ms)
734-
, threadDelay 500000 >> clients
735-
736-
-- 3. Start serving local API
737-
, threadDelay 500000 >> do
738-
serveServiceApi
739-
$ serviceHttpLog
740-
. serviceRequestSizeLimit
741-
. serviceApiValidationMiddleware
742-
]
722+
Throttling.throttleMiddleware (logFunction $ _chainwebLogger cw) "p2p" p2pThrottleEconomy $ \p2pThrottler ->
723+
Throttling.throttleMiddleware (logFunction $ _chainwebLogger cw) "service" serviceThrottleEconomy $ \serviceThrottler ->
724+
725+
concurrentlies_
726+
727+
-- 1. Start serving Rest API
728+
[ (if tls then serve else servePlain)
729+
$ httpLog
730+
. p2pRequestSizeLimit
731+
. p2pThrottler
732+
. p2pValidationMiddleware
733+
734+
-- 2. Start Clients (with a delay of 500ms)
735+
, threadDelay 500000 >> clients
736+
737+
-- 3. Start serving local API
738+
, threadDelay 500000 >> do
739+
serveServiceApi
740+
$ serviceHttpLog
741+
. serviceRequestSizeLimit
742+
. serviceThrottler
743+
. serviceApiValidationMiddleware
744+
]
743745

744746
where
745747

@@ -864,6 +866,22 @@ runChainweb cw nowServing = do
864866
setMaxLengthForRequest (\_req -> pure $ Just $ 2 * 1024 * 1024) -- 2MB
865867
defaultRequestSizeLimitSettings
866868

869+
p2pThrottleEconomy = Throttling.ThrottleEconomy
870+
{ Throttling.requestCost = 10
871+
, Throttling.requestBody100ByteCost = 1
872+
, Throttling.responseBody100ByteCost = 2
873+
, Throttling.maxBudget = 35_000
874+
, Throttling.freeRate = 35_000
875+
}
876+
877+
serviceThrottleEconomy = Throttling.ThrottleEconomy
878+
{ Throttling.requestCost = 10
879+
, Throttling.requestBody100ByteCost = 1
880+
, Throttling.responseBody100ByteCost = 2
881+
, Throttling.maxBudget = 50_000
882+
, Throttling.freeRate = 50_000
883+
}
884+
867885
-- Request size limit for the P2P API
868886
--
869887
-- NOTE: this may need to have to be adjusted if the p2p limits for batch

src/Chainweb/Utils.hs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ import Configuration.Utils hiding (Error, Lens)
233233
import Control.Concurrent (threadDelay)
234234
import Control.Concurrent.Async
235235
import Control.Concurrent.MVar
236-
import Control.Concurrent.TokenBucket
236+
import Control.Concurrent.TokenLimiter
237237
import Control.DeepSeq
238238
import Control.Exception (SomeAsyncException(..), evaluate)
239239
import Control.Lens hiding ((.=))
@@ -970,9 +970,13 @@ runForeverThrottled
970970
-> IO ()
971971
-> IO ()
972972
runForeverThrottled logfun name burst rate a = mask $ \umask -> do
973-
tokenBucket <- newTokenBucket
973+
let config = defaultLimitConfig
974+
{ maxBucketTokens = fromIntegral burst
975+
, bucketRefillTokensPerSecond = fromIntegral rate
976+
}
977+
tokenBucket <- newRateLimiter config
974978
logfun Debug $ "start " <> name
975-
let runThrottled = tokenBucketWait tokenBucket burst rate >> a
979+
let runThrottled = waitDebit config tokenBucket 1 >> a
976980
go = do
977981
forever (umask runThrottled) `catchAllSynchronous` \e ->
978982
logfun Error $ name <> " failed: " <> sshow e <> ". Restarting ..."
@@ -1494,4 +1498,4 @@ unsafeHead msg = \case
14941498
unsafeTail :: HasCallStack => String -> [a] -> [a]
14951499
unsafeTail msg = \case
14961500
_ : xs -> xs
1497-
[] -> error $ "unsafeTail: empty list: " <> msg
1501+
[] -> error $ "unsafeTail: empty list: " <> msg

src/Chainweb/Utils/Throttling.hs

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
{-# LANGUAGE OverloadedStrings #-}
2+
{-# LANGUAGE RankNTypes #-}
3+
{-# LANGUAGE RecordWildCards #-}
4+
{-# LANGUAGE LambdaCase #-}
5+
{-# LANGUAGE DeriveAnyClass #-}
6+
{-# LANGUAGE DerivingStrategies #-}
7+
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
8+
9+
module Chainweb.Utils.Throttling
10+
( ThrottleEconomy(..)
11+
, ThrottledException(..)
12+
, throttleMiddleware
13+
) where
14+
15+
import Data.LogMessage
16+
import Data.Text (Text)
17+
import qualified Network.Wai as Wai
18+
import qualified Network.Wai.Internal as Wai.Internal
19+
import Chainweb.Utils.TokenLimiting
20+
import Control.Exception.Safe
21+
import Network.HTTP.Types.Status
22+
import qualified Data.ByteString as BS
23+
import qualified Data.Text as T
24+
import Data.Hashable
25+
import Network.Socket (SockAddr(..))
26+
import qualified Data.ByteString.Builder as BSB
27+
import System.IO.Unsafe (unsafeInterleaveIO)
28+
import qualified Data.ByteString.Lazy as LBS
29+
30+
data ThrottleEconomy = ThrottleEconomy
31+
{ requestCost :: Int
32+
, requestBody100ByteCost :: Int
33+
, responseBody100ByteCost :: Int
34+
, maxBudget :: Int
35+
, freeRate :: Int
36+
}
37+
38+
data ThrottledException = ThrottledException Text
39+
deriving (Show, Exception)
40+
41+
hashWithSalt' :: Hashable a => a -> Int -> Int
42+
hashWithSalt' = flip hashWithSalt
43+
44+
newtype HashableSockAddr = HashableSockAddr SockAddr
45+
deriving newtype Eq
46+
instance Hashable HashableSockAddr where
47+
hashWithSalt salt (HashableSockAddr sockAddr) = case sockAddr of
48+
SockAddrInet port hostAddr ->
49+
-- constructor tag
50+
hashWithSalt' (1 :: Word)
51+
. hashWithSalt' (fromIntegral port :: Word)
52+
. hashWithSalt' hostAddr
53+
$ salt
54+
SockAddrInet6 port flowInfo hostAddr scopeId ->
55+
hashWithSalt' (2 :: Word)
56+
. hashWithSalt' (fromIntegral port :: Word)
57+
. hashWithSalt' flowInfo
58+
. hashWithSalt' hostAddr
59+
. hashWithSalt' scopeId
60+
$ salt
61+
SockAddrUnix str ->
62+
hashWithSalt' (3 :: Word)
63+
. hashWithSalt' str
64+
$ salt
65+
66+
debitOrDie :: Hashable k => TokenLimitMap k -> (Text, k) -> Int -> IO ()
67+
debitOrDie tokenLimitMap (name, k) cost = do
68+
tryDebit cost k tokenLimitMap >>= \case
69+
True -> return ()
70+
False -> throwIO (ThrottledException name)
71+
72+
throttleMiddleware :: LogFunction -> Text -> ThrottleEconomy -> (Wai.Middleware -> IO r) -> IO r
73+
throttleMiddleware logfun name ThrottleEconomy{..} k =
74+
withTokenLimitMap logfun ("request-throttler-" <> name) limitCachePolicy limitConfig $ \tokenLimitMap -> do
75+
k $ middleware tokenLimitMap
76+
where
77+
middleware tokenLimitMap app request respond = do
78+
debitOrDie' requestCost
79+
meteredRequest <- meterRequest debitOrDie' request
80+
app meteredRequest (meterResponse debitOrDie' respond)
81+
where
82+
host = HashableSockAddr $ Wai.remoteHost request
83+
hostText = T.pack $ show (Wai.remoteHost request)
84+
debitOrDie' = debitOrDie tokenLimitMap (hostText, host)
85+
86+
limitCachePolicy = TokenLimitCachePolicy 30
87+
limitConfig = defaultLimitConfig
88+
{ maxBucketTokens = maxBudget
89+
, initialBucketTokens = maxBudget
90+
, bucketRefillTokensPerSecond = freeRate
91+
}
92+
93+
meterRequest debit request
94+
| requestBody100ByteCost == 0 = return request
95+
| otherwise = case Wai.requestBodyLength request of
96+
Wai.KnownLength requestBodyLen -> do
97+
() <- debit $ (requestBody100ByteCost * fromIntegral requestBodyLen) `div` 100
98+
return request
99+
Wai.ChunkedBody ->
100+
return (Wai.setRequestBodyChunks (getMeteredRequestBodyChunk debit request) request)
101+
102+
getMeteredRequestBodyChunk debit request = do
103+
chunk <- Wai.getRequestBodyChunk request
104+
-- charge *after* receiving a request body chunk
105+
() <- debit $ (requestBody100ByteCost * BS.length chunk) `div` 100
106+
return chunk
107+
108+
-- the only way to match on responses without using internal API is via
109+
-- responseToStream, which converts any response into a streaming response.
110+
-- unfortunately:
111+
-- * all of the responses produced by servant are builder responses,
112+
-- not streaming responses
113+
-- * streaming responses are not supported by http2; we try to use http2
114+
-- (see https://hackage.haskell.org/package/http2-5.3.5/docs/src/Network.HTTP2.Server.Run.html#runIO)
115+
-- * a streaming response body may be less efficient than a builder
116+
-- response body, in particular because it needs to use a chunked
117+
-- encoding
118+
--
119+
meterResponse
120+
:: (Int -> IO ())
121+
-> (Wai.Response -> IO a) -> Wai.Response -> IO a
122+
meterResponse _ respond response
123+
| responseBody100ByteCost == 0 = respond response
124+
meterResponse debit respond (Wai.Internal.ResponseStream status headers responseBody) = do
125+
respond
126+
$ Wai.responseStream status headers
127+
$ meterStreamingResponseBody debit responseBody
128+
meterResponse debit respond (Wai.Internal.ResponseBuilder status headers responseBody) = do
129+
respond
130+
<$> Wai.responseLBS status headers . LBS.fromChunks
131+
=<< meterBuilderResponseBody debit (LBS.toChunks $ BSB.toLazyByteString responseBody)
132+
meterResponse _ _ _ = error "unrecognized response type"
133+
134+
meterStreamingResponseBody debit responseBody send flush = responseBody
135+
(\chunkBSBuilder -> do
136+
let chunkBS = BS.toStrict (BSB.toLazyByteString chunkBSBuilder)
137+
() <- debit $ (responseBody100ByteCost * BS.length chunkBS) `div` 100
138+
-- charger *before* sending a response body chunk
139+
send (BSB.byteString chunkBS)
140+
)
141+
flush
142+
meterBuilderResponseBody debit (chunk:chunks) = unsafeInterleaveIO $ do
143+
() <- debit $ (responseBody100ByteCost * BS.length chunk) `div` 100
144+
(chunk:) <$> meterBuilderResponseBody debit chunks
145+
meterBuilderResponseBody _ [] = return []

0 commit comments

Comments
 (0)