Skip to content

Commit

Permalink
Merge pull request #3635 from wireapp/pcapriotti/mls-message-queue
Browse files Browse the repository at this point in the history
[WPB-4984] Queue remote MLS messages
  • Loading branch information
pcapriotti authored Oct 10, 2023
2 parents 32af741 + 4048b46 commit 226a4ef
Show file tree
Hide file tree
Showing 16 changed files with 109 additions and 249 deletions.
1 change: 1 addition & 0 deletions changelog.d/6-federation/wpb-4984-queueing
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Remote MLS messages get queued via RabbitMQ
2 changes: 1 addition & 1 deletion integration/test/MLS/Util.hs
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ createGroup cid conv = do
Nothing -> pure ()
resetGroup cid conv

createSubConv :: ClientIdentity -> String -> App ()
createSubConv :: HasCallStack => ClientIdentity -> String -> App ()
createSubConv cid subId = do
mls <- getMLSState
sub <- getSubConversation cid mls.convId subId >>= getJSON 200
Expand Down
12 changes: 7 additions & 5 deletions integration/test/Test/MLS.hs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import Data.ByteString.Base64 qualified as Base64
import Data.ByteString.Char8 qualified as B8
import Data.Text.Encoding qualified as T
import MLS.Util
import Notifications
import SetupHelpers
import Testlib.Prelude

Expand Down Expand Up @@ -282,9 +283,11 @@ testMLSProtocolUpgrade secondDomain = do
void $ createPendingProposalCommit alice1 >>= sendAndConsumeCommitBundle
void $ createExternalCommit bob1 Nothing >>= sendAndConsumeCommitBundle

-- charlie is added to the group
void $ uploadNewKeyPackage charlie1
void $ createAddCommit alice1 [charlie] >>= sendAndConsumeCommitBundle
void $ withWebSocket bob $ \ws -> do
-- charlie is added to the group
void $ uploadNewKeyPackage charlie1
void $ createAddCommit alice1 [charlie] >>= sendAndConsumeCommitBundle
awaitMatch 10 isNewMLSMessageNotif ws

supportMLS alice
bindResponse (putConversationProtocol bob conv "mls") $ \resp -> do
Expand All @@ -300,8 +303,7 @@ testMLSProtocolUpgrade secondDomain = do
bindResponse (putConversationProtocol bob conv "mls") $ \resp -> do
resp.status `shouldMatchInt` 200
for_ wss $ \ws -> do
let isMessage n = nPayload n %. "type" `isEqual` "conversation.mls-message-add"
n <- awaitMatch 3 isMessage ws
n <- awaitMatch 3 isNewMLSMessageNotif ws
msg <- asByteString (nPayload n %. "data") >>= showMessage alice1
let leafIndexCharlie = 2
msg %. "message.content.body.Proposal.Remove.removed" `shouldMatchInt` leafIndexCharlie
Expand Down
41 changes: 32 additions & 9 deletions integration/test/Test/MLS/Message.hs
Original file line number Diff line number Diff line change
@@ -1,14 +1,42 @@
module Test.MLS.Message where

import API.Galley
import MLS.Util
import Notifications
import SetupHelpers
import Testlib.Prelude

-- | Test happy case of federated MLS message sending in both directions.
testApplicationMessage :: HasCallStack => App ()
testApplicationMessage = do
-- local alice and alex, remote bob
[alice, alex, bob, betty] <-
createUsers
[OwnDomain, OwnDomain, OtherDomain, OtherDomain]
for_ [alex, bob, betty] $ \user -> connectTwoUsers alice user

clients@[alice1, _alice2, alex1, _alex2, bob1, _bob2, _, _] <-
traverse
(createMLSClient def)
[alice, alice, alex, alex, bob, bob, betty, betty]
traverse_ uploadNewKeyPackage clients
void $ createNewGroup alice1

withWebSockets [alice, alex, bob, betty] $ \wss -> do
-- alice adds all other users (including her own client)
void $ createAddCommit alice1 [alice, alex, bob, betty] >>= sendAndConsumeCommitBundle
traverse_ (awaitMatch 10 isMemberJoinNotif) wss

-- alex sends a message
void $ createApplicationMessage alex1 "hello" >>= sendAndConsumeMessage
traverse_ (awaitMatch 10 isNewMLSMessageNotif) wss

