Skip to content

Commit 6472fba

Browse files
committed
Have LevelsCaches take references on runs.
With the imminent implementation of scheduled merges, we have to take special measures against runs being closed under our feet during operations like `lookups`. `lookups` use a `LevelsCache` for quick access to the runs in a table, and it is currently an invariant that the set of runs in this `LevelsCache` is the same as the set of runs that is stored in the `Levels` structure of a table. With scheduled merges, if an ongoing merge completes, then the completion process will release a number of references to the merge's input runs, so that unused runs are closed in a timely manner. However, if this happens concurrently with a `lookups` operation, then runs could be closed while `lookups` is still using them, leading to exceptions. The currently proposed solution is to have the `LevelsCache` take explicit references for the runs contained in the cache, so that runs don't disappear until the `LevelsCache` itself is invalidated. This does mean that the `LevelsCache` can keep runs open for longer than necessary, i.e., even after the `Levels` structure has already forgotten about these runs. This problem is not yet solved in this commit, and will require some thinking and experimentation. For this, a TODO is added to the code.
1 parent abf14bd commit 6472fba

File tree

2 files changed

+83
-19
lines changed

2 files changed

+83
-19
lines changed

src/Database/LSMTree/Internal.hs

+2-2
Original file line number | Diff line number | Diff line change
@@ -636,7 +636,7 @@ new sesh conf = do
636636
WBB.removeReference
637637
let tableWriteBuffer = WB.empty
638638
tableLevels = V.empty
639-
tableCache <- mkLevelsCache tableLevels
639+
tableCache <- mkLevelsCache reg tableLevels
640640
let tc = TableContent {
641641
tableWriteBuffer
642642
, tableWriteBufferBlobs
@@ -1303,7 +1303,7 @@ open sesh label override snap = do
13031303
(WBB.new hfs blobpath)
13041304
WBB.removeReference
13051305
tableLevels <- openLevels reg hfs hbio (confDiskCachePolicy conf') runPaths
1306-
tableCache <- mkLevelsCache tableLevels
1306+
tableCache <- mkLevelsCache reg tableLevels
13071307
newWith reg sesh seshEnv conf' am $! TableContent {
13081308
tableWriteBuffer = WB.empty
13091309
, tableWriteBufferBlobs

src/Database/LSMTree/Internal/MergeSchedule.hs

+81-17
Original file line number | Diff line number | Diff line change
@@ -109,9 +109,6 @@ data TableContent m h = TableContent {
109109
-- | A hierarchy of levels. The vector indexes double as level numbers.
110110
, tableLevels :: !(Levels m h)
111111
-- | Cache of flattened 'levels'.
112-
--
113-
-- INVARIANT: when 'level's is modified, this cache should be updated as
114-
-- well, for example using 'mkLevelsCache'.
115112
, tableCache :: !(LevelsCache m (Handle h))
116113
}
117114

@@ -121,21 +118,21 @@ addReferenceTableContent ::
121118
=> TempRegistry m
122119
-> TableContent m h
123120
-> m ()
124-
addReferenceTableContent reg (TableContent _wb wbb levels _cache) = do
121+
addReferenceTableContent reg (TableContent _wb wbb levels cache) = do
125122
allocateTemp reg (WBB.addReference wbb) (\_ -> WBB.removeReference wbb)
126123
addReferenceLevels reg levels
127-
-- references on the cache are implicit
124+
addReferenceLevelsCache reg cache
128125

129126
{-# SPECIALISE removeReferenceTableContent :: TempRegistry IO -> TableContent IO h -> IO () #-}
130127
-- | Release the references held through a 'TableContent': the write buffer
-- blobs (via 'WBB.removeReference'), the levels (via 'removeReferenceLevels')
-- and the runs of the levels cache (via 'removeReferenceLevelsCache'). Every
-- release is routed through the given 'TempRegistry'. The write buffer itself
-- (@_wb@) is not touched.
removeReferenceTableContent ::
131128
(PrimMonad m, MonadMask m, MonadMVar m)
132129
=> TempRegistry m
133130
-> TableContent m h
134131
-> m ()
135-
removeReferenceTableContent reg (TableContent _wb wbb levels _cache) = do
132+
removeReferenceTableContent reg (TableContent _wb wbb levels cache) = do
136133
freeTemp reg (WBB.removeReference wbb)
137134
removeReferenceLevels reg levels
138-
-- references on the cache are implicit
135+
-- The cache holds its own references on its runs, so release those as well.
removeReferenceLevelsCache reg cache
139136

140137
{-------------------------------------------------------------------------------
141138
Levels cache
@@ -148,30 +145,97 @@ removeReferenceTableContent reg (TableContent _wb wbb levels _cache) = do
148145
-- handles. This allows for quick access in the lookup code. Recomputing this
149146
-- cache should be relatively rare.
150147
--
151-
-- Use 'mkLevelsCache' to ensure that there are no mismatches between the vector
152-
-- of runs and the vectors of run components.
148+
-- Caches take a reference on each of their runs on construction, and they release
149+
-- references when the cache is invalidated. This is done so that incremental
150+
-- merges can remove references for their input runs when a merge completes,
151+
-- without closing runs that might be in use for other operations such as
152+
-- lookups. This does mean that a cache can keep runs open for longer than
153+
-- necessary, so caches should be rebuilt using, e.g., 'rebuildCache', in a
154+
-- timely manner.
153155
data LevelsCache m h = LevelsCache_ {
154156
-- | The runs in the cache. 'mkLevelsCache' fills this with the runs it
-- collects from the levels.
cachedRuns :: !(V.Vector (Run m h))
155157
-- | 'Run.runFilter' mapped over 'cachedRuns' (as built by 'mkLevelsCache').
, cachedFilters :: !(V.Vector (Bloom SerialisedKey))
156158
-- | 'Run.runIndex' mapped over 'cachedRuns' (as built by 'mkLevelsCache').
, cachedIndexes :: !(V.Vector IndexCompact)
157159
-- | 'Run.runKOpsFile' mapped over 'cachedRuns' (as built by 'mkLevelsCache').
, cachedKOpsFiles :: !(V.Vector h)
158160
}
159161

160-
{-# SPECIALISE mkLevelsCache :: Levels IO h -> IO (LevelsCache IO (Handle h)) #-}
162+
{-# SPECIALISE mkLevelsCache ::
163+
TempRegistry IO
164+
-> Levels IO h
165+
-> IO (LevelsCache IO (Handle h)) #-}
161166
-- | Flatten the argument 'Level's into a single vector of runs, and use that to
162-
-- populate the 'LevelsCache'.
167+
-- populate the 'LevelsCache'. The cache will take a reference for each of the
168+
-- runs that end up in the cache.
163169
mkLevelsCache ::
164-
MonadMVar m
165-
=> Levels m h -> m (LevelsCache m (Handle h))
166-
mkLevelsCache lvls = do
167-
rs <- forRunM lvls pure
170+
(PrimMonad m, MonadMVar m, MonadMask m)
171+
=> TempRegistry m
172+
-> Levels m h
173+
-> m (LevelsCache m (Handle h))
174+
mkLevelsCache reg lvls = do
175+
-- For every run: register 'Run.addReference' with the registry, paired with
-- 'Run.removeReference' as its release action, and keep the run itself.
rs <- forRunM lvls $ \r -> allocateTemp reg (Run.addReference r) (\_ -> Run.removeReference r) >> pure r
168176
-- All component vectors below are derived from the same @rs@.
pure $! LevelsCache_ {
169177
cachedRuns = rs
170178
, cachedFilters = mapStrict Run.runFilter rs
171179
, cachedIndexes = mapStrict Run.runIndex rs
172180
, cachedKOpsFiles = mapStrict Run.runKOpsFile rs
173181
}
174182

183+
{-# SPECIALISE rebuildCache ::
184+
TempRegistry IO
185+
-> LevelsCache IO (Handle h)
186+
-> Levels IO h
187+
-> IO (LevelsCache IO (Handle h)) #-}
188+
-- | Remove references to runs in the old cache, and create a new cache with
189+
-- fresh references taken for the runs in the new levels.
190+
--
191+
-- TODO: caches are currently only rebuilt in 'flushWriteBuffer'. If an
192+
-- OngoingMerge is completed, then tables will only rebuild the cache, and
193+
-- therefore release "old" runs, when a flush is initiated. This is sub-optimal,
194+
-- and there are at least two solutions, but it is unclear which is faster or
195+
-- more convenient.
196+
--
197+
-- * Get rid of the cache entirely, and have each batch of lookups take
198+
-- references for runs in the levels structure.
199+
--
200+
-- * Keep the cache feature, but force a rebuild every once in a while, e.g.,
201+
-- once in every 100 lookups.
202+
rebuildCache ::
203+
(PrimMonad m, MonadMVar m, MonadMask m)
204+
=> TempRegistry m
205+
-> LevelsCache m (Handle h) -- ^ old cache
206+
-> Levels m h -- ^ new levels
207+
-> m (LevelsCache m (Handle h)) -- ^ new cache
208+
rebuildCache reg oldCache newLevels = do
209+
-- Step 1: release the references the old cache holds on its runs.
-- NOTE(review): this happens before the new references are taken; presumably
-- safe because the levels hold their own references (see 'addReferenceLevels')
-- and/or because of how 'freeTemp' schedules the release -- confirm.
removeReferenceLevelsCache reg oldCache
210+
-- Step 2: build the new cache, taking a fresh reference per run.
mkLevelsCache reg newLevels
211+
212+
{-# SPECIALISE addReferenceLevelsCache ::
213+
TempRegistry IO
214+
-> LevelsCache IO (Handle h)
215+
-> IO () #-}
216+
-- | Take one additional reference on every run in 'cachedRuns' of the given
-- cache. Each 'Run.addReference' is registered with the 'TempRegistry' through
-- 'allocateTemp', paired with 'Run.removeReference' as its release action.
--
-- Counterpart of 'removeReferenceLevelsCache'.
addReferenceLevelsCache ::
217+
(PrimMonad m, MonadMask m, MonadMVar m)
218+
=> TempRegistry m
219+
-> LevelsCache m (Handle h)
220+
-> m ()
221+
addReferenceLevelsCache reg cache =
222+
V.forM_ (cachedRuns cache) $ \r ->
223+
allocateTemp reg
224+
(Run.addReference r)
225+
(\_ -> Run.removeReference r)
226+
227+
{-# SPECIALISE removeReferenceLevelsCache ::
228+
TempRegistry IO
229+
-> LevelsCache IO (Handle h)
230+
-> IO () #-}
231+
-- | Release one reference on every run in 'cachedRuns' of the given cache, by
-- handing 'Run.removeReference' for each run to 'freeTemp' on the given
-- 'TempRegistry'.
--
-- Counterpart of 'addReferenceLevelsCache'; also used by 'rebuildCache' to drop
-- the references of the cache that is being replaced.
removeReferenceLevelsCache ::
232+
(PrimMonad m, MonadMVar m, MonadMask m)
233+
=> TempRegistry m
234+
-> LevelsCache m (Handle h)
235+
-> m ()
236+
removeReferenceLevelsCache reg cache =
237+
V.forM_ (cachedRuns cache) $ \r -> freeTemp reg (Run.removeReference r)
238+
175239
{-------------------------------------------------------------------------------
176240
Levels, runs and ongoing merges
177241
-------------------------------------------------------------------------------}
@@ -434,12 +498,12 @@ flushWriteBuffer tr conf@TableConfig{confDiskCachePolicy}
434498
freeTemp reg (WBB.removeReference (tableWriteBufferBlobs tc))
435499
wbblobs' <- allocateTemp reg (WBB.new hfs (Paths.tableBlobPath root n)) WBB.removeReference
436500
levels' <- addRunToLevels tr conf resolve hfs hbio root uc r reg (tableLevels tc)
437-
cache' <- mkLevelsCache levels'
501+
tableCache' <- rebuildCache reg (tableCache tc) levels'
438502
pure $! TableContent {
439503
tableWriteBuffer = WB.empty
440504
, tableWriteBufferBlobs = wbblobs'
441505
, tableLevels = levels'
442-
, tableCache = cache'
506+
, tableCache = tableCache'
443507
}
444508

445509
{- TODO: re-enable

0 commit comments

Comments
 (0)