Skip to content

Commit

Permalink
Merge branch 'facebookincubator:main' into protobuf-binary
Browse files Browse the repository at this point in the history
  • Loading branch information
TiwariAbhishek23 authored Nov 7, 2023
2 parents 6335ace + 9de2897 commit 6f1fbf8
Show file tree
Hide file tree
Showing 26 changed files with 466 additions and 120 deletions.
1 change: 1 addition & 0 deletions glean/client/hs/Glean.hs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ module Glean
, fillDatabase
, finalize
, completePredicates
, CompletePredicates(..)
, basicWriter
, FactBuilder
, makeFact
Expand Down
12 changes: 10 additions & 2 deletions glean/client/hs/Glean/Remote.hs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ module Glean.Remote
, dbShard
, dbShardWord

-- * Retries
, RetryBackend(..)
, backendRetryWrites
, RetryPolicy(..)
, defaultRetryPolicy

-- * More operations
, SchemaPredicates
, loadPredicates
Expand Down Expand Up @@ -60,6 +66,7 @@ import Util.Log
import Util.STM

import Glean.Backend.Types
import Glean.Backend.Retry
import Glean.BuildInfo (buildRule)
import Glean.ClientConfig.Types (UseShards(..), ClientConfig(..))
import Glean.DefaultConfigs
Expand All @@ -69,6 +76,7 @@ import qualified Glean.Types as Thrift
import Glean.Query.Thrift
import Glean.Query.Thrift.Internal
import Glean.Util.ConfigProvider
import Glean.Util.RetryChannelException
import Glean.Util.Service
import Glean.Username (getUsername)
import Glean.Util.ThriftSource as ThriftSource
Expand Down Expand Up @@ -236,8 +244,8 @@ instance Backend ThriftBackend where
withShard t (Thrift.work_repo $ Thrift.workFinished_work rq)
$ GleanService.workFinished rq

completePredicates_ t repo = withShard t repo $
GleanService.completePredicates repo
completePredicates_ t repo preds = withShard t repo $
GleanService.completePredicates repo preds

restoreDatabase t loc =
withoutShard t $ GleanService.restore loc
Expand Down
4 changes: 2 additions & 2 deletions glean/db/Glean/Backend/Logging.hs
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,11 @@ instance Backend LoggingBackend where
(const mempty) $
workFinished env rq

completePredicates_ (LoggingBackend env) repo =
completePredicates_ (LoggingBackend env) repo preds =
loggingAction
(runLogRepo "completePredicates" env repo)
(const mempty) $
completePredicates_ env repo
completePredicates_ env repo preds

restoreDatabase (LoggingBackend env) loc =
loggingAction (runLogCmd "restoreDatabase" env) (const mempty) $
Expand Down
147 changes: 125 additions & 22 deletions glean/db/Glean/Database/CompletePredicates.hs
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,35 @@ import Util.STM
import qualified Glean.Database.Catalog as Catalog
import Glean.Database.Open
import Glean.Database.Schema
import Glean.Database.Schema.Types
( lookupPredicateSourceRef
, SchemaSelector(..)
)
import qualified Glean.Database.Storage as Storage
import Glean.Database.Types
import Glean.Internal.Types as Thrift
import Glean.Logger
import Glean.RTS.Foreign.Ownership
import Glean.RTS.Types (Pid)
import Glean.Schema.Util (convertRef)
import Glean.Types as Thrift
import qualified Glean.Util.Warden as Warden
import Glean.Util.Mutex


-- | Client API: asynchronously kick off predicate completion for a DB.
--
-- The request is a Thrift union; this function only routes it to the
-- matching worker:
--
--   * an axiom request goes to 'completeAxiomPredicates'
--   * a derived request goes to 'completeDerivedPredicate', passing along
--     the predicate reference carried by the request
completePredicates
  :: Env
  -> Repo
  -> Thrift.CompletePredicates
  -> IO CompletePredicatesResponse
completePredicates env repo request = case request of
  Thrift.CompletePredicates_derived (Thrift.CompleteDerivedPredicate predicate) ->
    completeDerivedPredicate env repo predicate
  Thrift.CompletePredicates_axiom _ ->
    completeAxiomPredicates env repo

-- For internal use: actually perform completion for a DB
syncCompletePredicates :: Env -> Repo -> IO ()
syncCompletePredicates env repo =
Expand Down Expand Up @@ -64,16 +84,16 @@ syncCompletePredicates env repo =
withWriteLock (Just writing) f =
withMutex (wrLock writing) $ const f


-- | Client API: kick off the completion process for a DB asynchronously
completePredicates :: Env -> Repo -> IO CompletePredicatesResponse
completePredicates env@Env{..} repo = do
completeAxiomPredicates :: Env -> Repo -> IO CompletePredicatesResponse
completeAxiomPredicates env@Env{..} repo = do
let
doCompletion = -- we are masked in here
(`finally` deregister) $ do
r <- tryAll $ syncCompletePredicates env repo
case r of
Left (e :: SomeException) -> do setBroken e; throwIO e
Left (e :: SomeException) -> do
setBroken repo envCatalog "completePredicates" e
throwIO e
Right{} -> setComplete

