Skip to content

Commit d594885

Browse files
committed
WIP: rework snapshots for ongoing merges
1 parent 8ddecb6 commit d594885

File tree

11 files changed

+261
-116
lines changed

11 files changed

+261
-116
lines changed

lsm-tree.cabal

+1
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ library
152152
Database.LSMTree.Internal.RunReaders
153153
Database.LSMTree.Internal.Serialise
154154
Database.LSMTree.Internal.Serialise.Class
155+
Database.LSMTree.Internal.Snapshot
155156
Database.LSMTree.Internal.UniqCounter
156157
Database.LSMTree.Internal.Unsliced
157158
Database.LSMTree.Internal.Vector

src-extras/Database/LSMTree/Extras/NoThunks.hs

+3
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,9 @@ deriving anyclass instance (Typeable m, Typeable (PrimState m), Typeable h)
280280
deriving stock instance Generic MergePolicyForLevel
281281
deriving anyclass instance NoThunks MergePolicyForLevel
282282

283+
deriving stock instance Generic NumRuns
284+
deriving anyclass instance NoThunks NumRuns
285+
283286
{-------------------------------------------------------------------------------
284287
Entry
285288
-------------------------------------------------------------------------------}

src/Database/LSMTree/Internal.hs

+23-65
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,6 @@ import Control.Monad.Primitive
6868
import Control.TempRegistry
6969
import Control.Tracer
7070
import Data.Arena (ArenaManager, newArenaManager)
71-
import Data.Bifunctor (Bifunctor (..))
7271
import qualified Data.ByteString.Char8 as BSC
7372
import Data.Char (isNumber)
7473
import Data.Foldable
@@ -90,19 +89,19 @@ import qualified Database.LSMTree.Internal.Entry as Entry
9089
import Database.LSMTree.Internal.Lookup (ByteCountDiscrepancy,
9190
ResolveSerialisedValue, lookupsIO)
9291
import Database.LSMTree.Internal.MergeSchedule
93-
import Database.LSMTree.Internal.Paths (RunFsPaths (..),
94-
SessionRoot (..), SnapshotName)
92+
import Database.LSMTree.Internal.Paths (SessionRoot (..),
93+
SnapshotName)
9594
import qualified Database.LSMTree.Internal.Paths as Paths
9695
import Database.LSMTree.Internal.Range (Range (..))
9796
import qualified Database.LSMTree.Internal.RawBytes as RB
9897
import Database.LSMTree.Internal.Run (Run)
9998
import qualified Database.LSMTree.Internal.Run as Run
100-
import Database.LSMTree.Internal.RunNumber
10199
import qualified Database.LSMTree.Internal.RunReader as Reader
102100
import Database.LSMTree.Internal.RunReaders (OffsetKey (..))
103101
import qualified Database.LSMTree.Internal.RunReaders as Readers
104102
import Database.LSMTree.Internal.Serialise (SerialisedBlob (..),
105103
SerialisedKey, SerialisedValue)
104+
import Database.LSMTree.Internal.Snapshot
106105
import Database.LSMTree.Internal.UniqCounter
107106
import qualified Database.LSMTree.Internal.Vector as V
108107
import qualified Database.LSMTree.Internal.WriteBuffer as WB
@@ -1214,14 +1213,18 @@ snapshot resolve snap label th = do
12141213
traceWith (tableTracer th) $ TraceSnapshot snap
12151214
let conf = tableConfig th
12161215
withOpenTable th $ \thEnv -> do
1216+
let hfs = tableHasFS thEnv
1217+
let snapPath = Paths.snapshot (tableSessionRoot thEnv) snap
1218+
FS.doesFileExist (tableHasFS thEnv) snapPath >>= \b ->
1219+
when b $ throwIO (ErrSnapshotExists snap)
1220+
12171221
-- For the temporary implementation it is okay to just flush the buffer
12181222
-- before taking the snapshot.
1219-
let hfs = tableHasFS thEnv
12201223
content <- modifyWithTempRegistry
12211224
(RW.unsafeAcquireWriteAccess (tableContent thEnv))
12221225
(atomically . RW.unsafeReleaseWriteAccess (tableContent thEnv))
12231226
$ \reg content -> do
1224-
r <- flushWriteBuffer
1227+
content' <- flushWriteBuffer
12251228
(TraceMerge `contramap` tableTracer th)
12261229
conf
12271230
resolve
@@ -1231,35 +1234,29 @@ snapshot resolve snap label th = do
12311234
(tableSessionUniqCounter thEnv)
12321235
reg
12331236
content
1234-
pure (r, r)
1237+
pure (content', content')
12351238
-- At this point, we've flushed the write buffer but we haven't created the
12361239
-- snapshot file yet. If an asynchronous exception happens beyond this
12371240
-- point, we'll take that loss, as the inner state of the table is still
12381241
-- consistent.
1239-
runNumbers <- V.forM (tableLevels content) $ \(Level mr rs) -> do
1240-
(,V.map (runNumber . Run.runRunFsPaths) rs) <$>
1241-
case mr of
1242-
SingleRun r -> pure (True, runNumber (Run.runRunFsPaths r))
1243-
MergingRun _ _ var -> do
1244-
withMVar var $ \case
1245-
CompletedMerge r -> pure (False, runNumber (Run.runRunFsPaths r))
1246-
OngoingMerge{} -> error "snapshot: OngoingMerge not yet supported" -- TODO: implement
1247-
let snapPath = Paths.snapshot (tableSessionRoot thEnv) snap
1248-
FS.doesFileExist (tableHasFS thEnv) snapPath >>= \b ->
1249-
when b $ throwIO (ErrSnapshotExists snap)
1242+
1243+
snappedLevels <- snapLevels (tableLevels content)
1244+
let snapContents = BSC.pack $ show (label, snappedLevels, tableConfig th)
1245+
12501246
FS.withFile
12511247
(tableHasFS thEnv)
12521248
snapPath
12531249
(FS.WriteMode FS.MustBeNew) $ \h ->
1254-
void $ FS.hPutAllStrict (tableHasFS thEnv) h
1255-
(BSC.pack $ show (label, runNumbers, tableConfig th))
1256-
pure $! V.sum (V.map (\((_ :: (Bool, RunNumber)), rs) -> 1 + V.length rs) runNumbers)
1250+
void $ FS.hPutAllStrict (tableHasFS thEnv) h snapContents
1251+
1252+
pure $! numSnapRuns snappedLevels
12571253

