Skip to content

Commit

Permalink
Refactor Writetime (#2994)
Browse files Browse the repository at this point in the history
  • Loading branch information
smatting authored Jan 17, 2023
1 parent b2c183a commit 37d9b3f
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 28 deletions.
1 change: 1 addition & 0 deletions changelog.d/5-internal/refactor-writetime
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Refactor Writetime from Int64 to wrapper of UTCTime
56 changes: 45 additions & 11 deletions libs/cassandra-util/src/Cassandra/Util.hs
Original file line number Diff line number Diff line change
Expand Up @@ -14,28 +14,28 @@
--
-- You should have received a copy of the GNU Affero General Public License along
-- with this program. If not, see <https://www.gnu.org/licenses/>.
{-# LANGUAGE NumericUnderscores #-}

module Cassandra.Util
( writeTimeToUTC,
defInitCassandra,
Writetime,
( defInitCassandra,
Writetime (..),
writetimeToInt64,
)
where

import Cassandra (ClientState, Keyspace (Keyspace), init)
import Cassandra (ClientState, init)
import Cassandra.CQL
import Cassandra.Settings (defSettings, setContacts, setKeyspace, setLogger, setPortNumber)
import Data.Aeson
import Data.Fixed
import Data.Text (unpack)
import Data.Time (UTCTime)
import Data.Time.Clock.POSIX (posixSecondsToUTCTime)
import Data.Time (UTCTime, nominalDiffTimeToSeconds)
import Data.Time.Clock (secondsToNominalDiffTime)
import Data.Time.Clock.POSIX
import qualified Database.CQL.IO.Tinylog as CT
import Imports hiding (init)
import qualified System.Logger as Log

type Writetime a = Int64

writeTimeToUTC :: Writetime a -> UTCTime
writeTimeToUTC = posixSecondsToUTCTime . fromIntegral . (`div` 1000000)

defInitCassandra :: Text -> Text -> Word16 -> Log.Logger -> IO ClientState
defInitCassandra ks h p lg =
init
Expand All @@ -44,3 +44,37 @@ defInitCassandra ks h p lg =
. setContacts (unpack h) []
. setKeyspace (Keyspace ks)
$ defSettings

-- | Read cassandra's writetimes https://docs.datastax.com/en/dse/5.1/cql/cql/cql_using/useWritetime.html
-- as UTCTime values without any loss of precision
newtype Writetime a = Writetime {writetimeToUTC :: UTCTime}

instance Cql (Writetime a) where
ctype = Tagged BigIntColumn
toCql = CqlBigInt . writetimeToInt64
fromCql (CqlBigInt n) =
pure
. Writetime
. posixSecondsToUTCTime
. secondsToNominalDiffTime
. MkFixed
. (* 1_000_000)
. fromIntegral @Int64 @Integer
$ n
fromCql _ = Left "Writetime: bigint expected"

-- | This yields the same int as it is returned by WRITETIME()
writetimeToInt64 :: Writetime a -> Int64
writetimeToInt64 =
fromIntegral @Integer @Int64
. (`div` 1_000_000)
. unfixed
. nominalDiffTimeToSeconds
. utcTimeToPOSIXSeconds
. writetimeToUTC
where
unfixed :: Fixed a -> Integer
unfixed (MkFixed n) = n

instance ToJSON (Writetime a) where
toJSON = toJSON . writetimeToInt64
33 changes: 21 additions & 12 deletions services/brig/src/Brig/User/Search/Index.hs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ import Brig.Types.Intra
import Brig.Types.Search (SearchVisibilityInbound, defaultSearchVisibilityInbound, searchVisibilityInboundFromFeatureStatus)
import Brig.User.Search.Index.Types as Types
import qualified Cassandra as C
import Cassandra.Util
import Control.Lens hiding ((#), (.=))
import Control.Monad.Catch (MonadCatch, MonadMask, MonadThrow, throwM, try)
import Control.Monad.Except
Expand All @@ -73,7 +74,6 @@ import Data.ByteString.Builder (Builder, toLazyByteString)
import Data.ByteString.Conversion (toByteString')
import qualified Data.ByteString.Conversion as Bytes
import qualified Data.ByteString.Lazy as BL
import Data.Fixed (Fixed (MkFixed))
import Data.Handle (Handle)
import Data.Id
import qualified Data.Map as Map
Expand All @@ -85,8 +85,6 @@ import Data.Text.Encoding (decodeUtf8, encodeUtf8)
import qualified Data.Text.Lazy as LT
import Data.Text.Lazy.Builder.Int (decimal)
import Data.Text.Lens hiding (text)
import Data.Time (UTCTime, secondsToNominalDiffTime)
import Data.Time.Clock.POSIX (posixSecondsToUTCTime)
import qualified Data.UUID as UUID
import qualified Database.Bloodhound as ES
import Imports hiding (log, searchable)
Expand Down Expand Up @@ -775,12 +773,6 @@ scanForIndex num = do

type Activated = Bool

type Writetime a = Int64

-- Note: Writetime is in microseconds (e-6) https://docs.datastax.com/en/dse/5.1/cql/cql/cql_using/useWritetime.html
writeTimeToUTC :: Writetime a -> UTCTime
writeTimeToUTC = posixSecondsToUTCTime . secondsToNominalDiffTime . MkFixed . (* 1_000_000) . fromIntegral @Int64 @Integer

type ReindexRow =
( UserId,
Maybe TeamId,
Expand Down Expand Up @@ -837,7 +829,20 @@ reindexRowToIndexUser
)
searchVisInbound =
do
iu <- mkIndexUser u <$> version [Just tName, tStatus, tHandle, tEmail, Just tColour, Just tActivated, tService, tManagedBy, tSsoId, tEmailUnvalidated]
iu <-
mkIndexUser u
<$> version
[ Just (v tName),
v <$> tStatus,
v <$> tHandle,
v <$> tEmail,
Just (v tColour),
Just (v tActivated),
v <$> tService,
v <$> tManagedBy,
v <$> tSsoId,
v <$> tEmailUnvalidated
]
pure $
if shouldIndex
then
Expand All @@ -850,7 +855,7 @@ reindexRowToIndexUser
. set iuAccountStatus status
. set iuSAMLIdP (idpUrl =<< ssoId)
. set iuManagedBy managedBy
. set iuCreatedAt (Just (writeTimeToUTC tActivated))
. set iuCreatedAt (Just (writetimeToUTC tActivated))
. set iuSearchVisibilityInbound (Just searchVisInbound)
. set iuScimExternalId (join $ User.scimExternalId <$> managedBy <*> ssoId)
. set iuSso (sso =<< ssoId)
Expand All @@ -861,8 +866,12 @@ reindexRowToIndexUser
-- It's mostly empty, but having the status here might be useful in the future.
& set iuAccountStatus status
where
version :: [Maybe (Writetime Name)] -> m IndexVersion
v :: Writetime a -> Int64
v = writetimeToInt64

version :: [Maybe Int64] -> m IndexVersion
version = mkIndexVersion . getMax . mconcat . fmap Max . catMaybes

shouldIndex =
( case status of
Nothing -> True
Expand Down
23 changes: 22 additions & 1 deletion services/brig/test/integration/API/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
--
-- You should have received a copy of the GNU Affero General Public License along
-- with this program. If not, see <https://www.gnu.org/licenses/>.
{-# LANGUAGE NumericUnderscores #-}

module API.Internal
( tests,
Expand All @@ -28,7 +29,9 @@ import Bilge.Assert
import Brig.Data.User (lookupFeatureConferenceCalling, lookupStatus, userExists)
import qualified Brig.Options as Opt
import Brig.Types.Intra
import qualified Cassandra as C
import qualified Cassandra as Cass
import Cassandra.Util
import Control.Exception (ErrorCall (ErrorCall), throwIO)
import Control.Lens ((^.), (^?!))
import Control.Monad.Catch
Expand Down Expand Up @@ -77,7 +80,8 @@ tests opts mgr db brig brigep gundeck galley = do
test mgr "get,get" $ testKpcGetGet brig,
test mgr "put,put" $ testKpcPutPut brig,
test mgr "add key package ref" $ testAddKeyPackageRef brig
]
],
test mgr "writetimeToInt64" $ testWritetimeRepresentation opts mgr db brig brigep galley
]

testSuspendUser :: forall m. TestConstraints m => Cass.ClientState -> Brig -> m ()
Expand Down Expand Up @@ -370,3 +374,20 @@ getFeatureConfig galley uid = do
getAllFeatureConfigs :: (MonadIO m, MonadHttp m, HasCallStack) => (Request -> Request) -> UserId -> m ResponseLBS
getAllFeatureConfigs galley uid = do
get $ galley . paths ["feature-configs"] . zUser uid

testWritetimeRepresentation :: forall m. TestConstraints m => Opt.Opts -> Manager -> Cass.ClientState -> Brig -> Endpoint -> Galley -> m ()
testWritetimeRepresentation _ _mgr db brig _brigep _galley = do
quid <- userQualifiedId <$> randomUser brig
let uid = qUnqualified quid

ref <- fromJust <$> (runIdentity <$$> Cass.runClient db (C.query1 q1 (C.params C.LocalQuorum (Identity uid))))

wt <- fromJust <$> (runIdentity <$$> Cass.runClient db (C.query1 q2 (C.params C.LocalQuorum (Identity uid))))

liftIO $ assertEqual "writetimeToInt64(<fromCql WRITETIME(status)>) does not match WRITETIME(status)" ref (writetimeToInt64 wt)
where
q1 :: C.PrepQuery C.R (Identity UserId) (Identity Int64)
q1 = "SELECT WRITETIME(status) from user where id = ?"

q2 :: C.PrepQuery C.R (Identity UserId) (Identity (Writetime ()))
q2 = "SELECT WRITETIME(status) from user where id = ?"
2 changes: 1 addition & 1 deletion services/galley/src/Galley/Cassandra/Team.hs
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ team tid =
toTeam (u, n, i, k, d, s, st, b, ss) =
let t = newTeam tid u n i (fromMaybe NonBinding b) & teamIconKey .~ k & teamSplashScreen .~ fromMaybe DefaultIcon ss
status = if d then PendingDelete else fromMaybe Active s
in TeamData t status (writeTimeToUTC <$> st)
in TeamData t status (writetimeToUTC <$> st)

teamIdsOf :: UserId -> [TeamId] -> Client [TeamId]
teamIdsOf usr tids =
Expand Down
4 changes: 2 additions & 2 deletions tools/db/find-undead/src/Work.hs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ module Work where

import Brig.Types.Intra (AccountStatus (..))
import Cassandra
import Cassandra.Util (Writetime, writeTimeToUTC)
import Cassandra.Util (Writetime, writetimeToUTC)
import Conduit
import Control.Lens (view, _1, _2)
import Data.Aeson (FromJSON, (.:))
Expand Down Expand Up @@ -72,7 +72,7 @@ logUUID l f (uuid, _, time) =
Log.info l $
Log.msg f
. Log.field "uuid" (show uuid)
. Log.field "write time" (show $ writeTimeToUTC <$> time)
. Log.field "write time" (show $ writetimeToUTC <$> time)

getScrolled :: (ES.MonadBH m, MonadThrow m) => ES.IndexName -> ES.MappingName -> ConduitM () [UUID] m ()
getScrolled index mapping = processRes =<< lift (ES.getInitialScroll index mapping esSearch)
Expand Down
3 changes: 2 additions & 1 deletion tools/db/inconsistencies/src/DanglingHandles.hs
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,9 @@ freeHandle l handle = do
handleDelete = "DELETE FROM user_handle WHERE handle = ?"

checkUser :: Logger -> ClientState -> Handle -> UserId -> Writetime UserId -> Bool -> IO (Maybe HandleInfo)
checkUser l brig claimedHandle userId handleClaimTime fixClaim = do
checkUser l brig claimedHandle userId handleClaimTime' fixClaim = do
maybeDetails <- runClient brig $ getUserDetails userId
let handleClaimTime = Writetime . writetimeToUTC $ handleClaimTime'
case maybeDetails of
Nothing -> do
let status = Nothing
Expand Down

0 comments on commit 37d9b3f

Please sign in to comment.