Skip to content

Commit

Permalink
Implement temporary queues
Browse files Browse the repository at this point in the history
  • Loading branch information
pcapriotti authored and supersven committed Dec 11, 2024
1 parent 67dab2f commit dd5f9cd
Show file tree
Hide file tree
Showing 9 changed files with 69 additions and 21 deletions.
22 changes: 20 additions & 2 deletions libs/wire-api/src/Wire/API/Notification.hs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@ module Wire.API.Notification
userNotificationExchangeName,
userNotificationDlxName,
userNotificationDlqName,
RabbitMqClientId (..),
clientNotificationQueueName,
userRoutingKey,
temporaryRoutingKey,
clientRoutingKey,
)
where
Expand All @@ -49,6 +51,7 @@ import Control.Lens.Operators ((?~))
import Data.Aeson (FromJSON (..), ToJSON (..))
import Data.Aeson.Types qualified as Aeson
import Data.Bits
import Data.ByteString.Conversion
import Data.HashMap.Strict.InsOrd qualified as InsOrdHashMap
import Data.Id
import Data.Json.Util
Expand Down Expand Up @@ -188,12 +191,27 @@ userNotificationDlxName = "dead-user-notifications"
userNotificationDlqName :: Text
userNotificationDlqName = "dead-user-notifications"

clientNotificationQueueName :: UserId -> ClientId -> Text
data RabbitMqClientId
= RabbitMqClientId ClientId
| RabbitMqTempId Text

instance ToByteString RabbitMqClientId where
builder (RabbitMqClientId cid) = builder cid
builder (RabbitMqTempId temp) = builder temp

clientNotificationQueueName :: UserId -> RabbitMqClientId -> Text
clientNotificationQueueName uid cid =
"user-notifications." <> clientRoutingKey uid cid
"user-notifications." <> userRoutingKey uid <> "." <> rabbitMqClientToText cid

userRoutingKey :: UserId -> Text
userRoutingKey = idToText

clientRoutingKey :: UserId -> ClientId -> Text
clientRoutingKey uid cid = userRoutingKey uid <> "." <> clientToText cid

temporaryRoutingKey :: UserId -> Text
temporaryRoutingKey uid = userRoutingKey uid <> ".temporary"

