Skip to content

Commit 3ac6360

Browse files
committed
WIP: rework snapshots for ongoing merges
1 parent d594885 commit 3ac6360

File tree

5 files changed

+27
-25
lines changed

5 files changed

+27
-25
lines changed

src/Database/LSMTree/Internal.hs

+1 -1
Original file line number | Diff line number | Diff line change
@@ -1291,7 +1291,7 @@ open sesh label override snap resolve = do
12911291
<- allocateTemp reg
12921292
(WBB.new hfs blobpath)
12931293
WBB.removeReference
1294-
tableLevels <- openLevels reg hfs hbio conf (sessionRoot seshEnv) resolve snappedLevels
1294+
tableLevels <- openLevels reg hfs hbio conf (sessionUniqCounter seshEnv) (sessionRoot seshEnv) resolve snappedLevels
12951295
tableCache <- mkLevelsCache reg tableLevels
12961296
newWith reg sesh seshEnv conf' am $! TableContent {
12971297
tableWriteBuffer = WB.empty

src/Database/LSMTree/Internal/Config.hs

+1 -1
Original file line number | Diff line number | Diff line change
@@ -413,4 +413,4 @@ instance NFData MergeSchedule where
413413
--
414414
-- TODO: replace by 'Incremental'
415415
defaultMergeSchedule :: MergeSchedule
416-
defaultMergeSchedule = OneShot
416+
defaultMergeSchedule = Incremental

src/Database/LSMTree/Internal/MergeSchedule.hs

+3 -1
Original file line number | Diff line number | Diff line change
@@ -27,6 +27,7 @@ module Database.LSMTree.Internal.MergeSchedule (
2727
, maxRunSize
2828
-- * Credits
2929
, supplyCredits
30+
, supplyMergeCredits
3031
) where
3132

3233
import Control.Concurrent.Class.MonadMVar.Strict
@@ -872,7 +873,8 @@ supplyMergeCredits c (MergingRun _ _ var) = do
872873
CompletedMerge{} -> pure False
873874
(OngoingMerge _rs pvar m) -> do
874875
(n, stepResult) <- Merge.steps m c
875-
writePrimVar pvar $! NumStepsDone n
876+
(NumStepsDone x) <- readPrimVar pvar
877+
writePrimVar pvar $! NumStepsDone (n + x)
876878
pure $ stepResult == MergeComplete
877879
when b $
878880
modifyMVarMasked_ var $ \case

src/Database/LSMTree/Internal/Snapshot.hs

+21 -21
Original file line number | Diff line number | Diff line change
@@ -21,21 +21,21 @@ import Control.Monad.Class.MonadThrow (MonadMask)
2121
import Control.Monad.Fix (MonadFix)
2222
import Control.Monad.Primitive (PrimMonad)
2323
import Control.TempRegistry
24-
import Data.Foldable (traverse_)
24+
import Data.Foldable (forM_, traverse_)
2525
import Data.Primitive.PrimVar
2626
import qualified Data.Vector as V
2727
import Database.LSMTree.Internal.Config
2828
import Database.LSMTree.Internal.Entry
2929
import Database.LSMTree.Internal.Lookup (ResolveSerialisedValue)
30-
import Database.LSMTree.Internal.Merge (Merge)
3130
import qualified Database.LSMTree.Internal.Merge as Merge
3231
import Database.LSMTree.Internal.MergeSchedule
3332
import Database.LSMTree.Internal.Paths (SessionRoot)
3433
import qualified Database.LSMTree.Internal.Paths as Paths
3534
import Database.LSMTree.Internal.Run (Run)
3635
import qualified Database.LSMTree.Internal.Run as Run
37-
import qualified Database.LSMTree.Internal.RunBuilder as RunBuilder
3836
import Database.LSMTree.Internal.RunNumber
37+
import Database.LSMTree.Internal.UniqCounter (UniqCounter,
38+
incrUniqCounter, uniqueToRunNumber)
3939
import System.FS.API (HasFS)
4040
import System.FS.BlockIO.API (HasBlockIO)
4141

@@ -49,8 +49,8 @@ numSnapRuns a = V.sum $ V.map go1 a
4949
go1 (SnapLevel b c) = go2 b + V.length c
5050
go2 (SnapMergingRun _ _ d) = go3 d
5151
go2 (SnapSingleRun _) = 1
52-
go3 (SnapCompletedMerge _) = 1
53-
go3 (SnapOngoingMerge e _ _ _) = V.length e
52+
go3 (SnapCompletedMerge _) = 1
53+
go3 (SnapOngoingMerge e _ _) = V.length e
5454

5555
type SnapLevels = V.Vector SnapLevel
5656

@@ -67,7 +67,7 @@ data SnapMergingRun =
6767

6868
data SnapMergingRunState =
6969
SnapCompletedMerge !RunNumber
70-
| SnapOngoingMerge !(V.Vector RunNumber) !NumStepsDone {- merge -} !RunNumber !Merge.Level
70+
| SnapOngoingMerge !(V.Vector RunNumber) !NumStepsDone {- merge -} !Merge.Level
7171
deriving stock (Show, Eq, Read)
7272

