diff --git a/src-prototypes/ScheduledMerges.hs b/src-prototypes/ScheduledMerges.hs
index cfcd51e70..a3ae74ef2 100644
--- a/src-prototypes/ScheduledMerges.hs
+++ b/src-prototypes/ScheduledMerges.hs
@@ -27,8 +27,10 @@
 module ScheduledMerges (
     -- * Main API
     LSM,
+    LSMConfig (..),
     Key (K), Value (V), resolveValue, Blob (B),
     new,
+    newWith,
     LookupResult (..),
     lookup, lookups,
     Op,
@@ -103,8 +105,14 @@ import GHC.Stack (HasCallStack, callStack)
 import qualified Test.QuickCheck as QC
 
 data LSM s  = LSMHandle !(STRef s Counter)
+                        !LSMConfig
                         !(STRef s (LSMContent s))
 
+data LSMConfig = LSMConfig {
+      maxWriteBufferSize :: !Int
+    }
+  deriving stock (Show, Eq)
+
 -- | A simple count of LSM operations to allow logging the operation
 -- number in each event. This enables relating merge events to the
 -- operation number (which is interesting for numerical representations
@@ -276,30 +284,32 @@ newtype Blob = B Int
   deriving stock (Eq, Show)
 
 -- | The size of the 4 tiering runs at each level are allowed to be:
--- @4^(level-1) < size <= 4^level@
+-- @maxWriteBufferSize * 4^(level-2) < size <= maxWriteBufferSize * 4^(level-1)@
 --
-tieringRunSize :: Int -> Int
-tieringRunSize n = 4^n
+tieringRunSize :: LSMConfig -> Int -> Int
+tieringRunSize LSMConfig {maxWriteBufferSize} n
+  | n <= 0 = error $ "tieringRunSize: level number must be positive, but the level number is " ++ show n
+  | otherwise = maxWriteBufferSize * 4^(pred n)
 
 -- | Levelling runs take up the whole level, so are 4x larger.
 --
-levellingRunSize :: Int -> Int
-levellingRunSize n = 4^(n+1)
+levellingRunSize :: LSMConfig -> Int -> Int
+levellingRunSize LSMConfig {maxWriteBufferSize} n = maxWriteBufferSize * 4^n
 
-tieringRunSizeToLevel :: Run -> Int
-tieringRunSizeToLevel r
-  | s <= maxBufferSize = 1  -- level numbers start at 1
-  | otherwise =
-    1 + (finiteBitSize s - countLeadingZeros (s-1) - 1) `div` 2
+tieringRunSizeToLevel :: LSMConfig -> Run -> Int
+tieringRunSizeToLevel conf r
+  | s <= maxBufferSize conf = 1  -- level numbers start at 1
+  | otherwise =  -- 1 + ceiling (log_4 (ceiling (s / maxBufferSize conf)))
+    1 + (finiteBitSize s - countLeadingZeros ((s-1) `div` maxBufferSize conf) + 1) `div` 2
   where
     s = runSize r
 
-levellingRunSizeToLevel :: Run -> Int
-levellingRunSizeToLevel r =
-    max 1 (tieringRunSizeToLevel r - 1)  -- level numbers start at 1
+levellingRunSizeToLevel :: LSMConfig -> Run -> Int
+levellingRunSizeToLevel conf r =
+    max 1 (tieringRunSizeToLevel conf r - 1)  -- level numbers start at 1
 
-maxBufferSize :: Int
-maxBufferSize = tieringRunSize 1 -- 4
+maxBufferSize :: LSMConfig -> Int
+maxBufferSize conf = tieringRunSize conf 1 -- maxWriteBufferSize
 
 -- | We use levelling on the last level, unless that is also the first level.
 mergePolicyForLevel :: Int -> [Level s] -> UnionLevel s -> MergePolicy
@@ -316,8 +326,8 @@ mergeTypeForLevel _ _ = MergeMidLevel
 -- | Note that the invariants rely on the fact that levelling is only used on
 -- the last level.
 --
-invariant :: forall s. LSMContent s -> ST s ()
-invariant (LSMContent _ levels ul) = do
+invariant :: forall s. LSMConfig -> LSMContent s -> ST s ()
+invariant conf (LSMContent _ levels ul) = do
     levelsInvariant 1 levels
     case ul of
       NoUnion      -> return ()
@@ -355,7 +365,7 @@ invariant (LSMContent _ levels ul) = do
         MergePolicyLevelling -> assertST $ null rs
         -- Runs in tiering levels usually fit that size, but they can be one
         -- larger, if a run has been held back (creating a 5-way merge).
- MergePolicyTiering -> assertST $ all (\r -> tieringRunSizeToLevel r `elem` [ln, ln+1]) rs + MergePolicyTiering -> assertST $ all (\r -> tieringRunSizeToLevel conf r `elem` [ln, ln+1]) rs -- (This is actually still not really true, but will hold in practice. -- In the pathological case, all runs passed to the next level can be -- factor (5/4) too large, and there the same holding back can lead to @@ -374,13 +384,13 @@ invariant (LSMContent _ levels ul) = do (Single r, m) -> do assertST $ case m of CompletedMerge{} -> True OngoingMerge{} -> False - assertST $ levellingRunSizeToLevel r == ln + assertST $ levellingRunSizeToLevel conf r == ln -- A completed merge for levelling can be of almost any size at all! -- It can be smaller, due to deletions in the last level. But it -- can't be bigger than would fit into the next level. (_, CompletedMerge r) -> - assertST $ levellingRunSizeToLevel r <= ln+1 + assertST $ levellingRunSizeToLevel conf r <= ln+1 -- An ongoing merge for levelling should have 4 incoming runs of -- the right size for the level below (or slightly larger due to @@ -393,8 +403,8 @@ invariant (LSMContent _ levels ul) = do assertST $ all (\r -> runSize r > 0) rs -- don't merge empty runs let incoming = take 4 rs let resident = drop 4 rs - assertST $ all (\r -> tieringRunSizeToLevel r `elem` [ln-1, ln]) incoming - assertST $ all (\r -> levellingRunSizeToLevel r <= ln+1) resident + assertST $ all (\r -> tieringRunSizeToLevel conf r `elem` [ln-1, ln]) incoming + assertST $ all (\r -> levellingRunSizeToLevel conf r <= ln+1) resident MergePolicyTiering -> case (ir, mrs, mergeTypeForLevel ls ul) of @@ -403,7 +413,7 @@ invariant (LSMContent _ levels ul) = do (Single r, m, _) -> do assertST $ case m of CompletedMerge{} -> True OngoingMerge{} -> False - assertST $ tieringRunSizeToLevel r == ln + assertST $ tieringRunSizeToLevel conf r == ln -- A completed last level run can be of almost any smaller size due -- to deletions, but it can't be bigger than the next 
level down. @@ -411,14 +421,14 @@ invariant (LSMContent _ levels ul) = do -- a single level only. (_, CompletedMerge r, MergeLastLevel) -> do assertST $ ln == 1 - assertST $ tieringRunSizeToLevel r <= ln+1 + assertST $ tieringRunSizeToLevel conf r <= ln+1 -- A completed mid level run is usually of the size for the -- level it is entering, but can also be one smaller (in which case -- it'll be held back and merged again) or one larger (because it -- includes a run that has been held back before). (_, CompletedMerge r, MergeMidLevel) -> - assertST $ tieringRunSizeToLevel r `elem` [ln-1, ln, ln+1] + assertST $ tieringRunSizeToLevel conf r `elem` [ln-1, ln, ln+1] -- An ongoing merge for tiering should have 4 incoming runs of -- the right size for the level below, and at most 1 run held back @@ -426,7 +436,7 @@ invariant (LSMContent _ levels ul) = do -- the level below). (_, OngoingMerge _ rs _, _) -> do assertST $ length rs == 4 || length rs == 5 - assertST $ all (\r -> tieringRunSizeToLevel r == ln-1) rs + assertST $ all (\r -> tieringRunSizeToLevel conf r == ln-1) rs -- We don't make many assumptions apart from what the types already enforce. 
-- In particular, there are no invariants on the progress of the merges, @@ -742,10 +752,17 @@ suppliedCreditMergingRun (MergingRun _ d ref) = -- new :: ST s (LSM s) -new = do +new = newWith conf + where + conf = LSMConfig { + maxWriteBufferSize = 4 + } + +newWith :: LSMConfig -> ST s (LSM s) +newWith conf = do c <- newSTRef 0 lsm <- newSTRef (LSMContent Map.empty [] NoUnion) - return (LSMHandle c lsm) + return (LSMHandle c conf lsm) inserts :: Tracer (ST s) Event -> LSM s -> [(Key, Value, Maybe Blob)] -> ST s () inserts tr lsm kvbs = updates tr lsm [ (k, Insert v b) | (k, v, b) <- kvbs ] @@ -775,28 +792,28 @@ updates :: Tracer (ST s) Event -> LSM s -> [(Key, Op)] -> ST s () updates tr lsm = mapM_ (uncurry (update tr lsm)) update :: Tracer (ST s) Event -> LSM s -> Key -> Op -> ST s () -update tr (LSMHandle scr lsmr) k op = do +update tr (LSMHandle scr conf lsmr) k op = do sc <- readSTRef scr content@(LSMContent wb ls unionLevel) <- readSTRef lsmr modifySTRef' scr (+1) supplyCreditsLevels (NominalCredit 1) ls - invariant content + invariant conf content let wb' = Map.insertWith combine k op wb - if bufferSize wb' >= maxBufferSize + if bufferSize wb' >= maxBufferSize conf then do - ls' <- increment tr sc (bufferToRun wb') ls unionLevel + ls' <- increment tr sc conf (bufferToRun wb') ls unionLevel let content' = LSMContent Map.empty ls' unionLevel - invariant content' + invariant conf content' writeSTRef lsmr content' else writeSTRef lsmr (LSMContent wb' ls unionLevel) supplyMergeCredits :: LSM s -> NominalCredit -> ST s () -supplyMergeCredits (LSMHandle scr lsmr) credits = do +supplyMergeCredits (LSMHandle scr conf lsmr) credits = do content@(LSMContent _ ls _) <- readSTRef lsmr modifySTRef' scr (+1) supplyCreditsLevels credits ls - invariant content + invariant conf content data LookupResult v b = NotFound @@ -804,22 +821,22 @@ data LookupResult v b = deriving stock (Eq, Show) lookups :: LSM s -> [Key] -> ST s [LookupResult Value Blob] -lookups (LSMHandle _ lsmr) ks 
= do +lookups (LSMHandle _ _conf lsmr) ks = do LSMContent wb ls ul <- readSTRef lsmr runs <- concat <$> flattenLevels ls traverse (doLookup wb runs ul) ks lookup :: LSM s -> Key -> ST s (LookupResult Value Blob) -lookup (LSMHandle _ lsmr) k = do +lookup (LSMHandle _ _conf lsmr) k = do LSMContent wb ls ul <- readSTRef lsmr runs <- concat <$> flattenLevels ls doLookup wb runs ul k duplicate :: LSM s -> ST s (LSM s) -duplicate (LSMHandle _scr lsmr) = do +duplicate (LSMHandle _scr conf lsmr) = do scr' <- newSTRef 0 lsmr' <- newSTRef =<< readSTRef lsmr - return (LSMHandle scr' lsmr') + return (LSMHandle scr' conf lsmr') -- it's that simple here, because we share all the pure value and all the -- STRefs and there's no ref counting to be done @@ -832,8 +849,11 @@ duplicate (LSMHandle _scr lsmr) = do -- The more merge work remains, the more expensive are lookups on the table. unions :: [LSM s] -> ST s (LSM s) unions lsms = do - trees <- forM lsms $ \(LSMHandle _ lsmr) -> - contentToMergingTree =<< readSTRef lsmr + (unzip -> (confs, trees)) <- forM lsms $ \(LSMHandle _ conf lsmr) -> + (conf,) <$> (contentToMergingTree =<< readSTRef lsmr) + conf <- case confs of + [] -> error "unions: 0 tables" + conf : _ -> assert (all (conf==) confs) $ pure conf -- TODO: if only one table is non-empty, we don't have to create a Union, -- we can just duplicate the table. unionLevel <- newPendingUnionMerge (catMaybes trees) >>= \case @@ -843,7 +863,7 @@ unions lsms = do Union tree <$> newSTRef debt lsmr <- newSTRef (LSMContent Map.empty [] unionLevel) c <- newSTRef 0 - return (LSMHandle c lsmr) + return (LSMHandle c conf lsmr) -- | The /current/ upper bound on the number of 'UnionCredits' that have to be -- supplied before a 'union' is completed. @@ -859,7 +879,7 @@ newtype UnionDebt = UnionDebt Debt -- | Return the current union debt. This debt can be reduced until it is paid -- off using 'supplyUnionCredits'. 
remainingUnionDebt :: LSM s -> ST s UnionDebt -remainingUnionDebt (LSMHandle _ lsmr) = do +remainingUnionDebt (LSMHandle _ _conf lsmr) = do LSMContent _ _ ul <- readSTRef lsmr UnionDebt <$> case ul of NoUnion -> return 0 @@ -885,7 +905,7 @@ newtype UnionCredits = UnionCredits Credit -- a union has finished. In particular, if the returned number of credits is -- non-negative, then the union is finished. supplyUnionCredits :: LSM s -> UnionCredits -> ST s UnionCredits -supplyUnionCredits (LSMHandle scr lsmr) (UnionCredits credits) +supplyUnionCredits (LSMHandle scr conf lsmr) (UnionCredits credits) | credits <= 0 = return (UnionCredits 0) | otherwise = do content@(LSMContent _ _ ul) <- readSTRef lsmr @@ -899,7 +919,7 @@ supplyUnionCredits (LSMHandle scr lsmr) (UnionCredits credits) debt' <- checkedUnionDebt tree debtRef when (debt' > 0) $ assertST $ c' == 0 -- should have spent these credits - invariant content + invariant conf content return c' -- TODO: At some point the completed merging tree should to moved into the @@ -1162,8 +1182,10 @@ depositNominalCredit (NominalDebt nominalDebt) -- increment :: forall s. Tracer (ST s) Event - -> Counter -> Run -> Levels s -> UnionLevel s -> ST s (Levels s) -increment tr sc run0 ls0 ul = do + -> Counter + -> LSMConfig + -> Run -> Levels s -> UnionLevel s -> ST s (Levels s) +increment tr sc conf run0 ls0 ul = do go 1 [run0] ls0 where mergeTypeFor :: Levels s -> LevelMergeType @@ -1173,7 +1195,7 @@ increment tr sc run0 ls0 ul = do go !ln incoming [] = do let mergePolicy = mergePolicyForLevel ln [] ul traceWith tr' AddLevelEvent - ir <- newLevelMerge tr' ln mergePolicy (mergeTypeFor []) incoming + ir <- newLevelMerge tr' conf ln mergePolicy (mergeTypeFor []) incoming return (Level ir [] : []) where tr' = contramap (EventAt sc ln) tr @@ -1195,8 +1217,8 @@ increment tr sc run0 ls0 ul = do -- If r is still too small for this level then keep it and merge again -- with the incoming runs. 
- MergePolicyTiering | tieringRunSizeToLevel r < ln -> do - ir' <- newLevelMerge tr' ln MergePolicyTiering (mergeTypeFor ls) (incoming ++ [r]) + MergePolicyTiering | tieringRunSizeToLevel conf r < ln -> do + ir' <- newLevelMerge tr' conf ln MergePolicyTiering (mergeTypeFor ls) (incoming ++ [r]) return (Level ir' rs : ls) -- This tiering level is now full. We take the completed merged run @@ -1204,14 +1226,14 @@ increment tr sc run0 ls0 ul = do -- as a bundle and move them down to the level below. We start a merge -- for the new incoming runs. This level is otherwise empty. MergePolicyTiering | tieringLevelIsFull ln incoming resident -> do - ir' <- newLevelMerge tr' ln MergePolicyTiering MergeMidLevel incoming + ir' <- newLevelMerge tr' conf ln MergePolicyTiering MergeMidLevel incoming ls' <- go (ln+1) resident ls return (Level ir' [] : ls') -- This tiering level is not yet full. We move the completed merged run -- into the level proper, and start the new merge for the incoming runs. MergePolicyTiering -> do - ir' <- newLevelMerge tr' ln MergePolicyTiering (mergeTypeFor ls) incoming + ir' <- newLevelMerge tr' conf ln MergePolicyTiering (mergeTypeFor ls) incoming traceWith tr' (AddRunEvent (length resident)) return (Level ir' resident : ls) @@ -1219,16 +1241,16 @@ increment tr sc run0 ls0 ul = do -- run is too large for this level, we promote the run to the next -- level and start merging the incoming runs into this (otherwise -- empty) level . - MergePolicyLevelling | levellingLevelIsFull ln incoming r -> do + MergePolicyLevelling | levellingLevelIsFull conf ln incoming r -> do assert (null rs && null ls) $ return () - ir' <- newLevelMerge tr' ln MergePolicyTiering MergeMidLevel incoming + ir' <- newLevelMerge tr' conf ln MergePolicyTiering MergeMidLevel incoming ls' <- go (ln+1) [r] [] return (Level ir' [] : ls') -- Otherwise we start merging the incoming runs into the run. 
MergePolicyLevelling -> do assert (null rs && null ls) $ return () - ir' <- newLevelMerge tr' ln MergePolicyLevelling (mergeTypeFor ls) + ir' <- newLevelMerge tr' conf ln MergePolicyLevelling (mergeTypeFor ls) (incoming ++ [r]) return (Level ir' [] : []) @@ -1236,10 +1258,11 @@ increment tr sc run0 ls0 ul = do tr' = contramap (EventAt sc ln) tr newLevelMerge :: Tracer (ST s) EventDetail + -> LSMConfig -> Int -> MergePolicy -> LevelMergeType -> [Run] -> ST s (IncomingRun s) -newLevelMerge _ _ _ _ [r] = return (Single r) -newLevelMerge tr level mergePolicy mergeType rs = do +newLevelMerge _ _ _ _ _ [r] = return (Single r) +newLevelMerge tr conf level mergePolicy mergeType rs = do assertST (length rs `elem` [4, 5]) mergingRun@(MergingRun _ physicalDebt _) <- newMergingRun mergeType rs assertST (totalDebt physicalDebt <= maxPhysicalDebt) @@ -1255,7 +1278,7 @@ newLevelMerge tr level mergePolicy mergeType rs = do -- The nominal debt equals the minimum of credits we will supply before we -- expect the merge to complete. This is the same as the number of updates -- in a run that gets moved to this level. - nominalDebt = NominalDebt (tieringRunSize level) + nominalDebt = NominalDebt (tieringRunSize conf level) -- The physical debt is the number of actual merge steps we will need to -- perform before the merge is complete. This is always the sum of the @@ -1269,9 +1292,9 @@ newLevelMerge tr level mergePolicy mergeType rs = do -- includes the single run in the current level. maxPhysicalDebt = case mergePolicy of - MergePolicyLevelling -> 4 * tieringRunSize (level-1) - + levellingRunSize level - MergePolicyTiering -> length rs * tieringRunSize (level-1) + MergePolicyLevelling -> 4 * tieringRunSize conf (level-1) + + levellingRunSize conf level + MergePolicyTiering -> length rs * tieringRunSize conf (level-1) -- | Only based on run count, not their sizes. 
tieringLevelIsFull :: Int -> [Run] -> [Run] -> Bool @@ -1279,8 +1302,8 @@ tieringLevelIsFull _ln _incoming resident = length resident >= 4 -- | The level is only considered full once the resident run is /too large/ for -- the level. -levellingLevelIsFull :: Int -> [Run] -> Run -> Bool -levellingLevelIsFull ln _incoming resident = levellingRunSizeToLevel resident > ln +levellingLevelIsFull :: LSMConfig -> Int -> [Run] -> Run -> Bool +levellingLevelIsFull conf ln _incoming resident = levellingRunSizeToLevel conf resident > ln ------------------------------------------------------------------------------- -- MergingTree abstraction @@ -1523,7 +1546,7 @@ data MTree r = MLeaf r deriving stock (Eq, Foldable, Functor, Show) allLevels :: LSM s -> ST s (Buffer, [[Run]], Maybe (MTree Run)) -allLevels (LSMHandle _ lsmr) = do +allLevels (LSMHandle _ _conf lsmr) = do LSMContent wb ls ul <- readSTRef lsmr rs <- flattenLevels ls tree <- case ul of @@ -1593,7 +1616,7 @@ type LevelRepresentation = [Run]) dumpRepresentation :: LSM s -> ST s Representation -dumpRepresentation (LSMHandle _ lsmr) = do +dumpRepresentation (LSMHandle _ _conf lsmr) = do LSMContent wb ls ul <- readSTRef lsmr levels <- mapM dumpLevel ls tree <- case ul of diff --git a/test-prototypes/Test/ScheduledMergesQLS.hs b/test-prototypes/Test/ScheduledMergesQLS.hs index 45a5592d5..1d1c8f0a4 100644 --- a/test-prototypes/Test/ScheduledMergesQLS.hs +++ b/test-prototypes/Test/ScheduledMergesQLS.hs @@ -126,7 +126,7 @@ modelDump mlsm model@Model {mlsms} = instance StateModel (Lockstep Model) where data Action (Lockstep Model) a where - ANew :: Action (Lockstep Model) (LSM RealWorld) + ANew :: Int -> Action (Lockstep Model) (LSM RealWorld) AInsert :: ModelVar Model (LSM RealWorld) -> Either (ModelVar Model Key) Key -- to refer to a prior key @@ -194,7 +194,7 @@ instance InLockstep Model where observeModel (MLookup x) = OId x observeModel (MDump x) = OId x - usedVars ANew = [] + usedVars ANew{} = [] usedVars (AInsert v evk _ 
_) = SomeGVar v : case evk of Left vk -> [SomeGVar vk]; _ -> [] usedVars (ADelete v evk) = SomeGVar v @@ -212,7 +212,7 @@ instance InLockstep Model where arbitraryWithVars ctx model = case findVars ctx (Proxy :: Proxy (LSM RealWorld)) of - [] -> return (Some ANew) + [] -> fmap Some $ ANew <$> choose (3,5) vars -> let kvars = findVars ctx (Proxy :: Proxy Key) existingKey = Left <$> elements kvars @@ -327,7 +327,7 @@ deriving newtype instance Arbitrary UnionCredits instance RunLockstep Model IO where observeReal _ action result = case (action, result) of - (ANew, _) -> ORef + (ANew{}, _) -> ORef (AInsert{}, x) -> OId x (ADelete{}, x) -> OId x (AMupsert{}, x) -> OId x @@ -337,7 +337,7 @@ instance RunLockstep Model IO where (ASupplyUnion{}, x) -> OId x (ADump{}, x) -> OId x - showRealResponse _ ANew = Nothing + showRealResponse _ ANew{} = Nothing showRealResponse _ AInsert{} = Just Dict showRealResponse _ ADelete{} = Just Dict showRealResponse _ AMupsert{} = Just Dict @@ -362,7 +362,7 @@ runActionIO :: Action (Lockstep Model) a runActionIO action lookUp = stToIO $ case action of - ANew -> new + ANew x -> newWith (LSMConfig x) AInsert var evk v b -> insert tr (lookUpVar var) k v b >> return k where k = either lookUpVar id evk ADelete var evk -> delete tr (lookUpVar var) k >> return () @@ -388,7 +388,7 @@ runModel :: Action (Lockstep Model) a -> (ModelValue Model a, Model) runModel action ctx m = case action of - ANew -> (MLSM mlsm, m') + ANew{} -> (MLSM mlsm, m') where (mlsm, m') = modelNew m AInsert var evk v b -> (MInsert k, m')