diff --git a/monad-par/Control/Monad/Par/Scheds/Trace.hs b/monad-par/Control/Monad/Par/Scheds/Trace.hs
index 75ab6cf1..919b4abe 100644
--- a/monad-par/Control/Monad/Par/Scheds/Trace.hs
+++ b/monad-par/Control/Monad/Par/Scheds/Trace.hs
@@ -19,6 +19,8 @@ module Control.Monad.Par.Scheds.Trace (
 import qualified Control.Monad.Par.Class as PC
 import Control.Monad.Par.Scheds.TraceInternal
+    ( IVar, Par (..), Trace (..), get, new, put
+    , put_, newFull, newFull_, runCont, runPar, runParIO)
 import Control.DeepSeq
 import Control.Monad as M hiding (mapM, sequence, join)
 import Prelude hiding (mapM, sequence, head,tail)
diff --git a/monad-par/Control/Monad/Par/Scheds/TraceInternal.hs b/monad-par/Control/Monad/Par/Scheds/TraceInternal.hs
index 5eb137c6..51fd2f34 100644
--- a/monad-par/Control/Monad/Par/Scheds/TraceInternal.hs
+++ b/monad-par/Control/Monad/Par/Scheds/TraceInternal.hs
@@ -1,5 +1,5 @@
-{-# LANGUAGE RankNTypes, NamedFieldPuns, BangPatterns,
-             ExistentialQuantification, CPP #-}
+{-# LANGUAGE RankNTypes, NamedFieldPuns, BangPatterns, LambdaCase,
+             ExistentialQuantification, CPP, MagicHash, UnboxedTuples #-}
 {-# OPTIONS_GHC -Wall -fno-warn-name-shadowing -fno-warn-unused-do-bind #-}

 -- | This module exposes the internals of the @Par@ monad so that you
@@ -18,13 +18,18 @@ module Control.Monad.Par.Scheds.TraceInternal (
   ) where

-import Control.Monad as M hiding (mapM, sequence, join)
+import Control.Monad as M hiding (mapM, sequence)
 import Prelude hiding (mapM, sequence, head,tail)
 import Data.IORef
 import System.IO.Unsafe
 import Control.Concurrent hiding (yield)
 import GHC.Conc (numCapabilities)
 import Control.DeepSeq
+import GHC.IO (IO (..))
+import GHC.IORef (IORef (..))
+import GHC.STRef (STRef (..))
+import GHC.Exts (casMutVar#, lazy, MutVar#, RealWorld)
+--import Control.Exception (evaluate)
 -- import Text.Printf

 #if !MIN_VERSION_base(4,8,0)
@@ -39,38 +44,73 @@ forkOn = forkOnIO

 -- ---------------------------------------------------------------------------

-data Trace = forall a . Get (IVar a) (a -> Trace)
-           | forall a . Put (IVar a) a Trace
-           | forall a . New (IVarContents a) (IVar a -> Trace)
+data Trace = forall a . Get !(IVar a) (a -> Trace)
+           | forall a . Put !(IVar a) a Trace
+           -- Why does the continuation take a MutVar# instead of an IVar?
+           -- In the common case that we don't actually *need* the IVar
+           -- wrapper, we'd rather not allocate it.
+           | forall a . New !(IVarContents a) (MutVar# RealWorld (IVarContents a) -> Trace)
            | Fork Trace Trace
            | Done
            | Yield Trace
            | forall a . LiftIO (IO a) (a -> Trace)

+-- | A thin wrapper around 'casMutVar#'.
+casIORef_ :: IORef a -> a -> a -> IO Bool
+casIORef_ (IORef (STRef ref)) old new =
+  IO $ \s -> case casMutVar# ref old new s of
+    (# s', 0#, _latest #) -> (# s', True #)
+    (# s', _, _latest #)  -> (# s', False #)
+
+-- This is similar to atomicModifyIORef, but it applies the
+-- function eagerly. This should only be used when the function
+-- is very cheap: it must take very little time to avoid CAS
+-- failures, and it should not allocate *any* memory, since it
+-- will be run again on CAS failure.
+atomicModifyIORefEager :: IORef a -> (a -> (# Maybe a, b #)) -> IO b
+atomicModifyIORefEager ref f = go ref
+  where
+    go ref = do
+      old <- readIORef ref
+      -- The lazy here is a bit paranoid; we want to be sure that GHC
+      -- doesn't decide to substitute an unfolding for old in the call
+      -- to casIORef_. I don't know if that can happen anyway, but
+      -- being extra-careful shouldn't hurt.
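+      -- casMutVar# compares by pointer equality, so the CAS can only
+      -- succeed if 'old' reaches casIORef_ as exactly the closure we
+      -- read above, not as some equal-but-distinct copy.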
+      case f (lazy old) of
+        (# Nothing, res #) -> pure res
+        (# Just new, res #) -> do
+          done <- casIORef_ ref old new
+          if done
+            then pure res
+            else go ref
+{-# INLINE atomicModifyIORefEager #-}
+
+
 -- | The main scheduler loop.
 sched :: Bool -> Sched -> Trace -> IO ()
-sched _doSync queue t = loop t
+sched _doSync !queue t = loop t
   where
     loop t = case t of
       New a f -> do
-        r <- newIORef a
-        loop (f (IVar r))
+        IORef (STRef r) <- newIORef a
+        loop (f r)
       Get (IVar v) c -> do
-        e <- readIORef v
-        case e of
+        -- Optimistic check for Full avoids the expense of
+        -- atomicModifyIORef
+        e0 <- readIORef v
+        case e0 of
           Full a -> loop (c a)
-          _other -> do
+          _ -> do
             r <- atomicModifyIORef v $ \e -> case e of
-                        Empty      -> (Blocked [c], reschedule queue)
-                        Full a     -> (Full a,      loop (c a))
-                        Blocked cs -> (Blocked (c:cs), reschedule queue)
-            r
+                        Full a -> (Full a, Just a)
+                        cs     -> (Blocked c cs, Nothing)
+            maybe (reschedule queue) (loop . c) r
       Put (IVar v) a t -> do
-        cs <- atomicModifyIORef v $ \e -> case e of
-               Empty      -> (Full a, [])
-               Full _     -> error "multiple put"
-               Blocked cs -> (Full a, cs)
-        mapM_ (pushWork queue. ($a)) cs
+        let fulla = Full a
+        cs <- atomicModifyIORefEager v $ \case
+               Full _ -> error "multiple put"
+               cs     -> (# Just fulla, cs #)
+        icmapM_ (pushWork queue . ($a)) cs
         loop t
       Fork child parent -> do
         pushWork queue child
@@ -89,9 +129,9 @@ sched _doSync queue t = loop t
       Yield parent -> do
         -- Go to the end of the worklist:
        let Sched { workpool } = queue
-        -- TODO: Perhaps consider Data.Seq here.
+        -- TODO: Perhaps consider some sort of deque here
        -- This would also be a chance to steal and work from opposite ends of the queue.
-        atomicModifyIORef workpool $ \ts -> (ts++[parent], ())
+        () <- atomicModifyIORef workpool $ \ts -> (ts++[parent], ())
        reschedule queue
      LiftIO io c -> do
        r <- io
@@ -101,6 +141,9 @@ sched _doSync queue t = loop t
 -- work-stealing mode.
 reschedule :: Sched -> IO ()
 reschedule queue@Sched{ workpool } = do
+  -- We don't use atomicModifyIORefEager for workpools, because
+  -- yields can cause those to contain thunks, and forcing thunks
+  -- can easily lead to CAS failures.
   e <- atomicModifyIORef workpool $ \ts ->
          case ts of
            []      -> ([], Nothing)
@@ -109,7 +152,6 @@ reschedule queue@Sched{ workpool } = do
     Nothing -> steal queue
     Just t  -> sched True queue t

-
 -- RRN: Note -- NOT doing random work stealing breaks the traditional
 -- Cilk time/space bounds if one is running strictly nested (series
 -- parallel) programs.
@@ -121,11 +163,11 @@ steal q@Sched{ idle, scheds, no=my_no } = do
     go scheds
   where
     go [] = do m <- newEmptyMVar
-               r <- atomicModifyIORef idle $ \is -> (m:is, is)
-               if length r == numCapabilities - 1
+               r <- atomicModifyIORef idle $ \is -> (m `ICons` is, is)
+               if ilength r == numCapabilities - 1
                 then do
                      -- printf "cpu %d initiating shutdown\n" my_no
-                     mapM_ (\m -> putMVar m True) r
+                     imapM_ (\m -> putMVar m True) r
                 else do
                   done <- takeMVar m
                   if done
@@ -138,35 +180,64 @@ steal q@Sched{ idle, scheds, no=my_no } = do
    go (x:xs)
      | no x == my_no = go xs
      | otherwise     = do
-         r <- atomicModifyIORef (workpool x) $ \ ts ->
-                 case ts of
-                    []     -> ([], Nothing)
-                    (x:xs) -> (xs, Just x)
-         case r of
-           Just t  -> do
-              -- printf "cpu %d got work from cpu %d\n" my_no (no x)
-              sched True q t
-           Nothing -> go xs
+         -- Cheaply skip over empty workpools.
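+         -- A plain readIORef is much cheaper than an atomic
+         -- read-modify-write, and a stale result is harmless here:
+         -- if work appears just after we look, we merely move on to
+         -- the next victim and find it on a later pass.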
+         r0 <- readIORef (workpool x)
+         case r0 of
+           [] -> go xs
+           _ -> do
+             r <- atomicModifyIORef (workpool x) $ \ ts ->
+                     case ts of
+                        []      -> ([], Nothing)
+                        (t:ts') -> (ts', Just t)
+             case r of
+               Just t  -> do
+                  -- printf "cpu %d got work from cpu %d\n" my_no (no x)
+                  sched True q t
+               Nothing -> go xs

 -- | If any worker is idle, wake one up and give it work to do.
 pushWork :: Sched -> Trace -> IO ()
 pushWork Sched { workpool, idle } t = do
   atomicModifyIORef workpool $ \ts -> (t:ts, ())
   idles <- readIORef idle
-  when (not (null idles)) $ do
-    r <- atomicModifyIORef idle (\is -> case is of
-                                          [] -> ([], return ())
-                                          (i:is) -> (is, putMVar i False))
-    r -- wake one up
+  when (not (inull idles)) $ do
+    r <- atomicModifyIORefEager idle (\is -> case lazy is of
+                                          INil -> (# Nothing, INil #)
+                                          (_ `ICons` is') -> (# Just is', is #))
+    case r of
+      INil -> pure ()
+      i `ICons` _ -> putMVar i False -- wake one up

 data Sched = Sched
     { no       :: {-# UNPACK #-} !Int,
-      workpool :: IORef [Trace],
-      idle     :: IORef [MVar Bool],
+      workpool :: !(IORef [Trace]),
+      idle     :: !(IORef IdleList),
       scheds   :: [Sched] -- Global list of all per-thread workers.
     }
 -- deriving Show

+data IdleList = ICons !(MVar Bool) IdleList | INil
+
+inull :: IdleList -> Bool
+inull INil = True
+inull (ICons _ _) = False
+
+imapM_ :: Applicative f => (MVar Bool -> f a) -> IdleList -> f ()
+imapM_ f = go
+  where
+    go INil = pure ()
+    go (ICons x xs) = f x *> go xs
+
+ilength :: IdleList -> Int
+ilength = go 0 where
+  go !acc INil = acc
+  go acc (ICons _ xs) = go (acc + 1) xs
+
+icmapM_ :: Applicative f => ((a -> Trace) -> f b) -> IVarContents a -> f ()
+icmapM_ f = go
+  where
+    go (Blocked c cs) = f c *> go cs
+    go _ = pure ()
+
 newtype Par a = Par {
     runCont :: (a -> Trace) -> Trace
 }
@@ -189,9 +260,9 @@ newtype IVar a = IVar (IORef (IVarContents a))

 instance Eq (IVar a) where
   (IVar r1) == (IVar r2) = r1 == r2

--- Forcing evaluation of a IVar is fruitless.
+-- We just force the IORef; the contents can't be forced.
 instance NFData (IVar a) where
-  rnf _ = ()
+  rnf !_ = ()


 -- From outside the Par computation we can peek.  But this is nondeterministic.
@@ -199,20 +270,40 @@ pollIVar :: IVar a -> IO (Maybe a)
 pollIVar (IVar ref) =
   do contents <- readIORef ref
      case contents of
-       Full x -> return (Just x)
-       _      -> return (Nothing)
+       Full x -> return (Just x)
+       _      -> return Nothing
+
+-- Invariant: Full should only appear in an outermost position.
+-- Something like Blocked f (Full a) is prohibited.
+-- We used to use Blocked [a -> Trace], but that requires
+-- an extra wrapper allocation to add an element.
+data IVarContents a = Full a | Empty | Blocked (a -> Trace) !(IVarContents a)

-data IVarContents a = Full a | Empty | Blocked [a -> Trace]
+-- A strict-spined list of unboxed IORefs.
+data IORefList a = RLCons !(IORef a) !(IORefList a) | RLNil

+buildRL :: Int -> IO (IORefList [a])
+buildRL = go RLNil
+  where
+    go acc 0 = pure acc
+    go acc n = do
+      ref <- newIORef []
+      go (RLCons ref acc) (n - 1)
+
+buildStates :: Int
+            -> IORefList [Trace] -> IORef IdleList -> [Sched] -> [Sched]
+buildStates !_k RLNil !_idl _states = []
+buildStates k (RLCons wp wps) !idl states =
+  Sched { no = k, workpool=wp, idle = idl, scheds=states } :
+  buildStates (k + 1) wps idl states

 {-# INLINE runPar_internal #-}
 runPar_internal :: Bool -> Par a -> IO a
 runPar_internal _doSync x = do
-   workpools <- replicateM numCapabilities $ newIORef []
-   idle <- newIORef []
-   let states = [ Sched { no=x, workpool=wp, idle, scheds=states }
-                | (x,wp) <- zip [0..] workpools ]
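+   -- buildStates ties the same recursive knot as the old list
+   -- comprehension: every Sched's scheds field is the finished list
+   -- itself. Only the workpool source changes, from a lazy zip to a
+   -- spine-strict IORefList built by buildRL.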
+   workpools <- buildRL numCapabilities
+   idle <- newIORef INil
+   let states = buildStates 0 workpools idle states

 #if __GLASGOW_HASKELL__ >= 701 /* 20110301 */
     --
@@ -277,15 +368,15 @@ runParAsync = unsafePerformIO . runPar_internal False

 -- | Creates a new @IVar@
 new :: Par (IVar a)
-new  = Par $ New Empty
+new  = Par $ \c -> New Empty $ \mv -> c (IVar (IORef (STRef mv)))

 -- | Creates a new @IVar@ that contains a value
 newFull :: NFData a => a -> Par (IVar a)
-newFull x = deepseq x (Par $ New (Full x))
+newFull x = deepseq x (newFull_ x)

 -- | Creates a new @IVar@ that contains a value (head-strict only)
 newFull_ :: a -> Par (IVar a)
-newFull_ !x = Par $ New (Full x)
+newFull_ !x = Par $ \c -> New (Full x) $ \mv -> c (IVar (IORef (STRef mv)))

 -- | Read the value in an @IVar@.  The 'get' operation can only return when the
 -- value has been written by a prior or parallel @put@ to the same