-- | Thread pool implementation. The three names correspond to the following
--   priority levels (highest to lowest):
--
-- * 'addPoolException' - things that probably result in a build error,
--   so kick them off quickly.
--
-- * 'addPoolResume' - things that started, blocked, and may have open
--   resources in their closure.
--
-- * 'addPoolStart' - rules that haven't yet started.
--
-- * 'addPoolBatch' - rules that might batch if other rules start first.
module Development.Shake.Internal.Core.Pool(
    Pool, runPool,
    addPoolException, addPoolResume, addPoolStart, addPoolBatch,
    increasePool
    ) where

import Control.Concurrent.Extra
import System.Time.Extra
import Control.Exception
import Control.Monad.Extra
import General.Timing
import General.Extra
import qualified General.Bag as Bag
import qualified Data.HashSet as Set


---------------------------------------------------------------------
-- UNFAIR/RANDOM QUEUE

data Queue a = Queue
    {Queue a -> Bag a
queueException :: Bag.Bag a
    ,Queue a -> Bag a
queueResume :: Bag.Bag a
    ,Queue a -> Bag a
queueStart :: Bag.Bag a
    ,Queue a -> Bag a
queueBatch :: Bag.Bag a
    }

lensException :: (Queue a -> Bag a, Queue a -> Bag a -> Queue a)
lensException = (Queue a -> Bag a
forall a. Queue a -> Bag a
queueException, \x :: Queue a
x v :: Bag a
v -> Queue a
x{queueException :: Bag a
queueException=Bag a
v})
lensResume :: (Queue a -> Bag a, Queue a -> Bag a -> Queue a)
lensResume = (Queue a -> Bag a
forall a. Queue a -> Bag a
queueResume, \x :: Queue a
x v :: Bag a
v -> Queue a
x{queueResume :: Bag a
queueResume=Bag a
v})
lensStart :: (Queue a -> Bag a, Queue a -> Bag a -> Queue a)
lensStart = (Queue a -> Bag a
forall a. Queue a -> Bag a
queueStart, \x :: Queue a
x v :: Bag a
v -> Queue a
x{queueStart :: Bag a
queueStart=Bag a
v})
lensBatch :: (Queue a -> Bag a, Queue a -> Bag a -> Queue a)
lensBatch = (Queue a -> Bag a
forall a. Queue a -> Bag a
queueBatch, \x :: Queue a
x v :: Bag a
v -> Queue a
x{queueBatch :: Bag a
queueBatch=Bag a
v})
lenses :: [(Queue a -> Bag a, Queue a -> Bag a -> Queue a)]
lenses = [(Queue a -> Bag a, Queue a -> Bag a -> Queue a)
forall a a. (Queue a -> Bag a, Queue a -> Bag a -> Queue a)
lensException, (Queue a -> Bag a, Queue a -> Bag a -> Queue a)
forall a a. (Queue a -> Bag a, Queue a -> Bag a -> Queue a)
lensResume, (Queue a -> Bag a, Queue a -> Bag a -> Queue a)
forall a a. (Queue a -> Bag a, Queue a -> Bag a -> Queue a)
lensStart, (Queue a -> Bag a, Queue a -> Bag a -> Queue a)
forall a a. (Queue a -> Bag a, Queue a -> Bag a -> Queue a)
lensBatch]

newQueue :: Bool -> Queue a
newQueue :: Bool -> Queue a
newQueue deterministic :: Bool
deterministic = Bag a -> Bag a -> Bag a -> Bag a -> Queue a
forall a. Bag a -> Bag a -> Bag a -> Bag a -> Queue a
Queue Bag a
forall a. Bag a
b Bag a
forall a. Bag a
b Bag a
forall a. Bag a
b Bag a
forall a. Bag a
b
    where b :: Bag a
b = if Bool
deterministic then Bag a
forall a. Bag a
Bag.emptyPure else Bag a
forall a. Bag a
Bag.emptyRandom

dequeue :: Queue a -> Bag.Randomly (Maybe (a, Queue a))
dequeue :: Queue a -> Randomly (Maybe (a, Queue a))
dequeue q :: Queue a
q = ((Queue a -> Bag a, Queue a -> Bag a -> Queue a)
 -> Randomly (Maybe (a, Queue a)))
