diff --git a/cabal.project b/cabal.project index 52ccc84d6cd..c9f7daf5a06 100644 --- a/cabal.project +++ b/cabal.project @@ -48,6 +48,7 @@ packages: , tools/db/find-undead/ , tools/db/move-team/ , tools/db/repair-handles/ + , tools/db/inconsistencies/ , tools/rex/ , tools/stern/ diff --git a/changelog.d/5-internal/pr-2840 b/changelog.d/5-internal/pr-2840 new file mode 100644 index 00000000000..70a1375288e --- /dev/null +++ b/changelog.d/5-internal/pr-2840 @@ -0,0 +1 @@ +Add 'inconsistencies' tool to check for, and repair certain kinds of data inconsistencies across different cassandra tables. diff --git a/nix/local-haskell-packages.nix b/nix/local-haskell-packages.nix index b7ecdeed55f..387f117aa1d 100644 --- a/nix/local-haskell-packages.nix +++ b/nix/local-haskell-packages.nix @@ -46,6 +46,7 @@ auto-whitelist = hself.callPackage ../tools/db/auto-whitelist/default.nix { inherit gitignoreSource; }; billing-team-member-backfill = hself.callPackage ../tools/db/billing-team-member-backfill/default.nix { inherit gitignoreSource; }; find-undead = hself.callPackage ../tools/db/find-undead/default.nix { inherit gitignoreSource; }; + inconsistencies = hself.callPackage ../tools/db/inconsistencies/default.nix { inherit gitignoreSource; }; migrate-sso-feature-flag = hself.callPackage ../tools/db/migrate-sso-feature-flag/default.nix { inherit gitignoreSource; }; move-team = hself.callPackage ../tools/db/move-team/default.nix { inherit gitignoreSource; }; repair-handles = hself.callPackage ../tools/db/repair-handles/default.nix { inherit gitignoreSource; }; diff --git a/nix/wire-server.nix b/nix/wire-server.nix index c0c97594e33..586338588fa 100644 --- a/nix/wire-server.nix +++ b/nix/wire-server.nix @@ -82,6 +82,7 @@ let stern = [ "stern" ]; billing-team-member-backfill = [ "billing-team-member-backfill" ]; + inconsistencies = [ "inconsistencies" ]; api-simulations = [ "api-smoketest" "api-loadtest" ]; zauth = [ "zauth" ]; }; @@ -300,7 +301,7 @@ let pkgs.helm pkgs.helmfile 
pkgs.hlint - ( hlib.justStaticExecutables pkgs.haskellPackages.apply-refact ) + (hlib.justStaticExecutables pkgs.haskellPackages.apply-refact) pkgs.jq pkgs.kubectl pkgs.nixpkgs-fmt diff --git a/tools/db/inconsistencies/README.md b/tools/db/inconsistencies/README.md new file mode 100644 index 00000000000..3515341075b --- /dev/null +++ b/tools/db/inconsistencies/README.md @@ -0,0 +1,138 @@ +# inconsistencies + +This tool checks and can repair certain inconsistencies between the `user`, `user_keys` and `user_handle` tables in brig. + +More context on how this was/is useful can be found under these issues: + +- https://wearezeta.atlassian.net/browse/SQSERVICES-1798 +- https://wearezeta.atlassian.net/browse/SQSERVICES-1797 + +(A precursor tool to check inconsistencies between spar and brig tables, if deemed useful, could be exhumed from git history of [PR #2840](https://github.com/wireapp/wire-server/pull/2840) or [one commit](https://github.com/wireapp/wire-server/pull/2840/commits/2e06428d10508328bcf2d829b16a7cc75ee72386) and incorporated here) + +This tool writes findings into an output file as JSON lines, so it can be more easily analysed. The tool should run on a cluster (as opposed to through port-forwarding from a local machine) for speed. Please do watch metrics while it runs, though, as a few thousand parallelized table scan pagination requests per second can have a performance impact on the whole database. + +## How to run and make sense of data + +1. Build image + +``` +make build-image-inconsistencies +``` + +2. Push image + +``` +docker push <image> +``` + +3. Run it in K8s using this pod yaml (**update the image field and args appropriately**): + +Inside the affected cluster's context (e.g. 
`targets/wire/staging/app`), open a PR with a pod manifest file that can be created using `kubectl apply -f <filename>` + +```yaml +apiVersion: v1 +kind: Pod +metadata: + name: inconsistencies + labels: + app: inconsistencies +spec: + restartPolicy: Never + containers: + - name: inconsistencies + image: <image> + imagePullPolicy: Always + args: + - handle-less-users # adjust to the command you need, see Options.hs + - --cassandra-host-brig + - brig-brig-eks-service.databases + - --cassandra-keyspace-brig + - brig + - --inconsistencies-file + - /inconsistencies.log +``` + +4. Wait for the process to finish. Watch the logs: the tool will say something like "sleeping for 4 hours" and then close all connections to cassandra. + +5. Copy the logs using `kubectl cp` + +``` +kubectl cp inconsistencies:/inconsistencies.log inconsistencies.log +``` + +6. **IMPORTANT:** Delete the pod. The easiest way to do this is with `kubectl delete -f <filename>` (which also deletes any configmap) + +7. Convert logs into CSV: + +```bash +cat inconsistencies.log | + jq -r '[.userId, .status.value, .status.writetime, .userHandle.value, .userHandle.writetime, .handleClaimUser.value, .handleClaimUser.writetime] | @csv' >| inconsistencies.csv +``` + +You can look at this data using any tool you are comfortable with. + +8. From a CSV file, you may extract only handles/emails/keys to feed into repair using awk/grep: + +```bash +cat inconsistencies.csv | awk -F ',' '{print $1}' | grep -v '^"+' | xargs -n 1 echo > dangling-email-keys.txt +``` + +## How to repair some data + +First, you need to extract a list of emails/handles/UUIDs you wish to repair. The code will still perform checks on whether these inputs actually need any kind of repairing (backfilling into tables or removing from tables). + +You can run the same container with additional flags of the command, a configmap with values (for simplicity called `input`), and the `--repair-data` flag. See source code under `Options.hs`. 
+ +At least the following are supported: + +- `missing-email-keys` (and a mounted configmap containing newline-separated UUIDs) +- `dangling-handles` (and a mounted configmap containing newline-separated handles) +- `dangling-keys` (and a mounted configmap containing newline-separated emails) + +Example: + +```yaml +apiVersion: v1 +kind: Pod +metadata: + name: inconsistencies + labels: + app: inconsistencies +spec: + restartPolicy: Never + containers: + - name: inconsistencies + image: quay.io/wire/inconsistencies:<tag> + imagePullPolicy: Always + args: + - missing-email-keys + - --input-file + - /input/input + - --repair-data + - --cassandra-host-brig + - brig-brig-eks-service.databases + - --cassandra-keyspace-brig + - brig + - --inconsistencies-file + - /inconsistencies.log + volumeMounts: + - name: input + mountPath: "/input" + readOnly: true + volumes: + - name: input + configMap: + name: input +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: input +data: + input: | + 2a7de2ba-754c-11ed-b14d-00163e5e6c00 + 3049c812-754c-11ed-b56e-00163e5e6c00 + ... +``` + +Apply as usual; the run should finish quickly. Make sure to export `inconsistencies.log` and check the actual logs, then delete the resources you created (`kubectl delete -f ...`). diff --git a/tools/db/inconsistencies/default.nix b/tools/db/inconsistencies/default.nix new file mode 100644 index 00000000000..6108d7797cc --- /dev/null +++ b/tools/db/inconsistencies/default.nix @@ -0,0 +1,76 @@ +# WARNING: GENERATED FILE, DO NOT EDIT. +# This file is generated by running hack/bin/generate-local-nix-packages.sh and +# must be regenerated whenever local packages are added or removed, or +# dependencies are added or removed. 
+{ mkDerivation +, aeson +, base +, brig +, brig-types +, bytestring +, case-insensitive +, cassandra-util +, conduit +, containers +, extended +, extra +, galley-types +, gitignoreSource +, HsOpenSSL +, http-client +, imports +, lens +, lib +, multihash +, optparse-applicative +, saml2-web-sso +, string-conversions +, text +, time +, tinylog +, types-common +, unliftio +, uri-bytestring +, uuid +, wire-api +}: +mkDerivation { + pname = "inconsistencies"; + version = "1.0.0"; + src = gitignoreSource ./.; + isLibrary = false; + isExecutable = true; + executableHaskellDepends = [ + aeson + base + brig + brig-types + bytestring + case-insensitive + cassandra-util + conduit + containers + extended + extra + galley-types + HsOpenSSL + http-client + imports + lens + multihash + optparse-applicative + saml2-web-sso + string-conversions + text + time + tinylog + types-common + unliftio + uri-bytestring + uuid + wire-api + ]; + description = "Find and repair inconsistencies between brig's user, user_keys and user_handle tables"; + license = lib.licenses.agpl3Only; + mainProgram = "inconsistencies"; +} diff --git a/tools/db/inconsistencies/inconsistencies.cabal b/tools/db/inconsistencies/inconsistencies.cabal new file mode 100644 index 00000000000..54c3419c1bb --- /dev/null +++ b/tools/db/inconsistencies/inconsistencies.cabal @@ -0,0 +1,99 @@ +cabal-version: 1.12 +name: inconsistencies +version: 1.0.0 +synopsis: Find and repair inconsistencies between brig's user, user_keys and user_handle tables. 
+category: Network +author: Wire Swiss GmbH +maintainer: Wire Swiss GmbH +copyright: (c) 2020 Wire Swiss GmbH +license: AGPL-3 +build-type: Simple + +executable inconsistencies + main-is: Main.hs + other-modules: + DanglingHandles + DanglingUserKeys + EmailLessUsers + HandleLessUsers + Options + Paths_inconsistencies + + hs-source-dirs: src + default-extensions: + NoImplicitPrelude + AllowAmbiguousTypes + BangPatterns + ConstraintKinds + DataKinds + DefaultSignatures + DeriveFunctor + DeriveGeneric + DeriveLift + DeriveTraversable + DerivingStrategies + DerivingVia + EmptyCase + FlexibleContexts + FlexibleInstances + FunctionalDependencies + GADTs + InstanceSigs + KindSignatures + LambdaCase + MultiParamTypeClasses + MultiWayIf + NamedFieldPuns + OverloadedStrings + PackageImports + PatternSynonyms + PolyKinds + QuasiQuotes + RankNTypes + ScopedTypeVariables + StandaloneDeriving + TupleSections + TypeApplications + TypeFamilies + TypeFamilyDependencies + TypeOperators + UndecidableInstances + ViewPatterns + + ghc-options: + -O2 -Wall -Wincomplete-uni-patterns -Wincomplete-record-updates + -Wpartial-fields -fwarn-tabs -optP-Wno-nonportable-include-path + -funbox-strict-fields -threaded -with-rtsopts=-N -with-rtsopts=-T + -rtsopts + + build-depends: + aeson + , base + , brig + , brig-types + , bytestring + , case-insensitive + , cassandra-util + , conduit + , containers + , extended + , extra + , galley-types + , HsOpenSSL + , http-client + , imports + , lens + , multihash + , optparse-applicative + , saml2-web-sso + , string-conversions + , text + , time + , tinylog + , types-common + , unliftio + , uri-bytestring + , uuid + , wire-api + + default-language: Haskell2010 diff --git a/tools/db/inconsistencies/src/DanglingHandles.hs b/tools/db/inconsistencies/src/DanglingHandles.hs new file mode 100644 index 00000000000..5a208bd8680 --- /dev/null +++ b/tools/db/inconsistencies/src/DanglingHandles.hs @@ -0,0 +1,165 @@ +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE 
RecordWildCards #-} +{-# LANGUAGE ScopedTypeVariables #-} + +-- This file is part of the Wire Server implementation. +-- +-- Copyright (C) 2022 Wire Swiss GmbH +-- +-- This program is free software: you can redistribute it and/or modify it under +-- the terms of the GNU Affero General Public License as published by the Free +-- Software Foundation, either version 3 of the License, or (at your option) any +-- later version. +-- +-- This program is distributed in the hope that it will be useful, but WITHOUT +-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +-- details. +-- +-- You should have received a copy of the GNU Affero General Public License along +-- with this program. If not, see . + +module DanglingHandles where + +import Brig.Data.Instances () +import Brig.Types.Intra +import Cassandra +import Cassandra.Util +import Conduit +import qualified Data.Aeson as Aeson +import qualified Data.ByteString as BS +import Data.Conduit.Internal (zipSources) +import qualified Data.Conduit.List as C +import Data.Handle +import Data.Id +import Data.String.Conversions (cs) +import qualified Data.Text as Text +import qualified Data.Text.IO as Text +import Imports +import System.Logger +import qualified System.Logger as Log +import UnliftIO.Async + +runCommand :: Logger -> ClientState -> FilePath -> IO () +runCommand l brig inconsistenciesFile = do + runResourceT $ + runConduit $ + zipSources + (C.sourceList [(1 :: Int32) ..]) + (transPipe (runClient brig) getHandles) + .| C.mapM + ( \(i, userHandles) -> do + Log.info l (Log.field "userIds" (show ((i - 1) * pageSize + fromIntegral (length userHandles)))) + pure userHandles + ) + .| C.mapM (liftIO . pooledMapConcurrentlyN 48 (\(handle, userId, claimTime) -> checkUser l brig handle userId claimTime False)) + .| C.map ((<> "\n") . BS.intercalate "\n" . map (cs . Aeson.encode) . 
catMaybes) + .| sinkFile inconsistenciesFile + +examineHandles :: Logger -> ClientState -> FilePath -> FilePath -> Bool -> IO () +examineHandles l brig handlesFile errorsFile fixClaim = do + handles <- mapMaybe parseHandle . Text.lines <$> Text.readFile handlesFile + runResourceT $ + runConduit $ + zipSources + (C.sourceList [(1 :: Int32) ..]) + (C.sourceList handles) + .| C.mapM + ( \(i, handle) -> do + when (i `mod` 100 == 0) $ + Log.info l (Log.field "handlesProcesses" i) + liftIO $ checkHandle l brig handle fixClaim + ) + .| C.map ((<> "\n") . cs . Aeson.encode) + .| sinkFile errorsFile + +pageSize :: Int32 +pageSize = 1000 + +data HandleInfo = HandleInfo + { -- | Handle in the user_handle table + claimedHandle :: Handle, + userId :: UserId, + handleClaimTime :: Writetime Handle, + status :: Maybe (WithWritetime AccountStatus), + -- | Handle in the user table + userHandle :: Maybe (WithWritetime Handle) + } + deriving (Generic) + +instance Aeson.ToJSON HandleInfo + +data WithWritetime a = WithWritetime + { value :: a, + writetime :: Writetime a + } + deriving (Generic) + +instance Aeson.ToJSON a => Aeson.ToJSON (WithWritetime a) + +---------------------------------------------------------------------------- +-- Queries + +getHandle :: Handle -> Client (Maybe (UserId, Writetime UserId)) +getHandle handle = retry x1 $ query1 cql (params LocalQuorum (Identity handle)) + where + cql :: PrepQuery R (Identity Handle) (UserId, Writetime UserId) + cql = "SELECT user, writetime(user) from user_handle where handle = ?" 
+ +getHandles :: ConduitM () [(Handle, UserId, Writetime UserId)] Client () +getHandles = paginateC cql (paramsP LocalQuorum () pageSize) x5 + where + cql :: PrepQuery R () (Handle, UserId, Writetime UserId) + cql = "SELECT handle, user, writetime(user) from user_handle" + +type UserDetailsRow = (Maybe AccountStatus, Maybe (Writetime AccountStatus), Maybe Handle, Maybe (Writetime Handle)) + +getUserDetails :: UserId -> Client (Maybe UserDetailsRow) +getUserDetails uid = retry x1 $ query1 cql (params LocalQuorum (Identity uid)) + where + cql :: PrepQuery R (Identity UserId) UserDetailsRow + cql = "SELECT status, writetime(status), handle, writetime(handle) from user where id = ?" + +checkHandle :: Logger -> ClientState -> Handle -> Bool -> IO (Maybe HandleInfo) +checkHandle l brig handle fixHandle = do + mUser <- runClient brig $ getHandle handle + case mUser of + Nothing -> do + Log.warn l (Log.msg (Log.val "No user found for handle") . Log.field "handle" (fromHandle handle)) + pure Nothing + Just (uid, claimTime) -> checkUser l brig handle uid claimTime fixHandle + +freeHandle :: Logger -> Handle -> Client () +freeHandle l handle = do + Log.info l $ Log.msg (Log.val "Freeing handle") . Log.field "handle" (fromHandle handle) + retry x5 $ write handleDelete (params LocalQuorum (Identity handle)) + where + handleDelete :: PrepQuery W (Identity Handle) () + handleDelete = "DELETE FROM user_handle WHERE handle = ?" + +checkUser :: Logger -> ClientState -> Handle -> UserId -> Writetime UserId -> Bool -> IO (Maybe HandleInfo) +checkUser l brig claimedHandle userId handleClaimTime fixClaim = do + maybeDetails <- runClient brig $ getUserDetails userId + case maybeDetails of + Nothing -> do + let status = Nothing + userHandle = Nothing + when fixClaim $ + runClient brig $ + freeHandle l claimedHandle + pure . 
Just $ HandleInfo {..} + Just (mStatus, mStatusWriteTime, mHandle, mHandleWriteTime) -> do + let status = WithWritetime <$> mStatus <*> mStatusWriteTime + userHandle = WithWritetime <$> mHandle <*> mHandleWriteTime + statusError = case mStatus of + Nothing -> True + Just Deleted -> True + _ -> False + handleError = mHandle /= Just claimedHandle + if statusError || handleError + then do + when fixClaim $ + runClient brig $ + freeHandle l claimedHandle + pure . Just $ HandleInfo {..} + else pure Nothing diff --git a/tools/db/inconsistencies/src/DanglingUserKeys.hs b/tools/db/inconsistencies/src/DanglingUserKeys.hs new file mode 100644 index 00000000000..9770b3176b5 --- /dev/null +++ b/tools/db/inconsistencies/src/DanglingUserKeys.hs @@ -0,0 +1,238 @@ +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE ScopedTypeVariables #-} +{-# OPTIONS_GHC -Wno-orphans #-} + +-- This file is part of the Wire Server implementation. +-- +-- Copyright (C) 2022 Wire Swiss GmbH +-- +-- This program is free software: you can redistribute it and/or modify it under +-- the terms of the GNU Affero General Public License as published by the Free +-- Software Foundation, either version 3 of the License, or (at your option) any +-- later version. +-- +-- This program is distributed in the hope that it will be useful, but WITHOUT +-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +-- details. +-- +-- You should have received a copy of the GNU Affero General Public License along +-- with this program. If not, see . 
+ +module DanglingUserKeys where + +import Brig.Data.Instances () +import Brig.Data.UserKey +import Brig.Email (EmailKey (..), mkEmailKey) +import Brig.Phone (PhoneKey (..), mkPhoneKey) +import Brig.Types.Intra +import Cassandra +import Cassandra.Util +import Conduit +import qualified Data.Aeson as Aeson +import qualified Data.ByteString as BS +import Data.Conduit.Internal (zipSources) +import qualified Data.Conduit.List as C +import Data.Id +import Data.String.Conversions (cs) +import qualified Data.Text as Text +import qualified Data.Text.IO as Text +import Imports +import System.Logger +import qualified System.Logger as Log +import UnliftIO.Async +import Wire.API.User hiding (userEmail, userPhone) + +runCommand :: Logger -> ClientState -> FilePath -> IO () +runCommand l brig inconsistenciesFile = do + runResourceT $ + runConduit $ + zipSources + (C.sourceList [(1 :: Int32) ..]) + (transPipe (runClient brig) getKeys) + .| C.mapM + ( \(i, userKeys) -> do + Log.info l (Log.field "keys" (show ((i - 1) * pageSize + fromIntegral (length userKeys)))) + pure userKeys + ) + .| C.mapM (liftIO . pooledMapConcurrentlyN 48 (\(key, userId, claimTime) -> checkUser l brig key userId claimTime False)) + .| C.map ((<> "\n") . BS.intercalate "\n" . map (cs . Aeson.encode) . catMaybes) + .| sinkFile inconsistenciesFile + +runRepair :: Logger -> ClientState -> FilePath -> FilePath -> Bool -> IO () +runRepair l brig inputFile outputFile repairData = do + keys <- mapMaybe parseKey . Text.lines <$> Text.readFile inputFile + runResourceT $ + runConduit $ + zipSources + (C.sourceList [(1 :: Int32) ..]) + (C.sourceList keys) + .| C.mapM + ( \(i, key) -> do + when (i `mod` 100 == 0) $ + Log.info l (Log.field "keysProcesses" i) + liftIO $ checkKey l brig key repairData + ) + .| C.map ((<> "\n") . cs . 
Aeson.encode) + .| sinkFile outputFile + +pageSize :: Int32 +pageSize = 1000 + +data Inconsistency = Inconsistency + { -- | Key in the user_keys table + key :: UserKey, + userId :: UserId, + time :: Writetime UserId, + status :: Maybe (WithWritetime AccountStatus), + userEmail :: Maybe (WithWritetime Email), + userPhone :: Maybe (WithWritetime Phone), + inconsistencyCase :: Text + } + deriving (Generic) + +instance Aeson.ToJSON Inconsistency + +data WithWritetime a = WithWritetime + { value :: a, + writetime :: Writetime a + } + deriving (Generic) + +instance Aeson.ToJSON a => Aeson.ToJSON (WithWritetime a) + +---------------------------------------------------------------------------- +-- Queries + +getKey :: UserKey -> Client (Maybe (UserId, Writetime UserId)) +getKey key = retry x5 $ query1 cql (params LocalQuorum (Identity key)) + where + cql :: PrepQuery R (Identity UserKey) (UserId, Writetime UserId) + cql = "SELECT user, writetime(user) from user_keys where key = ?" + +getKeys :: ConduitM () [(UserKey, UserId, Writetime UserId)] Client () +getKeys = paginateC cql (paramsP LocalQuorum () pageSize) x5 + where + cql :: PrepQuery R () (UserKey, UserId, Writetime UserId) + cql = "SELECT key, user, writetime(user) from user_keys" + +parseKey :: Text -> Maybe UserKey +parseKey t = (userEmailKey <$> parseEmail t) <|> (userPhoneKey <$> parsePhone t) + +instance Cql UserKey where + ctype = Tagged TextColumn + + fromCql (CqlText t) = + maybe + (Left $ "invalid userkey: " <> show t) + Right + (parseKey t) + fromCql _ = Left "userkey: expected text" + + toCql k = toCql $ keyText k + +instance Aeson.ToJSON UserKey where + toJSON = Aeson.toJSON . 
keyText + +type UserDetailsRow = (Maybe AccountStatus, Maybe (Writetime AccountStatus), Maybe Email, Maybe (Writetime Email), Maybe Phone, Maybe (Writetime Phone)) + +getUserDetails :: UserId -> Client (Maybe UserDetailsRow) +getUserDetails uid = retry x5 $ query1 cql (params LocalQuorum (Identity uid)) + where + cql :: PrepQuery R (Identity UserId) UserDetailsRow + cql = "SELECT status, writetime(status), email, writetime(email), phone, writetime(phone) from user where id = ?" + +checkKey :: Logger -> ClientState -> UserKey -> Bool -> IO (Maybe Inconsistency) +checkKey l brig key repairData = do + mUser <- runClient brig $ getKey key + case mUser of + Nothing -> do + Log.warn l (Log.msg (Log.val "No user found for key") . Log.field "key" (keyText key)) + pure Nothing + Just (uid, writeTime) -> checkUser l brig key uid writeTime repairData + +-- mostly copied from Brig to not need a Brig Env/ReaderT +freeUserKey :: Logger -> UserKey -> Client () +freeUserKey l k = do + Log.info l $ Log.msg (Log.val "Freeing key") . Log.field "key" (keyText k) + retry x5 $ write keyDelete (params LocalQuorum (Identity $ keyText k)) + where + keyDelete :: PrepQuery W (Identity Text) () + keyDelete = "DELETE FROM user_keys WHERE key = ?" + +insertKey :: Logger -> UserId -> UserKey -> Client () +insertKey l u k = do + Log.info l $ Log.msg (Log.val "Inserting key") . Log.field "key" (keyText k) . Log.field "userId" (show u) + retry x5 $ write keyInsert (params LocalQuorum (keyText k, u)) + where + keyInsert :: PrepQuery W (Text, UserId) () + keyInsert = "INSERT INTO user_keys (key, user) VALUES (?, ?)" + +-- repair these inconsistent data cases: +-- - 1. user deleted (or with status=null) in user table, data left in user_keys -> delete records in user_keys +-- - 2. user not found in user table, data left in user_keys -> delete records in user_keys +-- - 3. the user to which this key points to has a different email in the user table, and +-- 3.a. 
this user *also* has a correct entry in the user_keys table -> delete record of the wrong pointer inside user_keys +-- 3.b. this user's email, when searched for points to another user -> do nothing; log this issue +-- 3.c this user's email, when searched for does not exist in user_keys. Do nothing, let this be handled by the other module EmailLessUsers.hs +-- 4. user has an email in user_keys but no email inside user table -> do nothing. How to resolve? +checkUser :: Logger -> ClientState -> UserKey -> UserId -> Writetime UserId -> Bool -> IO (Maybe Inconsistency) +checkUser l brig key userId time repairData = do + maybeDetails <- runClient brig $ getUserDetails userId + case maybeDetails of + Nothing -> do + let status = Nothing + userEmail = Nothing + userPhone = Nothing + inconsistencyCase = "2." + when repairData $ -- case 2. + runClient brig $ + freeUserKey l key + pure . Just $ Inconsistency {..} + Just (mStatus, mStatusWriteTime, mEmail, mEmailWriteTime, mPhone, mPhoneWriteTime) -> do + let status = WithWritetime <$> mStatus <*> mStatusWriteTime + userEmail = WithWritetime <$> mEmail <*> mEmailWriteTime + userPhone = WithWritetime <$> mPhone <*> mPhoneWriteTime + statusError = case mStatus of + Nothing -> True + Just Deleted -> True + _ -> False + compareEmail e = (emailKeyUniq . mkEmailKey <$> mEmail) /= Just (fromEmail e) + comparePhone p = (phoneKeyUniq . mkPhoneKey <$> mPhone) /= Just (fromPhone p) + keyError = foldKey compareEmail comparePhone key + if statusError + then do + let inconsistencyCase = "1." + when repairData $ runClient brig (freeUserKey l key) + pure . Just $ Inconsistency {..} + else + if keyError + then do + case mEmail of + Nothing -> do + Log.warn l (Log.msg (Log.val "Subcase 4: user has no email") . Log.field "userId" (show userId)) + let inconsistencyCase = "4." + pure . 
Just $ Inconsistency {..} + Just email -> do + validKeysEntry <- runClient brig $ getKey (userEmailKey email) + case validKeysEntry of + Just (uid, _) -> + if uid == userId + then do + -- there is a valid matching user_key entry for a user in the user table; just *also* an extra entry that can be cleaned up (case 3.a.) + Log.warn l (Log.msg (Log.val "Subcase 3a: entry can be repaired by removing entry") . Log.field "key" (keyText key)) + let inconsistencyCase = "3.a." + when repairData $ + runClient brig $ + freeUserKey l key + pure . Just $ Inconsistency {..} + else do + let inconsistencyCase = "3.b." + Log.warn l (Log.msg (Log.val "Subcase 3b: double mismatch entry in user_keys") . Log.field "userId" (show userId)) + pure . Just $ Inconsistency {..} + Nothing -> do + let inconsistencyCase = "3.c." + Log.warn l (Log.msg (Log.val "Subcase 3c: missing entry in user_keys") . Log.field "userId" (show userId)) + pure . Just $ Inconsistency {..} + else pure Nothing diff --git a/tools/db/inconsistencies/src/EmailLessUsers.hs b/tools/db/inconsistencies/src/EmailLessUsers.hs new file mode 100644 index 00000000000..10300f6ab40 --- /dev/null +++ b/tools/db/inconsistencies/src/EmailLessUsers.hs @@ -0,0 +1,156 @@ +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE ScopedTypeVariables #-} + +-- This file is part of the Wire Server implementation. +-- +-- Copyright (C) 2022 Wire Swiss GmbH +-- +-- This program is free software: you can redistribute it and/or modify it under +-- the terms of the GNU Affero General Public License as published by the Free +-- Software Foundation, either version 3 of the License, or (at your option) any +-- later version. +-- +-- This program is distributed in the hope that it will be useful, but WITHOUT +-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +-- details. 
+-- +-- You should have received a copy of the GNU Affero General Public License along +-- with this program. If not, see . + +module EmailLessUsers where + +import Brig.Data.Instances () +import Brig.Data.UserKey +import Brig.Email +import Brig.Types.Intra +import Cassandra +import Cassandra.Util +import Conduit +import qualified DanglingUserKeys as K +import qualified Data.Aeson as Aeson +import qualified Data.ByteString as BS +import Data.Conduit.Internal (zipSources) +import qualified Data.Conduit.List as C +import Data.Either.Extra +import Data.Id +import Data.String.Conversions (cs) +import qualified Data.Text as Text +import qualified Data.Text.IO as Text +import Imports +import System.Logger +import qualified System.Logger as Log +import UnliftIO.Async + +runCommand :: Logger -> ClientState -> FilePath -> IO () +runCommand l brig inconsistenciesFile = do + runResourceT $ + runConduit $ + zipSources + (C.sourceList [(1 :: Int32) ..]) + (transPipe (runClient brig) getUsers) + .| C.mapM + ( \(i, userDetails) -> do + Log.info l (Log.field "userIds" (show ((i - 1) * pageSize + fromIntegral (length userDetails)))) + pure $ mapMaybe userWithEmailAndStatus userDetails + ) + .| C.mapM (liftIO . pooledMapConcurrentlyN 48 (checkUser l brig False)) + .| C.map ((<> "\n") . BS.intercalate "\n" . map (cs . Aeson.encode) . catMaybes) + .| sinkFile inconsistenciesFile + +runRepair :: Logger -> ClientState -> FilePath -> FilePath -> Bool -> IO () +runRepair l brig inputFile outputFile repairData = do + inputLines :: [UserId] <- mapMaybe (eitherToMaybe . parseIdFromText) . Text.lines <$> Text.readFile inputFile + runResourceT $ + runConduit $ + zipSources + (C.sourceList [(1 :: Int32) ..]) + (C.sourceList inputLines) + .| C.mapM + ( \(i, uid) -> do + when (i `mod` 100 == 0) $ + Log.info l (Log.field "linesProcessed" i) + liftIO $ repairUser l brig repairData uid + ) + .| C.map ((<> "\n") . cs . 
Aeson.encode) + .| sinkFile outputFile + +pageSize :: Int32 +pageSize = 1000 + +data EmailInfo = EmailInfo + { userId :: UserId, + status :: WithWritetime AccountStatus, + -- | Email in the user table + userEmail :: WithWritetime Email, + -- | Email in the user_keys table + emailKey :: Maybe (WithWritetime UserId), + inconsistencyCase :: Text + } + deriving (Generic) + +instance Aeson.ToJSON EmailInfo + +data WithWritetime a = WithWritetime + { value :: a, + writetime :: Writetime a + } + deriving (Generic) + +instance Aeson.ToJSON a => Aeson.ToJSON (WithWritetime a) + +---------------------------------------------------------------------------- +-- Queries + +getUserDetails :: UserId -> Client (Maybe UserDetailsRow) +getUserDetails uid = retry x5 $ query1 cql (params LocalQuorum (Identity uid)) + where + cql :: PrepQuery R (Identity UserId) UserDetailsRow + cql = "SELECT id, status, writetime(status), email, writetime(email), activated from user where id = ?" + +getUsers :: ConduitM () [UserDetailsRow] Client () +getUsers = paginateC cql (paramsP LocalQuorum () pageSize) x5 + where + cql :: PrepQuery R () UserDetailsRow + cql = "SELECT id, status, writetime(status), email, writetime(email), activated from user" + +type UserDetailsRow = (UserId, Maybe AccountStatus, Maybe (Writetime AccountStatus), Maybe Email, Maybe (Writetime Email), Bool) + +insertMissingEmail :: Logger -> ClientState -> Email -> UserId -> IO () +insertMissingEmail l brig email uid = do + runClient brig $ K.insertKey l uid (userEmailKey email) + +userWithEmailAndStatus :: UserDetailsRow -> Maybe (UserId, AccountStatus, Writetime AccountStatus, Email, Writetime Email) +userWithEmailAndStatus (uid, mStatus, mStatusWritetime, mEmail, mEmailWritetime, activated) = do + let act = if activated then Just True else Nothing + case (,,,,) <$> mStatus <*> mStatusWritetime <*> mEmail <*> mEmailWritetime <*> act of + Nothing -> Nothing + Just (status, statusWritetime, email, emailWritetime, _) -> Just (uid, 
status, statusWritetime, email, emailWritetime) + +repairUser :: Logger -> ClientState -> Bool -> UserId -> IO (Maybe EmailInfo) +repairUser l brig repairData uid = do + user <- runClient brig $ getUserDetails uid + let u = userWithEmailAndStatus =<< user + case u of + Nothing -> pure Nothing + Just x -> checkUser l brig repairData x + +checkUser :: Logger -> ClientState -> Bool -> (UserId, AccountStatus, Writetime AccountStatus, Email, Writetime Email) -> IO (Maybe EmailInfo) +checkUser l brig repairData (userId, statusValue, statusWritetime, userEmailValue, userEmailWriteTime) = do + let status = WithWritetime statusValue statusWritetime + userEmail = WithWritetime userEmailValue userEmailWriteTime + mKeyDetails <- runClient brig $ K.getKey (userEmailKey userEmailValue) + case mKeyDetails of + Nothing -> do + let emailKey = Nothing + inconsistencyCase = if statusValue == Active then "1-missing-email" else "2-missing-email-but-not-active" + when (repairData && (statusValue == Active)) $ do + insertMissingEmail l brig userEmailValue userId + pure . Just $ EmailInfo {..} + Just (emailKeyValue, emailClaimTime) -> do + let emailKey = Just $ WithWritetime emailKeyValue emailClaimTime + let inconsistencyCase = "3-wrong-email" + if emailKeyValue == userId + then pure Nothing + else pure . Just $ EmailInfo {..} diff --git a/tools/db/inconsistencies/src/HandleLessUsers.hs b/tools/db/inconsistencies/src/HandleLessUsers.hs new file mode 100644 index 00000000000..f3f6e5be312 --- /dev/null +++ b/tools/db/inconsistencies/src/HandleLessUsers.hs @@ -0,0 +1,115 @@ +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE ScopedTypeVariables #-} + +-- This file is part of the Wire Server implementation. 
+--
+-- Copyright (C) 2022 Wire Swiss GmbH
+--
+-- This program is free software: you can redistribute it and/or modify it under
+-- the terms of the GNU Affero General Public License as published by the Free
+-- Software Foundation, either version 3 of the License, or (at your option) any
+-- later version.
+--
+-- This program is distributed in the hope that it will be useful, but WITHOUT
+-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
+-- details.
+--
+-- You should have received a copy of the GNU Affero General Public License along
+-- with this program. If not, see <https://www.gnu.org/licenses/>.
+
+module HandleLessUsers where
+
+import Brig.Data.Instances ()
+import Brig.Types.Intra
+import Cassandra
+import Cassandra.Util
+import Conduit
+import qualified Data.Aeson as Aeson
+import qualified Data.ByteString as BS
+import Data.Conduit.Internal (zipSources)
+import qualified Data.Conduit.List as C
+import Data.Handle
+import Data.Id
+import Data.String.Conversions (cs)
+import Imports
+import System.Logger
+import qualified System.Logger as Log
+import UnliftIO.Async
+
+-- | Scan the @user@ table page by page, check every user that has a handle
+-- against @user_handle@ (48 lookups in flight per page) and write each finding
+-- as one JSON document per line to @inconsistenciesFile@.
+runCommand :: Logger -> ClientState -> FilePath -> IO ()
+runCommand l brig inconsistenciesFile = do
+  runResourceT $
+    runConduit $
+      zipSources
+        (C.sourceList [(1 :: Int32) ..])
+        (transPipe (runClient brig) getUsers)
+        .| C.mapM
+          ( \(i, userDetails) -> do
+              -- Progress: full pages seen before this one plus the size of this page.
+              Log.info l (Log.field "userIds" (show ((i - 1) * pageSize + fromIntegral (length userDetails))))
+              pure $ mapMaybe userWithHandleAndStatus userDetails
+          )
+        .| C.mapM (liftIO . pooledMapConcurrentlyN 48 (checkUser brig))
+        -- Terminate every record with its own newline. A page without findings
+        -- then contributes nothing, instead of a blank line in the output.
+        .| C.map (BS.concat . map ((<> "\n") . cs . Aeson.encode) . catMaybes)
+        .| sinkFile inconsistenciesFile
+
+-- | Page size handed to 'paramsP'; also used for the progress counter in 'runCommand'.
+pageSize :: Int32
+pageSize = 1000
+
+-- | One reported finding. Has a generic 'Aeson.ToJSON' instance.
+data HandleInfo = HandleInfo
+  { userId :: UserId,
+    status :: WithWritetime AccountStatus,
+    -- | Handle in the user table
+    userHandle :: WithWritetime Handle,
+    -- | User found in @user_handle@ for that handle; 'Nothing' when there is no row.
+    handleClaimUser :: Maybe (WithWritetime UserId)
+  }
+  deriving (Generic)
+
+instance Aeson.ToJSON HandleInfo
+
+-- | A column value paired with the write time selected alongside it.
+data WithWritetime a = WithWritetime
+  { value :: a,
+    writetime :: Writetime a
+  }
+  deriving (Generic)
+
+instance Aeson.ToJSON a => Aeson.ToJSON (WithWritetime a)
+
+----------------------------------------------------------------------------
+-- Queries
+
+-- | Look up which user a handle points to in @user_handle@, with the write time.
+getHandle :: Handle -> Client (Maybe (UserId, Writetime UserId))
+getHandle handle = retry x1 $ query1 cql (params LocalQuorum (Identity handle))
+  where
+    cql :: PrepQuery R (Identity Handle) (UserId, Writetime UserId)
+    cql = "SELECT user, writetime(user) from user_handle where handle = ?"
+
+-- | Stream the whole @user@ table, one list of rows per page of 'pageSize'.
+getUsers :: ConduitM () [UserDetailsRow] Client ()
+getUsers = paginateC cql (paramsP LocalQuorum () pageSize) x5
+  where
+    cql :: PrepQuery R () UserDetailsRow
+    cql = "SELECT id, status, writetime(status), handle, writetime(handle) from user"
+
+-- | Columns in the order selected by 'getUsers':
+-- id, status, writetime(status), handle, writetime(handle).
+type UserDetailsRow = (UserId, Maybe AccountStatus, Maybe (Writetime AccountStatus), Maybe Handle, Maybe (Writetime Handle))
+
+-- | Keep a row only if status, handle and both write times are set.
+userWithHandleAndStatus :: UserDetailsRow -> Maybe (UserId, AccountStatus, Writetime AccountStatus, Handle, Writetime Handle)
+userWithHandleAndStatus (uid, mStatus, mStatusWritetime, mHandle, mHandleWritetime) = do
+  status <- mStatus
+  statusWritetime <- mStatusWritetime
+  handle <- mHandle
+  handleWritetime <- mHandleWritetime
+  pure (uid, status, statusWritetime, handle, handleWritetime)
+
+-- | Compare the handle stored on the user with the @user_handle@ row for it.
+-- A missing row or a row pointing at a different user is reported; a row
+-- pointing back at this user is consistent and yields 'Nothing'.
+checkUser :: ClientState -> (UserId, AccountStatus, Writetime AccountStatus, Handle, Writetime Handle) -> IO (Maybe HandleInfo)
+checkUser brig (userId, statusValue, statusWritetime, userHandleValue, userHandleWriteTime) = do
+  -- The local names below are picked up by the @HandleInfo {..}@ wildcards.
+  let status = WithWritetime statusValue statusWritetime
+      userHandle = WithWritetime userHandleValue userHandleWriteTime
+  mClaimDetails <- runClient brig $ getHandle userHandleValue
+  case mClaimDetails of
+    Nothing ->
+      let handleClaimUser = Nothing
+       in pure . Just $ HandleInfo {..}
+    Just (handleClaimUserValue, handleClaimTime)
+      | handleClaimUserValue == userId -> pure Nothing
+      | otherwise ->
+          let handleClaimUser = Just $ WithWritetime handleClaimUserValue handleClaimTime
+           in pure . Just $ HandleInfo {..}
diff --git a/tools/db/inconsistencies/src/Main.hs b/tools/db/inconsistencies/src/Main.hs
new file mode 100644
index 00000000000..340ef94e4db
--- /dev/null
+++ b/tools/db/inconsistencies/src/Main.hs
@@ -0,0 +1,82 @@
+{-# LANGUAGE NumericUnderscores #-}
+{-# LANGUAGE OverloadedStrings #-}
+
+-- This file is part of the Wire Server implementation.
+--
+-- Copyright (C) 2022 Wire Swiss GmbH
+--
+-- This program is free software: you can redistribute it and/or modify it under
+-- the terms of the GNU Affero General Public License as published by the Free
+-- Software Foundation, either version 3 of the License, or (at your option) any
+-- later version.
+--
+-- This program is distributed in the hope that it will be useful, but WITHOUT
+-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
+-- details.
+--
+-- You should have received a copy of the GNU Affero General Public License along
+-- with this program. If not, see <https://www.gnu.org/licenses/>.
+
+module Main
+  ( main,
+  )
+where
+
+import Cassandra as C
+import Cassandra.Settings as C
+import qualified DanglingHandles
+import qualified DanglingUserKeys
+import qualified EmailLessUsers
+import qualified HandleLessUsers
+import Imports
+import Options as O
+import Options.Applicative
+import qualified System.Logger as Log
+import System.Logger.Extended (structuredJSONRenderer)
+
+-- | Parse the command line, connect to the brig keyspace, run the selected
+-- check (or repair, when an input file is given) and then keep the process
+-- alive for four hours so the output can still be collected.
+main :: IO ()
+main = do
+  (cmd, s) <- execParser (info (helper <*> optionsParser) desc)
+  lgr <- initLogger
+  brig <- initCas (setCasBrig s) (Log.clone (Just "cassandra-brig") lgr)
+  let workLogger = Log.clone (Just "work") lgr
+      outputFile = setIncosistenciesFile s
+  -- 'Nothing' selects a full table scan; 'Just' carries an input file plus
+  -- the repair switch and only examines the entries listed in that file.
+  case cmd of
+    DanglingHandles Nothing ->
+      DanglingHandles.runCommand workLogger brig outputFile
+    DanglingHandles (Just (handlesFile, fixClaims)) ->
+      DanglingHandles.examineHandles workLogger brig handlesFile outputFile fixClaims
+    HandleLessUsers ->
+      HandleLessUsers.runCommand workLogger brig outputFile
+    DanglingUserKeys Nothing ->
+      DanglingUserKeys.runCommand workLogger brig outputFile
+    DanglingUserKeys (Just (inputFile, repairData)) ->
+      DanglingUserKeys.runRepair workLogger brig inputFile outputFile repairData
+    MissingEmailUserKeys Nothing ->
+      EmailLessUsers.runCommand workLogger brig outputFile
+    MissingEmailUserKeys (Just (inputFile, repairData)) ->
+      EmailLessUsers.runRepair workLogger brig inputFile outputFile repairData
+
+  Log.info lgr $ Log.msg (Log.val "Done scanning, sleeping for 4 hours so logs can be extracted") . Log.field "file" outputFile
+  -- 4 hours expressed in microseconds; keep in sync with the message above.
+  threadDelay (4 * 60 * 60 * 1_000_000)
+  Log.info lgr $ Log.msg (Log.val "Sleep complete, logs will not be accessible anymore if this was running in a container!")
+  where
+    desc =
+      header "db-inconsistencies"
+        <> progDesc "finds inconsistencies in the DB"
+        <> fullDesc
+    -- Structured JSON on stdout, buffer size 0.
+    initLogger =
+      Log.new
+        . Log.setOutput Log.StdOut
+        . Log.setBufSize 0
+        . Log.setRenderer structuredJSONRenderer
+        $ Log.defSettings
+    -- Build a Cassandra client state from the parsed 'CassandraSettings'.
+    initCas cas l =
+      C.init
+        . C.setLogger (C.mkLogger l)
+        . C.setContacts (cHosts cas) []
+        . C.setPortNumber (fromIntegral $ cPort cas)
+        . C.setKeyspace (cKeyspace cas)
+        . C.setProtocolVersion C.V4
+        $ C.defSettings
diff --git a/tools/db/inconsistencies/src/Options.hs b/tools/db/inconsistencies/src/Options.hs
new file mode 100644
index 00000000000..7cbdcb37fff
--- /dev/null
+++ b/tools/db/inconsistencies/src/Options.hs
@@ -0,0 +1,135 @@
+{-# LANGUAGE OverloadedStrings #-}
+
+-- This file is part of the Wire Server implementation.
+--
+-- Copyright (C) 2022 Wire Swiss GmbH
+--
+-- This program is free software: you can redistribute it and/or modify it under
+-- the terms of the GNU Affero General Public License as published by the Free
+-- Software Foundation, either version 3 of the License, or (at your option) any
+-- later version.
+--
+-- This program is distributed in the hope that it will be useful, but WITHOUT
+-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
+-- details.
+--
+-- You should have received a copy of the GNU Affero General Public License along
+-- with this program. If not, see <https://www.gnu.org/licenses/>.
+
+module Options where
+
+import qualified Cassandra as C
+import Data.Id
+import qualified Data.Text as Text
+import Imports
+import Options.Applicative
+
+-- | Settings shared by every sub-command.
+data Settings = Settings
+  { -- | Connection details for the brig keyspace
+    setCasBrig :: CassandraSettings,
+    -- | File the findings are written to (@--inconsistencies-file@)
+    setIncosistenciesFile :: FilePath
+  }
+  deriving (Show)
+
+-- | Host, port and keyspace of one Cassandra cluster.
+data CassandraSettings = CassandraSettings
+  { cHosts :: !String,
+    cPort :: !Word16,
+    cKeyspace :: !C.Keyspace
+  }
+  deriving (Show)
+
+-- | Sub-command chosen on the command line. Where present, the optional pair
+-- is the @--input-file@ path together with the @--repair-data@ switch.
+data Command
+  = DanglingHandles (Maybe (FilePath, Bool))
+  | HandleLessUsers
+  | DanglingUserKeys (Maybe (FilePath, Bool))
+  | MissingEmailUserKeys (Maybe (FilePath, Bool))
+
+-- | Complete command line: a sub-command followed by the shared settings.
+optionsParser :: Parser (Command, Settings)
+optionsParser = pairUp <$> commandParser <*> settingsParser
+  where
+    pairUp cmd settings = (cmd, settings)
+
+-- | All known sub-commands.
+commandParser :: Parser Command
+commandParser =
+  subparser
+    ( danglingHandlesCommand
+        <> handleLessUsersCommand
+        <> danglingKeysCommand
+        <> missingEmailsCommand
+    )
+
+-- | @dangling-handles@, optionally driven by a file of handles.
+danglingHandlesCommand :: Mod CommandFields Command
+danglingHandlesCommand = command "dangling-handles" (info parser description)
+  where
+    parser = DanglingHandles <$> optional (inputFileRepairParser "handles")
+    description = progDesc "find handle which shouldn't be claimed"
+
+-- | @dangling-keys@, optionally driven by a file of keys.
+danglingKeysCommand :: Mod CommandFields Command
+danglingKeysCommand = command "dangling-keys" (info parser description)
+  where
+    parser = DanglingUserKeys <$> optional (inputFileRepairParser "keys")
+    description = progDesc "find keys which shouldn't be there"
+
+-- | @missing-email-keys@, optionally driven by a file of emails.
+missingEmailsCommand :: Mod CommandFields Command
+missingEmailsCommand = command "missing-email-keys" (info parser description)
+  where
+    parser = MissingEmailUserKeys <$> optional (inputFileRepairParser "emails")
+    description = progDesc "find missing email keys (users with emails inside user table but not inside user_keys table)"
+
+-- | @handle-less-users@; takes no command-specific options.
+handleLessUsersCommand :: Mod CommandFields Command
+handleLessUsersCommand = command "handle-less-users" (info parser description)
+  where
+    parser = pure HandleLessUsers
+    description = progDesc "find users which have a handle in the user table but not in the user_handle table"
+
+-- | Options that apply regardless of the sub-command.
+settingsParser :: Parser Settings
+settingsParser = Settings <$> brigSettings <*> inconsistenciesFileParser
+  where
+    brigSettings = cassandraSettingsParser "brig"
+
+-- | @--input-file@ followed by the @--repair-data@ switch; the argument only
+-- feeds the help text of the former.
+inputFileRepairParser :: String -> Parser (FilePath, Bool)
+inputFileRepairParser s = withRepairFlag <$> inputFileParser s <*> repairDataParser
+  where
+    withRepairFlag path repair = (path, repair)
+
+-- | Mandatory @--input-file@ option; @s@ names the listed items in the help text.
+inputFileParser :: String -> Parser FilePath
+inputFileParser s = strOption modifiers
+  where
+    modifiers = long "input-file" <> help helpText <> metavar "FILEPATH"
+    helpText = "file containing list of " <> s <> " separated by new lines"
+
+-- | @--repair-data@ switch, 'False' unless given.
+repairDataParser :: Parser Bool
+repairDataParser = switch modifiers
+  where
+    modifiers = long "repair-data" <> help "Automatically repair data"
+
+-- | Mandatory @--inconsistencies-file@ option.
+inconsistenciesFileParser :: Parser FilePath
+inconsistenciesFileParser = strOption modifiers
+  where
+    modifiers =
+      long "inconsistencies-file"
+        <> help "File to output the found inconsistencies"
+        <> metavar "FILEPATH"
+
+-- | Mandatory @--team-id@ option, parsed with 'parseIdFromText'.
+teamIdParser :: Parser TeamId
+teamIdParser = option reader modifiers
+  where
+    reader = eitherReader (parseIdFromText . Text.pack)
+    modifiers =
+      long "team-id"
+        <> help "Team id to search into"
+        <> metavar "TEAM_ID"
+
+-- | Host, port and keyspace options for one cluster. @ks@ becomes the suffix
+-- of each flag name and the prefix of the default keyspace (@ks ++ "_test"@).
+cassandraSettingsParser :: String -> Parser CassandraSettings
+cassandraSettingsParser ks =
+  CassandraSettings <$> hostParser <*> portParser <*> keyspaceParser
+  where
+    hostParser =
+      strOption
+        ( long ("cassandra-host-" <> ks)
+            <> metavar "HOST"
+            <> help ("Cassandra Host for: " <> ks)
+            <> value "localhost"
+            <> showDefault
+        )
+    portParser =
+      option
+        auto
+        ( long ("cassandra-port-" <> ks)
+            <> metavar "PORT"
+            <> help ("Cassandra Port for: " <> ks)
+            <> value 9042
+            <> showDefault
+        )
+    keyspaceParser = C.Keyspace . Text.pack <$> keyspaceName
+    keyspaceName =
+      strOption
+        ( long ("cassandra-keyspace-" <> ks)
+            <> metavar "STRING"
+            <> help ("Cassandra Keyspace for: " <> ks)
+            <> value (ks <> "_test")
+            <> showDefault
+        )