Skip to content

Commit

Permalink
gundeck/cannon: Fixup for temporary recipients of events (#4379)
Browse files Browse the repository at this point in the history
* gundeck: Allow temporary recipients to receive events even when the user doesn't have any regular RabbitMQ clients

* integration: Add test to ensure temp notif queues are deleted

Also:
- Extract function to create RabbitMQAdminClient for a backend resource
- Extract function to runDynamicBackends returning the BackendResources instead
of returning just the domains.

* cannon: Use exclusive auto-delete queues for temp clients

Also:

* Use Data.Unique.Unqiue to keep track of channels instead of (ByteString,
ByteString) because it is way less fuss to generate the Unqiue and it doesn't
really need to be (UserId, ClientId) tuple. This also removes polymorphism for
the RabbitMqPool type.

* Log conneciton close while draining at Debug level instead of Info

* integration: Make tests less flaky by expecting races

The race in these tests is between the RabbitMQ exchange doing the fanout for
previous events and the test creating a temporary queue.
  • Loading branch information
akshaymankar authored Dec 19, 2024
1 parent 34f8e17 commit 682bdd2
Show file tree
Hide file tree
Showing 15 changed files with 177 additions and 97 deletions.
4 changes: 4 additions & 0 deletions integration/default.nix
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@
, retry
, saml2-web-sso
, scientific
, servant
, servant-client
, split
, stm
, streaming-commons
Expand Down Expand Up @@ -161,6 +163,8 @@ mkDerivation {
retry
saml2-web-sso
scientific
servant
servant-client
split
stm
streaming-commons
Expand Down
2 changes: 2 additions & 0 deletions integration/integration.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,8 @@ library
, retry
, saml2-web-sso
, scientific
, servant
, servant-client
, split
, stm
, streaming-commons
Expand Down
7 changes: 7 additions & 0 deletions integration/test/SetupHelpers.hs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import SAML2.WebSSO.Test.Util (SampleIdP (..), makeSampleIdPMetadata)
import Testlib.JSON
import Testlib.MockIntegrationService (mkLegalHoldSettings)
import Testlib.Prelude
import Testlib.Printing (indent)
import qualified Text.XML as XML
import qualified Text.XML.Cursor as XML
import qualified Text.XML.DSig as SAML
Expand Down Expand Up @@ -426,6 +427,12 @@ addUsersToFailureContext namesAndUsers action = do
allLines <- unlines <$> (mapM mkLine namesAndUsers)
addFailureContext allLines action

addJSONToFailureContext :: (MakesValue a) => String -> a -> App b -> App b
addJSONToFailureContext name ctx action = do
jsonStr <- prettyJSON ctx
let ctxStr = unlines [name <> ":", indent 2 jsonStr]
addFailureContext ctxStr action

registerTestIdPWithMeta :: (HasCallStack, MakesValue owner) => owner -> App Response
registerTestIdPWithMeta owner = fst <$> registerTestIdPWithMetaWithPrivateCreds owner

Expand Down
120 changes: 97 additions & 23 deletions integration/test/Test/Events.hs
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,19 @@ import Control.Monad.RWS (asks)
import Control.Monad.Trans.Class
import Control.Retry
import Data.ByteString.Conversion (toByteString')
import Data.Proxy (Proxy (..))
import qualified Data.Text as Text
import Data.Timeout
import MLS.Util
import Network.AMQP.Extended
import Network.RabbitMqAdmin
import qualified Network.WebSockets as WS
import Notifications
import Servant.API (AsApi, ToServant, toServant)
import Servant.API.Generic (fromServant)
import qualified Servant.Client as Servant
import SetupHelpers
import Testlib.Prelude hiding (assertNoEvent)
import Testlib.ResourcePool (acquireResources)
import UnliftIO hiding (handle)

testConsumeEventsOneWebSocket :: (HasCallStack) => App ()
Expand Down Expand Up @@ -103,6 +106,57 @@ testConsumeTempEvents = do

ackEvent ws e

testConsumeTempEventsWithoutOwnClient :: (HasCallStack) => App ()
testConsumeTempEventsWithoutOwnClient = do
[alice, bob] <- createAndConnectUsers [OwnDomain, OwnDomain]

runCodensity (createEventsWebSocket alice Nothing) $ \ws -> do
handle <- randomHandle
putHandle bob handle >>= assertSuccess

-- We cannot use 'assertEvent' here because there is a race between the temp
-- queue being created and rabbitmq fanning out the previous events.
void $ assertFindsEvent ws $ \e -> do
e %. "type" `shouldMatch` "event"
e %. "data.event.payload.0.type" `shouldMatch` "user.update"
e %. "data.event.payload.0.user.id" `shouldMatch` objId bob
e %. "data.event.payload.0.user.handle" `shouldMatch` handle

ackEvent ws e

testTemporaryQueuesAreDeletedAfterUse :: (HasCallStack) => App ()
testTemporaryQueuesAreDeletedAfterUse = do
startDynamicBackendsReturnResources [def] $ \[beResource] -> do
let domain = beResource.berDomain
rabbitmqAdmin <- mkRabbitMqAdminClientForResource beResource
queuesBeforeWS <- rabbitmqAdmin.listQueuesByVHost (fromString beResource.berVHost) (fromString "") True 100 1
let deadNotifsQueue = Queue {name = fromString "dead-user-notifications", vhost = fromString beResource.berVHost}
queuesBeforeWS.items `shouldMatch` [deadNotifsQueue]

[alice, bob] <- createAndConnectUsers [domain, domain]

runCodensity (createEventsWebSocket alice Nothing) $ \ws -> do
handle <- randomHandle
putHandle bob handle >>= assertSuccess

queuesDuringWS <- rabbitmqAdmin.listQueuesByVHost (fromString beResource.berVHost) (fromString "") True 100 1
addJSONToFailureContext "queuesDuringWS" queuesDuringWS $ do
length queuesDuringWS.items `shouldMatchInt` 2

-- We cannot use 'assertEvent' here because there is a race between the temp
-- queue being created and rabbitmq fanning out the previous events.
void $ assertFindsEvent ws $ \e -> do
e %. "type" `shouldMatch` "event"
e %. "data.event.payload.0.type" `shouldMatch` "user.update"
e %. "data.event.payload.0.user.id" `shouldMatch` objId bob
e %. "data.event.payload.0.user.handle" `shouldMatch` handle

ackEvent ws e

-- Use let binding here so 'shouldMatchEventually' retries the whole request
let queuesAfterWSM = rabbitmqAdmin.listQueuesByVHost (fromString beResource.berVHost) (fromString "") True 100 1
fmap (.items) queuesAfterWSM `shouldEventuallyMatch` ([deadNotifsQueue])

testMLSTempEvents :: (HasCallStack) => App ()
testMLSTempEvents = do
[alice, bob] <- createAndConnectUsers [OwnDomain, OwnDomain]
Expand All @@ -128,7 +182,9 @@ testMLSTempEvents = do

-- FUTUREWORK: we should not rely on events arriving in this particular order

void $ assertEvent ws $ \e -> do
-- We cannot use 'assertEvent' here because there is a race between the temp
-- queue being created and rabbitmq fanning out the previous events.
void $ assertFindsEvent ws $ \e -> do
e %. "type" `shouldMatch` "event"
e %. "data.event.payload.0.type" `shouldMatch` "conversation.member-join"
user <- assertOne =<< (e %. "data.event.payload.0.data.users" & asList)
Expand Down Expand Up @@ -413,22 +469,17 @@ testChannelLimit = withModifiedBackend
lift $ assertNoEvent_ ws

testChannelKilled :: (HasCallStack) => App ()
testChannelKilled = lowerCodensity $ do
pool <- lift $ asks (.resourcePool)
[backend] <- acquireResources 1 pool

domain <- startDynamicBackend backend mempty
alice <- lift $ randomUser domain def
testChannelKilled = startDynamicBackendsReturnResources [def] $ \[backend] -> do
let domain = backend.berDomain
alice <- randomUser domain def
[c1, c2] <-
lift
$ replicateM 2
replicateM 2
$ addClient alice def {acapabilities = Just ["consumable-notifications"]}
>>= getJSON 201
>>= (%. "id")
>>= asString

ws <- createEventsWebSocket alice (Just c1)
lift $ do
runCodensity (createEventsWebSocket alice (Just c1)) $ \ws -> do
assertEvent ws $ \e -> do
e %. "data.event.payload.0.type" `shouldMatch` "user.client-add"
e %. "data.event.payload.0.client.id" `shouldMatch` c1
Expand Down Expand Up @@ -536,14 +587,35 @@ sendAck ws deliveryTag multiple =

assertEvent :: (HasCallStack) => EventWebSocket -> ((HasCallStack) => Value -> App a) -> App a
assertEvent ws expectations = do
timeout 10_000_000 (readChan ws.events) >>= \case
Nothing -> assertFailure "No event received for 1s"
timeOutSeconds <- asks (.timeOutSeconds)
timeout (timeOutSeconds * 1_000_000) (readChan ws.events) >>= \case
Nothing -> assertFailure $ "No event received for " <> show timeOutSeconds <> "s"
Just (Left _) -> assertFailure "Websocket closed when waiting for more events"
Just (Right e) -> do
pretty <- prettyJSON e
addFailureContext ("event:\n" <> pretty)
$ expectations e

-- | Tolerates and consumes other events before expected event
assertFindsEvent :: forall a. (HasCallStack) => EventWebSocket -> ((HasCallStack) => Value -> App a) -> App a
assertFindsEvent ws expectations = go 0
where
go :: Int -> App a
go ignoredEventCount = do
timeOutSeconds <- asks (.timeOutSeconds)
timeout (timeOutSeconds * 1_000_000) (readChan ws.events) >>= \case
Nothing -> assertFailure $ show ignoredEventCount <> " event(s) received, no matching event received for " <> show timeOutSeconds <> "s"
Just (Left _) -> assertFailure "Websocket closed when waiting for more events"
Just (Right ev) -> do
(expectations ev)
`catch` \(_ :: AssertionFailure) -> do
ignoredEventType <-
maybe (pure "No Type") asString
=<< lookupField ev "data.event.payload.0.type"
ackEvent ws ev
addJSONToFailureContext ("Ignored Event (" <> ignoredEventType <> ")") ev
$ go (ignoredEventCount + 1)

data NoEvent = NoEvent | WebSocketDied

instance ToJSON NoEvent where
Expand Down Expand Up @@ -576,6 +648,16 @@ consumeAllEvents ws = do

killConnection :: (HasCallStack) => BackendResource -> App ()
killConnection backend = do
rabbitmqAdminClient <- mkRabbitMqAdminClientForResource backend
connections <- rabbitmqAdminClient.listConnectionsByVHost (Text.pack backend.berVHost)
connection <-
assertOne
[ c | c <- connections, c.userProvidedName == Just (Text.pack "pool 0")
]
void $ rabbitmqAdminClient.deleteConnection connection.name

mkRabbitMqAdminClientForResource :: BackendResource -> App (AdminAPI (Servant.AsClientT App))
mkRabbitMqAdminClientForResource backend = do
rc <- asks (.rabbitMQConfig)
let opts =
RabbitMqAdminOpts
Expand All @@ -589,12 +671,4 @@ killConnection backend = do
else Nothing
}
servantClient <- liftIO $ mkRabbitMqAdminClientEnv opts
name <- do
connections <- liftIO $ listConnectionsByVHost servantClient opts.vHost
connection <-
assertOne
[ c | c <- connections, c.userProvidedName == Just (Text.pack "pool 0")
]
pure connection.name

void $ liftIO $ deleteConnection servantClient name
pure . fromServant $ Servant.hoistClient (Proxy @(ToServant AdminAPI AsApi)) (liftIO @App) (toServant servantClient)
11 changes: 11 additions & 0 deletions integration/test/Testlib/Assertions.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import Control.Exception as E
import Control.Lens ((^?))
import qualified Control.Lens.Plated as LP
import Control.Monad
import qualified Control.Monad.Catch as Catch
import Control.Monad.Reader
import Control.Retry
import Data.Aeson (Value)
import qualified Data.Aeson as Aeson
import qualified Data.Aeson.Diff as AD
Expand Down Expand Up @@ -62,6 +64,15 @@ shouldMatch ::
App ()
shouldMatch = shouldMatchWithMsg Nothing

-- | Retries every 100ms until timeOutSeconds from Env is reached
shouldEventuallyMatch :: (MakesValue a, MakesValue b, HasCallStack) => a -> b -> App ()
shouldEventuallyMatch a b = do
timeout <- asks (.timeOutSeconds)
recovering
(limitRetriesByCumulativeDelay (timeout * 1_000_000) $ constantDelay 100_000)
((\_ -> Catch.Handler $ \(_ :: AssertionFailure) -> pure True) : skipAsyncExceptions)
(const $ a `shouldMatch` b)

shouldMatchWithMsg ::
(MakesValue a, MakesValue b, HasCallStack) =>
-- | Message to be added to failure report
Expand Down
7 changes: 6 additions & 1 deletion integration/test/Testlib/ModService.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ module Testlib.ModService
( withModifiedBackend,
startDynamicBackend,
startDynamicBackends,
startDynamicBackendsReturnResources,
traverseConcurrentlyCodensity,
)
where
Expand Down Expand Up @@ -120,6 +121,10 @@ traverseConcurrentlyCodensity f args = do

startDynamicBackends :: [ServiceOverrides] -> ([String] -> App a) -> App a
startDynamicBackends beOverrides k = do
startDynamicBackendsReturnResources beOverrides (\resources -> k $ map (.berDomain) resources)

startDynamicBackendsReturnResources :: [ServiceOverrides] -> ([BackendResource] -> App a) -> App a
startDynamicBackendsReturnResources beOverrides k = do
let startDynamicBackendsCodensity = do
when (Prelude.length beOverrides > 3) $ lift $ failApp "Too many backends. Currently only 3 are supported."
pool <- asks (.resourcePool)
Expand All @@ -128,7 +133,7 @@ startDynamicBackends beOverrides k = do
traverseConcurrentlyCodensity
(void . uncurry startDynamicBackend)
(zip resources beOverrides)
pure $ map (.berDomain) resources
pure resources
runCodensity startDynamicBackendsCodensity k

startDynamicBackend :: (HasCallStack) => BackendResource -> ServiceOverrides -> Codensity App String
Expand Down
18 changes: 2 additions & 16 deletions libs/wire-api/src/Wire/API/Notification.hs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ module Wire.API.Notification
userNotificationExchangeName,
userNotificationDlxName,
userNotificationDlqName,
RabbitMqClientId (..),
clientNotificationQueueName,
userRoutingKey,
temporaryRoutingKey,
Expand All @@ -51,7 +50,6 @@ import Control.Lens.Operators ((?~))
import Data.Aeson (FromJSON (..), ToJSON (..))
import Data.Aeson.Types qualified as Aeson
import Data.Bits
import Data.ByteString.Conversion
import Data.HashMap.Strict.InsOrd qualified as InsOrdHashMap
import Data.Id
import Data.Json.Util
Expand Down Expand Up @@ -191,17 +189,9 @@ userNotificationDlxName = "dead-user-notifications"
userNotificationDlqName :: Text
userNotificationDlqName = "dead-user-notifications"

data RabbitMqClientId
= RabbitMqClientId ClientId
| RabbitMqTempId Text

instance ToByteString RabbitMqClientId where
builder (RabbitMqClientId cid) = builder cid
builder (RabbitMqTempId temp) = builder temp

clientNotificationQueueName :: UserId -> RabbitMqClientId -> Text
clientNotificationQueueName :: UserId -> ClientId -> Text
clientNotificationQueueName uid cid =
"user-notifications." <> userRoutingKey uid <> "." <> rabbitMqClientToText cid
"user-notifications." <> userRoutingKey uid <> "." <> clientToText cid

userRoutingKey :: UserId -> Text
userRoutingKey = idToText
Expand All @@ -211,7 +201,3 @@ clientRoutingKey uid cid = userRoutingKey uid <> "." <> clientToText cid

temporaryRoutingKey :: UserId -> Text
temporaryRoutingKey uid = userRoutingKey uid <> ".temporary"

rabbitMqClientToText :: RabbitMqClientId -> Text
rabbitMqClientToText (RabbitMqClientId cid) = clientToText cid
rabbitMqClientToText (RabbitMqTempId temp) = "temp-" <> temp
1 change: 0 additions & 1 deletion services/cannon/cannon.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ library
, lens >=4.4
, lens-family-core >=1.1
, metrics-wai >=0.4
, MonadRandom
, mwc-random >=0.13
, prometheus-client
, retry >=0.7
Expand Down
2 changes: 0 additions & 2 deletions services/cannon/default.nix
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
, lens-family-core
, lib
, metrics-wai
, MonadRandom
, mwc-random
, prometheus-client
, QuickCheck
Expand Down Expand Up @@ -92,7 +91,6 @@ mkDerivation {
lens
lens-family-core
metrics-wai
MonadRandom
mwc-random
prometheus-client
retry
Expand Down
Loading

0 comments on commit 682bdd2

Please sign in to comment.