Skip to content

Commit

Permalink
Delete backend-notification queues from federation-v0 and federation-…
Browse files Browse the repository at this point in the history
…v1 after running tests (#4374)

This ensures that the queues don't accumulate in the statically deployed instances over time.

https://wearezeta.atlassian.net/browse/WPB-11810

Co-authored-by: Akshay Mankar <[email protected]>
  • Loading branch information
battermann and akshaymankar authored Dec 17, 2024
1 parent 6092fbc commit fc9e266
Show file tree
Hide file tree
Showing 15 changed files with 173 additions and 30 deletions.
13 changes: 9 additions & 4 deletions .envrc
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,16 @@ export NIX_CONFIG='extra-experimental-features = nix-command'

[[ -d "$layout_dir" ]] || mkdir -p "$layout_dir"

if [[ ! -d "$env_dir" || ! -f "$layout_dir/nix-rebuild" || "$store_paths" != $(< "$layout_dir/nix-rebuild" ) ]]; then
if [[ ! -d "$env_dir" || ! -f "$layout_dir/nix-rebuild" || "$store_paths" != $(<"$layout_dir/nix-rebuild") ]]; then
bcmd=nix
if command -v nom &> /dev/null; then
if command -v nom &>/dev/null; then
if [[ "${USE_NOM}" != "0" ]]; then
bcmd=nom
fi
fi
echo "🔧 Building environment"
$bcmd build -f nix wireServer.devEnv -Lv --out-link ./.env
echo "$store_paths" > "$layout_dir/nix-rebuild"
echo "$store_paths" >"$layout_dir/nix-rebuild"
fi

PATH_add "./.env/bin"
Expand All @@ -49,8 +49,13 @@ export LANG=en_US.UTF-8
export RABBITMQ_USERNAME=guest
export RABBITMQ_PASSWORD=alpaca-grapefruit

# Redis
export RABBITMQ_USERNAME_V0=guest
export RABBITMQ_PASSWORD_V0=alpaca-grapefruit

export RABBITMQ_USERNAME_V1=guest
export RABBITMQ_PASSWORD_V1=alpaca-grapefruit

# Redis
export REDIS_PASSWORD=very-secure-redis-cluster-password
export REDIS_ADDITIONAL_WRITE_PASSWORD=very-secure-redis-master-password

Expand Down
1 change: 1 addition & 0 deletions changelog.d/5-internal/WPB-11810
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Delete federation V0 and V1 queues after integration tests
14 changes: 14 additions & 0 deletions charts/integration/templates/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,20 @@ data:
rabbitmq:
host: rabbitmq
adminPort: 15671
tls: true
vHost: /
rabbitmq-v0:
host: rabbitmq.wire-federation-v0.svc.cluster.local
adminPort: 15672
tls: false
vHost: /
rabbitmq-v1:
host: rabbitmq.wire-federation-v1.svc.cluster.local
adminPort: 15672
tls: false
vHost: /
backendTwo:
Expand Down
14 changes: 14 additions & 0 deletions charts/integration/templates/integration-integration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,20 @@ spec:
secretKeyRef:
name: brig
key: rabbitmqPassword
- name: RABBITMQ_USERNAME_V0
value: "wire-server"
- name: RABBITMQ_PASSWORD_V0
valueFrom:
secretKeyRef:
name: rabbitmq-v0
key: rabbitmq-password
- name: RABBITMQ_USERNAME_V1
value: "wire-server"
- name: RABBITMQ_PASSWORD_V1
valueFrom:
secretKeyRef:
name: rabbitmq-v1
key: rabbitmq-password
{{- if hasKey .Values.secrets "redisUsername" }}
- name: REDIS_USERNAME
valueFrom:
Expand Down
6 changes: 6 additions & 0 deletions hack/bin/integration-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ summary() {
done
}

# Copy the secrets from the wire-federation-v0 namespace to the current namespace to be able to delete RabbitMQ queues that are created by the integration tests to avoid overflows
kubectl -n "$NAMESPACE" delete --force secret rabbitmq-v0 || true
kubectl -n wire-federation-v0 get secrets rabbitmq -ojson | jq 'del(.metadata.namespace) | del(.metadata.resourceVersion) | del(.metadata.uid) | .metadata.name="rabbitmq-v0"' | kubectl -n "$NAMESPACE" apply -f -
kubectl -n "$NAMESPACE" delete --force secret rabbitmq-v1 || true
kubectl -n wire-federation-v1 get secrets rabbitmq -ojson | jq 'del(.metadata.namespace) | del(.metadata.resourceVersion) | del(.metadata.uid) | .metadata.name="rabbitmq-v1"' | kubectl -n "$NAMESPACE" apply -f -

# Run tests in parallel using GNU parallel (see https://www.gnu.org/software/parallel/)
# The below commands are a little convoluted, but we wish to:
# - run integration tests. If they fail, keep track of this, but still go and get logs, so we see what failed
Expand Down
5 changes: 4 additions & 1 deletion integration/test/Test/Events.hs
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,10 @@ killConnection backend = do
port = 0,
adminPort = fromIntegral rc.adminPort,
vHost = Text.pack backend.berVHost,
tls = Just $ RabbitMqTlsOpts Nothing True
tls =
if rc.tls
then Just $ RabbitMqTlsOpts Nothing True
else Nothing
}
servantClient <- liftIO $ mkRabbitMqAdminClientEnv opts
name <- do
Expand Down
2 changes: 2 additions & 0 deletions integration/test/Testlib/Env.hs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ mkGlobalEnv cfgFile = do
gServicesCwdBase = devEnvProjectRoot <&> (</> "services"),
gBackendResourcePool = resourcePool,
gRabbitMQConfig = intConfig.rabbitmq,
gRabbitMQConfigV0 = intConfig.rabbitmqV0,
gRabbitMQConfigV1 = intConfig.rabbitmqV1,
gTempDir = tempDir,
gTimeOutSeconds = timeOutSeconds
}
Expand Down
9 changes: 4 additions & 5 deletions integration/test/Testlib/ResourcePool.hs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import Data.Functor
import Data.IORef
import qualified Data.Set as Set
import Data.String
import qualified Data.Text as T
import Data.Tuple
import Database.CQL.IO
import GHC.Stack (HasCallStack)
Expand Down Expand Up @@ -86,13 +85,13 @@ deleteAllRabbitMQQueues rc resource = do
{ host = rc.host,
port = 0,
adminPort = fromIntegral rc.adminPort,
vHost = T.pack resource.berVHost,
vHost = fromString resource.berVHost,
tls = Just $ RabbitMqTlsOpts Nothing True
}
client <- mkRabbitMqAdminClientEnv opts
queues <- listQueuesByVHost client (T.pack resource.berVHost) Nothing Nothing
for_ queues $ \queue ->
deleteQueue client (T.pack resource.berVHost) queue.name
queuesPage <- listQueuesByVHost client (fromString resource.berVHost) (fromString "") False 100 1
for_ queuesPage.items $ \queue ->
deleteQueue client (fromString resource.berVHost) queue.name

