diff --git a/prototypes/ScheduledMerges.hs b/prototypes/ScheduledMerges.hs index c444335dd..88279d057 100644 --- a/prototypes/ScheduledMerges.hs +++ b/prototypes/ScheduledMerges.hs @@ -65,6 +65,7 @@ module ScheduledMerges ( MergeDebt(..), NominalCredit(..), NominalDebt(..), + maxBufferSize, Run, runSize, UnionCredits (..), @@ -85,6 +86,7 @@ import Prelude hiding (lookup) import Data.Bits import Data.Foldable (for_, toList, traverse_) +import Data.Functor ((<&>)) import Data.Map.Strict (Map) import qualified Data.Map.Strict as Map import Data.Maybe (catMaybes) @@ -130,7 +132,18 @@ data Level s = Level !(IncomingRun s) ![Run] data IncomingRun s = Merging !MergePolicy !NominalDebt !(STRef s NominalCredit) !(MergingRun LevelMergeType s) - | Single !Run + | Single !SingleRunOrigin !Run + +-- | Additional information about the origin of a 'Single' run. This allows us +-- to have stronger invariants, depending on the origin. +data SingleRunOrigin = -- | Either a flushed write buffer or last level run. + -- + -- TODO distinguish there two cases? One only happens in + -- first, the other in last level. + Regular + -- | A former union level that was completed (merged down + -- to a single run) and became the last regular level. + | MigratedUnion -- | The merge policy for a LSM level can be either tiering or levelling. -- In this design we use levelling for the last level, and tiering for @@ -325,7 +338,7 @@ invariant (LSMContent _ levels ul) = do levelsInvariant !ln (Level ir rs : ls) = do mrs <- case ir of - Single r -> + Single _ r -> return (CompletedMerge r) Merging mp _ _ (MergingRun mt _ ref) -> do assertST $ ln > 1 -- no merges on level 1 @@ -344,19 +357,25 @@ invariant (LSMContent _ levels ul) = do expectedRunLengths :: Int -> [Run] -> [Level s] -> ST s () expectedRunLengths ln rs ls = case mergePolicyForLevel ln ls ul of - -- Levels using levelling have only one (incoming) run, which almost - -- always consists of an ongoing merge. The exception is when a - -- levelling run becomes too large and is promoted, in that case - -- initially there's no merge, but it is still represented as an - -- 'IncomingRun', using 'Single'. Thus there are no other resident runs. - MergePolicyLevelling -> assertST $ null rs - -- Runs in tiering levels usually fit that size, but they can be one - -- larger, if a run has been held back (creating a 5-way merge). - MergePolicyTiering -> assertST $ all (\r -> tieringRunSizeToLevel r `elem` [ln, ln+1]) rs - -- (This is actually still not really true, but will hold in practice. - -- In the pathological case, all runs passed to the next level can be - -- factor (5/4) too large, and there the same holding back can lead to - -- factor (6/4) etc., until at level 12 a run is two levels too large. + MergePolicyLevelling -> + -- Levels using levelling have only one (incoming) run, which almost + -- always consists of an ongoing merge. The exception is when a + -- levelling run becomes too large and is promoted, in that case + -- initially there's no merge, but it is still represented as an + -- 'IncomingRun', using 'Single'. Thus there are no other resident + -- runs. + assertST $ null rs + MergePolicyTiering -> do + -- Runs in tiering levels usually fit that size, but they can be one + -- larger, if a run has been held back (creating a 5-way merge). + -- + -- TODO: This is actually still not really true, but will hold in + -- practice. In the pathological case, all runs passed to the next + -- level can be factor (5/4) too large, and there the same holding + -- back can lead to factor (6/4) etc., until at level 12 a run is two + -- levels too large. + assertST $ all (\r -> runSize r > 0) rs + assertST $ all (\r -> tieringRunSizeToLevel r `elem` [ln, ln+1]) rs -- Incoming runs being merged also need to be of the right size, but the -- conditions are more complicated. @@ -367,11 +386,14 @@ invariant (LSMContent _ levels ul) = do MergePolicyLevelling -> do case (ir, mrs) of -- A single incoming run (which thus didn't need merging) must be - -- of the expected size range already - (Single r, m) -> do + -- of the expected size range already, but it could also be smaller + -- if it comes from a union level. + (Single origin r, m) -> do assertST $ case m of CompletedMerge{} -> True OngoingMerge{} -> False - assertST $ levellingRunSizeToLevel r == ln + case origin of + Regular -> assertST $ levellingRunSizeToLevel r == ln + MigratedUnion -> assertST $ levellingRunSizeToLevel r <= ln -- A completed merge for levelling can be of almost any size at all! -- It can be smaller, due to deletions in the last level. But it @@ -397,7 +419,7 @@ invariant (LSMContent _ levels ul) = do case (ir, mrs, mergeTypeForLevel ls ul) of -- A single incoming run (which thus didn't need merging) must be -- of the expected size already - (Single r, m, _) -> do + (Single _ r, m, _) -> do assertST $ case m of CompletedMerge{} -> True OngoingMerge{} -> False assertST $ tieringRunSizeToLevel r == ln @@ -496,6 +518,11 @@ isCompletedMergingTree (MergingTree ref) = do OngoingTreeMerge mr -> isCompletedMergingRun mr PendingTreeMerge _ -> failI $ "not completed: PendingTreeMerge" +getCompletedMergingTree :: MergingTree s -> ST s (Maybe Run) +getCompletedMergingTree t = + either (const Nothing) Just + <$> evalInvariant (isCompletedMergingTree t) + type Invariant s = E.ExceptT String (ST s) assertI :: String -> Bool -> Invariant s () @@ -774,19 +801,46 @@ updates tr lsm = mapM_ (uncurry (update tr lsm)) update :: Tracer (ST s) Event -> LSM s -> Key -> Op -> ST s () update tr (LSMHandle scr lsmr) k op = do sc <- readSTRef scr - content@(LSMContent wb ls unionLevel) <- readSTRef lsmr + content@(LSMContent wb regularLevels unionLevel) <- readSTRef lsmr modifySTRef' scr (+1) - supplyCreditsLevels (NominalCredit 1) ls + supplyCreditsLevels (NominalCredit 1) regularLevels invariant content let wb' = Map.insertWith combine k op wb if bufferSize wb' >= maxBufferSize then do - ls' <- increment tr sc (bufferToRun wb') ls unionLevel - let content' = LSMContent Map.empty ls' unionLevel + -- Before adding the run to the regular levels, we check if we can get + -- rid of the union level (by migrating it into into the regular ones). + -- + -- This state can be reached in two situations: + -- + -- * If the tree was already completed, flushing the write buffer + -- can lead to creating a new regular level, making the completed + -- tree fit in. + -- + -- This is easy to detect and can immediately be addressed by + -- migrating the run to the regular levels. + -- + -- * If the size of the union level alread fits, supplying credits + -- to the merging tree can complete it (and thus the union level). + -- + -- This can happen when calling 'suppyUnionCredits' on the union + -- table, but also through operations on other tables due to + -- sharing. This can be difficult to detect. Also, if we perform + -- an operation on one table, we probably don't want to modify + -- other tables that are not directly involved in the operation. + -- + -- Luckily, the only place where we care about the run being migrated + -- promptly, is when creating new merges. This allows runs from regular + -- and union levels to form new last level merges together, as soon as + -- possible. This means it is sufficient to check for migration + -- opportunities whenever we flush a write buffer. + (ls, ul) <- migrateUnionLevel tr sc regularLevels unionLevel + ls' <- increment tr sc (bufferToRun wb') ls ul + let content' = LSMContent Map.empty ls' ul invariant content' writeSTRef lsmr content' else - writeSTRef lsmr (LSMContent wb' ls unionLevel) + writeSTRef lsmr (LSMContent wb' regularLevels unionLevel) supplyMergeCredits :: LSM s -> NominalCredit -> ST s () supplyMergeCredits (LSMHandle scr lsmr) credits = do @@ -1158,9 +1212,44 @@ depositNominalCredit (NominalDebt nominalDebt) -- Updates -- +-- | At some point, we want to merge the union level with the regular levels. +-- We achieve this by moving it into a new last regular level once it is both +-- completed (merged down to a single run) and fits into such a new level. +-- +-- Our representation doesn't allow for empty levels, so we can only put the +-- run directly after the pre-existing regular levels. If it is too large for +-- that, we don't want to move it yet to avoid violating run size invariants +-- and doing inefficient merges of runs with very different sizes. +migrateUnionLevel :: forall s. Tracer (ST s) Event + -> Counter -> Levels s -> UnionLevel s + -> ST s (Levels s, UnionLevel s) +migrateUnionLevel _ _ ls NoUnion = do + -- nothing to do + return (ls, NoUnion) +migrateUnionLevel _tr _sc ls ul@(Union t _) = + -- TODO: tracing + getCompletedMergingTree t <&> \case + Just r + | null r -> + -- If the union level is empty, we can just drop it. + (ls, NoUnion) + | levellingRunSizeToLevel r <= length ls + 1 -> + -- If it fits into a hypothetical new last level, put it there. + -- + -- TODO: In some cases it seems desirable to even add it to the + -- existing last regular level (so it becomes part of a merge + -- sooner), but that would lead to additional merging work that was + -- not accounted for. We'd need to be careful to ensure the merge + -- completes in time, without doing a lot of work in a short time. + (ls ++ [Level (Single MigratedUnion r) []], NoUnion) + _ -> + -- Otherwise, just leave it for now. + (ls, ul) + increment :: forall s. Tracer (ST s) Event - -> Counter -> Run -> Levels s -> UnionLevel s -> ST s (Levels s) -increment tr sc run0 ls0 ul = do + -> Counter -> Run -> Levels s -> UnionLevel s + -> ST s (Levels s) +increment tr sc run0 ls0 ul = go 1 [run0] ls0 where mergeTypeFor :: Levels s -> LevelMergeType @@ -1177,7 +1266,7 @@ increment tr sc run0 ls0 ul = do go !ln incoming (Level ir rs : ls) = do r <- case ir of - Single r -> return r + Single _ r -> return r Merging mergePolicy _ _ mr -> do r <- expectCompletedMergingRun mr traceWith tr' MergeCompletedEvent { @@ -1235,7 +1324,7 @@ increment tr sc run0 ls0 ul = do newLevelMerge :: Tracer (ST s) EventDetail -> Int -> MergePolicy -> LevelMergeType -> [Run] -> ST s (IncomingRun s) -newLevelMerge _ _ _ _ [r] = return (Single r) +newLevelMerge _ _ _ _ [r] = return (Single Regular r) newLevelMerge tr level mergePolicy mergeType rs = do assertST (length rs `elem` [4, 5]) mergingRun@(MergingRun _ physicalDebt _) <- newMergingRun mergeType rs @@ -1320,24 +1409,20 @@ levellingLevelIsFull ln _incoming resident = levellingRunSizeToLevel resident > -- | Ensures that the merge contains more than one input, avoiding creating a -- pending merge where possible. -newPendingLevelMerge :: [IncomingRun s] +newPendingLevelMerge :: [PreExistingRun s] -> Maybe (MergingTree s) -> ST s (Maybe (MergingTree s)) newPendingLevelMerge [] t = return t -newPendingLevelMerge [Single r] Nothing = +newPendingLevelMerge [PreExistingRun r] Nothing = Just . MergingTree <$> newSTRef (CompletedTreeMerge r) -newPendingLevelMerge [Merging{}] Nothing = +newPendingLevelMerge [PreExistingMergingRun{}] Nothing = -- This case should never occur. If there is a single entry in the list, -- there can only be one level in the input table. At level 1 there are no -- merging runs, so it must be a PreExistingRun. error "newPendingLevelMerge: singleton Merging run" -newPendingLevelMerge irs tree = do - let prs = map incomingToPreExistingRun irs - st = PendingTreeMerge (PendingLevelMerge prs tree) +newPendingLevelMerge prs tree = do + let st = PendingTreeMerge (PendingLevelMerge prs tree) Just . MergingTree <$> newSTRef st - where - incomingToPreExistingRun (Single r) = PreExistingRun r - incomingToPreExistingRun (Merging _ _ _ mr) = PreExistingMergingRun mr -- | Ensures that the merge contains more than one input. newPendingUnionMerge :: [MergingTree s] -> ST s (Maybe (MergingTree s)) @@ -1354,14 +1439,18 @@ contentToMergingTree (LSMContent wb ls ul) = -- flush the write buffer (but this should not modify the content) buffers | bufferSize wb == 0 = [] - | otherwise = [Single (bufferToRun wb)] + | otherwise = [PreExistingRun (bufferToRun wb)] - levels = flip concatMap ls $ \(Level ir rs) -> ir : map Single rs + levels = flip concatMap ls $ \(Level ir rs) -> + incomingToPreExistingRun ir : map PreExistingRun rs trees = case ul of NoUnion -> Nothing Union t _ -> Just t + incomingToPreExistingRun (Single _ r) = PreExistingRun r + incomingToPreExistingRun (Merging _ _ _ mr) = PreExistingMergingRun mr + -- | When calculating (an upped bound of) the total debt of a recursive tree of -- merges, we also need to return an upper bound on the size of the resulting -- run. See 'remainingDebtPendingMerge'. @@ -1536,7 +1625,7 @@ flattenLevel (Level ir rs) = (++ rs) <$> flattenIncomingRun ir flattenIncomingRun :: IncomingRun s -> ST s [Run] flattenIncomingRun = \case - Single r -> return [r] + Single _ r -> return [r] Merging _ _ _ mr -> flattenMergingRun mr flattenMergingRun :: MergingRun t s -> ST s [Run] @@ -1599,7 +1688,7 @@ dumpRepresentation (LSMHandle _ lsmr) = do return (wb, levels, tree) dumpLevel :: Level s -> ST s LevelRepresentation -dumpLevel (Level (Single r) rs) = +dumpLevel (Level (Single _ r) rs) = return (Nothing, (r:rs)) dumpLevel (Level (Merging mp nd ncv (MergingRun mt _ ref)) rs) = do mrs <- readSTRef ref diff --git a/prototypes/ScheduledMergesTest.hs b/prototypes/ScheduledMergesTest.hs index 0aa33c7b2..da1f2d40a 100644 --- a/prototypes/ScheduledMergesTest.hs +++ b/prototypes/ScheduledMergesTest.hs @@ -24,7 +24,8 @@ tests :: TestTree tests = testGroup "Unit and property tests" [ testCase "test_regression_empty_run" test_regression_empty_run , testCase "test_merge_again_with_incoming" test_merge_again_with_incoming - , testProperty "prop_union" prop_union + , testProperty "prop_union_supply_all" prop_union_supply_all + , testProperty "prop_union_merge_into_levels" prop_union_merge_into_levels , testGroup "T" [ localOption (QuickCheckTests 1000) $ -- super quick, run more testProperty "Arbitrary satisfies invariant" prop_arbitrarySatisfiesInvariant @@ -176,22 +177,25 @@ test_merge_again_with_incoming = -- properties -- +-- TODO: also generate nested unions? + -- | Supplying enough credits for the remaining debt completes the union merge. -prop_union :: [[(LSM.Key, LSM.Op)]] -> Property -prop_union kopss = length (filter (not . null) kopss) > 1 QC.==> +prop_union_supply_all :: [[(LSM.Key, LSM.Op)]] -> Property +prop_union_supply_all kopss = length (filter (not . null) kopss) > 1 QC.==> QC.ioProperty $ runWithTracer $ \tr -> stToIO $ do ts <- traverse (mkTable tr) kopss t <- LSM.unions ts debt@(UnionDebt x) <- LSM.remainingUnionDebt t - _ <- LSM.supplyUnionCredits t (UnionCredits x) + leftovers <- LSM.supplyUnionCredits t (UnionCredits x) debt' <- LSM.remainingUnionDebt t rep <- dumpRepresentation t return $ QC.counterexample (show (debt, debt')) $ QC.conjoin - [ debt =/= UnionDebt 0 - , debt' === UnionDebt 0 + [ QC.counterexample "debt before" $ debt =/= UnionDebt 0 + , QC.counterexample "debt after" $ debt' === UnionDebt 0 + , QC.counterexample "leftovers" $ leftovers >= 0 , hasUnionWith isCompleted rep ] where @@ -199,6 +203,45 @@ prop_union kopss = length (filter (not . null) kopss) > 1 QC.==> MLeaf{} -> True MNode{} -> False +-- | The union level will get merged into the last regular level once the union +-- merge is completed and sufficient new entries have been inserted. +prop_union_merge_into_levels :: [[(LSM.Key, LSM.Op)]] -> Property +prop_union_merge_into_levels kopss = length (filter (not . null) kopss) > 1 QC.==> + QC.forAll arbitrary $ \firstPay -> + QC.ioProperty $ runWithTracer $ \tr -> + stToIO $ do + ts <- traverse (mkTable tr) kopss + t <- LSM.unions ts + + -- pay off the union + let payOffDebt = do + UnionDebt d <- LSM.remainingUnionDebt t + _ <- LSM.supplyUnionCredits t (UnionCredits d) + return () + + -- insert as many new entries as there are in the completed + -- union level, so it fits into the last level + let fillTable = do + unionRunSize <- length <$> LSM.logicalValue t + LSM.inserts tr t + [(K k, V 0, Nothing) | k <- [1 .. unionRunSize]] + + -- we can do these in any order + if firstPay + then payOffDebt >> fillTable + else fillTable >> payOffDebt + + -- then flush the write buffer + LSM.inserts tr t + [(K k, V 0, Nothing) | k <- [1 .. maxBufferSize]] + + (_, _, mtree) <- representationShape <$> dumpRepresentation t + + -- the union level is gone + return $ QC.conjoin + [ mtree === Nothing + ] + mkTable :: Tracer (ST s) Event -> [(LSM.Key, LSM.Op)] -> ST s (LSM s) mkTable tr ks = do t <- LSM.new