From 3671291800a972b67cc722a174e6cd460a7d759c Mon Sep 17 00:00:00 2001 From: Ben Gamari Date: Fri, 17 Nov 2023 18:49:27 -0500 Subject: [PATCH] Rewrite `TBQueue` to use `TArray Int (Maybe a)` (attempt 2) This reverts commit 110318ad8a0417624398b1af3d5ebafac50d527b. --- Control/Concurrent/STM/TBQueue.hs | 267 ++++++++++++++---------------- bench/chanbench.hs | 59 ------- cabal.project | 5 +- hie.yaml | 2 + testsuite/src/Issue17.hs | 73 -------- testsuite/src/Main.hs | 2 - testsuite/testsuite.cabal | 1 - 7 files changed, 130 insertions(+), 279 deletions(-) delete mode 100644 bench/chanbench.hs create mode 100644 hie.yaml delete mode 100644 testsuite/src/Issue17.hs diff --git a/Control/Concurrent/STM/TBQueue.hs b/Control/Concurrent/STM/TBQueue.hs index 27430ec..d5ea578 100644 --- a/Control/Concurrent/STM/TBQueue.hs +++ b/Control/Concurrent/STM/TBQueue.hs @@ -46,204 +46,185 @@ module Control.Concurrent.STM.TBQueue ( capacityTBQueue, ) where -import Control.Monad (unless) -import Data.Typeable (Typeable) -import GHC.Conc (STM, TVar, newTVar, newTVarIO, orElse, - readTVar, retry, writeTVar) -import Numeric.Natural (Natural) -import Prelude hiding (read) +#if !MIN_VERSION_base(4,8,0) +import Control.Applicative (pure) +#endif +import Data.Array.Base +import Data.Maybe (isJust, isNothing) +import Data.Typeable (Typeable) +import GHC.Conc +import Numeric.Natural (Natural) +import Prelude hiding (read) + +import Control.Concurrent.STM.TArray -- | 'TBQueue' is an abstract type representing a bounded FIFO channel. -- -- @since 2.4 data TBQueue a - = TBQueue {-# UNPACK #-} !(TVar Natural) -- CR: read capacity - {-# UNPACK #-} !(TVar [a]) -- R: elements waiting to be read - {-# UNPACK #-} !(TVar Natural) -- CW: write capacity - {-# UNPACK #-} !(TVar [a]) -- W: elements written (head is most recent) - !(Natural) -- CAP: initial capacity + = TBQueue {-# UNPACK #-} !(TVar Int) -- read index + {-# UNPACK #-} !(TVar Int) -- write index + {-# UNPACK #-} !(TArray Int (Maybe a)) -- elements + {-# UNPACK #-} !Int -- initial capacity deriving Typeable instance Eq (TBQueue a) where - TBQueue a _ _ _ _ == TBQueue b _ _ _ _ = a == b + -- each `TBQueue` has its own `TVar`s, so it's sufficient to compare the first one + TBQueue a _ _ _ == TBQueue b _ _ _ = a == b --- Total channel capacity remaining is CR + CW. Reads only need to --- access CR, writes usually need to access only CW but sometimes need --- CR. So in the common case we avoid contention between CR and CW. --- --- - when removing an element from R: --- CR := CR + 1 --- --- - when adding an element to W: --- if CW is non-zero --- then CW := CW - 1 --- then if CR is non-zero --- then CW := CR - 1; CR := 0 --- else **FULL** +-- incMod x cap == (x + 1) `mod` cap +incMod :: Int -> Int -> Int +incMod x cap = let y = x + 1 in if y == cap then 0 else y + +-- decMod x cap = (x - 1) `mod` cap +decMod :: Int -> Int -> Int +decMod x cap = if x == 0 then cap - 1 else x - 1 -- | Builds and returns a new instance of 'TBQueue'. newTBQueue :: Natural -- ^ maximum number of elements the queue can hold -> STM (TBQueue a) -newTBQueue size = do - read <- newTVar [] - write <- newTVar [] - rsize <- newTVar 0 - wsize <- newTVar size - return (TBQueue rsize read wsize write size) +newTBQueue cap + | cap <= 0 = error "capacity has to be greater than 0" + | cap > fromIntegral (maxBound :: Int) = error "capacity is too big" + | otherwise = do + rindex <- newTVar 0 + windex <- newTVar 0 + elements <- newArray (0, cap' - 1) Nothing + pure (TBQueue rindex windex elements cap') + where + cap' = fromIntegral cap -- | @IO@ version of 'newTBQueue'. This is useful for creating top-level -- 'TBQueue's using 'System.IO.Unsafe.unsafePerformIO', because using -- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't -- possible. newTBQueueIO :: Natural -> IO (TBQueue a) -newTBQueueIO size = do - read <- newTVarIO [] - write <- newTVarIO [] - rsize <- newTVarIO 0 - wsize <- newTVarIO size - return (TBQueue rsize read wsize write size) - --- |Write a value to a 'TBQueue'; blocks if the queue is full. +newTBQueueIO cap + | cap <= 0 = error "capacity has to be greater than 0" + | cap > fromIntegral (maxBound :: Int) = error "capacity is too big" + | otherwise = do + rindex <- newTVarIO 0 + windex <- newTVarIO 0 + elements <- newArray (0, cap' - 1) Nothing + pure (TBQueue rindex windex elements cap') + where + cap' = fromIntegral cap + +-- | Write a value to a 'TBQueue'; retries if the queue is full. writeTBQueue :: TBQueue a -> a -> STM () -writeTBQueue (TBQueue rsize _read wsize write _size) a = do - w <- readTVar wsize - if (w > 0) - then do writeTVar wsize $! w - 1 - else do - r <- readTVar rsize - if (r > 0) - then do writeTVar rsize 0 - writeTVar wsize $! r - 1 - else retry - listend <- readTVar write - writeTVar write (a:listend) - --- |Read the next value from the 'TBQueue'. +writeTBQueue (TBQueue _ windex elements cap) a = do + w <- readTVar windex + ele <- unsafeRead elements w + case ele of + Nothing -> unsafeWrite elements w (Just a) + Just _ -> retry + writeTVar windex $! incMod w cap + +-- | Read the next value from the 'TBQueue'; retries if the queue is empty. readTBQueue :: TBQueue a -> STM a -readTBQueue (TBQueue rsize read _wsize write _size) = do - xs <- readTVar read - r <- readTVar rsize - writeTVar rsize $! r + 1 - case xs of - (x:xs') -> do - writeTVar read xs' - return x - [] -> do - ys <- readTVar write - case ys of - [] -> retry - _ -> do - -- NB. lazy: we want the transaction to be - -- short, otherwise it will conflict - let ~(z,zs) = case reverse ys of - z':zs' -> (z',zs') - _ -> error "readTBQueue: impossible" - writeTVar write [] - writeTVar read zs - return z +readTBQueue (TBQueue rindex _ elements cap) = do + r <- readTVar rindex + ele <- unsafeRead elements r + a <- case ele of + Nothing -> retry + Just a -> do + unsafeWrite elements r Nothing + pure a + writeTVar rindex $! incMod r cap + pure a -- | A version of 'readTBQueue' which does not retry. Instead it -- returns @Nothing@ if no value is available. tryReadTBQueue :: TBQueue a -> STM (Maybe a) -tryReadTBQueue q = fmap Just (readTBQueue q) `orElse` return Nothing +tryReadTBQueue q = fmap Just (readTBQueue q) `orElse` pure Nothing -- | Efficiently read the entire contents of a 'TBQueue' into a list. This -- function never retries. -- -- @since 2.4.5 -flushTBQueue :: TBQueue a -> STM [a] -flushTBQueue (TBQueue rsize read wsize write size) = do - xs <- readTVar read - ys <- readTVar write - if null xs && null ys - then return [] - else do - unless (null xs) $ writeTVar read [] - unless (null ys) $ writeTVar write [] - writeTVar rsize 0 - writeTVar wsize size - return (xs ++ reverse ys) +flushTBQueue :: forall a. TBQueue a -> STM [a] +flushTBQueue (TBQueue _rindex windex elements cap) = do + w <- readTVar windex + go (decMod w cap) [] + where + go :: Int -> [a] -> STM [a] + go i acc = do + ele <- unsafeRead elements i + case ele of + Nothing -> pure acc + Just a -> do + unsafeWrite elements i Nothing + go (decMod i cap) (a : acc) -- | Get the next value from the @TBQueue@ without removing it, --- retrying if the channel is empty. +-- retrying if the queue is empty. peekTBQueue :: TBQueue a -> STM a -peekTBQueue (TBQueue _ read _ write _) = do - xs <- readTVar read - case xs of - (x:_) -> return x - [] -> do - ys <- readTVar write - case ys of - [] -> retry - _ -> do - let (z:zs) = reverse ys -- NB. lazy: we want the transaction to be - -- short, otherwise it will conflict - writeTVar write [] - writeTVar read (z:zs) - return z +peekTBQueue (TBQueue rindex _ elements _) = do + r <- readTVar rindex + ele <- unsafeRead elements r + case ele of + Nothing -> retry + Just a -> pure a -- | A version of 'peekTBQueue' which does not retry. Instead it -- returns @Nothing@ if no value is available. tryPeekTBQueue :: TBQueue a -> STM (Maybe a) -tryPeekTBQueue c = do - m <- tryReadTBQueue c - case m of - Nothing -> return Nothing - Just x -> do - unGetTBQueue c x - return m +tryPeekTBQueue q = fmap Just (peekTBQueue q) `orElse` pure Nothing -- | Put a data item back onto a channel, where it will be the next item read. --- Blocks if the queue is full. +-- Retries if the queue is full. unGetTBQueue :: TBQueue a -> a -> STM () -unGetTBQueue (TBQueue rsize read wsize _write _size) a = do - r <- readTVar rsize - if (r > 0) - then do writeTVar rsize $! r - 1 - else do - w <- readTVar wsize - if (w > 0) - then writeTVar wsize $! w - 1 - else retry - xs <- readTVar read - writeTVar read (a:xs) +unGetTBQueue (TBQueue rindex _ elements cap) a = do + r <- readTVar rindex + ele <- unsafeRead elements r + case ele of + Nothing -> unsafeWrite elements r (Just a) + Just _ -> retry + writeTVar rindex $! decMod r cap -- | Return the length of a 'TBQueue'. -- -- @since 2.5.0.0 lengthTBQueue :: TBQueue a -> STM Natural -lengthTBQueue (TBQueue rsize _read wsize _write size) = do - r <- readTVar rsize - w <- readTVar wsize - return $! size - r - w +lengthTBQueue (TBQueue rindex windex elements cap) = do + r <- readTVar rindex + w <- readTVar windex + if w == r then do + -- length is 0 or cap + ele <- unsafeRead elements r + case ele of + Nothing -> pure 0 + Just _ -> pure $! fromIntegral cap + else do + let len' = w - r + pure $! fromIntegral (if len' < 0 then len' + cap else len') -- | Returns 'True' if the supplied 'TBQueue' is empty. isEmptyTBQueue :: TBQueue a -> STM Bool -isEmptyTBQueue (TBQueue _rsize read _wsize write _size) = do - xs <- readTVar read - case xs of - (_:_) -> return False - [] -> do ys <- readTVar write - case ys of - [] -> return True - _ -> return False +isEmptyTBQueue (TBQueue rindex windex elements _) = do + r <- readTVar rindex + w <- readTVar windex + if w == r then do + ele <- unsafeRead elements r + pure $! isNothing ele + else + pure False -- | Returns 'True' if the supplied 'TBQueue' is full. -- -- @since 2.4.3 isFullTBQueue :: TBQueue a -> STM Bool -isFullTBQueue (TBQueue rsize _read wsize _write _size) = do - w <- readTVar wsize - if (w > 0) - then return False - else do - r <- readTVar rsize - if (r > 0) - then return False - else return True +isFullTBQueue (TBQueue rindex windex elements _) = do + r <- readTVar rindex + w <- readTVar windex + if w == r then do + ele <- unsafeRead elements r + pure $! isJust ele + else + pure False -- | The maximum number of elements the queue can hold. -- -- @since TODO capacityTBQueue :: TBQueue a -> Natural -capacityTBQueue (TBQueue _ _ _ _ cap) = fromIntegral cap +capacityTBQueue (TBQueue _ _ _ cap) = fromIntegral cap diff --git a/bench/chanbench.hs b/bench/chanbench.hs deleted file mode 100644 index 8c534f1..0000000 --- a/bench/chanbench.hs +++ /dev/null @@ -1,59 +0,0 @@ -{-# LANGUAGE CPP, RankNTypes #-} -import Control.Concurrent.Async -import Control.Monad -import System.Environment - -import Control.Concurrent.Chan -import Control.Concurrent.STM -import Control.Concurrent.STM.TQueue -import Control.Concurrent.STM.TBQueue - --- Using CPP rather than a runtime choice between channel types, --- because we want the compiler to be able to optimise the calls. - --- #define CHAN --- #define TCHAN --- #define TQUEUE --- #define TBQUEUE - -#ifdef CHAN -newc = newChan -readc c = readChan c -writec c x = writeChan c x -#elif defined(TCHAN) -newc = newTChanIO -readc c = atomically $ readTChan c -writec c x = atomically $ writeTChan c x -#elif defined(TQUEUE) -newc = newTQueueIO -readc c = atomically $ readTQueue c -writec c x = atomically $ writeTQueue c x -#elif defined(TBQUEUE) -newc = newTBQueueIO 4096 -readc c = atomically $ readTBQueue c -writec c x = atomically $ writeTBQueue c x -#endif - -main = do - [stest,sn] <- getArgs -- 2000000 is a good number - let n = read sn :: Int - test = read stest :: Int - runtest n test - -runtest :: Int -> Int -> IO () -runtest n test = do - c <- newc - case test of - 0 -> do - a <- async $ replicateM_ n $ writec c (1 :: Int) - b <- async $ replicateM_ n $ readc c - waitBoth a b - return () - 1 -> do - replicateM_ n $ writec c (1 :: Int) - replicateM_ n $ readc c - 2 -> do - let n1000 = n `quot` 1000 - replicateM_ 1000 $ do - replicateM_ n1000 $ writec c (1 :: Int) - replicateM_ n1000 $ readc c diff --git a/cabal.project b/cabal.project index c343211..faa73e7 100644 --- a/cabal.project +++ b/cabal.project @@ -1,4 +1,7 @@ -packages: . testsuite/ +packages: + . + testsuite + bench package testsuite tests: true diff --git a/hie.yaml b/hie.yaml new file mode 100644 index 0000000..04cd243 --- /dev/null +++ b/hie.yaml @@ -0,0 +1,2 @@ +cradle: + cabal: diff --git a/testsuite/src/Issue17.hs b/testsuite/src/Issue17.hs deleted file mode 100644 index 06b72f0..0000000 --- a/testsuite/src/Issue17.hs +++ /dev/null @@ -1,73 +0,0 @@ -{-# LANGUAGE CPP #-} - --- see https://github.com/haskell/stm/pull/19 --- --- Test-case contributed by Alexey Kuleshevich --- --- This bug is observable in all versions with TBQueue from `stm-2.4` to --- `stm-2.4.5.1` inclusive. - -module Issue17 (main) where - -import Control.Concurrent.STM -import Test.HUnit.Base (assertBool, assertEqual) - -main :: IO () -main = do - -- New queue capacity is set to 0 - queueIO <- newTBQueueIO 0 - assertNoCapacityTBQueue queueIO - - -- Same as above, except created within STM - queueSTM <- atomically $ newTBQueue 0 - assertNoCapacityTBQueue queueSTM - -#if !MIN_VERSION_stm(2,5,0) - -- NB: below are expected failures - - -- New queue capacity is set to a negative numer - queueIO' <- newTBQueueIO (-1 :: Int) - assertNoCapacityTBQueue queueIO' - - -- Same as above, except created within STM and different negative number - queueSTM' <- atomically $ newTBQueue (minBound :: Int) - assertNoCapacityTBQueue queueSTM' -#endif - -assertNoCapacityTBQueue :: TBQueue Int -> IO () -assertNoCapacityTBQueue queue = do - assertEmptyTBQueue queue - assertFullTBQueue queue - - -- Attempt to write into the queue. - eValWrite <- atomically $ orElse (fmap Left (writeTBQueue queue 217)) - (fmap Right (tryReadTBQueue queue)) - assertEqual "Expected queue with no capacity: writeTBQueue" eValWrite (Right Nothing) - eValUnGet <- atomically $ orElse (fmap Left (unGetTBQueue queue 218)) - (fmap Right (tryReadTBQueue queue)) - assertEqual "Expected queue with no capacity: unGetTBQueue" eValUnGet (Right Nothing) - - -- Make sure that attempt to write didn't affect the queue - assertEmptyTBQueue queue - assertFullTBQueue queue - - -assertEmptyTBQueue :: TBQueue Int -> IO () -assertEmptyTBQueue queue = do - atomically (isEmptyTBQueue queue) >>= - assertBool "Expected empty: isEmptyTBQueue should return True" - - atomically (tryReadTBQueue queue) >>= - assertEqual "Expected empty: tryReadTBQueue should return Nothing" Nothing - - atomically (tryPeekTBQueue queue) >>= - assertEqual "Expected empty: tryPeekTBQueue should return Nothing" Nothing - - atomically (flushTBQueue queue) >>= - assertEqual "Expected empty: flushTBQueue should return []" [] - - -assertFullTBQueue :: TBQueue Int -> IO () -assertFullTBQueue queue = do - atomically (isFullTBQueue queue) >>= - assertBool "Expected full: isFullTBQueue shoule return True" diff --git a/testsuite/src/Main.hs b/testsuite/src/Main.hs index 09802d2..8cbb8db 100644 --- a/testsuite/src/Main.hs +++ b/testsuite/src/Main.hs @@ -6,7 +6,6 @@ import Test.Framework (defaultMain, testGroup) import Test.Framework.Providers.HUnit import qualified Issue9 -import qualified Issue17 import qualified Stm052 import qualified Stm064 import qualified Stm065 @@ -19,7 +18,6 @@ main = do tests = [ testGroup "regression" [ testCase "issue #9" Issue9.main - , testCase "issue #17" Issue17.main , testCase "stm052" Stm052.main , testCase "stm064" Stm064.main , testCase "stm065" Stm065.main diff --git a/testsuite/testsuite.cabal b/testsuite/testsuite.cabal index f77d8e5..863057f 100644 --- a/testsuite/testsuite.cabal +++ b/testsuite/testsuite.cabal @@ -20,7 +20,6 @@ test-suite stm main-is: Main.hs other-modules: Issue9 - Issue17 Stm052 Stm064 Stm065