deleteAllDynamicBackendConfigs :: BackendResource -> Client ()
deleteAllDynamicBackendConfigs resource = write cql (defQueryParams LocalQuorum ())
Expand Down
46 changes: 46 additions & 0 deletions integration/test/Testlib/Run.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,13 @@ import Data.Foldable
import Data.Function
import Data.Functor
import Data.List
import Data.Maybe (fromMaybe)
import Data.String (IsString (fromString))
import Data.Text (Text)
import qualified Data.Text as T
import Data.Time.Clock
import Network.AMQP.Extended
import Network.RabbitMqAdmin
import RunAllTests
import System.Directory
import System.Environment
Expand Down Expand Up @@ -133,11 +139,51 @@ runTests tests mXMLOutput cfg = do
pure (TestSuiteReport [TestCaseReport qname TestSuccess tm])
writeChan output Nothing
wait displayThread
deleteFederationV0AndV1Queues genv
printReport report
mapM_ (saveXMLReport report) mXMLOutput
when (any (\testCase -> testCase.result /= TestSuccess) report.cases) $
exitFailure

deleteFederationV0AndV1Queues :: GlobalEnv -> IO ()
deleteFederationV0AndV1Queues env = do
let testDomains = env.gDomain1 : env.gDomain2 : env.gDynamicDomains
putStrLn "Attempting to delete federation V0 queues..."
(mV0User, mV0Pass) <- readCredsFromEnvWithSuffix "V0"
fromMaybe (putStrLn "No or incomplete credentials for fed V0 RabbitMQ") $
deleteFederationQueues testDomains env.gRabbitMQConfigV0 <$> mV0User <*> mV0Pass