-- bob sends a message
void $ createApplicationMessage bob1 "hey" >>= sendAndConsumeMessage
traverse_ (awaitMatch 10 isNewMLSMessageNotif) wss

testAppMessageSomeReachable :: HasCallStack => App ()
testAppMessageSomeReachable = do
(alice1, charlie) <- startDynamicBackends [mempty] $ \[thirdDomain] -> do
alice1 <- startDynamicBackends [mempty] $ \[thirdDomain] -> do
ownDomain <- make OwnDomain & asString
otherDomain <- make OtherDomain & asString
[alice, bob, charlie] <- createAndConnectUsers [ownDomain, otherDomain, thirdDomain]
Expand All @@ -19,11 +47,6 @@ testAppMessageSomeReachable = do
void $ withWebSocket charlie $ \ws -> do
void $ createAddCommit alice1 [bob, charlie] >>= sendAndConsumeCommitBundle
awaitMatch 10 isMemberJoinNotif ws
pure (alice1, charlie)

mp <- createApplicationMessage alice1 "hi, bob!"
bindResponse (postMLSMessage mp.sender mp.message) $ \resp -> do
resp.status `shouldMatchInt` 201
pure alice1

charlieId <- charlie %. "qualified_id"
resp.json %. "failed_to_send" `shouldMatchSet` [charlieId]
void $ createApplicationMessage alice1 "hi, bob!" >>= sendAndConsumeMessage
4 changes: 2 additions & 2 deletions integration/test/Test/MLS/SubConversation.hs
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ testDeleteParentOfSubConv secondDomain = do
(_, qcnv) <- createNewGroup alice1
withWebSocket bob $ \ws -> do
void $ createAddCommit alice1 [bob] >>= sendAndConsumeCommitBundle
void $ awaitMatch 3 isMemberJoinNotif ws
void $ awaitMatch 10 isMemberJoinNotif ws

-- bob creates a subconversation and adds his own client
createSubConv bob1 "conference"
-- bob adds his client to the subconversation
void $ createPendingProposalCommit bob1 >>= sendAndConsumeCommitBundle

-- alice joins with her own client
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,30 +24,26 @@ import Data.Qualified
import Data.UUID qualified as UUID
import Imports
import Wire.API.MLS.Message
import Wire.API.Unreachable

testObject_MLSMessageSendingStatus1 :: MLSMessageSendingStatus
testObject_MLSMessageSendingStatus1 =
MLSMessageSendingStatus
{ mmssEvents = [],
mmssTime = toUTCTimeMillis (read "1864-04-12 12:22:43.673 UTC"),
mmssFailedToSendTo = mempty
mmssTime = toUTCTimeMillis (read "1864-04-12 12:22:43.673 UTC")
}

testObject_MLSMessageSendingStatus2 :: MLSMessageSendingStatus
testObject_MLSMessageSendingStatus2 =
MLSMessageSendingStatus
{ mmssEvents = [],
mmssTime = toUTCTimeMillis (read "2001-04-12 12:22:43.673 UTC"),
mmssFailedToSendTo = unreachableFromList failed1
mmssTime = toUTCTimeMillis (read "2001-04-12 12:22:43.673 UTC")
}

testObject_MLSMessageSendingStatus3 :: MLSMessageSendingStatus
testObject_MLSMessageSendingStatus3 =
MLSMessageSendingStatus
{ mmssEvents = [],
mmssTime = toUTCTimeMillis (read "1999-04-12 12:22:43.673 UTC"),
mmssFailedToSendTo = unreachableFromList failed2
mmssTime = toUTCTimeMillis (read "1999-04-12 12:22:43.673 UTC")
}

