@@ -197,21 +197,6 @@ isFrozen (LVar {status}) = do
197197 Active _ -> return False
198198 Frozen -> return True
199199
200- -- Optionall wraps an IO action so that it will only execute once even if
201- -- called multiple times (even concurrently).
202- dedupWhen :: Bool -> (a -> IO () ) -> IO (a -> IO () )
203- {-# INLINE dedupWhen #-}
204- dedupWhen dedup c =
205- if dedup
206- then do
207- hasInvoked <- newIORef False
208- return $ \ x -> do
209- ticket <- readForCAS hasInvoked
210- unless (peekTicket ticket) $ do
211- (winner, _) <- casIORef hasInvoked ticket True
212- when winner $ c x
213- else return c
214-
215200
216201-- | Logging within the (internal) Par monad.
217202logStrLn :: Int -> String -> Par ()
@@ -234,12 +219,12 @@ logHelper lgr num msg = when (dbgLvl >= 1) $ do
234219 Just lgr -> L. logOn lgr msg'
235220 Nothing -> hPutStrLn stderr (" WARNING/nologger:" ++ show msg')
236221
237- logWith :: Sched . State a s -> Int -> String -> IO ()
238- logOffRecord :: Sched . State a s -> Int -> String -> IO ()
222+ logWith :: Queue . State a s -> Int -> String -> IO ()
223+ logOffRecord :: Queue . State a s -> Int -> String -> IO ()
239224#ifdef DEBUG_LVAR
240225-- Only when the debug level is 1 or higher is the logger even initialized:
241- logWith q lvl str = logHelper (Sched . logger q) (Sched . no q) (L. StrMsg lvl str)
242- logOffRecord q lvl str = logHelper (Sched . logger q) (Sched . no q) (L. OffTheRecord lvl str)
226+ logWith q lvl str = logHelper (Queue . logger q) (Queue . no q) (L. StrMsg lvl str)
227+ logOffRecord q lvl str = logHelper (Queue . logger q) (Queue . no q) (L. OffTheRecord lvl str)
243228#else
244229logWith _ _ _ = return ()
245230logOffRecord _ _ _ = return ()
@@ -286,7 +271,9 @@ getLV lv@(LVar {state, status}) globalThresh deltaThresh = mkPar $ \k q -> do
286271 -- continuation immediately
287272
288273 Nothing -> do -- /transiently/ not past the threshhold; block
289- enableCont <- dedupWhen (not $ Queue. idemp q) $ Queue. pushWork q . k
274+ execFlag <- newDedupCheck
275+ let enableCont b = unless (Queue. idemp q) $
276+ winnerCheck execFlag q (Queue. pushWork q (k b)) (return () )
290277
291278 let onUpdate d = unblockWhen $ deltaThresh d
292279 onFreeze = unblockWhen $ globalThresh state True
@@ -299,8 +286,7 @@ getLV lv@(LVar {state, status}) globalThresh deltaThresh = mkPar $ \k q -> do
299286 B. remove tok
300287 enableCont b
301288
302- logWith q 8 $ " [dbg-lvish] getLV " ++ show (unsafeName execFlag)++
303- " : blocking on LVar, registering listeners..."
289+ logWith q 8 $ " [dbg-lvish] getLV: blocking on LVar, registering listeners..."
304290 -- add listener, i.e., move the continuation to the waiting bag
305291 tok <- B. put listeners $ Listener onUpdate onFreeze
306292
@@ -337,7 +323,7 @@ getLV lv@(LVar {state, status}) globalThresh deltaThresh = mkPar $ \k q -> do
337323
338324{-# INLINE newDedupCheck #-}
339325{-# INLINE winnerCheck #-}
340- winnerCheck :: DedupCell -> Sched . State a s -> IO () -> IO () -> IO ()
326+ winnerCheck :: DedupCell -> Queue . State a s -> IO () -> IO () -> IO ()
341327newDedupCheck :: IO DedupCell
342328
343329#if GET_ONCE
@@ -353,7 +339,7 @@ winnerCheck execFlag q tru fal = do
353339 else do
354340 (winner, _) <- casIORef execFlag ticket True
355341 logWith q 8 $ " [dbg-lvish] getLV " ++ show (unsafeName execFlag)
356- ++ " on worker " ++ (show $ Sched . no q) ++ " : winner check? " ++ show winner
342+ ++ " on worker " ++ (show $ Queue . no q) ++ " : winner check? " ++ show winner
357343 ++ " , ticks " ++ show (ticket, peekTicket ticket)
358344 if winner then tru else fal
359345# else
@@ -363,7 +349,7 @@ newDedupCheck = C2.newCounter 0
363349winnerCheck execFlag q tru fal = do
364350 cnt <- C2. incrCounter 1 execFlag
365351 logWith q 8 $ " [dbg-lvish] getLV " ++ show (unsafeName execFlag)
366- ++ " on worker " ++ (show $ Sched . no q) ++ " : winner check? " ++ show (cnt== 1 )
352+ ++ " on worker " ++ (show $ Queue . no q) ++ " : winner check? " ++ show (cnt== 1 )
367353 ++ " , counter val " ++ show cnt
368354 if cnt== 1 then tru else fal
369355
@@ -378,8 +364,6 @@ winnerCheck _ _ tr _ = tr
378364
379365
380366
381-
382-
383367-- | Update an LVar.
384368putLV_ :: LVar a d -- ^ the LVar
385369 -> (a -> Par (Maybe d , b )) -- ^ how to do the put, and whether the LVar's
@@ -508,10 +492,10 @@ closeInPool (Just hp) dedup c = do
508492 Queue. pushWork q t
509493 B. foreach (blockedOnQuiesce hp) invoke
510494
511- onTerminate <- dedupWhen dedup onTerminate_
495+ dedupFlag <- newDedupCheck
512496
513497 let onFinishHandler _ = ClosedPar $ \ q -> do
514- onTerminate q
498+ when dedup $ winnerCheck dedupFlag q (onTerminate_ q) ( return () )
515499 sched q
516500
517501 C. inc $ numHandlers hp -- record handler invocation in pool
@@ -572,9 +556,15 @@ addHandler hp LVar {state, status, handlerStatus, name} globalCB updateThresh =
572556 logWith q 4 " [dbg-lvish] addHandler: calling globalCB.."
573557 -- At registration time, traverse (globally) over the previously inserted items
574558 -- to launch any required callbacks.
575- let k2 x = do relLock q; k x
559+ let k2 :: () -> ClosedPar
560+ k2 () = case k () of
561+ ClosedPar go -> ClosedPar $ \ q2 -> do
562+ -- Warning! What happens if the globalCB blocks and then wakes on a different thread?
563+ relLock q -- Release lock on original worker.
564+ go q2 -- Continue after the addHandler.
576565 -- Ported over bugfix here from master branch.
577- -- There's a quirk here where we need to stick in the lock release:
566+ -- There's a quirk here where we need to stick in the lock release
567+ -- to happen afetr the globalCB is done (in the continuation).
578568 exec (close (globalCB state) k2) q
579569
580570-- | Block until a handler pool is quiescent.
@@ -647,7 +637,7 @@ liftIO io = mkPar $ \k q -> do
647637-- current Par session, otherwise it will simply throw an exception.
648638getLogger :: Par L. Logger
649639getLogger = mkPar $ \ k q ->
650- let Just lgr = Sched . logger q in
640+ let Just lgr = Queue . logger q in
651641 exec (k lgr) q
652642
653643-- | Return the worker that we happen to be running on. (NONDETERMINISTIC.)
@@ -697,7 +687,7 @@ runParDetailed :: DbgCfg -- ^ Debugging config
697687 -> Par a -- ^ The computation to run.
698688 -> IO ([String ], Either E. SomeException a )
699689runParDetailed cfg@ DbgCfg {dbgRange, dbgDests, dbgScheduling } numWrkrs comp = do
700- (lgr,queues) <- Sched . new cfg numWrkrs noName
690+ (lgr,queues) <- Queue . new cfg numWrkrs noName
701691
702692 -- We create a thread on each CPU with forkOn. The CPU on which
703693 -- the current thread is running will host the main thread; the
@@ -717,7 +707,7 @@ runParDetailed cfg@DbgCfg{dbgRange, dbgDests, dbgScheduling } numWrkrs comp = do
717707
718708 -- Use Control.Concurrent.Async to deal with exceptions:
719709 ----------------------------------------------------------------------------------
720- let runWorker :: (Int ,Sched . State ClosedPar LVarID ) -> IO ()
710+ let runWorker :: (Int ,Queue . State ClosedPar LVarID ) -> IO ()
721711 runWorker (cpu, q) = do
722712 if (cpu /= main_cpu)
723713 then do logOffRecord q 3 $ " [dbg-lvish] Auxillary worker #" ++ show cpu++ " starting."
0 commit comments