diff --git a/bench/micro/Bench/Database/LSMTree/Internal/Merge.hs b/bench/micro/Bench/Database/LSMTree/Internal/Merge.hs index 23a3e4adc..99286465f 100644 --- a/bench/micro/Bench/Database/LSMTree/Internal/Merge.hs +++ b/bench/micro/Bench/Database/LSMTree/Internal/Merge.hs @@ -18,6 +18,7 @@ import Database.LSMTree.Extras.RunData import Database.LSMTree.Extras.UTxO import Database.LSMTree.Internal.Entry import qualified Database.LSMTree.Internal.Merge as Merge +import Database.LSMTree.Internal.Merge (MergeType (..)) import Database.LSMTree.Internal.Paths (RunFsPaths (..), pathsForRunFiles, runChecksumsPath) import Database.LSMTree.Internal.Run (Run) @@ -106,7 +107,7 @@ benchmarks = bgroup "Bench.Database.LSMTree.Internal.Merge" [ { name = "word64-delete-x4-lastlevel" , nentries = totalEntries `splitInto` 4 , fdeletes = 1 - , mergeLevel = Merge.LastLevel + , mergeType = MergeLastLevel } -- different key and value sizes , benchMerge configWord64 @@ -157,21 +158,21 @@ benchmarks = bgroup "Bench.Database.LSMTree.Internal.Merge" [ , nentries = totalEntries `splitInto` 4 , finserts = 1 , fdeletes = 1 - , mergeLevel = Merge.LastLevel + , mergeType = MergeLastLevel } , benchMerge configUTxO { name = "utxo-x4+1-min-skewed-lastlevel" -- live levelling merge , nentries = totalEntries `distributed` [1, 1, 1, 1, 4] , finserts = 1 , fdeletes = 1 - , mergeLevel = Merge.LastLevel + , mergeType = MergeLastLevel } , benchMerge configUTxO { name = "utxo-x4+1-max-skewed-lastlevel" -- live levelling merge , nentries = totalEntries `distributed` [1, 1, 1, 1, 16] , finserts = 1 , fdeletes = 1 - , mergeLevel = Merge.LastLevel + , mergeType = MergeLastLevel } ] where @@ -233,7 +234,8 @@ merge :: merge fs hbio Config {..} targetPaths runs = do let f = fromMaybe const mergeMappend m <- fromMaybe (error "empty inputs, no merge created") <$> - Merge.new fs hbio Run.CacheRunData (RunAllocFixed 10) mergeLevel f targetPaths runs + Merge.new fs hbio Run.CacheRunData (RunAllocFixed 10) + mergeType f 
targetPaths runs Merge.stepsToCompletion m stepSize outputRunPaths :: Run.RunFsPaths @@ -271,7 +273,7 @@ data Config = Config { , randomKey :: Rnd SerialisedKey , randomValue :: Rnd SerialisedValue , randomBlob :: Rnd SerialisedBlob - , mergeLevel :: !Merge.Level + , mergeType :: !MergeType -- | Needs to be defined when generating mupserts. , mergeMappend :: !(Maybe Mappend) -- | Merging is done in chunks of @stepSize@ entries. @@ -291,7 +293,7 @@ defaultConfig = Config { , randomKey = error "randomKey not implemented" , randomValue = error "randomValue not implemented" , randomBlob = error "randomBlob not implemented" - , mergeLevel = Merge.MidLevel + , mergeType = MergeMidLevel , mergeMappend = Nothing , stepSize = maxBound -- by default, just do in one go } diff --git a/src-extras/Database/LSMTree/Extras/Generators.hs b/src-extras/Database/LSMTree/Extras/Generators.hs index 67f5cff85..232e55043 100644 --- a/src-extras/Database/LSMTree/Extras/Generators.hs +++ b/src-extras/Database/LSMTree/Extras/Generators.hs @@ -53,7 +53,7 @@ import Database.LSMTree.Extras.Index (Append (..)) import Database.LSMTree.Extras.Orphans () import Database.LSMTree.Internal.BlobRef (BlobSpan (..)) import Database.LSMTree.Internal.Entry (Entry (..), NumEntries (..)) -import qualified Database.LSMTree.Internal.Merge as Merge +import Database.LSMTree.Internal.Merge (MergeType (..)) import Database.LSMTree.Internal.Page (PageNo (..)) import Database.LSMTree.Internal.RawBytes as RB import Database.LSMTree.Internal.Serialise @@ -555,7 +555,8 @@ instance Arbitrary BlobSpan where Merge -------------------------------------------------------------------------------} -instance Arbitrary Merge.Level where - arbitrary = QC.elements [Merge.MidLevel, Merge.LastLevel] - shrink Merge.LastLevel = [Merge.MidLevel] - shrink Merge.MidLevel = [] +instance Arbitrary MergeType where + arbitrary = QC.elements [MergeMidLevel, MergeLastLevel, MergeUnion] + shrink MergeMidLevel = [] + shrink MergeLastLevel = 
[MergeMidLevel] + shrink MergeUnion = [MergeLastLevel] diff --git a/src-extras/Database/LSMTree/Extras/NoThunks.hs b/src-extras/Database/LSMTree/Extras/NoThunks.hs index b0ff43c18..f158edd7d 100644 --- a/src-extras/Database/LSMTree/Extras/NoThunks.hs +++ b/src-extras/Database/LSMTree/Extras/NoThunks.hs @@ -42,7 +42,7 @@ import Database.LSMTree.Internal.CRC32C import Database.LSMTree.Internal.Entry import Database.LSMTree.Internal.Index.Compact import Database.LSMTree.Internal.Index.CompactAcc -import Database.LSMTree.Internal.Merge hiding (Level) +import Database.LSMTree.Internal.Merge import qualified Database.LSMTree.Internal.Merge as Merge import Database.LSMTree.Internal.MergeSchedule import Database.LSMTree.Internal.MergingRun @@ -377,8 +377,8 @@ deriving stock instance Generic (Merge m h) deriving anyclass instance (Typeable m, Typeable (PrimState m), Typeable h) => NoThunks (Merge m h) -deriving stock instance Generic Merge.Level -deriving anyclass instance NoThunks Merge.Level +deriving stock instance Generic MergeType +deriving anyclass instance NoThunks MergeType deriving stock instance Generic Merge.StepResult deriving anyclass instance NoThunks Merge.StepResult diff --git a/src/Database/LSMTree/Internal/Merge.hs b/src/Database/LSMTree/Internal/Merge.hs index b779e5b6d..6b751648f 100644 --- a/src/Database/LSMTree/Internal/Merge.hs +++ b/src/Database/LSMTree/Internal/Merge.hs @@ -3,7 +3,7 @@ -- concurrency primitive, such as an 'MVar'. module Database.LSMTree.Internal.Merge ( Merge (..) - , Level (..) + , MergeType (..) , Mappend , MergeState (..) , new @@ -46,7 +46,7 @@ import System.FS.BlockIO.API (HasBlockIO) -- Since we always resolve all entries of the same key in one go, there is no -- need to store incompletely-resolved entries. 
data Merge m h = Merge { - mergeLevel :: !Level + mergeType :: !MergeType , mergeMappend :: !Mappend , mergeReaders :: {-# UNPACK #-} !(Readers m h) , mergeBuilder :: !(RunBuilder m h) @@ -72,12 +72,25 @@ data MergeState = -- | The merge was closed before it was completed. | Closed -data Level = MidLevel | LastLevel +-- | Merges can either exist on a level of the LSM, or be a union merge of two +-- tables. +-- +-- A last level merge behaves differently from a mid-level merge: last level +-- merges can actually remove delete operations, whereas mid-level merges must +-- preserve them. +-- +-- Union merges follow the semantics of @Data.Map.unionWith (<>)@. Since the +-- input runs are semantically treated like @Data.Map@s, deletes are ignored +-- and inserts act like mupserts, so they need to be merged monoidally using +-- 'resolveValue'. +-- +data MergeType = MergeMidLevel | MergeLastLevel | MergeUnion deriving stock (Eq, Show) -instance NFData Level where - rnf MidLevel = () - rnf LastLevel = () +instance NFData MergeType where + rnf MergeMidLevel = () + rnf MergeLastLevel = () + rnf MergeUnion = () type Mappend = SerialisedValue -> SerialisedValue -> SerialisedValue @@ -86,7 +99,7 @@ type Mappend = SerialisedValue -> SerialisedValue -> SerialisedValue -> HasBlockIO IO h -> RunDataCaching -> RunBloomFilterAlloc - -> Level + -> MergeType -> Mappend -> Run.RunFsPaths -> V.Vector (Ref (Run IO h)) @@ -99,12 +112,12 @@ new :: -> HasBlockIO m h -> RunDataCaching -> RunBloomFilterAlloc - -> Level + -> MergeType -> Mappend -> Run.RunFsPaths -> V.Vector (Ref (Run m h)) -> m (Maybe (Merge m h)) -new fs hbio mergeCaching alloc mergeLevel mergeMappend targetPaths runs = do +new fs hbio mergeCaching alloc mergeType mergeMappend targetPaths runs = do -- no offset, no write buffer mreaders <- Readers.new Readers.NoOffsetKey Nothing runs for mreaders $ \mergeReaders -> do @@ -265,7 +278,7 @@ steps Merge {..} requestedSteps = assertStepsInvariant <$> do handleEntry (n + 1) key 
entry Readers.Drained -> do -- no future entries, no previous entry to resolve, just write! - writeReaderEntry mergeLevel mergeBuilder key entry + writeReaderEntry mergeType mergeBuilder key entry writeMutVar mergeState $! MergingDone pure (n + 1, MergeDone) @@ -277,7 +290,7 @@ steps Merge {..} requestedSteps = assertStepsInvariant <$> do handleMupdate n key (Reader.appendOverflow len overflowPages v) handleEntry !n !key entry = do -- otherwise, we can just drop all following entries of same key - writeReaderEntry mergeLevel mergeBuilder key entry + writeReaderEntry mergeType mergeBuilder key entry dropRemaining n key -- the value is from a mupsert, complete (not just a prefix) @@ -286,7 +299,7 @@ steps Merge {..} requestedSteps = assertStepsInvariant <$> do if nextKey /= key then do -- resolved all entries for this key, write it - writeSerialisedEntry mergeLevel mergeBuilder key (Mupdate v) + writeSerialisedEntry mergeType mergeBuilder key (Mupdate v) go n else do (_, nextEntry, hasMore) <- Readers.pop mergeReaders @@ -301,10 +314,10 @@ steps Merge {..} requestedSteps = assertStepsInvariant <$> do handleMupdate (n + 1) key v' _ -> do -- done with this key, now the remaining entries are obsolete - writeSerialisedEntry mergeLevel mergeBuilder key resolved + writeSerialisedEntry mergeType mergeBuilder key resolved dropRemaining (n + 1) key Readers.Drained -> do - writeSerialisedEntry mergeLevel mergeBuilder key resolved + writeSerialisedEntry mergeType mergeBuilder key resolved writeMutVar mergeState $! 
MergingDone pure (n + 1, MergeDone) @@ -317,19 +330,19 @@ steps Merge {..} requestedSteps = assertStepsInvariant <$> do pure (n + dropped, MergeDone) {-# SPECIALISE writeReaderEntry :: - Level + MergeType -> RunBuilder IO h -> SerialisedKey -> Reader.Entry IO h -> IO () #-} writeReaderEntry :: (MonadSTM m, MonadST m, MonadThrow m) - => Level + => MergeType -> RunBuilder m h -> SerialisedKey -> Reader.Entry m h -> m () -writeReaderEntry level builder key (Reader.Entry entryFull) = +writeReaderEntry mergeType builder key (Reader.Entry entryFull) = -- Small entry. -- Note that this small entry could be the only one on the page. We only -- care about it being small, not single-entry, since it could still end @@ -339,10 +352,10 @@ writeReaderEntry level builder key (Reader.Entry entryFull) = -- entry of a page (which would for example happen a lot if most entries -- have 2k-4k bytes). In that case we could have copied the RawPage -- (but we find out too late to easily exploit it). - writeSerialisedEntry level builder key entryFull -writeReaderEntry level builder key entry@(Reader.EntryOverflow prefix page _ overflowPages) + writeSerialisedEntry mergeType builder key entryFull +writeReaderEntry mergeType builder key entry@(Reader.EntryOverflow prefix page _ overflowPages) | InsertWithBlob {} <- prefix = - assert (shouldWriteEntry level prefix) $ do -- large, can't be delete + assert (shouldWriteEntry prefix mergeType) $ do -- large, can't be delete -- has blob, we can't just copy the first page, fall back -- we simply append the overflow pages to the value Builder.addKeyOp builder key (Reader.toFullEntry entry) @@ -352,30 +365,30 @@ writeReaderEntry level builder key entry@(Reader.EntryOverflow prefix page _ ove -- 2. 
write a RawPage + SerialisedBlob + [RawOverflowPage], rewriting -- the raw page's blob offset (slightly faster, but a bit hacky) | otherwise = - assert (shouldWriteEntry level prefix) $ -- large, can't be delete + assert (shouldWriteEntry prefix mergeType) $ -- large, can't be delete -- no blob, directly copy all pages as they are Builder.addLargeSerialisedKeyOp builder key page overflowPages {-# SPECIALISE writeSerialisedEntry :: - Level + MergeType -> RunBuilder IO h -> SerialisedKey -> Entry SerialisedValue (RawBlobRef IO h) -> IO () #-} writeSerialisedEntry :: (MonadSTM m, MonadST m, MonadThrow m) - => Level + => MergeType -> RunBuilder m h -> SerialisedKey -> Entry SerialisedValue (RawBlobRef m h) -> m () -writeSerialisedEntry level builder key entry = - when (shouldWriteEntry level entry) $ +writeSerialisedEntry mergeType builder key entry = + when (shouldWriteEntry entry mergeType) $ Builder.addKeyOp builder key entry -- One the last level we could also turn Mupdate into Insert, -- but no need to complicate things. 
-shouldWriteEntry :: Level -> Entry v b -> Bool -shouldWriteEntry level = \case - Delete -> level == MidLevel - _ -> True +shouldWriteEntry :: Entry v b -> MergeType -> Bool +shouldWriteEntry Delete MergeLastLevel = False +shouldWriteEntry Delete MergeUnion = False +shouldWriteEntry _ _ = True diff --git a/src/Database/LSMTree/Internal/MergeSchedule.hs b/src/Database/LSMTree/Internal/MergeSchedule.hs index f20d23dd2..33eeeb58f 100644 --- a/src/Database/LSMTree/Internal/MergeSchedule.hs +++ b/src/Database/LSMTree/Internal/MergeSchedule.hs @@ -46,7 +46,7 @@ import Database.LSMTree.Internal.Entry (Entry, NumEntries (..), unNumEntries) import Database.LSMTree.Internal.Index.Compact (IndexCompact) import Database.LSMTree.Internal.Lookup (ResolveSerialisedValue) -import qualified Database.LSMTree.Internal.Merge as Merge +import Database.LSMTree.Internal.Merge (MergeType (..)) import Database.LSMTree.Internal.MergingRun (MergePolicyForLevel (..), MergingRun, NumRuns (..)) import qualified Database.LSMTree.Internal.MergingRun as MR @@ -92,7 +92,7 @@ data MergeTrace = RunDataCaching RunBloomFilterAlloc MergePolicyForLevel - Merge.Level + MergeType | TraceCompletedMerge -- TODO: currently not traced for Incremental merges NumEntries -- ^ Size of output run RunNumber @@ -572,7 +572,7 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels = traceWith tr $ AtLevel ln TraceAddLevel -- Make a new level let policyForLevel = mergePolicyForLevel confMergePolicy ln V.empty - ir <- newMerge policyForLevel Merge.LastLevel ln rs + ir <- newMerge policyForLevel MergeLastLevel ln rs return $! V.singleton $ Level ir V.empty go !ln rs' (V.uncons -> Just (Level ir rs, ls)) = do r <- expectCompletedMerge ln ir @@ -588,7 +588,7 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels = -- as a bundle and move them down to the level below. We start a merge -- for the new incoming runs. This level is otherwise empty. 
LevelTiering | levelIsFull confSizeRatio rs -> do - ir' <- newMerge LevelTiering Merge.MidLevel ln rs' + ir' <- newMerge LevelTiering MergeMidLevel ln rs' ls' <- go (succ ln) (r `V.cons` rs) ls pure $! Level ir' V.empty `V.cons` ls' -- This tiering level is not yet full. We move the completed merged run @@ -607,13 +607,13 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels = -- empty) level . LevelLevelling | Run.size r > maxRunSize' conf LevelLevelling ln -> do assert (V.null rs && V.null ls) $ pure () - ir' <- newMerge LevelTiering Merge.MidLevel ln rs' + ir' <- newMerge LevelTiering MergeMidLevel ln rs' ls' <- go (succ ln) (V.singleton r) V.empty pure $! Level ir' V.empty `V.cons` ls' -- Otherwise we start merging the incoming runs into the run. LevelLevelling -> do assert (V.null rs && V.null ls) $ pure () - ir' <- newMerge LevelLevelling Merge.LastLevel ln (rs' `V.snoc` r) + ir' <- newMerge LevelLevelling MergeLastLevel ln (rs' `V.snoc` r) pure $! Level ir' V.empty `V.cons` V.empty -- Releases the incoming run. @@ -631,11 +631,11 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels = -- Releases the runs. newMerge :: MergePolicyForLevel - -> Merge.Level + -> MergeType -> LevelNo -> V.Vector (Ref (Run m h)) -> m (IncomingRun m h) - newMerge mergePolicy mergeLevel ln rs + newMerge mergePolicy mergeType ln rs | Just (r, rest) <- V.uncons rs , V.null rest = do traceWith tr $ AtLevel ln $ @@ -654,11 +654,11 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels = !alloc = bloomFilterAllocForLevel conf ln !runPaths = Paths.runPath root (uniqueToRunNumber n) traceWith tr $ AtLevel ln $ - TraceNewMerge (V.map Run.size rs) (runNumber runPaths) caching alloc mergePolicy mergeLevel + TraceNewMerge (V.map Run.size rs) (runNumber runPaths) caching alloc mergePolicy mergeType -- The runs will end up inside the merging run, with fresh references. 
-- The original references can be released (but only on the happy path). mr <- withRollback reg - (MR.new hfs hbio resolve caching alloc mergeLevel runPaths rs) + (MR.new hfs hbio resolve caching alloc mergeType runPaths rs) releaseRef V.forM_ rs $ \r -> delayedCommit reg (releaseRef r) case confMergeSchedule of @@ -715,10 +715,10 @@ maxRunSize' :: TableConfig -> MergePolicyForLevel -> LevelNo -> NumEntries maxRunSize' config policy ln = maxRunSize (confSizeRatio config) (confWriteBufferAlloc config) policy ln -mergeLastForLevel :: Levels m h -> Merge.Level +mergeLastForLevel :: Levels m h -> MergeType mergeLastForLevel levels - | V.null levels = Merge.LastLevel - | otherwise = Merge.MidLevel + | V.null levels = MergeLastLevel + | otherwise = MergeMidLevel levelIsFull :: SizeRatio -> V.Vector run -> Bool levelIsFull sr rs = V.length rs + 1 >= (sizeRatioInt sr) diff --git a/src/Database/LSMTree/Internal/MergingRun.hs b/src/Database/LSMTree/Internal/MergingRun.hs index 6e4dfac45..a2390f3e3 100644 --- a/src/Database/LSMTree/Internal/MergingRun.hs +++ b/src/Database/LSMTree/Internal/MergingRun.hs @@ -42,6 +42,7 @@ import Database.LSMTree.Internal.Entry (NumEntries (..), unNumEntries) import Database.LSMTree.Internal.Lookup (ResolveSerialisedValue) import Database.LSMTree.Internal.Merge (Merge, StepResult (..)) import qualified Database.LSMTree.Internal.Merge as Merge +import Database.LSMTree.Internal.Merge (MergeType (..)) import Database.LSMTree.Internal.Paths (RunFsPaths (..)) import Database.LSMTree.Internal.Run (Run) import qualified Database.LSMTree.Internal.Run as Run @@ -116,7 +117,7 @@ instance NFData MergeKnownCompleted where -> ResolveSerialisedValue -> Run.RunDataCaching -> RunBloomFilterAlloc - -> Merge.Level + -> MergeType -> RunFsPaths -> V.Vector (Ref (Run IO h)) -> IO (Ref (MergingRun IO h)) #-} @@ -134,16 +135,16 @@ new :: -> ResolveSerialisedValue -> Run.RunDataCaching -> RunBloomFilterAlloc - -> Merge.Level + -> MergeType -> RunFsPaths -> V.Vector 
(Ref (Run m h)) -> m (Ref (MergingRun m h)) -new hfs hbio resolve caching alloc mergeLevel runPaths inputRuns = +new hfs hbio resolve caching alloc mergeType runPaths inputRuns = -- If creating the Merge fails, we must release the references again. withActionRegistry $ \reg -> do runs <- V.mapM (\r -> withRollback reg (dupRef r) releaseRef) inputRuns merge <- fromMaybe (error "newMerge: merges can not be empty") - <$> Merge.new hfs hbio caching alloc mergeLevel resolve runPaths runs + <$> Merge.new hfs hbio caching alloc mergeType resolve runPaths runs let numInputRuns = NumRuns $ V.length runs let numInputEntries = V.foldMap' Run.size runs spentCreditsVar <- SpentCreditsVar <$> newPrimVar 0 diff --git a/src/Database/LSMTree/Internal/Snapshot.hs b/src/Database/LSMTree/Internal/Snapshot.hs index 1f68221db..24cb16ab1 100644 --- a/src/Database/LSMTree/Internal/Snapshot.hs +++ b/src/Database/LSMTree/Internal/Snapshot.hs @@ -44,6 +44,7 @@ import Database.LSMTree.Internal.Config import Database.LSMTree.Internal.Entry import Database.LSMTree.Internal.Lookup (ResolveSerialisedValue) import qualified Database.LSMTree.Internal.Merge as Merge +import Database.LSMTree.Internal.Merge (MergeType (..)) import Database.LSMTree.Internal.MergeSchedule import Database.LSMTree.Internal.MergingRun (NumRuns (..)) import qualified Database.LSMTree.Internal.MergingRun as MR @@ -155,7 +156,7 @@ newtype UnspentCredits = UnspentCredits { getUnspentCredits :: Int } data SnapMergingRunState r = SnapCompletedMerge !r - | SnapOngoingMerge !(V.Vector r) !SpentCredits !Merge.Level + | SnapOngoingMerge !(V.Vector r) !SpentCredits !MergeType deriving stock (Show, Eq, Functor, Foldable, Traversable) instance NFData r => NFData (SnapMergingRunState r) where @@ -221,7 +222,7 @@ toSnapMergingRunState (MR.CompletedMerge r) = pure (SnapCompletedMerge r) -- work on snapshot load. 
toSnapMergingRunState (MR.OngoingMerge rs (MR.SpentCreditsVar spentCreditsVar) m) = do spentCredits <- readPrimVar spentCreditsVar - pure (SnapOngoingMerge rs (SpentCredits spentCredits) (Merge.mergeLevel m)) + pure (SnapOngoingMerge rs (SpentCredits spentCredits) (Merge.mergeType m)) {------------------------------------------------------------------------------- Write Buffer @@ -452,10 +453,10 @@ fromSnapLevels reg hfs hbio conf@TableConfig{..} uc resolve dir (SnapLevels leve SnapCompletedMerge run -> withRollback reg (MR.newCompleted nr ne run) releaseRef - SnapOngoingMerge runs spentCredits lvl -> do + SnapOngoingMerge runs spentCredits mt -> do rn <- uniqueToRunNumber <$> incrUniqCounter uc mr <- withRollback reg - (MR.new hfs hbio resolve caching alloc lvl (mkPath rn) runs) + (MR.new hfs hbio resolve caching alloc mt (mkPath rn) runs) releaseRef -- When a snapshot is created, merge progress is lost, so we -- have to redo merging work here. UnspentCredits and diff --git a/src/Database/LSMTree/Internal/Snapshot/Codec.hs b/src/Database/LSMTree/Internal/Snapshot/Codec.hs index b44ae2be3..bc178a446 100644 --- a/src/Database/LSMTree/Internal/Snapshot/Codec.hs +++ b/src/Database/LSMTree/Internal/Snapshot/Codec.hs @@ -31,7 +31,7 @@ import Database.LSMTree.Internal.Config import Database.LSMTree.Internal.CRC32C import qualified Database.LSMTree.Internal.CRC32C as FS import Database.LSMTree.Internal.Entry -import qualified Database.LSMTree.Internal.Merge as Merge +import Database.LSMTree.Internal.Merge (MergeType (..)) import Database.LSMTree.Internal.MergeSchedule import Database.LSMTree.Internal.MergingRun (NumRuns (..)) import Database.LSMTree.Internal.Run (ChecksumError (..), @@ -547,16 +547,18 @@ instance Encode SpentCredits where instance DecodeVersioned SpentCredits where decodeVersioned V0 = SpentCredits <$> decodeInt - -- Merge.Level +-- MergeType -instance Encode Merge.Level where - encode Merge.MidLevel = encodeWord 0 - encode Merge.LastLevel = encodeWord 1 
+instance Encode MergeType where + encode MergeMidLevel = encodeWord 0 + encode MergeLastLevel = encodeWord 1 + encode MergeUnion = encodeWord 2 -instance DecodeVersioned Merge.Level where +instance DecodeVersioned MergeType where decodeVersioned V0 = do tag <- decodeWord case tag of - 0 -> pure Merge.MidLevel - 1 -> pure Merge.LastLevel - _ -> fail ("[Merge.Level] Unexpected tag: " <> show tag) + 0 -> pure MergeMidLevel + 1 -> pure MergeLastLevel + 2 -> pure MergeUnion + _ -> fail ("[MergeType] Unexpected tag: " <> show tag) diff --git a/test/Test/Database/LSMTree/Internal/Merge.hs b/test/Test/Database/LSMTree/Internal/Merge.hs index e388001e4..92e411da0 100644 --- a/test/Test/Database/LSMTree/Internal/Merge.hs +++ b/test/Test/Database/LSMTree/Internal/Merge.hs @@ -17,6 +17,7 @@ import Database.LSMTree.Extras.RunData import qualified Database.LSMTree.Internal.BlobFile as BlobFile import qualified Database.LSMTree.Internal.Entry as Entry import qualified Database.LSMTree.Internal.Merge as Merge +import Database.LSMTree.Internal.Merge (MergeType (..)) import Database.LSMTree.Internal.PageAcc (entryWouldFitInPage) import Database.LSMTree.Internal.Paths (RunFsPaths (..), pathsForRunFiles) @@ -65,7 +66,7 @@ tests = testGroup "Test.Database.LSMTree.Internal.Merge" prop_MergeDistributes :: FS.HasFS IO h -> FS.HasBlockIO IO h -> - Merge.Level -> + MergeType -> StepSize -> SmallList (RunData KeyForIndexCompact SerialisedValue SerialisedBlob) -> IO Property @@ -137,11 +138,11 @@ prop_MergeDistributes fs hbio level stepSize (SmallList rds) = prop_AbortMerge :: FS.HasFS IO h -> FS.HasBlockIO IO h -> - Merge.Level -> + MergeType -> StepSize -> SmallList (RunData KeyForIndexCompact SerialisedValue SerialisedBlob) -> IO Property -prop_AbortMerge fs hbio level (Positive stepSize) (SmallList wbs) = +prop_AbortMerge fs hbio mergeType (Positive stepSize) (SmallList wbs) = withRuns fs hbio (V.fromList (zip (simplePaths [10..]) wbs')) $ \runs -> do let path0 = simplePath 0 
mergeToClose <- makeInProgressMerge path0 runs @@ -156,7 +157,8 @@ prop_AbortMerge fs hbio level (Positive stepSize) (SmallList wbs) = wbs' = fmap serialiseRunData wbs makeInProgressMerge path runs = - Merge.new fs hbio Run.CacheRunData (RunAllocFixed 10) level mappendValues path runs >>= \case + Merge.new fs hbio Run.CacheRunData (RunAllocFixed 10) + mergeType mappendValues path runs >>= \case Nothing -> return Nothing -- not in progress Just merge -> do -- just do a few steps once, ideally not completing the merge @@ -176,13 +178,14 @@ type StepSize = Positive Int mergeRuns :: FS.HasFS IO h -> FS.HasBlockIO IO h -> - Merge.Level -> + MergeType -> RunNumber -> V.Vector (Ref (Run.Run IO h)) -> StepSize -> IO (Int, Ref (Run.Run IO h)) -mergeRuns fs hbio level runNumber runs (Positive stepSize) = do - Merge.new fs hbio Run.CacheRunData (RunAllocFixed 10) level mappendValues +mergeRuns fs hbio mergeType runNumber runs (Positive stepSize) = do + Merge.new fs hbio Run.CacheRunData (RunAllocFixed 10) + mergeType mappendValues (RunFsPaths (FS.mkFsPath []) runNumber) runs >>= \case Nothing -> (,) 0 <$> unsafeFlushAsWriteBuffer fs hbio (RunFsPaths (FS.mkFsPath []) runNumber) (RunData Map.empty) @@ -190,11 +193,11 @@ mergeRuns fs hbio level runNumber runs (Positive stepSize) = do type SerialisedEntry = Entry.Entry SerialisedValue SerialisedBlob -mergeWriteBuffers :: Merge.Level +mergeWriteBuffers :: MergeType -> [Map SerialisedKey SerialisedEntry] -> Map SerialisedKey SerialisedEntry -mergeWriteBuffers level = - (if level == Merge.LastLevel then Map.filter (not . isDelete) else id) +mergeWriteBuffers mergeType = + (if mergeType `elem` [MergeLastLevel, MergeUnion] {- both of these merge types drop deletes -} then Map.filter (not . isDelete) else id) . 
Map.unionsWith (Entry.combine mappendValues) where isDelete Entry.Delete = True diff --git a/test/Test/Database/LSMTree/Internal/Snapshot/Codec.hs b/test/Test/Database/LSMTree/Internal/Snapshot/Codec.hs index e0d1c93fb..dd05faec7 100644 --- a/test/Test/Database/LSMTree/Internal/Snapshot/Codec.hs +++ b/test/Test/Database/LSMTree/Internal/Snapshot/Codec.hs @@ -14,7 +14,7 @@ import Data.Typeable import qualified Data.Vector as V import Database.LSMTree.Internal.Config import Database.LSMTree.Internal.Entry -import qualified Database.LSMTree.Internal.Merge as Merge +import Database.LSMTree.Internal.Merge (MergeType (..)) import Database.LSMTree.Internal.MergeSchedule import Database.LSMTree.Internal.MergingRun import Database.LSMTree.Internal.RunNumber @@ -174,7 +174,7 @@ testAll test = [ , test (Proxy @UnspentCredits) , test (Proxy @(SnapMergingRunState RunNumber)) , test (Proxy @SpentCredits) - , test (Proxy @Merge.Level) + , test (Proxy @MergeType) ] {-------------------------------------------------------------------------------