failed1 :: [Qualified UserId]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,4 @@
{
"events": [],
"time": "2001-04-12T12:22:43.673Z",
"failed_to_send": [
{
"domain": "offline.example.com",
"id": "00000000-0000-0000-0000-000200000008"
}
]
"time": "2001-04-12T12:22:43.673Z"
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,4 @@
{
"events": [],
"time": "1999-04-12T12:22:43.673Z",
"failed_to_send": [
{
"domain": "golden.example.com",
"id": "00000000-0000-0000-0000-000200000008"
},
{
"domain": "golden.example.com",
"id": "00000000-0000-0000-0000-000100000007"
}
]
"time": "1999-04-12T12:22:43.673Z"
}
14 changes: 1 addition & 13 deletions libs/wire-api/src/Wire/API/MLS/Message.hs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ import Wire.API.MLS.Proposal
import Wire.API.MLS.ProtocolVersion
import Wire.API.MLS.Serialisation
import Wire.API.MLS.Welcome
import Wire.API.Unreachable
import Wire.Arbitrary

data WireFormatTag
Expand Down Expand Up @@ -377,11 +376,7 @@ verifyMessageSignature ctx msgContent authData pubkey = isJust $ do

data MLSMessageSendingStatus = MLSMessageSendingStatus
{ mmssEvents :: [Event],
mmssTime :: UTCTimeMillis,
-- | An optional list of unreachable users an application message could not
-- be sent to. In case of commits and unreachable users use the
-- MLSMessageResponseUnreachableBackends data constructor.
mmssFailedToSendTo :: Maybe UnreachableUsers
mmssTime :: UTCTimeMillis
}
deriving (Eq, Show)
deriving (A.ToJSON, A.FromJSON, S.ToSchema) via Schema MLSMessageSendingStatus
Expand All @@ -400,10 +395,3 @@ instance ToSchema MLSMessageSendingStatus where
"time"
(description ?~ "The time of sending the message.")
schema
<*> mmssFailedToSendTo
.= maybe_
( optFieldWithDocModifier
"failed_to_send"
(description ?~ "List of federated users who could not be reached and did not receive the message")
schema
)
14 changes: 5 additions & 9 deletions services/galley/src/Galley/API/Federation.hs
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,9 @@ federationSitemap =
:<|> Named @"get-one2one-conversation" getOne2OneConversation

onClientRemoved ::
( Member ConversationStore r,
( Member BackendNotificationQueueAccess r,
Member ConversationStore r,
Member ExternalAccess r,
Member FederatorAccess r,
Member GundeckAccess r,
Member (Input Env) r,
Member (Input (Local ())) r,
Expand Down Expand Up @@ -374,7 +374,6 @@ sendMessage originDomain msr = do
onUserDeleted ::
( Member BackendNotificationQueueAccess r,
Member ConversationStore r,
Member FederatorAccess r,
Member FireAndForget r,
Member ExternalAccess r,
Member GundeckAccess r,
Expand Down Expand Up @@ -627,7 +626,7 @@ sendMLSMessage remoteDomain msr = handleMLSMessageErrors $ do
msg <- noteS @'MLSUnsupportedMessage $ mkIncomingMessage raw
(ctype, qConvOrSub) <- getConvFromGroupId msg.groupId
when (qUnqualified qConvOrSub /= msr.convOrSubId) $ throwS @'MLSGroupConversationMismatch
MLSMessageResponseUpdates . map lcuUpdate . fst
MLSMessageResponseUpdates . map lcuUpdate
<$> postMLSMessage
loc
(tUntagged sender)
Expand Down Expand Up @@ -660,11 +659,8 @@ getSubConversationForRemoteUser domain GetSubConversationsRequest {..} =

leaveSubConversation ::
( HasLeaveSubConversationEffects r,
Members
'[ Input (Local ()),
Resource
]
r
Member (Input (Local ())) r,
Member Resource r
) =>
Domain ->
LeaveSubConversationRequest ->
Expand Down
23 changes: 10 additions & 13 deletions services/galley/src/Galley/API/MLS/Message.hs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ import Wire.API.MLS.GroupInfo
import Wire.API.MLS.Message
import Wire.API.MLS.Serialisation
import Wire.API.MLS.SubConversation
import Wire.API.Unreachable

-- FUTUREWORK
-- - Check that the capabilities of a leaf node in an add proposal contains all
Expand Down Expand Up @@ -140,11 +139,11 @@ postMLSMessageFromLocalUser lusr c conn smsg = do
assertMLSEnabled
imsg <- noteS @'MLSUnsupportedMessage $ mkIncomingMessage smsg
(ctype, cnvOrSub) <- getConvFromGroupId imsg.groupId
(events, unreachables) <-
first (map lcuEvent)
events <-
map lcuEvent
<$> postMLSMessage lusr (tUntagged lusr) c ctype cnvOrSub (Just conn) imsg
t <- toUTCTimeMillis <$> input
pure $ MLSMessageSendingStatus events t unreachables
pure $ MLSMessageSendingStatus events t

postMLSCommitBundle ::
( HasProposalEffects r,
Expand Down Expand Up @@ -188,7 +187,7 @@ postMLSCommitBundleFromLocalUser lusr c conn bundle = do
map lcuEvent
<$> postMLSCommitBundle lusr (tUntagged lusr) c ctype qConvOrSub (Just conn) ibundle
t <- toUTCTimeMillis <$> input
pure $ MLSMessageSendingStatus events t mempty
pure $ MLSMessageSendingStatus events t

postMLSCommitBundleToLocalConv ::
( HasProposalEffects r,
Expand Down Expand Up @@ -259,7 +258,6 @@ postMLSCommitBundleToLocalConv qusr c conn bundle ctype lConvOrSubId = do
storeGroupInfo (tUnqualified lConvOrSub).id (GroupInfoData bundle.groupInfo.raw)

propagateMessage qusr (Just c) lConvOrSub conn bundle.rawMessage (tUnqualified lConvOrSub).members
>>= mapM_ (throw . unreachableUsersToUnreachableBackends)

for_ bundle.welcome $ \welcome ->
sendWelcomes lConvOrSubId qusr conn newClients welcome
Expand Down Expand Up @@ -342,7 +340,7 @@ postMLSMessage ::
Qualified ConvOrSubConvId ->
Maybe ConnId ->
IncomingMessage ->
Sem r ([LocalConversationUpdate], Maybe UnreachableUsers)
Sem r [LocalConversationUpdate]
postMLSMessage loc qusr c ctype qconvOrSub con msg = do
foldQualified
loc
Expand Down Expand Up @@ -383,7 +381,7 @@ postMLSMessageToLocalConv ::
IncomingMessage ->
ConvType ->
Local ConvOrSubConvId ->
Sem r ([LocalConversationUpdate], Maybe UnreachableUsers)
Sem r [LocalConversationUpdate]
postMLSMessageToLocalConv qusr c con msg ctype convOrSubId = do
lConvOrSub <- fetchConvOrSub qusr msg.groupId ctype convOrSubId
let convOrSub = tUnqualified lConvOrSub
Expand Down Expand Up @@ -413,8 +411,8 @@ postMLSMessageToLocalConv qusr c con msg ctype convOrSubId = do
(epochInt msg.epoch < epochInt convOrSub.mlsMeta.cnvmlsEpoch - 2)
$ throwS @'MLSStaleMessage

unreachables <- propagateMessage qusr (Just c) lConvOrSub con msg.rawMessage (tUnqualified lConvOrSub).members
pure ([], unreachables)
propagateMessage qusr (Just c) lConvOrSub con msg.rawMessage (tUnqualified lConvOrSub).members
pure []

postMLSMessageToRemoteConv ::
( Members MLSMessageStaticErrors r,
Expand All @@ -427,7 +425,7 @@ postMLSMessageToRemoteConv ::
Maybe ConnId ->
IncomingMessage ->
Remote ConvOrSubConvId ->
Sem r ([LocalConversationUpdate], Maybe UnreachableUsers)
Sem r [LocalConversationUpdate]
postMLSMessageToRemoteConv loc qusr senderClient con msg rConvOrSubId = do
-- only local users can send messages to remote conversations
lusr <- foldQualified loc pure (\_ -> throwS @'ConvAccessDenied) qusr
Expand Down Expand Up @@ -455,11 +453,10 @@ postMLSMessageToRemoteConv loc qusr senderClient con msg rConvOrSubId = do
\sent to. The remote end returned: "
<> LT.pack (intercalate ", " (show <$> Set.toList (Set.map domainText ds)))
MLSMessageResponseUpdates updates -> do
lcus <- fmap fst . runOutputList $
fmap fst . runOutputList $
for_ updates $ \update -> do
me <- updateLocalStateOfRemoteConv (qualifyAs rConvOrSubId update) con
for_ me $ \e -> output (LocalConversationUpdate e update)
pure (lcus, Nothing)
MLSMessageResponseNonFederatingBackends e -> throw e

storeGroupInfo ::
Expand Down
Loading

0 comments on commit 226a4ef

Please sign in to comment.