rabbitMqClientToText :: RabbitMqClientId -> Text
rabbitMqClientToText (RabbitMqClientId cid) = clientToText cid
rabbitMqClientToText (RabbitMqTempId temp) = "temp-" <> temp
3 changes: 1 addition & 2 deletions libs/wire-api/src/Wire/API/Routes/Public/Cannon.hs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ type CannonAPI =
:> "events"
:> ZUser
:> QueryParam'
[ -- Make this optional in https://wearezeta.atlassian.net/browse/WPB-11173
Required,
[ Optional,
Strict,
Description "Client ID"
]
Expand Down
1 change: 1 addition & 0 deletions services/cannon/cannon.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ library
, lens >=4.4
, lens-family-core >=1.1
, metrics-wai >=0.4
, MonadRandom
, mwc-random >=0.13
, prometheus-client
, retry >=0.7
Expand Down
6 changes: 3 additions & 3 deletions services/cannon/src/Cannon/API/Public.hs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ streamData userId connId clientId con = do
e <- wsenv
liftIO $ wsapp (mkKey userId connId) clientId e con

consumeEvents :: UserId -> ClientId -> PendingConnection -> Cannon ()
consumeEvents userId clientId con = do
consumeEvents :: UserId -> Maybe ClientId -> PendingConnection -> Cannon ()
consumeEvents userId mClientId con = do
e <- wsenv
liftIO $ rabbitMQWebSocketApp userId clientId e con
liftIO $ rabbitMQWebSocketApp userId mClientId e con
13 changes: 10 additions & 3 deletions services/cannon/src/Cannon/RabbitMq.hs
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,13 @@ ackMessage chan deliveryTag multiple = do
inner <- readMVar chan.inner
Q.ackMsg inner deliveryTag multiple

createChannel :: (Ord key) => RabbitMqPool key -> Text -> key -> Codensity IO RabbitMqChannel
createChannel pool queue key = do
type QueueName = Text

-- TODO: can the action simply be run after creating the consumer?
type CreateQueue = Q.Channel -> Codensity IO ()

createChannel :: (Ord key) => RabbitMqPool key -> QueueName -> CreateQueue -> key -> Codensity IO RabbitMqChannel
createChannel pool queueName createQueue key = do
closedVar <- lift newEmptyMVar
inner <- lift newEmptyMVar
msgVar <- lift newEmptyMVar
Expand Down Expand Up @@ -254,9 +259,11 @@ createChannel pool queue key = do
if connSize > pool.opts.maxChannels
then pure True
else do
createQueue chan

liftIO $ Q.addChannelExceptionHandler chan handleException
putMVar inner chan
void $ liftIO $ Q.consumeMsgs chan queue Q.Ack $ \(message, envelope) -> do
void $ liftIO $ Q.consumeMsgs chan queueName Q.Ack $ \(message, envelope) -> do
putMVar msgVar (Just (message, envelope))
takeMVar closedVar

Expand Down
36 changes: 28 additions & 8 deletions services/cannon/src/Cannon/RabbitMqConsumerApp.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ import Cannon.RabbitMq
import Cannon.WS hiding (env)
import Cassandra as C hiding (batch)
import Control.Concurrent.Async
import Control.Exception (Handler (..), bracket, catch, catches, throwIO, try)
import Control.Exception (Handler (..), bracket, catch, catches, finally, throwIO, try)
import Control.Lens hiding ((#))
import Control.Monad.Codensity
import Control.Monad.Random.Class
import Data.Aeson hiding (Key)
import Data.Id
import Data.Text qualified as T
import Imports hiding (min, threadDelay)
import Network.AMQP qualified as Q
import Network.WebSockets
Expand All @@ -20,11 +22,11 @@ import System.Logger qualified as Log
import Wire.API.Event.WebSocketProtocol
import Wire.API.Notification

rabbitMQWebSocketApp :: UserId -> ClientId -> Env -> ServerApp
rabbitMQWebSocketApp uid cid e pendingConn = do
rabbitMQWebSocketApp :: UserId -> Maybe ClientId -> Env -> ServerApp
rabbitMQWebSocketApp uid mcid e pendingConn = do
bracket openWebSocket closeWebSocket $ \wsConn ->
( do
sendFullSyncMessageIfNeeded wsConn uid cid e
traverse_ (sendFullSyncMessageIfNeeded wsConn uid e) mcid
sendNotifications wsConn
)
`catches` [ handleClientMisbehaving wsConn,
Expand All @@ -34,7 +36,7 @@ rabbitMQWebSocketApp uid cid e pendingConn = do
where
logClient =
Log.field "user" (idToText uid)
. Log.field "client" (clientToText cid)
. Log.field "client" (maybe "<temporary>" clientToText mcid)

openWebSocket =
acceptRequest pendingConn
Expand Down Expand Up @@ -108,8 +110,21 @@ rabbitMQWebSocketApp uid cid e pendingConn = do

sendNotifications :: WS.Connection -> IO ()
sendNotifications wsConn = lowerCodensity $ do
cid <- lift $ mkRabbitMqClientId mcid
let key = mkKeyRabbit uid cid
chan <- createChannel e.pool (clientNotificationQueueName uid cid) key
let queueName = clientNotificationQueueName uid cid

let createQueue chan = case mcid of
Nothing -> Codensity $ \k ->
( do
for_ [userRoutingKey uid, temporaryRoutingKey uid] $
Q.bindQueue chan queueName userNotificationExchangeName
k ()
)
`finally` Q.deleteQueue chan queueName
Just _ -> pure ()

chan <- createChannel e.pool queueName createQueue key

let consumeRabbitMq = forever $ do
eventData <- getEventData chan
Expand Down Expand Up @@ -171,10 +186,10 @@ rabbitMQWebSocketApp uid cid e pendingConn = do
sendFullSyncMessageIfNeeded ::
WS.Connection ->
UserId ->
ClientId ->
Env ->
ClientId ->
IO ()
sendFullSyncMessageIfNeeded wsConn uid cid env = do
sendFullSyncMessageIfNeeded wsConn uid env cid = do
row <- C.runClient env.cassandra do
retry x5 $ query1 q (params LocalQuorum (uid, cid))
for_ row $ \_ -> sendFullSyncMessage uid cid wsConn env
Expand Down Expand Up @@ -220,3 +235,8 @@ data WebSocketServerError
deriving (Show)

instance Exception WebSocketServerError

mkRabbitMqClientId :: Maybe ClientId -> IO RabbitMqClientId
mkRabbitMqClientId (Just cid) = pure (RabbitMqClientId cid)
mkRabbitMqClientId Nothing =
RabbitMqTempId . T.pack <$> replicateM 8 (getRandomR ('a', 'z'))
3 changes: 2 additions & 1 deletion services/cannon/src/Cannon/WS.hs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ import System.Logger qualified as Logger
import System.Logger.Class hiding (Error, Settings, close, (.=))
import System.Random.MWC (GenIO, uniform)
import UnliftIO.Async (async, cancel, pooledMapConcurrentlyN_)
import Wire.API.Notification
import Wire.API.Presence

-----------------------------------------------------------------------------
Expand All @@ -93,7 +94,7 @@ newtype Key = Key
mkKey :: UserId -> ConnId -> Key
mkKey u c = Key (toByteString' u, fromConnId c)

mkKeyRabbit :: UserId -> ClientId -> Key
mkKeyRabbit :: UserId -> RabbitMqClientId -> Key
mkKeyRabbit u c = Key (toByteString' u, toByteString' c)

instance ToByteString Key where
Expand Down
2 changes: 1 addition & 1 deletion services/gundeck/src/Gundeck/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ setupConsumableNotifications ::
ClientId ->
IO Text
setupConsumableNotifications chan uid cid = do
let qName = clientNotificationQueueName uid cid
let qName = clientNotificationQueueName uid (RabbitMqClientId cid)
void $
declareQueue
chan
Expand Down
4 changes: 3 additions & 1 deletion services/gundeck/src/Gundeck/Push.hs
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,9 @@ pushViaRabbitMq p = do
RecipientClientsAll ->
Set.singleton $ userRoutingKey r._recipientId
RecipientClientsSome (toList -> cs) ->
Set.fromList $ map (clientRoutingKey r._recipientId) cs
Set.fromList $
temporaryRoutingKey r._recipientId
: map (clientRoutingKey r._recipientId) cs
for_ routingKeys $ \routingKey ->
mpaPublishToRabbitMq routingKey qMsg

Expand Down

0 comments on commit dd5f9cd

Please sign in to comment.