From a9ee635caf752507043464dd7cb7f9728b7860a8 Mon Sep 17 00:00:00 2001 From: Finley McIlwaine Date: Thu, 9 May 2024 13:23:51 -0700 Subject: [PATCH] Stress tests and abortive close See `docs/stress-tests.md` Also adds an abortive close HTTP2/TCP option. --- .gitignore | 3 + docs/stress-tests.md | 139 +++++++ grapesy.cabal | 99 +++-- src/Network/GRPC/Client/Connection.hs | 13 +- src/Network/GRPC/Common/Compression.hs | 4 +- src/Network/GRPC/Common/HTTP2Settings.hs | 17 + src/Network/GRPC/Server/Run.hs | 13 +- src/Network/GRPC/Server/StreamType.hs | 2 +- test-stress/Main.hs | 32 +- test-stress/Test/Stress/Client.hs | 242 ++++++++--- test-stress/Test/Stress/Cmdline.hs | 466 ++++++++++++++++++---- test-stress/Test/Stress/Common.hs | 25 ++ test-stress/Test/Stress/Driver.hs | 333 ++++++++++++++++ test-stress/Test/Stress/Driver/Summary.hs | 112 ++++++ test-stress/Test/Stress/Server.hs | 124 +++++- test-stress/Test/Stress/Server/API.hs | 14 - util/Network/GRPC/Util/Session/Channel.hs | 1 + 17 files changed, 1442 insertions(+), 197 deletions(-) create mode 100644 docs/stress-tests.md create mode 100644 test-stress/Test/Stress/Common.hs create mode 100644 test-stress/Test/Stress/Driver.hs create mode 100644 test-stress/Test/Stress/Driver/Summary.hs delete mode 100644 test-stress/Test/Stress/Server/API.hs diff --git a/.gitignore b/.gitignore index 32ec18ac..43c7df5d 100644 --- a/.gitignore +++ b/.gitignore @@ -7,6 +7,9 @@ dist-newstyle *.eventlog.html *.hp *.prof +*.aux +*.ps +*.svg perf.data perf.data.* strace.log diff --git a/docs/stress-tests.md b/docs/stress-tests.md new file mode 100644 index 00000000..98f941ab --- /dev/null +++ b/docs/stress-tests.md @@ -0,0 +1,139 @@ +# Stress Tests + +The stress tests are intended to test the performance of `grapesy`, primarily +ensuring that memory usage stays constant even under extreme load. The building +blocks of the test are the stress test server and client applications. These can +be ran manually as separate processes using (e.g.): + +``` +cabal run test-stress -- server ... +cabal run test-stress -- client ... +``` + +However, to automate the specific extreme performance scenarios we're interested +in, we also have a driver application that will automically spin up a selection +of servers and clients we care about: + +``` +cabal run test-stress -- driver +``` + +The driver will automically kill and restart certain clients and servers to test +"unstable" performance scenarios (e.g. do we leak memory when we attempt to +reconnect to disconnected servers?). + +To enable verbose debugging traces of any of these components, pass `-v` as a +top-level option, e.g.: + +``` +cabal run test-stress -- -v client ... +``` + +**Warning:** Passing `-v` and `driver` will result in *a lot* of output. + +The rest of this document will explain how to use each of these components on +the command-line and the specifics regarding what each of them is doing. + +# The API + +The stress test client and server communicate via the four major types of RPCs: + +1. **Non-streaming:** Client sends one message, server sends one message back. +2. **Client streaming:** Client sends `N` messages, server sends one message + back. +3. **Server streaming:** Client sends one message specifying `N`, server sends + `N` messages back. +4. **Bidirectional streaming:** Client sends one messages specifying `N`, server + and client take turns sending one message back and forth until each has sent + `N` messages. + +These are the "atoms" of communication between the stress test clients and +servers. The messages sent back and forth are random lazy bytestrings ranging in +length from 128 to 256 bytes (see [here](../test-stress/Test/Stress/Common.hs)). + +# The Server + +The server is the simplest to run. Simply specify the port it should bind to and +whether it should use TLS. For example, to start a secure server on port 50051: + +``` +cabal run test-stress -- server --secure --port 50051 +``` + +Use the `--help` flag to see all available options. + +By default, it will use the certificates and keys in the [`../data`](../data/) +directory, just like the demo server. See the [demo server's +documentation](./demo-server.md) for more information. + +# The Client + +The client takes options that specify the server it should connect to, how many +times it should connect to the server, and what calls it should execute on those +connections. For example, to run a client that opens 3 connections to an +insecure server at port 50051 and makes a client streaming call with 1234 +messages and a server streaming call with 500 messages, repeating those calls on +each connection 10 times. + +```bash +cabal run test-stress -- client \ + --port 50051 \ + --num-connections 3 \ + --num-calls 5 \ + --client-streaming 1234 \ + --server-streaming 500 +``` + +Clients also support running each connection concurrently via the `--concurrent` +option. Clients can connect to secure servers (using the default certificates) +using the `--secure` option, but can be configured to use non-default +certificates via other command line options just like the demo client. See the +`--help` client option and the [demo client's documentation](./demo-client.md) +for more information. + +# The Driver + +The driver spawns a variety of servers and clients in separate processes, and +runs for a total of 60 seconds. Each process is run with a specific heap limit +(via the `-M` RTS flag), and the application will terminate with a non-zero exit +code if any of the processes are killed with a `heap overflow` exception. + +## Servers + +The driver spawns four total server processes. Each server is either secure or +insecure, and either stable or unstable. Secure servers require TLS, insecure +require non-TLS. Unstable servers are killed and restarted intermittently, +stable servers are left running for the duration of the driver's execution. + +## Clients + +The driver spawns 56 total client processes. Similar to the servers, each client +is either secure or insecure and stable or unstable. Each client only +communicates with one of the servers. Obviously, (in)secure clients only +communicate with an (in)secure servers. Each client-server pair only +communicates in one of the following "patterns": + +* **Many connections:** Open a connection, make a single non-streaming call, + repeat indefinitely. Think of this as calling `withConnection` over and over. +* **Many non-streaming calls:** Open a connection. Make a single non-streaming + call, repeat indefinitely. Think of this as calling `withRPC` and sending a + single message back and forth on a single connection over and over. +* **Client streaming:** Open a connection. Make a non-stop client streaming + call. +* **Many client streaming calls:** Open a connection. Make a client streaming + call with a few messages, repeat indefinitely. +* **Server streaming:** Same as client streaming, but server sends messages + non-stop. +* **Many server streaming calls:** Same as client streaming, but server streams. +* **Bidirectional streaming:** Same as client streaming, but both client and + server send messages indefinitely. +* **Many bidirectional streaming calls:** Same as client streaming, but both + client and server stream messages. + +## Summary chart generation + +The stress test driver can optionally create summary heap profile charts for the +stable components after the test is finished by passing the `--gen-charts` flag. +This will cause each stable component to emit an event log with heap profiling +events. The driver will parse the event logs and generate SVG plots of the +memory usage over time. diff --git a/grapesy.cabal b/grapesy.cabal index bc1d09fe..97b66639 100644 --- a/grapesy.cabal +++ b/grapesy.cabal @@ -216,8 +216,8 @@ library , http2-tls >= 0.4.1 && < 0.5 , lens >= 5.0 && < 5.4 , mtl >= 2.2 && < 2.4 - , network >= 3.1 && < 3.3 - , network-run >= 0.4 && < 0.5 + , network >= 3.2.4 && < 3.3 + , network-run >= 0.4.1 && < 0.5 , proto-lens >= 0.7 && < 0.8 , proto-lens-runtime >= 0.7 && < 0.8 , random >= 1.2 && < 1.3 @@ -356,7 +356,7 @@ test-suite test-grapesy , http2 >= 5.3.4 && < 5.4 , lens >= 5.0 && < 5.4 , mtl >= 2.2 && < 2.4 - , network >= 3.1 && < 3.3 + , network >= 3.2.4 && < 3.3 , prettyprinter >= 1.7 && < 1.8 , prettyprinter-ansi-terminal >= 1.1 && < 1.2 , proto-lens >= 0.7 && < 0.8 @@ -412,16 +412,16 @@ executable demo-client , grapesy build-depends: -- External dependencies - , async >= 2.2 && < 2.3 - , bytestring >= 0.10 && < 0.13 - , conduit >= 1.3 && < 1.4 - , contra-tracer >= 0.2 && < 0.3 - , exceptions >= 0.10 && < 0.11 - , network >= 3.1 && < 3.3 - , optparse-applicative >= 0.16 && < 0.19 - , proto-lens-runtime >= 0.7 && < 0.8 - , text >= 1.2 && < 2.2 - , transformers >= 0.5 && < 0.7 + , async >= 2.2 && < 2.3 + , bytestring >= 0.10 && < 0.13 + , conduit >= 1.3 && < 1.4 + , contra-tracer >= 0.2 && < 0.3 + , exceptions >= 0.10 && < 0.11 + , network >= 3.2.4 && < 3.3 + , optparse-applicative >= 0.16 && < 0.19 + , proto-lens-runtime >= 0.7 && < 0.8 + , text >= 1.2 && < 2.2 + , transformers >= 0.5 && < 0.7 if !flag(build-demo) buildable: @@ -458,45 +458,74 @@ executable demo-server , grapesy build-depends: -- External dependencies - , aeson >= 1.5 && < 2.3 - , bytestring >= 0.10 && < 0.13 - , containers >= 0.6 && < 0.8 - , exceptions >= 0.10 && < 0.11 - , network >= 3.1 && < 3.3 - , optparse-applicative >= 0.16 && < 0.19 - , proto-lens-runtime >= 0.7 && < 0.8 - , text >= 1.2 && < 2.2 - , time >= 1.9 && < 1.13 - , transformers >= 0.5 && < 0.7 + , aeson >= 1.5 && < 2.3 + , bytestring >= 0.10 && < 0.13 + , containers >= 0.6 && < 0.8 + , exceptions >= 0.10 && < 0.11 + , network >= 3.2.4 && < 3.3 + , optparse-applicative >= 0.16 && < 0.19 + , proto-lens-runtime >= 0.7 && < 0.8 + , text >= 1.2 && < 2.2 + , time >= 1.9 && < 1.13 + , transformers >= 0.5 && < 0.7 if !flag(build-demo) buildable: False -executable test-stress +test-suite test-stress import: - , lang , common-executable-flags + , lang + default-extensions: + RecordWildCards + type: + exitcode-stdio-1.0 hs-source-dirs: test-stress + , proto main-is: Main.hs other-modules: Test.Stress.Client Test.Stress.Cmdline + Test.Stress.Common + Test.Stress.Driver + Test.Stress.Driver.Summary Test.Stress.Server - Test.Stress.Server.API + + Proto.API.Trivial + + Paths_grapesy + autogen-modules: + Paths_grapesy build-depends: -- Internal dependencies , grapesy build-depends: -- External dependencies - , optparse-applicative >= 0.16 && < 0.19 + , async >= 2.2 && < 2.3 + , bytestring >= 0.10 && < 0.13 + , Chart >= 1.9 && < 1.10 + , Chart-diagrams >= 1.9 && < 1.10 + , directory >= 1.3 && < 1.4 + , exceptions >= 0.10 && < 0.11 + , filepath >= 1.4.2.1 && < 1.6 + , ghc-events >= 0.17 && < 0.20 + , http2 >= 5.3.4 && < 5.4 + , network >= 3.2.4 && < 3.3 + , optparse-applicative >= 0.16 && < 0.19 + , process >= 1.6.12 && < 1.7 + , tls >= 1.7 && < 2.2 + , random >= 1.2 && < 1.3 if !flag(build-stress-test) buildable: False + if flag(snappy) + cpp-options: -DSNAPPY + test-suite grapesy-interop import: , lang @@ -560,14 +589,14 @@ test-suite grapesy-interop build-depends: , grapesy build-depends: - , ansi-terminal >= 1.1 && < 1.2 - , bytestring >= 0.10 && < 0.13 - , exceptions >= 0.10 && < 0.11 - , mtl >= 2.2 && < 2.4 - , network >= 3.1 && < 3.3 - , optparse-applicative >= 0.16 && < 0.19 - , proto-lens-runtime >= 0.7 && < 0.8 - , text >= 1.2 && < 2.2 + , ansi-terminal >= 1.1 && < 1.2 + , bytestring >= 0.10 && < 0.13 + , exceptions >= 0.10 && < 0.11 + , mtl >= 2.2 && < 2.4 + , network >= 3.2.4 && < 3.3 + , optparse-applicative >= 0.16 && < 0.19 + , proto-lens-runtime >= 0.7 && < 0.8 + , text >= 1.2 && < 2.2 benchmark grapesy-kvstore import: diff --git a/src/Network/GRPC/Client/Connection.hs b/src/Network/GRPC/Client/Connection.hs index 070474e5..c94d131a 100644 --- a/src/Network/GRPC/Client/Connection.hs +++ b/src/Network/GRPC/Client/Connection.hs @@ -598,13 +598,20 @@ overrideRateLimits connParams clientConfig = clientConfig { openClientSocket :: HTTP2Settings -> AddrInfo -> IO Socket openClientSocket http2Settings = - Run.openClientSocketWithOptions socketOptions + Run.openClientSocketWithOpts socketOptions where - socketOptions :: [(SocketOption, Int)] + socketOptions :: [(SocketOption, SockOptValue)] socketOptions = concat [ - [ (NoDelay, 1) + [ ( NoDelay + , SockOptValue @Int 1 + ) | http2TcpNoDelay http2Settings ] + , [ ( Linger + , SockOptValue $ StructLinger { sl_onoff = 1, sl_linger = 0 } + ) + | http2TcpAbortiveClose http2Settings + ] ] -- | Write-buffer size diff --git a/src/Network/GRPC/Common/Compression.hs b/src/Network/GRPC/Common/Compression.hs index d50b840b..c273374c 100644 --- a/src/Network/GRPC/Common/Compression.hs +++ b/src/Network/GRPC/Common/Compression.hs @@ -101,8 +101,8 @@ only compr = chooseFirst (compr :| [noCompression]) -- | Insist on the specified algorithm, /no matter what the peer offers/ -- --- This is dangerous: if the peer does not supported the specified algorithm, --- it will be unable to decompress any messages. Primarily used for testing. +-- This is dangerous: if the peer does not support the specified algorithm, it +-- will be unable to decompress any messages. Primarily used for testing. -- -- See also 'only'. insist :: Compression -> Negotation diff --git a/src/Network/GRPC/Common/HTTP2Settings.hs b/src/Network/GRPC/Common/HTTP2Settings.hs index 242f4c73..83514442 100644 --- a/src/Network/GRPC/Common/HTTP2Settings.hs +++ b/src/Network/GRPC/Common/HTTP2Settings.hs @@ -70,6 +70,22 @@ data HTTP2Settings = HTTP2Settings { -- TL;DR: leave this at the default unless you know what you are doing. , http2TcpNoDelay :: Bool + -- | Set @SO_LINGER@ to a value of 0 + -- + -- Instead of following the normal shutdown sequence to close the TCP + -- connection, this will just send a @RST@ packet and immediately discard + -- the connection, freeing the local port. + -- + -- This should /not/ be enabled in the vast majority of cases. It is only + -- useful in specific scenarios, such as stress testing, where resource + -- (e.g. port) exhaustion is a greater concern than protocol adherence. + -- Even in such scenarios scenarios, it probably only makes sense to + -- enable this option on the client since they will be using a new + -- ephemeral port for each connection (unlike the server). + -- + -- TL;DR: leave this at the default unless you know what you are doing. + , http2TcpAbortiveClose :: Bool + -- | Ping rate limit -- -- This setting is specific to the [@http2@ @@ -169,6 +185,7 @@ defaultHTTP2Settings = HTTP2Settings { http2MaxConcurrentStreams = defMaxConcurrentStreams , http2StreamWindowSize = defInitialStreamWindowSize , http2ConnectionWindowSize = defInitialConnectionWindowSize + , http2TcpAbortiveClose = False , http2TcpNoDelay = True , http2OverridePingRateLimit = Just 100 , http2OverrideEmptyFrameRateLimit = Nothing diff --git a/src/Network/GRPC/Server/Run.hs b/src/Network/GRPC/Server/Run.hs index 9c0ad894..dc1d772b 100644 --- a/src/Network/GRPC/Server/Run.hs +++ b/src/Network/GRPC/Server/Run.hs @@ -61,6 +61,7 @@ data ServerConfig = ServerConfig { -- Set to 'Nothing' to disable. , serverSecure :: Maybe SecureConfig } + deriving (Show) -- | Offer insecure connection (no TLS) data InsecureConfig = InsecureConfig { @@ -296,9 +297,12 @@ runInsecure params cfg socketTMVar server = do (insecurePort cfg) $ \listenSock -> withTimeManager $ \mgr -> Run.runTCPServerWithSocket listenSock $ \clientSock -> do - when (http2TcpNoDelay serverHTTP2Settings) $ + when (http2TcpNoDelay serverHTTP2Settings) $ do -- See description of 'withServerSocket' - setSockOpt clientSock NoDelay True + setSocketOption clientSock NoDelay 1 + when (http2TcpAbortiveClose serverHTTP2Settings) $ do + setSockOpt clientSock Linger + (StructLinger { sl_onoff = 1, sl_linger = 0 }) withConfigForInsecure mgr clientSock $ \config -> HTTP2.run serverConfig config server where @@ -359,7 +363,10 @@ runSecure params cfg socketTMVar server = do "h2" $ \mgr backend -> do when (http2TcpNoDelay serverHTTP2Settings) $ -- See description of 'withServerSocket' - setSockOpt (HTTP2.TLS.requestSock backend) NoDelay True + setSocketOption (HTTP2.TLS.requestSock backend) NoDelay 1 + when (http2TcpAbortiveClose serverHTTP2Settings) $ do + setSockOpt (HTTP2.TLS.requestSock backend) Linger + (StructLinger { sl_onoff = 1, sl_linger = 0 }) withConfigForSecure mgr backend $ \config -> HTTP2.run serverConfig config server where diff --git a/src/Network/GRPC/Server/StreamType.hs b/src/Network/GRPC/Server/StreamType.hs index fa4eff7d..9f9c505d 100644 --- a/src/Network/GRPC/Server/StreamType.hs +++ b/src/Network/GRPC/Server/StreamType.hs @@ -111,7 +111,7 @@ class FromStreamingHandler (styp :: StreamingType) where -- | Construct 'RpcHandler' from streaming type specific handler -- -- Most applications will probably not need to call this function directly, - -- instead relying on 'fromMethods'/'fromServices'. If however you want to + -- instead relying on 'fromMethods'\/'fromServices'. If however you want to -- construct a list of 'RpcHandler's manually, without a type-level -- specification of the server's API, you can use 'fromStreamingHandler'. fromStreamingHandler :: forall k (rpc :: k) m. diff --git a/test-stress/Main.hs b/test-stress/Main.hs index ae63092e..0217cedd 100644 --- a/test-stress/Main.hs +++ b/test-stress/Main.hs @@ -2,19 +2,39 @@ module Main (main) where import Test.Stress.Client import Test.Stress.Cmdline +import Test.Stress.Driver +import Test.Stress.Common (say) import Test.Stress.Server {------------------------------------------------------------------------------- - Barebones stress test + Stress tests - Unlike the regular test suite, here we run the client and the server in + Unlike the regular test suite, we support running the client and the server in separate processes, so that we can run each with their own set of RTS flags. -------------------------------------------------------------------------------} main :: IO () main = do - cmdline <- getCmdline - case cmdRole cmdline of - Client test -> client cmdline test - Server -> server cmdline + -- Parse command-line options + cmdline@Cmdline{..} <- getCmdline + say (optsTracing cmdGlobalOpts) $ + "parsed command-line options: " ++ show cmdline + case cmdRole of + Client{..} -> + client + (optsTracing cmdGlobalOpts) + clientSecurity + clientServerPort + clientCompression + clientConnects + Server{..} -> + server + (optsTracing cmdGlobalOpts) + serverConfig + Driver{..} -> + driver + (optsTracing cmdGlobalOpts) + driverGenCharts + driverWorkingDir + driverDuration diff --git a/test-stress/Test/Stress/Client.hs b/test-stress/Test/Stress/Client.hs index 6bea0aff..38f5a44b 100644 --- a/test-stress/Test/Stress/Client.hs +++ b/test-stress/Test/Stress/Client.hs @@ -1,64 +1,210 @@ -module Test.Stress.Client (client) where - --- TODO: --- --- We should stress test a scenario where there is many reconnections. +module Test.Stress.Client + ( client + ) where +import Control.Concurrent +import Control.Concurrent.Async import Control.Exception import Control.Monad +import Data.ByteString.Lazy.Char8 qualified as BS.Char8 +import GHC.IO.Exception +import Network.HTTP2.Client (HTTP2Error(..)) +import Network.Socket +import Network.TLS (TLSException(..)) -import Network.GRPC.Client -import Network.GRPC.Client.Binary qualified as Binary +import Network.GRPC.Client hiding (Call) import Network.GRPC.Common +import Network.GRPC.Common.Compression (Compression) +import Network.GRPC.Common.Compression qualified as Compr +import Proto.API.Trivial import Test.Stress.Cmdline -import Test.Stress.Server.API - -{------------------------------------------------------------------------------- - Top-level --------------------------------------------------------------------------------} - -client :: Cmdline -> Test -> IO () -client _cmdline test = - case test of - ManyCalls n -> - withConnection params server $ \conn -> - forM_ [1 .. n] $ \i -> - singleNonStreaming conn i - ManyConnections n -> - forM_ [1 .. n] $ \i -> - withConnection params server $ \conn -> - singleNonStreaming conn i - ManyMessages _n _streamingType -> - -- TODO: - -- Need to implement the many-messages stress test - putStrLn "Not implemented" +import Test.Stress.Common + +------------------------------------------------------------------------------- +-- Top-level +------------------------------------------------------------------------------- + +client :: + Bool + -> Maybe ServerValidation + -> PortNumber + -> Compression + -> [Connect] + -> IO () +client v mServerValidation serverPort compr = + mapM_ $ runConnect v mServerValidation serverPort compr + +runConnect :: + Bool + -> Maybe ServerValidation + -> PortNumber + -> Compression + -> Connect + -> IO () +runConnect v mServerValidation serverPort compr Connect{..} = do + say' v serverPort $ + "running calls " ++ show connectCalls ++ + case connectExec of + Sequential -> " in sequence" + Concurrent -> " concurrently" + mapF (runCalls v mServerValidation serverPort compr callNum) $ + zip [1..] $ replicate connectNum connectCalls where - params :: ConnParams - params = def + mapF = + case connectExec of + Sequential -> mapM_ + Concurrent -> mapConcurrently_ - server :: Server - server = ServerInsecure address +runCalls :: + Bool + -> Maybe ServerValidation + -> PortNumber + -> Compression + -> Int + -> (Int, [Call]) + -> IO () +runCalls v mServerValidation serverPort compr callNum (connNum, calls) = do + say' v serverPort msg + let connParams = def { + connCompression = Compr.insist compr + , connHTTP2Settings = def { + http2TcpAbortiveClose = True + } + , connReconnectPolicy = + exponentialBackoff + (\d -> do + say' v serverPort $ "Reconnecting after " ++ show d ++ "μs" + threadDelay d + ) + 1 + (0.1, 0.1) + maxBound - address :: Address - address = Address { + } + allowCertainFailures $ + withConnection connParams server $ \conn -> + replicateM_ callNum $ mapM_ (runCall v serverPort conn) calls + where + addr :: Address + addr = Address { addressHost = "127.0.0.1" - , addressPort = defaultInsecurePort + , addressPort = serverPort , addressAuthority = Nothing } -{------------------------------------------------------------------------------- - Specific RPCs --------------------------------------------------------------------------------} + server :: Server + server = + case mServerValidation of + Just serverValidation -> + ServerSecure serverValidation SslKeyLogNone addr + Nothing -> + ServerInsecure addr + + allowCertainFailures :: IO () -> IO () + allowCertainFailures = + handle $ \case + e | Just ServerDisconnected{} <- fromException e -> + say' v serverPort $ "server disconnected: " ++ show e + | Just IOError{} <- fromException e -> + say' v serverPort "failed to connect" + | Just ConnectionIsTimeout <- fromException e -> + say' v serverPort "got ConnectionIsTimeout" + | Just BadThingHappen{} <- fromException e -> + say' v serverPort "got BadThingHappen" + | Just HandshakeFailed{} <- fromException e -> + say' v serverPort "got HandshakeFailed" + | Just BlockedIndefinitelyOnSTM{} <- fromException e -> + say' v serverPort "got BlockedIndefinitelyOnSTM" + | otherwise -> do + say' v serverPort $ "got exception: " ++ displayException e + throwIO e + + msg :: String + msg = + "opening connection " ++ show connNum ++ " to " + ++ case mServerValidation of + Just _ -> "secure " + Nothing -> "insecure " + ++ "server at port " ++ show serverPort ++ " with compression " + ++ show (Compr.compressionId compr) + +runCall :: Bool -> PortNumber -> Connection -> Call -> IO () +runCall v p conn = + \case + NonStreaming -> + nonStreaming v p conn + ClientStreaming n -> + clientStreaming v p conn n + ServerStreaming n -> + serverStreaming v p conn n + BiDiStreaming n -> + bidiStreaming v p conn n + +------------------------------------------------------------------------------- +-- Specific RPCs +------------------------------------------------------------------------------- -- | One non-streaming, round-trip call -singleNonStreaming :: Connection -> Word -> IO () -singleNonStreaming conn n = - withRPC conn def (Proxy @ManyShortLived) $ \call -> do - Binary.sendFinalInput call n - m <- fst <$> Binary.recvFinalOutput @Word call - unless (m == succ n) $ - throwIO . userError $ concat [ - "Unexpected " ++ show m ++ "; " - , "expected " ++ show (succ n) - ] +nonStreaming :: Bool -> PortNumber -> Connection -> IO () +nonStreaming v p conn = do + say' v p "initiating non-streaming call" + withRPC conn def (Proxy @(Trivial' "non-streaming")) $ \call -> do + sendFinalInput call =<< randomMsg + void $ recvFinalOutput call + say' v p "received final output for non-streaming call" + +-- | Client streaming +-- +-- Client sends the server @N@, followed by @N@ messages. +clientStreaming :: Bool -> PortNumber -> Connection -> Int -> IO () +clientStreaming v p conn n = do + say' v p "initiating client streaming call" + withRPC conn def (Proxy @(Trivial' "client-streaming")) $ \call -> do + say' v p $ "sending " ++ show n ++ " messages" + sendNextInput call $ BS.Char8.pack (show n) + msg <- randomMsg + forM_ [1 .. n-1] $ \_ -> + void $ sendNextInput call msg + sendFinalInput call msg + void $ recvFinalOutput call + say' v p "received final output for client streaming call" + +-- | Server streaming +-- +-- Client sends the server @N@, then receives @N@ messages from server. +serverStreaming :: Bool -> PortNumber -> Connection -> Int -> IO () +serverStreaming v p conn n = do + say' v p "initiating server streaming call" + withRPC conn def (Proxy @(Trivial' "server-streaming")) $ \call -> do + say' v p $ "receiving " ++ show n ++ " messages" + sendFinalInput call $ BS.Char8.pack (show n) + forM_ [1 .. n-1] $ \_ -> void $ recvNextOutput call + void $ recvFinalOutput call + say' v p "received final output for server streaming call" + +-- | Bidirectional streaming +-- +-- Client sends the server @N@, then alternates sending and receiving @N*2@ +-- total messages. +bidiStreaming :: Bool -> PortNumber -> Connection -> Int -> IO () +bidiStreaming v p conn n = do + say' v p "initiating bidi streaming call" + withRPC conn def (Proxy @(Trivial' "bidi-streaming")) $ \call -> do + say' v p $ "sending and receiving " ++ show n ++ " messages" + sendNextInput call $ BS.Char8.pack (show n) + msg <- randomMsg + forM_ [1 .. n-1] $ \_ -> do + sendNextInput call msg + void $ recvNextOutput call + sendFinalInput call msg + void $ recvFinalOutput call + say' v p "sent and received final messages for bidi streaming call" + +------------------------------------------------------------------------------- +-- Utils +------------------------------------------------------------------------------- + +say' :: Bool -> PortNumber -> String -> IO () +say' v p msg = + say v $ "(client " ++ show p ++ ") " ++ msg diff --git a/test-stress/Test/Stress/Cmdline.hs b/test-stress/Test/Stress/Cmdline.hs index c0528471..d8e8fe39 100644 --- a/test-stress/Test/Stress/Cmdline.hs +++ b/test-stress/Test/Stress/Cmdline.hs @@ -1,97 +1,427 @@ -module Test.Stress.Cmdline ( +{-# LANGUAGE CPP #-} + +module Test.Stress.Cmdline + ( -- * Types Cmdline(..) , Role(..) - , Test(..) + , GlobalOpts(..) + + -- ** Client-specific + , Connect(..) + , Exec(..) + , Call(..) + + -- ** Server-specific + , Security(..) + , TlsOpts(..) + + -- * Parser , getCmdline ) where -import Data.Foldable (asum) +import Control.Applicative +import Network.Socket (HostName, PortNumber) import Options.Applicative qualified as Opt -import Network.GRPC.Common.StreamType +import Network.GRPC.Client qualified as Client +import Network.GRPC.Common +import Network.GRPC.Server.Run +import Network.GRPC.Spec qualified as Spec + +import Paths_grapesy {------------------------------------------------------------------------------- - Definition + Definitions -------------------------------------------------------------------------------} +-- | Command specification data Cmdline = Cmdline { - cmdRole :: Role + cmdRole :: Role + , cmdGlobalOpts :: GlobalOpts } + deriving (Show) +-- | Should we run the client, servers, or both? data Role = - Client Test - | Server + -- | Run the clients + Client { + -- | Connect over TLS? + clientSecurity :: Maybe Client.ServerValidation + , clientServerPort :: PortNumber + , clientConnects :: [Connect] -data Test = - ManyConnections Word - | ManyCalls Word - | ManyMessages Word StreamingType + -- | Insist on this compression scheme for all messages + , clientCompression :: Spec.Compression + } -{------------------------------------------------------------------------------- - Parser top-level --------------------------------------------------------------------------------} + -- | Run the server + | Server { + serverConfig :: ServerConfig + } + + -- | Run the automatic stress test suite + | Driver { + driverWorkingDir :: Maybe FilePath + , driverDuration :: Int + , driverGenCharts :: Bool + } + deriving (Show) + +-- | Connections to execute +data Connect = Connect { + -- | Execute the connections concurrently or sequentially? + connectExec :: Exec + + -- | Number of connections + , connectNum :: Int + + -- | Number of times to repeat the calls on the connection + , callNum :: Int + + -- | Calls to make on the connections + , connectCalls :: [Call] + } + deriving (Show) + +-- | Concurrent or sequential execution +data Exec = Concurrent | Sequential + deriving (Show) + +-- | Types of RPCs +data Call = + -- | A single message back and forth + NonStreaming + + -- | Client sends @N@ messages to the server + | ClientStreaming Int + + -- | Server sends @N@ messages to the client + | ServerStreaming Int + + -- | Client and server send @N@ messages to each other + | BiDiStreaming Int + deriving (Show) + +data Security = + Insecure + | Secure + deriving (Show) + +data TlsOpts = TlsOpts { + tlsPubCert :: FilePath + , tlsChainCerts :: [FilePath] + , tlsPrivKey :: FilePath + } + deriving (Show) + +mkConfig :: Maybe TlsOpts -> HostName -> PortNumber -> ServerConfig +mkConfig mtls host port = + case mtls of + Just TlsOpts{..} -> + ServerConfig Nothing $ Just SecureConfig { + secureHost = host + , securePort = port + , securePubCert = tlsPubCert + , secureChainCerts = tlsChainCerts + , securePrivKey = tlsPrivKey + , secureSslKeyLog = SslKeyLogNone + } + Nothing -> + (`ServerConfig` Nothing) $ Just InsecureConfig { + insecureHost = Just host + , insecurePort = port + } + +data GlobalOpts = GlobalOpts { + optsTracing :: Bool + } + deriving (Show) + +------------------------------------------------------------------------------- +-- Top-level parsers +------------------------------------------------------------------------------- getCmdline :: IO Cmdline -getCmdline = Opt.execParser $ - Opt.info (parseCmdline Opt.<**> Opt.helper) Opt.fullDesc +getCmdline = do + defaultPub <- getDataFileName "grpc-demo.pem" + defaultPriv <- getDataFileName "grpc-demo.key" -parseCmdline :: Opt.Parser Cmdline -parseCmdline = + let info :: Opt.ParserInfo Cmdline + info = Opt.info + ( parseCmdline defaultPub defaultPriv + Opt.<**> Opt.helper + ) + Opt.fullDesc + + Opt.execParser info + +parseCmdline :: + FilePath + -> FilePath + -> Opt.Parser Cmdline +parseCmdline defaultPub defaultPriv = Cmdline - <$> parseRole + <$> (parseRole defaultPub defaultPriv <|> pure (Driver Nothing 60 False)) + <*> parseGlobalOpts -parseRole :: Opt.Parser Role -parseRole = Opt.subparser $ mconcat [ - sub "client" "Client" (Client <$> parseTest) - , sub "server" "Server" (pure Server) +parseRole :: FilePath -> FilePath -> Opt.Parser Role +parseRole defaultPub defaultPriv = Opt.subparser $ mconcat [ + sub "client" "Run the client" $ + parseClientRole defaultPub + , sub "server" "Run the server" $ + parseServerRole defaultPub defaultPriv + , sub "driver" "Run the stress test driver" $ + parseDriverRole ] -parseTest :: Opt.Parser Test -parseTest = Opt.subparser $ mconcat [ - sub - "many-connections" - "Many connections with single non-streaming RPC call" $ - ManyConnections - <$> parseWord "NUM_CONNECTIONS" "number of connections to open" - , sub - "many-calls" - "Many non-streaming RPC calls on a single connection" $ - ManyCalls - <$> parseWord "NUM_CALLS" "number of calls to make" - , sub - "many-messages" - "Single call on a single connection, many messages exchanged" $ - ManyMessages - <$> parseWord "NUM_MESSAGES" "number of messages" - <*> parseStreamingType - ] +------------------------------------------------------------------------------- +-- Client option parsers +------------------------------------------------------------------------------- -parseWord :: String -> String -> Opt.Parser Word -parseWord meta help = - Opt.argument Opt.auto $ - Opt.metavar meta <> Opt.help help - -parseStreamingType :: Opt.Parser StreamingType -parseStreamingType = asum [ - Opt.flag' ClientStreaming $ mconcat [ - Opt.long "stream-client" - , Opt.help "client-side streaming" - ] - , Opt.flag' ServerStreaming $ mconcat [ - Opt.long "stream-server" - , Opt.help "server-side streaming" - ] - , Opt.flag' BiDiStreaming $ mconcat [ - Opt.long "stream-bidi" - , Opt.help "Bidirectional streaming" - ] - - -- 'NoStreaming' is tested by the many-connections and many-calls cases - ] +parseClientRole :: FilePath -> Opt.Parser Role +parseClientRole defaultPub = + Client + <$> parseClientSecurity defaultPub + <*> parseClientPort + <*> parseClientConnects + <*> parseCompression -{------------------------------------------------------------------------------- - Internal auxiliary --------------------------------------------------------------------------------} +parseClientSecurity :: FilePath -> Opt.Parser (Maybe Client.ServerValidation) +parseClientSecurity defaultPub = + Opt.optional ( + Opt.flag' () (mconcat [ + Opt.long "secure" + , Opt.help "Connect over TLS" + ]) + *> parseServerValidation + ) + where + parseServerValidation :: Opt.Parser Client.ServerValidation + parseServerValidation = + aux + <$> (Opt.switch $ mconcat [ + Opt.long "no-server-validation" + , Opt.help "Skip server (certificate) validation" + ]) + <*> (Opt.switch $ mconcat [ + Opt.long "cert-store-from-system" + , Opt.help "Enable the system certificate store" + ]) + <*> (Opt.option Opt.str $ mconcat [ + Opt.long "cert-store-from-path" + , Opt.help "Load certificate store from file or directory (set to empty to disable)" + , Opt.metavar "PATH" + , Opt.value defaultPub + , Opt.showDefault + ]) + where + aux :: Bool -> Bool -> FilePath -> Client.ServerValidation + aux noServerValidation certStoreFromSystem certStoreFromPath = + if noServerValidation then + Client.NoServerValidation + else + Client.ValidateServer $ mconcat . concat $ [ + [ Client.certStoreFromSystem + | certStoreFromSystem + ] + + , [ Client.certStoreFromPath certStoreFromPath + | not (null certStoreFromPath) + ] + ] + +parseClientPort :: Opt.Parser PortNumber +parseClientPort = + Opt.option Opt.auto (mconcat [ + Opt.long "port" + , Opt.help "Connect to the server at PORT" + , Opt.metavar "PORT" + ]) + +parseClientConnects :: Opt.Parser [Connect] +parseClientConnects = + Opt.some $ Connect + <$> Opt.flag Sequential Concurrent (mconcat [ + Opt.long "concurrent" + , Opt.help "Open connections concurrently" + ]) + <*> Opt.option Opt.auto (mconcat [ + Opt.long "num-connections" + , Opt.help "Open N connections" + , Opt.metavar "N" + , Opt.value 1 + , Opt.showDefault + ]) + <*> Opt.option Opt.auto (mconcat [ + Opt.long "num-calls" + , Opt.help "Repeat the calls N times on each connection" + , Opt.metavar "N" + , Opt.value 1 + , Opt.showDefault + ]) + <*> Opt.some parseCall + +parseCall :: Opt.Parser Call +parseCall = + nonStreaming + <|> clientStreaming + <|> serverStreaming + <|> bidiStreaming + where + nonStreaming :: Opt.Parser Call + nonStreaming = + Opt.flag' NonStreaming $ mconcat [ + Opt.long "non-streaming" + , Opt.help "Make a single non-streaming call" + ] + + clientStreaming :: Opt.Parser Call + clientStreaming = + ClientStreaming + <$> Opt.option Opt.auto (mconcat [ + Opt.long "client-streaming" + , Opt.help "Stream N messages from the client" + , Opt.metavar "N" + ]) + + serverStreaming :: Opt.Parser Call + serverStreaming = + ServerStreaming + <$> Opt.option Opt.auto (mconcat [ + Opt.long "server-streaming" + , Opt.help "Stream N messages from the server" + , Opt.metavar "N" + ]) + + bidiStreaming :: Opt.Parser Call + bidiStreaming = + BiDiStreaming + <$> Opt.option Opt.auto (mconcat [ + Opt.long "bidi-streaming" + , Opt.help "Stream N messages from both the client and server" + , Opt.metavar "N" + ]) + +parseCompression :: Opt.Parser Spec.Compression +parseCompression = + gzip + <|> deflate +#ifdef SNAPPY + <|> snappy +#endif + <|> pure Spec.noCompression + where + gzip :: Opt.Parser Spec.Compression + gzip = + Opt.flag' Spec.gzip $ mconcat [ + Opt.long "gzip" + , Opt.help "Insist on gzip compression" + ] + + deflate :: Opt.Parser Spec.Compression + deflate = + Opt.flag' Spec.deflate $ mconcat [ + Opt.long "deflate" + , Opt.help "Insist on deflate compression" + ] +#ifdef SNAPPY + snappy :: Opt.Parser Spec.Compression + snappy = + Opt.flag' Spec.snappy $ mconcat [ + Opt.long "snappy" + , Opt.help "Insist on snappy compression" + ] +#endif + +------------------------------------------------------------------------------- +-- Server option parsers +------------------------------------------------------------------------------- + +parseServerRole :: FilePath -> FilePath -> Opt.Parser Role +parseServerRole defaultPub defaultPriv = + aux + <$> parseServerPort + <*> parseServerSecurity defaultPub defaultPriv + where + aux :: PortNumber -> Maybe TlsOpts -> Role + aux port mtls = Server $ mkConfig mtls "127.0.0.1" port + +parseServerPort :: Opt.Parser PortNumber +parseServerPort = + Opt.option Opt.auto (mconcat [ + Opt.long "port" + , Opt.help "Bind the server to port PORT" + , Opt.metavar "PORT" + ]) + +parseServerSecurity :: FilePath -> FilePath -> Opt.Parser (Maybe TlsOpts) +parseServerSecurity defaultPub defaultPriv = + Opt.optional $ + Opt.flag' () (mconcat [ + Opt.long "secure" + , Opt.help "Enable TLS on the servers" + ]) + *> parseTlsOpts defaultPub defaultPriv + +parseTlsOpts :: FilePath -> FilePath -> Opt.Parser TlsOpts +parseTlsOpts defaultPub defaultPriv = + TlsOpts + <$> Opt.strOption (mconcat [ + Opt.long "tls-pub" + , Opt.help "TLS public certificate (X.509 format)" + , Opt.metavar "CERT_FILE" + , Opt.showDefault + , Opt.value defaultPub + ]) + <*> Opt.many (Opt.strOption $ mconcat [ + Opt.long "tls-cert" + , Opt.metavar "CERT_FILE" + , Opt.help "TLS chain certificate (X.509 format)" + ]) + <*> Opt.strOption (mconcat [ + Opt.long "tls-priv" + , Opt.metavar "KEY_FILE" + , Opt.help "TLS private key" + , Opt.showDefault + , Opt.value defaultPriv + ]) + +------------------------------------------------------------------------------- +-- Driver option parsers +------------------------------------------------------------------------------- + +parseDriverRole :: Opt.Parser Role +parseDriverRole = + Driver + <$> Opt.optional (Opt.strOption (mconcat [ + Opt.long "working-dir" + , Opt.help "Write result files to this directory, must already exist" + , Opt.metavar "DIR" + ])) + <*> Opt.option Opt.auto (mconcat [ + Opt.long "duration" + , Opt.help "Run the stress tests for this many seconds" + , Opt.metavar "SECONDS" + , Opt.value 60 + , Opt.showDefault + ]) + <*> Opt.switch (mconcat [ + Opt.long "gen-charts" + , Opt.help "Generate heap profile charts for stable components" + ]) + +------------------------------------------------------------------------------- +-- Internal auxiliary +------------------------------------------------------------------------------- + +parseGlobalOpts :: Opt.Parser GlobalOpts +parseGlobalOpts = + GlobalOpts + <$> Opt.switch (mconcat [ + Opt.long "verbose" + , Opt.short 'v' + , Opt.help "Trace test execution to stdout" + ]) sub :: String -> String -> Opt.Parser a -> Opt.Mod Opt.CommandFields a sub cmd desc parser = diff --git a/test-stress/Test/Stress/Common.hs b/test-stress/Test/Stress/Common.hs new file mode 100644 index 00000000..97878f2d --- /dev/null +++ b/test-stress/Test/Stress/Common.hs @@ -0,0 +1,25 @@ +module Test.Stress.Common + ( -- * Logging + say + + -- * Miscellaneous + , randomMsg + ) where + +import Data.ByteString.Lazy qualified as Lazy +import System.Random + +-- | Generate a random 'Lazy.ByteString' between 128 and 256 bytes in length +randomMsg :: IO Lazy.ByteString +randomMsg = do + g1 <- getStdGen + let (l, g2) = randomR (128, 256) g1 + return . Lazy.fromStrict . fst $ genByteString l g2 + +-- | Log the message, if logging is enabled +say :: Bool -> String -> IO () +say enabled msg + | enabled + = putStrLn $ "test-stress: " ++ msg + | otherwise + = return () diff --git a/test-stress/Test/Stress/Driver.hs b/test-stress/Test/Stress/Driver.hs new file mode 100644 index 00000000..af1211e4 --- /dev/null +++ b/test-stress/Test/Stress/Driver.hs @@ -0,0 +1,333 @@ +module Test.Stress.Driver + ( driver + ) where + +import Control.Concurrent +import Control.Concurrent.Async +import Control.Exception +import Control.Monad +import Control.Monad.Catch (ExitCase(..), generalBracket) +import Data.IORef +import System.Environment +import System.Exit +import System.Process +import System.Random + +import Test.Stress.Common +import Test.Stress.Driver.Summary + +------------------------------------------------------------------------------- +-- Top-level +------------------------------------------------------------------------------- + +-- | Run the automatic stress test suite +-- +-- See documentation in the source repository for details. +driver :: Bool -> Bool -> Maybe FilePath -> Int -> IO () +driver v genCharts mwd duration = do + putStrLn $ + "Running stress test driver for " ++ show duration ++ " seconds..." + exitCodeRef <- newIORef ExitSuccess + bracket + ( do + runningServers <- mapM (forkComponent v genCharts mwd) servers + runningClients <- mapM (forkComponent v genCharts mwd) clients + return $ runningServers ++ runningClients + ) + (\running -> do + say v "(driver) stopping all components" + cancelMany running + say v "(driver) stopped all components" + ) + (\running -> do + result <- race (threadDelay (duration * 1_000_000)) (waitAnyCatch running) + case result of + Right (_, Left e) + | Just (TestFailure _) <- fromException e -> do + writeIORef exitCodeRef (ExitFailure 1) + throwIO e + | otherwise -> + return () + _ -> return () + ) + `catch` + \case + e | Just f@TestFailure{} <- fromException e -> do + putStrLn $ "stress test failed: " ++ show f + | otherwise -> do + say v $ "(driver) exiting cleanly, got exception: " ++ show e + + -- At this point, the heap profiles should all be written in the working + -- directory. We convert them each to charts of total heap usage over time + -- and combine into a summary document + when genCharts $ + createSummaryPlots v mwd + putStrLn "Done" + exitCode <- readIORef exitCodeRef + exitWith exitCode + +------------------------------------------------------------------------------- +-- Auxiliary +------------------------------------------------------------------------------- + +data Component = Component { + -- | Client or server? + componentType :: ClientServer + + -- | What port to bind/connect to + , componentPort :: Int + + -- | Use TLS? + , componentSecure :: Bool + + -- | Should the component stay running indefinitely? + -- + -- If 'False', component will be killed and restarted at random intervals + -- and it will not be configured to write a heap profile. + , componentStable :: Bool + + -- | Heap size limit in Megabytes (e.g. @Just 15@ becomes @-M15m@) + -- + -- Set to 'Nothing' for no limit. + , componentLimit :: Maybe Int + + -- | Name + -- + -- Determines the name of the heap profile file that will be written out. + , componentName :: String + } + deriving (Show) + +data ClientServer = + Client { + -- | Which compression should be used? + -- + -- We don't currently use this in the driver to avoid running too many + -- clients, but it is left here as an option. + clientCompr :: Maybe String + + -- | Specify which calls should be executed + , clientFlags :: [String] + } + | Server + deriving (Show, Eq) + +newtype TestFailure = TestFailure String + deriving (Show) + deriving anyclass Exception + +cmd :: Bool -> ClientServer -> [String] +cmd v t = mconcat [ + [ "-v" | v ] + , case t of + Client mcompr flags -> + mconcat [ + [ "client" ] + , [ "--"++compr | Just compr <- [mcompr] ] + , flags + ] + Server -> + [ "server" ] + ] + +forkComponent :: Bool -> Bool -> Maybe FilePath -> Component -> IO (Async ()) +forkComponent v genCharts mwd c = do + say v $ "(driver) forking component " ++ show c + async $ + runComponent v genCharts mwd c `catch` + \case + (e :: SomeException) + | Just AsyncCancelled <- fromException e -> + say v $ "(driver) cancelled forked component " ++ show c + | otherwise -> do + say v $ "(driver) component exited with " ++ show e + throwIO e + +runComponent :: Bool -> Bool -> Maybe FilePath -> Component -> IO () +runComponent v genCharts mwd c@Component{..} = do + say' $ "starting " ++ componentName + exe <- getExecutablePath + let cp = proc exe (mconcat [ + cmd v componentType + , [ "--port=" ++ show componentPort ] + , [ "--secure" | componentSecure ] + , if componentType == Server then + [ "+RTS", "-N", "-RTS" ] + else + [] + , filter (const $ componentStable && genCharts) [ + "+RTS" + , "-l" + , "-hT" + , "-ol" ++ componentName ++ ".eventlog" + , "-RTS" + ] + , case componentLimit of + Just limit -> [ + "+RTS" + , "-M" ++ show limit ++ "m" + , "-RTS" + ] + Nothing -> + [] + ]) + (k, ec) <- + generalBracket + (do + (_, _, _, ph) <- + createProcess_ + ("runComponent (" ++ componentName ++ ")") + cp {cwd = mwd} + return ph + ) + (\ph ec -> do + terminateProcess ph + return ec + ) + watchComponent + case ec of + ExitCaseSuccess _ -> k + ExitCaseException e + | Just (TestFailure _) <- fromException e -> + throwIO e + | otherwise -> + k + ExitCaseAbort -> throwIO $ TestFailure $ componentName ++ " aborted" + where + watchComponent :: ProcessHandle -> IO (IO ()) + watchComponent ph = + if componentStable then do + say' $ "watching " ++ componentName + ec <- waitForProcess ph + case ec of + ExitFailure _ -> do + say' $ "unexpected ExitFailure from " ++ show componentName + throwIO $ TestFailure componentName + ExitSuccess -> do + say' $ componentName ++ " exited successfully, rerunning" + return $ runComponent v genCharts mwd c + else do + d <- randomRIO (16_000_000, 20_000_000) + say' $ + "waiting " ++ show d ++ "µs before killing " ++ show componentName + threadDelay d + mec <- getProcessExitCode ph + case mec of + Just (ExitFailure _) -> do + say' $ "unexpected ExitFailure from " ++ show componentName + throwIO $ TestFailure componentName + _ -> do + say' $ "terminating " ++ show componentName + terminateProcess ph + d' <- randomRIO (200_000, 500_000) + say' $ + "waiting " ++ show d ++ "µs before restarting " ++ + show componentName + threadDelay d' + return $ runComponent v genCharts mwd c + + say' :: String -> IO () + say' = say v . ("(driver) " ++) + +servers :: [Component] +servers = [ + Component { + componentType = Server + , componentPort = 50000 + , componentSecure = False + , componentStable = False + , componentLimit = Just 60 + , componentName = "server-unstable-insecure" + } + , Component { + componentType = Server + , componentPort = 50001 + , componentSecure = True + , componentStable = False + , componentLimit = Just 100 + , componentName = "server-unstable-secure" + } + , Component { + componentType = Server + , componentPort = 50002 + , componentSecure = False + , componentStable = True + , componentLimit = Just 60 + , componentName = "server-stable-insecure" + } + , Component { + componentType = Server + , componentPort = 50003 + , componentSecure = True + , componentStable = True + , componentLimit = Just 100 + , componentName = "server-stable-secure" + } + ] + +clients :: [Component] +clients = [ + Component { + componentType = Client Nothing flags + , componentPort = snd portSecurity + , componentSecure = fst portSecurity + , componentStable = stable + , componentLimit = Just 60 + , componentName = mconcat [ + "client-" + , show (snd portSecurity) ++ "-" + , if stable then "stable-" else "unstable-" + , if fst portSecurity then "secure-" else "insecure-" + , nameStr + ] + } + | (flags, nameStr) <- [ + ( [ indefinitely "--num-connections" + , "--non-streaming" + ] + , "non-streaming-many-connections" + ) + , ( [ indefinitely "--num-calls" + , "--non-streaming" + ] + , "non-streaming-many-calls" + ) + , ( [ indefinitely "--client-streaming" + ] + , "client-stream" + ) + , ( [ indefinitely "--num-calls" + , "--client-streaming=10" + ] + , "client-stream-many-calls" + ) + , ( [ indefinitely "--server-streaming" + ] + , "server-stream" + ) + , ( [ indefinitely "--num-calls" + , "--server-streaming=10" + ] + , "server-stream-many-calls" + ) + , ( [ indefinitely "--bidi-streaming" + ] + , "bidi-stream" + ) + , ( [ indefinitely "--num-calls" + , "--bidi-streaming=10" + ] + , "bidi-stream-many-calls" + ) + ] + , portSecurity <- [ + (False, 50000) + , (False, 50002) + , (True , 50001) + , (True , 50003) + ] + , stable <- [True, False] + ] + where + indefinitely :: String -> String + indefinitely = (++ "=100000000") diff --git a/test-stress/Test/Stress/Driver/Summary.hs b/test-stress/Test/Stress/Driver/Summary.hs new file mode 100644 index 00000000..b9644cbe --- /dev/null +++ b/test-stress/Test/Stress/Driver/Summary.hs @@ -0,0 +1,112 @@ +{-# LANGUAGE OverloadedStrings #-} + +module Test.Stress.Driver.Summary + ( createSummaryPlots + , eventlogToSvg + ) where + +import Control.Exception +import Data.ByteString.Lazy.Char8 qualified as BS.Lazy +import Data.List +import Data.Maybe +import Data.Word +import GHC.RTS.Events +import GHC.RTS.Events.Incremental +import Graphics.Rendering.Chart.Backend.Diagrams +import Graphics.Rendering.Chart.Easy hiding ((<.>)) +import System.Directory +import System.Exit +import System.FilePath + +import Test.Stress.Common + +createSummaryPlots :: Bool -> Maybe FilePath -> IO () +createSummaryPlots v mwd = do + cwd <- getCurrentDirectory + let wd = fromMaybe cwd mwd + putStrLn $ "Creating summary plots in " ++ wd ++ "..." + wdFiles <- map (wd ) <$> listDirectory wd + let wdElFiles = filter (".eventlog" `isSuffixOf`) wdFiles + say' v $ "found event logs:" + mapM_ (say' v) $ map (" " ++) wdElFiles + mapM_ (\e -> handleFailure e $ eventlogToSvg v e) wdElFiles + where + handleFailure :: FilePath -> IO () -> IO () + handleFailure f = + handle $ \case + e | Just UserInterrupt <- fromException e -> + exitFailure + | otherwise -> do + putStrLn $ "failed to generate summary plot for " ++ f + print e + +eventlogToSvg :: Bool -> FilePath -> IO () +eventlogToSvg v elFile = do + say' v $ "generating plot file for " ++ elFile + elBytes <- BS.Lazy.readFile elFile + case readEventLog elBytes of + Right (EventLog _ (Data events), _merr) -> do + samples <- goEvents emptySamples events + toFile def (elFile <.> "svg") $ do + layout_title .= elFile + layout_x_axis . laxis_title .= "Time (seconds)" + layout_y_axis . laxis_title .= "Size (megabytes)" + plot (line "live bytes" [samplesLiveBytes samples]) + plot (line "blocks size" [samplesBlocksSize samples]) + plot (line "heap size" [samplesHeapSize samples]) + say' v $ "finished plot for " ++ elFile + Left err -> + putStrLn $ + "Failed to create summary plot from " ++ elFile ++ ": " ++ err + where + goEvents :: Samples -> [Event] -> IO Samples + goEvents acc [] = + return acc + goEvents acc (Event t ei _:es) = do + acc' <- addEvent acc (t,ei) + `catch` (\(_e :: SomeException) -> return acc) + goEvents acc' es + + addEvent :: Samples -> (Timestamp, EventInfo) -> IO Samples + addEvent acc (t, ei) = + case ei of + HeapLive _ s -> return $ acc { + samplesLiveBytes = + insertBy byFirst + (timeConv t, sizeConv s) (samplesLiveBytes acc) + } + BlocksSize _ s -> return $ acc { + samplesBlocksSize = + insertBy byFirst + (timeConv t, sizeConv s) (samplesBlocksSize acc) + } + HeapSize _ s -> return $ acc { + samplesHeapSize = + insertBy byFirst + (timeConv t, sizeConv s) (samplesHeapSize acc) + } + _ -> return acc + where + timeConv :: Word64 -> Double + sizeConv :: Word64 -> Double + timeConv = (/ 1_000_000_000) . fromIntegral -- nanoseconds to seconds + sizeConv = (/ 1_000_000) . fromIntegral -- bytes to megabytes + +say' :: Bool -> String -> IO () +say' v = say v . ("(summary) " ++) + +byFirst :: Ord a => (a, b) -> (a, b) -> Ordering +byFirst (x1, _) (x2, _) = compare x1 x2 + +------------------------------------------------------------------------------- +-- Internal auxiliary +------------------------------------------------------------------------------- + +data Samples = Samples { + samplesLiveBytes :: [(Double, Double)] + , samplesBlocksSize :: [(Double, Double)] + , samplesHeapSize :: [(Double, Double)] + } + +emptySamples :: Samples +emptySamples = Samples [] [] [] diff --git a/test-stress/Test/Stress/Server.hs b/test-stress/Test/Stress/Server.hs index a57b79b6..a98904f0 100644 --- a/test-stress/Test/Stress/Server.hs +++ b/test-stress/Test/Stress/Server.hs @@ -1,27 +1,117 @@ -module Test.Stress.Server (server) where +module Test.Stress.Server + ( server + ) where + +import Control.Exception +import Control.Monad +import Data.ByteString.Lazy.Char8 qualified as BS.Char8 +import Data.IORef import Network.GRPC.Common +import Network.GRPC.Server import Network.GRPC.Server.Run -import Network.GRPC.Server.StreamType -import Network.GRPC.Server.StreamType.Binary qualified as Binary +import Proto.API.Trivial -import Test.Stress.Cmdline -import Test.Stress.Server.API +import Test.Stress.Common +import System.Exit (exitFailure) {------------------------------------------------------------------------------- Top-level -------------------------------------------------------------------------------} -server :: Cmdline -> IO () -server _cmdline = - runServerWithHandlers def config [ - fromMethod $ - Binary.mkNonStreaming @ManyShortLived @Word $ - return . succ - ] +server :: Bool -> ServerConfig -> IO () +server v config = handle swallowInterruptOrKilled $ do + idRef <- newIORef "unknown" + s <- mkGrpcServer def (handlers v idRef) + forkServer def config s $ \runningServer -> do + p <- getServerPort runningServer + writeIORef idRef $ show p + say v $ "server running on port " ++ show p + waitServer runningServer + where + swallowInterruptOrKilled :: SomeException -> IO () + swallowInterruptOrKilled e + | Just UserInterrupt <- asyncExceptionFromException e + = say v "server received user interrupt, exiting gracefully" + | Just ThreadKilled <- asyncExceptionFromException e + = say v "server thread killed, exiting gracefully" + | otherwise + = do + putStrLn $ "got unexpected server exception: " ++ show e + exitFailure + +{------------------------------------------------------------------------------- + Handlers +-------------------------------------------------------------------------------} + +handlers :: Bool -> IORef String -> [SomeRpcHandler IO] +handlers v idRef = [ + someRpcHandler @(Trivial' "non-streaming") $ + mkRpcHandler $ clientDisconnectOkay . nonStreaming + , someRpcHandler @(Trivial' "server-streaming") $ + mkRpcHandler $ clientDisconnectOkay . serverStreaming + , someRpcHandler @(Trivial' "client-streaming") $ + mkRpcHandler $ clientDisconnectOkay . clientStreaming + , someRpcHandler @(Trivial' "bidi-streaming") $ + mkRpcHandler $ clientDisconnectOkay . bidiStreaming + ] where - config :: ServerConfig - config = ServerConfig { - serverInsecure = Just $ InsecureConfig Nothing defaultInsecurePort - , serverSecure = Nothing - } + -- Single message from client, single message from server + nonStreaming :: Call (Trivial' "non-streaming") -> IO () + nonStreaming call = do + say' "handling non-streaming call" + msg <- recvFinalInput call + sendFinalOutput call $ (msg, NoMetadata) + say' "sent final output for non-streaming call" + + -- Client sends message containing number N, then client streams N messages + -- to server + clientStreaming :: Call (Trivial' "client-streaming") -> IO () + clientStreaming call = do + say' "handling client streaming call" + inp <- read @Int . BS.Char8.unpack <$> recvNextInput call + say' $ "receiving " ++ show inp ++ " messages" + forM_ [1 .. inp-1] $ \_ -> void $ recvNextInput call + msg <- recvFinalInput call + sendFinalOutput call (msg, NoMetadata) + say' $ "sent final output for client streaming call" + + -- Client sends message containing number N, then server streams N messages + -- to client + serverStreaming :: Call (Trivial' "server-streaming") -> IO () + serverStreaming call = do + say' "handling server streaming call" + inp <- read @Int . BS.Char8.unpack <$> recvNextInput call + say' $ "sending " ++ show inp ++ " messages" + msg <- randomMsg + forM_ [1 .. inp-1] $ \_ -> sendNextOutput call msg + sendFinalOutput call (msg, NoMetadata) + say' $ "sent final output for server streaming call" + + -- Client sends message containing number N, then client and server send N*2 + -- total messages back and forth. + bidiStreaming :: Call (Trivial' "bidi-streaming") -> IO () + bidiStreaming call = do + say' "handling bidi streaming call" + inp <- read @Int . BS.Char8.unpack <$> recvNextInput call + say' $ "sending and receiving " ++ show inp ++ " messages" + msg <- randomMsg + forM_ [1 .. inp-1] $ \_ -> do + void $ recvNextInput call + sendNextOutput call msg + void $ recvFinalInput call + sendFinalOutput call (msg, NoMetadata) + say' $ "sent and received final messages for bidi streaming call" + + clientDisconnectOkay :: IO () -> IO () + clientDisconnectOkay = + handle $ \case + e | Just ClientDisconnected{} <- fromException e -> do + say' "client disconnected" + | otherwise -> + throwIO e + + say' :: String -> IO () + say' msg = do + sid <- readIORef idRef + say v $ "(server " ++ sid ++ ") " ++ msg diff --git a/test-stress/Test/Stress/Server/API.hs b/test-stress/Test/Stress/Server/API.hs deleted file mode 100644 index d53cdf63..00000000 --- a/test-stress/Test/Stress/Server/API.hs +++ /dev/null @@ -1,14 +0,0 @@ -{-# OPTIONS_GHC -Wno-orphans #-} - -module Test.Stress.Server.API ( - ManyShortLived - ) where - -import Network.GRPC.Common -import Network.GRPC.Common.Binary - -type ManyShortLived = RawRpc "stresstest" "manyshortlived" - -type instance RequestMetadata (RawRpc "stresstest" meth) = NoMetadata -type instance ResponseInitialMetadata (RawRpc "stresstest" meth) = NoMetadata -type instance ResponseTrailingMetadata (RawRpc "stresstest" meth) = NoMetadata diff --git a/util/Network/GRPC/Util/Session/Channel.hs b/util/Network/GRPC/Util/Session/Channel.hs index 10bd22c3..fb5afc42 100644 --- a/util/Network/GRPC/Util/Session/Channel.hs +++ b/util/Network/GRPC/Util/Session/Channel.hs @@ -259,6 +259,7 @@ send Channel{channelOutbound, channelSentFinal} msg = FlowStateRegular regular -> do StreamElem.whenDefinitelyFinal msg $ \_trailers -> writeTVar channelSentFinal $ Just callStack + putTMVar (flowMsg regular) msg FlowStateNoMessages _ -> -- For outgoing messages, the caller decides to use Trailers-Only,