From dd5f9cd9a699c3defb1249f301c481018046ee37 Mon Sep 17 00:00:00 2001 From: Paolo Capriotti Date: Wed, 4 Dec 2024 11:12:06 +0100 Subject: [PATCH] Implement temporary queues --- libs/wire-api/src/Wire/API/Notification.hs | 22 ++++++++++-- .../src/Wire/API/Routes/Public/Cannon.hs | 3 +- services/cannon/cannon.cabal | 1 + services/cannon/src/Cannon/API/Public.hs | 6 ++-- services/cannon/src/Cannon/RabbitMq.hs | 13 +++++-- .../cannon/src/Cannon/RabbitMqConsumerApp.hs | 36 ++++++++++++++----- services/cannon/src/Cannon/WS.hs | 3 +- services/gundeck/src/Gundeck/Client.hs | 2 +- services/gundeck/src/Gundeck/Push.hs | 4 ++- 9 files changed, 69 insertions(+), 21 deletions(-) diff --git a/libs/wire-api/src/Wire/API/Notification.hs b/libs/wire-api/src/Wire/API/Notification.hs index cafb68bfacf..4077d25416a 100644 --- a/libs/wire-api/src/Wire/API/Notification.hs +++ b/libs/wire-api/src/Wire/API/Notification.hs @@ -38,8 +38,10 @@ module Wire.API.Notification userNotificationExchangeName, userNotificationDlxName, userNotificationDlqName, + RabbitMqClientId (..), clientNotificationQueueName, userRoutingKey, + temporaryRoutingKey, clientRoutingKey, ) where @@ -49,6 +51,7 @@ import Control.Lens.Operators ((?~)) import Data.Aeson (FromJSON (..), ToJSON (..)) import Data.Aeson.Types qualified as Aeson import Data.Bits +import Data.ByteString.Conversion import Data.HashMap.Strict.InsOrd qualified as InsOrdHashMap import Data.Id import Data.Json.Util @@ -188,12 +191,27 @@ userNotificationDlxName = "dead-user-notifications" userNotificationDlqName :: Text userNotificationDlqName = "dead-user-notifications" -clientNotificationQueueName :: UserId -> ClientId -> Text +data RabbitMqClientId + = RabbitMqClientId ClientId + | RabbitMqTempId Text + +instance ToByteString RabbitMqClientId where + builder (RabbitMqClientId cid) = builder cid + builder (RabbitMqTempId temp) = builder temp + +clientNotificationQueueName :: UserId -> RabbitMqClientId -> Text clientNotificationQueueName uid cid = - "user-notifications." <> clientRoutingKey uid cid + "user-notifications." <> userRoutingKey uid <> "." <> rabbitMqClientToText cid userRoutingKey :: UserId -> Text userRoutingKey = idToText clientRoutingKey :: UserId -> ClientId -> Text clientRoutingKey uid cid = userRoutingKey uid <> "." <> clientToText cid + +temporaryRoutingKey :: UserId -> Text +temporaryRoutingKey uid = userRoutingKey uid <> ".temporary" + +rabbitMqClientToText :: RabbitMqClientId -> Text +rabbitMqClientToText (RabbitMqClientId cid) = clientToText cid +rabbitMqClientToText (RabbitMqTempId temp) = "temp-" <> temp diff --git a/libs/wire-api/src/Wire/API/Routes/Public/Cannon.hs b/libs/wire-api/src/Wire/API/Routes/Public/Cannon.hs index 03742511080..1e3a18c962d 100644 --- a/libs/wire-api/src/Wire/API/Routes/Public/Cannon.hs +++ b/libs/wire-api/src/Wire/API/Routes/Public/Cannon.hs @@ -50,8 +50,7 @@ type CannonAPI = :> "events" :> ZUser :> QueryParam' - [ -- Make this optional in https://wearezeta.atlassian.net/browse/WPB-11173 - Required, + [ Optional, Strict, Description "Client ID" ] diff --git a/services/cannon/cannon.cabal b/services/cannon/cannon.cabal index 25ad0624593..f3f5278e4bd 100644 --- a/services/cannon/cannon.cabal +++ b/services/cannon/cannon.cabal @@ -105,6 +105,7 @@ library , lens >=4.4 , lens-family-core >=1.1 , metrics-wai >=0.4 + , MonadRandom , mwc-random >=0.13 , prometheus-client , retry >=0.7 diff --git a/services/cannon/src/Cannon/API/Public.hs b/services/cannon/src/Cannon/API/Public.hs index e895429ae8b..db3b4b2d8fd 100644 --- a/services/cannon/src/Cannon/API/Public.hs +++ b/services/cannon/src/Cannon/API/Public.hs @@ -42,7 +42,7 @@ streamData userId connId clientId con = do e <- wsenv liftIO $ wsapp (mkKey userId connId) clientId e con -consumeEvents :: UserId -> ClientId -> PendingConnection -> Cannon () -consumeEvents userId clientId con = do +consumeEvents :: UserId -> Maybe ClientId -> PendingConnection -> Cannon () +consumeEvents userId mClientId con = do e <- wsenv - liftIO $ rabbitMQWebSocketApp userId clientId e con + liftIO $ rabbitMQWebSocketApp userId mClientId e con diff --git a/services/cannon/src/Cannon/RabbitMq.hs b/services/cannon/src/Cannon/RabbitMq.hs index d5a228bd410..33c506b9dd0 100644 --- a/services/cannon/src/Cannon/RabbitMq.hs +++ b/services/cannon/src/Cannon/RabbitMq.hs @@ -218,8 +218,13 @@ ackMessage chan deliveryTag multiple = do inner <- readMVar chan.inner Q.ackMsg inner deliveryTag multiple -createChannel :: (Ord key) => RabbitMqPool key -> Text -> key -> Codensity IO RabbitMqChannel -createChannel pool queue key = do +type QueueName = Text + +-- TODO: can the action simply be run after creating the consumer? +type CreateQueue = Q.Channel -> Codensity IO () + +createChannel :: (Ord key) => RabbitMqPool key -> QueueName -> CreateQueue -> key -> Codensity IO RabbitMqChannel +createChannel pool queueName createQueue key = do closedVar <- lift newEmptyMVar inner <- lift newEmptyMVar msgVar <- lift newEmptyMVar @@ -254,9 +259,11 @@ createChannel pool queue key = do if connSize > pool.opts.maxChannels then pure True else do + createQueue chan + liftIO $ Q.addChannelExceptionHandler chan handleException putMVar inner chan - void $ liftIO $ Q.consumeMsgs chan queue Q.Ack $ \(message, envelope) -> do + void $ liftIO $ Q.consumeMsgs chan queueName Q.Ack $ \(message, envelope) -> do putMVar msgVar (Just (message, envelope)) takeMVar closedVar diff --git a/services/cannon/src/Cannon/RabbitMqConsumerApp.hs b/services/cannon/src/Cannon/RabbitMqConsumerApp.hs index ede22279071..9d4bb7604c4 100644 --- a/services/cannon/src/Cannon/RabbitMqConsumerApp.hs +++ b/services/cannon/src/Cannon/RabbitMqConsumerApp.hs @@ -7,11 +7,13 @@ import Cannon.RabbitMq import Cannon.WS hiding (env) import Cassandra as C hiding (batch) import Control.Concurrent.Async -import Control.Exception (Handler (..), bracket, catch, catches, throwIO, try) +import Control.Exception (Handler (..), bracket, catch, catches, finally, throwIO, try) import Control.Lens hiding ((#)) import Control.Monad.Codensity +import Control.Monad.Random.Class import Data.Aeson hiding (Key) import Data.Id +import Data.Text qualified as T import Imports hiding (min, threadDelay) import Network.AMQP qualified as Q import Network.WebSockets @@ -20,11 +22,11 @@ import System.Logger qualified as Log import Wire.API.Event.WebSocketProtocol import Wire.API.Notification -rabbitMQWebSocketApp :: UserId -> ClientId -> Env -> ServerApp -rabbitMQWebSocketApp uid cid e pendingConn = do +rabbitMQWebSocketApp :: UserId -> Maybe ClientId -> Env -> ServerApp +rabbitMQWebSocketApp uid mcid e pendingConn = do bracket openWebSocket closeWebSocket $ \wsConn -> ( do - sendFullSyncMessageIfNeeded wsConn uid cid e + traverse_ (sendFullSyncMessageIfNeeded wsConn uid e) mcid sendNotifications wsConn ) `catches` [ handleClientMisbehaving wsConn, @@ -34,7 +36,7 @@ rabbitMQWebSocketApp uid cid e pendingConn = do where logClient = Log.field "user" (idToText uid) - . Log.field "client" (clientToText cid) + . Log.field "client" (maybe "" clientToText mcid) openWebSocket = acceptRequest pendingConn @@ -108,8 +110,21 @@ rabbitMQWebSocketApp uid cid e pendingConn = do sendNotifications :: WS.Connection -> IO () sendNotifications wsConn = lowerCodensity $ do + cid <- lift $ mkRabbitMqClientId mcid let key = mkKeyRabbit uid cid - chan <- createChannel e.pool (clientNotificationQueueName uid cid) key + let queueName = clientNotificationQueueName uid cid + + let createQueue chan = case mcid of + Nothing -> Codensity $ \k -> + ( do + for_ [userRoutingKey uid, temporaryRoutingKey uid] $ + Q.bindQueue chan queueName userNotificationExchangeName + k () + ) + `finally` Q.deleteQueue chan queueName + Just _ -> pure () + + chan <- createChannel e.pool queueName createQueue key let consumeRabbitMq = forever $ do eventData <- getEventData chan @@ -171,10 +186,10 @@ rabbitMQWebSocketApp uid cid e pendingConn = do sendFullSyncMessageIfNeeded :: WS.Connection -> UserId -> - ClientId -> Env -> + ClientId -> IO () -sendFullSyncMessageIfNeeded wsConn uid cid env = do +sendFullSyncMessageIfNeeded wsConn uid env cid = do row <- C.runClient env.cassandra do retry x5 $ query1 q (params LocalQuorum (uid, cid)) for_ row $ \_ -> sendFullSyncMessage uid cid wsConn env @@ -220,3 +235,8 @@ data WebSocketServerError deriving (Show) instance Exception WebSocketServerError + +mkRabbitMqClientId :: Maybe ClientId -> IO RabbitMqClientId +mkRabbitMqClientId (Just cid) = pure (RabbitMqClientId cid) +mkRabbitMqClientId Nothing = + RabbitMqTempId . T.pack <$> replicateM 8 (getRandomR ('a', 'z')) diff --git a/services/cannon/src/Cannon/WS.hs b/services/cannon/src/Cannon/WS.hs index 1653c82fbd9..1249ce82b88 100644 --- a/services/cannon/src/Cannon/WS.hs +++ b/services/cannon/src/Cannon/WS.hs @@ -80,6 +80,7 @@ import System.Logger qualified as Logger import System.Logger.Class hiding (Error, Settings, close, (.=)) import System.Random.MWC (GenIO, uniform) import UnliftIO.Async (async, cancel, pooledMapConcurrentlyN_) +import Wire.API.Notification import Wire.API.Presence ----------------------------------------------------------------------------- @@ -93,7 +94,7 @@ newtype Key = Key mkKey :: UserId -> ConnId -> Key mkKey u c = Key (toByteString' u, fromConnId c) -mkKeyRabbit :: UserId -> ClientId -> Key +mkKeyRabbit :: UserId -> RabbitMqClientId -> Key mkKeyRabbit u c = Key (toByteString' u, toByteString' c) instance ToByteString Key where diff --git a/services/gundeck/src/Gundeck/Client.hs b/services/gundeck/src/Gundeck/Client.hs index 486ab4b63c8..3adc397ca35 100644 --- a/services/gundeck/src/Gundeck/Client.hs +++ b/services/gundeck/src/Gundeck/Client.hs @@ -50,7 +50,7 @@ setupConsumableNotifications :: ClientId -> IO Text setupConsumableNotifications chan uid cid = do - let qName = clientNotificationQueueName uid cid + let qName = clientNotificationQueueName uid (RabbitMqClientId cid) void $ declareQueue chan diff --git a/services/gundeck/src/Gundeck/Push.hs b/services/gundeck/src/Gundeck/Push.hs index 994a8987eff..7c236d3ec0a 100644 --- a/services/gundeck/src/Gundeck/Push.hs +++ b/services/gundeck/src/Gundeck/Push.hs @@ -313,7 +313,9 @@ pushViaRabbitMq p = do RecipientClientsAll -> Set.singleton $ userRoutingKey r._recipientId RecipientClientsSome (toList -> cs) -> - Set.fromList $ map (clientRoutingKey r._recipientId) cs + Set.fromList $ + temporaryRoutingKey r._recipientId + : map (clientRoutingKey r._recipientId) cs for_ routingKeys $ \routingKey -> mpaPublishToRabbitMq routingKey qMsg