deregister =
Expand All @@ -82,21 +102,6 @@ completePredicates env@Env{..} repo = do
setComplete = void $ atomically $
Catalog.modifyMeta envCatalog repo $ \meta ->
return meta { metaAxiomComplete = True }

-- If a synchronous exception is thrown during completion, this is
-- not recoverable, so mark the DB as broken. The exception is also
-- propagated to the caller of completePredicates.
setBroken e = void $ atomically $
Catalog.modifyMeta envCatalog repo $ \meta ->
return meta { metaCompleteness =
Broken (DatabaseBroken "completePredicates" (Text.pack (show e))) }

waitFor async = do
r <- timeout 1000000 $ wait async
case r of
Nothing -> throwIO $ Retry 1
Just _ -> return CompletePredicatesResponse{}

mask_ $ do
-- speculatively spawn a thread to do the completion, we'll cancel
-- this if we don't need it. This is so that we can atomically
Expand All @@ -121,8 +126,106 @@ completePredicates env@Env{..} repo = do
case HashMap.lookup repo completing of
Just existingAsync -> do -- in progress
later $ cancel async
return $ waitFor existingAsync
return $ waitFor existingAsync CompletePredicatesResponse{}
Nothing -> now $ do -- start async completion
void $ tryPutTMVar tmvar ()
modifyTVar envCompleting (HashMap.insert repo async)
return $ waitFor async
return $ waitFor async CompletePredicatesResponse{}


-- | Propagate ownership information for an externally derived predicate.
--
-- Opens the DB, and if it has ownership data, computes the derived
-- ownership for the given predicate 'Pid' and stores it. When the repo has
-- a parent, the computation is given a lookup into the parent DB;
-- otherwise it is given 'Nothing'. Does nothing if the DB has no ownership
-- data.
syncCompleteDerivedPredicate :: Env -> Repo -> Pid -> IO ()
syncCompleteDerivedPredicate env repo pid =
  withOpenDatabase env repo $ \OpenDB{..} -> do
    maybeOwnership <- readTVarIO odbOwnership
    forM_ maybeOwnership $ \ownership -> do
      let
        -- compute and store the ownership while holding the write lock
        propagate base = withWriteLock odbWriting $
          Storage.computeDerivedOwnership odbHandle ownership base pid
            >>= Storage.storeOwnership odbHandle
      maybeBase <- repoParent env repo
      case maybeBase of
        Nothing -> propagate Nothing
        Just parent ->
          readDatabase env parent $ \_ lookup -> propagate (Just lookup)
  where
    -- Run the action under the DB's write lock when there is one.
    withWriteLock lock action = case lock of
      Just writing -> withMutex (wrLock writing) (const action)
      -- No write lock means we must be in the finalization phase and the
      -- DB has already been marked read-only. We have exclusive write
      -- access at this point, so it is safe to continue without locking.
      Nothing -> action

-- | Kick off completion of an externally derived predicate asynchronously.
--
-- The predicate reference is first resolved against the DB's schema; if it
-- cannot be resolved a 'Thrift.Exception' prefixed with
-- @completeDerivedPredicate: @ is thrown.
--
-- The request is then rejected with an exception when:
--
--   * the DB's axiom predicates are not complete yet,
--   * the DB is marked 'Broken', or
--   * the server is read-only.
--
-- Otherwise, if a completion for the same predicate of the same repo is
-- already registered in 'envCompletingDerived', we wait on that one;
-- if not, a new completion is registered and started. In both cases the
-- wait is bounded by 'waitFor'.
--
-- If the completion itself fails, the DB is marked broken (via
-- 'setBroken') and the exception is rethrown in the worker thread.
completeDerivedPredicate
  :: Env
  -> Repo
  -> PredicateRef
  -> IO CompletePredicatesResponse
