Skip to content

Commit

Permalink
Rewrite TBQueue to use TArray Int (Maybe a) (attempt 2)
Browse files Browse the repository at this point in the history
This reverts commit 110318a.
  • Loading branch information
bgamari committed Nov 17, 2023
1 parent e1ff4d4 commit 3671291
Show file tree
Hide file tree
Showing 7 changed files with 130 additions and 279 deletions.
267 changes: 124 additions & 143 deletions Control/Concurrent/STM/TBQueue.hs
Original file line number Diff line number Diff line change
Expand Up @@ -46,204 +46,185 @@ module Control.Concurrent.STM.TBQueue (
capacityTBQueue,
) where

import Control.Monad (unless)
import Data.Typeable (Typeable)
import GHC.Conc (STM, TVar, newTVar, newTVarIO, orElse,
readTVar, retry, writeTVar)
import Numeric.Natural (Natural)
import Prelude hiding (read)
#if !MIN_VERSION_base(4,8,0)
import Control.Applicative (pure)
#endif
import Data.Array.Base
import Data.Maybe (isJust, isNothing)
import Data.Typeable (Typeable)
import GHC.Conc
import Numeric.Natural (Natural)
import Prelude hiding (read)

import Control.Concurrent.STM.TArray

-- | 'TBQueue' is an abstract type representing a bounded FIFO channel.
--
-- @since 2.4
data TBQueue a
= TBQueue {-# UNPACK #-} !(TVar Natural) -- CR: read capacity
{-# UNPACK #-} !(TVar [a]) -- R: elements waiting to be read
{-# UNPACK #-} !(TVar Natural) -- CW: write capacity
{-# UNPACK #-} !(TVar [a]) -- W: elements written (head is most recent)
!(Natural) -- CAP: initial capacity
= TBQueue {-# UNPACK #-} !(TVar Int) -- read index
{-# UNPACK #-} !(TVar Int) -- write index
{-# UNPACK #-} !(TArray Int (Maybe a)) -- elements
{-# UNPACK #-} !Int -- initial capacity
deriving Typeable

instance Eq (TBQueue a) where
TBQueue a _ _ _ _ == TBQueue b _ _ _ _ = a == b
-- each `TBQueue` has its own `TVar`s, so it's sufficient to compare the first one
TBQueue a _ _ _ == TBQueue b _ _ _ = a == b

-- Total channel capacity remaining is CR + CW. Reads only need to
-- access CR, writes usually need to access only CW but sometimes need
-- CR. So in the common case we avoid contention between CR and CW.
--
-- - when removing an element from R:
-- CR := CR + 1
--
-- - when adding an element to W:
-- if CW is non-zero
-- then CW := CW - 1
-- then if CR is non-zero
-- then CW := CR - 1; CR := 0
-- else **FULL**
-- incMod x cap == (x + 1) `mod` cap
incMod :: Int -> Int -> Int
incMod x cap = let y = x + 1 in if y == cap then 0 else y

-- decMod x cap = (x - 1) `mod` cap
decMod :: Int -> Int -> Int
decMod x cap = if x == 0 then cap - 1 else x - 1

-- | Builds and returns a new instance of 'TBQueue'.
newTBQueue :: Natural -- ^ maximum number of elements the queue can hold
-> STM (TBQueue a)
newTBQueue size = do
read <- newTVar []
write <- newTVar []
rsize <- newTVar 0
wsize <- newTVar size
return (TBQueue rsize read wsize write size)
newTBQueue cap
| cap <= 0 = error "capacity has to be greater than 0"
| cap > fromIntegral (maxBound :: Int) = error "capacity is too big"
| otherwise = do
rindex <- newTVar 0
windex <- newTVar 0
elements <- newArray (0, cap' - 1) Nothing
pure (TBQueue rindex windex elements cap')
where
cap' = fromIntegral cap

-- | @IO@ version of 'newTBQueue'. This is useful for creating top-level
-- 'TBQueue's using 'System.IO.Unsafe.unsafePerformIO', because using
-- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't
-- possible.
newTBQueueIO :: Natural -> IO (TBQueue a)
newTBQueueIO size = do
read <- newTVarIO []
write <- newTVarIO []
rsize <- newTVarIO 0
wsize <- newTVarIO size
return (TBQueue rsize read wsize write size)

-- |Write a value to a 'TBQueue'; blocks if the queue is full.
newTBQueueIO cap
| cap <= 0 = error "capacity has to be greater than 0"
| cap > fromIntegral (maxBound :: Int) = error "capacity is too big"
| otherwise = do
rindex <- newTVarIO 0
windex <- newTVarIO 0
elements <- newArray (0, cap' - 1) Nothing
pure (TBQueue rindex windex elements cap')
where
cap' = fromIntegral cap

-- | Write a value to a 'TBQueue'; retries if the queue is full.
writeTBQueue :: TBQueue a -> a -> STM ()
writeTBQueue (TBQueue rsize _read wsize write _size) a = do
w <- readTVar wsize
if (w > 0)
then do writeTVar wsize $! w - 1
else do
r <- readTVar rsize
if (r > 0)
then do writeTVar rsize 0
writeTVar wsize $! r - 1
else retry
listend <- readTVar write
writeTVar write (a:listend)

-- |Read the next value from the 'TBQueue'.
writeTBQueue (TBQueue _ windex elements cap) a = do
w <- readTVar windex
ele <- unsafeRead elements w
case ele of
Nothing -> unsafeWrite elements w (Just a)
Just _ -> retry
writeTVar windex $! incMod w cap

-- | Read the next value from the 'TBQueue'; retries if the queue is empty.
readTBQueue :: TBQueue a -> STM a
readTBQueue (TBQueue rsize read _wsize write _size) = do
xs <- readTVar read
r <- readTVar rsize
writeTVar rsize $! r + 1
case xs of
(x:xs') -> do
writeTVar read xs'
return x
[] -> do
ys <- readTVar write
case ys of
[] -> retry
_ -> do
-- NB. lazy: we want the transaction to be
-- short, otherwise it will conflict
let ~(z,zs) = case reverse ys of
z':zs' -> (z',zs')
_ -> error "readTBQueue: impossible"
writeTVar write []
writeTVar read zs
return z
readTBQueue (TBQueue rindex _ elements cap) = do
r <- readTVar rindex
ele <- unsafeRead elements r
a <- case ele of
Nothing -> retry
Just a -> do
unsafeWrite elements r Nothing
pure a
writeTVar rindex $! incMod r cap
pure a

-- | A version of 'readTBQueue' which does not retry. Instead it
-- returns @Nothing@ if no value is available.
tryReadTBQueue :: TBQueue a -> STM (Maybe a)
tryReadTBQueue q = fmap Just (readTBQueue q) `orElse` return Nothing
tryReadTBQueue q = fmap Just (readTBQueue q) `orElse` pure Nothing

-- | Efficiently read the entire contents of a 'TBQueue' into a list. This
-- function never retries.
--
-- @since 2.4.5
flushTBQueue :: TBQueue a -> STM [a]
flushTBQueue (TBQueue rsize read wsize write size) = do
xs <- readTVar read
ys <- readTVar write
if null xs && null ys
then return []
else do
unless (null xs) $ writeTVar read []
unless (null ys) $ writeTVar write []
writeTVar rsize 0
writeTVar wsize size
return (xs ++ reverse ys)
flushTBQueue :: forall a. TBQueue a -> STM [a]
flushTBQueue (TBQueue _rindex windex elements cap) = do
w <- readTVar windex
go (decMod w cap) []
where
go :: Int -> [a] -> STM [a]
go i acc = do
ele <- unsafeRead elements i
case ele of
Nothing -> pure acc
Just a -> do
unsafeWrite elements i Nothing
go (decMod i cap) (a : acc)

-- | Get the next value from the @TBQueue@ without removing it,
-- retrying if the channel is empty.
-- retrying if the queue is empty.
peekTBQueue :: TBQueue a -> STM a
peekTBQueue (TBQueue _ read _ write _) = do
xs <- readTVar read
case xs of
(x:_) -> return x
[] -> do
ys <- readTVar write
case ys of
[] -> retry
_ -> do
let (z:zs) = reverse ys -- NB. lazy: we want the transaction to be
-- short, otherwise it will conflict
writeTVar write []
writeTVar read (z:zs)
return z
peekTBQueue (TBQueue rindex _ elements _) = do
r <- readTVar rindex
ele <- unsafeRead elements r
case ele of
Nothing -> retry
Just a -> pure a

-- | A version of 'peekTBQueue' which does not retry. Instead it
-- returns @Nothing@ if no value is available.
tryPeekTBQueue :: TBQueue a -> STM (Maybe a)
tryPeekTBQueue c = do
m <- tryReadTBQueue c
case m of
Nothing -> return Nothing
Just x -> do
unGetTBQueue c x
return m
tryPeekTBQueue q = fmap Just (peekTBQueue q) `orElse` pure Nothing

-- | Put a data item back onto a channel, where it will be the next item read.
-- Blocks if the queue is full.
-- Retries if the queue is full.
unGetTBQueue :: TBQueue a -> a -> STM ()
unGetTBQueue (TBQueue rsize read wsize _write _size) a = do
r <- readTVar rsize
if (r > 0)
then do writeTVar rsize $! r - 1
else do
w <- readTVar wsize
if (w > 0)
then writeTVar wsize $! w - 1
else retry
xs <- readTVar read
writeTVar read (a:xs)
unGetTBQueue (TBQueue rindex _ elements cap) a = do
r <- readTVar rindex
ele <- unsafeRead elements r
case ele of
Nothing -> unsafeWrite elements r (Just a)
Just _ -> retry
writeTVar rindex $! decMod r cap

-- | Return the length of a 'TBQueue'.
--
-- @since 2.5.0.0
lengthTBQueue :: TBQueue a -> STM Natural
lengthTBQueue (TBQueue rsize _read wsize _write size) = do
r <- readTVar rsize
w <- readTVar wsize
return $! size - r - w
lengthTBQueue (TBQueue rindex windex elements cap) = do
r <- readTVar rindex
w <- readTVar windex
if w == r then do
-- length is 0 or cap
ele <- unsafeRead elements r
case ele of
Nothing -> pure 0
Just _ -> pure $! fromIntegral cap
else do
let len' = w - r
pure $! fromIntegral (if len' < 0 then len' + cap else len')

-- | Returns 'True' if the supplied 'TBQueue' is empty.
isEmptyTBQueue :: TBQueue a -> STM Bool
isEmptyTBQueue (TBQueue _rsize read _wsize write _size) = do
xs <- readTVar read
case xs of
(_:_) -> return False
[] -> do ys <- readTVar write
case ys of
[] -> return True
_ -> return False
isEmptyTBQueue (TBQueue rindex windex elements _) = do
r <- readTVar rindex
w <- readTVar windex
if w == r then do
ele <- unsafeRead elements r
pure $! isNothing ele
else
pure False

-- | Returns 'True' if the supplied 'TBQueue' is full.
--
-- @since 2.4.3
isFullTBQueue :: TBQueue a -> STM Bool
isFullTBQueue (TBQueue rsize _read wsize _write _size) = do
w <- readTVar wsize
if (w > 0)
then return False
else do
r <- readTVar rsize
if (r > 0)
then return False
else return True
isFullTBQueue (TBQueue rindex windex elements _) = do
r <- readTVar rindex
w <- readTVar windex
if w == r then do
ele <- unsafeRead elements r
pure $! isJust ele
else
pure False

-- | The maximum number of elements the queue can hold.
--
-- @since TODO
capacityTBQueue :: TBQueue a -> Natural
capacityTBQueue (TBQueue _ _ _ _ cap) = fromIntegral cap
capacityTBQueue (TBQueue _ _ _ cap) = fromIntegral cap
59 changes: 0 additions & 59 deletions bench/chanbench.hs

This file was deleted.

5 changes: 4 additions & 1 deletion cabal.project
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
packages: . testsuite/
packages:
.
testsuite
bench

package testsuite
tests: true
Loading

0 comments on commit 3671291

Please sign in to comment.