@@ -26,22 +26,26 @@ module Database.LSMTree.Internal.MergeSchedule (
26
26
) where
27
27
28
28
import Control.Concurrent.Class.MonadMVar.Strict
29
+ import Control.Monad (when )
29
30
import Control.Monad.Class.MonadST (MonadST )
30
31
import Control.Monad.Class.MonadSTM (MonadSTM (.. ))
31
32
import Control.Monad.Class.MonadThrow (MonadCatch , MonadMask ,
32
33
MonadThrow (.. ))
33
34
import Control.Monad.Fix (MonadFix )
34
35
import Control.Monad.Primitive
36
+ import Control.RefCount (RefCount (RefCount ))
35
37
import Control.TempRegistry
36
38
import Control.Tracer
37
39
import Data.BloomFilter (Bloom )
40
+ import Data.Foldable (traverse_ )
38
41
import qualified Data.Vector as V
39
- import Database.LSMTree.Internal.Assertions (assert )
42
+ import Database.LSMTree.Internal.Assertions (assert ,
43
+ fromIntegralChecked )
40
44
import Database.LSMTree.Internal.Config
41
45
import Database.LSMTree.Internal.Entry (Entry , NumEntries (.. ))
42
46
import Database.LSMTree.Internal.IndexCompact (IndexCompact )
43
47
import Database.LSMTree.Internal.Lookup (ResolveSerialisedValue )
44
- import Database.LSMTree.Internal.Merge (Merge )
48
+ import Database.LSMTree.Internal.Merge (Merge , StepResult ( .. ) )
45
49
import qualified Database.LSMTree.Internal.Merge as Merge
46
50
import Database.LSMTree.Internal.Paths (RunFsPaths (.. ),
47
51
SessionRoot (.. ))
@@ -199,6 +203,12 @@ mkLevelsCache reg lvls = do
199
203
--
200
204
-- * Keep the cache feature, but force a rebuild every once in a while, e.g.,
201
205
-- once in every 100 lookups.
206
+ --
207
+ -- TODO: rebuilding the cache can invalidate blob references if the cache was
208
+ -- holding the last reference to a run. This is not really a problem of just the
209
+ -- caching approach, but allowing merges to finish early. We should come up with
210
+ -- a solution to keep blob references valid until the next /update/ comes along.
211
+ -- Lookups should no invalidate blob erferences.
202
212
rebuildCache ::
203
213
(PrimMonad m , MonadMVar m , MonadMask m )
204
214
=> TempRegistry m
@@ -250,7 +260,7 @@ data Level m h = Level {
250
260
251
261
-- | A merging run is either a single run, or some ongoing merge.
252
262
data MergingRun m h =
253
- MergingRun ! (StrictMVar m (MergingRunState m h ))
263
+ MergingRun ! MergePolicyForLevel ! Int ! (StrictMVar m (MergingRunState m h ))
254
264
| SingleRun ! (Run m (Handle h ))
255
265
256
266
data MergingRunState m h =
@@ -293,7 +303,7 @@ forRunAndMergeM_ ::
293
303
forRunAndMergeM_ lvls k1 k2 = V. forM_ lvls $ \ (Level mr rs) -> do
294
304
case mr of
295
305
SingleRun r -> k1 r
296
- MergingRun var -> withMVar var $ \ case
306
+ MergingRun _ _ var -> withMVar var $ \ case
297
307
CompletedMerge r -> k1 r
298
308
OngoingMerge irs m -> V. mapM_ k1 irs >> k2 m
299
309
V. mapM_ k1 rs
@@ -312,7 +322,7 @@ foldRunM ::
312
322
foldRunM f x lvls = flip (flip V. foldM x) lvls $ \ y (Level mr rs) -> do
313
323
z <- case mr of
314
324
SingleRun r -> f y r
315
- MergingRun var -> withMVar var $ \ case
325
+ MergingRun _ _ var -> withMVar var $ \ case
316
326
CompletedMerge r -> f y r
317
327
OngoingMerge irs _m -> V. foldM f y irs
318
328
V. foldM f z rs
@@ -390,6 +400,7 @@ updatesWithInterleavedFlushes tr conf resolve hfs hbio root uc es reg tc = do
390
400
let wb = tableWriteBuffer tc
391
401
wbblobs = tableWriteBufferBlobs tc
392
402
(wb', es') <- addWriteBufferEntries hfs resolve wbblobs maxn wb es
403
+ supplyCredits (V. length es - V. length es') (tableLevels tc)
393
404
let tc' = tc { tableWriteBuffer = wb' }
394
405
if WB. numEntries wb' < maxn then do
395
406
pure $! tc'
@@ -682,7 +693,7 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels =
682
693
expectCompletedMerge ln (SingleRun r) = do
683
694
traceWith tr $ AtLevel ln $ TraceExpectCompletedMergeSingleRun (runNumber $ Run. runRunFsPaths r)
684
695
pure r
685
- expectCompletedMerge ln (MergingRun var) = do
696
+ expectCompletedMerge ln (MergingRun _ _ var) = do
686
697
withMVar var $ \ case
687
698
CompletedMerge r -> do
688
699
traceWith tr $ AtLevel ln $ TraceExpectCompletedMerge (runNumber $ Run. runRunFsPaths r)
@@ -715,9 +726,17 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels =
715
726
Run. removeReference
716
727
traceWith tr $ AtLevel ln $ TraceCompletedMerge (Run. runNumEntries r) (runNumber $ Run. runRunFsPaths r)
717
728
V. mapM_ (freeTemp reg . Run. removeReference) rs
718
- var <- newMVar (CompletedMerge r)
719
- pure $! MergingRun var
720
- Incremental -> error " newMerge: Incremental is not yet supported" -- TODO: implement
729
+ var <- newMVar $! CompletedMerge r
730
+ pure $! MergingRun mergepolicy (V. length rs) var
731
+ Incremental -> do
732
+ mergeMaybe <- allocateTemp reg
733
+ (Merge. new hfs hbio caching alloc mergelast resolve runPaths rs)
734
+ (traverse_ Merge. removeReference)
735
+ case mergeMaybe of
736
+ Nothing -> error " newMerge: merges can not be empty"
737
+ Just m -> do
738
+ var <- newMVar $! OngoingMerge rs m
739
+ pure $! MergingRun mergepolicy (V. length rs) var
721
740
722
741
data MergePolicyForLevel = LevelTiering | LevelLevelling
723
742
deriving stock Show
@@ -797,3 +816,61 @@ mergeRuns resolve hfs hbio caching alloc runPaths mergeLevel runs = do
797
816
Merge. new hfs hbio caching alloc mergeLevel resolve runPaths runs >>= \ case
798
817
Nothing -> error " mergeRuns: no inputs"
799
818
Just m -> Merge. stepsToCompletion m 1024
819
+
820
+ {- ------------------------------------------------------------------------------
821
+ Credits
822
+ -------------------------------------------------------------------------------}
823
+
824
+ type Credit = Int
825
+
826
+ {-# SPECIALISE supplyCredits ::
827
+ Credit
828
+ -> Levels IO h
829
+ -> IO ()
830
+ #-}
831
+ supplyCredits ::
832
+ (MonadSTM m , MonadST m , MonadMVar m , MonadMask m , MonadFix m )
833
+ => Credit
834
+ -> Levels m h
835
+ -> m ()
836
+ supplyCredits c levels =
837
+ V. iforM_ levels $ \ _i (Level mr _rs) ->
838
+ -- let !ln = i + 1 in
839
+ let ! cr = creditsForMerge mr in
840
+ supplyMergeCredits (ceiling (fromIntegral c * cr)) mr
841
+
842
+ creditsForMerge :: MergingRun m h -> Rational
843
+ creditsForMerge SingleRun {} = 0
844
+ creditsForMerge (MergingRun LevelLevelling _ _) = 1 + 4
845
+ creditsForMerge (MergingRun LevelTiering numRuns _) = fromIntegral numRuns / 4
846
+
847
+ {-# SPECIALISE supplyMergeCredits ::
848
+ Credit
849
+ -> MergingRun IO h
850
+ -> IO ()
851
+ #-}
852
+ -- TODO: implement doing merge werk in batches, instead of always taking the
853
+ -- MVar. The thresholds for doing merge work should be different for each level,
854
+ -- maybe co-prime?
855
+ supplyMergeCredits ::
856
+ (MonadSTM m , MonadST m , MonadMVar m , MonadMask m , MonadFix m )
857
+ => Credit
858
+ -> MergingRun m h
859
+ -> m ()
860
+ supplyMergeCredits _ SingleRun {} = pure ()
861
+ supplyMergeCredits c (MergingRun _ _ var) = do
862
+ b <- withMVar var $ \ case
863
+ CompletedMerge {} -> pure False
864
+ (OngoingMerge _rs m) -> do
865
+ (_n, stepResult) <- Merge. steps m c
866
+ pure $ stepResult == MergeComplete
867
+ when b $
868
+ modifyMVarMasked_ var $ \ case
869
+ mr@ CompletedMerge {} -> pure $! mr
870
+ (OngoingMerge rs m) -> do
871
+ RefCount n <- Merge. readRefCount m
872
+ let ! n' = fromIntegralChecked n
873
+ V. forM_ rs $ \ r -> Run. removeReferenceN r n'
874
+ r <- Merge. complete m
875
+ Merge. removeReferenceN m n'
876
+ pure $! CompletedMerge r
0 commit comments