7373
{-------------------------------------------------------------------------------
@@ -104,14 +104,11 @@ snapMergingRunState ::
104104
snapMergingRunState (CompletedMerge r) = pure (SnapCompletedMerge (runNumber r))
105105
snapMergingRunState (OngoingMerge rs nsdVar m) = do
106106
nsd <- readPrimVar nsdVar
107-
pure (SnapOngoingMerge (V.map runNumber rs) nsd (mergeNumber m) (Merge.mergeLevel m))
107+
pure (SnapOngoingMerge (V.map runNumber rs) nsd (Merge.mergeLevel m))
108108

109109
runNumber :: Run m h -> RunNumber
110110
runNumber r = Paths.runNumber (Run.runRunFsPaths r)
111111

112-
mergeNumber :: Merge m h -> RunNumber
113-
mergeNumber m = Paths.runNumber (RunBuilder.runBuilderFsPaths (Merge.mergeBuilder m))
114-
115112
{-------------------------------------------------------------------------------
116113
Opening from snapshot format
117114
-------------------------------------------------------------------------------}
@@ -122,18 +119,20 @@ openLevels ::
122119
-> HasFS m h
123120
-> HasBlockIO m h
124121
-> TableConfig
122+
-> UniqCounter m
125123
-> SessionRoot
126124
-> ResolveSerialisedValue
127125
-> SnapLevels
128126
-> m (Levels m h)
129-
openLevels reg hfs hbio conf@TableConfig{..} sessionRoot resolve levels =
127+
openLevels reg hfs hbio conf@TableConfig{..} uc sessionRoot resolve levels =
130128
V.iforM levels $ \i -> openLevel (LevelNo (i+1))
131129
where
132130
mkPath = Paths.RunFsPaths (Paths.activeDir sessionRoot)
133131

134132
openLevel :: LevelNo -> SnapLevel -> m (Level m h)
135133
openLevel ln SnapLevel{..} = do
136-
incomingRuns <- openMergingRun snapIncomingRuns
134+
(mmmay, incomingRuns) <- openMergingRun snapIncomingRuns
135+
forM_ mmmay $ \c -> supplyMergeCredits c incomingRuns -- TODO: this part is leaky!
137136
residentRuns <- V.forM snapResidentRuns $ \rn ->
138137
allocateTemp reg
139138
(Run.openFromDisk hfs hbio caching (mkPath rn))
@@ -143,36 +142,37 @@ openLevels reg hfs hbio conf@TableConfig{..} sessionRoot resolve levels =
143142
caching = diskCachePolicyForLevel confDiskCachePolicy ln
144143
alloc = bloomFilterAllocForLevel conf ln
145144

146-
openMergingRun :: SnapMergingRun -> m (MergingRun m h)
145+
openMergingRun :: SnapMergingRun -> m (Maybe Int, MergingRun m h)
147146
openMergingRun (SnapMergingRun mpfl nr smrs) = do
148-
mrs <- openMergingRunState smrs
149-
MergingRun mpfl nr <$> newMVar mrs
147+
(n, mrs) <- openMergingRunState smrs
148+
(n,) . MergingRun mpfl nr <$> newMVar mrs
150149
openMergingRun (SnapSingleRun rn) =
151-
SingleRun <$>
150+
(Nothing,) . SingleRun <$>
152151
allocateTemp reg
153152
(Run.openFromDisk hfs hbio caching (mkPath rn))
154153
Run.removeReference
155154

156-
openMergingRunState :: SnapMergingRunState -> m (MergingRunState m h)
155+
openMergingRunState :: SnapMergingRunState -> m (Maybe Int, MergingRunState m h)
157156
openMergingRunState (SnapCompletedMerge rn) =
158-
CompletedMerge <$>
157+
(Nothing,) . CompletedMerge <$>
159158
allocateTemp reg
160159
(Run.openFromDisk hfs hbio caching (mkPath rn))
161160
Run.removeReference
162-
openMergingRunState (SnapOngoingMerge rns nsd rnm mergeLast) = do
161+
openMergingRunState (SnapOngoingMerge rns nsd mergeLast) = do
163162
rs <- V.forM rns $ \rn ->
164163
allocateTemp reg
165164
(Run.openFromDisk hfs hbio caching ((mkPath rn)))
166165
Run.removeReference
167166
nsdVar <- newPrimVar nsd
167+
rn <- uniqueToRunNumber <$> incrUniqCounter uc
168168
mergeMaybe <- allocateTemp reg
169-
(Merge.new hfs hbio caching alloc mergeLast resolve (mkPath rnm) rs)
169+
(Merge.new hfs hbio caching alloc mergeLast resolve (mkPath rn) rs)
170170
(traverse_ Merge.removeReference)
171171
-- TODO: progress merge
172172
-- TODO: write test that shows a failure because we are not progressing the merge
173173
case mergeMaybe of
174174
Nothing -> error "openLevels: merges can not be empty"
175-
Just m -> pure (OngoingMerge rs nsdVar m)
175+
Just m -> pure (Just (unNumStepsDone nsd), OngoingMerge rs nsdVar m)
176176

177177
{-------------------------------------------------------------------------------
178178
Levels

test/Test/Database/LSMTree/Normal/StateMachine.hs

+1 -1
Original file line number | Diff line number | Diff line change
@@ -200,7 +200,7 @@ instance Arbitrary R.TableConfig where
200200
, R.confBloomFilterAlloc = R.AllocFixed 10
201201
, R.confFencePointerIndex = R.CompactIndex
202202
, R.confDiskCachePolicy = R.DiskCacheNone
203-
, R.confMergeSchedule = R.OneShot
203+
, R.confMergeSchedule = R.Incremental
204204
}
205205

206206
propLockstep_RealImpl_RealFS_IO ::

0 commit comments

Comments
 (0)