Skip to content

Commit adf3720

Browse files
committed
prototype: merge the union level into regular levels
1 parent 7c543fe commit adf3720

File tree

2 files changed

+179
-47
lines changed

2 files changed

+179
-47
lines changed

prototypes/ScheduledMerges.hs

Lines changed: 130 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ module ScheduledMerges (
6565
MergeDebt(..),
6666
NominalCredit(..),
6767
NominalDebt(..),
68+
maxBufferSize,
6869
Run,
6970
runSize,
7071
UnionCredits (..),
@@ -85,6 +86,7 @@ import Prelude hiding (lookup)
8586

8687
import Data.Bits
8788
import Data.Foldable (for_, toList, traverse_)
89+
import Data.Functor ((<&>))
8890
import Data.Map.Strict (Map)
8991
import qualified Data.Map.Strict as Map
9092
import Data.Maybe (catMaybes)
@@ -130,7 +132,18 @@ data Level s = Level !(IncomingRun s) ![Run]
130132
data IncomingRun s = Merging !MergePolicy
131133
!NominalDebt !(STRef s NominalCredit)
132134
!(MergingRun LevelMergeType s)
133-
| Single !Run
135+
| Single !SingleRunOrigin !Run
136+
137+
-- | Additional information about the origin of a 'Single' run. This allows us
138+
-- to have stronger invariants, depending on the origin.
139+
data SingleRunOrigin = -- | Either a flushed write buffer or last level run.
140+
--
141+
-- TODO distinguish there two cases? One only happens in
142+
-- first, the other in last level.
143+
Regular
144+
-- | A former union level that was completed (merged down
145+
-- to a single run) and became the last regular level.
146+
| MigratedUnion
134147

135148
-- | The merge policy for a LSM level can be either tiering or levelling.
136149
-- In this design we use levelling for the last level, and tiering for
@@ -325,7 +338,7 @@ invariant (LSMContent _ levels ul) = do
325338

326339
levelsInvariant !ln (Level ir rs : ls) = do
327340
mrs <- case ir of
328-
Single r ->
341+
Single _ r ->
329342
return (CompletedMerge r)
330343
Merging mp _ _ (MergingRun mt _ ref) -> do
331344
assertST $ ln > 1 -- no merges on level 1
@@ -344,19 +357,25 @@ invariant (LSMContent _ levels ul) = do
344357
expectedRunLengths :: Int -> [Run] -> [Level s] -> ST s ()
345358
expectedRunLengths ln rs ls =
346359
case mergePolicyForLevel ln ls ul of
347-
-- Levels using levelling have only one (incoming) run, which almost
348-
-- always consists of an ongoing merge. The exception is when a
349-
-- levelling run becomes too large and is promoted, in that case
350-
-- initially there's no merge, but it is still represented as an
351-
-- 'IncomingRun', using 'Single'. Thus there are no other resident runs.
352-
MergePolicyLevelling -> assertST $ null rs
353-
-- Runs in tiering levels usually fit that size, but they can be one
354-
-- larger, if a run has been held back (creating a 5-way merge).
355-
MergePolicyTiering -> assertST $ all (\r -> tieringRunSizeToLevel r `elem` [ln, ln+1]) rs
356-
-- (This is actually still not really true, but will hold in practice.
357-
-- In the pathological case, all runs passed to the next level can be
358-
-- factor (5/4) too large, and there the same holding back can lead to
359-
-- factor (6/4) etc., until at level 12 a run is two levels too large.
360+
MergePolicyLevelling ->
361+
-- Levels using levelling have only one (incoming) run, which almost
362+
-- always consists of an ongoing merge. The exception is when a
363+
-- levelling run becomes too large and is promoted, in that case
364+
-- initially there's no merge, but it is still represented as an
365+
-- 'IncomingRun', using 'Single'. Thus there are no other resident
366+
-- runs.
367+
assertST $ null rs
368+
MergePolicyTiering -> do
369+
-- Runs in tiering levels usually fit that size, but they can be one
370+
-- larger, if a run has been held back (creating a 5-way merge).
371+
--
372+
-- TODO: This is actually still not really true, but will hold in
373+
-- practice. In the pathological case, all runs passed to the next
374+
-- level can be factor (5/4) too large, and there the same holding
375+
-- back can lead to factor (6/4) etc., until at level 12 a run is two
376+
-- levels too large.
377+
assertST $ all (\r -> runSize r > 0) rs
378+
assertST $ all (\r -> tieringRunSizeToLevel r `elem` [ln, ln+1]) rs
360379

361380
-- Incoming runs being merged also need to be of the right size, but the
362381
-- conditions are more complicated.
@@ -367,11 +386,14 @@ invariant (LSMContent _ levels ul) = do
367386
MergePolicyLevelling -> do
368387
case (ir, mrs) of
369388
-- A single incoming run (which thus didn't need merging) must be
370-
-- of the expected size range already
371-
(Single r, m) -> do
389+
-- of the expected size range already, but it could also be smaller
390+
-- if it comes from a union level.
391+
(Single origin r, m) -> do
372392
assertST $ case m of CompletedMerge{} -> True
373393
OngoingMerge{} -> False
374-
assertST $ levellingRunSizeToLevel r == ln
394+
case origin of
395+
Regular -> assertST $ levellingRunSizeToLevel r == ln
396+
MigratedUnion -> assertST $ levellingRunSizeToLevel r <= ln
375397

376398
-- A completed merge for levelling can be of almost any size at all!
377399
-- It can be smaller, due to deletions in the last level. But it
@@ -397,7 +419,7 @@ invariant (LSMContent _ levels ul) = do
397419
case (ir, mrs, mergeTypeForLevel ls ul) of
398420
-- A single incoming run (which thus didn't need merging) must be
399421
-- of the expected size already
400-
(Single r, m, _) -> do
422+
(Single _ r, m, _) -> do
401423
assertST $ case m of CompletedMerge{} -> True
402424
OngoingMerge{} -> False
403425
assertST $ tieringRunSizeToLevel r == ln
@@ -496,6 +518,11 @@ isCompletedMergingTree (MergingTree ref) = do
496518
OngoingTreeMerge mr -> isCompletedMergingRun mr
497519
PendingTreeMerge _ -> failI $ "not completed: PendingTreeMerge"
498520

521+
getCompletedMergingTree :: MergingTree s -> ST s (Maybe Run)
522+
getCompletedMergingTree t =
523+
either (const Nothing) Just
524+
<$> evalInvariant (isCompletedMergingTree t)
525+
499526
type Invariant s = E.ExceptT String (ST s)
500527

501528
assertI :: String -> Bool -> Invariant s ()
@@ -774,19 +801,46 @@ updates tr lsm = mapM_ (uncurry (update tr lsm))
774801
update :: Tracer (ST s) Event -> LSM s -> Key -> Op -> ST s ()
775802
update tr (LSMHandle scr lsmr) k op = do
776803
sc <- readSTRef scr
777-
content@(LSMContent wb ls unionLevel) <- readSTRef lsmr
804+
content@(LSMContent wb regularLevels unionLevel) <- readSTRef lsmr
778805
modifySTRef' scr (+1)
779-
supplyCreditsLevels (NominalCredit 1) ls
806+
supplyCreditsLevels (NominalCredit 1) regularLevels
780807
invariant content
781808
let wb' = Map.insertWith combine k op wb
782809
if bufferSize wb' >= maxBufferSize
783810
then do
784-
ls' <- increment tr sc (bufferToRun wb') ls unionLevel
785-
let content' = LSMContent Map.empty ls' unionLevel
811+
-- Before adding the run to the regular levels, we check if we can get
812+
-- rid of the union level (by migrating it into into the regular ones).
813+
--
814+
-- This state can be reached in two situations:
815+
--
816+
-- * If the tree was already completed, flushing the write buffer
817+
-- can lead to creating a new regular level, making the completed
818+
-- tree fit in.
819+
--
820+
-- This is easy to detect and can immediately be addressed by
821+
-- migrating the run to the regular levels.
822+
--
823+
-- * If the size of the union level alread fits, supplying credits
824+
-- to the merging tree can complete it (and thus the union level).
825+
--
826+
-- This can happen when calling 'suppyUnionCredits' on the union
827+
-- table, but also through operations on other tables due to
828+
-- sharing. This can be difficult to detect. Also, if we perform
829+
-- an operation on one table, we probably don't want to modify
830+
-- other tables that are not directly involved in the operation.
831+
--
832+
-- Luckily, the only place where we care about the run being migrated
833+
-- promptly, is when creating new merges. This allows runs from regular
834+
-- and union levels to form new last level merges together, as soon as
835+
-- possible. This means it is sufficient to check for migration
836+
-- opportunities whenever we flush a write buffer.
837+
(ls, ul) <- migrateUnionLevel tr sc regularLevels unionLevel
838+
ls' <- increment tr sc (bufferToRun wb') ls ul
839+
let content' = LSMContent Map.empty ls' ul
786840
invariant content'
787841
writeSTRef lsmr content'
788842
else
789-
writeSTRef lsmr (LSMContent wb' ls unionLevel)
843+
writeSTRef lsmr (LSMContent wb' regularLevels unionLevel)
790844

791845
supplyMergeCredits :: LSM s -> NominalCredit -> ST s ()
792846
supplyMergeCredits (LSMHandle scr lsmr) credits = do
@@ -1158,9 +1212,44 @@ depositNominalCredit (NominalDebt nominalDebt)
11581212
-- Updates
11591213
--
11601214

1215+
-- | At some point, we want to merge the union level with the regular levels.
1216+
-- We achieve this by moving it into a new last regular level once it is both
1217+
-- completed (merged down to a single run) and fits into such a new level.
1218+
--
1219+
-- Our representation doesn't allow for empty levels, so we can only put the
1220+
-- run directly after the pre-existing regular levels. If it is too large for
1221+
-- that, we don't want to move it yet to avoid violating run size invariants
1222+
-- and doing inefficient merges of runs with very different sizes.
1223+
migrateUnionLevel :: forall s. Tracer (ST s) Event
1224+
-> Counter -> Levels s -> UnionLevel s
1225+
-> ST s (Levels s, UnionLevel s)
1226+
migrateUnionLevel _ _ ls NoUnion = do
1227+
-- nothing to do
1228+
return (ls, NoUnion)
1229+
migrateUnionLevel _tr _sc ls ul@(Union t _) =
1230+
-- TODO: tracing
1231+
getCompletedMergingTree t <&> \case
1232+
Just r
1233+
| null r ->
1234+
-- If the union level is empty, we can just drop it.
1235+
(ls, NoUnion)
1236+
| levellingRunSizeToLevel r <= length ls + 1 ->
1237+
-- If it fits into a hypothetical new last level, put it there.
1238+
--
1239+
-- TODO: In some cases it seems desirable to even add it to the
1240+
-- existing last regular level (so it becomes part of a merge
1241+
-- sooner), but that would lead to additional merging work that was
1242+
-- not accounted for. We'd need to be careful to ensure the merge
1243+
-- completes in time, without doing a lot of work in a short time.
1244+
(ls ++ [Level (Single MigratedUnion r) []], NoUnion)
1245+
_ ->
1246+
-- Otherwise, just leave it for now.
1247+
(ls, ul)
1248+
11611249
increment :: forall s. Tracer (ST s) Event
1162-
-> Counter -> Run -> Levels s -> UnionLevel s -> ST s (Levels s)
1163-
increment tr sc run0 ls0 ul = do
1250+
-> Counter -> Run -> Levels s -> UnionLevel s
1251+
-> ST s (Levels s)
1252+
increment tr sc run0 ls0 ul =
11641253
go 1 [run0] ls0
11651254
where
11661255
mergeTypeFor :: Levels s -> LevelMergeType
@@ -1177,7 +1266,7 @@ increment tr sc run0 ls0 ul = do
11771266

11781267
go !ln incoming (Level ir rs : ls) = do
11791268
r <- case ir of
1180-
Single r -> return r
1269+
Single _ r -> return r
11811270
Merging mergePolicy _ _ mr -> do
11821271
r <- expectCompletedMergingRun mr
11831272
traceWith tr' MergeCompletedEvent {
@@ -1235,7 +1324,7 @@ increment tr sc run0 ls0 ul = do
12351324
newLevelMerge :: Tracer (ST s) EventDetail
12361325
-> Int -> MergePolicy -> LevelMergeType
12371326
-> [Run] -> ST s (IncomingRun s)
1238-
newLevelMerge _ _ _ _ [r] = return (Single r)
1327+
newLevelMerge _ _ _ _ [r] = return (Single Regular r)
12391328
newLevelMerge tr level mergePolicy mergeType rs = do
12401329
assertST (length rs `elem` [4, 5])
12411330
mergingRun@(MergingRun _ physicalDebt _) <- newMergingRun mergeType rs
@@ -1320,24 +1409,20 @@ levellingLevelIsFull ln _incoming resident = levellingRunSizeToLevel resident >
13201409

13211410
-- | Ensures that the merge contains more than one input, avoiding creating a
13221411
-- pending merge where possible.
1323-
newPendingLevelMerge :: [IncomingRun s]
1412+
newPendingLevelMerge :: [PreExistingRun s]
13241413
-> Maybe (MergingTree s)
13251414
-> ST s (Maybe (MergingTree s))
13261415
newPendingLevelMerge [] t = return t
1327-
newPendingLevelMerge [Single r] Nothing =
1416+
newPendingLevelMerge [PreExistingRun r] Nothing =
13281417
Just . MergingTree <$> newSTRef (CompletedTreeMerge r)
1329-
newPendingLevelMerge [Merging{}] Nothing =
1418+
newPendingLevelMerge [PreExistingMergingRun{}] Nothing =
13301419
-- This case should never occur. If there is a single entry in the list,
13311420
-- there can only be one level in the input table. At level 1 there are no
13321421
-- merging runs, so it must be a PreExistingRun.
13331422
error "newPendingLevelMerge: singleton Merging run"
1334-
newPendingLevelMerge irs tree = do
1335-
let prs = map incomingToPreExistingRun irs
1336-
st = PendingTreeMerge (PendingLevelMerge prs tree)
1423+
newPendingLevelMerge prs tree = do
1424+
let st = PendingTreeMerge (PendingLevelMerge prs tree)
13371425
Just . MergingTree <$> newSTRef st
1338-
where
1339-
incomingToPreExistingRun (Single r) = PreExistingRun r
1340-
incomingToPreExistingRun (Merging _ _ _ mr) = PreExistingMergingRun mr
13411426

13421427
-- | Ensures that the merge contains more than one input.
13431428
newPendingUnionMerge :: [MergingTree s] -> ST s (Maybe (MergingTree s))
@@ -1354,14 +1439,18 @@ contentToMergingTree (LSMContent wb ls ul) =
13541439
-- flush the write buffer (but this should not modify the content)
13551440
buffers
13561441
| bufferSize wb == 0 = []
1357-
| otherwise = [Single (bufferToRun wb)]
1442+
| otherwise = [PreExistingRun (bufferToRun wb)]
13581443

1359-
levels = flip concatMap ls $ \(Level ir rs) -> ir : map Single rs
1444+
levels = flip concatMap ls $ \(Level ir rs) ->
1445+
incomingToPreExistingRun ir : map PreExistingRun rs
13601446

13611447
trees = case ul of
13621448
NoUnion -> Nothing
13631449
Union t _ -> Just t
13641450

1451+
incomingToPreExistingRun (Single _ r) = PreExistingRun r
1452+
incomingToPreExistingRun (Merging _ _ _ mr) = PreExistingMergingRun mr
1453+
13651454
-- | When calculating (an upped bound of) the total debt of a recursive tree of
13661455
-- merges, we also need to return an upper bound on the size of the resulting
13671456
-- run. See 'remainingDebtPendingMerge'.
@@ -1536,7 +1625,7 @@ flattenLevel (Level ir rs) = (++ rs) <$> flattenIncomingRun ir
15361625

15371626
flattenIncomingRun :: IncomingRun s -> ST s [Run]
15381627
flattenIncomingRun = \case
1539-
Single r -> return [r]
1628+
Single _ r -> return [r]
15401629
Merging _ _ _ mr -> flattenMergingRun mr
15411630

15421631
flattenMergingRun :: MergingRun t s -> ST s [Run]
@@ -1599,7 +1688,7 @@ dumpRepresentation (LSMHandle _ lsmr) = do
15991688
return (wb, levels, tree)
16001689

16011690
dumpLevel :: Level s -> ST s LevelRepresentation
1602-
dumpLevel (Level (Single r) rs) =
1691+
dumpLevel (Level (Single _ r) rs) =
16031692
return (Nothing, (r:rs))
16041693
dumpLevel (Level (Merging mp nd ncv (MergingRun mt _ ref)) rs) = do
16051694
mrs <- readSTRef ref

prototypes/ScheduledMergesTest.hs

Lines changed: 49 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ tests :: TestTree
2424
tests = testGroup "Unit and property tests"
2525
[ testCase "test_regression_empty_run" test_regression_empty_run
2626
, testCase "test_merge_again_with_incoming" test_merge_again_with_incoming
27-
, testProperty "prop_union" prop_union
27+
, testProperty "prop_union_supply_all" prop_union_supply_all
28+
, testProperty "prop_union_merge_into_levels" prop_union_merge_into_levels
2829
, testGroup "T"
2930
[ localOption (QuickCheckTests 1000) $ -- super quick, run more
3031
testProperty "Arbitrary satisfies invariant" prop_arbitrarySatisfiesInvariant
@@ -176,29 +177,71 @@ test_merge_again_with_incoming =
176177
-- properties
177178
--
178179

180+
-- TODO: also generate nested unions?
181+
179182
-- | Supplying enough credits for the remaining debt completes the union merge.
180-
prop_union :: [[(LSM.Key, LSM.Op)]] -> Property
181-
prop_union kopss = length (filter (not . null) kopss) > 1 QC.==>
183+
prop_union_supply_all :: [[(LSM.Key, LSM.Op)]] -> Property
184+
prop_union_supply_all kopss = length (filter (not . null) kopss) > 1 QC.==>
182185
QC.ioProperty $ runWithTracer $ \tr ->
183186
stToIO $ do
184187
ts <- traverse (mkTable tr) kopss
185188
t <- LSM.unions ts
186189

187190
debt@(UnionDebt x) <- LSM.remainingUnionDebt t
188-
_ <- LSM.supplyUnionCredits t (UnionCredits x)
191+
leftovers <- LSM.supplyUnionCredits t (UnionCredits x)
189192
debt' <- LSM.remainingUnionDebt t
190193

191194
rep <- dumpRepresentation t
192195
return $ QC.counterexample (show (debt, debt')) $ QC.conjoin
193-
[ debt =/= UnionDebt 0
194-
, debt' === UnionDebt 0
196+
[ QC.counterexample "debt before" $ debt =/= UnionDebt 0
197+
, QC.counterexample "debt after" $ debt' === UnionDebt 0
198+
, QC.counterexample "leftovers" $ leftovers >= 0
195199
, hasUnionWith isCompleted rep
196200
]
197201
where
198202
isCompleted = \case
199203
MLeaf{} -> True
200204
MNode{} -> False
201205

206+
-- | The union level will get merged into the last regular level once the union
207+
-- merge is completed and sufficient new entries have been inserted.
208+
prop_union_merge_into_levels :: [[(LSM.Key, LSM.Op)]] -> Property
209+
prop_union_merge_into_levels kopss = length (filter (not . null) kopss) > 1 QC.==>
210+
QC.forAll arbitrary $ \firstPay ->
211+
QC.ioProperty $ runWithTracer $ \tr ->
212+
stToIO $ do
213+
ts <- traverse (mkTable tr) kopss
214+
t <- LSM.unions ts
215+
216+
-- pay off the union
217+
let payOffDebt = do
218+
UnionDebt d <- LSM.remainingUnionDebt t
219+
_ <- LSM.supplyUnionCredits t (UnionCredits d)
220+
return ()
221+
222+
-- insert as many new entries as there are in the completed
223+
-- union level, so it fits into the last level
224+
let fillTable = do
225+
unionRunSize <- length <$> LSM.logicalValue t
226+
LSM.inserts tr t
227+
[(K k, V 0, Nothing) | k <- [1 .. unionRunSize]]
228+
229+
-- we can do these in any order
230+
if firstPay
231+
then payOffDebt >> fillTable
232+
else fillTable >> payOffDebt
233+
234+
-- then flush the write buffer
235+
LSM.inserts tr t
236+
[(K k, V 0, Nothing) | k <- [1 .. maxBufferSize]]
237+
238+
(_, _, mtree) <- representationShape <$> dumpRepresentation t
239+
240+
-- the union level is gone
241+
return $ QC.conjoin
242+
[ mtree === Nothing
243+
]
244+
202245
mkTable :: Tracer (ST s) Event -> [(LSM.Key, LSM.Op)] -> ST s (LSM s)
203246
mkTable tr ks = do
204247
t <- LSM.new

0 commit comments

Comments
 (0)