putStrLn "Attempting to delete federation V1 queues..."
(mV1User, mV1Pass) <- readCredsFromEnvWithSuffix "V1"
fromMaybe (putStrLn "No or incomplete credentials for fed V1 RabbitMQ") $
deleteFederationQueues testDomains env.gRabbitMQConfigV1 <$> mV1User <*> mV1Pass
where
readCredsFromEnvWithSuffix :: String -> IO (Maybe Text, Maybe Text)
readCredsFromEnvWithSuffix suffix =
(,)
<$> (fmap fromString <$> lookupEnv ("RABBITMQ_USERNAME_" <> suffix))
<*> (fmap fromString <$> lookupEnv ("RABBITMQ_PASSWORD_" <> suffix))

deleteFederationQueues :: [String] -> RabbitMQConfig -> Text -> Text -> IO ()
deleteFederationQueues testDomains rc username password = do
let opts =
RabbitMqAdminOpts
{ host = rc.host,
port = 0,
adminPort = fromIntegral rc.adminPort,
vHost = fromString rc.vHost,
tls =
if rc.tls
then Just (RabbitMqTlsOpts Nothing True)
else Nothing
}
client <- mkRabbitMqAdminClientEnvWithCreds opts username password
for_ testDomains $ \domain -> do
page <- client.listQueuesByVHost (fromString rc.vHost) (fromString $ "^backend-notifications\\." <> domain <> "$") True 100 1
for_ page.items $ \queue -> do
putStrLn $ "Deleting queue " <> T.unpack queue.name
void $ deleteQueue client (fromString rc.vHost) queue.name

doListTests :: [(String, String, String, x)] -> IO ()
doListTests tests = for_ tests $ \(qname, _desc, _full, _) -> do
putStrLn qname
Expand Down
12 changes: 11 additions & 1 deletion integration/test/Testlib/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,9 @@ instance FromJSON DynamicBackendConfig

data RabbitMQConfig = RabbitMQConfig
{ host :: String,
adminPort :: Word16
adminPort :: Word16,
tls :: Bool,
vHost :: String
}
deriving (Show)

Expand All @@ -100,6 +102,8 @@ instance FromJSON RabbitMQConfig where
RabbitMQConfig
<$> ob .: fromString "host"
<*> ob .: fromString "adminPort"
<*> ob .: fromString "tls"
<*> ob .: fromString "vHost"

