Rename Merge.Level to MergeType and extend with MergeUnion enum value
This follows the naming scheme from the prototype.

This just adds the new enum value (and the rename), but does not introduce
any new merging behaviour yet, and the new MergeUnion value is not used
(except in snapshot (de)serialisation).
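
At the type level, the change amounts to the following rename and extension
(a before/after sketch; the full definition with its haddocks appears in the
src/Database/LSMTree/Internal/Merge.hs diff below):

    -- Before:
    data Level = MidLevel | LastLevel

    -- After:
    data MergeType = MergeMidLevel | MergeLastLevel | MergeUnion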
dcoutts committed Jan 10, 2025
1 parent f607fba commit 3e3cc52
Showing 10 changed files with 109 additions and 86 deletions.
16 changes: 9 additions & 7 deletions bench/micro/Bench/Database/LSMTree/Internal/Merge.hs
@@ -18,6 +18,7 @@ import Database.LSMTree.Extras.RunData
import Database.LSMTree.Extras.UTxO
import Database.LSMTree.Internal.Entry
import qualified Database.LSMTree.Internal.Merge as Merge
import Database.LSMTree.Internal.Merge (MergeType (..))
import Database.LSMTree.Internal.Paths (RunFsPaths (..),
pathsForRunFiles, runChecksumsPath)
import Database.LSMTree.Internal.Run (Run)
@@ -106,7 +107,7 @@ benchmarks = bgroup "Bench.Database.LSMTree.Internal.Merge" [
{ name = "word64-delete-x4-lastlevel"
, nentries = totalEntries `splitInto` 4
, fdeletes = 1
, mergeLevel = Merge.LastLevel
, mergeType = MergeLastLevel
}
-- different key and value sizes
, benchMerge configWord64
@@ -157,21 +158,21 @@ benchmarks = bgroup "Bench.Database.LSMTree.Internal.Merge" [
, nentries = totalEntries `splitInto` 4
, finserts = 1
, fdeletes = 1
, mergeLevel = Merge.LastLevel
, mergeType = MergeLastLevel
}
, benchMerge configUTxO
{ name = "utxo-x4+1-min-skewed-lastlevel" -- live levelling merge
, nentries = totalEntries `distributed` [1, 1, 1, 1, 4]
, finserts = 1
, fdeletes = 1
, mergeLevel = Merge.LastLevel
, mergeType = MergeLastLevel
}
, benchMerge configUTxO
{ name = "utxo-x4+1-max-skewed-lastlevel" -- live levelling merge
, nentries = totalEntries `distributed` [1, 1, 1, 1, 16]
, finserts = 1
, fdeletes = 1
, mergeLevel = Merge.LastLevel
, mergeType = MergeLastLevel
}
]
where
@@ -233,7 +234,8 @@ merge ::
merge fs hbio Config {..} targetPaths runs = do
let f = fromMaybe const mergeMappend
m <- fromMaybe (error "empty inputs, no merge created") <$>
Merge.new fs hbio Run.CacheRunData (RunAllocFixed 10) mergeLevel f targetPaths runs
Merge.new fs hbio Run.CacheRunData (RunAllocFixed 10)
mergeType f targetPaths runs
Merge.stepsToCompletion m stepSize

outputRunPaths :: Run.RunFsPaths
@@ -271,7 +273,7 @@ data Config = Config {
, randomKey :: Rnd SerialisedKey
, randomValue :: Rnd SerialisedValue
, randomBlob :: Rnd SerialisedBlob
, mergeLevel :: !Merge.Level
, mergeType :: !MergeType
-- | Needs to be defined when generating mupserts.
, mergeMappend :: !(Maybe Mappend)
-- | Merging is done in chunks of @stepSize@ entries.
@@ -291,7 +293,7 @@ defaultConfig = Config {
, randomKey = error "randomKey not implemented"
, randomValue = error "randomValue not implemented"
, randomBlob = error "randomBlob not implemented"
, mergeLevel = Merge.MidLevel
, mergeType = MergeMidLevel
, mergeMappend = Nothing
, stepSize = maxBound -- by default, just do in one go
}
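As a usage sketch, a benchmark case written against the new field looks like
this (the case name and field values here are hypothetical; benchMerge,
configWord64, totalEntries and splitInto are the helpers already used in this
file):

    benchMerge configWord64
      { name      = "word64-insert-x4-lastlevel"  -- hypothetical case
      , nentries  = totalEntries `splitInto` 4
      , finserts  = 1
      , mergeType = MergeLastLevel                -- was: mergeLevel = Merge.LastLevel
      }
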
11 changes: 6 additions & 5 deletions src-extras/Database/LSMTree/Extras/Generators.hs
@@ -53,7 +53,7 @@ import Database.LSMTree.Extras.Index (Append (..))
import Database.LSMTree.Extras.Orphans ()
import Database.LSMTree.Internal.BlobRef (BlobSpan (..))
import Database.LSMTree.Internal.Entry (Entry (..), NumEntries (..))
import qualified Database.LSMTree.Internal.Merge as Merge
import Database.LSMTree.Internal.Merge (MergeType (..))
import Database.LSMTree.Internal.Page (PageNo (..))
import Database.LSMTree.Internal.RawBytes as RB
import Database.LSMTree.Internal.Serialise
@@ -555,7 +555,8 @@ instance Arbitrary BlobSpan where
Merge
-------------------------------------------------------------------------------}

instance Arbitrary Merge.Level where
arbitrary = QC.elements [Merge.MidLevel, Merge.LastLevel]
shrink Merge.LastLevel = [Merge.MidLevel]
shrink Merge.MidLevel = []
instance Arbitrary MergeType where
arbitrary = QC.elements [MergeMidLevel, MergeLastLevel, MergeUnion]
shrink MergeMidLevel = []
shrink MergeLastLevel = [MergeMidLevel]
shrink MergeUnion = [MergeLastLevel]
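
The shrink chain is MergeUnion -> MergeLastLevel -> MergeMidLevel, so
counterexamples involving the new constructor shrink towards the simplest
merge type. A self-contained sketch of that behaviour (re-declaring the type
locally, since the test-suite wiring is not part of this diff):

    import Test.QuickCheck

    data MergeType = MergeMidLevel | MergeLastLevel | MergeUnion
      deriving (Eq, Show)

    instance Arbitrary MergeType where
      arbitrary = elements [MergeMidLevel, MergeLastLevel, MergeUnion]
      shrink MergeMidLevel  = []
      shrink MergeLastLevel = [MergeMidLevel]
      shrink MergeUnion     = [MergeLastLevel]

    -- A deliberately failing property: whether QuickCheck first hits
    -- MergeUnion or MergeLastLevel, shrinking reports MergeLastLevel,
    -- the minimal failing value.
    prop_alwaysMidLevel :: MergeType -> Bool
    prop_alwaysMidLevel t = t == MergeMidLevel

    main :: IO ()
    main = quickCheck prop_alwaysMidLevel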
6 changes: 3 additions & 3 deletions src-extras/Database/LSMTree/Extras/NoThunks.hs
@@ -42,7 +42,7 @@ import Database.LSMTree.Internal.CRC32C
import Database.LSMTree.Internal.Entry
import Database.LSMTree.Internal.Index.Compact
import Database.LSMTree.Internal.Index.CompactAcc
import Database.LSMTree.Internal.Merge hiding (Level)
import Database.LSMTree.Internal.Merge
import qualified Database.LSMTree.Internal.Merge as Merge
import Database.LSMTree.Internal.MergeSchedule
import Database.LSMTree.Internal.MergingRun
@@ -377,8 +377,8 @@ deriving stock instance Generic (Merge m h)
deriving anyclass instance (Typeable m, Typeable (PrimState m), Typeable h)
=> NoThunks (Merge m h)

deriving stock instance Generic Merge.Level
deriving anyclass instance NoThunks Merge.Level
deriving stock instance Generic MergeType
deriving anyclass instance NoThunks MergeType

deriving stock instance Generic Merge.StepResult
deriving anyclass instance NoThunks Merge.StepResult
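Since MergeType is a plain enumeration, the Generic-derived NoThunks instance
has nothing to inspect beyond the constructor itself. A quick usage sketch
(assuming the noThunks check from the noThunks package, with the instances
above in scope):

    import NoThunks.Class (noThunks)

    -- A fully evaluated enum value holds no hidden thunks, so the
    -- check is expected to print Nothing.
    checkMergeType :: IO ()
    checkMergeType = noThunks [] MergeUnion >>= print
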
71 changes: 42 additions & 29 deletions src/Database/LSMTree/Internal/Merge.hs
@@ -3,7 +3,7 @@
-- concurrency primitive, such as an 'MVar'.
module Database.LSMTree.Internal.Merge (
Merge (..)
, Level (..)
, MergeType (..)
, Mappend
, MergeState (..)
, new
@@ -46,7 +46,7 @@ import System.FS.BlockIO.API (HasBlockIO)
-- Since we always resolve all entries of the same key in one go, there is no
-- need to store incompletely-resolved entries.
data Merge m h = Merge {
mergeLevel :: !Level
mergeType :: !MergeType
, mergeMappend :: !Mappend
, mergeReaders :: {-# UNPACK #-} !(Readers m h)
, mergeBuilder :: !(RunBuilder m h)
@@ -72,12 +72,25 @@ data MergeState =
-- | The merge was closed before it was completed.
| Closed

data Level = MidLevel | LastLevel
-- | A merge can either exist on a level of the LSM, or be a union merge of
-- two tables.
--
-- A last level merge behaves differently from a mid-level merge: last level
-- merges can actually remove delete operations, whereas mid-level merges must
-- preserve them.
--
-- Union merges follow the semantics of @Data.Map.unionWith (<>)@. Since the
-- input runs are semantically treated like @Data.Map@s, deletes are ignored
-- and inserts act like mupserts, so they need to be merged monoidally using
-- 'resolveValue'.
--
data MergeType = MergeMidLevel | MergeLastLevel | MergeUnion
deriving stock (Eq, Show)

instance NFData Level where
rnf MidLevel = ()
rnf LastLevel = ()
instance NFData MergeType where
rnf MergeMidLevel = ()
rnf MergeLastLevel = ()
rnf MergeUnion = ()

type Mappend = SerialisedValue -> SerialisedValue -> SerialisedValue
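
The @Data.Map.unionWith (<>)@ analogy in the haddock above can be made
concrete with ordinary maps (a sketch that is independent of the library's
actual run representation):

    import qualified Data.Map.Strict as Map

    -- Two input "tables", as maps from key to a monoidal value.
    t1, t2 :: Map.Map String [Int]
    t1 = Map.fromList [("a", [1]), ("b", [2])]
    t2 = Map.fromList [("b", [3]), ("c", [4])]

    -- A union merge resolves keys present in both inputs monoidally,
    -- which is what 'resolveValue' does for serialised values:
    unioned :: Map.Map String [Int]
    unioned = Map.unionWith (<>) t1 t2
    -- fromList [("a",[1]),("b",[2,3]),("c",[4])]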

@@ -86,7 +99,7 @@
-> HasBlockIO IO h
-> RunDataCaching
-> RunBloomFilterAlloc
-> Level
-> MergeType
-> Mappend
-> Run.RunFsPaths
-> V.Vector (Ref (Run IO h))
@@ -99,12 +112,12 @@ new ::
-> HasBlockIO m h
-> RunDataCaching
-> RunBloomFilterAlloc
-> Level
-> MergeType
-> Mappend
-> Run.RunFsPaths
-> V.Vector (Ref (Run m h))
-> m (Maybe (Merge m h))
new fs hbio mergeCaching alloc mergeLevel mergeMappend targetPaths runs = do
new fs hbio mergeCaching alloc mergeType mergeMappend targetPaths runs = do
-- no offset, no write buffer
mreaders <- Readers.new Readers.NoOffsetKey Nothing runs
for mreaders $ \mergeReaders -> do
@@ -265,7 +278,7 @@ steps Merge {..} requestedSteps = assertStepsInvariant <$> do
handleEntry (n + 1) key entry
Readers.Drained -> do
-- no future entries, no previous entry to resolve, just write!
writeReaderEntry mergeLevel mergeBuilder key entry
writeReaderEntry mergeType mergeBuilder key entry
writeMutVar mergeState $! MergingDone
pure (n + 1, MergeDone)

@@ -277,7 +290,7 @@
handleMupdate n key (Reader.appendOverflow len overflowPages v)
handleEntry !n !key entry = do
-- otherwise, we can just drop all following entries of same key
writeReaderEntry mergeLevel mergeBuilder key entry
writeReaderEntry mergeType mergeBuilder key entry
dropRemaining n key

-- the value is from a mupsert, complete (not just a prefix)
@@ -286,7 +299,7 @@
if nextKey /= key
then do
-- resolved all entries for this key, write it
writeSerialisedEntry mergeLevel mergeBuilder key (Mupdate v)
writeSerialisedEntry mergeType mergeBuilder key (Mupdate v)
go n
else do
(_, nextEntry, hasMore) <- Readers.pop mergeReaders
@@ -301,10 +314,10 @@
handleMupdate (n + 1) key v'
_ -> do
-- done with this key, now the remaining entries are obsolete
writeSerialisedEntry mergeLevel mergeBuilder key resolved
writeSerialisedEntry mergeType mergeBuilder key resolved
dropRemaining (n + 1) key
Readers.Drained -> do
writeSerialisedEntry mergeLevel mergeBuilder key resolved
writeSerialisedEntry mergeType mergeBuilder key resolved
writeMutVar mergeState $! MergingDone
pure (n + 1, MergeDone)

@@ -317,19 +330,19 @@
pure (n + dropped, MergeDone)
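
Before anything is written, consecutive entries for the same key are resolved
via handleMupdate using the user-supplied mergeMappend. The same resolution
over plain values (a sketch; numbers stand in for serialised values):

    -- A key's stack of mupserts collapses left-to-right with the
    -- user-supplied combine function, e.g. summing counters:
    resolveEntries :: (v -> v -> v) -> v -> [v] -> v
    resolveEntries = foldl

    -- resolveEntries (+) 1 [2, 3] == 6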

{-# SPECIALISE writeReaderEntry ::
Level
MergeType
-> RunBuilder IO h
-> SerialisedKey
-> Reader.Entry IO h
-> IO () #-}
writeReaderEntry ::
(MonadSTM m, MonadST m, MonadThrow m)
=> Level
=> MergeType
-> RunBuilder m h
-> SerialisedKey
-> Reader.Entry m h
-> m ()
writeReaderEntry level builder key (Reader.Entry entryFull) =
writeReaderEntry mergeType builder key (Reader.Entry entryFull) =
-- Small entry.
-- Note that this small entry could be the only one on the page. We only
-- care about it being small, not single-entry, since it could still end
Expand All @@ -339,10 +352,10 @@ writeReaderEntry level builder key (Reader.Entry entryFull) =
-- entry of a page (which would for example happen a lot if most entries
-- have 2k-4k bytes). In that case we could have copied the RawPage
-- (but we find out too late to easily exploit it).
writeSerialisedEntry level builder key entryFull
writeReaderEntry level builder key entry@(Reader.EntryOverflow prefix page _ overflowPages)
writeSerialisedEntry mergeType builder key entryFull
writeReaderEntry mergeType builder key entry@(Reader.EntryOverflow prefix page _ overflowPages)
| InsertWithBlob {} <- prefix =
assert (shouldWriteEntry level prefix) $ do -- large, can't be delete
assert (shouldWriteEntry prefix mergeType) $ do -- large, can't be delete
-- has blob, we can't just copy the first page, fall back
-- we simply append the overflow pages to the value
Builder.addKeyOp builder key (Reader.toFullEntry entry)
Expand All @@ -352,30 +365,30 @@ writeReaderEntry level builder key entry@(Reader.EntryOverflow prefix page _ ove
-- 2. write a RawPage + SerialisedBlob + [RawOverflowPage], rewriting
-- the raw page's blob offset (slightly faster, but a bit hacky)
| otherwise =
assert (shouldWriteEntry level prefix) $ -- large, can't be delete
assert (shouldWriteEntry prefix mergeType) $ -- large, can't be delete
-- no blob, directly copy all pages as they are
Builder.addLargeSerialisedKeyOp builder key page overflowPages

{-# SPECIALISE writeSerialisedEntry ::
Level
MergeType
-> RunBuilder IO h
-> SerialisedKey
-> Entry SerialisedValue (RawBlobRef IO h)
-> IO () #-}
writeSerialisedEntry ::
(MonadSTM m, MonadST m, MonadThrow m)
=> Level
=> MergeType
-> RunBuilder m h
-> SerialisedKey
-> Entry SerialisedValue (RawBlobRef m h)
-> m ()
writeSerialisedEntry level builder key entry =
when (shouldWriteEntry level entry) $
writeSerialisedEntry mergeType builder key entry =
when (shouldWriteEntry entry mergeType) $
Builder.addKeyOp builder key entry

-- On the last level we could also turn Mupdate into Insert,
-- but no need to complicate things.
shouldWriteEntry :: Level -> Entry v b -> Bool
shouldWriteEntry level = \case
Delete -> level == MidLevel
_ -> True
shouldWriteEntry :: Entry v b -> MergeType -> Bool
shouldWriteEntry Delete MergeLastLevel = False
shouldWriteEntry Delete MergeUnion = False
shouldWriteEntry _ _ = True
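
Note the refactor also flips the argument order (entry first, merge type
second). The resulting delete-filtering rules, as a self-contained sketch
(the real Entry type also carries values and blob references):

    -- Deletes survive only mid-level merges, because lower levels may
    -- still hold an older value for the key; last-level and union
    -- merges drop them outright.
    data SimpleEntry = SInsert | SMupdate | SDelete

    shouldWrite :: SimpleEntry -> MergeType -> Bool
    shouldWrite SDelete MergeLastLevel = False
    shouldWrite SDelete MergeUnion     = False
    shouldWrite _       _              = True

    -- [ shouldWrite SDelete t | t <- [MergeMidLevel, MergeLastLevel, MergeUnion] ]
    --   == [True, False, False]
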
26 changes: 13 additions & 13 deletions src/Database/LSMTree/Internal/MergeSchedule.hs
@@ -46,7 +46,7 @@ import Database.LSMTree.Internal.Entry (Entry, NumEntries (..),
unNumEntries)
import Database.LSMTree.Internal.Index.Compact (IndexCompact)
import Database.LSMTree.Internal.Lookup (ResolveSerialisedValue)
import qualified Database.LSMTree.Internal.Merge as Merge
import Database.LSMTree.Internal.Merge (MergeType (..))
import Database.LSMTree.Internal.MergingRun (MergePolicyForLevel (..),
MergingRun, NumRuns (..))
import qualified Database.LSMTree.Internal.MergingRun as MR
@@ -92,7 +92,7 @@ data MergeTrace =
RunDataCaching
RunBloomFilterAlloc
MergePolicyForLevel
Merge.Level
MergeType
| TraceCompletedMerge -- TODO: currently not traced for Incremental merges
NumEntries -- ^ Size of output run
RunNumber
@@ -572,7 +572,7 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels =
traceWith tr $ AtLevel ln TraceAddLevel
-- Make a new level
let policyForLevel = mergePolicyForLevel confMergePolicy ln V.empty
ir <- newMerge policyForLevel Merge.LastLevel ln rs
ir <- newMerge policyForLevel MergeLastLevel ln rs
return $! V.singleton $ Level ir V.empty
go !ln rs' (V.uncons -> Just (Level ir rs, ls)) = do
r <- expectCompletedMerge ln ir
@@ -588,7 +588,7 @@
-- as a bundle and move them down to the level below. We start a merge
-- for the new incoming runs. This level is otherwise empty.
LevelTiering | levelIsFull confSizeRatio rs -> do
ir' <- newMerge LevelTiering Merge.MidLevel ln rs'
ir' <- newMerge LevelTiering MergeMidLevel ln rs'
ls' <- go (succ ln) (r `V.cons` rs) ls
pure $! Level ir' V.empty `V.cons` ls'
-- This tiering level is not yet full. We move the completed merged run
@@ -607,13 +607,13 @@
-- empty) level .
LevelLevelling | Run.size r > maxRunSize' conf LevelLevelling ln -> do
assert (V.null rs && V.null ls) $ pure ()
ir' <- newMerge LevelTiering Merge.MidLevel ln rs'
ir' <- newMerge LevelTiering MergeMidLevel ln rs'
ls' <- go (succ ln) (V.singleton r) V.empty
pure $! Level ir' V.empty `V.cons` ls'
-- Otherwise we start merging the incoming runs into the run.
LevelLevelling -> do
assert (V.null rs && V.null ls) $ pure ()
ir' <- newMerge LevelLevelling Merge.LastLevel ln (rs' `V.snoc` r)
ir' <- newMerge LevelLevelling MergeLastLevel ln (rs' `V.snoc` r)
pure $! Level ir' V.empty `V.cons` V.empty

-- Releases the incoming run.
@@ -631,11 +631,11 @@

-- Releases the runs.
newMerge :: MergePolicyForLevel
-> Merge.Level
-> MergeType
-> LevelNo
-> V.Vector (Ref (Run m h))
-> m (IncomingRun m h)
newMerge mergePolicy mergeLevel ln rs
newMerge mergePolicy mergeType ln rs
| Just (r, rest) <- V.uncons rs
, V.null rest = do
traceWith tr $ AtLevel ln $
@@ -654,11 +654,11 @@
!alloc = bloomFilterAllocForLevel conf ln
!runPaths = Paths.runPath root (uniqueToRunNumber n)
traceWith tr $ AtLevel ln $
TraceNewMerge (V.map Run.size rs) (runNumber runPaths) caching alloc mergePolicy mergeLevel
TraceNewMerge (V.map Run.size rs) (runNumber runPaths) caching alloc mergePolicy mergeType
-- The runs will end up inside the merging run, with fresh references.
-- The original references can be released (but only on the happy path).
mr <- withRollback reg
(MR.new hfs hbio resolve caching alloc mergeLevel runPaths rs)
(MR.new hfs hbio resolve caching alloc mergeType runPaths rs)
releaseRef
V.forM_ rs $ \r -> delayedCommit reg (releaseRef r)
case confMergeSchedule of
@@ -715,10 +715,10 @@ maxRunSize' :: TableConfig -> MergePolicyForLevel -> LevelNo -> NumEntries
maxRunSize' config policy ln =
maxRunSize (confSizeRatio config) (confWriteBufferAlloc config) policy ln

mergeLastForLevel :: Levels m h -> Merge.Level
mergeLastForLevel :: Levels m h -> MergeType
mergeLastForLevel levels
| V.null levels = Merge.LastLevel
| otherwise = Merge.MidLevel
| V.null levels = MergeLastLevel
| otherwise = MergeMidLevel

levelIsFull :: SizeRatio -> V.Vector run -> Bool
levelIsFull sr rs = V.length rs + 1 >= (sizeRatioInt sr)
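
mergeLastForLevel picks the merge type purely from the position in the level
structure: a merge with nothing below it may drop deletes. Note that the
level structure only ever yields the two level-oriented types; MergeUnion is
reserved for merging whole tables and, per the commit message, is not used
yet. The same decision as a sketch over a plain list of levels:

    mergeTypeFor :: [level] -> MergeType
    mergeTypeFor []    = MergeLastLevel  -- nothing below: deletes can be dropped
    mergeTypeFor (_:_) = MergeMidLevel   -- lower runs may still shadow deletes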
