Skip to content

Commit 6eadec9

Browse files
committed
WIP: implement scheduled merges
1 parent 66aa5f1 commit 6eadec9

File tree

5 files changed

+157
-8
lines changed

5 files changed

+157
-8
lines changed

src-extras/Database/LSMTree/Extras/NoThunks.hs

+1-1
Original file line numberDiff line numberDiff line change
@@ -527,7 +527,7 @@ instance NoThunks a => NoThunks (StrictTVar IO a) where
527527
instance NoThunks a => NoThunks (StrictMVar IO a) where
528528
showTypeOf (_ :: Proxy (StrictMVar IO a)) = "StrictMVar IO"
529529
wNoThunks ctx var = do
530-
x <- readMVar var
530+
!x <- readMVar var -- TODO: undo
531531
noThunks ctx x
532532

533533
{-------------------------------------------------------------------------------

src/Database/LSMTree/Internal.hs

+1
Original file line numberDiff line numberDiff line change
@@ -1211,6 +1211,7 @@ snapshot resolve snap label th = do
12111211
(RW.unsafeAcquireWriteAccess (tableContent thEnv))
12121212
(atomically . RW.unsafeReleaseWriteAccess (tableContent thEnv))
12131213
$ \reg content -> do
1214+
supplyCredits (Entry.unNumEntries $ case confWriteBufferAlloc conf of AllocNumEntries x -> x) (tableLevels content)
12141215
content' <- flushWriteBuffer
12151216
(TraceMerge `contramap` tableTracer th)
12161217
conf

src/Database/LSMTree/Internal/MergeSchedule.hs

+11-4
Original file line numberDiff line numberDiff line change
@@ -706,11 +706,18 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels =
706706
traceWith tr $ AtLevel ln $ TraceExpectCompletedMergeSingleRun (runNumber $ Run.runRunFsPaths r)
707707
pure r
708708
expectCompletedMerge ln (MergingRun _ _ var) = do
709-
withMVar var $ \case
710-
CompletedMerge r -> do
709+
modifyMVarMasked var $ \case
710+
x@(CompletedMerge r) -> do
711711
traceWith tr $ AtLevel ln $ TraceExpectCompletedMerge (runNumber $ Run.runRunFsPaths r)
712-
pure r
713-
OngoingMerge _rs _ _m -> error "expectCompletedMerge: OngoingMerge not yet supported" -- TODO: implement.
712+
pure (x, r)
713+
OngoingMerge _rs _ _m -> do
714+
-- RefCount n <- Merge.readRefCount m
715+
-- let !n' = fromIntegralChecked n
716+
-- V.forM_ rs $ \r -> Run.removeReferenceN r n'
717+
-- r <- Merge.complete m
718+
-- Merge.removeReferenceN m n'
719+
-- pure (CompletedMerge r, r)
720+
error "expectCompletedMerge: OngoingMerge not yet supported" -- TODO: implement.
714721

715722
newMerge :: MergePolicyForLevel
716723
-> Merge.Level

src/Database/LSMTree/Internal/Snapshot.hs

+2
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,9 @@ runNumber r = Paths.runNumber (Run.runRunFsPaths r)
122122
-> HasFS IO h
123123
-> HasBlockIO IO h
124124
-> TableConfig
125+
-> UniqCounter IO
125126
-> SessionRoot
127+
-> ResolveSerialisedValue
126128
-> SnapLevels
127129
-> IO (Levels IO h)
128130
#-}

0 commit comments

Comments
 (0)