-- | Initialised once per testsuite.
data GlobalEnv = GlobalEnv
Expand All @@ -115,6 +119,8 @@ data GlobalEnv = GlobalEnv
gServicesCwdBase :: Maybe FilePath,
gBackendResourcePool :: ResourcePool BackendResource,
gRabbitMQConfig :: RabbitMQConfig,
gRabbitMQConfigV0 :: RabbitMQConfig,
gRabbitMQConfigV1 :: RabbitMQConfig,
gTempDir :: FilePath,
gTimeOutSeconds :: Int
}
Expand All @@ -127,6 +133,8 @@ data IntegrationConfig = IntegrationConfig
integrationTestHostName :: String,
dynamicBackends :: Map String DynamicBackendConfig,
rabbitmq :: RabbitMQConfig,
rabbitmqV0 :: RabbitMQConfig,
rabbitmqV1 :: RabbitMQConfig,
cassandra :: CassandraConfig
}
deriving (Show, Generic)
Expand All @@ -142,6 +150,8 @@ instance FromJSON IntegrationConfig where
<*> o .: fromString "integrationTestHostName"
<*> o .: fromString "dynamicBackends"
<*> o .: fromString "rabbitmq"
<*> o .: fromString "rabbitmq-v0"
<*> o .: fromString "rabbitmq-v1"
<*> o .: fromString "cassandra"

data ServiceMap = ServiceMap
Expand Down
9 changes: 6 additions & 3 deletions libs/extended/src/Network/AMQP/Extended.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ module Network.AMQP.Extended
withConnection,
openConnectionWithRetries,
mkRabbitMqAdminClientEnv,
mkRabbitMqAdminClientEnvWithCreds,
mkRabbitMqChannelMVar,
demoteOpts,
RabbitMqTlsOpts (..),
Expand Down Expand Up @@ -91,9 +92,8 @@ instance FromJSON RabbitMqAdminOpts where
<*> parseTlsJson v
<*> v .: "adminPort"

mkRabbitMqAdminClientEnv :: RabbitMqAdminOpts -> IO (AdminAPI (AsClientT IO))
mkRabbitMqAdminClientEnv opts = do
(username, password) <- readCredsFromEnv
mkRabbitMqAdminClientEnvWithCreds :: RabbitMqAdminOpts -> Text -> Text -> IO (AdminAPI (AsClientT IO))
mkRabbitMqAdminClientEnvWithCreds opts username password = do
mTlsSettings <- traverse (mkTLSSettings opts.host) opts.tls
let (protocol, managerSettings) = case mTlsSettings of
Nothing -> (Servant.Http, HTTP.defaultManagerSettings)
Expand All @@ -107,6 +107,9 @@ mkRabbitMqAdminClientEnv opts = do
(either throwM pure <=< flip runClientM clientEnv)
(toServant $ adminClient basicAuthData)

mkRabbitMqAdminClientEnv :: RabbitMqAdminOpts -> IO (AdminAPI (AsClientT IO))
mkRabbitMqAdminClientEnv opts = readCredsFromEnv >>= uncurry (mkRabbitMqAdminClientEnvWithCreds opts)

-- | When admin opts are needed use `AmqpEndpoint Identity`, otherwise use
-- `AmqpEndpoint NoAdmin`.
data AmqpEndpoint = AmqpEndpoint
Expand Down
30 changes: 23 additions & 7 deletions libs/extended/src/Network/RabbitMqAdmin.hs
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,36 @@ type VHost = Text

type QueueName = Text

data Page a = Page {items :: [a], page :: Int, pageCount :: Int}
deriving (Show, Eq, Generic)

instance (FromJSON a) => FromJSON (Page a) where
parseJSON =
genericParseJSON $
defaultOptions
{ fieldLabelModifier = camelTo2 '_'
}

instance (ToJSON a) => ToJSON (Page a) where
toJSON =
genericToJSON $
defaultOptions
{ fieldLabelModifier = camelTo2 '_'
}