12581254
{-# SPECIALISE open ::
12591255
Session IO h
12601256
-> SnapshotLabel
12611257
-> TableConfigOverride
12621258
-> SnapshotName
1259+
-> ResolveSerialisedValue
12631260
-> IO (TableHandle IO h) #-}
12641261
-- | See 'Database.LSMTree.Normal.open'.
12651262
open ::
@@ -1268,8 +1265,9 @@ open ::
12681265
-> SnapshotLabel -- ^ Expected label
12691266
-> TableConfigOverride -- ^ Optional config override
12701267
-> SnapshotName
1268+
-> ResolveSerialisedValue
12711269
-> m (TableHandle m h)
1272-
open sesh label override snap = do
1270+
open sesh label override snap resolve = do
12731271
traceWith (sessionTracer sesh) $ TraceOpenSnapshot snap override
12741272
withOpenSession sesh $ \seshEnv -> do
12751273
withTempRegistry $ \reg -> do
@@ -1283,26 +1281,17 @@ open sesh label override snap = do
12831281
snapPath
12841282
FS.ReadMode $ \h ->
12851283
FS.hGetAll (sessionHasFS seshEnv) h
1286-
let (label', runNumbers, conf) =
1287-
-- why we are using read for this?
1288-
-- apparently this is a temporary solution, to be done properly in WP15
1289-
read @(SnapshotLabel, V.Vector ((Bool, RunNumber), V.Vector RunNumber), TableConfig) $
1290-
BSC.unpack $ BSC.toStrict $ bs
1291-
1292-
let conf' = applyOverride override conf
1284+
let (label', snappedLevels, conf) = read $ BSC.unpack $ BSC.toStrict $ bs
12931285
unless (label == label') $ throwIO (ErrSnapshotWrongType snap)
1294-
let runPaths = V.map (bimap (second $ RunFsPaths (Paths.activeDir $ sessionRoot seshEnv))
1295-
(V.map (RunFsPaths (Paths.activeDir $ sessionRoot seshEnv))))
1296-
runNumbers
1297-
1286+
let conf' = applyOverride override conf
12981287
am <- newArenaManager
12991288
blobpath <- Paths.tableBlobPath (sessionRoot seshEnv) <$>
13001289
incrUniqCounter (sessionUniqCounter seshEnv)
13011290
tableWriteBufferBlobs
13021291
<- allocateTemp reg
13031292
(WBB.new hfs blobpath)
13041293
WBB.removeReference
1305-
tableLevels <- openLevels reg hfs hbio (confDiskCachePolicy conf') runPaths
1294+
tableLevels <- openLevels reg hfs hbio conf (sessionRoot seshEnv) resolve snappedLevels
13061295
tableCache <- mkLevelsCache reg tableLevels
13071296
newWith reg sesh seshEnv conf' am $! TableContent {
13081297
tableWriteBuffer = WB.empty
@@ -1311,37 +1300,6 @@ open sesh label override snap = do
13111300
, tableCache
13121301
}
13131302

1314-
{-# SPECIALISE openLevels ::
1315-
TempRegistry IO
1316-
-> HasFS IO h
1317-
-> HasBlockIO IO h
1318-
-> DiskCachePolicy
1319-
-> V.Vector ((Bool, RunFsPaths), V.Vector RunFsPaths)
1320-
-> IO (Levels IO h) #-}
1321-
-- | Open multiple levels.
1322-
openLevels ::
1323-
(MonadFix m, MonadMask m, MonadMVar m, MonadSTM m, PrimMonad m)
1324-
=> TempRegistry m
1325-
-> HasFS m h
1326-
-> HasBlockIO m h
1327-
-> DiskCachePolicy
1328-
-> V.Vector ((Bool, RunFsPaths), V.Vector RunFsPaths)
1329-
-> m (Levels m h)
1330-
openLevels reg hfs hbio diskCachePolicy levels =
1331-
flip V.imapMStrict levels $ \i (mrPath, rsPaths) -> do
1332-
let ln = LevelNo (i+1) -- level 0 is the write buffer
1333-
caching = diskCachePolicyForLevel diskCachePolicy ln
1334-
!r <- allocateTemp reg
1335-
(Run.openFromDisk hfs hbio caching (snd mrPath))
1336-
Run.removeReference
1337-
!rs <- flip V.mapMStrict rsPaths $ \run ->
1338-
allocateTemp reg
1339-
(Run.openFromDisk hfs hbio caching run)
1340-
Run.removeReference
1341-
let !mr = if fst mrPath then SingleRun r
1342-
else error "openLevels: OngoingMerge not yet supported"
1343-
pure $! Level mr rs
1344-
13451303
{-# SPECIALISE deleteSnapshot ::
13461304
Session IO h
13471305
-> SnapshotName

src/Database/LSMTree/Internal/Config.hs

-28
Original file line numberDiff line numberDiff line change
@@ -80,10 +80,6 @@ instance NFData TableConfig where
8080
rnf (TableConfig a b c d e f g) =
8181
rnf a `seq` rnf b `seq` rnf c `seq` rnf d `seq` rnf e `seq` rnf f `seq` rnf g
8282

83-
-- | TODO: this should be removed once we have proper snapshotting with proper
84-
-- persistence of the config to disk.
85-
deriving stock instance Read TableConfig
86-
8783
-- | A reasonable default 'TableConfig'.
8884
--
8985
-- This uses a write buffer with up to 20,000 elements and a generous amount of
@@ -169,10 +165,6 @@ data MergePolicy =
169165
instance NFData MergePolicy where
170166
rnf MergePolicyLazyLevelling = ()
171167

172-
-- | TODO: this should be removed once we have proper snapshotting with proper
173-
-- persistence of the config to disk.
174-
deriving stock instance Read MergePolicy
175-
176168
{-------------------------------------------------------------------------------
177169
Size ratio
178170
-------------------------------------------------------------------------------}
@@ -183,10 +175,6 @@ data SizeRatio = Four
183175
instance NFData SizeRatio where
184176
rnf Four = ()
185177

186-
-- | TODO: this should be removed once we have proper snapshotting with proper
187-
-- persistence of the config to disk.
188-
deriving stock instance Read SizeRatio
189-
190178
sizeRatioInt :: SizeRatio -> Int
191179
sizeRatioInt = \case Four -> 4
192180

@@ -214,14 +202,6 @@ data WriteBufferAlloc =
214202
instance NFData WriteBufferAlloc where
215203
rnf (AllocNumEntries n) = rnf n
216204

217-
-- | TODO: this should be removed once we have proper snapshotting with proper
218-
-- persistence of the config to disk.
219-
deriving stock instance Read WriteBufferAlloc
220-
221-
-- | TODO: this should be removed once we have proper snapshotting with proper
222-
-- persistence of the config to disk.
223-
deriving stock instance Read NumEntries
224-
225205
{-------------------------------------------------------------------------------
226206
Bloom filter allocation
227207
-------------------------------------------------------------------------------}
@@ -263,10 +243,6 @@ instance NFData BloomFilterAlloc where
263243
rnf (AllocRequestFPR fpr) = rnf fpr
264244
rnf (AllocMonkey a b) = rnf a `seq` rnf b
265245

266-
-- | TODO: this should be removed once we have proper snapshotting with proper
267-
-- persistence of the config to disk.
268-
deriving stock instance Read BloomFilterAlloc
269-
270246
defaultBloomFilterAlloc :: BloomFilterAlloc
271247
defaultBloomFilterAlloc = AllocFixed 10
272248

@@ -334,10 +310,6 @@ instance NFData FencePointerIndex where
334310
rnf CompactIndex = ()
335311
rnf OrdinaryIndex = ()
336312

337-
-- | TODO: this should be removed once we have proper snapshotting with proper
338-
-- persistence of the config to disk.
339-
deriving stock instance Read FencePointerIndex
340-
341313
{-------------------------------------------------------------------------------
342314
Disk cache policy
343315
-------------------------------------------------------------------------------}

src/Database/LSMTree/Internal/MergeSchedule.hs

+30-15
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,17 @@ module Database.LSMTree.Internal.MergeSchedule (
1616
, Levels
1717
, Level (..)
1818
, MergingRun (..)
19+
, NumRuns (..)
1920
, MergingRunState (..)
21+
, NumStepsDone (..)
2022
-- * Flushes and scheduled merges
2123
, updatesWithInterleavedFlushes
2224
, flushWriteBuffer
2325
-- * Exported for cabal-docspec
2426
, MergePolicyForLevel (..)
2527
, maxRunSize
28+
-- * Credits
29+
, supplyCredits
2630
) where
2731

2832
import Control.Concurrent.Class.MonadMVar.Strict
@@ -38,6 +42,8 @@ import Control.TempRegistry
3842
import Control.Tracer
3943
import Data.BloomFilter (Bloom)
4044
import Data.Foldable (traverse_)
45+
import Data.Primitive (Prim)
46+
import Data.Primitive.PrimVar
4147
import qualified Data.Vector as V
4248
import Database.LSMTree.Internal.Assertions (assert,
4349
fromIntegralChecked)
@@ -260,12 +266,19 @@ data Level m h = Level {
260266

261267
-- | A merging run is either a single run, or some ongoing merge.
262268
data MergingRun m h =
263-
MergingRun !MergePolicyForLevel !Int !(StrictMVar m (MergingRunState m h))
269+
MergingRun !MergePolicyForLevel !NumRuns !(StrictMVar m (MergingRunState m h))
264270
| SingleRun !(Run m (Handle h))
265271

272+
newtype NumRuns = NumRuns { unNumRuns :: Int }
273+
deriving stock (Show, Eq)
274+
266275
data MergingRunState m h =
267276
CompletedMerge !(Run m (Handle h))
268-
| OngoingMerge !(V.Vector (Run m (Handle h))) !(Merge m h)
277+
| OngoingMerge !(V.Vector (Run m (Handle h))) !(PrimVar (PrimState m) NumStepsDone) !(Merge m h)
278+
279+
newtype NumStepsDone = NumStepsDone { unNumStepsDone :: Int }
280+
deriving stock (Show, Eq)
281+
deriving newtype Prim
269282

270283
{-# SPECIALISE addReferenceLevels :: TempRegistry IO -> Levels IO h -> IO () #-}
271284
addReferenceLevels ::
@@ -305,7 +318,7 @@ forRunAndMergeM_ lvls k1 k2 = V.forM_ lvls $ \(Level mr rs) -> do
305318
SingleRun r -> k1 r
306319
MergingRun _ _ var -> withMVar var $ \case
307320
CompletedMerge r -> k1 r
308-
OngoingMerge irs m -> V.mapM_ k1 irs >> k2 m
321+
OngoingMerge irs _ m -> V.mapM_ k1 irs >> k2 m
309322
V.mapM_ k1 rs
310323

311324
{-# SPECIALISE foldRunM ::
@@ -324,7 +337,7 @@ foldRunM f x lvls = flip (flip V.foldM x) lvls $ \y (Level mr rs) -> do
324337
SingleRun r -> f y r
325338
MergingRun _ _ var -> withMVar var $ \case
326339
CompletedMerge r -> f y r
327-
OngoingMerge irs _m -> V.foldM f y irs
340+
OngoingMerge irs _ _m -> V.foldM f y irs
328341
V.foldM f z rs
329342

330343
{-# SPECIALISE forRunM ::
@@ -695,7 +708,7 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels =
695708
CompletedMerge r -> do
696709
traceWith tr $ AtLevel ln $ TraceExpectCompletedMerge (runNumber $ Run.runRunFsPaths r)
697710
pure r
698-
OngoingMerge _rs _m -> error "expectCompletedMerge: OngoingMerge not yet supported" -- TODO: implement.
711+
OngoingMerge _rs _ _m -> error "expectCompletedMerge: OngoingMerge not yet supported" -- TODO: implement.
699712

700713
newMerge :: MergePolicyForLevel
701714
-> Merge.Level
@@ -722,19 +735,20 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels =
722735
traceWith tr $ AtLevel ln $ TraceCompletedMerge (Run.runNumEntries r) (runNumber $ Run.runRunFsPaths r)
723736
V.mapM_ (freeTemp reg . Run.removeReference) rs
724737
var <- newMVar $! CompletedMerge r
725-
pure $! MergingRun mergepolicy (V.length rs) var
738+
pure $! MergingRun mergepolicy (NumRuns $ V.length rs) var
726739
Incremental -> do
727740
mergeMaybe <- allocateTemp reg
728741
(Merge.new hfs hbio caching alloc mergelast resolve runPaths rs)
729742
(traverse_ Merge.removeReference)
730743
case mergeMaybe of
731744
Nothing -> error "newMerge: merges can not be empty"
732745
Just m -> do
733-
var <- newMVar $! OngoingMerge rs m
734-
pure $! MergingRun mergepolicy (V.length rs) var
746+
pvar <- newPrimVar $! NumStepsDone 0
747+
var <- newMVar $! OngoingMerge rs pvar m
748+
pure $! MergingRun mergepolicy (NumRuns $ V.length rs) var
735749

736750
data MergePolicyForLevel = LevelTiering | LevelLevelling
737-
deriving stock Show
751+
deriving stock (Show, Eq)
738752

739753
mergePolicyForLevel :: MergePolicy -> LevelNo -> Levels m h -> MergePolicyForLevel
740754
mergePolicyForLevel MergePolicyLazyLevelling (LevelNo n) nextLevels
@@ -835,9 +849,9 @@ supplyCredits c levels =
835849
supplyMergeCredits (ceiling (fromIntegral c * cr)) mr
836850

837851
creditsForMerge :: MergingRun m h -> Rational
838-
creditsForMerge SingleRun{} = 0
839-
creditsForMerge (MergingRun LevelLevelling _ _) = 1 + 4
840-
creditsForMerge (MergingRun LevelTiering numRuns _) = fromIntegral numRuns / 4
852+
creditsForMerge SingleRun{} = 0
853+
creditsForMerge (MergingRun LevelLevelling _ _) = 1 + 4
854+
creditsForMerge (MergingRun LevelTiering (NumRuns n) _) = fromIntegral n / 4
841855

842856
{-# SPECIALISE supplyMergeCredits ::
843857
Credit
@@ -856,13 +870,14 @@ supplyMergeCredits _ SingleRun{} = pure ()
856870
supplyMergeCredits c (MergingRun _ _ var) = do
857871
b <- withMVar var $ \case
858872
CompletedMerge{} -> pure False
859-
(OngoingMerge _rs m) -> do
860-
(_n, stepResult) <- Merge.steps m c
873+
(OngoingMerge _rs pvar m) -> do
874+
(n, stepResult) <- Merge.steps m c
875+
writePrimVar pvar $! NumStepsDone n
861876
pure $ stepResult == MergeComplete
862877
when b $
863878
modifyMVarMasked_ var $ \case
864879
mr@CompletedMerge{} -> pure $! mr
865-
(OngoingMerge rs m) -> do
880+
(OngoingMerge rs _var m) -> do
866881
RefCount n <- Merge.readRefCount m
867882
let !n' = fromIntegralChecked n
868883
V.forM_ rs $ \r -> Run.removeReferenceN r n'

0 commit comments

Comments
 (0)