monad-par/Control/Monad/Par/Scheds/Trace.hs: 2 additions, 0 deletions

@@ -19,6 +19,8 @@ module Control.Monad.Par.Scheds.Trace (

import qualified Control.Monad.Par.Class as PC
import Control.Monad.Par.Scheds.TraceInternal
+    ( IVar, Par (..), Trace (..), get, new, put
+    , put_, newFull, newFull_, runCont, runPar, runParIO)
import Control.DeepSeq
import Control.Monad as M hiding (mapM, sequence, join)
import Prelude hiding (mapM, sequence, head,tail)
monad-par/Control/Monad/Par/Scheds/TraceInternal.hs: 146 additions, 55 deletions

@@ -1,5 +1,5 @@
-{-# LANGUAGE RankNTypes, NamedFieldPuns, BangPatterns,
-             ExistentialQuantification, CPP #-}
+{-# LANGUAGE RankNTypes, NamedFieldPuns, BangPatterns, LambdaCase,
+             ExistentialQuantification, CPP, MagicHash, UnboxedTuples #-}
{-# OPTIONS_GHC -Wall -fno-warn-name-shadowing -fno-warn-unused-do-bind #-}

-- | This module exposes the internals of the @Par@ monad so that you

@@ -18,13 +18,18 @@ module Control.Monad.Par.Scheds.TraceInternal (
) where


-import Control.Monad as M hiding (mapM, sequence, join)
+import Control.Monad as M hiding (mapM, sequence)
import Prelude hiding (mapM, sequence, head,tail)
import Data.IORef
import System.IO.Unsafe
import Control.Concurrent hiding (yield)
import GHC.Conc (numCapabilities)
import Control.DeepSeq
+import GHC.IO (IO (..))
+import GHC.IORef (IORef (..))
+import GHC.STRef (STRef (..))
+import GHC.Exts (casMutVar#, lazy, MutVar#, RealWorld)
--import Control.Exception (evaluate)
-- import Text.Printf
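
The new GHC.IO / GHC.IORef / GHC.STRef / GHC.Exts imports exist to reach the
raw MutVar# inside an IORef. In base, the wrappers nest as shown below, so a
single pattern match peels all the way down. A sketch, not part of the patch;
unwrap is a hypothetical name and requires MagicHash:

    -- newtype IORef a = IORef (STRef RealWorld a)   -- GHC.IORef
    -- data STRef s a  = STRef (MutVar# s a)         -- GHC.STRef

    unwrap :: IORef a -> MutVar# RealWorld a
    unwrap (IORef (STRef ref)) = ref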

#if !MIN_VERSION_base(4,8,0)

@@ -39,38 +44,73 @@ forkOn = forkOnIO

-- ---------------------------------------------------------------------------

-data Trace = forall a . Get (IVar a) (a -> Trace)
-           | forall a . Put (IVar a) a Trace
-           | forall a . New (IVarContents a) (IVar a -> Trace)
+data Trace = forall a . Get !(IVar a) (a -> Trace)
+           | forall a . Put !(IVar a) a Trace
+           -- Why does the continuation take a MutVar# instead of an IVar?
+           -- In the common case that we don't actually *need* the IVar wrapper,
+           -- we'd rather not allocate it.
+           | forall a . New !(IVarContents a) (MutVar# RealWorld (IVarContents a) -> Trace)
           | Fork Trace Trace
           | Done
           | Yield Trace
           | forall a . LiftIO (IO a) (a -> Trace)
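
As the comment explains, New now hands its continuation the raw MutVar# and
lets the consumer decide whether to wrap it. For illustration only (not part
of the patch), a trace built by hand under the new representation:

    -- Allocate an IVar, put 3 into it, and stop. The continuation wraps
    -- the MutVar# back into an IVar only because Put wants one.
    smallTrace :: Trace
    smallTrace =
      New Empty $ \mv ->
        Put (IVar (IORef (STRef mv))) (3 :: Int) Done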

+-- | A thin wrapper around 'casMutVar#'.
+casIORef_ :: IORef a -> a -> a -> IO Bool
+casIORef_ (IORef (STRef ref)) old new =
+  IO $ \s -> case casMutVar# ref old new s of
+    (# s', 0#, _latest #) -> (# s', True #)
+    (# s', _, _latest #)  -> (# s', False #)
+
+-- This is similar to atomicModifyIORef, but it applies the
+-- function eagerly. This should only be used when the function
+-- is very cheap: it must take very little time, to avoid CAS
+-- failures, and it should not allocate *any* memory, since it
+-- will be run again on each CAS failure.
+atomicModifyIORefEager :: IORef a -> (a -> (# Maybe a, b #)) -> IO b
+atomicModifyIORefEager ref f = go ref
+  where
+    go ref = do
+      old <- readIORef ref
+      -- The lazy here is a bit paranoid; we want to be sure that GHC
+      -- doesn't decide to substitute an unfolding for old in the call
+      -- to casIORef_. I don't know whether that can happen anyway, but
+      -- being extra careful shouldn't hurt.
+      case f (lazy old) of
+        (# Nothing, res #) -> pure res
+        (# Just new, res #) -> do
+          done <- casIORef_ ref old new
+          if done
+            then pure res
+            else go ref
+{-# INLINE atomicModifyIORefEager #-}
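
For illustration, a minimal caller that obeys the contract above: the update
function is tiny, and it allocates nothing per retry since Just True should
float out as a constant. claimFlag is a hypothetical name, not part of the
patch:

    -- Claim a one-shot flag; exactly one caller sees True.
    claimFlag :: IORef Bool -> IO Bool
    claimFlag ref = atomicModifyIORefEager ref $ \claimed ->
      if claimed
        then (# Nothing, False #)   -- already claimed: no write, no retry
        else (# Just True, True #)  -- install True via CAS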


-- | The main scheduler loop.
sched :: Bool -> Sched -> Trace -> IO ()
-sched _doSync queue t = loop t
+sched _doSync !queue t = loop t
  where
    loop t = case t of
      New a f -> do
-        r <- newIORef a
-        loop (f (IVar r))
+        IORef (STRef r) <- newIORef a
+        loop (f r)
      Get (IVar v) c -> do
-        e <- readIORef v
-        case e of
+        -- Optimistic check for Full avoids the expense of
+        -- atomicModifyIORef
+        e0 <- readIORef v
+        case e0 of
          Full a -> loop (c a)
-          _other -> do
+          _ -> do
            r <- atomicModifyIORef v $ \e -> case e of
-                  Empty      -> (Blocked [c], reschedule queue)
-                  Full a     -> (Full a, loop (c a))
-                  Blocked cs -> (Blocked (c:cs), reschedule queue)
-            r
+                  Full a -> (Full a, Just a)
+                  cs     -> (Blocked c cs, Nothing)
+            maybe (reschedule queue) (loop . c) r
      Put (IVar v) a t -> do
-        cs <- atomicModifyIORef v $ \e -> case e of
-               Empty      -> (Full a, [])
-               Full _     -> error "multiple put"
-               Blocked cs -> (Full a, cs)
-        mapM_ (pushWork queue . ($a)) cs
+        let fulla = Full a
+        cs <- atomicModifyIORefEager v $ \case
+               Full _ -> error "multiple put"
+               cs     -> (# Just fulla, cs #)
+        icmapM_ (pushWork queue . ($a)) cs
        loop t
      Fork child parent -> do
        pushWork queue child

@@ -89,9 +129,9 @@ sched _doSync queue t = loop t
      Yield parent -> do
        -- Go to the end of the worklist:
        let Sched { workpool } = queue
-        -- TODO: Perhaps consider Data.Seq here.
+        -- TODO: Perhaps consider some sort of deque here
+        -- This would also be a chance to steal and work from opposite ends of the queue.
-        atomicModifyIORef workpool $ \ts -> (ts++[parent], ())
+        () <- atomicModifyIORef workpool $ \ts -> (ts++[parent], ())
        reschedule queue
      LiftIO io c -> do
        r <- io

@@ -101,6 +141,9 @@
-- work-stealing mode.
reschedule :: Sched -> IO ()
reschedule queue@Sched{ workpool } = do
+  -- We don't use atomicModifyIORefEager for workpools, because
+  -- yields can cause those to contain thunks, and forcing thunks
+  -- can easily lead to CAS failures.
  e <- atomicModifyIORef workpool $ \ts ->
         case ts of
           [] -> ([], Nothing)

@@ -109,7 +152,6 @@ reschedule queue@Sched{ workpool } = do
    Nothing -> steal queue
    Just t  -> sched True queue t

-
-- RRN: Note -- NOT doing random work stealing breaks the traditional
-- Cilk time/space bounds if one is running strictly nested (series
-- parallel) programs.

@@ -121,11 +163,11 @@ steal q@Sched{ idle, scheds, no=my_no } = do
    go scheds
  where
    go [] = do m <- newEmptyMVar
-              r <- atomicModifyIORef idle $ \is -> (m:is, is)
-              if length r == numCapabilities - 1
+              r <- atomicModifyIORef idle $ \is -> (m `ICons` is, is)
+              if ilength r == numCapabilities - 1
                then do
                  -- printf "cpu %d initiating shutdown\n" my_no
-                 mapM_ (\m -> putMVar m True) r
+                 imapM_ (\m -> putMVar m True) r
                else do
                  done <- takeMVar m
                  if done

@@ -138,35 +180,64 @@
    go (x:xs)
      | no x == my_no = go xs
      | otherwise = do
-         r <- atomicModifyIORef (workpool x) $ \ ts ->
-                case ts of
-                  []     -> ([], Nothing)
-                  (x:xs) -> (xs, Just x)
-         case r of
-           Just t -> do
-             -- printf "cpu %d got work from cpu %d\n" my_no (no x)
-             sched True q t
-           Nothing -> go xs
+         -- Cheaply skip over empty workpools.
+         r0 <- readIORef (workpool x)
+         case r0 of
+           [] -> go xs
+           _ -> do
+             r <- atomicModifyIORef (workpool x) $ \ ts ->
+                    case ts of
+                      []      -> ([], Nothing)
+                      (t:ts') -> (ts', Just t)
+             case r of
+               Just t -> do
+                 -- printf "cpu %d got work from cpu %d\n" my_no (no x)
+                 sched True q t
+               Nothing -> go xs
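
The same optimistic-read idiom appears in both the Get case of the scheduler
and this stealing loop: a plain readIORef is much cheaper than an atomic
read-modify-write, so the cheap read runs first and the atomic operation is
attempted only when the read is inconclusive. A minimal sketch of the pattern
(not part of the patch; tryPop is a hypothetical name):

    {-# LANGUAGE LambdaCase #-}
    import Data.IORef

    -- Pop the head of a list held in an IORef, checking cheaply first.
    tryPop :: IORef [a] -> IO (Maybe a)
    tryPop ref = do
      xs0 <- readIORef ref
      case xs0 of
        [] -> pure Nothing                    -- cheap exit; no atomic op
        _  -> atomicModifyIORef ref $ \case
                []     -> ([], Nothing)       -- lost a race; nothing to take
                (x:xs) -> (xs, Just x)        -- claim the head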

-- | If any worker is idle, wake one up and give it work to do.
pushWork :: Sched -> Trace -> IO ()
pushWork Sched { workpool, idle } t = do
  atomicModifyIORef workpool $ \ts -> (t:ts, ())
  idles <- readIORef idle
-  when (not (null idles)) $ do
-    r <- atomicModifyIORef idle (\is -> case is of
-                                          [] -> ([], return ())
-                                          (i:is) -> (is, putMVar i False))
-    r -- wake one up
+  when (not (inull idles)) $ do
+    r <- atomicModifyIORefEager idle (\is -> case lazy is of
+                                        INil -> (# Nothing, INil #)
+                                        (_ `ICons` is') -> (# Just is', is #))
+    case r of
+      INil        -> pure ()
+      i `ICons` _ -> putMVar i False -- wake one up

data Sched = Sched
    { no       :: {-# UNPACK #-} !Int,
-     workpool :: IORef [Trace],
-     idle     :: IORef [MVar Bool],
+     workpool :: !(IORef [Trace]),
+     idle     :: !(IORef IdleList),
      scheds   :: [Sched] -- Global list of all per-thread workers.
    }
--  deriving Show

+data IdleList = ICons !(MVar Bool) IdleList | INil
+
+inull :: IdleList -> Bool
+inull INil = True
+inull (ICons _ _) = False
+
+imapM_ :: Applicative f => (MVar Bool -> f a) -> IdleList -> f ()
+imapM_ f = go
+  where
+    go INil = pure ()
+    go (ICons x xs) = f x *> go xs
+
+ilength :: IdleList -> Int
+ilength = go 0 where
+  go !acc INil = acc
+  go acc (ICons _ xs) = go (acc + 1) xs
+
+icmapM_ :: Applicative f => ((a -> Trace) -> f b) -> IVarContents a -> f ()
+icmapM_ f = go
+  where
+    go (Blocked c cs) = f c *> go cs
+    go _ = pure ()
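
IdleList is a strict, specialized replacement for [MVar Bool]; the bang on the
MVar field lets GHC's default unboxing of small strict fields store it
unpacked in each cell. A hypothetical conversion (not in the patch) spells out
the correspondence the helpers above rely on:

    idleToList :: IdleList -> [MVar Bool]
    idleToList INil         = []
    idleToList (ICons m ms) = m : idleToList ms

    -- so that: ilength xs == length (idleToList xs)
    --          inull xs   == null  (idleToList xs)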

newtype Par a = Par {
    runCont :: (a -> Trace) -> Trace
}

@@ -189,30 +260,50 @@ newtype IVar a = IVar (IORef (IVarContents a))
instance Eq (IVar a) where
  (IVar r1) == (IVar r2) = r1 == r2

--- Forcing evaluation of a IVar is fruitless.
+-- We just force the IORef; the contents can't be forced.
instance NFData (IVar a) where
-  rnf _ = ()
+  rnf !_ = ()


-- From outside the Par computation we can peek. But this is nondeterministic.
pollIVar :: IVar a -> IO (Maybe a)
pollIVar (IVar ref) =
  do contents <- readIORef ref
     case contents of
-      Full x -> return (Just x)
-      _      -> return (Nothing)
+      Full x -> return (Just x)
+      _      -> return Nothing


-data IVarContents a = Full a | Empty | Blocked [a -> Trace]
+-- Invariant: Full should only appear in an outermost position.
+-- Something like Blocked f (Full a) is prohibited.
+-- We used to use Blocked [a -> Trace], but that requires
+-- an extra wrapper allocation to add an element.
+data IVarContents a = Full a | Empty | Blocked (a -> Trace) !(IVarContents a)
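
For comparison (not part of the patch), a wait-list of two dummy blocked
readers under the new representation; the old representation would have
spelled this Blocked [\_ -> Done, \_ -> Done], paying for a cons cell per
reader on top of the Blocked box:

    waiters :: IVarContents Int
    waiters = Blocked (\_ -> Done) (Blocked (\_ -> Done) Empty)
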
+-- A strict-spined list of unboxed IORefs.
+data IORefList a = RLCons !(IORef a) !(IORefList a) | RLNil
+
+buildRL :: Int -> IO (IORefList [a])
+buildRL = go RLNil
+  where
+    go acc 0 = pure acc
+    go acc n = do
+      ref <- newIORef []
+      go (RLCons ref acc) (n - 1)
+
+buildStates :: Int
+            -> IORefList [Trace] -> IORef IdleList -> [Sched] -> [Sched]
+buildStates !_k RLNil !_idl _states = []
+buildStates k (RLCons wp wps) !idl states =
+  Sched { no = k, workpool=wp, idle = idl, scheds=states } :
+  buildStates (k + 1) wps idl states
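
The recursive binding in runPar_internal below ties a knot: every Sched's
scheds field refers to the complete list of workers being defined, including
itself, which works because that field is lazy. A standalone sketch of the
same trick (hypothetical Node type, not in the patch):

    data Node = Node { label :: Int, peers :: [Node] }

    -- Each node carries the full ring of nodes, itself included.
    ring :: Int -> [Node]
    ring n = nodes
      where
        nodes = [ Node k nodes | k <- [0 .. n - 1] ]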

{-# INLINE runPar_internal #-}
runPar_internal :: Bool -> Par a -> IO a
runPar_internal _doSync x = do
-   workpools <- replicateM numCapabilities $ newIORef []
-   idle <- newIORef []
-   let states = [ Sched { no=x, workpool=wp, idle, scheds=states }
-                | (x,wp) <- zip [0..] workpools ]
+   workpools <- buildRL numCapabilities
+   idle <- newIORef INil
+   let states = buildStates 0 workpools idle states

#if __GLASGOW_HASKELL__ >= 701 /* 20110301 */
--

@@ -277,15 +368,15 @@ runParAsync = unsafePerformIO . runPar_internal False

-- | Creates a new @IVar@
new :: Par (IVar a)
-new = Par $ New Empty
+new = Par $ \c -> New Empty $ \mv -> c (IVar (IORef (STRef mv)))

-- | Creates a new @IVar@ that contains a value
newFull :: NFData a => a -> Par (IVar a)
-newFull x = deepseq x (Par $ New (Full x))
+newFull x = deepseq x (newFull_ x)

-- | Creates a new @IVar@ that contains a value (head-strict only)
newFull_ :: a -> Par (IVar a)
-newFull_ !x = Par $ New (Full x)
+newFull_ !x = Par $ \c -> New (Full x) $ \mv -> c (IVar (IORef (STRef mv)))
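
The public API is unchanged by the representation work above: the MutVar# is
rewrapped into an IVar before user code ever sees it. A small usage sketch
(not part of the patch), relying on the module's Monad instance for Par as in
the released monad-par API:

    answer :: Int
    answer = runPar $ do
      iv <- newFull_ (21 :: Int)
      x  <- get iv
      return (x + x)
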

-- | Read the value in an @IVar@. The 'get' operation can only return when the
-- value has been written by a prior or parallel @put@ to the same