-- | Upstream Docs:
-- https://rawcdn.githack.com/rabbitmq/rabbitmq-server/v3.12.0/deps/rabbitmq_management/priv/www/api/index.html
data AdminAPI route = AdminAPI
{ -- | NOTE: This endpoint can be made paginated, but that complicates
-- consumer code a little. This might be needed for performance tuning
-- later, but perhaps not.
listQueuesByVHost ::
{ listQueuesByVHost ::
route
:- "api"
:> "queues"
:> Capture "vhost" VHost
:> QueryParam "name" Text
:> QueryParam "use_regex" Bool
:> Get '[JSON] [Queue],
:> QueryParam' '[Required, Strict] "name" Text
:> QueryParam' '[Required, Strict] "use_regex" Bool
:> QueryParam' '[Required, Strict] "page_size" Int
:> QueryParam' '[Required, Strict] "page" Int
:> Get '[JSON] (Page Queue),
deleteQueue ::
route
:- "api"
Expand Down
16 changes: 10 additions & 6 deletions services/background-worker/src/Wire/BackendNotificationPusher.hs
Original file line number Diff line number Diff line change
Expand Up @@ -277,14 +277,18 @@ getRemoteDomains adminClient = do
handlers =
skipAsyncExceptions
<> [logRetries (const $ pure True) logErrr]
recovering policy handlers $ const go
recovering policy handlers $ const $ go [] 1
where
go :: AppT IO [Domain]
go = do
go :: [Domain] -> Int -> AppT IO [Domain]
go domains pageNumber = do
vhost <- asks rabbitmqVHost
queues <- liftIO $ listQueuesByVHost adminClient vhost (Just "backend-notifications\\..*") (Just True)
let notifQueuesSuffixes = mapMaybe (\q -> Text.stripPrefix "backend-notifications." q.name) queues
catMaybes <$> traverse (\d -> either (\e -> logInvalidDomain d e >> pure Nothing) (pure . Just) $ mkDomain d) notifQueuesSuffixes
queuesPage <- liftIO $ listQueuesByVHost adminClient vhost "^backend-notifications\\..*" True 100 pageNumber
let notifQueuesSuffixes = mapMaybe (\q -> Text.stripPrefix "backend-notifications." q.name) queuesPage.items
newDomains <- catMaybes <$> traverse (\d -> either (\e -> logInvalidDomain d e >> pure Nothing) (pure . Just) $ mkDomain d) notifQueuesSuffixes
let domainsSoFar = newDomains <> domains
if queuesPage.page >= queuesPage.pageCount
then pure domainsSoFar
else go domainsSoFar (pageNumber + 1)
logInvalidDomain d e =
Log.warn $
Log.msg (Log.val "Found invalid domain in a backend notifications queue name")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,12 +353,18 @@ mockApi mockAdmin =
deleteConnection = mockDeleteConnection mockAdmin
}

mockListQueuesByVHost :: MockRabbitMqAdmin -> Text -> Maybe Text -> Maybe Bool -> Servant.Handler [Queue]
mockListQueuesByVHost MockRabbitMqAdmin {..} vhost _ _ = do
mockListQueuesByVHost :: MockRabbitMqAdmin -> Text -> Text -> Bool -> Int -> Int -> Servant.Handler (Page Queue)
mockListQueuesByVHost MockRabbitMqAdmin {..} vhost _ _ _ _ = do
atomically $ modifyTVar listQueuesVHostCalls (<> [vhost])
readTVarIO broken >>= \case
True -> throwError $ Servant.err500
False -> pure $ map (\n -> Queue n vhost) queues
False ->
pure
Page
{ items = map (\n -> Queue n vhost) queues,
pageCount = 1,
page = 1
}

mockListDeleteQueue :: MockRabbitMqAdmin -> Text -> Text -> Servant.Handler NoContent
mockListDeleteQueue _ _ _ = do
Expand Down
14 changes: 14 additions & 0 deletions services/integration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,20 @@ dynamicBackends:
rabbitmq:
host: localhost
adminPort: 15671
tls: true
vHost: /

rabbitmq-v0:
host: localhost
adminPort: 15672
tls: false
vHost: federation-v0

rabbitmq-v1:
host: localhost
adminPort: 15672
tls: false
vHost: federation-v1

cassandra:
host: 127.0.0.1
Expand Down

0 comments on commit fc9e266

Please sign in to comment.