diff --git a/changelog.d/5-internal/background-worker-nosync b/changelog.d/5-internal/background-worker-nosync new file mode 100644 index 00000000000..b9eda2712c5 --- /dev/null +++ b/changelog.d/5-internal/background-worker-nosync @@ -0,0 +1 @@ +background-worker: Get list of domains from RabbitMQ instead of brig for pushing backend notifications \ No newline at end of file diff --git a/charts/background-worker/templates/configmap.yaml b/charts/background-worker/templates/configmap.yaml index 7e3612252d8..1a03ad0d5e4 100644 --- a/charts/background-worker/templates/configmap.yaml +++ b/charts/background-worker/templates/configmap.yaml @@ -21,14 +21,6 @@ data: host: federator port: 8080 - galley: - host: galley - port: 8080 - - brig: - host: brig - port: 8080 - rabbitmq: {{toYaml .rabbitmq | indent 6 }} backendNotificationPusher: diff --git a/charts/background-worker/values.yaml b/charts/background-worker/values.yaml index fcae0115bfc..a7a552a4536 100644 --- a/charts/background-worker/values.yaml +++ b/charts/background-worker/values.yaml @@ -26,6 +26,7 @@ config: backendNotificationPusher: pushBackoffMinWait: 10000 # in microseconds, so 10ms pushBackoffMaxWait: 300000000 # microseconds, so 300s + remotesRefreshInterval: 300000000 # microseconds, so 300s serviceAccount: # When setting this to 'false', either make sure that a service account named diff --git a/hack/helm_vars/wire-server/values.yaml.gotmpl b/hack/helm_vars/wire-server/values.yaml.gotmpl index 4bd06d953fe..7185c30b4f7 100644 --- a/hack/helm_vars/wire-server/values.yaml.gotmpl +++ b/hack/helm_vars/wire-server/values.yaml.gotmpl @@ -77,21 +77,6 @@ brig: setMaxConvSize: 16 # See helmfile for the real value setFederationDomain: integration.example.com - setFederationDomainConfigs: - # 'setFederationDomainConfigs' is deprecated as of https://github.com/wireapp/wire-server/pull/3260. See - # https://docs.wire.com/understand/federation/backend-communication.html#configuring-remote-connections - # for details. - - domain: integration.example.com - search_policy: full_search - - domain: federation-test-helper.{{ .Release.Namespace }}.svc.cluster.local - search_policy: full_search - # Remove these after fixing https://wearezeta.atlassian.net/browse/WPB-3796 - - domain: dyn-backend-1 - search_policy: full_search - - domain: dyn-backend-2 - search_policy: full_search - - domain: dyn-backend-3 - search_policy: full_search setFederationStrategy: allowAll setFederationDomainConfigsUpdateFreq: 10 set2FACodeGenerationDelaySecs: 5 @@ -321,6 +306,7 @@ background-worker: backendNotificationPusher: pushBackoffMinWait: 1000 # 1ms pushBackoffMaxWait: 500000 # 0.5s + remotesRefreshInterval: 1000000 # 1s secrets: rabbitmq: username: {{ .Values.rabbitmqUsername }} diff --git a/hack/helmfile.yaml b/hack/helmfile.yaml index b40cd73d623..878cb016f50 100644 --- a/hack/helmfile.yaml +++ b/hack/helmfile.yaml @@ -127,14 +127,6 @@ releases: value: {{ .Values.federationDomain1 }} - name: cargohold.config.settings.federationDomain value: {{ .Values.federationDomain1 }} - - name: brig.config.optSettings.setFederationDomainConfigs[0].domain - value: {{ .Values.federationDomain2 }} - - name: brig.config.optSettings.setFederationDomainConfigs[2].domain - value: {{ .Values.dynBackendDomain1 }} - - name: brig.config.optSettings.setFederationDomainConfigs[3].domain - value: {{ .Values.dynBackendDomain2 }} - - name: brig.config.optSettings.setFederationDomainConfigs[4].domain - value: {{ .Values.dynBackendDomain3 }} needs: - 'databases-ephemeral' @@ -151,13 +143,5 @@ releases: value: {{ .Values.federationDomain2 }} - name: cargohold.config.settings.federationDomain value: {{ .Values.federationDomain2 }} - - name: brig.config.optSettings.setFederationDomainConfigs[0].domain - value: {{ .Values.federationDomain1 }} - - name: brig.config.optSettings.setFederationDomainConfigs[2].domain - value: {{ .Values.dynBackendDomain1 }} - - name: brig.config.optSettings.setFederationDomainConfigs[3].domain - value: {{ .Values.dynBackendDomain2 }} - - name: brig.config.optSettings.setFederationDomainConfigs[4].domain - value: {{ .Values.dynBackendDomain3 }} needs: - 'databases-ephemeral' diff --git a/integration/test/API/Brig.hs b/integration/test/API/Brig.hs index 82abfb5a6bb..abad2be9a11 100644 --- a/integration/test/API/Brig.hs +++ b/integration/test/API/Brig.hs @@ -21,6 +21,14 @@ getUser user target = do joinHttpPath ["users", domain, uid] submit "GET" req +getUserByHandle :: (HasCallStack, MakesValue user, MakesValue domain) => user -> domain -> String -> App Response +getUserByHandle user domain handle = do + domainStr <- asString domain + req <- + baseRequest user Brig Versioned $ + joinHttpPath ["users", "by-handle", domainStr, handle] + submit "GET" req + getClient :: (HasCallStack, MakesValue user, MakesValue client) => user -> @@ -39,6 +47,12 @@ deleteUser user = do submit "DELETE" $ req & addJSONObject ["password" .= defPassword] +putHandle :: (HasCallStack, MakesValue user) => user -> String -> App Response +putHandle user handle = do + req <- baseRequest user Brig Versioned "/self/handle" + submit "PUT" $ + req & addJSONObject ["handle" .= handle] + data AddClient = AddClient { ctype :: String, internal :: Bool, diff --git a/integration/test/API/Common.hs b/integration/test/API/Common.hs index 85c978cb7a3..125c6150bf1 100644 --- a/integration/test/API/Common.hs +++ b/integration/test/API/Common.hs @@ -26,6 +26,14 @@ randomEmail = liftIO $ do chars = mkArray $ ['A' .. 'Z'] <> ['a' .. 'z'] <> ['0' .. '9'] pick = (chars !) <$> randomRIO (Array.bounds chars) +randomHandle :: App String +randomHandle = liftIO $ do + n <- randomRIO (50, 256) + replicateM n pick + where + chars = mkArray $ ['a' .. 'z'] <> ['0' .. '9'] <> "_-." + pick = (chars !) <$> randomRIO (Array.bounds chars) + randomHex :: Int -> App String randomHex n = liftIO $ replicateM n pick where diff --git a/integration/test/SetupHelpers.hs b/integration/test/SetupHelpers.hs index b3c6b2dd5b5..3e2f2313894 100644 --- a/integration/test/SetupHelpers.hs +++ b/integration/test/SetupHelpers.hs @@ -6,7 +6,6 @@ import API.Brig qualified as Brig import API.BrigInternal qualified as Internal import API.Common import API.Galley -import Control.Concurrent (threadDelay) import Control.Monad.Reader import Data.Aeson hiding ((.=)) import Data.Aeson.Types qualified as Aeson @@ -17,12 +16,6 @@ import Data.UUID.V4 (nextRandom) import GHC.Stack import Testlib.Prelude --- | `n` should be 2 x `setFederationDomainConfigsUpdateFreq` in the config -connectAllDomainsAndWaitToSync :: HasCallStack => Int -> [String] -> App () -connectAllDomainsAndWaitToSync n domains = do - sequence_ [Internal.createFedConn x (Internal.FedConn y "full_search") | x <- domains, y <- domains, x /= y] - liftIO $ threadDelay (n * 1000 * 1000) -- wait for federation status to be updated - randomUser :: (HasCallStack, MakesValue domain) => domain -> Internal.CreateUser -> App Value randomUser domain cu = bindResponse (Internal.createUser domain cu) $ \resp -> do resp.status `shouldMatchInt` 201 diff --git a/integration/test/Test/Brig.hs b/integration/test/Test/Brig.hs index 4b5623a3ecc..8179641174b 100644 --- a/integration/test/Test/Brig.hs +++ b/integration/test/Test/Brig.hs @@ -1,3 +1,5 @@ +{-# OPTIONS_GHC -Wno-incomplete-uni-patterns #-} + module Test.Brig where import API.Brig qualified as Public @@ -5,7 +7,6 @@ import API.BrigInternal qualified as Internal import API.Common qualified as API import API.GalleyInternal qualified as Internal import Control.Concurrent (threadDelay) -import Data.Aeson qualified as Aeson import Data.Aeson.Types hiding ((.=)) import Data.Set qualified as Set import Data.String.Conversions @@ -143,18 +144,35 @@ testSwagger = do testRemoteUserSearch :: HasCallStack => App () testRemoteUserSearch = do - let overrides = - setField "optSettings.setFederationStrategy" "allowDynamic" - >=> setField "optSettings.setFederationDomainConfigsUpdateFreq" (Aeson.Number 1) - startDynamicBackends [def {brigCfg = overrides}, def {brigCfg = overrides}] $ \dynDomains -> do - domains@[d1, d2] <- pure dynDomains - connectAllDomainsAndWaitToSync 1 domains - [u1, u2] <- createAndConnectUsers [d1, d2] + startDynamicBackends [def, def] $ \[d1, d2] -> do + void $ Internal.createFedConn d2 (Internal.FedConn d1 "full_search") + + u1 <- randomUser d1 def + u2 <- randomUser d2 def Internal.refreshIndex d2 uidD2 <- objId u2 + bindResponse (Public.searchContacts u1 (u2 %. "name") d2) $ \resp -> do resp.status `shouldMatchInt` 200 docs <- resp.json %. "documents" >>= asList case docs of [] -> assertFailure "Expected a non empty result, but got an empty one" doc : _ -> doc %. "id" `shouldMatch` uidD2 + +testRemoteUserSearchExactHandle :: HasCallStack => App () +testRemoteUserSearchExactHandle = do + startDynamicBackends [def, def] $ \[d1, d2] -> do + void $ Internal.createFedConn d2 (Internal.FedConn d1 "exact_handle_search") + + u1 <- randomUser d1 def + u2 <- randomUser d2 def + u2Handle <- API.randomHandle + bindResponse (Public.putHandle u2 u2Handle) $ assertSuccess + Internal.refreshIndex d2 + + bindResponse (Public.searchContacts u1 u2Handle d2) $ \resp -> do + resp.status `shouldMatchInt` 200 + docs <- resp.json %. "documents" >>= asList + case docs of + [] -> assertFailure "Expected a non empty result, but got an empty one" + doc : _ -> objQid doc `shouldMatch` objQid u2 diff --git a/integration/test/Test/Conversation.hs b/integration/test/Test/Conversation.hs index afd2e07aaad..9a42bf0493d 100644 --- a/integration/test/Test/Conversation.hs +++ b/integration/test/Test/Conversation.hs @@ -39,9 +39,8 @@ import Testlib.ResourcePool testDynamicBackendsFullyConnectedWhenAllowAll :: HasCallStack => App () testDynamicBackendsFullyConnectedWhenAllowAll = do - let overrides = - def {brigCfg = setField "optSettings.setFederationStrategy" "allowAll"} - startDynamicBackends [overrides, overrides, overrides] $ \dynDomains -> do + -- The default setting is 'allowAll' + startDynamicBackends [def, def, def] $ \dynDomains -> do [domainA, domainB, domainC] <- pure dynDomains uidA <- randomUser domainA def {team = True} uidB <- randomUser domainA def {team = True} @@ -65,66 +64,55 @@ testDynamicBackendsNotFederating = do { brigCfg = setField "optSettings.setFederationStrategy" "allowNone" } - startDynamicBackends [overrides, overrides, overrides] $ - \dynDomains -> do - [domainA, domainB, domainC] <- pure dynDomains - uidA <- randomUser domainA def {team = True} - retryT - $ bindResponse - (getFederationStatus uidA [domainB, domainC]) - $ \resp -> do - resp.status `shouldMatchInt` 533 - resp.json %. "unreachable_backends" `shouldMatchSet` [domainB, domainC] + startDynamicBackends [overrides, overrides, overrides] $ \[domainA, domainB, domainC] -> do + uidA <- randomUser domainA def {team = True} + retryT + $ bindResponse + (getFederationStatus uidA [domainB, domainC]) + $ \resp -> do + resp.status `shouldMatchInt` 533 + resp.json %. "unreachable_backends" `shouldMatchSet` [domainB, domainC] testDynamicBackendsFullyConnectedWhenAllowDynamic :: HasCallStack => App () testDynamicBackendsFullyConnectedWhenAllowDynamic = do - let overrides = - setField "optSettings.setFederationStrategy" "allowDynamic" - >=> setField "optSettings.setFederationDomainConfigsUpdateFreq" (Aeson.Number 1) - startDynamicBackends - [ def {brigCfg = overrides}, - def {brigCfg = overrides}, - def {brigCfg = overrides} - ] - $ \dynDomains -> do - domains@[domainA, domainB, domainC] <- pure dynDomains - sequence_ [createFedConn x (FedConn y "full_search") | x <- domains, y <- domains, x /= y] - uidA <- randomUser domainA def {team = True} - uidB <- randomUser domainB def {team = True} - uidC <- randomUser domainC def {team = True} - let assertConnected u d d' = - bindResponse - (getFederationStatus u [d, d']) - $ \resp -> do - resp.status `shouldMatchInt` 200 - resp.json %. "status" `shouldMatch` "fully-connected" - retryT $ assertConnected uidA domainB domainC - retryT $ assertConnected uidB domainA domainC - retryT $ assertConnected uidC domainA domainB + withFederatingBackendsAllowDynamic $ \(domainA, domainB, domainC) -> do + -- Allowing 'full_search' or any type of search is how we enable federation + -- between backends when the federation strategy is 'allowDynamic'. + sequence_ + [ createFedConn x (FedConn y "full_search") + | x <- [domainA, domainB, domainC], + y <- [domainA, domainB, domainC], + x /= y + ] + uidA <- randomUser domainA def {team = True} + uidB <- randomUser domainB def {team = True} + uidC <- randomUser domainC def {team = True} + let assertConnected u d d' = + bindResponse + (getFederationStatus u [d, d']) + $ \resp -> do + resp.status `shouldMatchInt` 200 + resp.json %. "status" `shouldMatch` "fully-connected" + retryT $ assertConnected uidA domainB domainC + retryT $ assertConnected uidB domainA domainC + retryT $ assertConnected uidC domainA domainB testDynamicBackendsNotFullyConnected :: HasCallStack => App () testDynamicBackendsNotFullyConnected = do - let overrides = - def - { brigCfg = - setField "optSettings.setFederationStrategy" "allowDynamic" - >=> setField "optSettings.setFederationDomainConfigsUpdateFreq" (Aeson.Number 1) - } - startDynamicBackends [overrides, overrides, overrides] $ - \[domainA, domainB, domainC] -> do - -- A is connected to B and C, but B and C are not connected to each other - void $ createFedConn domainA $ FedConn domainB "full_search" - void $ createFedConn domainB $ FedConn domainA "full_search" - void $ createFedConn domainA $ FedConn domainC "full_search" - void $ createFedConn domainC $ FedConn domainA "full_search" - uidA <- randomUser domainA def {team = True} - retryT - $ bindResponse - (getFederationStatus uidA [domainB, domainC]) - $ \resp -> do - resp.status `shouldMatchInt` 200 - resp.json %. "status" `shouldMatch` "non-fully-connected" - resp.json %. "not_connected" `shouldMatchSet` [domainB, domainC] + withFederatingBackendsAllowDynamic $ \(domainA, domainB, domainC) -> do + -- A is connected to B and C, but B and C are not connected to each other + void $ createFedConn domainA $ FedConn domainB "full_search" + void $ createFedConn domainB $ FedConn domainA "full_search" + void $ createFedConn domainA $ FedConn domainC "full_search" + void $ createFedConn domainC $ FedConn domainA "full_search" + uidA <- randomUser domainA def {team = True} + retryT + $ bindResponse + (getFederationStatus uidA [domainB, domainC]) + $ \resp -> do + resp.status `shouldMatchInt` 200 + resp.json %. "status" `shouldMatch` "non-fully-connected" + resp.json %. "not_connected" `shouldMatchSet` [domainB, domainC] testFederationStatus :: HasCallStack => App () testFederationStatus = do @@ -151,55 +139,34 @@ testFederationStatus = do testCreateConversationFullyConnected :: HasCallStack => App () testCreateConversationFullyConnected = do - let setFederationConfig = - setField "optSettings.setFederationStrategy" "allowDynamic" - >=> setField "optSettings.setFederationDomainConfigsUpdateFreq" (Aeson.Number 1) - startDynamicBackends - [ def {brigCfg = setFederationConfig}, - def {brigCfg = setFederationConfig}, - def {brigCfg = setFederationConfig} - ] - $ \dynDomains -> do - domains@[domainA, domainB, domainC] <- pure dynDomains - connectAllDomainsAndWaitToSync 1 domains - [u1, u2, u3] <- createAndConnectUsers [domainA, domainB, domainC] - bindResponse (postConversation u1 (defProteus {qualifiedUsers = [u2, u3]})) $ \resp -> do - resp.status `shouldMatchInt` 201 + startDynamicBackends [def, def, def] $ \[domainA, domainB, domainC] -> do + [u1, u2, u3] <- createAndConnectUsers [domainA, domainB, domainC] + bindResponse (postConversation u1 (defProteus {qualifiedUsers = [u2, u3]})) $ \resp -> do + resp.status `shouldMatchInt` 201 testCreateConversationNonFullyConnected :: HasCallStack => App () testCreateConversationNonFullyConnected = do - let setFederationConfig = - setField "optSettings.setFederationStrategy" "allowDynamic" - >=> setField "optSettings.setFederationDomainConfigsUpdateFreq" (Aeson.Number 1) - startDynamicBackends - [ def {brigCfg = setFederationConfig}, - def {brigCfg = setFederationConfig}, - def {brigCfg = setFederationConfig} - ] - $ \dynDomains -> do - [domainA, domainB, domainC] <- pure dynDomains - - -- A is connected to B and C, but B and C are not connected to each other - void $ createFedConn domainA $ FedConn domainB "full_search" - void $ createFedConn domainB $ FedConn domainA "full_search" - void $ createFedConn domainA $ FedConn domainC "full_search" - void $ createFedConn domainC $ FedConn domainA "full_search" - liftIO $ threadDelay (2 * 1000 * 1000) - - u1 <- randomUser domainA def - u2 <- randomUser domainB def - u3 <- randomUser domainC def - connectUsers u1 u2 - connectUsers u1 u3 - - bindResponse (postConversation u1 (defProteus {qualifiedUsers = [u2, u3]})) $ \resp -> do - resp.status `shouldMatchInt` 409 - resp.json %. "non_federating_backends" `shouldMatchSet` [domainB, domainC] + withFederatingBackendsAllowDynamic $ \(domainA, domainB, domainC) -> do + -- A is connected to B and C, but B and C are not connected to each other + void $ createFedConn domainA $ FedConn domainB "full_search" + void $ createFedConn domainB $ FedConn domainA "full_search" + void $ createFedConn domainA $ FedConn domainC "full_search" + void $ createFedConn domainC $ FedConn domainA "full_search" + liftIO $ threadDelay (2 * 1000 * 1000) + + u1 <- randomUser domainA def + u2 <- randomUser domainB def + u3 <- randomUser domainC def + connectUsers u1 u2 + connectUsers u1 u3 + + bindResponse (postConversation u1 (defProteus {qualifiedUsers = [u2, u3]})) $ \resp -> do + resp.status `shouldMatchInt` 409 + resp.json %. "non_federating_backends" `shouldMatchSet` [domainB, domainC] testAddMembersFullyConnectedProteus :: HasCallStack => App () testAddMembersFullyConnectedProteus = do - withFederatingBackendsAllowDynamic $ \(domainA, domainB, domainC) -> do - connectAllDomainsAndWaitToSync 2 [domainA, domainB, domainC] + startDynamicBackends [def, def, def] $ \[domainA, domainB, domainC] -> do [u1, u2, u3] <- createAndConnectUsers [domainA, domainB, domainC] -- create conversation with no users cid <- postConversation u1 (defProteus {qualifiedUsers = []}) >>= getJSON 201 @@ -293,10 +260,8 @@ testAddMemberV1 domain = do testConvWithUnreachableRemoteUsers :: HasCallStack => App () testConvWithUnreachableRemoteUsers = do - let overrides = - def {brigCfg = setField "optSettings.setFederationStrategy" "allowAll"} ([alice, alex, bob, charlie, dylan], domains) <- - startDynamicBackends [overrides, overrides] $ \domains -> do + startDynamicBackends [def, def] $ \domains -> do own <- make OwnDomain & asString other <- make OtherDomain & asString users <- createAndConnectUsers $ [own, own, other] <> domains @@ -313,10 +278,8 @@ testConvWithUnreachableRemoteUsers = do testAddReachableWithUnreachableRemoteUsers :: HasCallStack => App () testAddReachableWithUnreachableRemoteUsers = do - let overrides = - def {brigCfg = setField "optSettings.setFederationStrategy" "allowAll"} ([alex, bob], conv, domains) <- - startDynamicBackends [overrides, overrides] $ \domains -> do + startDynamicBackends [def, def] $ \domains -> do own <- make OwnDomain & asString other <- make OtherDomain & asString [alice, alex, bob, charlie, dylan] <- @@ -338,10 +301,8 @@ testAddReachableWithUnreachableRemoteUsers = do testAddUnreachable :: HasCallStack => App () testAddUnreachable = do - let overrides = - def {brigCfg = setField "optSettings.setFederationStrategy" "allowAll"} ([alex, charlie], [charlieDomain, dylanDomain], conv) <- - startDynamicBackends [overrides, overrides] $ \domains -> do + startDynamicBackends [def, def] $ \domains -> do own <- make OwnDomain & asString [alice, alex, charlie, dylan] <- createAndConnectUsers $ [own, own] <> domains @@ -474,10 +435,8 @@ testMultiIngressGuestLinks = do testAddUserWhenOtherBackendOffline :: HasCallStack => App () testAddUserWhenOtherBackendOffline = do - let overrides = - def {brigCfg = setField "optSettings.setFederationStrategy" "allowAll"} ([alice, alex], conv) <- - startDynamicBackends [overrides] $ \domains -> do + startDynamicBackends [def] $ \domains -> do own <- make OwnDomain & asString [alice, alex, charlie] <- createAndConnectUsers $ [own, own] <> domains @@ -644,18 +603,6 @@ testDeleteTeamConversationWithUnreachableRemoteMembers = do runCodensity (acquireResources 1 resourcePool) $ \[dynBackend] -> do (bob, bobClient) <- runCodensity (startDynamicBackend dynBackend mempty) $ \_ -> do - -- FUTUREWORK: get rid of this once the background worker is able to listen to all queues - do - ownDomain <- make OwnDomain & asString - otherDomain <- make OtherDomain & asString - let domains = [ownDomain, otherDomain, dynBackend.berDomain] - sequence_ - [ createFedConn x (FedConn y "full_search") - | x <- domains, - y <- domains, - x /= y - ] - bob <- randomUser dynBackend.berDomain def bobClient <- objId $ bindResponse (addClient bob def) $ getJSON 201 connectUsers alice bob @@ -676,9 +623,7 @@ testLeaveConversationSuccess = do createAndConnectUsers [OwnDomain, OwnDomain, OtherDomain, OtherDomain] [aClient, bClient] <- forM [alice, bob] $ \user -> objId $ bindResponse (addClient user def) $ getJSON 201 - let overrides = - def {brigCfg = setField "optSettings.setFederationStrategy" "allowAll"} - startDynamicBackends [overrides] $ \[dynDomain] -> do + startDynamicBackends [def] $ \[dynDomain] -> do eve <- randomUser dynDomain def eClient <- objId $ bindResponse (addClient eve def) $ getJSON 201 connectUsers alice eve @@ -697,9 +642,7 @@ testLeaveConversationSuccess = do testOnUserDeletedConversations :: HasCallStack => App () testOnUserDeletedConversations = do - let overrides = - def {brigCfg = setField "optSettings.setFederationStrategy" "allowAll"} - startDynamicBackends [overrides] $ \[dynDomain] -> do + startDynamicBackends [def] $ \[dynDomain] -> do [ownDomain, otherDomain] <- forM [OwnDomain, OtherDomain] asString [alice, alex, bob, bart, chad] <- createAndConnectUsers [ownDomain, ownDomain, otherDomain, otherDomain, dynDomain] diff --git a/integration/test/Test/Demo.hs b/integration/test/Test/Demo.hs index 547c6598220..36d58cbab57 100644 --- a/integration/test/Test/Demo.hs +++ b/integration/test/Test/Demo.hs @@ -1,3 +1,5 @@ +{-# OPTIONS_GHC -Wno-incomplete-uni-patterns #-} + -- | This module is meant to show how Testlib can be used module Test.Demo where @@ -9,7 +11,6 @@ import Control.Monad.Cont import GHC.Stack import SetupHelpers import Testlib.Prelude -import UnliftIO.Concurrent (threadDelay) -- | Legalhold clients cannot be deleted. testCantDeleteLHClient :: HasCallStack => App () @@ -174,22 +175,10 @@ testIndependentESIndices = do testDynamicBackendsFederation :: HasCallStack => App () testDynamicBackendsFederation = do - startDynamicBackends [def, def] $ \dynDomains -> do - [aDynDomain, anotherDynDomain] <- pure dynDomains - _ <- Internal.createFedConn anotherDynDomain (Internal.FedConn aDynDomain "full_search") - threadDelay 2_000_000 - - u1 <- randomUser aDynDomain def - u2 <- randomUser anotherDynDomain def - uid2 <- objId u2 - Internal.refreshIndex anotherDynDomain - - bindResponse (Public.searchContacts u1 (u2 %. "name") anotherDynDomain) $ \resp -> do - resp.status `shouldMatchInt` 200 - docs <- resp.json %. "documents" >>= asList - case docs of - [] -> assertFailure "Expected a non empty result, but got an empty one" - doc : _ -> doc %. "id" `shouldMatch` uid2 + startDynamicBackends [def, def] $ \[aDynDomain, anotherDynDomain] -> do + [u1, u2] <- createAndConnectUsers [aDynDomain, anotherDynDomain] + bindResponse (Public.getConnection u1 u2) assertSuccess + bindResponse (Public.getConnection u2 u1) assertSuccess testWebSockets :: HasCallStack => App () testWebSockets = do diff --git a/integration/test/Test/Federation.hs b/integration/test/Test/Federation.hs index 97ec011f028..b690430146c 100644 --- a/integration/test/Test/Federation.hs +++ b/integration/test/Test/Federation.hs @@ -4,7 +4,6 @@ module Test.Federation where import API.Brig qualified as API -import API.BrigInternal qualified as API import API.Galley import Control.Lens import Control.Monad.Codensity @@ -32,18 +31,6 @@ testNotificationsForOfflineBackends = do -- except for setup and assertions. Perhaps there is a better name. runCodensity (acquireResources 1 resourcePool) $ \[downBackend] -> do (downUser1, downClient1, downUser2, upBackendConv, downBackendConv) <- runCodensity (startDynamicBackend downBackend mempty) $ \_ -> do - -- FUTUREWORK: get rid of this once the background worker is able to listen to all queues - do - ownDomain <- make OwnDomain & asString - otherDomain <- make OtherDomain & asString - let domains = [ownDomain, otherDomain, downBackend.berDomain] - sequence_ - [ API.createFedConn x (API.FedConn y "full_search") - | x <- domains, - y <- domains, - x /= y - ] - downUser1 <- randomUser downBackend.berDomain def downUser2 <- randomUser downBackend.berDomain def downClient1 <- objId $ bindResponse (API.addClient downUser1 def) $ getJSON 201 diff --git a/services/background-worker/background-worker.cabal b/services/background-worker/background-worker.cabal index f0c7083d066..44eaec6d2d7 100644 --- a/services/background-worker/background-worker.cabal +++ b/services/background-worker/background-worker.cabal @@ -29,7 +29,6 @@ library build-depends: aeson , amqp - , async , base , containers , exceptions diff --git a/services/background-worker/background-worker.integration.yaml b/services/background-worker/background-worker.integration.yaml index 9762cc70825..02a2a69851a 100644 --- a/services/background-worker/background-worker.integration.yaml +++ b/services/background-worker/background-worker.integration.yaml @@ -8,14 +8,6 @@ federatorInternal: host: 127.0.0.1 port: 8097 -galley: - host: 127.0.0.1 - port: 8085 - -brig: - host: 127.0.0.1 - port: 8082 - rabbitmq: host: 127.0.0.1 port: 5672 @@ -23,5 +15,6 @@ rabbitmq: adminPort: 15672 backendNotificationPusher: - pushBackoffMinWait: 1000 - pushBackoffMaxWait: 1000000 + pushBackoffMinWait: 1000 # 1ms + pushBackoffMaxWait: 1000000 # 1s + remotesRefreshInterval: 10000 # 10ms diff --git a/services/background-worker/default.nix b/services/background-worker/default.nix index ce67f35095a..4a6288d5097 100644 --- a/services/background-worker/default.nix +++ b/services/background-worker/default.nix @@ -5,7 +5,6 @@ { mkDerivation , aeson , amqp -, async , base , bytestring , containers @@ -51,7 +50,6 @@ mkDerivation { libraryHaskellDepends = [ aeson amqp - async base containers exceptions diff --git a/services/background-worker/src/Wire/BackendNotificationPusher.hs b/services/background-worker/src/Wire/BackendNotificationPusher.hs index f52f165dbbd..793bafd418d 100644 --- a/services/background-worker/src/Wire/BackendNotificationPusher.hs +++ b/services/background-worker/src/Wire/BackendNotificationPusher.hs @@ -11,7 +11,6 @@ import Data.Map.Strict qualified as Map import Data.Set qualified as Set import Data.Text qualified as Text import Imports -import Network.AMQP (cancelConsumer) import Network.AMQP qualified as Q import Network.AMQP.Extended import Network.AMQP.Lifted qualified as QL @@ -21,7 +20,6 @@ import System.Logger.Class qualified as Log import UnliftIO import Wire.API.Federation.BackendNotifications import Wire.API.Federation.Client -import Wire.API.Routes.FederationDomainConfig import Wire.BackgroundWorker.Env import Wire.BackgroundWorker.Options import Wire.BackgroundWorker.Util @@ -114,14 +112,14 @@ startPusher consumersRef chan = do -- delivered in order. markAsWorking BackendNotificationPusher lift $ Q.qos chan 0 1 False - env <- ask -- Make sure threads aren't dangling if/when this async thread is killed let cleanup :: (Exception e, MonadThrow m, MonadIO m) => e -> m () cleanup e = do consumers <- liftIO $ readIORef consumersRef - traverse_ (liftIO . cancelConsumer chan . fst) $ Map.elems consumers + traverse_ (liftIO . Q.cancelConsumer chan . fst) $ Map.elems consumers throwM e + timeBeforeNextRefresh <- asks (.backendNotificationsConfig.remotesRefreshInterval) -- If this thread is cancelled, catch the exception, kill the consumers, and carry on. -- FUTUREWORK?: -- If this throws an exception on the Chan / in the forever loop, the exception will @@ -131,26 +129,11 @@ startPusher consumersRef chan = do [ Handler $ cleanup @SomeException, Handler $ cleanup @SomeAsyncException ] + $ forever $ do - -- Get an initial set of domains from the sync thread - -- The Chan that we will be waiting on isn't initialised with a - -- value until the domain update loop runs the callback for the - -- first time. - initRemotes <- liftIO $ readIORef env.remoteDomains - -- Get an initial set of consumers for the domains pulled from the IORef - -- so that we aren't just sitting around not doing anything for a bit at - -- the start. - ensureConsumers consumersRef chan $ domain <$> initRemotes.remotes - -- Wait for updates to the domains, this is where the bulk of the action - -- is going to take place - forever $ do - -- Wait for a new set of domains. This is a blocking action - -- so we will only move past here when we get a new set of domains. - -- It is a bit nicer than having another timeout value, as Brig is - -- already providing one in the domain update message. - chanRemotes <- liftIO $ readChan env.remoteDomainsChan - -- Make new consumers for the new domains, clean up old ones from the consumer map. - ensureConsumers consumersRef chan $ domain <$> chanRemotes.remotes + remotes <- getRemoteDomains + ensureConsumers consumersRef chan remotes + threadDelay timeBeforeNextRefresh ensureConsumers :: IORef (Map Domain (Q.ConsumerTag, MVar ())) -> Q.Channel -> [Domain] -> AppT IO () ensureConsumers consumers chan domains = do @@ -161,10 +144,10 @@ ensureConsumers consumers chan domains = do traverse_ (ensureConsumer consumers chan) domains -- Loop over all of the dropped domains. These need to be cancelled as they are no longer -- on the domain list. - traverse_ (cancelConsumer' consumers chan) droppedDomains + traverse_ (cancelConsumer consumers chan) droppedDomains -cancelConsumer' :: IORef (Map Domain (Q.ConsumerTag, MVar ())) -> Q.Channel -> Domain -> AppT IO () -cancelConsumer' consumers chan domain = do +cancelConsumer :: IORef (Map Domain (Q.ConsumerTag, MVar ())) -> Q.Channel -> Domain -> AppT IO () +cancelConsumer consumers chan domain = do Log.info $ Log.msg (Log.val "Stopping consumer") . Log.field "domain" (domainText domain) -- The ' version of atomicModifyIORef is strict in the function update and is useful -- for not leaking memory. diff --git a/services/background-worker/src/Wire/BackgroundWorker.hs b/services/background-worker/src/Wire/BackgroundWorker.hs index 7709cb8bd52..31a9c769034 100644 --- a/services/background-worker/src/Wire/BackgroundWorker.hs +++ b/services/background-worker/src/Wire/BackgroundWorker.hs @@ -2,7 +2,6 @@ module Wire.BackgroundWorker where -import Control.Concurrent.Async (cancel) import Data.Domain import Data.Map.Strict qualified as Map import Data.Metrics.Servant qualified as Metrics @@ -20,17 +19,16 @@ import Wire.BackgroundWorker.Options run :: Opts -> IO () run opts = do - (env, syncThread) <- mkEnv opts + env <- mkEnv opts (notifChanRef, notifConsumersRef) <- runAppT env $ BackendNotificationPusher.startWorker opts.rabbitmq let -- cleanup will run in a new thread when the signal is caught, so we need to use IORefs and -- specific exception types to message threads to clean up l = logger env cleanup = do - cancel syncThread -- Notification pusher thread - Log.info (logger env) $ Log.msg (Log.val "Cancelling the notification pusher thread") + Log.info l $ Log.msg (Log.val "Cancelling the notification pusher thread") readIORef notifChanRef >>= traverse_ \chan -> do - Log.info (logger env) $ Log.msg (Log.val "Got channel") + Log.info l $ Log.msg (Log.val "Got channel") readIORef notifConsumersRef >>= \m -> for_ (Map.assocs m) \(domain, (consumer, runningFlag)) -> do Log.info l $ Log.msg (Log.val "Cancelling consumer") . Log.field "Domain" domain._domainText -- Remove the consumer from the channel so it isn't called again diff --git a/services/background-worker/src/Wire/BackgroundWorker/Env.hs b/services/background-worker/src/Wire/BackgroundWorker/Env.hs index 37bbaffad01..ef99676c49a 100644 --- a/services/background-worker/src/Wire/BackgroundWorker/Env.hs +++ b/services/background-worker/src/Wire/BackgroundWorker/Env.hs @@ -3,7 +3,6 @@ module Wire.BackgroundWorker.Env where -import Control.Concurrent.Async import Control.Concurrent.Chan import Control.Monad.Base import Control.Monad.Catch @@ -23,7 +22,6 @@ import System.Logger qualified as Log import System.Logger.Class (Logger, MonadLogger (..)) import System.Logger.Extended qualified as Log import Util.Options -import Wire.API.FederationUpdate import Wire.API.Routes.FederationDomainConfig import Wire.BackgroundWorker.Options @@ -42,10 +40,7 @@ data Env = Env metrics :: Metrics.Metrics, federatorInternal :: Endpoint, httpManager :: Manager, - galley :: Endpoint, - brig :: Endpoint, defederationTimeout :: ResponseTimeout, - remoteDomains :: IORef FederationDomainConfigs, remoteDomainsChan :: Chan FederationDomainConfigs, backendNotificationMetrics :: BackendNotificationMetrics, backendNotificationsConfig :: BackendNotificationsConfig, @@ -65,27 +60,19 @@ mkBackendNotificationMetrics = <*> register (vector "targetDomain" $ counter $ Prometheus.Info "wire_backend_notifications_errors" "Number of errors that occurred while pushing notifications") <*> register (vector "targetDomain" $ gauge $ Prometheus.Info "wire_backend_notifications_stuck_queues" "Set to 1 when pushing notifications is stuck") -mkEnv :: Opts -> IO (Env, Async ()) +mkEnv :: Opts -> IO Env mkEnv opts = do http2Manager <- initHttp2Manager logger <- Log.mkLogger opts.logLevel Nothing opts.logFormat httpManager <- newManager defaultManagerSettings remoteDomainsChan <- newChan let federatorInternal = opts.federatorInternal - galley = opts.galley defederationTimeout = maybe responseTimeoutNone (\t -> responseTimeoutMicro $ 1000000 * t) -- seconds to microseconds opts.defederationTimeout - brig = opts.brig rabbitmqVHost = opts.rabbitmq.vHost - callback = - SyncFedDomainConfigsCallback - { fromFedUpdateCallback = \_old new -> do - writeChan remoteDomainsChan new - } - (remoteDomains, syncThread) <- syncFedDomainConfigs brig logger callback rabbitmqAdminClient <- mkRabbitMqAdminClientEnv opts.rabbitmq statuses <- newIORef $ @@ -95,7 +82,7 @@ mkEnv opts = do metrics <- Metrics.metrics backendNotificationMetrics <- mkBackendNotificationMetrics let backendNotificationsConfig = opts.backendNotificationPusher - pure (Env {..}, syncThread) + pure Env {..} initHttp2Manager :: IO Http2Manager initHttp2Manager = do diff --git a/services/background-worker/src/Wire/BackgroundWorker/Options.hs b/services/background-worker/src/Wire/BackgroundWorker/Options.hs index 7cac93318db..da31c41255a 100644 --- a/services/background-worker/src/Wire/BackgroundWorker/Options.hs +++ b/services/background-worker/src/Wire/BackgroundWorker/Options.hs @@ -12,8 +12,6 @@ data Opts = Opts backgroundWorker :: !Endpoint, federatorInternal :: !Endpoint, rabbitmq :: !RabbitMqAdminOpts, - galley :: !Endpoint, - brig :: !Endpoint, -- | Seconds, Nothing for no timeout defederationTimeout :: Maybe Int, backendNotificationPusher :: BackendNotificationsConfig @@ -31,7 +29,10 @@ data BackendNotificationsConfig = BackendNotificationsConfig -- | Upper limit on amount of time (in microseconds) to wait before retrying -- any notification. This exists to ensure that exponential back-off doesn't -- cause wait times to be very big. - pushBackoffMaxWait :: Int + pushBackoffMaxWait :: Int, + -- | The list of remotes is refreshed at an interval. This value in + -- microseconds decides the interval for polling. + remotesRefreshInterval :: Int } deriving (Show, Generic) diff --git a/services/background-worker/test/Test/Wire/BackendNotificationPusherSpec.hs b/services/background-worker/test/Test/Wire/BackendNotificationPusherSpec.hs index bb37c87fce5..0aa74d531f4 100644 --- a/services/background-worker/test/Test/Wire/BackendNotificationPusherSpec.hs +++ b/services/background-worker/test/Test/Wire/BackendNotificationPusherSpec.hs @@ -42,7 +42,6 @@ import Wire.API.Federation.API.Brig import Wire.API.Federation.API.Common import Wire.API.Federation.BackendNotifications import Wire.API.RawJson -import Wire.API.Routes.FederationDomainConfig import Wire.BackendNotificationPusher import Wire.BackgroundWorker.Env import Wire.BackgroundWorker.Options @@ -182,7 +181,6 @@ spec = do ] logger <- Logger.new Logger.defSettings httpManager <- newManager defaultManagerSettings - remoteDomains <- newIORef defFederationDomainConfigs remoteDomainsChan <- newChan let federatorInternal = Endpoint "localhost" 8097 http2Manager = undefined @@ -191,9 +189,7 @@ spec = do rabbitmqAdminClient = mockRabbitMqAdminClient mockAdmin rabbitmqVHost = "test-vhost" defederationTimeout = responseTimeoutNone - galley = Endpoint "localhost" 8085 - brig = Endpoint "localhost" 8082 - backendNotificationsConfig = BackendNotificationsConfig 1000 500000 + backendNotificationsConfig = BackendNotificationsConfig 1000 500000 1000 backendNotificationMetrics <- mkBackendNotificationMetrics domains <- runAppT Env {..} getRemoteDomains @@ -204,7 +200,6 @@ spec = do mockAdmin <- newMockRabbitMqAdmin True ["backend-notifications.foo.example"] logger <- Logger.new Logger.defSettings httpManager <- newManager defaultManagerSettings - remoteDomains <- newIORef defFederationDomainConfigs remoteDomainsChan <- newChan let federatorInternal = Endpoint "localhost" 8097 http2Manager = undefined @@ -213,9 +208,7 @@ spec = do rabbitmqAdminClient = mockRabbitMqAdminClient mockAdmin rabbitmqVHost = "test-vhost" defederationTimeout = responseTimeoutNone - galley = Endpoint "localhost" 8085 - brig = Endpoint "localhost" 8082 - backendNotificationsConfig = BackendNotificationsConfig 1000 500000 + backendNotificationsConfig = BackendNotificationsConfig 1000 500000 1000 backendNotificationMetrics <- mkBackendNotificationMetrics domainsThread <- async $ runAppT Env {..} getRemoteDomains diff --git a/services/background-worker/test/Test/Wire/Util.hs b/services/background-worker/test/Test/Wire/Util.hs index 58454c9582c..22a7c38dcef 100644 --- a/services/background-worker/test/Test/Wire/Util.hs +++ b/services/background-worker/test/Test/Wire/Util.hs @@ -7,9 +7,7 @@ import Imports import Network.HTTP.Client import System.Logger.Class qualified as Logger import Util.Options (Endpoint (..)) -import Wire.API.Routes.FederationDomainConfig -import Wire.BackgroundWorker.Env hiding (federatorInternal, galley) -import Wire.BackgroundWorker.Env qualified as E +import Wire.BackgroundWorker.Env import Wire.BackgroundWorker.Options import Wire.BackgroundWorker.Util @@ -20,16 +18,13 @@ testEnv = do statuses <- newIORef mempty backendNotificationMetrics <- mkBackendNotificationMetrics httpManager <- newManager defaultManagerSettings - remoteDomains <- newIORef defFederationDomainConfigs remoteDomainsChan <- newChan let federatorInternal = Endpoint "localhost" 0 rabbitmqAdminClient = undefined rabbitmqVHost = undefined metrics = undefined - galley = Endpoint "localhost" 8085 - brig = Endpoint "localhost" 8082 defederationTimeout = responseTimeoutNone - backendNotificationsConfig = BackendNotificationsConfig 1000 500000 + backendNotificationsConfig = BackendNotificationsConfig 1000 500000 1000 pure Env {..} runTestAppT :: AppT IO a -> Int -> IO a diff --git a/services/brig/test/integration/Federation/End2end.hs b/services/brig/test/integration/Federation/End2end.hs index cad9c16082d..ee744b29fca 100644 --- a/services/brig/test/integration/Federation/End2end.hs +++ b/services/brig/test/integration/Federation/End2end.hs @@ -17,7 +17,6 @@ module Federation.End2end where -import API.Search.Util import API.User.Util import Bilge import Bilge.Assert ((!!!), (+ +------->+ +--------->+ | -- +------+ +-+-------+ +---------+ +------+ -testHandleLookup :: Brig -> Brig -> Http () -testHandleLookup brig brigTwo = do - -- Create a user on the "other side" using an internal brig endpoint from a - -- second brig instance in backendTwo (in another namespace in kubernetes) - (handle, userBrigTwo) <- createUserWithHandle brigTwo - -- Get result from brig two for comparison - let domain = qDomain $ userQualifiedId userBrigTwo - resultViaBrigTwo <- getUserInfoFromHandle brigTwo domain handle - - -- query the local-namespace brig for a user sitting on the other backend - -- (which will exercise the network traffic via two federators to the remote brig) - resultViaBrigOne <- getUserInfoFromHandle brig domain handle - - liftIO $ assertEqual "remote handle lookup via federator should work in the happy case" (profileQualifiedId resultViaBrigOne) (userQualifiedId userBrigTwo) - liftIO $ assertEqual "querying brig1 or brig2 about the same user should give same result" resultViaBrigTwo resultViaBrigOne - -testSearchUsers :: Brig -> Brig -> Http () -testSearchUsers brig brigTwo = do - -- Create a user on the "other side" using an internal brig endpoint from a - -- second brig instance in backendTwo (in another namespace in kubernetes) - (handle, userBrigTwo) <- createUserWithHandle brigTwo - - searcher <- userId <$> randomUser brig - let expectedUserId = userQualifiedId userBrigTwo - searchTerm = fromHandle handle - domain = qDomain expectedUserId - liftIO $ putStrLn "search for user on brigTwo (directly)..." - assertCanFindWithDomain brigTwo searcher expectedUserId searchTerm domain - - -- exercises multi-backend network traffic - liftIO $ putStrLn "search for user on brigOne via federators to remote brig..." - assertCanFindWithDomain brig searcher expectedUserId searchTerm domain - testGetUsersById :: Brig -> Brig -> Http () testGetUsersById brig1 brig2 = do users <- traverse randomUser [brig1, brig2]