-> [(Queue a -> Bag a, Queue a -> Bag a -> Queue a)]
-> Randomly (Maybe (a, Queue a))
forall (m :: * -> *) a b.
Monad m =>
(a -> m (Maybe b)) -> [a] -> m (Maybe b)
firstJustM (Queue a -> Bag a, Queue a -> Bag a -> Queue a)
-> Randomly (Maybe (a, Queue a))
forall a b.
(Queue a -> Bag a, Queue a -> Bag a -> b) -> IO (Maybe (a, b))
f [(Queue a -> Bag a, Queue a -> Bag a -> Queue a)]
forall a a. [(Queue a -> Bag a, Queue a -> Bag a -> Queue a)]
lenses
    where
        f :: (Queue a -> Bag a, Queue a -> Bag a -> b) -> IO (Maybe (a, b))
f (sel :: Queue a -> Bag a
sel, upd :: Queue a -> Bag a -> b
upd)
            | Just x :: Randomly (a, Bag a)
x <- Bag a -> Maybe (Randomly (a, Bag a))
forall a. Bag a -> Maybe (Randomly (a, Bag a))
Bag.remove (Bag a -> Maybe (Randomly (a, Bag a)))
-> Bag a -> Maybe (Randomly (a, Bag a))
forall a b. (a -> b) -> a -> b
$ Queue a -> Bag a
sel Queue a
q
            = do (x :: a
x,b :: Bag a
b) <- Randomly (a, Bag a)
x; Maybe (a, b) -> IO (Maybe (a, b))
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe (a, b) -> IO (Maybe (a, b)))
-> Maybe (a, b) -> IO (Maybe (a, b))
forall a b. (a -> b) -> a -> b
$ (a, b) -> Maybe (a, b)
forall a. a -> Maybe a
Just (a
x, Queue a -> Bag a -> b
upd Queue a
q Bag a
b)
        f _ = Maybe (a, b) -> IO (Maybe (a, b))
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe (a, b)
forall a. Maybe a
Nothing


---------------------------------------------------------------------
-- THREAD POOL

{-
Must keep a list of active threads, so can raise exceptions in a timely manner
If any worker throws an exception, must signal to all the other workers
-}

data Pool = Pool
    !(Var (Maybe S)) -- Current state, 'Nothing' to say we are aborting
    !(Barrier (Either SomeException S)) -- Barrier to signal that we are finished

