diff --git a/integration/default.nix b/integration/default.nix index 76522a8d9c7..32715de71bc 100644 --- a/integration/default.nix +++ b/integration/default.nix @@ -64,6 +64,8 @@ , retry , saml2-web-sso , scientific +, servant +, servant-client , split , stm , streaming-commons @@ -161,6 +163,8 @@ mkDerivation { retry saml2-web-sso scientific + servant + servant-client split stm streaming-commons diff --git a/integration/integration.cabal b/integration/integration.cabal index b264da1c346..8c4e3eb456c 100644 --- a/integration/integration.cabal +++ b/integration/integration.cabal @@ -259,6 +259,8 @@ library , retry , saml2-web-sso , scientific + , servant + , servant-client , split , stm , streaming-commons diff --git a/integration/test/SetupHelpers.hs b/integration/test/SetupHelpers.hs index 15c2c128219..e9afe467a84 100644 --- a/integration/test/SetupHelpers.hs +++ b/integration/test/SetupHelpers.hs @@ -36,6 +36,7 @@ import SAML2.WebSSO.Test.Util (SampleIdP (..), makeSampleIdPMetadata) import Testlib.JSON import Testlib.MockIntegrationService (mkLegalHoldSettings) import Testlib.Prelude +import Testlib.Printing (indent) import qualified Text.XML as XML import qualified Text.XML.Cursor as XML import qualified Text.XML.DSig as SAML @@ -426,6 +427,12 @@ addUsersToFailureContext namesAndUsers action = do allLines <- unlines <$> (mapM mkLine namesAndUsers) addFailureContext allLines action +addJSONToFailureContext :: (MakesValue a) => String -> a -> App b -> App b +addJSONToFailureContext name ctx action = do + jsonStr <- prettyJSON ctx + let ctxStr = unlines [name <> ":", indent 2 jsonStr] + addFailureContext ctxStr action + registerTestIdPWithMeta :: (HasCallStack, MakesValue owner) => owner -> App Response registerTestIdPWithMeta owner = fst <$> registerTestIdPWithMetaWithPrivateCreds owner diff --git a/integration/test/Test/Events.hs b/integration/test/Test/Events.hs index 6edfc70d0db..4f7e9cd0620 100644 --- a/integration/test/Test/Events.hs +++ b/integration/test/Test/Events.hs @@ -11,6 +11,7 @@ import Control.Monad.RWS (asks) import Control.Monad.Trans.Class import Control.Retry import Data.ByteString.Conversion (toByteString') +import Data.Proxy (Proxy (..)) import qualified Data.Text as Text import Data.Timeout import MLS.Util @@ -18,9 +19,11 @@ import Network.AMQP.Extended import Network.RabbitMqAdmin import qualified Network.WebSockets as WS import Notifications +import Servant.API (AsApi, ToServant, toServant) +import Servant.API.Generic (fromServant) +import qualified Servant.Client as Servant import SetupHelpers import Testlib.Prelude hiding (assertNoEvent) -import Testlib.ResourcePool (acquireResources) import UnliftIO hiding (handle) testConsumeEventsOneWebSocket :: (HasCallStack) => App () @@ -103,6 +106,57 @@ testConsumeTempEvents = do ackEvent ws e +testConsumeTempEventsWithoutOwnClient :: (HasCallStack) => App () +testConsumeTempEventsWithoutOwnClient = do + [alice, bob] <- createAndConnectUsers [OwnDomain, OwnDomain] + + runCodensity (createEventsWebSocket alice Nothing) $ \ws -> do + handle <- randomHandle + putHandle bob handle >>= assertSuccess + + -- We cannot use 'assertEvent' here because there is a race between the temp + -- queue being created and rabbitmq fanning out the previous events. + void $ assertFindsEvent ws $ \e -> do + e %. "type" `shouldMatch` "event" + e %. "data.event.payload.0.type" `shouldMatch` "user.update" + e %. "data.event.payload.0.user.id" `shouldMatch` objId bob + e %. "data.event.payload.0.user.handle" `shouldMatch` handle + + ackEvent ws e + +testTemporaryQueuesAreDeletedAfterUse :: (HasCallStack) => App () +testTemporaryQueuesAreDeletedAfterUse = do + startDynamicBackendsReturnResources [def] $ \[beResource] -> do + let domain = beResource.berDomain + rabbitmqAdmin <- mkRabbitMqAdminClientForResource beResource + queuesBeforeWS <- rabbitmqAdmin.listQueuesByVHost (fromString beResource.berVHost) (fromString "") True 100 1 + let deadNotifsQueue = Queue {name = fromString "dead-user-notifications", vhost = fromString beResource.berVHost} + queuesBeforeWS.items `shouldMatch` [deadNotifsQueue] + + [alice, bob] <- createAndConnectUsers [domain, domain] + + runCodensity (createEventsWebSocket alice Nothing) $ \ws -> do + handle <- randomHandle + putHandle bob handle >>= assertSuccess + + queuesDuringWS <- rabbitmqAdmin.listQueuesByVHost (fromString beResource.berVHost) (fromString "") True 100 1 + addJSONToFailureContext "queuesDuringWS" queuesDuringWS $ do + length queuesDuringWS.items `shouldMatchInt` 2 + + -- We cannot use 'assertEvent' here because there is a race between the temp + -- queue being created and rabbitmq fanning out the previous events. + void $ assertFindsEvent ws $ \e -> do + e %. "type" `shouldMatch` "event" + e %. "data.event.payload.0.type" `shouldMatch` "user.update" + e %. "data.event.payload.0.user.id" `shouldMatch` objId bob + e %. "data.event.payload.0.user.handle" `shouldMatch` handle + + ackEvent ws e + + -- Use let binding here so 'shouldMatchEventually' retries the whole request + let queuesAfterWSM = rabbitmqAdmin.listQueuesByVHost (fromString beResource.berVHost) (fromString "") True 100 1 + fmap (.items) queuesAfterWSM `shouldEventuallyMatch` ([deadNotifsQueue]) + testMLSTempEvents :: (HasCallStack) => App () testMLSTempEvents = do [alice, bob] <- createAndConnectUsers [OwnDomain, OwnDomain] @@ -128,7 +182,9 @@ testMLSTempEvents = do -- FUTUREWORK: we should not rely on events arriving in this particular order - void $ assertEvent ws $ \e -> do + -- We cannot use 'assertEvent' here because there is a race between the temp + -- queue being created and rabbitmq fanning out the previous events. + void $ assertFindsEvent ws $ \e -> do e %. "type" `shouldMatch` "event" e %. "data.event.payload.0.type" `shouldMatch` "conversation.member-join" user <- assertOne =<< (e %. "data.event.payload.0.data.users" & asList) @@ -413,22 +469,17 @@ testChannelLimit = withModifiedBackend lift $ assertNoEvent_ ws testChannelKilled :: (HasCallStack) => App () -testChannelKilled = lowerCodensity $ do - pool <- lift $ asks (.resourcePool) - [backend] <- acquireResources 1 pool - - domain <- startDynamicBackend backend mempty - alice <- lift $ randomUser domain def +testChannelKilled = startDynamicBackendsReturnResources [def] $ \[backend] -> do + let domain = backend.berDomain + alice <- randomUser domain def [c1, c2] <- - lift - $ replicateM 2 + replicateM 2 $ addClient alice def {acapabilities = Just ["consumable-notifications"]} >>= getJSON 201 >>= (%. "id") >>= asString - ws <- createEventsWebSocket alice (Just c1) - lift $ do + runCodensity (createEventsWebSocket alice (Just c1)) $ \ws -> do assertEvent ws $ \e -> do e %. "data.event.payload.0.type" `shouldMatch` "user.client-add" e %. "data.event.payload.0.client.id" `shouldMatch` c1 @@ -536,14 +587,35 @@ sendAck ws deliveryTag multiple = assertEvent :: (HasCallStack) => EventWebSocket -> ((HasCallStack) => Value -> App a) -> App a assertEvent ws expectations = do - timeout 10_000_000 (readChan ws.events) >>= \case - Nothing -> assertFailure "No event received for 1s" + timeOutSeconds <- asks (.timeOutSeconds) + timeout (timeOutSeconds * 1_000_000) (readChan ws.events) >>= \case + Nothing -> assertFailure $ "No event received for " <> show timeOutSeconds <> "s" Just (Left _) -> assertFailure "Websocket closed when waiting for more events" Just (Right e) -> do pretty <- prettyJSON e addFailureContext ("event:\n" <> pretty) $ expectations e +-- | Tolerates and consumes other events before expected event +assertFindsEvent :: forall a. (HasCallStack) => EventWebSocket -> ((HasCallStack) => Value -> App a) -> App a +assertFindsEvent ws expectations = go 0 + where + go :: Int -> App a + go ignoredEventCount = do + timeOutSeconds <- asks (.timeOutSeconds) + timeout (timeOutSeconds * 1_000_000) (readChan ws.events) >>= \case + Nothing -> assertFailure $ show ignoredEventCount <> " event(s) received, no matching event received for " <> show timeOutSeconds <> "s" + Just (Left _) -> assertFailure "Websocket closed when waiting for more events" + Just (Right ev) -> do + (expectations ev) + `catch` \(_ :: AssertionFailure) -> do + ignoredEventType <- + maybe (pure "No Type") asString + =<< lookupField ev "data.event.payload.0.type" + ackEvent ws ev + addJSONToFailureContext ("Ignored Event (" <> ignoredEventType <> ")") ev + $ go (ignoredEventCount + 1) + data NoEvent = NoEvent | WebSocketDied instance ToJSON NoEvent where @@ -576,6 +648,16 @@ consumeAllEvents ws = do killConnection :: (HasCallStack) => BackendResource -> App () killConnection backend = do + rabbitmqAdminClient <- mkRabbitMqAdminClientForResource backend + connections <- rabbitmqAdminClient.listConnectionsByVHost (Text.pack backend.berVHost) + connection <- + assertOne + [ c | c <- connections, c.userProvidedName == Just (Text.pack "pool 0") + ] + void $ rabbitmqAdminClient.deleteConnection connection.name + +mkRabbitMqAdminClientForResource :: BackendResource -> App (AdminAPI (Servant.AsClientT App)) +mkRabbitMqAdminClientForResource backend = do rc <- asks (.rabbitMQConfig) let opts = RabbitMqAdminOpts @@ -589,12 +671,4 @@ killConnection backend = do else Nothing } servantClient <- liftIO $ mkRabbitMqAdminClientEnv opts - name <- do - connections <- liftIO $ listConnectionsByVHost servantClient opts.vHost - connection <- - assertOne - [ c | c <- connections, c.userProvidedName == Just (Text.pack "pool 0") - ] - pure connection.name - - void $ liftIO $ deleteConnection servantClient name + pure . fromServant $ Servant.hoistClient (Proxy @(ToServant AdminAPI AsApi)) (liftIO @App) (toServant servantClient) diff --git a/integration/test/Testlib/Assertions.hs b/integration/test/Testlib/Assertions.hs index 8ab015156f9..569eb5d4ca2 100644 --- a/integration/test/Testlib/Assertions.hs +++ b/integration/test/Testlib/Assertions.hs @@ -7,7 +7,9 @@ import Control.Exception as E import Control.Lens ((^?)) import qualified Control.Lens.Plated as LP import Control.Monad +import qualified Control.Monad.Catch as Catch import Control.Monad.Reader +import Control.Retry import Data.Aeson (Value) import qualified Data.Aeson as Aeson import qualified Data.Aeson.Diff as AD @@ -62,6 +64,15 @@ shouldMatch :: App () shouldMatch = shouldMatchWithMsg Nothing +-- | Retries every 100ms until timeOutSeconds from Env is reached +shouldEventuallyMatch :: (MakesValue a, MakesValue b, HasCallStack) => a -> b -> App () +shouldEventuallyMatch a b = do + timeout <- asks (.timeOutSeconds) + recovering + (limitRetriesByCumulativeDelay (timeout * 1_000_000) $ constantDelay 100_000) + ((\_ -> Catch.Handler $ \(_ :: AssertionFailure) -> pure True) : skipAsyncExceptions) + (const $ a `shouldMatch` b) + shouldMatchWithMsg :: (MakesValue a, MakesValue b, HasCallStack) => -- | Message to be added to failure report diff --git a/integration/test/Testlib/ModService.hs b/integration/test/Testlib/ModService.hs index 31716b94e9c..40338685ec3 100644 --- a/integration/test/Testlib/ModService.hs +++ b/integration/test/Testlib/ModService.hs @@ -4,6 +4,7 @@ module Testlib.ModService ( withModifiedBackend, startDynamicBackend, startDynamicBackends, + startDynamicBackendsReturnResources, traverseConcurrentlyCodensity, ) where @@ -120,6 +121,10 @@ traverseConcurrentlyCodensity f args = do startDynamicBackends :: [ServiceOverrides] -> ([String] -> App a) -> App a startDynamicBackends beOverrides k = do + startDynamicBackendsReturnResources beOverrides (\resources -> k $ map (.berDomain) resources) + +startDynamicBackendsReturnResources :: [ServiceOverrides] -> ([BackendResource] -> App a) -> App a +startDynamicBackendsReturnResources beOverrides k = do let startDynamicBackendsCodensity = do when (Prelude.length beOverrides > 3) $ lift $ failApp "Too many backends. Currently only 3 are supported." pool <- asks (.resourcePool) @@ -128,7 +133,7 @@ startDynamicBackends beOverrides k = do traverseConcurrentlyCodensity (void . uncurry startDynamicBackend) (zip resources beOverrides) - pure $ map (.berDomain) resources + pure resources runCodensity startDynamicBackendsCodensity k startDynamicBackend :: (HasCallStack) => BackendResource -> ServiceOverrides -> Codensity App String diff --git a/libs/wire-api/src/Wire/API/Notification.hs b/libs/wire-api/src/Wire/API/Notification.hs index 4077d25416a..5183f977e25 100644 --- a/libs/wire-api/src/Wire/API/Notification.hs +++ b/libs/wire-api/src/Wire/API/Notification.hs @@ -38,7 +38,6 @@ module Wire.API.Notification userNotificationExchangeName, userNotificationDlxName, userNotificationDlqName, - RabbitMqClientId (..), clientNotificationQueueName, userRoutingKey, temporaryRoutingKey, @@ -51,7 +50,6 @@ import Control.Lens.Operators ((?~)) import Data.Aeson (FromJSON (..), ToJSON (..)) import Data.Aeson.Types qualified as Aeson import Data.Bits -import Data.ByteString.Conversion import Data.HashMap.Strict.InsOrd qualified as InsOrdHashMap import Data.Id import Data.Json.Util @@ -191,17 +189,9 @@ userNotificationDlxName = "dead-user-notifications" userNotificationDlqName :: Text userNotificationDlqName = "dead-user-notifications" -data RabbitMqClientId - = RabbitMqClientId ClientId - | RabbitMqTempId Text - -instance ToByteString RabbitMqClientId where - builder (RabbitMqClientId cid) = builder cid - builder (RabbitMqTempId temp) = builder temp - -clientNotificationQueueName :: UserId -> RabbitMqClientId -> Text +clientNotificationQueueName :: UserId -> ClientId -> Text clientNotificationQueueName uid cid = - "user-notifications." <> userRoutingKey uid <> "." <> rabbitMqClientToText cid + "user-notifications." <> userRoutingKey uid <> "." <> clientToText cid userRoutingKey :: UserId -> Text userRoutingKey = idToText @@ -211,7 +201,3 @@ clientRoutingKey uid cid = userRoutingKey uid <> "." <> clientToText cid temporaryRoutingKey :: UserId -> Text temporaryRoutingKey uid = userRoutingKey uid <> ".temporary" - -rabbitMqClientToText :: RabbitMqClientId -> Text -rabbitMqClientToText (RabbitMqClientId cid) = clientToText cid -rabbitMqClientToText (RabbitMqTempId temp) = "temp-" <> temp diff --git a/services/cannon/cannon.cabal b/services/cannon/cannon.cabal index f3f5278e4bd..25ad0624593 100644 --- a/services/cannon/cannon.cabal +++ b/services/cannon/cannon.cabal @@ -105,7 +105,6 @@ library , lens >=4.4 , lens-family-core >=1.1 , metrics-wai >=0.4 - , MonadRandom , mwc-random >=0.13 , prometheus-client , retry >=0.7 diff --git a/services/cannon/default.nix b/services/cannon/default.nix index bde88697e85..c62056faa23 100644 --- a/services/cannon/default.nix +++ b/services/cannon/default.nix @@ -31,7 +31,6 @@ , lens-family-core , lib , metrics-wai -, MonadRandom , mwc-random , prometheus-client , QuickCheck @@ -92,7 +91,6 @@ mkDerivation { lens lens-family-core metrics-wai - MonadRandom mwc-random prometheus-client retry diff --git a/services/cannon/src/Cannon/RabbitMq.hs b/services/cannon/src/Cannon/RabbitMq.hs index e6ba7d777cc..2bec0a61c66 100644 --- a/services/cannon/src/Cannon/RabbitMq.hs +++ b/services/cannon/src/Cannon/RabbitMq.hs @@ -27,6 +27,7 @@ import Data.List.Extra import Data.Map qualified as Map import Data.Text qualified as T import Data.Timeout +import Data.Unique import Imports hiding (threadDelay) import Network.AMQP qualified as Q import Network.AMQP.Extended @@ -41,16 +42,16 @@ data RabbitMqPoolException instance Exception RabbitMqPoolException -data PooledConnection key = PooledConnection +data PooledConnection = PooledConnection { connId :: Word64, inner :: Q.Connection, - channels :: !(Map key Q.Channel) + channels :: !(Map Unique Q.Channel) } -data RabbitMqPool key = RabbitMqPool +data RabbitMqPool = RabbitMqPool { opts :: RabbitMqPoolOptions, nextId :: TVar Word64, - connections :: TVar [PooledConnection key], + connections :: TVar [PooledConnection], -- | draining mode draining :: TVar Bool, logger :: Logger, @@ -64,7 +65,7 @@ data RabbitMqPoolOptions = RabbitMqPoolOptions retryEnabled :: Bool } -createRabbitMqPool :: (Ord key) => RabbitMqPoolOptions -> Logger -> Codensity IO (RabbitMqPool key) +createRabbitMqPool :: RabbitMqPoolOptions -> Logger -> Codensity IO RabbitMqPool createRabbitMqPool opts logger = Codensity $ bracket create destroy where create = do @@ -78,7 +79,7 @@ createRabbitMqPool opts logger = Codensity $ bracket create destroy pure pool destroy pool = putMVar pool.deadVar () -drainRabbitMqPool :: (ToByteString key) => RabbitMqPool key -> DrainOpts -> IO () +drainRabbitMqPool :: RabbitMqPool -> DrainOpts -> IO () drainRabbitMqPool pool opts = do atomically $ writeTVar pool.draining True @@ -118,11 +119,11 @@ drainRabbitMqPool pool opts = do (liftIO $ threadDelay ((opts ^. millisecondsBetweenBatches) # MilliSecond)) Log.info pool.logger $ Log.msg (Log.val "Draining complete") where - closeChannel :: (ToByteString key) => Log.Logger -> (key, Q.Channel) -> IO () + closeChannel :: Log.Logger -> (Unique, Q.Channel) -> IO () closeChannel l (key, chan) = do - Log.info l $ + Log.debug l $ Log.msg (Log.val "closing rabbitmq channel") - . Log.field "key" (toByteString' key) + . Log.field "key_hash" (toByteString' $ hashUnique key) Q.closeChannel chan logExpired :: Log.Logger -> Word64 -> IO () @@ -139,7 +140,7 @@ drainRabbitMqPool pool opts = do . Log.field "batchSize" batchSize . Log.field "maxNumberOfBatches" m -createConnection :: (Ord key) => RabbitMqPool key -> IO (PooledConnection key) +createConnection :: RabbitMqPool -> IO PooledConnection createConnection pool = mask_ $ do conn <- openConnection pool mpconn <- runMaybeT . atomically $ do @@ -176,7 +177,7 @@ createConnection pool = mask_ $ do putMVar closedVar () pure pconn -openConnection :: RabbitMqPool key -> IO Q.Connection +openConnection :: RabbitMqPool -> IO Q.Connection openConnection pool = do -- This might not be the correct connection ID that will eventually be -- assigned to this connection, since there are potential races with other @@ -229,13 +230,14 @@ ackMessage chan deliveryTag multiple = do type QueueName = Text -type CreateQueue = Q.Channel -> Codensity IO () +type CreateQueue = Q.Channel -> Codensity IO QueueName -createChannel :: (Ord key) => RabbitMqPool key -> QueueName -> CreateQueue -> key -> Codensity IO RabbitMqChannel -createChannel pool queueName createQueue key = do +createChannel :: RabbitMqPool -> CreateQueue -> Codensity IO RabbitMqChannel +createChannel pool createQueue = do closedVar <- lift newEmptyMVar inner <- lift newEmptyMVar msgVar <- lift newEmptyMVar + key <- lift newUnique let handleException e = do retry <- case (Q.isNormalChannelClose e, fromException e) of @@ -267,7 +269,7 @@ createChannel pool queueName createQueue key = do if connSize > pool.opts.maxChannels then pure True else do - createQueue chan + queueName <- createQueue chan liftIO $ Q.addChannelExceptionHandler chan handleException putMVar inner chan @@ -286,7 +288,7 @@ createChannel pool queueName createQueue key = do `finally` putMVar msgVar Nothing pure RabbitMqChannel {inner = inner, msgVar = msgVar} -acquireConnection :: (Ord key) => RabbitMqPool key -> IO (PooledConnection key) +acquireConnection :: RabbitMqPool -> IO PooledConnection acquireConnection pool = do findConnection pool >>= \case Nothing -> do @@ -301,7 +303,7 @@ acquireConnection pool = do pure conn Just conn -> pure conn -findConnection :: RabbitMqPool key -> IO (Maybe (PooledConnection key)) +findConnection :: RabbitMqPool -> IO (Maybe PooledConnection) findConnection pool = (either throwIO pure <=< (atomically . runExceptT . runMaybeT)) $ do conns <- lift . lift $ readTVar pool.connections guard (notNull conns) @@ -313,7 +315,7 @@ findConnection pool = (either throwIO pure <=< (atomically . runExceptT . runMay else mzero pure pconn -releaseConnection :: (Ord key) => RabbitMqPool key -> key -> PooledConnection key -> IO () +releaseConnection :: RabbitMqPool -> Unique -> PooledConnection -> IO () releaseConnection pool key conn = atomically $ do modifyTVar pool.connections $ map $ \c -> if c.connId == conn.connId diff --git a/services/cannon/src/Cannon/RabbitMqConsumerApp.hs b/services/cannon/src/Cannon/RabbitMqConsumerApp.hs index b1d1c1196bb..312e7c7937f 100644 --- a/services/cannon/src/Cannon/RabbitMqConsumerApp.hs +++ b/services/cannon/src/Cannon/RabbitMqConsumerApp.hs @@ -7,13 +7,11 @@ import Cannon.RabbitMq import Cannon.WS hiding (env) import Cassandra as C hiding (batch) import Control.Concurrent.Async -import Control.Exception (Handler (..), bracket, catch, catches, finally, throwIO, try) +import Control.Exception (Handler (..), bracket, catch, catches, throwIO, try) import Control.Lens hiding ((#)) import Control.Monad.Codensity -import Control.Monad.Random.Class import Data.Aeson hiding (Key) import Data.Id -import Data.Text qualified as T import Imports hiding (min, threadDelay) import Network.AMQP (newQueue) import Network.AMQP qualified as Q @@ -111,22 +109,20 @@ rabbitMQWebSocketApp uid mcid e pendingConn = do sendNotifications :: WS.Connection -> IO () sendNotifications wsConn = lowerCodensity $ do - cid <- lift $ mkRabbitMqClientId mcid - let key = mkKeyRabbit uid cid - let queueName = clientNotificationQueueName uid cid - let createQueue chan = case mcid of - Nothing -> Codensity $ \k -> - ( do - void $ Q.declareQueue chan newQueue {Q.queueName = queueName} - for_ [userRoutingKey uid, temporaryRoutingKey uid] $ - Q.bindQueue chan queueName userNotificationExchangeName - k () - ) - `finally` Q.deleteQueue chan queueName - Just _ -> pure () - - chan <- createChannel e.pool queueName createQueue key + Nothing -> Codensity $ \k -> do + (queueName, _, _) <- + Q.declareQueue chan $ + newQueue + { Q.queueExclusive = True, + Q.queueAutoDelete = True + } + for_ [userRoutingKey uid, temporaryRoutingKey uid] $ + Q.bindQueue chan queueName userNotificationExchangeName + k queueName + Just cid -> Codensity $ \k -> k $ clientNotificationQueueName uid cid + + chan <- createChannel e.pool createQueue let consumeRabbitMq = forever $ do eventData <- getEventData chan @@ -237,8 +233,3 @@ data WebSocketServerError deriving (Show) instance Exception WebSocketServerError - -mkRabbitMqClientId :: Maybe ClientId -> IO RabbitMqClientId -mkRabbitMqClientId (Just cid) = pure (RabbitMqClientId cid) -mkRabbitMqClientId Nothing = - RabbitMqTempId . T.pack <$> replicateM 8 (getRandomR ('a', 'z')) diff --git a/services/cannon/src/Cannon/Types.hs b/services/cannon/src/Cannon/Types.hs index 146bd5519bd..70700906fec 100644 --- a/services/cannon/src/Cannon/Types.hs +++ b/services/cannon/src/Cannon/Types.hs @@ -44,6 +44,7 @@ import Control.Monad.Catch import Control.Monad.Codensity import Data.Id import Data.Text.Encoding +import Data.Unique import Imports import Network.AMQP qualified as Q import Network.AMQP.Extended (AmqpEndpoint) @@ -63,7 +64,7 @@ data Env = Env { opts :: !Opts, applog :: !Logger, websockets :: !(Dict Key Websocket), - rabbitConnections :: (Dict Key Q.Connection), + rabbitConnections :: (Dict Unique Q.Connection), reqId :: !RequestId, env :: !WS.Env } @@ -103,7 +104,7 @@ mkEnv :: ClientState -> Logger -> Dict Key Websocket -> - Dict Key Q.Connection -> + Dict Unique Q.Connection -> Manager -> GenIO -> Clock -> diff --git a/services/cannon/src/Cannon/WS.hs b/services/cannon/src/Cannon/WS.hs index 1249ce82b88..d35002dcf95 100644 --- a/services/cannon/src/Cannon/WS.hs +++ b/services/cannon/src/Cannon/WS.hs @@ -40,7 +40,6 @@ module Cannon.WS connIdent, Key, mkKey, - mkKeyRabbit, key2bytes, client, sendMsg, @@ -70,6 +69,7 @@ import Data.Id (ClientId, ConnId (..), UserId, defRequestId) import Data.List.Extra (chunksOf) import Data.Text.Encoding (decodeUtf8) import Data.Timeout (TimeoutUnit (..), (#)) +import Data.Unique import Imports hiding (threadDelay) import Network.AMQP qualified as Q import Network.HTTP.Types.Method @@ -80,7 +80,6 @@ import System.Logger qualified as Logger import System.Logger.Class hiding (Error, Settings, close, (.=)) import System.Random.MWC (GenIO, uniform) import UnliftIO.Async (async, cancel, pooledMapConcurrentlyN_) -import Wire.API.Notification import Wire.API.Presence ----------------------------------------------------------------------------- @@ -94,9 +93,6 @@ newtype Key = Key mkKey :: UserId -> ConnId -> Key mkKey u c = Key (toByteString' u, fromConnId c) -mkKeyRabbit :: UserId -> RabbitMqClientId -> Key -mkKeyRabbit u c = Key (toByteString' u, toByteString' c) - instance ToByteString Key where builder = B.fromByteString . key2bytes @@ -155,12 +151,12 @@ data Env = Env logg :: !Logger, manager :: !Manager, websockets :: !(Dict Key Websocket), - rabbitConnections :: !(Dict Key Q.Connection), + rabbitConnections :: !(Dict Unique Q.Connection), rand :: !GenIO, clock :: !Clock, drainOpts :: DrainOpts, cassandra :: ClientState, - pool :: RabbitMqPool Key + pool :: RabbitMqPool } setRequestId :: RequestId -> Env -> Env @@ -203,12 +199,12 @@ env :: Logger -> Manager -> Dict Key Websocket -> - Dict Key Q.Connection -> + Dict Unique Q.Connection -> GenIO -> Clock -> DrainOpts -> ClientState -> - RabbitMqPool Key -> + RabbitMqPool -> Env env leh lp gh gp = Env leh lp (Bilge.host gh . Bilge.port gp $ empty) (RequestId defRequestId) diff --git a/services/gundeck/src/Gundeck/Client.hs b/services/gundeck/src/Gundeck/Client.hs index 3adc397ca35..486ab4b63c8 100644 --- a/services/gundeck/src/Gundeck/Client.hs +++ b/services/gundeck/src/Gundeck/Client.hs @@ -50,7 +50,7 @@ setupConsumableNotifications :: ClientId -> IO Text setupConsumableNotifications chan uid cid = do - let qName = clientNotificationQueueName uid (RabbitMqClientId cid) + let qName = clientNotificationQueueName uid cid void $ declareQueue chan diff --git a/services/gundeck/src/Gundeck/Push.hs b/services/gundeck/src/Gundeck/Push.hs index 7c236d3ec0a..aa31968b22f 100644 --- a/services/gundeck/src/Gundeck/Push.hs +++ b/services/gundeck/src/Gundeck/Push.hs @@ -185,7 +185,11 @@ splitPush clientsFull p = do -- Checking for rabbitmqClientIds first ensures that we fall back to -- old behaviour even if legacyClientIds is empty too. This way we -- won't break things before clients are ready for it. - (That rcpt) + -- + -- We return all clients for RabbitMQ even if there are no real + -- clients so a temporary client can still read the notifications on + -- RabbitMQ. + (These rcpt {_recipientClients = RecipientClientsAll} rcpt) (_, []) -> (This rcpt) (r : rs, l : ls) ->