Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

synchronize source repos concurrently #9844

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions Cabal/src/Distribution/Simple/Utils.hs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,8 @@ module Distribution.Simple.Utils
, unintersperse
, wrapText
, wrapLine
, sequenceConcurrentlyBounded
, sequenceConcurrentlyBounded_

-- * FilePath stuff
, isAbsoluteOnAnyPlatform
Expand Down Expand Up @@ -235,6 +237,7 @@ import Data.Typeable
( cast
)

import Control.Concurrent
import qualified Control.Exception as Exception
import Data.Time.Clock.POSIX (POSIXTime, getPOSIXTime)
import Distribution.Compat.Process (proc)
Expand Down Expand Up @@ -2025,3 +2028,45 @@ findHookedPackageDesc verbosity mbWorkDir dir = do

buildInfoExt :: String
buildInfoExt = ".buildinfo"

sequenceConcurrentlyBounded :: Int -> [IO a] -> IO [a]
sequenceConcurrentlyBounded n xs = do
sem <- newQSem (n - 1)
tid <- myThreadId
let
catchForMe x =
Exception.catches
x
[ Exception.Handler $ \e@(Exception.SomeAsyncException _) -> throwIO e
, Exception.Handler $ \e@(SomeException _) -> Exception.throwTo tid e
]
Exception.mask $ \restore -> do
resultvars <- for xs $ \x -> do
var <- newEmptyMVar
_tid <- forkIO $ Exception.bracket_ (waitQSem sem) (signalQSem sem) $ catchForMe $ do
res <- restore x
True <- tryPutMVar var res
return ()
return var
Exception.bracket_ (signalQSem sem) (waitQSem sem) (traverse takeMVar resultvars)

sequenceConcurrentlyBounded_ :: Int -> [IO a] -> IO ()
sequenceConcurrentlyBounded_ n xs = do
sem <- newQSem (n - 1)
tid <- myThreadId
let
catchForMe x =
Exception.catches
x
[ Exception.Handler $ \e@(Exception.SomeAsyncException _) -> throwIO e
, Exception.Handler $ \e@(SomeException _) -> Exception.throwTo tid e
]
Exception.mask $ \restore -> do
resultvars <- for xs $ \x -> do
var <- newEmptyMVar
_tid <- forkIO $ Exception.bracket_ (waitQSem sem) (signalQSem sem) $ catchForMe $ do
_ <- restore x
True <- tryPutMVar var ()
return ()
return var
Exception.bracket_ (signalQSem sem) (waitQSem sem) (traverse_ takeMVar resultvars)
18 changes: 10 additions & 8 deletions cabal-install/src/Distribution/Client/ProjectConfig.hs
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ resolveBuildTimeSettings
cabalLogsDirectory
</> "$compiler"
</> "$libname"
<.> "log"
<.> "log"
givenTemplate = flagToMaybe projectConfigLogFile

useDefaultTemplate
Expand Down Expand Up @@ -1245,10 +1245,10 @@ fetchAndReadSourcePackages
preferredHttpTransport
sequenceA
[ fetchAndReadSourcePackageRemoteTarball
verbosity
distDirLayout
getTransport
uri
verbosity
distDirLayout
getTransport
uri
| ProjectPackageRemoteTarball uri <- pkgLocations
]

Expand Down Expand Up @@ -1403,15 +1403,17 @@ syncAndReadSourcePackagesRemoteRepos
]

let progPathExtra = fromNubList projectConfigProgPathExtra
let numJobs = 4 -- hardcoded for now
getConfiguredVCS <- delayInitSharedResources $ \repoType ->
let vcs = Map.findWithDefault (error $ "Unknown VCS: " ++ prettyShow repoType) repoType knownVCSs
in configureVCS verbosity progPathExtra vcs

concat
<$> sequenceA
<$> sequenceConcurrentlyBoundedRebuild
numJobs
[ rerunIfChanged verbosity monitor repoGroup' $ do
vcs' <- getConfiguredVCS repoType
syncRepoGroupAndReadSourcePackages vcs' pathStem repoGroup'
vcs' <- getConfiguredVCS repoType
syncRepoGroupAndReadSourcePackages vcs' pathStem repoGroup'
| repoGroup@((primaryRepo, repoType) : _) <- Map.elems reposByLocation
, let repoGroup' = map fst repoGroup
pathStem =
Expand Down
13 changes: 12 additions & 1 deletion cabal-install/src/Distribution/Client/RebuildMonad.hs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ module Distribution.Client.RebuildMonad
, findFileWithExtensionMonitored
, findFirstFileMonitored
, findFileMonitored
, sequenceConcurrentlyBoundedRebuild
) where

import Distribution.Client.Compat.Prelude
Expand All @@ -66,7 +67,7 @@ import Distribution.Client.Glob hiding (matchFileGlob)
import qualified Distribution.Client.Glob as Glob (matchFileGlob)
import Distribution.Simple.PreProcess.Types (Suffix (..))

import Distribution.Simple.Utils (debug)
import Distribution.Simple.Utils (debug, sequenceConcurrentlyBounded)

import Control.Concurrent.MVar (MVar, modifyMVar, newMVar)
import Control.Monad.Reader as Reader
Expand Down Expand Up @@ -330,3 +331,13 @@ findFileMonitored searchPath fileName =
[ path </> fileName
| path <- nub searchPath
]

-- | Run multiple 'Rebuild' actions in parallel, collecting the final
-- list of used files.
sequenceConcurrentlyBoundedRebuild :: Int -> [Rebuild a] -> Rebuild [a]
sequenceConcurrentlyBoundedRebuild n xs = do
root <- askRoot
results <- liftIO $ sequenceConcurrentlyBounded n (unRebuild root <$> xs)
for results $ \(a, files) -> do
monitorFiles files
return a
5 changes: 5 additions & 0 deletions changelog.d/synchronize-source-repos-concurrently
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
synopsis: Synchronize source repositories concurrently
packages: cabal-install
prs: #0000
significance: significant

Loading