data S = S
    {S -> HashSet ThreadId
threads :: !(Set.HashSet ThreadId) -- IMPORTANT: Must be strict or we leak thread stacks
    ,S -> Int
threadsLimit :: {-# UNPACK #-} !Int -- user supplied thread limit, Set.size threads <= threadsLimit
    ,S -> Int
threadsMax :: {-# UNPACK #-} !Int -- high water mark of Set.size threads (accounting only)
    ,S -> Int
threadsSum :: {-# UNPACK #-} !Int -- number of threads we have been through (accounting only)
    ,S -> Queue (IO ())
todo :: !(Queue (IO ())) -- operations waiting a thread
    }


emptyS :: Int -> Bool -> S
emptyS :: Int -> Bool -> S
emptyS n :: Int
n deterministic :: Bool
deterministic = HashSet ThreadId -> Int -> Int -> Int -> Queue (IO ()) -> S
S HashSet ThreadId
forall a. HashSet a
Set.empty Int
n 0 0 (Queue (IO ()) -> S) -> Queue (IO ()) -> S
forall a b. (a -> b) -> a -> b
$ Bool -> Queue (IO ())
forall a. Bool -> Queue a
newQueue Bool
deterministic


worker :: Pool -> IO ()
worker :: Pool -> IO ()
worker pool :: Pool
pool@(Pool var :: Var (Maybe S)
var done :: Barrier (Either SomeException S)
done) = do
    let onVar :: (S -> IO (Maybe S, m ())) -> IO (m ())
onVar act :: S -> IO (Maybe S, m ())
act = Var (Maybe S) -> (Maybe S -> IO (Maybe S, m ())) -> IO (m ())
forall a b. Var a -> (a -> IO (a, b)) -> IO b
modifyVar Var (Maybe S)
var ((Maybe S -> IO (Maybe S, m ())) -> IO (m ()))
-> (Maybe S -> IO (Maybe S, m ())) -> IO (m ())
forall a b. (a -> b) -> a -> b
$ IO (Maybe S, m ())
-> (S -> IO (Maybe S, m ())) -> Maybe S -> IO (Maybe S, m ())
forall b a. b -> (a -> b) -> Maybe a -> b
maybe ((Maybe S, m ()) -> IO (Maybe S, m ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe S
forall a. Maybe a
Nothing, () -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ())) S -> IO (Maybe S, m ())
act
    IO (IO ()) -> IO ()
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (IO (IO ()) -> IO ()) -> IO (IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ (S -> IO (Maybe S, IO ())) -> IO (IO ())
forall (m :: * -> *).
Monad m =>
(S -> IO (Maybe S, m ())) -> IO (m ())
onVar ((S -> IO (Maybe S, IO ())) -> IO (IO ()))
-> (S -> IO (Maybe S, IO ())) -> IO (IO ())
forall a b. (a -> b) -> a -> b
$ \s :: S
s -> do
        Maybe (IO (), Queue (IO ()))
res <- Queue (IO ()) -> Randomly (Maybe (IO (), Queue (IO ())))
forall a. Queue a -> Randomly (Maybe (a, Queue a))
dequeue (Queue (IO ()) -> Randomly (Maybe (IO (), Queue (IO ()))))
-> Queue (IO ()) -> Randomly (Maybe (IO (), Queue (IO ())))
forall a b. (a -> b) -> a -> b
$ S -> Queue (IO ())
todo S
s
        case Maybe (IO (), Queue (IO ()))
res of
            Nothing -> (Maybe S, IO ()) -> IO (Maybe S, IO ())
forall (m :: * -> *) a. Monad m => a -> m a
return (S -> Maybe S
forall a. a -> Maybe a
Just S
s, () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ())
            Just (now :: IO ()
now, todo2 :: Queue (IO ())
todo2) -> (Maybe S, IO ()) -> IO (Maybe S, IO ())
forall (m :: * -> *) a. Monad m => a -> m a
return (S -> Maybe S
forall a. a -> Maybe a
Just S
s{todo :: Queue (IO ())
todo = Queue (IO ())
todo2}, IO ()
now IO () -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Pool -> IO ()
worker Pool
pool)

-- | Given a pool, and a function that breaks the S invariants, restore them
--   They are only allowed to touch threadsLimit or todo
step :: Pool -> (S -> Bag.Randomly S) -> IO ()
step :: Pool -> (S -> Randomly S) -> IO ()
step pool :: Pool
pool@(Pool var :: Var (Maybe S)
var done :: Barrier (Either SomeException S)
done) op :: S -> Randomly S
op = do
    let onVar :: (S -> IO (Maybe S)) -> IO ()
onVar act :: S -> IO (Maybe S)
act = Var (Maybe S) -> (Maybe S -> IO (Maybe S)) -> IO ()
forall a. Var a -> (a -> IO a) -> IO ()
modifyVar_ Var (Maybe S)
var ((Maybe S -> IO (Maybe S)) -> IO ())
-> (Maybe S -> IO (Maybe S)) -> IO ()
forall a b. (a -> b) -> a -> b
$ IO (Maybe S) -> (S -> IO (Maybe S)) -> Maybe S -> IO (Maybe S)
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (Maybe S -> IO (Maybe S)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe S
forall a. Maybe a
Nothing) S -> IO (Maybe S)
act
    (S -> IO (Maybe S)) -> IO ()
onVar ((S -> IO (Maybe S)) -> IO ()) -> (S -> IO (Maybe S)) -> IO ()
forall a b. (a -> b) -> a -> b
$ \s :: S
s -> do
        S
s <- S -> Randomly S
op S
s
        Maybe (IO (), Queue (IO ()))
res <- Queue (IO ()) -> Randomly (Maybe (IO (), Queue (IO ())))
forall a. Queue a -> Randomly (Maybe (a, Queue a))
dequeue (Queue (IO ()) -> Randomly (Maybe (IO (), Queue (IO ()))))
-> Queue (IO ()) -> Randomly (Maybe (IO (), Queue (IO ())))
forall a b. (a -> b) -> a -> b
$ S -> Queue (IO ())
todo S
s
        case Maybe (IO (), Queue (IO ()))
res of
            Just (now :: IO ()
now, todo2 :: Queue (IO ())
todo2) | HashSet ThreadId -> Int
forall a. HashSet a -> Int
Set.size (S -> HashSet ThreadId
threads S
s) Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< S -> Int
threadsLimit S
s -> do
                -- spawn a new worker
                ThreadId
t <- IO () -> (Either SomeException () -> IO ()) -> IO ThreadId
forall a. IO a -> (Either SomeException a -> IO ()) -> IO ThreadId
forkFinallyUnmasked (IO ()
now IO () -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Pool -> IO ()
worker Pool
pool) ((Either SomeException () -> IO ()) -> IO ThreadId)
-> (Either SomeException () -> IO ()) -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ \res :: Either SomeException ()
res -> case Either SomeException ()
res of
                    Left e :: SomeException
e -> (S -> IO (Maybe S)) -> IO ()
onVar ((S -> IO (Maybe S)) -> IO ()) -> (S -> IO (Maybe S)) -> IO ()
forall a b. (a -> b) -> a -> b
$ \s :: S
s -> do
                        ThreadId
t <- IO ThreadId
myThreadId
                        (ThreadId -> IO ()) -> [ThreadId] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ ThreadId -> IO ()
killThread ([ThreadId] -> IO ()) -> [ThreadId] -> IO ()
forall a b. (a -> b) -> a -> b
$ HashSet ThreadId -> [ThreadId]
forall a. HashSet a -> [a]
Set.toList (HashSet ThreadId -> [ThreadId]) -> HashSet ThreadId -> [ThreadId]
forall a b. (a -> b) -> a -> b
$ ThreadId -> HashSet ThreadId -> HashSet ThreadId
forall a. (Eq a, Hashable a) => a -> HashSet a -> HashSet a
Set.delete ThreadId
t (HashSet ThreadId -> HashSet ThreadId)
-> HashSet ThreadId -> HashSet ThreadId
forall a b. (a -> b) -> a -> b
$ S -> HashSet ThreadId
threads S
s
                        Barrier (Either SomeException S) -> Either SomeException S -> IO ()
forall a. Partial => Barrier a -> a -> IO ()
signalBarrier Barrier (Either SomeException S)
done (Either SomeException S -> IO ())
-> Either SomeException S -> IO ()
forall a b. (a -> b) -> a -> b
$ SomeException -> Either SomeException S
forall a b. a -> Either a b
Left SomeException
e
                        Maybe S -> IO (Maybe S)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe S
forall a. Maybe a
Nothing
                    Right _ -> do
                        ThreadId
t <- IO ThreadId
myThreadId
                        Pool -> (S -> Randomly S) -> IO ()
step Pool
pool ((S -> Randomly S) -> IO ()) -> (S -> Randomly S) -> IO ()
forall a b. (a -> b) -> a -> b
$ \s :: S
s -> S -> Randomly S
forall (m :: * -> *) a. Monad m => a -> m a
return S
s{threads :: HashSet ThreadId
threads = ThreadId -> HashSet ThreadId -> HashSet ThreadId
forall a. (Eq a, Hashable a) => a -> HashSet a -> HashSet a
Set.delete ThreadId
t (HashSet ThreadId -> HashSet ThreadId)
-> HashSet ThreadId -> HashSet ThreadId
forall a b. (a -> b) -> a -> b
$ S -> HashSet ThreadId
threads S
s}
                let threads2 :: HashSet ThreadId
threads2 = ThreadId -> HashSet ThreadId -> HashSet ThreadId
forall a. (Eq a, Hashable a) => a -> HashSet a -> HashSet a
Set.insert ThreadId
t (HashSet ThreadId -> HashSet ThreadId)
-> HashSet ThreadId -> HashSet ThreadId
forall a b. (a -> b) -> a -> b
$ S -> HashSet ThreadId
threads S
s
                Maybe S -> IO (Maybe S)
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe S -> IO (Maybe S)) -> Maybe S -> IO (Maybe S)
forall a b. (a -> b) -> a -> b
$ S -> Maybe S
forall a. a -> Maybe a
Just S
s{todo :: Queue (IO ())
todo = Queue (IO ())
todo2, threads :: HashSet ThreadId
threads = HashSet ThreadId
threads2
                               ,threadsSum :: Int
threadsSum = S -> Int
threadsSum S
s Int -> Int -> Int
forall a. Num a => a -> a -> a
+ 1, threadsMax :: Int
threadsMax = S -> Int
threadsMax S
s Int -> Int -> Int
forall a. Ord a => a -> a -> a
`max` HashSet ThreadId -> Int
forall a. HashSet a -> Int
Set.size HashSet ThreadId
threads2}
            Nothing | HashSet ThreadId -> Bool
forall a. HashSet a -> Bool
Set.null (HashSet ThreadId -> Bool) -> HashSet ThreadId -> Bool
forall a b. (a -> b) -> a -> b
$ S -> HashSet ThreadId
threads S
s -> do
                Barrier (Either SomeException S) -> Either SomeException S -> IO ()
forall a. Partial => Barrier a -> a -> IO ()
signalBarrier Barrier (Either SomeException S)
done (Either SomeException S -> IO ())
-> Either SomeException S -> IO ()
forall a b. (a -> b) -> a -> b
$ S -> Either SomeException S
forall a b. b -> Either a b
Right S
s
                Maybe S -> IO (Maybe S)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe S
forall a. Maybe a
Nothing
            _ -> Maybe S -> IO (Maybe S)
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe S -> IO (Maybe S)) -> Maybe S -> IO (Maybe S)
forall a b. (a -> b) -> a -> b
$ S -> Maybe S
forall a. a -> Maybe a
Just S
s


addPool :: (Queue (IO ()) -> Bag (f ()),
 Queue (IO ()) -> Bag (f ()) -> Queue (IO ()))
-> Pool -> f a -> IO ()
addPool (sel :: Queue (IO ()) -> Bag (f ())
sel, upd :: Queue (IO ()) -> Bag (f ()) -> Queue (IO ())
upd) pool :: Pool
pool act :: f a
act = Pool -> (S -> Randomly S) -> IO ()
step Pool
pool ((S -> Randomly S) -> IO ()) -> (S -> Randomly S) -> IO ()
forall a b. (a -> b) -> a -> b
$ \s :: S
s ->
    S -> Randomly S
forall (m :: * -> *) a. Monad m => a -> m a
return S
s{todo :: Queue (IO ())
todo = Queue (IO ()) -> Bag (f ()) -> Queue (IO ())
upd (S -> Queue (IO ())
todo S
s) (Bag (f ()) -> Queue (IO ())) -> Bag (f ()) -> Queue (IO ())
forall a b. (a -> b) -> a -> b
$ f () -> Bag (f ()) -> Bag (f ())
forall a. a -> Bag a -> Bag a
Bag.insert (f a -> f ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void f a
act) (Bag (f ()) -> Bag (f ())) -> Bag (f ()) -> Bag (f ())
forall a b. (a -> b) -> a -> b
$ Queue (IO ()) -> Bag (f ())
sel (Queue (IO ()) -> Bag (f ())) -> Queue (IO ()) -> Bag (f ())
forall a b. (a -> b) -> a -> b
$ S -> Queue (IO ())
todo S
s}


-- | Add a new task to the pool. See the top of the module for the relative ordering
--   and semantics.
addPoolException, addPoolResume, addPoolStart :: Pool -> IO a -> IO ()
addPoolException :: Pool -> IO a -> IO ()
addPoolException = (Queue (IO ()) -> Bag (IO ()),
 Queue (IO ()) -> Bag (IO ()) -> Queue (IO ()))
-> Pool -> IO a -> IO ()
forall (f :: * -> *) a.
Functor f =>
(Queue (IO ()) -> Bag (f ()),
 Queue (IO ()) -> Bag (f ()) -> Queue (IO ()))
-> Pool -> f a -> IO ()
addPool (Queue (IO ()) -> Bag (IO ()),
 Queue (IO ()) -> Bag (IO ()) -> Queue (IO ()))
forall a a. (Queue a -> Bag a, Queue a -> Bag a -> Queue a)
lensException
addPoolResume :: Pool -> IO a -> IO ()
addPoolResume = (Queue (IO ()) -> Bag (IO ()),
 Queue (IO ()) -> Bag (IO ()) -> Queue (IO ()))
-> Pool -> IO a -> IO ()
forall (f :: * -> *) a.
Functor f =>
(Queue (IO ()) -> Bag (f ()),
 Queue (IO ()) -> Bag (f ()) -> Queue (IO ()))
-> Pool -> f a -> IO ()
addPool (Queue (IO ()) -> Bag (IO ()),
 Queue (IO ()) -> Bag (IO ()) -> Queue (IO ()))
forall a a. (Queue a -> Bag a, Queue a -> Bag a -> Queue a)
lensResume
addPoolStart :: Pool -> IO a -> IO ()
addPoolStart = (Queue (IO ()) -> Bag (IO ()),
 Queue (IO ()) -> Bag (IO ()) -> Queue (IO ()))
-> Pool -> IO a -> IO ()
forall (f :: * -> *) a.
Functor f =>
(Queue (IO ()) -> Bag (f ()),
 Queue (IO ()) -> Bag (f ()) -> Queue (IO ()))
-> Pool -> f a -> IO ()
addPool (Queue (IO ()) -> Bag (IO ()),
 Queue (IO ()) -> Bag (IO ()) -> Queue (IO ()))
forall a a. (Queue a -> Bag a, Queue a -> Bag a -> Queue a)
lensStart
addPoolBatch :: Pool -> IO a -> IO ()
addPoolBatch = (Queue (IO ()) -> Bag (IO ()),
 Queue (IO ()) -> Bag (IO ()) -> Queue (IO ()))
-> Pool -> IO a -> IO ()
forall (f :: * -> *) a.
Functor f =>
(Queue (IO ()) -> Bag (f ()),
 Queue (IO ()) -> Bag (f ()) -> Queue (IO ()))
-> Pool -> f a -> IO ()
addPool (Queue (IO ()) -> Bag (IO ()),
 Queue (IO ()) -> Bag (IO ()) -> Queue (IO ()))
forall a a. (Queue a -> Bag a, Queue a -> Bag a -> Queue a)
lensBatch


-- | Temporarily increase the pool by 1 thread. Call the cleanup action to restore the value.
--   After calling cleanup you should requeue onto a new thread.
increasePool :: Pool -> IO (IO ())
increasePool :: Pool -> IO (IO ())
increasePool pool :: Pool
pool = do
    Pool -> (S -> Randomly S) -> IO ()
step Pool
pool ((S -> Randomly S) -> IO ()) -> (S -> Randomly S) -> IO ()
forall a b. (a -> b) -> a -> b
$ \s :: S
s -> S -> Randomly S
forall (m :: * -> *) a. Monad m => a -> m a
return S
s{threadsLimit :: Int
threadsLimit = S -> Int
threadsLimit S
s Int -> Int -> Int
forall a. Num a => a -> a -> a
+ 1}
    IO () -> IO (IO ())
forall (m :: * -> *) a. Monad m => a -> m a
return (IO () -> IO (IO ())) -> IO () -> IO (IO ())
forall a b. (a -> b) -> a -> b
$ Pool -> (S -> Randomly S) -> IO ()
step Pool
pool ((S -> Randomly S) -> IO ()) -> (S -> Randomly S) -> IO ()
forall a b. (a -> b) -> a -> b
$ \s :: S
s -> S -> Randomly S
forall (m :: * -> *) a. Monad m => a -> m a
return S
s{threadsLimit :: Int
threadsLimit = S -> Int
threadsLimit S
s Int -> Int -> Int
forall a. Num a => a -> a -> a
- 1}


-- | Run all the tasks in the pool on the given number of works.
--   If any thread throws an exception, the exception will be reraised.
--   When it completes all threads have either finished, or have had 'killThread'
--   called on them (but may not have actually died yet).
runPool :: Bool -> Int -> (Pool -> IO ()) -> IO () -- run all tasks in the pool
runPool :: Bool -> Int -> (Pool -> IO ()) -> IO ()
runPool deterministic :: Bool
deterministic n :: Int
n act :: Pool -> IO ()
act = do
    Var (Maybe S)
s <- Maybe S -> IO (Var (Maybe S))
forall a. a -> IO (Var a)
newVar (Maybe S -> IO (Var (Maybe S))) -> Maybe S -> IO (Var (Maybe S))
forall a b. (a -> b) -> a -> b
$ S -> Maybe S
forall a. a -> Maybe a
Just (S -> Maybe S) -> S -> Maybe S
forall a b. (a -> b) -> a -> b
$ Int -> Bool -> S
emptyS Int
n Bool
deterministic
    Barrier (Either SomeException S)
done <- IO (Barrier (Either SomeException S))
forall a. IO (Barrier a)
newBarrier

    let cleanup :: IO ()
cleanup = Var (Maybe S) -> (Maybe S -> IO (Maybe S)) -> IO ()
forall a. Var a -> (a -> IO a) -> IO ()
modifyVar_ Var (Maybe S)
s ((Maybe S -> IO (Maybe S)) -> IO ())
-> (Maybe S -> IO (Maybe S)) -> IO ()
forall a b. (a -> b) -> a -> b
$ \s :: Maybe S
s -> do
            -- if someone kills our thread, make sure we kill our child threads
            case Maybe S
s of
                Just s :: S
s -> (ThreadId -> IO ()) -> [ThreadId] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ ThreadId -> IO ()
killThread ([ThreadId] -> IO ()) -> [ThreadId] -> IO ()
forall a b. (a -> b) -> a -> b
$ HashSet ThreadId -> [ThreadId]
forall a. HashSet a -> [a]
Set.toList (HashSet ThreadId -> [ThreadId]) -> HashSet ThreadId -> [ThreadId]
forall a b. (a -> b) -> a -> b
$ S -> HashSet ThreadId
threads S
s
                Nothing -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
            Maybe S -> IO (Maybe S)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe S
forall a. Maybe a
Nothing

    let ghc10793 :: IO b
ghc10793 = do
            -- if this thread dies because it is blocked on an MVar there's a chance we have
            -- a better error in the done barrier, and GHC raised the exception wrongly, see:
            -- https://ghc.haskell.org/trac/ghc/ticket/10793
            Seconds -> IO ()
sleep 1 -- give it a little bit of time for the finally to run
                    -- no big deal, since the blocked indefinitely takes a while to fire anyway
            Maybe (Either SomeException S)
res <- Barrier (Either SomeException S)
-> IO (Maybe (Either SomeException S))
forall a. Barrier a -> IO (Maybe a)
waitBarrierMaybe Barrier (Either SomeException S)
done
            case Maybe (Either SomeException S)
res of
                Just (Left e) -> SomeException -> IO b
forall e a. Exception e => e -> IO a
throwIO SomeException
e
                _ -> BlockedIndefinitelyOnMVar -> IO b
forall e a. Exception e => e -> IO a
throwIO BlockedIndefinitelyOnMVar
BlockedIndefinitelyOnMVar
    (BlockedIndefinitelyOnMVar -> IO ()) -> IO () -> IO ()
forall e a. Exception e => (e -> IO a) -> IO a -> IO a
handle (\BlockedIndefinitelyOnMVar -> IO ()
forall b. IO b
ghc10793) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ (IO () -> IO () -> IO ()) -> IO () -> IO () -> IO ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO a
onException IO ()
cleanup (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
        let pool :: Pool
pool = Var (Maybe S) -> Barrier (Either SomeException S) -> Pool
Pool Var (Maybe S)
s Barrier (Either SomeException S)
done
        Pool -> IO () -> IO ()
forall a. Pool -> IO a -> IO ()
addPoolStart Pool
pool (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Pool -> IO ()
act Pool
pool
        Either SomeException S
res <- Barrier (Either SomeException S) -> IO (Either SomeException S)
forall a. Barrier a -> IO a
waitBarrier Barrier (Either SomeException S)
done
        case Either SomeException S
res of
            Left e :: SomeException
e -> SomeException -> IO ()
forall e a. Exception e => e -> IO a
throwIO SomeException
e
            Right s :: S
s -> String -> IO ()
addTiming (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$ "Pool finished (" String -> String -> String
forall a. [a] -> [a] -> [a]
++ Int -> String
forall a. Show a => a -> String
show (S -> Int
threadsSum S
s) String -> String -> String
forall a. [a] -> [a] -> [a]
++ " threads, " String -> String -> String
forall a. [a] -> [a] -> [a]
++ Int -> String
forall a. Show a => a -> String
show (S -> Int
threadsMax S
s) String -> String -> String
forall a. [a] -> [a] -> [a]
++ " max)"