diff --git a/Control/Concurrent/STM/TQueue.hs b/Control/Concurrent/STM/TQueue.hs index 483db15..a1153b3 100644 --- a/Control/Concurrent/STM/TQueue.hs +++ b/Control/Concurrent/STM/TQueue.hs @@ -1,5 +1,6 @@ {-# OPTIONS_GHC -fno-warn-name-shadowing #-} {-# LANGUAGE CPP, DeriveDataTypeable #-} +{-# LANGUAGE BangPatterns #-} #if __GLASGOW_HASKELL__ >= 701 {-# LANGUAGE Trustworthy #-} @@ -50,22 +51,26 @@ import GHC.Conc import Control.Monad (unless) import Data.Typeable (Typeable) +data End a = End !Int [a] + -- | 'TQueue' is an abstract type representing an unbounded FIFO channel. -- -- @since 2.4 -data TQueue a = TQueue {-# UNPACK #-} !(TVar [a]) - {-# UNPACK #-} !(TVar [a]) +data TQueue a = TQueue {-# UNPACK #-} !(TVar Int) + {-# UNPACK #-} !(TVar (End a)) + {-# UNPACK #-} !(TVar (End a)) deriving Typeable instance Eq (TQueue a) where - TQueue a _ == TQueue b _ = a == b + TQueue a _ _ == TQueue b _ _ = a == b -- |Build and returns a new instance of 'TQueue' newTQueue :: STM (TQueue a) newTQueue = do - read <- newTVar [] - write <- newTVar [] - return (TQueue read write) + old_len <- newTVar 0 + read <- newTVar (End 0 []) + write <- newTVar (End 0 []) + return (TQueue old_len read write) -- |@IO@ version of 'newTQueue'. This is useful for creating top-level -- 'TQueue's using 'System.IO.Unsafe.unsafePerformIO', because using @@ -73,34 +78,45 @@ newTQueue = do -- possible. newTQueueIO :: IO (TQueue a) newTQueueIO = do - read <- newTVarIO [] - write <- newTVarIO [] - return (TQueue read write) + old_len <- newTVarIO 0 + read <- newTVarIO (End 0 []) + write <- newTVarIO (End 0 []) + return (TQueue old_len read write) -- |Write a value to a 'TQueue'. writeTQueue :: TQueue a -> a -> STM () -writeTQueue (TQueue _read write) a = do - listend <- readTVar write - writeTVar write (a:listend) +writeTQueue (TQueue old_len read write) a = do + ol <- readTVar old_len + End write_count listend <- readTVar write + let write_count' = write_count + 1 + if 2 * write_count' >= ol + then do + End read_count front <- readTVar read + let !len = ol + write_count' - read_count + writeTVar old_len len + writeTVar read (End 0 (front ++ reverse listend ++ [a])) + writeTVar write (End 0 []) + else writeTVar write (End write_count' (a:listend)) -- |Read the next value from the 'TQueue'. readTQueue :: TQueue a -> STM a -readTQueue (TQueue read write) = do - xs <- readTVar read - case xs of - (x:xs') -> do - writeTVar read xs' - return x - [] -> do - ys <- readTVar write - case ys of - [] -> retry - _ -> do - let (z:zs) = reverse ys -- NB. lazy: we want the transaction to be - -- short, otherwise it will conflict - writeTVar write [] - writeTVar read zs - return z +readTQueue (TQueue old_len read write) = do + ol <- readTVar old_len + End read_count front <- readTVar read + case front of + [] -> retry + (a:as) -> do + let read_count' = read_count + 1 + if 2 * read_count' >= ol + then do + End write_count listend <- readTVar write + let !len = ol + write_count - read_count' + writeTVar old_len len + writeTVar read (End 0 (as ++ reverse listend)) + writeTVar write (End 0 []) + else do + writeTVar read (End read_count' as) + return a -- | A version of 'readTQueue' which does not retry. Instead it -- returns @Nothing@ if no value is available. @@ -112,45 +128,40 @@ tryReadTQueue c = fmap Just (readTQueue c) `orElse` return Nothing -- -- @since 2.4.5 flushTQueue :: TQueue a -> STM [a] -flushTQueue (TQueue read write) = do - xs <- readTVar read - ys <- readTVar write - unless (null xs) $ writeTVar read [] - unless (null ys) $ writeTVar write [] +flushTQueue (TQueue old_len read write) = do + End read_count xs <- readTVar read + End write_count ys <- readTVar write + unless (read_count == 0 && null xs) $ writeTVar read (End 0 []) + unless (write_count == 0 && null ys) $ writeTVar write (End 0 []) + writeTVar old_len 0 return (xs ++ reverse ys) -- | Get the next value from the @TQueue@ without removing it, -- retrying if the channel is empty. peekTQueue :: TQueue a -> STM a -peekTQueue c = do - x <- readTQueue c - unGetTQueue c x - return x +peekTQueue (TQueue _old_len read _write) = do + End _ xs <- readTVar read + case xs of + x:_ -> return x + [] -> retry -- | A version of 'peekTQueue' which does not retry. Instead it -- returns @Nothing@ if no value is available. tryPeekTQueue :: TQueue a -> STM (Maybe a) -tryPeekTQueue c = do - m <- tryReadTQueue c - case m of - Nothing -> return Nothing - Just x -> do - unGetTQueue c x - return m +tryPeekTQueue (TQueue _old_len read _write) = do + End _ xs <- readTVar read + case xs of + x:_ -> return (Just x) + [] -> return Nothing -- |Put a data item back onto a channel, where it will be the next item read. unGetTQueue :: TQueue a -> a -> STM () -unGetTQueue (TQueue read _write) a = do - xs <- readTVar read - writeTVar read (a:xs) +unGetTQueue (TQueue _old_len read _write) a = do + End read_count xs <- readTVar read + writeTVar read (End (read_count - 1) (a:xs)) -- |Returns 'True' if the supplied 'TQueue' is empty. isEmptyTQueue :: TQueue a -> STM Bool -isEmptyTQueue (TQueue read write) = do - xs <- readTVar read - case xs of - (_:_) -> return False - [] -> do ys <- readTVar write - case ys of - [] -> return True - _ -> return False +isEmptyTQueue (TQueue _old_len read _write) = do + End _ xs <- readTVar read + return $! null xs