From 968ae3ddd5b4780c288bd8e841244a00cd1584a2 Mon Sep 17 00:00:00 2001 From: Edsko de Vries Date: Wed, 24 Jul 2024 09:14:59 +0200 Subject: [PATCH] Move `startRPC` to `.Call` This separates all connection stuff from all RPC specific stuff. --- src/Network/GRPC/Client/Call.hs | 121 ++++++++++++++++++- src/Network/GRPC/Client/Connection.hs | 167 ++++++-------------------- 2 files changed, 157 insertions(+), 131 deletions(-) diff --git a/src/Network/GRPC/Client/Call.hs b/src/Network/GRPC/Client/Call.hs index 3040a798..10a128f3 100644 --- a/src/Network/GRPC/Client/Call.hs +++ b/src/Network/GRPC/Client/Call.hs @@ -36,21 +36,30 @@ import Control.Monad import Control.Monad.Catch import Control.Monad.IO.Class import Data.Bitraversable +import Data.ByteString.Char8 qualified as BS.Strict.C8 import Data.Default -import Data.Maybe (isJust) +import Data.Foldable (asum) +import Data.List (intersperse) +import Data.Maybe (fromMaybe, isJust) import Data.Proxy import Data.Text qualified as Text +import Data.Version import GHC.Stack -import Network.GRPC.Client.Connection (Connection, Call(..)) +import Network.GRPC.Client.Connection (Connection, ConnParams, Call(..)) import Network.GRPC.Client.Connection qualified as Connection import Network.GRPC.Client.Session import Network.GRPC.Common +import Network.GRPC.Common.Compression qualified as Compression import Network.GRPC.Common.StreamElem qualified as StreamElem import Network.GRPC.Spec +import Network.GRPC.Util.GHC +import Network.GRPC.Util.HTTP2.Stream (ServerDisconnected(..)) import Network.GRPC.Util.Session qualified as Session import Network.GRPC.Util.Thread qualified as Thread +import Paths_grapesy qualified as Grapesy + {------------------------------------------------------------------------------- Open a call -------------------------------------------------------------------------------} @@ -90,7 +99,7 @@ withRPC :: forall m rpc a. => Connection -> CallParams rpc -> Proxy rpc -> (Call rpc -> m a) -> m a withRPC conn callParams proxy k = fmap fst $ generalBracket - (liftIO $ Connection.startRPC conn proxy callParams) + (liftIO $ startRPC conn proxy callParams) closeRPC k where @@ -155,6 +164,112 @@ withRPC conn callParams proxy k = fmap fst $ , grpcErrorMetadata = [] } +-- | Open new channel to the server +-- +-- This is a non-blocking call; the connection will be set up in a +-- background thread; if this takes time, then the first call to +-- 'sendInput' or 'recvOutput' will block, but the call to 'startRPC' +-- itself will not block. This non-blocking nature makes this safe to use +-- in 'bracket' patterns. +startRPC :: forall rpc. + (SupportsClientRpc rpc, HasCallStack) + => Connection + -> Proxy rpc + -> CallParams rpc + -> IO (Call rpc) +startRPC conn _ callParams = do + (connClosed, connToServer) <- Connection.getConnectionToServer conn + cOut <- Connection.getOutboundCompression conn + metadata <- buildMetadataIO $ callRequestMetadata callParams + let flowStart :: Session.FlowStart (ClientOutbound rpc) + flowStart = Session.FlowStartRegular $ OutboundHeaders { + outHeaders = requestHeaders cOut metadata + , outCompression = fromMaybe noCompression cOut + } + + let serverClosedConnection :: + Either TrailersOnly' ProperTrailers' + -> SomeException + serverClosedConnection = + either toException toException + . grpcClassifyTermination + . either (fst . trailersOnlyToProperTrailers) id + + channel <- + Session.setupRequestChannel + callSession + connToServer + serverClosedConnection + flowStart + + -- Spawn a thread to monitor the connection, and close the new channel when + -- the connection is closed. To prevent a memory leak by hanging on to the + -- channel for the lifetime of the connection, the thread also terminates in + -- the (normal) case that the channel is closed before the connection is. + _ <- forkLabelled "grapesy:monitorConnection" $ do + status <- atomically $ do + (Left <$> Thread.waitForNormalOrAbnormalThreadTermination + (Session.channelOutbound channel)) + `orElse` + (Right <$> readTMVar connClosed) + case status of + Left _ -> return () -- Channel closed before the connection + Right mErr -> do + let exitReason :: ExitCase () + exitReason = + case mErr of + Nothing -> ExitCaseSuccess () + Just exitWithException -> + ExitCaseException . toException $ + ServerDisconnected exitWithException callStack + _mAlreadyClosed <- Session.close channel exitReason + return () + + return $ Call callSession channel + where + connParams :: ConnParams + connParams = Connection.connParams conn + + requestHeaders :: Maybe Compression -> [CustomMetadata] -> RequestHeaders + requestHeaders cOut metadata = RequestHeaders{ + requestTimeout = + asum [ + callTimeout callParams + , Connection.connDefaultTimeout connParams + ] + , requestMetadata = + customMetadataMapFromList metadata + , requestCompression = + compressionId <$> cOut + , requestAcceptCompression = Just $ + Compression.offer $ Connection.connCompression connParams + , requestContentType = + Connection.connContentType connParams + , requestMessageType = + Just MessageTypeDefault + , requestUserAgent = Just $ + mconcat [ + "grpc-haskell-grapesy/" + , mconcat . intersperse "." $ + map (BS.Strict.C8.pack . show) $ + versionBranch Grapesy.version + ] + , requestIncludeTE = + True + , requestTraceContext = + Nothing + , requestPreviousRpcAttempts = + Nothing + , requestUnrecognized = + () + } + + callSession :: ClientSession rpc + callSession = ClientSession { + clientCompression = Connection.connCompression connParams + , clientUpdateMeta = Connection.updateConnectionMeta conn + } + {------------------------------------------------------------------------------- Open (ongoing) call -------------------------------------------------------------------------------} diff --git a/src/Network/GRPC/Client/Connection.hs b/src/Network/GRPC/Client/Connection.hs index a3f28c81..b18a62f0 100644 --- a/src/Network/GRPC/Client/Connection.hs +++ b/src/Network/GRPC/Client/Connection.hs @@ -10,8 +10,6 @@ module Network.GRPC.Client.Connection ( -- * Definition Connection -- opaque , Call(..) - , connParams - , startRPC , withConnection -- * Configuration , Server(..) @@ -20,19 +18,18 @@ module Network.GRPC.Client.Connection ( , ConnParams(..) , ReconnectPolicy(..) , exponentialBackoff + -- * Using the connection + , connParams + , getConnectionToServer + , getOutboundCompression + , updateConnectionMeta ) where import Control.Concurrent import Control.Concurrent.STM import Control.Monad import Control.Monad.Catch -import Data.ByteString.Char8 qualified as BS.Strict.C8 import Data.Default -import Data.Foldable (asum) -import Data.List (intersperse) -import Data.Maybe (fromMaybe) -import Data.Proxy -import Data.Version import GHC.Stack import Network.HPACK qualified as HPACK import Network.HTTP2.Client qualified as HTTP2.Client @@ -46,18 +43,13 @@ import Network.GRPC.Client.Meta (Meta) import Network.GRPC.Client.Meta qualified as Meta import Network.GRPC.Client.Session import Network.GRPC.Common.Compression qualified as Compr -import Network.GRPC.Common.Compression qualified as Compression import Network.GRPC.Common.HTTP2Settings import Network.GRPC.Spec import Network.GRPC.Util.GHC -import Network.GRPC.Util.HTTP2.Stream (ServerDisconnected(..)) import Network.GRPC.Util.Session qualified as Session -import Network.GRPC.Util.Thread import Network.GRPC.Util.TLS (ServerValidation(..), SslKeyLog(..)) import Network.GRPC.Util.TLS qualified as Util.TLS -import Paths_grapesy qualified as Grapesy - {---------------------------------------------------2---------------------------- Connection API @@ -292,123 +284,42 @@ withConnection connParams server k = do k Connection {connParams, connMetaVar, connStateVar} `finally` putMVar connOutOfScope () --- | Open new channel to the server +{------------------------------------------------------------------------------- + Making use of the connection +-------------------------------------------------------------------------------} + +-- | Get connection to the server -- --- This is a non-blocking call; the connection will be set up in a --- background thread; if this takes time, then the first call to --- 'sendInput' or 'recvOutput' will block, but the call to 'startRPC' --- itself will not block. This non-blocking nature makes this safe to use --- in 'bracket' patterns. -startRPC :: forall rpc. - (SupportsClientRpc rpc, HasCallStack) +-- Returns two things: the connection to the server, as well as a @TMVar@ that +-- should be monitored to see if that connection is still live. +getConnectionToServer :: forall. + HasCallStack => Connection - -> Proxy rpc - -> CallParams rpc - -> IO (Call rpc) -startRPC Connection{connMetaVar, connParams, connStateVar} _ callParams = do - (connClosed, conn) <- - atomically $ do - connState <- readTVar connStateVar - case connState of - ConnectionNotReady -> retry - ConnectionReady connClosed conn -> return (connClosed, conn) - ConnectionAbandoned err -> throwSTM err - ConnectionOutOfScope -> error "impossible" - - cOut <- Meta.outboundCompression <$> currentMeta - metadata <- buildMetadataIO $ callRequestMetadata callParams - let flowStart :: Session.FlowStart (ClientOutbound rpc) - flowStart = Session.FlowStartRegular $ OutboundHeaders { - outHeaders = requestHeaders cOut metadata - , outCompression = fromMaybe noCompression cOut - } - - let serverClosedConnection :: - Either TrailersOnly' ProperTrailers' - -> SomeException - serverClosedConnection = - either toException toException - . grpcClassifyTermination - . either (fst . trailersOnlyToProperTrailers) id - - channel <- - Session.setupRequestChannel - callSession - conn - serverClosedConnection - flowStart - - -- Spawn a thread to monitor the connection, and close the new channel when - -- the connection is closed. To prevent a memory leak by hanging on to the - -- channel for the lifetime of the connection, the thread also terminates in - -- the (normal) case that the channel is closed before the connection is. - _ <- forkLabelled "grapesy:monitorConnection" $ do - status <- atomically $ do - (Left <$> waitForNormalOrAbnormalThreadTermination - (Session.channelOutbound channel)) - `orElse` - (Right <$> readTMVar connClosed) - case status of - Left _ -> return () -- Channel closed before the connection - Right mErr -> do - let exitReason :: ExitCase () - exitReason = - case mErr of - Nothing -> ExitCaseSuccess () - Just exitWithException -> - ExitCaseException . toException $ - ServerDisconnected exitWithException callStack - _mAlreadyClosed <- Session.close channel exitReason - return () - - return $ Call callSession channel - where - currentMeta :: IO Meta - currentMeta = readMVar connMetaVar - - updateMeta :: ResponseHeaders' -> IO () - updateMeta hdrs = - modifyMVar_ connMetaVar $ Meta.update (connCompression connParams) hdrs - - requestHeaders :: Maybe Compression -> [CustomMetadata] -> RequestHeaders - requestHeaders cOut metadata = RequestHeaders{ - requestTimeout = - asum [ - callTimeout callParams - , connDefaultTimeout connParams - ] - , requestMetadata = - customMetadataMapFromList metadata - , requestCompression = - compressionId <$> cOut - , requestAcceptCompression = Just $ - Compression.offer $ connCompression connParams - , requestContentType = - connContentType connParams - , requestMessageType = - Just MessageTypeDefault - , requestUserAgent = Just $ - mconcat [ - "grpc-haskell-grapesy/" - , mconcat . intersperse "." $ - map (BS.Strict.C8.pack . show) $ - versionBranch Grapesy.version - ] - , requestIncludeTE = - True - , requestTraceContext = - Nothing - , requestPreviousRpcAttempts = - Nothing - , requestUnrecognized = - () - } - - callSession :: ClientSession rpc - callSession = ClientSession { - clientCompression = connCompression connParams - , clientUpdateMeta = updateMeta - } + -> IO (TMVar (Maybe SomeException), Session.ConnectionToServer) +getConnectionToServer Connection{connStateVar} = atomically $ do + connState <- readTVar connStateVar + case connState of + ConnectionNotReady -> retry + ConnectionReady connClosed conn -> return (connClosed, conn) + ConnectionAbandoned err -> throwSTM err + ConnectionOutOfScope -> error "impossible" + +-- | Get outbound compression algorithm +-- +-- This is stateful, because it depends on whether or not compression negotation +-- has happened yet: before the remote peer has told us which compression +-- algorithms it can support, we must use no compression. +getOutboundCompression :: Connection -> IO (Maybe Compression) +getOutboundCompression Connection{connMetaVar} = + Meta.outboundCompression <$> readMVar connMetaVar + +-- | Update connection metadata +-- +-- Amongst other things, this updates the compression algorithm to be used +-- (see also 'getOutboundCompression'). +updateConnectionMeta :: Connection -> ResponseHeaders' -> IO () +updateConnectionMeta Connection{connMetaVar, connParams} hdrs = + modifyMVar_ connMetaVar $ Meta.update (connCompression connParams) hdrs {------------------------------------------------------------------------------- Internal auxiliary