Skip to content

In the prototype, merge the union level into regular levels #709

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 130 additions & 41 deletions prototypes/ScheduledMerges.hs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ module ScheduledMerges (
MergeDebt(..),
NominalCredit(..),
NominalDebt(..),
maxBufferSize,
Run,
runSize,
UnionCredits (..),
Expand All @@ -85,6 +86,7 @@ import Prelude hiding (lookup)

import Data.Bits
import Data.Foldable (for_, toList, traverse_)
import Data.Functor ((<&>))
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import Data.Maybe (catMaybes)
Expand Down Expand Up @@ -130,7 +132,18 @@ data Level s = Level !(IncomingRun s) ![Run]
data IncomingRun s = Merging !MergePolicy
!NominalDebt !(STRef s NominalCredit)
!(MergingRun LevelMergeType s)
| Single !Run
| Single !SingleRunOrigin !Run

-- | Additional information about the origin of a 'Single' run. This allows us
-- to have stronger invariants, depending on the origin.
data SingleRunOrigin = -- | Either a flushed write buffer or last level run.
--
-- TODO distinguish there two cases? One only happens in
-- first, the other in last level.
Regular
-- | A former union level that was completed (merged down
-- to a single run) and became the last regular level.
| MigratedUnion

-- | The merge policy for a LSM level can be either tiering or levelling.
-- In this design we use levelling for the last level, and tiering for
Expand Down Expand Up @@ -325,7 +338,7 @@ invariant (LSMContent _ levels ul) = do

levelsInvariant !ln (Level ir rs : ls) = do
mrs <- case ir of
Single r ->
Single _ r ->
return (CompletedMerge r)
Merging mp _ _ (MergingRun mt _ ref) -> do
assertST $ ln > 1 -- no merges on level 1
Expand All @@ -344,19 +357,25 @@ invariant (LSMContent _ levels ul) = do
expectedRunLengths :: Int -> [Run] -> [Level s] -> ST s ()
expectedRunLengths ln rs ls =
case mergePolicyForLevel ln ls ul of
-- Levels using levelling have only one (incoming) run, which almost
-- always consists of an ongoing merge. The exception is when a
-- levelling run becomes too large and is promoted, in that case
-- initially there's no merge, but it is still represented as an
-- 'IncomingRun', using 'Single'. Thus there are no other resident runs.
MergePolicyLevelling -> assertST $ null rs
-- Runs in tiering levels usually fit that size, but they can be one
-- larger, if a run has been held back (creating a 5-way merge).
MergePolicyTiering -> assertST $ all (\r -> tieringRunSizeToLevel r `elem` [ln, ln+1]) rs
-- (This is actually still not really true, but will hold in practice.
-- In the pathological case, all runs passed to the next level can be
-- factor (5/4) too large, and there the same holding back can lead to
-- factor (6/4) etc., until at level 12 a run is two levels too large.
MergePolicyLevelling ->
-- Levels using levelling have only one (incoming) run, which almost
-- always consists of an ongoing merge. The exception is when a
-- levelling run becomes too large and is promoted, in that case
-- initially there's no merge, but it is still represented as an
-- 'IncomingRun', using 'Single'. Thus there are no other resident
-- runs.
assertST $ null rs
MergePolicyTiering -> do
-- Runs in tiering levels usually fit that size, but they can be one
-- larger, if a run has been held back (creating a 5-way merge).
--
-- TODO: This is actually still not really true, but will hold in
-- practice. In the pathological case, all runs passed to the next
-- level can be factor (5/4) too large, and there the same holding
-- back can lead to factor (6/4) etc., until at level 12 a run is two
-- levels too large.
assertST $ all (\r -> runSize r > 0) rs
Copy link
Collaborator

@jorisdral jorisdral May 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a new assertion. Is it related to unions or just there to because it's useful?

assertST $ all (\r -> tieringRunSizeToLevel r `elem` [ln, ln+1]) rs

-- Incoming runs being merged also need to be of the right size, but the
-- conditions are more complicated.
Expand All @@ -367,11 +386,14 @@ invariant (LSMContent _ levels ul) = do
MergePolicyLevelling -> do
case (ir, mrs) of
-- A single incoming run (which thus didn't need merging) must be
-- of the expected size range already
(Single r, m) -> do
-- of the expected size range already, but it could also be smaller
-- if it comes from a union level.
Comment on lines +389 to +390
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And could it not also be larger if it came form a union level?

(Single origin r, m) -> do
assertST $ case m of CompletedMerge{} -> True
OngoingMerge{} -> False
assertST $ levellingRunSizeToLevel r == ln
case origin of
Regular -> assertST $ levellingRunSizeToLevel r == ln
MigratedUnion -> assertST $ levellingRunSizeToLevel r <= ln

-- A completed merge for levelling can be of almost any size at all!
-- It can be smaller, due to deletions in the last level. But it
Expand All @@ -397,7 +419,7 @@ invariant (LSMContent _ levels ul) = do
case (ir, mrs, mergeTypeForLevel ls ul) of
-- A single incoming run (which thus didn't need merging) must be
-- of the expected size already
(Single r, m, _) -> do
(Single _ r, m, _) -> do
assertST $ case m of CompletedMerge{} -> True
OngoingMerge{} -> False
assertST $ tieringRunSizeToLevel r == ln
Expand Down Expand Up @@ -496,6 +518,11 @@ isCompletedMergingTree (MergingTree ref) = do
OngoingTreeMerge mr -> isCompletedMergingRun mr
PendingTreeMerge _ -> failI $ "not completed: PendingTreeMerge"

getCompletedMergingTree :: MergingTree s -> ST s (Maybe Run)
getCompletedMergingTree t =
either (const Nothing) Just
<$> evalInvariant (isCompletedMergingTree t)

type Invariant s = E.ExceptT String (ST s)

assertI :: String -> Bool -> Invariant s ()
Expand Down Expand Up @@ -774,19 +801,46 @@ updates tr lsm = mapM_ (uncurry (update tr lsm))
update :: Tracer (ST s) Event -> LSM s -> Key -> Op -> ST s ()
update tr (LSMHandle scr lsmr) k op = do
sc <- readSTRef scr
content@(LSMContent wb ls unionLevel) <- readSTRef lsmr
content@(LSMContent wb regularLevels unionLevel) <- readSTRef lsmr
modifySTRef' scr (+1)
supplyCreditsLevels (NominalCredit 1) ls
supplyCreditsLevels (NominalCredit 1) regularLevels
invariant content
let wb' = Map.insertWith combine k op wb
if bufferSize wb' >= maxBufferSize
then do
ls' <- increment tr sc (bufferToRun wb') ls unionLevel
let content' = LSMContent Map.empty ls' unionLevel
-- Before adding the run to the regular levels, we check if we can get
-- rid of the union level (by migrating it into into the regular ones).
--
-- This state can be reached in two situations:
--
-- * If the tree was already completed, flushing the write buffer
-- can lead to creating a new regular level, making the completed
-- tree fit in.
--
-- This is easy to detect and can immediately be addressed by
-- migrating the run to the regular levels.
--
-- * If the size of the union level alread fits, supplying credits
-- to the merging tree can complete it (and thus the union level).
--
-- This can happen when calling 'suppyUnionCredits' on the union
-- table, but also through operations on other tables due to
-- sharing. This can be difficult to detect. Also, if we perform
-- an operation on one table, we probably don't want to modify
-- other tables that are not directly involved in the operation.
--
-- Luckily, the only place where we care about the run being migrated
-- promptly, is when creating new merges. This allows runs from regular
-- and union levels to form new last level merges together, as soon as
-- possible. This means it is sufficient to check for migration
-- opportunities whenever we flush a write buffer.
(ls, ul) <- migrateUnionLevel tr sc regularLevels unionLevel
ls' <- increment tr sc (bufferToRun wb') ls ul
let content' = LSMContent Map.empty ls' ul
invariant content'
writeSTRef lsmr content'
else
writeSTRef lsmr (LSMContent wb' ls unionLevel)
writeSTRef lsmr (LSMContent wb' regularLevels unionLevel)

supplyMergeCredits :: LSM s -> NominalCredit -> ST s ()
supplyMergeCredits (LSMHandle scr lsmr) credits = do
Expand Down Expand Up @@ -1158,9 +1212,44 @@ depositNominalCredit (NominalDebt nominalDebt)
-- Updates
--

-- | At some point, we want to merge the union level with the regular levels.
-- We achieve this by moving it into a new last regular level once it is both
-- completed (merged down to a single run) and fits into such a new level.
--
-- Our representation doesn't allow for empty levels, so we can only put the
-- run directly after the pre-existing regular levels. If it is too large for
-- that, we don't want to move it yet to avoid violating run size invariants
-- and doing inefficient merges of runs with very different sizes.
Comment on lines +1219 to +1222
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should make a TODO to allow empty levels? Or maybe having the Single vs. MigratedUnion distinction would help with thiss?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just realised I originally wanted to do something even simpler: When the union is completed, always move it to a new level. If it is much larger than that level should be, the existing code will already handle, not creating a merge with it, but just pushing the oversized run down the levels over time, until it fits in and becomes part of a new last level merge. I think combined with the MigratedUnion constructor (to avoid watering down the invariant too much), that could be a decent solution. Kind of like allowing empty levels just before a MigratedUnion, but not explicitly representing them.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If so, maybe we should have a dedicated test that triggers this particular behaviour so that we can check that it works correctly

migrateUnionLevel :: forall s. Tracer (ST s) Event
-> Counter -> Levels s -> UnionLevel s
-> ST s (Levels s, UnionLevel s)
migrateUnionLevel _ _ ls NoUnion = do
-- nothing to do
return (ls, NoUnion)
migrateUnionLevel _tr _sc ls ul@(Union t _) =
-- TODO: tracing
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this something you still want to do in this PR?

getCompletedMergingTree t <&> \case
Just r
| null r ->
-- If the union level is empty, we can just drop it.
(ls, NoUnion)
| levellingRunSizeToLevel r <= length ls + 1 ->
-- If it fits into a hypothetical new last level, put it there.
--
-- TODO: In some cases it seems desirable to even add it to the
-- existing last regular level (so it becomes part of a merge
-- sooner), but that would lead to additional merging work that was
-- not accounted for. We'd need to be careful to ensure the merge
-- completes in time, without doing a lot of work in a short time.
(ls ++ [Level (Single MigratedUnion r) []], NoUnion)
_ ->
-- Otherwise, just leave it for now.
(ls, ul)
Comment on lines +1236 to +1247
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like we discussed elsewhere, it might still be useful to migrate the union level even if it doesn't fit


increment :: forall s. Tracer (ST s) Event
-> Counter -> Run -> Levels s -> UnionLevel s -> ST s (Levels s)
increment tr sc run0 ls0 ul = do
-> Counter -> Run -> Levels s -> UnionLevel s
-> ST s (Levels s)
increment tr sc run0 ls0 ul =
go 1 [run0] ls0
where
mergeTypeFor :: Levels s -> LevelMergeType
Expand All @@ -1177,7 +1266,7 @@ increment tr sc run0 ls0 ul = do

go !ln incoming (Level ir rs : ls) = do
r <- case ir of
Single r -> return r
Single _ r -> return r
Merging mergePolicy _ _ mr -> do
r <- expectCompletedMergingRun mr
traceWith tr' MergeCompletedEvent {
Expand Down Expand Up @@ -1235,7 +1324,7 @@ increment tr sc run0 ls0 ul = do
newLevelMerge :: Tracer (ST s) EventDetail
-> Int -> MergePolicy -> LevelMergeType
-> [Run] -> ST s (IncomingRun s)
newLevelMerge _ _ _ _ [r] = return (Single r)
newLevelMerge _ _ _ _ [r] = return (Single Regular r)
newLevelMerge tr level mergePolicy mergeType rs = do
assertST (length rs `elem` [4, 5])
mergingRun@(MergingRun _ physicalDebt _) <- newMergingRun mergeType rs
Expand Down Expand Up @@ -1320,24 +1409,20 @@ levellingLevelIsFull ln _incoming resident = levellingRunSizeToLevel resident >

-- | Ensures that the merge contains more than one input, avoiding creating a
-- pending merge where possible.
newPendingLevelMerge :: [IncomingRun s]
newPendingLevelMerge :: [PreExistingRun s]
-> Maybe (MergingTree s)
-> ST s (Maybe (MergingTree s))
newPendingLevelMerge [] t = return t
newPendingLevelMerge [Single r] Nothing =
newPendingLevelMerge [PreExistingRun r] Nothing =
Just . MergingTree <$> newSTRef (CompletedTreeMerge r)
newPendingLevelMerge [Merging{}] Nothing =
newPendingLevelMerge [PreExistingMergingRun{}] Nothing =
-- This case should never occur. If there is a single entry in the list,
-- there can only be one level in the input table. At level 1 there are no
-- merging runs, so it must be a PreExistingRun.
error "newPendingLevelMerge: singleton Merging run"
newPendingLevelMerge irs tree = do
let prs = map incomingToPreExistingRun irs
st = PendingTreeMerge (PendingLevelMerge prs tree)
newPendingLevelMerge prs tree = do
let st = PendingTreeMerge (PendingLevelMerge prs tree)
Just . MergingTree <$> newSTRef st
where
incomingToPreExistingRun (Single r) = PreExistingRun r
incomingToPreExistingRun (Merging _ _ _ mr) = PreExistingMergingRun mr

-- | Ensures that the merge contains more than one input.
newPendingUnionMerge :: [MergingTree s] -> ST s (Maybe (MergingTree s))
Expand All @@ -1354,14 +1439,18 @@ contentToMergingTree (LSMContent wb ls ul) =
-- flush the write buffer (but this should not modify the content)
buffers
| bufferSize wb == 0 = []
| otherwise = [Single (bufferToRun wb)]
| otherwise = [PreExistingRun (bufferToRun wb)]

levels = flip concatMap ls $ \(Level ir rs) -> ir : map Single rs
levels = flip concatMap ls $ \(Level ir rs) ->
incomingToPreExistingRun ir : map PreExistingRun rs

trees = case ul of
NoUnion -> Nothing
Union t _ -> Just t

incomingToPreExistingRun (Single _ r) = PreExistingRun r
incomingToPreExistingRun (Merging _ _ _ mr) = PreExistingMergingRun mr

-- | When calculating (an upped bound of) the total debt of a recursive tree of
-- merges, we also need to return an upper bound on the size of the resulting
-- run. See 'remainingDebtPendingMerge'.
Expand Down Expand Up @@ -1536,7 +1625,7 @@ flattenLevel (Level ir rs) = (++ rs) <$> flattenIncomingRun ir

flattenIncomingRun :: IncomingRun s -> ST s [Run]
flattenIncomingRun = \case
Single r -> return [r]
Single _ r -> return [r]
Merging _ _ _ mr -> flattenMergingRun mr

flattenMergingRun :: MergingRun t s -> ST s [Run]
Expand Down Expand Up @@ -1599,7 +1688,7 @@ dumpRepresentation (LSMHandle _ lsmr) = do
return (wb, levels, tree)

dumpLevel :: Level s -> ST s LevelRepresentation
dumpLevel (Level (Single r) rs) =
dumpLevel (Level (Single _ r) rs) =
return (Nothing, (r:rs))
dumpLevel (Level (Merging mp nd ncv (MergingRun mt _ ref)) rs) = do
mrs <- readSTRef ref
Expand Down
55 changes: 49 additions & 6 deletions prototypes/ScheduledMergesTest.hs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ tests :: TestTree
tests = testGroup "Unit and property tests"
[ testCase "test_regression_empty_run" test_regression_empty_run
, testCase "test_merge_again_with_incoming" test_merge_again_with_incoming
, testProperty "prop_union" prop_union
, testProperty "prop_union_supply_all" prop_union_supply_all
, testProperty "prop_union_merge_into_levels" prop_union_merge_into_levels
, testGroup "T"
[ localOption (QuickCheckTests 1000) $ -- super quick, run more
testProperty "Arbitrary satisfies invariant" prop_arbitrarySatisfiesInvariant
Expand Down Expand Up @@ -176,29 +177,71 @@ test_merge_again_with_incoming =
-- properties
--

-- TODO: also generate nested unions?
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 nesting at least once would potentially show some edge case behaviour


-- | Supplying enough credits for the remaining debt completes the union merge.
prop_union :: [[(LSM.Key, LSM.Op)]] -> Property
prop_union kopss = length (filter (not . null) kopss) > 1 QC.==>
prop_union_supply_all :: [[(LSM.Key, LSM.Op)]] -> Property
prop_union_supply_all kopss = length (filter (not . null) kopss) > 1 QC.==>
QC.ioProperty $ runWithTracer $ \tr ->
stToIO $ do
ts <- traverse (mkTable tr) kopss
t <- LSM.unions ts

debt@(UnionDebt x) <- LSM.remainingUnionDebt t
_ <- LSM.supplyUnionCredits t (UnionCredits x)
leftovers <- LSM.supplyUnionCredits t (UnionCredits x)
debt' <- LSM.remainingUnionDebt t

rep <- dumpRepresentation t
return $ QC.counterexample (show (debt, debt')) $ QC.conjoin
[ debt =/= UnionDebt 0
, debt' === UnionDebt 0
[ QC.counterexample "debt before" $ debt =/= UnionDebt 0
, QC.counterexample "debt after" $ debt' === UnionDebt 0
, QC.counterexample "leftovers" $ leftovers >= 0
, hasUnionWith isCompleted rep
]
where
isCompleted = \case
MLeaf{} -> True
MNode{} -> False

-- | The union level will get merged into the last regular level once the union
-- merge is completed and sufficient new entries have been inserted.
prop_union_merge_into_levels :: [[(LSM.Key, LSM.Op)]] -> Property
prop_union_merge_into_levels kopss = length (filter (not . null) kopss) > 1 QC.==>
QC.forAll arbitrary $ \firstPay ->
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may be intentional, but QC.forAll does not shrink, you'd have to use QC.forAllShrink

QC.ioProperty $ runWithTracer $ \tr ->
stToIO $ do
ts <- traverse (mkTable tr) kopss
t <- LSM.unions ts

-- pay off the union
let payOffDebt = do
UnionDebt d <- LSM.remainingUnionDebt t
_ <- LSM.supplyUnionCredits t (UnionCredits d)
return ()

-- insert as many new entries as there are in the completed
-- union level, so it fits into the last level
let fillTable = do
unionRunSize <- length <$> LSM.logicalValue t
LSM.inserts tr t
[(K k, V 0, Nothing) | k <- [1 .. unionRunSize]]

-- we can do these in any order
if firstPay
then payOffDebt >> fillTable
else fillTable >> payOffDebt

-- then flush the write buffer
LSM.inserts tr t
[(K k, V 0, Nothing) | k <- [1 .. maxBufferSize]]

(_, _, mtree) <- representationShape <$> dumpRepresentation t

-- the union level is gone
return $ QC.conjoin
[ mtree === Nothing
]

mkTable :: Tracer (ST s) Event -> [(LSM.Key, LSM.Op)] -> ST s (LSM s)
mkTable tr ks = do
t <- LSM.new
Expand Down