completeDerivedPredicate env@Env{..} repo pred = do
  details <- withOpenDatabase env repo $ \odb ->
    predicateDetails (odbSchema odb) pred
  let
    doCompletion = do -- we are masked in here
      r <- tryAll $
        syncCompleteDerivedPredicate env repo (predicatePid details)
      case r of
        Left (e :: SomeException) -> do
          setBroken repo envCatalog "completeDerivedPredicate" e
          throwIO e
        Right{} -> return ()

  mask_ $ do
    -- speculatively spawn a thread to do the completion, we'll cancel
    -- this if we don't need it. This is so that we can atomically
    -- start the job and update the Env state together.
    tmvar <- newEmptyTMVarIO
    async <- Warden.spawn envWarden $
      atomically (takeTMVar tmvar) >> doCompletion
    join $ immediately $ do
      meta <- now $ Catalog.readMeta envCatalog repo
      if
        | not $ metaAxiomComplete meta -> do
            -- The speculative thread is not needed on this path either:
            -- schedule its cancellation exactly as the other rejection
            -- branches below do, so it is not left blocked on the TMVar.
            later $ cancel async
            now $ throwSTM $ Thrift.Exception "DB is not complete."
        | Broken b <- metaCompleteness meta -> do
            later $ cancel async
            now $ throwSTM $ Exception $ databaseBroken_reason b
        | envReadOnly -> do
            later $ cancel async
            now $ throwSTM $ Exception "DB is read-only"
        | otherwise -> do
            completing <- now $ readTVar envCompletingDerived
            let derivations = HashMap.lookupDefault mempty repo completing
                predId = predicateId details
            case HashMap.lookup predId derivations of
              Just existingAsync -> do -- in progress
                later $ cancel async
                return $ waitFor existingAsync CompletePredicatesResponse{}
              Nothing -> now $ do -- start async completion
                -- unblock the speculative thread and record it so that
                -- later requests for the same predicate wait on it
                void $ tryPutTMVar tmvar ()
                modifyTVar envCompletingDerived $
                  HashMap.insert repo $
                    HashMap.insert predId async derivations
                return $ waitFor async CompletePredicatesResponse{}
  where
    -- Resolve the predicate reference in the given schema, throwing a
    -- Thrift exception if the lookup fails.
    predicateDetails schema pred =
      case lookupPredicateSourceRef (convertRef pred) LatestSchemaAll schema of
        Right details -> return details
        Left err ->
          throwIO $ Thrift.Exception $ "completeDerivedPredicate: " <> err

-- Record in the catalog that a DB is broken. Used when a synchronous
-- exception is thrown during completion: this is not recoverable, so the
-- DB's completeness is set to 'Broken', tagged with the given context and
-- the rendered exception. The exception is not rethrown here; the callers
-- do that themselves after calling this.
setBroken :: Repo -> Catalog.Catalog -> Text.Text -> SomeException -> IO ()
setBroken repo catalog context err =
  void . atomically $ Catalog.modifyMeta catalog repo markBroken
  where
    reason = Text.pack (show err)
    markBroken meta = return meta
      { metaCompleteness = Broken (DatabaseBroken context reason) }

-- Wait for an in-flight completion, bounded by a timeout of 1000000
-- (presumably microseconds, i.e. one second, if this is System.Timeout's
-- 'timeout' -- confirm against the module's imports). If the async
-- finishes in time the supplied result is returned; otherwise 'Retry 1'
-- is thrown.
waitFor :: Async () -> b -> IO b
waitFor async result =
  timeout 1000000 (wait async)
    >>= maybe (throwIO $ Retry 1) (const $ return result)
2 changes: 2 additions & 0 deletions glean/db/Glean/Database/Env.hs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ initEnv evb envStorage envCatalog shardManager cfg

envCompleting <- newTVarIO HashMap.empty

envCompletingDerived <- newTVarIO HashMap.empty

envDbSchemaCache <- newMVar HashMap.empty

return Env
Expand Down
10 changes: 7 additions & 3 deletions glean/db/Glean/Database/Open.hs
Original file line number Diff line number Diff line change
Expand Up @@ -553,17 +553,21 @@ baseSlices env deps stored_units = case deps of
slice <- if exclude && Set.null units
then return Nothing
else do
let !n = Set.size units
vlog 1 $ "computing slice for " <> showRepo repo <> " with " <>
show (Set.toList units) <>
if exclude then " excluded" else " included"
show n <>
(if exclude then " excluded: " else " included: ") <>
show (Set.toList units)
unitIds <- fmap catMaybes $
forM (Set.toList units) $ \name -> do
id <- Storage.getUnitId odbHandle name
vlog 2 $ "unit: " <> show name <> " = " <> show id
return id
maybeOwnership <- readTVarIO odbOwnership
forM maybeOwnership $ \ownership ->
r <- forM maybeOwnership $ \ownership ->
Ownership.slice ownership (catMaybes rest) unitIds exclude
logInfo $ "completed " <> show n <> " slices for " <> showRepo repo
return r
return (slice : rest)

-- | Create the slices for a stack of dependencies
Expand Down
2 changes: 2 additions & 0 deletions glean/db/Glean/Database/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,8 @@ data Env = forall storage. Storage storage => Env
, envActive :: TVar (HashMap Thrift.Repo DB)
, envDeleting :: TVar (HashMap Thrift.Repo (Async ()))
, envCompleting :: TVar (HashMap Thrift.Repo (Async ()))
, envCompletingDerived ::
TVar (HashMap Thrift.Repo (HashMap PredicateId (Async ())))
, envReadOnly :: Bool
, envMockWrites :: Bool
, envStats :: Stats
Expand Down
Loading

0 comments on commit 6f1fbf8

Please sign in to comment.