Skip to content

Commit 6186bf6

Browse files
committed
Give Merge a reference counter
In addition, the merge now keeps track of the 'MergeState' (instead of just the last `StepResult`), which is used to check even more bad usages of the `Merge` API. Examples are using `complete` before merging was done, or using `complete` on a closed `Merge`.
1 parent 530abbe commit 6186bf6

File tree

3 files changed

+110
-42
lines changed

3 files changed

+110
-42
lines changed

src-extras/Database/LSMTree/Extras/NoThunks.hs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -341,6 +341,9 @@ deriving anyclass instance NoThunks Merge.Level
341341
deriving stock instance Generic Merge.StepResult
342342
deriving anyclass instance NoThunks Merge.StepResult
343343

344+
deriving stock instance Generic Merge.MergeState
345+
deriving anyclass instance NoThunks Merge.MergeState
346+
344347
{-------------------------------------------------------------------------------
345348
Readers
346349
-------------------------------------------------------------------------------}

src/Database/LSMTree/Internal/Merge.hs

Lines changed: 105 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@ module Database.LSMTree.Internal.Merge (
55
Merge (..)
66
, Level (..)
77
, Mappend
8+
, MergeState (..)
89
, new
9-
, close
10+
, addReference
11+
, removeReference
1012
, complete
1113
, stepsToCompletion
1214
, stepsToCompletionCounted
@@ -18,10 +20,12 @@ import Control.Exception (assert)
1820
import Control.Monad (when)
1921
import Control.Monad.Class.MonadST (MonadST)
2022
import Control.Monad.Class.MonadSTM (MonadSTM (..))
21-
import Control.Monad.Class.MonadThrow (MonadCatch, MonadThrow (..))
23+
import Control.Monad.Class.MonadThrow (MonadCatch, MonadMask (..),
24+
MonadThrow (..))
2225
import Control.Monad.Fix (MonadFix)
23-
import Control.Monad.Primitive (PrimState, RealWorld)
24-
import Control.RefCount (RefCount (..))
26+
import Control.Monad.Primitive (PrimMonad, PrimState, RealWorld)
27+
import Control.RefCount (RefCounter)
28+
import qualified Control.RefCount as RC
2529
import Data.Coerce (coerce)
2630
import Data.Primitive.MutVar
2731
import Data.Traversable (for)
@@ -37,6 +41,7 @@ import qualified Database.LSMTree.Internal.RunReader as Reader
3741
import Database.LSMTree.Internal.RunReaders (Readers)
3842
import qualified Database.LSMTree.Internal.RunReaders as Readers
3943
import Database.LSMTree.Internal.Serialise
44+
import GHC.Stack (HasCallStack)
4045
import qualified System.FS.API as FS
4146
import System.FS.API (HasFS)
4247
import System.FS.BlockIO.API (HasBlockIO)
@@ -45,23 +50,34 @@ import System.FS.BlockIO.API (HasBlockIO)
4550
--
4651
-- Since we always resolve all entries of the same key in one go, there is no
4752
-- need to store incompletely-resolved entries.
48-
--
49-
-- TODO: Reference counting will have to be done somewhere, either here or in
50-
-- the layer above.
5153
data Merge m h = Merge {
52-
mergeLevel :: !Level
53-
, mergeMappend :: !Mappend
54-
, mergeReaders :: {-# UNPACK #-} !(Readers m (FS.Handle h))
55-
, mergeBuilder :: !(RunBuilder (PrimState m) (FS.Handle h))
54+
mergeLevel :: !Level
55+
, mergeMappend :: !Mappend
56+
, mergeReaders :: {-# UNPACK #-} !(Readers m (FS.Handle h))
57+
, mergeBuilder :: !(RunBuilder (PrimState m) (FS.Handle h))
5658
-- | The caching policy to use for the Run in the 'MergeComplete'.
57-
, mergeCaching :: !RunDataCaching
59+
, mergeCaching :: !RunDataCaching
5860
-- | The result of the latest call to 'steps'. This is used to determine
5961
-- whether a merge can be 'complete'd.
60-
, mergeLastStepResult :: !(MutVar (PrimState m) StepResult)
61-
, mergeHasFS :: !(HasFS m h)
62-
, mergeHasBlockIO :: !(HasBlockIO m h)
62+
, mergeState :: !(MutVar (PrimState m) MergeState)
63+
, mergeRefCounter :: !(RefCounter m)
64+
, mergeHasFS :: !(HasFS m h)
65+
, mergeHasBlockIO :: !(HasBlockIO m h)
6366
}
6467

68+
-- | The current state of the merge.
69+
data MergeState =
70+
-- | There is still merging work to be done
71+
Merging
72+
-- | There is no more merging work to be done, but the merge still has to be
73+
-- completed to yield a new run.
74+
| MergingDone
75+
-- | A run was yielded as the result of a merge. The merge is implicitly
76+
-- closed.
77+
| Completed
78+
-- | The merge was closed before it was completed.
79+
| Closed
80+
6581
data Level = MidLevel | LastLevel
6682
deriving stock (Eq, Show)
6783

@@ -80,7 +96,7 @@ type Mappend = SerialisedValue -> SerialisedValue -> SerialisedValue
8096
-- | Returns 'Nothing' if no input 'Run' contains any entries.
8197
-- The list of runs should be sorted from new to old.
8298
new ::
83-
(MonadCatch m, MonadSTM m, MonadST m)
99+
(MonadCatch m, MonadSTM m, MonadST m, MonadFix m)
84100
=> HasFS m h
85101
-> HasBlockIO m h
86102
-> RunDataCaching
@@ -97,31 +113,65 @@ new fs hbio mergeCaching alloc mergeLevel mergeMappend targetPaths runs = do
97113
-- calculate upper bounds based on input runs
98114
let numEntries = coerce (sum @V.Vector @Int) (fmap Run.runNumEntries runs)
99115
mergeBuilder <- Builder.new fs targetPaths numEntries alloc
100-
mergeLastStepResult <- newMutVar $! MergeInProgress
116+
mergeState <- newMutVar $! Merging
117+
mergeRefCounter <-
118+
RC.mkRefCounter1 (Just $! finaliser fs hbio mergeState mergeBuilder mergeReaders)
101119
return Merge {
102120
mergeHasFS = fs
103121
, mergeHasBlockIO = hbio
104122
, ..
105123
}
106124

107-
{-# SPECIALISE close :: Merge IO (FS.Handle h) -> IO () #-}
108-
-- | This function should be called when discarding a 'Merge' before it
109-
-- was done (i.e. returned 'MergeComplete'). This removes the incomplete files
110-
-- created for the new run so far and avoids leaking file handles.
125+
{-# SPECIALISE addReference :: Merge IO h -> IO () #-}
126+
addReference :: (HasCallStack, PrimMonad m) => Merge m h -> m ()
127+
addReference Merge{..} = RC.addReference mergeRefCounter
128+
129+
{-# SPECIALISE removeReference :: Merge IO h -> IO () #-}
130+
removeReference :: (HasCallStack, PrimMonad m, MonadMask m) => Merge m h -> m ()
131+
removeReference Merge{..} = RC.removeReference mergeRefCounter
132+
133+
{-# SPECIALISE finaliser :: HasFS IO h -> HasBlockIO IO h -> MutVar RealWorld MergeState -> RunBuilder RealWorld (FS.Handle h) -> Readers IO (FS.Handle h) -> IO () #-}
134+
-- | Closes the underlying builder and readers.
111135
--
112-
-- Once it has been called, do not use the 'Merge' any more!
113-
close :: (MonadFix m, MonadSTM m, MonadST m) => Merge m h -> m ()
114-
close Merge {..} = do
115-
Builder.close mergeHasFS mergeBuilder
116-
Readers.close mergeHasFS mergeHasBlockIO mergeReaders
136+
-- This function is idempotent. Technically, this is not necessary because the
137+
-- finaliser is going to run only once, but it is a nice property for
138+
-- @close@-like functions to be idempotent.
139+
finaliser ::
140+
(MonadFix m, MonadSTM m, MonadST m)
141+
=> HasFS m h
142+
-> HasBlockIO m h
143+
-> MutVar (PrimState m) MergeState
144+
-> RunBuilder (PrimState m) (FS.Handle h)
145+
-> Readers m (FS.Handle h)
146+
-> m ()
147+
finaliser hfs hbio var b rs = do
148+
st <- readMutVar var
149+
let shouldClose = case st of
150+
Merging -> True
151+
MergingDone -> True
152+
Completed -> False
153+
Closed -> False
154+
when shouldClose $ do
155+
Builder.close hfs b
156+
Readers.close hfs hbio rs
157+
writeMutVar var $! Closed
117158

118159
{-# SPECIALISE complete ::
119160
Merge IO h
120161
-> IO (Run IO (FS.Handle h)) #-}
121162
-- | Complete a 'Merge', returning a new 'Run' as the result of merging the
122-
-- input runs. This function will /not/ do any merging work if there is any
123-
-- remaining. That is, if not enough 'steps' were performed to exhaust the input
124-
-- 'Readers', this function will throw an error.
163+
-- input runs.
164+
--
165+
-- The resulting run has the same reference count as the input 'Merge'. The
166+
-- 'Merge' does not have to be closed afterwards, since it is closed implicitly
167+
-- by 'complete'.
168+
--
169+
-- This function will /not/ do any merging work if there is any remaining. That
170+
-- is, if not enough 'steps' were performed to exhaust the input 'Readers', this
171+
-- function will throw an error.
172+
--
173+
-- Returns an error if the merge was not yet done, if it was already completed
174+
-- before, or if it was already closed.
125175
--
126176
-- Note: this function creates new 'Run' resources, so it is recommended to run
127177
-- this function with async exceptions masked. Otherwise, these resources can
@@ -131,11 +181,22 @@ complete ::
131181
=> Merge m h
132182
-> m (Run m (FS.Handle h))
133183
complete Merge{..} = do
134-
readMutVar mergeLastStepResult >>= \case
135-
MergeInProgress -> error "complete: Merge is not yet completed!"
136-
MergeComplete -> do
137-
Run.fromMutable mergeHasFS mergeHasBlockIO mergeCaching
138-
(RefCount 1) mergeBuilder
184+
readMutVar mergeState >>= \case
185+
Merging -> error "complete: Merge is not done"
186+
MergingDone -> do
187+
-- Since access to a merge /should/ be sequentialised, we can assume
188+
-- that the ref count has not changed between this read and the use of
189+
-- fromMutable.
190+
--
191+
-- TODO: alternatively, the mergeRefCounter could be reused as the
192+
-- reference counter for the output run.
193+
n <- RC.readRefCount mergeRefCounter
194+
r <- Run.fromMutable mergeHasFS mergeHasBlockIO mergeCaching
195+
n mergeBuilder
196+
writeMutVar mergeState $! Completed
197+
pure r
198+
Completed -> error "complete: Merge is already completed"
199+
Closed -> error "complete: Merge is closed"
139200

140201
{-# SPECIALISE stepsToCompletion ::
141202
Merge IO h
@@ -194,6 +255,8 @@ stepsInvariant requestedSteps = \case
194255
--
195256
-- Returns the number of input entries read, which is guaranteed to be at least
196257
-- as many as requested (unless the merge is complete).
258+
--
259+
-- Returns an error if the merge was already completed or closed.
197260
steps ::
198261
forall h m.
199262
(MonadCatch m, MonadSTM m, MonadST m)
@@ -207,9 +270,11 @@ steps Merge {..} requestedSteps = assertStepsInvariant <$> do
207270
-- clear whether our (upcoming) implementation of scheduled merges is going
208271
-- to satisfy this precondition when it calls @steps@, so for now we do the
209272
-- check.
210-
readMutVar mergeLastStepResult >>= \case
211-
MergeComplete -> pure (0, MergeComplete)
212-
MergeInProgress -> go 0
273+
readMutVar mergeState >>= \case
274+
Merging -> go 0
275+
MergingDone -> pure (0, MergeComplete)
276+
Completed -> error "steps: Merge is completed"
277+
Closed -> error "steps: Merge is closed"
213278
where
214279
assertStepsInvariant res = assert (stepsInvariant requestedSteps res) res
215280

@@ -228,7 +293,7 @@ steps Merge {..} requestedSteps = assertStepsInvariant <$> do
228293
Readers.Drained -> do
229294
-- no future entries, no previous entry to resolve, just write!
230295
writeReaderEntry fs mergeLevel mergeBuilder key entry
231-
writeMutVar mergeLastStepResult $! MergeComplete
296+
writeMutVar mergeState $! MergingDone
232297
pure (n + 1, MergeComplete)
233298

234299
handleEntry !n !key (Reader.Entry (Mupdate v)) =
@@ -267,15 +332,15 @@ steps Merge {..} requestedSteps = assertStepsInvariant <$> do
267332
dropRemaining (n + 1) key
268333
Readers.Drained -> do
269334
writeSerialisedEntry fs mergeLevel mergeBuilder key resolved
270-
writeMutVar mergeLastStepResult $! MergeComplete
335+
writeMutVar mergeState $! MergingDone
271336
pure (n + 1, MergeComplete)
272337

273338
dropRemaining !n !key = do
274339
(dropped, hasMore) <- Readers.dropWhileKey fs hbio mergeReaders key
275340
case hasMore of
276341
Readers.HasMore -> go (n + dropped)
277342
Readers.Drained -> do
278-
writeMutVar mergeLastStepResult $! MergeComplete
343+
writeMutVar mergeState $! MergingDone
279344
pure (n + dropped, MergeComplete)
280345

281346
{-# SPECIALISE writeReaderEntry ::

test/Test/Database/LSMTree/Internal/Merge.hs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ prop_CloseMerge fs hbio level (Positive stepSize) (SmallList wbs) =
124124
withRuns fs hbio (V.fromList (zip (simplePaths [10..]) wbs')) $ \runs -> do
125125
let path0 = simplePath 0
126126
mergeToClose <- makeInProgressMerge path0 runs
127-
traverse_ Merge.close mergeToClose
127+
traverse_ Merge.removeReference mergeToClose
128128

129129
filesExist <- traverse (FS.doesFileExist fs) (pathsForRunFiles path0)
130130

@@ -141,7 +141,7 @@ prop_CloseMerge fs hbio level (Positive stepSize) (SmallList wbs) =
141141
-- just do a few steps once, ideally not completing the merge
142142
Merge.steps merge stepSize >>= \case
143143
(_, Merge.MergeComplete) -> do
144-
Merge.close merge -- run not needed, close
144+
Merge.removeReference merge -- run not needed, close
145145
return Nothing -- not in progress
146146
(_, Merge.MergeInProgress) ->
147147
return (Just merge)

0 commit comments

Comments
 (0)