@@ -42,6 +42,7 @@ module ScheduledMerges (
4242import Prelude hiding (lookup )
4343
4444import Data.Bits
45+ import Data.Foldable (for_ , toList )
4546import Data.Map.Strict (Map )
4647import qualified Data.Map.Strict as Map
4748import Data.STRef
@@ -518,11 +519,12 @@ data EventDetail =
518519increment :: forall s . Tracer (ST s ) Event
519520 -> Counter -> Run -> Levels s -> ST s (Levels s )
520521increment tr sc = \ r ls -> do
521- ls' <- go 1 [r] ls
522+ (ls', refused) <- go 1 [r] ls
523+ assertST $ null refused
522524 invariant ls'
523525 return ls'
524526 where
525- go , go' :: Int -> [Run ] -> Levels s -> ST s (Levels s )
527+ go , go' :: Int -> [Run ] -> Levels s -> ST s (Levels s , Maybe Run )
526528 go ! ln incoming ls = do
527529 case incoming of
528530 [r] -> do
@@ -532,13 +534,15 @@ increment tr sc = \r ls -> do
532534 -- because of underfull runs
533535 assertST $ all (\ r -> tieringRunSizeToLevel r `elem` [ln- 2 , ln- 1 ]) incoming
534536 assertST $ tieringLevel (sum (map Map. size incoming)) `elem` [ln- 1 , ln]
535- go' ln incoming ls
537+ (ls', refused) <- go' ln incoming ls
538+ for_ refused $ assertST . (== head incoming)
539+ return (ls', refused)
536540
537541 go' ! ln incoming [] = do
538542 let mergepolicy = mergePolicyForLevel ln []
539543 traceWith tr' AddLevelEvent
540544 mr <- newMerge tr' ln mergepolicy MergeLastLevel incoming
541- return (Level mr [] : [] )
545+ return (Level mr [] : [] , Nothing )
542546 where
543547 tr' = contramap (EventAt sc ln) tr
544548
@@ -555,24 +559,24 @@ increment tr sc = \r ls -> do
555559 , sum (map Map. size (r : incoming)) <= tieringRunSize ln -> do
556560 let mergelast = mergeLastForLevel ls
557561 mr' <- newMerge tr' ln MergePolicyTiering mergelast (incoming ++ [r])
558- return (Level mr' rs : ls)
562+ return (Level mr' rs : ls, Nothing )
559563
560564 -- This tiering level is now full. We take the completed merged run
561565 -- (the previous incoming runs), plus all the other runs on this level
562566 -- as a bundle and move them down to the level below. We start a merge
563567 -- for the new incoming runs. This level is otherwise empty.
564568 MergePolicyTiering | tieringLevelIsFull ln incoming resident -> do
565569 mr' <- newMerge tr' ln MergePolicyTiering MergeMidLevel incoming
566- ls' <- go (ln+ 1 ) resident ls
567- return (Level mr' [] : ls')
570+ ( ls', refused) <- go (ln+ 1 ) resident ls
571+ return (Level mr' (toList refused) : ls', Nothing )
568572
569573 -- This tiering level is not yet full. We move the completed merged run
570574 -- into the level proper, and start the new merge for the incoming runs.
571575 MergePolicyTiering -> do
572576 let mergelast = mergeLastForLevel ls
573577 mr' <- newMerge tr' ln MergePolicyTiering mergelast incoming
574578 traceWith tr' (AddRunEvent (length resident))
575- return (Level mr' resident : ls)
579+ return (Level mr' resident : ls, Nothing )
576580
577581 -- The final level is using levelling. If the existing completed merge
578582 -- run is too large for this level, we promote the run to the next
@@ -581,15 +585,15 @@ increment tr sc = \r ls -> do
581585 MergePolicyLevelling | levellingLevelIsFull ln incoming r -> do
582586 assert (null rs && null ls) $ return ()
583587 mr' <- newMerge tr' ln MergePolicyTiering MergeMidLevel incoming
584- ls' <- go (ln+ 1 ) [r] []
585- return (Level mr' [] : ls')
588+ ( ls', refused) <- go (ln+ 1 ) [r] []
589+ return (Level mr' (toList refused) : ls', Nothing )
586590
587591 -- Otherwise we start merging the incoming runs into the run.
588592 MergePolicyLevelling -> do
589593 assert (null rs && null ls) $ return ()
590594 mr' <- newMerge tr' ln MergePolicyLevelling MergeLastLevel
591595 (incoming ++ [r])
592- return (Level mr' [] : [] )
596+ return (Level mr' [] : [] , Nothing )
593597
594598 where
595599 tr' = contramap (EventAt sc ln) tr
0 commit comments