{-# LANGUAGE Trustworthy #-}
{-# OPTIONS_GHC -funbox-strict-fields #-}

-----------------------------------------------------------------------------
-- |
-- Module      :  Control.Concurrent.QSemN
-- Copyright   :  (c) The University of Glasgow 2001
-- License     :  BSD-style (see the file libraries/base/LICENSE)
-- 
-- Maintainer  :  [email protected]
-- Stability   :  experimental
-- Portability :  non-portable (concurrency)
--
-- Quantity semaphores in which each thread may wait for an arbitrary
-- \"amount\".
--
-----------------------------------------------------------------------------

module Control.Concurrent.QSemN
        (  -- * General Quantity Semaphores
          QSemN,        -- abstract
          newQSemN,     -- :: Int   -> IO QSemN
          waitQSemN,    -- :: QSemN -> Int -> IO ()
          signalQSemN   -- :: QSemN -> Int -> IO ()
      ) where

import Control.Concurrent.MVar ( MVar, newEmptyMVar, takeMVar
                          , tryPutMVar, isEmptyMVar)
import Control.Exception
import Control.Monad (when)
import Data.IORef (IORef, newIORef, atomicModifyIORef)
import System.IO.Unsafe (unsafePerformIO)

-- | 'QSemN' is a quantity semaphore in which the resource is acquired
-- and released in units of one. It provides guaranteed FIFO ordering
-- for satisfying blocked `waitQSemN` calls.
--
-- The pattern
--
-- >   bracket_ (waitQSemN n) (signalQSemN n) (...)
--
-- is safe; it never loses any of the resource.
--
data QSemN = QSemN !(IORef (Int, [(Int, MVar ())], [(Int, MVar ())]))

-- The semaphore state (i, xs, ys):
--
--   i is the current resource value
--
--   (xs,ys) is the queue of blocked threads, where the queue is
--           given by xs ++ reverse ys.  We can enqueue new blocked threads
--           by consing onto ys, and dequeue by removing from the head of xs.
--
-- A blocked thread is represented by an empty (MVar ()).  To unblock
-- the thread, we put () into the MVar.
--
-- A thread can dequeue itself by also putting () into the MVar, which
-- it must do if it receives an exception while blocked in waitQSemN.
-- This means that when unblocking a thread in signalQSemN we must
-- first check whether the MVar is already full.

-- |Build a new 'QSemN' with a supplied initial quantity.
--  The initial quantity must be at least 0.
newQSemN :: Int -> IO QSemN
newQSemN :: Int -> IO QSemN
newQSemN Int
initial
  | Int
initial forall a. Ord a => a -> a -> Bool
< Int
0 = forall (m :: * -> *) a. MonadFail m => String -> m a
fail String
"newQSemN: Initial quantity must be non-negative"
  | Bool
otherwise   = do
      IORef (Int, [(Int, MVar ())], [(Int, MVar ())])
sem <- forall a. a -> IO (IORef a)
newIORef (Int
initial, [], [])
      forall (m :: * -> *) a. Monad m => a -> m a
return (IORef (Int, [(Int, MVar ())], [(Int, MVar ())]) -> QSemN
QSemN IORef (Int, [(Int, MVar ())], [(Int, MVar ())])
sem)

-- An unboxed version of Maybe (MVar a)
data MaybeMV a = JustMV !(MVar a) | NothingMV

-- |Wait for the specified quantity to become available
waitQSemN :: QSemN -> Int -> IO ()
-- We need to mask here. Once we've enqueued our MVar, we need
-- to be sure to wait for it. Otherwise, we could lose our
-- allocated resource.
waitQSemN :: QSemN -> Int -> IO ()
waitQSemN qs :: QSemN
qs@(QSemN IORef (Int, [(Int, MVar ())], [(Int, MVar ())])
m) Int
sz = forall a. IO a -> IO a
mask_ forall a b. (a -> b) -> a -> b
$ do
    -- unsafePerformIO and not unsafeDupablePerformIO. We must
    -- be sure to wait on the same MVar that gets enqueued.
  MaybeMV ()
mmvar <- forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef IORef (Int, [(Int, MVar ())], [(Int, MVar ())])
m forall a b. (a -> b) -> a -> b
$ \ (Int
i,[(Int, MVar ())]
b1,[(Int, MVar ())]
b2) -> forall a. IO a -> a
unsafePerformIO forall a b. (a -> b) -> a -> b
$ do
    let z :: Int
z = Int
iforall a. Num a => a -> a -> a
-Int
sz
    if Int
z forall a. Ord a => a -> a -> Bool
< Int
0
      then do
        MVar ()
b <- forall a. IO (MVar a)
newEmptyMVar
        forall (m :: * -> *) a. Monad m => a -> m a
return ((Int
i, [(Int, MVar ())]
b1, (Int
sz,MVar ()
b)forall a. a -> [a] -> [a]
:[(Int, MVar ())]
b2), forall a. MVar a -> MaybeMV a
JustMV MVar ()
b)
      else forall (m :: * -> *) a. Monad m => a -> m a
return ((Int
z, [(Int, MVar ())]
b1, [(Int, MVar ())]
b2), forall a. MaybeMV a
NothingMV)

  -- Note: this case match actually allocates the MVar if necessary.
  case MaybeMV ()
mmvar of
    MaybeMV ()
NothingMV -> forall (m :: * -> *) a. Monad m => a -> m a
return ()
    JustMV MVar ()
b -> MVar () -> IO ()
wait MVar ()
b
  where
    wait :: MVar () -> IO ()
    wait :: MVar () -> IO ()
wait MVar ()
b =
      forall a. MVar a -> IO a
takeMVar MVar ()
b forall a b. IO a -> IO b -> IO a
`onException` do
        Bool
already_filled <- Bool -> Bool
not forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a. MVar a -> a -> IO Bool
tryPutMVar MVar ()
b ()
        forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
already_filled forall a b. (a -> b) -> a -> b
$ QSemN -> Int -> IO ()
signalQSemN QSemN
qs Int
sz

-- |Signal that a given quantity is now available from the 'QSemN'.
signalQSemN :: QSemN -> Int -> IO ()
-- We don't need to mask here because we should *already* be masked
-- here (e.g., by bracket). Indeed, if we're not already masked,
-- it's too late to do so.
--
-- What if the unsafePerformIO thunk is forced in another thread,
-- and receives an asynchronous exception? That shouldn't be a
-- problem: when we force it ourselves, presumably masked, we
-- will resume its execution.
signalQSemN :: QSemN -> Int -> IO ()
signalQSemN (QSemN IORef (Int, [(Int, MVar ())], [(Int, MVar ())])
m) Int
sz0 = do
    -- unsafePerformIO and not unsafeDupablePerformIO. We must not
    -- wake up more threads than we're supposed to.
  ()
unit <- forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef IORef (Int, [(Int, MVar ())], [(Int, MVar ())])
m forall a b. (a -> b) -> a -> b
$ \(Int
i,[(Int, MVar ())]
a1,[(Int, MVar ())]
a2) ->
            forall a. IO a -> a
unsafePerformIO (forall {a}.
(Num a, Ord a) =>
a
-> [(a, MVar ())]
-> [(a, MVar ())]
-> IO ((a, [(a, MVar ())], [(a, MVar ())]), ())
loop (Int
sz0 forall a. Num a => a -> a -> a
+ Int
i) [(Int, MVar ())]
a1 [(Int, MVar ())]
a2)

  -- Forcing this will actually wake the necessary threads.
  forall a. a -> IO a
evaluate ()
unit
 where
   loop :: a
-> [(a, MVar ())]
-> [(a, MVar ())]
-> IO ((a, [(a, MVar ())], [(a, MVar ())]), ())
loop a
0  [(a, MVar ())]
bs [(a, MVar ())]
b2 = forall (m :: * -> *) a. Monad m => a -> m a
return ((a
0,  [(a, MVar ())]
bs, [(a, MVar ())]
b2), ())
   loop a
sz [] [] = forall (m :: * -> *) a. Monad m => a -> m a
return ((a
sz, [], []), ())
   loop a
sz [] [(a, MVar ())]
b2 = a
-> [(a, MVar ())]
-> [(a, MVar ())]
-> IO ((a, [(a, MVar ())], [(a, MVar ())]), ())
loop a
sz (forall a. [a] -> [a]
reverse [(a, MVar ())]
b2) []
   loop a
sz ((a
j,MVar ()
b):[(a, MVar ())]
bs) [(a, MVar ())]
b2
     | a
j forall a. Ord a => a -> a -> Bool
> a
sz = do
       Bool
r <- forall a. MVar a -> IO Bool
isEmptyMVar MVar ()
b
       if Bool
r then forall (m :: * -> *) a. Monad m => a -> m a
return ((a
sz, (a
j,MVar ()
b)forall a. a -> [a] -> [a]
:[(a, MVar ())]
bs, [(a, MVar ())]
b2), ())
            else a
-> [(a, MVar ())]
-> [(a, MVar ())]
-> IO ((a, [(a, MVar ())], [(a, MVar ())]), ())
loop a
sz [(a, MVar ())]
bs [(a, MVar ())]
b2
     | Bool
otherwise = do
       Bool
r <- forall a. MVar a -> a -> IO Bool
tryPutMVar MVar ()
b ()
       if Bool
r then a
-> [(a, MVar ())]
-> [(a, MVar ())]
-> IO ((a, [(a, MVar ())], [(a, MVar ())]), ())
loop (a
szforall a. Num a => a -> a -> a
-a
j) [(a, MVar ())]
bs [(a, MVar ())]
b2
            else a
-> [(a, MVar ())]
-> [(a, MVar ())]
-> IO ((a, [(a, MVar ())], [(a, MVar ())]), ())
loop a
sz [(a, MVar ())]
bs [(a, MVar ())]
b2