@@ -20,18 +20,17 @@ import Data.Foldable (fold)
2020import qualified Data.HashMap.Strict as Map
2121import Data.IORef
2222import Data.List (intercalate )
23- import Data.Maybe
23+ import Data.Maybe ( fromMaybe , isNothing )
2424import Data.Set (Set )
2525import qualified Data.Set as S
2626import Data.Typeable
2727import Debug.Trace (traceEventIO , traceM )
2828import Development.IDE.Graph.Classes
2929import Development.IDE.Graph.Internal.Key
3030import Development.IDE.WorkerThread (DeliverStatus (.. ),
31- TaskQueue ,
31+ TaskQueue ( .. ) ,
3232 awaitRunInThread ,
33- counTaskQueue ,
34- runInThreadStmInNewThreads )
33+ counTaskQueue )
3534import qualified Focus
3635import GHC.Conc (TVar , atomically )
3736import GHC.Generics (Generic )
@@ -40,12 +39,12 @@ import qualified StmContainers.Map as SMap
4039import StmContainers.Map (Map )
4140import System.Time.Extra (Seconds , sleep )
4241import UnliftIO (Async (asyncThreadId ),
43- MonadUnliftIO ,
42+ MonadUnliftIO , async ,
4443 asyncExceptionFromException ,
4544 asyncExceptionToException ,
46- readTVar , readTVarIO ,
45+ poll , readTVar , readTVarIO ,
4746 throwTo , waitCatch ,
48- withAsync )
47+ withAsync , writeTQueue )
4948import UnliftIO.Concurrent (ThreadId , myThreadId )
5049import qualified UnliftIO.Exception as UE
5150
@@ -162,7 +161,7 @@ type DBQue = TaskQueue (Either Dynamic (IO ()))
162161data Database = Database {
163162 databaseExtra :: Dynamic ,
164163
165- databaseThreads :: TVar [Async () ],
164+ databaseThreads :: TVar [( DeliverStatus , Async () )],
166165
167166 databaseReverseDep :: SMap. Map Key KeySet ,
168167 -- For each key, the set of keys that depend on it directly.
@@ -193,23 +192,27 @@ data Database = Database {
193192-- all non-dirty running need to have an updated step,
194193-- so it won't be view as dirty when we restart the build
195194-- computeToPreserve :: Database -> KeySet -> STM [(Key, Async ())]
195+ computeToPreserve :: Database -> KeySet -> STM ([(Key , Async () )], [Key ])
196196computeToPreserve db dirtySet = do
197- -- All keys that depend (directly or transitively) on any dirty key
198- affected <- computeTransitiveReverseDeps db dirtySet
199- -- Running stage-2 keys are eligible to be considered for cleanup
200- running2 <- getRunningStage2Keys db
201- allRunings <- getRunningKeys db
202- forM_ allRunings $ \ k -> do
203- -- if not dirty, bump its step
204- unless (memberKeySet k dirtySet) $ do
205- SMap. focus (Focus. alter $ \ case
206- Just kd@ KeyDetails {keyStatus= Running {runningStep, runningPrev, runningWait, runningStage}} -> Just (kd{keyStatus = Running (runningStep + 1 ) runningPrev runningWait runningStage})
207- _ -> Nothing
208- ) k (databaseValues db)
209-
210- -- traceM $ "key: " ++ show k ++ ", isDirty: " ++ show isDirty
211- -- Keep only those whose key is NOT affected by the dirty set
212- pure ([kv | kv@ (k, _async) <- running2, not (memberKeySet k affected)], allRunings)
197+ -- All keys that depend (directly or transitively) on any dirty key
198+ affected <- computeTransitiveReverseDeps db dirtySet
199+ running2 <- getRunningStage2Keys db
200+ allRunings <- getRunningKeys db
201+ forM_ allRunings $ \ k -> do
202+ -- if not dirty, bump its step
203+ unless (memberKeySet k affected) $ do
204+ SMap. focus
205+ ( Focus. alter $ \ case
206+ Just kd@ KeyDetails {keyStatus = Running {runningStep, runningPrev, runningWait, runningStage}} ->
207+ Just (kd {keyStatus = Running (runningStep + 1 ) runningPrev runningWait runningStage})
208+ _ -> Nothing
209+ )
210+ k
211+ (databaseValues db)
212+
213+ -- traceM $ "key: " ++ show k ++ ", isDirty: " ++ show isDirty
214+ -- Keep only those whose key is NOT affected by the dirty set
215+ pure ([kv | kv@ (k, _async) <- running2, not (memberKeySet k affected)], allRunings)
213216
214217getRunningStage2Keys :: Database -> STM [(Key , Async () )]
215218-- getRunningStage2Keys db = return []
@@ -267,15 +270,35 @@ runInDataBase :: String -> Database -> [(IO result, Either SomeException result
267270runInDataBase title db acts = do
268271 s <- getDataBaseStepInt db
269272 let actWithEmptyHook = map (\ (x, y) -> (const $ return () , x, y)) acts
270- runInThreadStmInNewThreads (getDataBaseStepInt db) (return $ DeliverStatus s title) (databaseQueue db) (databaseThreads db) actWithEmptyHook
273+ runInThreadStmInNewThreads db (return $ DeliverStatus s title) actWithEmptyHook
274+
275+ runInThreadStmInNewThreads :: Database -> IO DeliverStatus -> [(Async () -> IO () , IO result , Either SomeException result -> IO () )] -> STM ()
276+ runInThreadStmInNewThreads db mkDeliver acts = do
277+ -- Take an action from TQueue, run it and
278+ -- use barrier to wait for the result
279+ let TaskQueue q = databaseQueue db
280+ let log prefix title = dataBaseLogger db (prefix ++ title)
281+ writeTQueue q $ Right $ do
282+ uninterruptibleMask $ \ restore -> do
283+ do
284+ deliver <- mkDeliver
285+ log " runInThreadStmInNewThreads submit begin " (deliverName deliver)
286+ curStep <- atomically $ getDataBaseStepInt db
287+ -- traceM ("runInThreadStmInNewThreads: current step: " ++ show curStep ++ " deliver step: " ++ show deliver)
288+ when (curStep == deliverStep deliver) $ do
289+ syncs <- mapM (\ (preHook, act, handler) -> do
290+ a <- async (handler =<< (restore $ Right <$> act) `catch` \ e@ (SomeException _) -> return (Left e))
291+ preHook a
292+ return (deliver, a)
293+ ) acts
294+ atomically $ modifyTVar' (databaseThreads db) (syncs++ )
295+ log " runInThreadStmInNewThreads submit end " (deliverName deliver)
271296
272297runOneInDataBase :: IO DeliverStatus -> Database -> (Async () -> IO () ) -> IO result -> (SomeException -> IO () ) -> STM ()
273298runOneInDataBase mkDelivery db registerAsync act handler = do
274299 runInThreadStmInNewThreads
275- (getDataBaseStepInt db)
300+ db
276301 mkDelivery
277- (databaseQueue db)
278- (databaseThreads db)
279302 [ ( registerAsync, warpLog act,
280303 \ case
281304 Left e -> handler e
@@ -284,7 +307,7 @@ runOneInDataBase mkDelivery db registerAsync act handler = do
284307 ]
285308 where
286309 warpLog a =
287- UE. bracket
310+ bracket
288311 (do (DeliverStatus _ title) <- mkDelivery; dataBaseLogger db (" Starting async action: " ++ title); return title)
289312 (\ title -> dataBaseLogger db $ " Finished async action: " ++ title)
290313 (const a)
@@ -308,19 +331,29 @@ shutDatabase preserve Database{..} = uninterruptibleMask $ \unmask -> do
308331 asyncs <- readTVarIO databaseThreads
309332 step <- readTVarIO databaseStep
310333 tid <- myThreadId
311- traceEventIO (" shutDatabase: cancelling " ++ show (length asyncs) ++ " asyncs, step " ++ show step)
312- let remains = filter (`S.member` preserve) asyncs
313- let toCancel = filter (`S.notMember` preserve) asyncs
314- mapM_ (\ a -> throwTo (asyncThreadId a) $ AsyncParentKill tid step) toCancel
334+ -- traceEventIO ("shutDatabase: cancelling " ++ show (length asyncs) ++ " asyncs, step " ++ show step)
335+ -- traceEventIO ("shutDatabase: async entries: " ++ show (map (deliverName . fst) asyncs))
336+ let remains = filter (\ (_, s) -> s `S.member` preserve) asyncs
337+ let toCancel = filter (\ (_, s) -> s `S.notMember` preserve) asyncs
338+ -- traceEventIO ("shutDatabase: remains count: " ++ show (length remains) ++ ", names: " ++ show (map (deliverName . fst) remains))
339+ -- traceEventIO ("shutDatabase: toCancel count: " ++ show (length toCancel) ++ ", names: " ++ show (map (deliverName . fst) toCancel))
340+ mapM_ (\ (_, a) -> throwTo (asyncThreadId a) $ AsyncParentKill tid step) toCancel
315341 atomically $ modifyTVar' databaseThreads (const remains)
316342 -- Wait until all the asyncs are done
317343 -- But if it takes more than 10 seconds, log to stderr
318344 unless (null asyncs) $ do
319345 let warnIfTakingTooLong = unmask $ forever $ do
320- sleep 10
321- traceEventIO " cleanupAsync: waiting for asyncs to finish"
346+ sleep 5
347+ as <- readTVarIO databaseThreads
348+ -- poll each async: Nothing => still running
349+ statuses <- forM as $ \ (d,a) -> do
350+ p <- poll a
351+ return (d, a, p)
352+ let still = [ (deliverName d, show (asyncThreadId a)) | (d,a,p) <- statuses, isNothing p ]
353+ traceEventIO $ " cleanupAsync: waiting for asyncs to finish; total=" ++ show (length as) ++ " , stillRunning=" ++ show (length still)
354+ traceEventIO $ " cleanupAsync: still running (deliverName, threadId) = " ++ show still
322355 withAsync warnIfTakingTooLong $ \ _ ->
323- mapM_ waitCatch asyncs
356+ mapM_ waitCatch $ map snd toCancel
324357
325358-- waitForDatabaseRunningKeys :: Database -> IO ()
326359-- waitForDatabaseRunningKeys = getDatabaseValues >=> mapM_ (waitRunning . snd)
0 commit comments