Skip to content

Commit a06bf8b

Browse files
committed
Fix adding/removing references on table content
In some places, we were adding/removing references but not taking into account rollbacks/delays that should be performed when exceptions occur. In particular, correct acquisition and release of write buffer blob files were not guarded by `allocateTemp` and `freeTemp`. In addition, to make our reference accounting slightly less error prone, we make a few related changes: * We add new `addReferenceTableContent` and `removeReferenceTableContent` functions, which replace most occurrences of manually increasing/decreasing reference counts for the components of a `TableContent`. For example, in `duplicate` and `close` we now use the new functions. * `newWith` now uses a `TempRegistry`, which fixes a double-free TODO. * `newWith` now requires a full `TableContent` to be passed in.
1 parent 6186bf6 commit a06bf8b

File tree

2 files changed

+108
-57
lines changed

2 files changed

+108
-57
lines changed

src/Database/LSMTree/Internal.hs

+51-46
Original file line numberDiff line numberDiff line change
@@ -104,9 +104,7 @@ import Database.LSMTree.Internal.Serialise (SerialisedBlob (..),
104104
SerialisedKey, SerialisedValue)
105105
import Database.LSMTree.Internal.UniqCounter
106106
import qualified Database.LSMTree.Internal.Vector as V
107-
import Database.LSMTree.Internal.WriteBuffer (WriteBuffer)
108107
import qualified Database.LSMTree.Internal.WriteBuffer as WB
109-
import Database.LSMTree.Internal.WriteBufferBlobs (WriteBufferBlobs)
110108
import qualified Database.LSMTree.Internal.WriteBufferBlobs as WBB
111109
import qualified System.FS.API as FS
112110
import System.FS.API (FsError, FsErrorPath (..), FsPath, Handle,
@@ -620,53 +618,58 @@ withTable sesh conf = bracket (new sesh conf) close
620618
-> IO (TableHandle IO h) #-}
621619
-- | See 'Database.LSMTree.Normal.new'.
622620
new ::
623-
(MonadSTM m, MonadThrow m, MonadMVar m, PrimMonad m)
621+
(MonadSTM m, MonadMVar m, PrimMonad m, MonadMask m)
624622
=> Session m h
625623
-> TableConfig
626624
-> m (TableHandle m h)
627625
new sesh conf = do
628626
traceWith (sessionTracer sesh) TraceNewTable
629-
withOpenSession sesh $ \seshEnv -> do
630-
am <- newArenaManager
631-
blobpath <- Paths.tableBlobPath (sessionRoot seshEnv) <$>
632-
incrUniqCounter (sessionUniqCounter seshEnv)
633-
wbblobs <- WBB.new (sessionHasFS seshEnv) blobpath
634-
newWith sesh seshEnv conf am WB.empty wbblobs V.empty
627+
withOpenSession sesh $ \seshEnv ->
628+
withTempRegistry $ \reg -> do
629+
am <- newArenaManager
630+
blobpath <- Paths.tableBlobPath (sessionRoot seshEnv) <$>
631+
incrUniqCounter (sessionUniqCounter seshEnv)
632+
tableWriteBufferBlobs
633+
<- allocateTemp reg
634+
(WBB.new (sessionHasFS seshEnv) blobpath)
635+
WBB.removeReference
636+
let tableWriteBuffer = WB.empty
637+
tableLevels = V.empty
638+
tableCache <- mkLevelsCache tableLevels
639+
let tc = TableContent {
640+
tableWriteBuffer
641+
, tableWriteBufferBlobs
642+
, tableLevels
643+
, tableCache
644+
}
645+
newWith reg sesh seshEnv conf am tc
635646

636647
{-# SPECIALISE newWith ::
637-
Session IO h
648+
TempRegistry IO
649+
-> Session IO h
638650
-> SessionEnv IO h
639651
-> TableConfig
640652
-> ArenaManager RealWorld
641-
-> WriteBuffer
642-
-> WriteBufferBlobs IO h
643-
-> Levels IO h
653+
-> TableContent IO h
644654
-> IO (TableHandle IO h) #-}
645655
newWith ::
646656
(MonadSTM m, MonadMVar m)
647-
=> Session m h
657+
=> TempRegistry m
658+
-> Session m h
648659
-> SessionEnv m h
649660
-> TableConfig
650661
-> ArenaManager (PrimState m)
651-
-> WriteBuffer
652-
-> WriteBufferBlobs m h
653-
-> Levels m h
662+
-> TableContent m h
654663
-> m (TableHandle m h)
655-
newWith sesh seshEnv conf !am !wb !wbblobs !levels = do
664+
newWith reg sesh seshEnv conf !am !tc = do
656665
tableId <- incrUniqCounter (sessionUniqCounter seshEnv)
657666
let tr = TraceTable (uniqueToWord64 tableId) `contramap` sessionTracer sesh
658667
traceWith tr $ TraceCreateTableHandle conf
659-
cache <- mkLevelsCache levels
660668
-- The session is kept open until we've updated the session's set of tracked
661669
-- tables. If 'closeSession' is called by another thread while this code
662670
-- block is being executed, that thread will block until it reads the
663671
-- /updated/ set of tracked tables.
664-
contentVar <- RW.new $ TableContent
665-
{ tableWriteBuffer = wb
666-
, tableWriteBufferBlobs = wbblobs
667-
, tableLevels = levels
668-
, tableCache = cache
669-
}
672+
contentVar <- RW.new $ tc
670673
tableVar <- RW.new $ TableHandleOpen $ TableHandleEnv {
671674
tableSession = sesh
672675
, tableSessionEnv = seshEnv
@@ -675,7 +678,8 @@ newWith sesh seshEnv conf !am !wb !wbblobs !levels = do
675678
}
676679
let !th = TableHandle conf tableVar am tr
677680
-- Track the current table
678-
modifyMVar_ (sessionOpenTables seshEnv) $ pure . Map.insert (uniqueToWord64 tableId) th
681+
freeTemp reg $ modifyMVar_ (sessionOpenTables seshEnv)
682+
$ pure . Map.insert (uniqueToWord64 tableId) th
679683
pure $! th
680684

681685
{-# SPECIALISE close :: TableHandle IO h -> IO () #-}
@@ -686,17 +690,17 @@ close ::
686690
-> m ()
687691
close th = do
688692
traceWith (tableTracer th) TraceCloseTable
689-
RW.withWriteAccess_ (tableHandleState th) $ \case
693+
modifyWithTempRegistry_
694+
(RW.unsafeAcquireWriteAccess (tableHandleState th))
695+
(atomically . RW.unsafeReleaseWriteAccess (tableHandleState th)) $ \reg -> \case
690696
TableHandleClosed -> pure TableHandleClosed
691697
TableHandleOpen thEnv -> do
692698
-- Since we have a write lock on the table state, we know that we are the
693699
-- only thread currently closing the table. We can safely make the session
694700
-- forget about this table.
695-
-- TODO: use TempRegistry
696-
tableSessionUntrackTable thEnv
701+
freeTemp reg (tableSessionUntrackTable thEnv)
697702
RW.withWriteAccess_ (tableContent thEnv) $ \tc -> do
698-
forRunM_ (tableLevels tc) Run.removeReference
699-
WBB.removeReference (tableWriteBufferBlobs tc)
703+
removeReferenceTableContent reg tc
700704
pure tc
701705
pure TableHandleClosed
702706

@@ -1289,12 +1293,22 @@ open sesh label override snap = do
12891293
let runPaths = V.map (bimap (second $ RunFsPaths (Paths.activeDir $ sessionRoot seshEnv))
12901294
(V.map (RunFsPaths (Paths.activeDir $ sessionRoot seshEnv))))
12911295
runNumbers
1292-
lvls <- openLevels reg hfs hbio (confDiskCachePolicy conf') runPaths
1296+
12931297
am <- newArenaManager
12941298
blobpath <- Paths.tableBlobPath (sessionRoot seshEnv) <$>
12951299
incrUniqCounter (sessionUniqCounter seshEnv)
1296-
wbblobs <- WBB.new hfs blobpath
1297-
newWith sesh seshEnv conf' am WB.empty wbblobs lvls
1300+
tableWriteBufferBlobs
1301+
<- allocateTemp reg
1302+
(WBB.new hfs blobpath)
1303+
WBB.removeReference
1304+
tableLevels <- openLevels reg hfs hbio (confDiskCachePolicy conf') runPaths
1305+
tableCache <- mkLevelsCache tableLevels
1306+
newWith reg sesh seshEnv conf' am $! TableContent {
1307+
tableWriteBuffer = WB.empty
1308+
, tableWriteBufferBlobs
1309+
, tableLevels
1310+
, tableCache
1311+
}
12981312

12991313
{-# SPECIALISE openLevels ::
13001314
TempRegistry IO
@@ -1390,21 +1404,12 @@ duplicate th = do
13901404
-- The table contents escape the read access, but we just added references
13911405
-- to each run so it is safe.
13921406
content <- RW.withReadAccess (tableContent thEnv) $ \content -> do
1393-
forRunM_ (tableLevels content) $ \r -> do
1394-
allocateTemp reg
1395-
(Run.addReference r)
1396-
(\_ -> Run.removeReference r)
1407+
addReferenceTableContent reg content
13971408
pure content
1398-
WBB.addReference (tableWriteBufferBlobs content)
1399-
-- TODO: Fix possible double-free! See 'newCursor'.
1400-
-- In `newWith`, the table handle (in the open state) gets added to
1401-
-- `sessionOpenTables', even if later an async exception occurs and
1402-
-- the temp registry rolls back all allocations.
14031409
newWith
1410+
reg
14041411
(tableSession thEnv)
14051412
(tableSessionEnv thEnv)
14061413
(tableConfig th)
14071414
(tableHandleArenaManager th)
1408-
(tableWriteBuffer content)
1409-
(tableWriteBufferBlobs content)
1410-
(tableLevels content)
1415+
content

src/Database/LSMTree/Internal/MergeSchedule.hs

+57-11
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ module Database.LSMTree.Internal.MergeSchedule (
77
, MergeTrace (..)
88
-- * Table content
99
, TableContent (..)
10+
, addReferenceTableContent
11+
, removeReferenceTableContent
1012
-- * Levels cache
1113
, LevelsCache (..)
1214
, mkLevelsCache
@@ -15,7 +17,6 @@ module Database.LSMTree.Internal.MergeSchedule (
1517
, Level (..)
1618
, MergingRun (..)
1719
, MergingRunState (..)
18-
, forRunM_
1920
-- * Flushes and scheduled merges
2021
, updatesWithInterleavedFlushes
2122
, flushWriteBuffer
@@ -114,6 +115,28 @@ data TableContent m h = TableContent {
114115
, tableCache :: !(LevelsCache m (Handle h))
115116
}
116117

118+
{-# SPECIALISE addReferenceTableContent :: TempRegistry IO -> TableContent IO h -> IO () #-}
119+
addReferenceTableContent ::
120+
(PrimMonad m, MonadMask m, MonadMVar m)
121+
=> TempRegistry m
122+
-> TableContent m h
123+
-> m ()
124+
addReferenceTableContent reg (TableContent _wb wbb levels _cache) = do
125+
allocateTemp reg (WBB.addReference wbb) (\_ -> WBB.removeReference wbb)
126+
addReferenceLevels reg levels
127+
-- references on the cache are implicit
128+
129+
{-# SPECIALISE removeReferenceTableContent :: TempRegistry IO -> TableContent IO h -> IO () #-}
130+
removeReferenceTableContent ::
131+
(PrimMonad m, MonadMask m, MonadMVar m)
132+
=> TempRegistry m
133+
-> TableContent m h
134+
-> m ()
135+
removeReferenceTableContent reg (TableContent _wb wbb levels _cache) = do
136+
freeTemp reg (WBB.removeReference wbb)
137+
removeReferenceLevels reg levels
138+
-- references on the cache are implicit
139+
117140
{-------------------------------------------------------------------------------
118141
Levels cache
119142
-------------------------------------------------------------------------------}
@@ -170,23 +193,46 @@ data MergingRunState m h =
170193
CompletedMerge !(Run m (Handle h))
171194
| OngoingMerge !(V.Vector (Run m (Handle h))) !(Merge m h)
172195

173-
{-# SPECIALISE forRunM_ ::
196+
{-# SPECIALISE addReferenceLevels :: TempRegistry IO -> Levels IO h -> IO () #-}
197+
addReferenceLevels ::
198+
(PrimMonad m, MonadMVar m, MonadMask m)
199+
=> TempRegistry m
200+
-> Levels m h
201+
-> m ()
202+
addReferenceLevels reg levels =
203+
forRunAndMergeM_ levels
204+
(\r -> allocateTemp reg (Run.addReference r) (\_ -> Run.removeReference r))
205+
(\m -> allocateTemp reg (Merge.addReference m) (\_ -> Merge.removeReference m))
206+
207+
{-# SPECIALISE addReferenceLevels :: TempRegistry IO -> Levels IO h -> IO () #-}
208+
removeReferenceLevels ::
209+
(PrimMonad m, MonadMVar m, MonadMask m)
210+
=> TempRegistry m
211+
-> Levels m h
212+
-> m ()
213+
removeReferenceLevels reg levels =
214+
forRunAndMergeM_ levels
215+
(\r -> freeTemp reg (Run.removeReference r))
216+
(\m -> freeTemp reg (Merge.removeReference m))
217+
218+
{-# SPECIALISE forRunAndMergeM_ ::
174219
Levels IO h
175220
-> (Run IO (Handle h) -> IO ())
221+
-> (Merge IO h -> IO ())
176222
-> IO () #-}
177-
forRunM_ ::
223+
forRunAndMergeM_ ::
178224
MonadMVar m
179225
=> Levels m h
180226
-> (Run m (Handle h) -> m ())
227+
-> (Merge m h -> m ())
181228
-> m ()
182-
forRunM_ lvls k = V.forM_ lvls $ \(Level mr rs) -> do
229+
forRunAndMergeM_ lvls k1 k2 = V.forM_ lvls $ \(Level mr rs) -> do
183230
case mr of
184-
SingleRun r -> k r
231+
SingleRun r -> k1 r
185232
MergingRun var -> withMVar var $ \case
186-
CompletedMerge r -> k r
187-
OngoingMerge irs _m -> V.mapM_ k irs
188-
V.mapM_ k rs
189-
233+
CompletedMerge r -> k1 r
234+
OngoingMerge irs m -> V.mapM_ k1 irs >> k2 m
235+
V.mapM_ k1 rs
190236

191237
{-# SPECIALISE foldRunM ::
192238
(b -> Run IO (Handle h) -> IO b)
@@ -385,8 +431,8 @@ flushWriteBuffer tr conf@TableConfig{confDiskCachePolicy}
385431
(tableWriteBuffer tc)
386432
(tableWriteBufferBlobs tc))
387433
Run.removeReference
388-
WBB.removeReference (tableWriteBufferBlobs tc)
389-
wbblobs' <- WBB.new hfs (Paths.tableBlobPath root n)
434+
freeTemp reg (WBB.removeReference (tableWriteBufferBlobs tc))
435+
wbblobs' <- allocateTemp reg (WBB.new hfs (Paths.tableBlobPath root n)) WBB.removeReference
390436
levels' <- addRunToLevels tr conf resolve hfs hbio root uc r reg (tableLevels tc)
391437
cache' <- mkLevelsCache levels'
392438
pure $! TableContent {

0 commit comments

Comments
 (0)