diff --git a/src/Database/LSMTree.hs b/src/Database/LSMTree.hs index 56a2ba854..6bd5c0e25 100644 --- a/src/Database/LSMTree.hs +++ b/src/Database/LSMTree.hs @@ -1,3 +1,5 @@ +{-# LANGUAGE MagicHash #-} + -- | This module is experimental. It is mainly used for testing purposes. -- -- See the 'Normal' and 'Monoidal' modules for documentation. @@ -102,7 +104,6 @@ import Control.DeepSeq import Control.Exception (throw) import Control.Monad import Data.Bifunctor (Bifunctor (..)) -import Data.Coerce (coerce) import Data.Kind (Type) import Data.Typeable (Proxy (..), eqT, type (:~:) (Refl)) import qualified Data.Vector as V @@ -113,6 +114,7 @@ import Database.LSMTree.Common (BlobRef (BlobRef), IOLike, Range (..), import qualified Database.LSMTree.Common as Common import qualified Database.LSMTree.Internal as Internal import qualified Database.LSMTree.Internal.BlobRef as Internal +import qualified Database.LSMTree.Internal.Config as Internal import qualified Database.LSMTree.Internal.Entry as Entry import qualified Database.LSMTree.Internal.RawBytes as RB import qualified Database.LSMTree.Internal.Serialise as Internal @@ -121,6 +123,7 @@ import qualified Database.LSMTree.Internal.Vector as V import Database.LSMTree.Monoidal (ResolveValue (..), resolveDeserialised, resolveValueAssociativity, resolveValueValidOutput) +import GHC.Exts {------------------------------------------------------------------------------- Tables @@ -140,7 +143,9 @@ withTable :: -> (Table m k v b -> m a) -> m a withTable (Internal.Session' sesh) conf action = - Internal.withTable sesh conf (action . Internal.Table') + case Internal.someFencePointerIndex (Internal.confFencePointerIndex conf) of + Internal.SomeFencePointerIndex (p :: Proxy# j) -> + Internal.withTable p sesh conf (action . Internal.Table') {-# SPECIALISE new :: Session IO @@ -151,7 +156,10 @@ new :: => Session m -> Common.TableConfig -> m (Table m k v b) -new (Internal.Session' sesh) conf = Internal.Table' <$> Internal.new sesh conf +new (Internal.Session' sesh) conf = + case Internal.someFencePointerIndex (Internal.confFencePointerIndex conf) of + Internal.SomeFencePointerIndex (p :: Proxy# j) -> + Internal.Table' <$> Internal.new p sesh conf {-# SPECIALISE close :: Table IO k v b @@ -481,15 +489,13 @@ createSnapshot label snap (Internal.Table' t) = void $ Internal.createSnapshot (resolve (Proxy @v)) snap label Internal.SnapFullTable t {-# SPECIALISE openSnapshot :: - ResolveValue v - => Session IO + Session IO -> Common.TableConfigOverride -> Common.SnapshotLabel -> SnapshotName -> IO (Table IO k v b ) #-} openSnapshot :: forall m k v b. ( IOLike m - , ResolveValue v ) => Session m -> Common.TableConfigOverride -- ^ Optional config override @@ -497,7 +503,14 @@ openSnapshot :: forall m k v b. 
-> SnapshotName
  -> m (Table m k v b)
openSnapshot (Internal.Session' sesh) override label snap =
-    Internal.Table' <$!> Internal.openSnapshot sesh label Internal.SnapFullTable override snap (resolve (Proxy @v))
+    Internal.openSnapshot
+      Internal.Table'
+      sesh
+      label
+      Internal.SnapFullTable
+      override
+      snap
+      const

{-------------------------------------------------------------------------------
  Multiple writable tables
diff --git a/src/Database/LSMTree/Internal.hs b/src/Database/LSMTree/Internal.hs
index 717604be0..3f2bc2a76 100644
--- a/src/Database/LSMTree/Internal.hs
+++ b/src/Database/LSMTree/Internal.hs
@@ -1,4 +1,5 @@
{-# LANGUAGE DataKinds #-}
+{-# LANGUAGE MagicHash #-}

-- | This module brings together the internal parts to provide an API in terms
-- of untyped serialised keys, values and blobs. It makes no distinction between
@@ -97,6 +98,7 @@ import qualified Database.LSMTree.Internal.BlobRef as BlobRef
import Database.LSMTree.Internal.Config
import qualified Database.LSMTree.Internal.Cursor as Cursor
import Database.LSMTree.Internal.Entry (Entry, unNumEntries)
+import Database.LSMTree.Internal.Index (IndexAcc (ResultingIndex))
import Database.LSMTree.Internal.Lookup (ByteCountDiscrepancy,
     ResolveSerialisedValue, lookupsIO)
import Database.LSMTree.Internal.MergeSchedule
@@ -115,6 +117,7 @@ import Database.LSMTree.Internal.Snapshot.Codec
import Database.LSMTree.Internal.UniqCounter
import qualified Database.LSMTree.Internal.WriteBuffer as WB
import qualified Database.LSMTree.Internal.WriteBufferBlobs as WBB
+import GHC.Exts
import qualified System.FS.API as FS
import System.FS.API (FsError, FsErrorPath (..), FsPath, HasFS)
import qualified System.FS.BlockIO.API as FS
@@ -125,47 +128,50 @@ import System.FS.BlockIO.API (HasBlockIO)
-------------------------------------------------------------------------------}

type Session' :: (Type -> Type) -> Type
-data Session' m = forall h. Typeable h => Session' !(Session m h)
+data Session' m = forall h. Typeable h =>
+    Session' !(Session m h)

instance NFData (Session' m) where
  rnf (Session' s) = rnf s

type Table' :: (Type -> Type) -> Type -> Type -> Type -> Type
-data Table' m k v b = forall h. Typeable h => Table' (Table m h)
+data Table' m k v b = forall h j. (Typeable h, IndexAcc j) =>
+    Table' (Table m h j)

instance NFData (Table' m k v b) where
  rnf (Table' t) = rnf t

type Cursor' :: (Type -> Type) -> Type -> Type -> Type -> Type
-data Cursor' m k v b = forall h. Typeable h => Cursor' (Cursor m h)
+data Cursor' m k v b = forall h j. (Typeable h, IndexAcc j) =>
+    Cursor' (Cursor m h j)

instance NFData (Cursor' m k v b) where
  rnf (Cursor' t) = rnf t

type NormalTable :: (Type -> Type) -> Type -> Type -> Type -> Type
-data NormalTable m k v b = forall h. Typeable h =>
-    NormalTable !(Table m h)
+data NormalTable m k v b = forall h j. (Typeable h, IndexAcc j) =>
+    NormalTable !(Table m h j)

instance NFData (NormalTable m k v b) where
  rnf (NormalTable t) = rnf t

type NormalCursor :: (Type -> Type) -> Type -> Type -> Type -> Type
-data NormalCursor m k v b = forall h. Typeable h =>
-    NormalCursor !(Cursor m h)
+data NormalCursor m k v b = forall h j. (Typeable h, IndexAcc j) =>
+    NormalCursor !(Cursor m h j)

instance NFData (NormalCursor m k v b) where
  rnf (NormalCursor c) = rnf c

type MonoidalTable :: (Type -> Type) -> Type -> Type -> Type
-data MonoidalTable m k v = forall h. Typeable h =>
-    MonoidalTable !(Table m h)
+data MonoidalTable m k v = forall h j.
+    (Typeable h, IndexAcc j) =>
+    MonoidalTable !(Table m h j)

instance NFData (MonoidalTable m k v) where
  rnf (MonoidalTable t) = rnf t

type MonoidalCursor :: (Type -> Type) -> Type -> Type -> Type
-data MonoidalCursor m k v = forall h. Typeable h =>
-    MonoidalCursor !(Cursor m h)
+data MonoidalCursor m k v = forall h j. (Typeable h, IndexAcc j) =>
+    MonoidalCursor !(Cursor m h j)

instance NFData (MonoidalCursor m k v) where
  rnf (MonoidalCursor c) = rnf c

@@ -319,12 +325,33 @@ data SessionEnv m h = SessionEnv {
    -- * A table 'close' may delete its own identifier from the set of open
    --   tables without restrictions, even concurrently with 'closeSession'.
    --   This is safe because 'close' is idempotent.
-  , sessionOpenTables :: !(StrictMVar m (Map Word64 (Table m h)))
+  , sessionOpenTables :: !(StrictMVar m (Map Word64 (SomeTable m h)))
    -- | Similarly to tables, open cursors are tracked so they can be closed
    -- once the session is closed. See 'sessionOpenTables'.
-  , sessionOpenCursors :: !(StrictMVar m (Map Word64 (Cursor m h)))
+  , sessionOpenCursors :: !(StrictMVar m (Map Word64 (SomeCursor m h)))
  }
+
+data SomeTable m h where
+  SomeTable :: forall m h j. Table m h j -> SomeTable m h
+
+{-# INLINE closeSomeTable #-}
+closeSomeTable ::
+     (MonadMask m, MonadSTM m, MonadMVar m, PrimMonad m)
+  => SomeTable m h
+  -> m ()
+closeSomeTable (SomeTable t) = close t
+
+data SomeCursor m h where
+  SomeCursor :: forall m h j. Cursor m h j -> SomeCursor m h
+
+{-# INLINE closeSomeCursor #-}
+closeSomeCursor ::
+     (MonadMask m, MonadMVar m, MonadSTM m, PrimMonad m)
+  => SomeCursor m h
+  -> m ()
+closeSomeCursor (SomeCursor t) = closeCursor t
+
{-# INLINE withOpenSession #-}
{-# SPECIALISE withOpenSession ::
     Session IO h
@@ -503,9 +530,9 @@ closeSession Session{sessionState, sessionTracer} = do
        --
        -- TODO: use TempRegistry
        cursors <- modifyMVar (sessionOpenCursors seshEnv) (\m -> pure (Map.empty, m))
-        mapM_ closeCursor cursors
+        mapM_ closeSomeCursor cursors
        tables <- modifyMVar (sessionOpenTables seshEnv) (\m -> pure (Map.empty, m))
-        mapM_ close tables
+        mapM_ closeSomeTable tables
        FS.close (sessionHasBlockIO seshEnv)
        FS.hUnlock (sessionLockFile seshEnv)
        pure SessionClosed
@@ -517,18 +544,18 @@ closeSession Session{sessionState, sessionTracer} = do
-- | A handle to an on-disk key\/value table.
--
-- For more information, see 'Database.LSMTree.Normal.Table'.
-data Table m h = Table {
+data Table m h j = Table {
      tableConfig :: !TableConfig
      -- | The primary purpose of this 'RWVar' is to ensure consistent views of
      -- the open-/closedness of a table when multiple threads require access to
      -- the table's fields (see 'withOpenTable'). We use more fine-grained
      -- synchronisation for various mutable parts of an open table.
-    , tableState :: !(RWVar m (TableState m h))
+    , tableState :: !(RWVar m (TableState m h j))
    , tableArenaManager :: !(ArenaManager (PrimState m))
    , tableTracer :: !(Tracer m TableTrace)
  }

-instance NFData (Table m h) where
+instance NFData (Table m h j) where
  rnf (Table a b c d) =
    rnf a `seq` rnf b `seq` rnf c `seq` rwhnf d

@@ -536,11 +563,11 @@ instance NFData (Table m h) where
-- | A table may assume that its corresponding session is still open as
-- long as the table is open. A session's global resources, and therefore
-- resources that are inherited by the table, will only be released once the
-- session is sure that no tables are open anymore.
-data TableState m h = - TableOpen !(TableEnv m h) +data TableState m h j = + TableOpen !(TableEnv m h j) | TableClosed -data TableEnv m h = TableEnv { +data TableEnv m h j = TableEnv { -- === Session-inherited -- | The session that this table belongs to. @@ -562,34 +589,34 @@ data TableEnv m h = TableEnv { -- waiting for the MVar. -- -- TODO: switch to more fine-grained synchronisation approach - , tableContent :: !(RWVar m (TableContent m h)) + , tableContent :: !(RWVar m (TableContent m h j)) } {-# INLINE tableSessionRoot #-} -- | Inherited from session for ease of access. -tableSessionRoot :: TableEnv m h -> SessionRoot +tableSessionRoot :: TableEnv m h j -> SessionRoot tableSessionRoot = sessionRoot . tableSessionEnv {-# INLINE tableHasFS #-} -- | Inherited from session for ease of access. -tableHasFS :: TableEnv m h -> HasFS m h +tableHasFS :: TableEnv m h j -> HasFS m h tableHasFS = sessionHasFS . tableSessionEnv {-# INLINE tableHasBlockIO #-} -- | Inherited from session for ease of access. -tableHasBlockIO :: TableEnv m h -> HasBlockIO m h +tableHasBlockIO :: TableEnv m h j -> HasBlockIO m h tableHasBlockIO = sessionHasBlockIO . tableSessionEnv {-# INLINE tableSessionUniqCounter #-} -- | Inherited from session for ease of access. -tableSessionUniqCounter :: TableEnv m h -> UniqCounter m +tableSessionUniqCounter :: TableEnv m h j -> UniqCounter m tableSessionUniqCounter = sessionUniqCounter . tableSessionEnv {-# INLINE tableSessionUntrackTable #-} -{-# SPECIALISE tableSessionUntrackTable :: TableEnv IO h -> IO () #-} +{-# SPECIALISE tableSessionUntrackTable :: TableEnv IO h j -> IO () #-} -- | Open tables are tracked in the corresponding session, so when a table is -- closed it should become untracked (forgotten). -tableSessionUntrackTable :: MonadMVar m => TableEnv m h -> m () +tableSessionUntrackTable :: MonadMVar m => TableEnv m h j -> m () tableSessionUntrackTable thEnv = modifyMVar_ (sessionOpenTables (tableSessionEnv thEnv)) $ pure . Map.delete (tableId thEnv) @@ -599,13 +626,13 @@ tableSessionUntrackTable thEnv = -- NOTE: any operation except 'close' can use this function. {-# INLINE withOpenTable #-} {-# SPECIALISE withOpenTable :: - Table IO h - -> (TableEnv IO h -> IO a) + Table IO h j + -> (TableEnv IO h j -> IO a) -> IO a #-} withOpenTable :: (MonadSTM m, MonadThrow m) - => Table m h - -> (TableEnv m h -> m a) + => Table m h j + -> (TableEnv m h j -> m a) -> m a withOpenTable t action = RW.withReadAccess (tableState t) $ \case TableClosed -> throwIO ErrTableClosed @@ -616,30 +643,34 @@ withOpenTable t action = RW.withReadAccess (tableState t) $ \case -- {-# SPECIALISE withTable :: - Session IO h + Proxy# j + -> Session IO h -> TableConfig - -> (Table IO h -> IO a) + -> (Table IO h j -> IO a) -> IO a #-} -- | See 'Database.LSMTree.Normal.withTable'. withTable :: (MonadMask m, MonadSTM m, MonadMVar m, PrimMonad m) - => Session m h + => Proxy# j + -> Session m h -> TableConfig - -> (Table m h -> m a) + -> (Table m h j -> m a) -> m a -withTable sesh conf = bracket (new sesh conf) close +withTable p sesh conf = bracket (new p sesh conf) close {-# SPECIALISE new :: - Session IO h + Proxy# j + -> Session IO h -> TableConfig - -> IO (Table IO h) #-} + -> IO (Table IO h j) #-} -- | See 'Database.LSMTree.Normal.new'. new :: - (MonadSTM m, MonadMVar m, PrimMonad m, MonadMask m) - => Session m h + forall m h j. 
(MonadSTM m, MonadMVar m, PrimMonad m, MonadMask m) + => Proxy# j + -> Session m h -> TableConfig - -> m (Table m h) -new sesh conf = do + -> m (Table m h j) +new _ sesh conf = do traceWith (sessionTracer sesh) TraceNewTable withOpenSession sesh $ \seshEnv -> withTempRegistry $ \reg -> do @@ -652,7 +683,7 @@ new sesh conf = do releaseRef let tableWriteBuffer = WB.empty tableLevels = V.empty - tableCache <- mkLevelsCache reg tableLevels + tableCache <- mkLevelsCache @_ @_ @j reg tableLevels let tc = TableContent { tableWriteBuffer , tableWriteBufferBlobs @@ -667,8 +698,8 @@ new sesh conf = do -> SessionEnv IO h -> TableConfig -> ArenaManager RealWorld - -> TableContent IO h - -> IO (Table IO h) #-} + -> TableContent IO h j + -> IO (Table IO h j) #-} newWith :: (MonadSTM m, MonadMVar m) => TempRegistry m @@ -676,8 +707,8 @@ newWith :: -> SessionEnv m h -> TableConfig -> ArenaManager (PrimState m) - -> TableContent m h - -> m (Table m h) + -> TableContent m h j + -> m (Table m h j) newWith reg sesh seshEnv conf !am !tc = do tableId <- incrUniqCounter (sessionUniqCounter seshEnv) let tr = TraceTable (uniqueToWord64 tableId) `contramap` sessionTracer sesh @@ -696,14 +727,14 @@ newWith reg sesh seshEnv conf !am !tc = do let !t = Table conf tableVar am tr -- Track the current table freeTemp reg $ modifyMVar_ (sessionOpenTables seshEnv) - $ pure . Map.insert (uniqueToWord64 tableId) t + $ pure . Map.insert (uniqueToWord64 tableId) (SomeTable t) pure $! t -{-# SPECIALISE close :: Table IO h -> IO () #-} +{-# SPECIALISE close :: Table IO h j -> IO () #-} -- | See 'Database.LSMTree.Normal.close'. close :: (MonadMask m, MonadSTM m, MonadMVar m, PrimMonad m) - => Table m h + => Table m h j -> m () close t = do traceWith (tableTracer t) TraceCloseTable @@ -722,16 +753,17 @@ close t = do pure TableClosed {-# SPECIALISE lookups :: - ResolveSerialisedValue + IndexAcc j + => ResolveSerialisedValue -> V.Vector SerialisedKey - -> Table IO h + -> Table IO h j -> IO (V.Vector (Maybe (Entry SerialisedValue (WeakBlobRef IO h)))) #-} -- | See 'Database.LSMTree.Normal.lookups'. lookups :: - (MonadST m, MonadSTM m, MonadThrow m) + (MonadST m, MonadSTM m, MonadThrow m, IndexAcc j) => ResolveSerialisedValue -> V.Vector SerialisedKey - -> Table m h + -> Table m h j -> m (V.Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))) lookups resolve ks t = do traceWith (tableTracer t) $ TraceLookups (V.length ks) @@ -751,17 +783,18 @@ lookups resolve ks t = do ks {-# SPECIALISE rangeLookup :: - ResolveSerialisedValue + IndexAcc j + => ResolveSerialisedValue -> Range SerialisedKey - -> Table IO h + -> Table IO h j -> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef IO h) -> res) -> IO (V.Vector res) #-} -- | See 'Database.LSMTree.Normal.rangeLookup'. rangeLookup :: - (MonadMask m, MonadMVar m, MonadST m, MonadSTM m) + (MonadMask m, MonadMVar m, MonadST m, MonadSTM m, IndexAcc j) => ResolveSerialisedValue -> Range SerialisedKey - -> Table m h + -> Table m h j -> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m h) -> res) -- ^ How to map to a query result, different for normal/monoidal -> m (V.Vector res) @@ -791,18 +824,19 @@ rangeLookup resolve range t fromEntry = do else return (V.concat (reverse (V.slice 0 n chunk : chunks))) {-# SPECIALISE updates :: - ResolveSerialisedValue + IndexAcc j + => ResolveSerialisedValue -> V.Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob) - -> Table IO h + -> Table IO h j -> IO () #-} -- | See 'Database.LSMTree.Normal.updates'. 
--
-- Does not enforce that mupsert and blobs should not occur in the same table.
updates ::
-  (MonadMask m, MonadMVar m, MonadST m, MonadSTM m)
+  (MonadMask m, MonadMVar m, MonadST m, MonadSTM m, IndexAcc j)
  => ResolveSerialisedValue
  -> V.Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)
-  -> Table m h
+  -> Table m h j
  -> m ()
updates resolve es t = do
  traceWith (tableTracer t) $ TraceUpdates (V.length es)
@@ -853,21 +887,21 @@ retrieveBlobs sesh wrefs =
--
-- The representation of a cursor is similar to that of a 'Table', but
-- simpler, as it is read-only.
-data Cursor m h = Cursor {
+data Cursor m h j = Cursor {
      -- | Mutual exclusion, only a single thread can read from a cursor at a
      -- given time.
-      cursorState :: !(StrictMVar m (CursorState m h))
+      cursorState :: !(StrictMVar m (CursorState m h j))
    , cursorTracer :: !(Tracer m CursorTrace)
  }

-instance NFData (Cursor m h) where
+instance NFData (Cursor m h j) where
  rnf (Cursor a b) = rwhnf a `seq` rwhnf b

-data CursorState m h =
-  CursorOpen !(CursorEnv m h)
+data CursorState m h j =
+  CursorOpen !(CursorEnv m h j)
  | CursorClosed -- ^ Calls to a closed cursor raise an exception.

-data CursorEnv m h = CursorEnv {
+data CursorEnv m h j = CursorEnv {
    -- === Session-inherited

    -- | The session that this cursor belongs to.
@@ -904,7 +938,7 @@ data CursorEnv m h = CursorEnv {
    -- | The runs held open by the cursor. We must release these references
    -- when the cursor gets closed.
-  , cursorRuns :: !(V.Vector (Ref (Run m h)))
+  , cursorRuns :: !(V.Vector (Ref (Run m h (ResultingIndex j))))
    -- | The write buffer blobs, which like the runs, we have to keep open
    -- until the cursor is closed.
  }

{-# SPECIALISE withCursor ::
-     OffsetKey
-  -> Table IO h
-  -> (Cursor IO h -> IO a)
+     IndexAcc j
+  => OffsetKey
+  -> Table IO h j
+  -> (Cursor IO h j -> IO a)
  -> IO a #-}
-- | See 'Database.LSMTree.Normal.withCursor'.
withCursor ::
-  (MonadMask m, MonadMVar m, MonadST m, MonadSTM m)
+  (MonadMask m, MonadMVar m, MonadST m, MonadSTM m, IndexAcc j)
  => OffsetKey
-  -> Table m h
-  -> (Cursor m h -> m a)
+  -> Table m h j
+  -> (Cursor m h j -> m a)
  -> m a
withCursor offsetKey t = bracket (newCursor offsetKey t) closeCursor

{-# SPECIALISE newCursor ::
-     OffsetKey
-  -> Table IO h
-  -> IO (Cursor IO h) #-}
+     IndexAcc j
+  => OffsetKey
+  -> Table IO h j
+  -> IO (Cursor IO h j) #-}
-- | See 'Database.LSMTree.Normal.newCursor'.
newCursor ::
-  (MonadMask m, MonadMVar m, MonadST m, MonadSTM m)
+  (MonadMask m, MonadMVar m, MonadST m, MonadSTM m, IndexAcc j)
  => OffsetKey
-  -> Table m h
-  -> m (Cursor m h)
+  -> Table m h j
+  -> m (Cursor m h j)
newCursor !offsetKey t = withOpenTable t $ \thEnv -> do
  let cursorSession = tableSession thEnv
  let cursorSessionEnv = tableSessionEnv thEnv
@@ -962,7 +998,7 @@ newCursor !offsetKey t = withOpenTable t $ \thEnv -> do
      -- successfully, i.e. using 'freeTemp'.
      freeTemp reg $
        modifyMVar_ (sessionOpenCursors cursorSessionEnv) $
-          pure . Map.insert cursorId cursor
+          pure . Map.insert cursorId (SomeCursor cursor)
      pure $! cursor
  where
    -- The table contents escape the read access, but we just duplicate
@@ -977,11 +1013,11 @@ newCursor !offsetKey t = withOpenTable t $ \thEnv -> do
        allocateTemp reg (dupRef r) releaseRef
      pure (wb, wbblobs', runs')

-{-# SPECIALISE closeCursor :: Cursor IO h -> IO () #-}
+{-# SPECIALISE closeCursor :: Cursor IO h j -> IO () #-}
-- | See 'Database.LSMTree.Normal.closeCursor'.
closeCursor :: (MonadMask m, MonadMVar m, MonadSTM m, PrimMonad m) - => Cursor m h + => Cursor m h j -> m () closeCursor Cursor {..} = do traceWith cursorTracer $ TraceCloseCursor @@ -1004,16 +1040,16 @@ closeCursor Cursor {..} = do {-# SPECIALISE readCursor :: ResolveSerialisedValue -> Int - -> Cursor IO h + -> Cursor IO h j -> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef IO h) -> res) -> IO (V.Vector res) #-} -- | See 'Database.LSMTree.Normal.readCursor'. readCursor :: - forall m h res. + forall m h j res. (MonadMask m, MonadMVar m, MonadST m, MonadSTM m) => ResolveSerialisedValue -> Int -- ^ Maximum number of entries to read - -> Cursor m h + -> Cursor m h j -> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m h) -> res) -- ^ How to map to a query result, different for normal/monoidal -> m (V.Vector res) @@ -1024,7 +1060,7 @@ readCursor resolve n cursor fromEntry = ResolveSerialisedValue -> (SerialisedKey -> Bool) -> Int - -> Cursor IO h + -> Cursor IO h j -> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef IO h) -> res) -> IO (V.Vector res) #-} -- | @readCursorWhile _ p n cursor _@ reads elements until either: @@ -1036,12 +1072,12 @@ readCursor resolve n cursor fromEntry = -- Consequently, once a call returned fewer than @n@ elements, any subsequent -- calls with the same predicate @p@ will return an empty vector. readCursorWhile :: - forall m h res. + forall m h j res. (MonadMask m, MonadMVar m, MonadST m, MonadSTM m) => ResolveSerialisedValue -> (SerialisedKey -> Bool) -- ^ Only read as long as this predicate holds -> Int -- ^ Maximum number of entries to read - -> Cursor m h + -> Cursor m h j -> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m h) -> res) -- ^ How to map to a query result, different for normal/monoidal -> m (V.Vector res) @@ -1067,20 +1103,21 @@ readCursorWhile resolve keyIsWanted n Cursor {..} fromEntry = do -------------------------------------------------------------------------------} {-# SPECIALISE createSnapshot :: - ResolveSerialisedValue + IndexAcc j + => ResolveSerialisedValue -> SnapshotName -> SnapshotLabel -> SnapshotTableType - -> Table IO h + -> Table IO h j -> IO () #-} -- | See 'Database.LSMTree.Normal.createSnapshot''. createSnapshot :: - (MonadMask m, MonadMVar m, MonadST m, MonadSTM m) + (MonadMask m, MonadMVar m, MonadST m, MonadSTM m, IndexAcc j) => ResolveSerialisedValue -> SnapshotName -> SnapshotLabel -> SnapshotTableType - -> Table m h + -> Table m h j -> m () createSnapshot resolve snap label tableType t = do traceWith (tableTracer t) $ TraceSnapshot snap @@ -1139,24 +1176,26 @@ createSnapshot resolve snap label tableType t = do writeFileSnapshotMetaData hfs contentPath checksumPath snapMetaData {-# SPECIALISE openSnapshot :: - Session IO h + (forall j. IndexAcc j => Table IO h j -> a) + -> Session IO h -> SnapshotLabel -> SnapshotTableType -> TableConfigOverride -> SnapshotName -> ResolveSerialisedValue - -> IO (Table IO h) #-} + -> IO a #-} -- | See 'Database.LSMTree.Normal.openSnapshot'. openSnapshot :: (MonadMask m, MonadMVar m, MonadST m, MonadSTM m) - => Session m h + => (forall j. 
IndexAcc j => Table m h j -> a)
+  -> Session m h
  -> SnapshotLabel -- ^ Expected label
  -> SnapshotTableType -- ^ Expected table type
  -> TableConfigOverride -- ^ Optional config override
  -> SnapshotName
  -> ResolveSerialisedValue
-  -> m (Table m h)
-openSnapshot sesh label tableType override snap resolve = do
+  -> m a
+openSnapshot mkExistential sesh label tableType override snap resolve = do
  traceWith (sessionTracer sesh) $ TraceOpenSnapshot snap override
  withOpenSession sesh $ \seshEnv -> do
    withTempRegistry $ \reg -> do
@@ -1174,35 +1213,38 @@ openSnapshot sesh label tableType override snap resolve = do
          Left e -> throwIO (ErrSnapshotDeserialiseFailure e snap)
          Right x -> pure x

-      let SnapshotMetaData label' tableType' conf snapLevels = snapMetaData
+      let SnapshotMetaData label' tableType' conf snapLevels =
+            snapMetaData

-      unless (tableType == tableType') $
-        throwIO (ErrSnapshotWrongTableType snap tableType tableType')
+      case someFencePointerIndex (confFencePointerIndex conf) of
+        SomeFencePointerIndex (_ :: Proxy# j) -> do
+          unless (tableType == tableType') $
+            throwIO (ErrSnapshotWrongTableType snap tableType tableType')

-      unless (label == label') $
-        throwIO (ErrSnapshotWrongLabel snap label label')
+          unless (label == label') $
+            throwIO (ErrSnapshotWrongLabel snap label label')

-      let conf' = applyOverride override conf
-      am <- newArenaManager
-      blobpath <- Paths.tableBlobPath (sessionRoot seshEnv) <$>
-                    incrUniqCounter (sessionUniqCounter seshEnv)
-      tableWriteBufferBlobs <- allocateTemp reg (WBB.new hfs blobpath)
-                                 releaseRef
-
-      let actDir = Paths.activeDir (sessionRoot seshEnv)
-
-      -- Hard link runs into the active directory,
-      snapLevels' <- openRuns reg hfs hbio conf (sessionUniqCounter seshEnv) snapDir actDir snapLevels
-      -- Convert from the snapshot format, restoring merge progress in the process
-      tableLevels <- fromSnapLevels reg hfs hbio conf (sessionUniqCounter seshEnv) resolve actDir snapLevels'
-
-      tableCache <- mkLevelsCache reg tableLevels
-      newWith reg sesh seshEnv conf' am $! TableContent {
-          tableWriteBuffer = WB.empty
-        , tableWriteBufferBlobs
-        , tableLevels
-        , tableCache
-        }
+          let conf' = applyOverride override conf
+          am <- newArenaManager
+          blobpath <- Paths.tableBlobPath (sessionRoot seshEnv) <$>
+                        incrUniqCounter (sessionUniqCounter seshEnv)
+          tableWriteBufferBlobs <- allocateTemp reg (WBB.new hfs blobpath)
+                                     releaseRef
+
+          let actDir = Paths.activeDir (sessionRoot seshEnv)
+
+          -- Hard link runs into the active directory,
+          snapLevels' <- openRuns @_ @_ @(ResultingIndex j) reg hfs hbio conf (sessionUniqCounter seshEnv) snapDir actDir snapLevels
+          -- Convert from the snapshot format, restoring merge progress in the process
+          tableLevels <- fromSnapLevels @_ @_ @j reg hfs hbio conf (sessionUniqCounter seshEnv) resolve actDir snapLevels'
+
+          tableCache <- mkLevelsCache reg tableLevels
+          mkExistential <$> (newWith reg sesh seshEnv conf' am $! TableContent {
+              tableWriteBuffer = WB.empty
+            , tableWriteBufferBlobs
+            , tableLevels
+            , tableCache
+            })

{-# SPECIALISE deleteSnapshot ::
     Session IO h
@@ -1254,12 +1296,12 @@ listSnapshots sesh = do
  Multiple writable tables
-------------------------------------------------------------------------------}

-{-# SPECIALISE duplicate :: Table IO h -> IO (Table IO h) #-}
+{-# SPECIALISE duplicate :: Table IO h j -> IO (Table IO h j) #-}
-- | See 'Database.LSMTree.Normal.duplicate'.
duplicate :: (MonadMask m, MonadMVar m, MonadST m, MonadSTM m) - => Table m h - -> m (Table m h) + => Table m h j + -> m (Table m h j) duplicate t@Table{..} = do traceWith tableTracer TraceDuplicate withOpenTable t $ \TableEnv{..} -> do diff --git a/src/Database/LSMTree/Internal/ChecksumHandle.hs b/src/Database/LSMTree/Internal/ChecksumHandle.hs index 104d5e1a3..87525846d 100644 --- a/src/Database/LSMTree/Internal/ChecksumHandle.hs +++ b/src/Database/LSMTree/Internal/ChecksumHandle.hs @@ -36,8 +36,8 @@ import qualified Database.LSMTree.Internal.Chunk as Chunk (toByteString) import Database.LSMTree.Internal.CRC32C (CRC32C) import qualified Database.LSMTree.Internal.CRC32C as CRC import Database.LSMTree.Internal.Entry -import Database.LSMTree.Internal.Index.Compact (IndexCompact) -import qualified Database.LSMTree.Internal.Index.Compact as Index +import Database.LSMTree.Internal.Index (Index) +import qualified Database.LSMTree.Internal.Index as Index import Database.LSMTree.Internal.Paths (ForBlob (..), ForFilter (..), ForIndex (..), ForKOps (..)) import qualified Database.LSMTree.Internal.RawBytes as RB @@ -205,15 +205,16 @@ writeFilter hfs filterHandle bf = writeToHandle hfs (unForFilter filterHandle) (bloomFilterToLBS bf) {-# SPECIALISE writeIndexHeader :: - HasFS IO h + Index i + => HasFS IO h -> ForIndex (ChecksumHandle RealWorld h) - -> Proxy# IndexCompact + -> Proxy# i -> IO () #-} writeIndexHeader :: - (MonadSTM m, PrimMonad m) + (MonadSTM m, PrimMonad m, Index i) => HasFS m h -> ForIndex (ChecksumHandle (PrimState m) h) - -> Proxy# IndexCompact + -> Proxy# i -> m () writeIndexHeader hfs indexHandle indexTypeProxy = writeToHandle hfs (unForIndex indexHandle) $ @@ -235,17 +236,18 @@ writeIndexChunk hfs indexHandle chunk = BSL.fromStrict $ Chunk.toByteString chunk {-# SPECIALISE writeIndexFinal :: - HasFS IO h + Index i + => HasFS IO h -> ForIndex (ChecksumHandle RealWorld h) -> NumEntries - -> IndexCompact + -> i -> IO () #-} writeIndexFinal :: - (MonadSTM m, PrimMonad m) + (MonadSTM m, PrimMonad m, Index i) => HasFS m h -> ForIndex (ChecksumHandle (PrimState m) h) -> NumEntries - -> IndexCompact + -> i -> m () writeIndexFinal hfs indexHandle numEntries index = writeToHandle hfs (unForIndex indexHandle) $ diff --git a/src/Database/LSMTree/Internal/Config.hs b/src/Database/LSMTree/Internal/Config.hs index ff0e7f771..69bfba7ef 100644 --- a/src/Database/LSMTree/Internal/Config.hs +++ b/src/Database/LSMTree/Internal/Config.hs @@ -1,3 +1,5 @@ +{-# LANGUAGE MagicHash #-} + module Database.LSMTree.Internal.Config ( LevelNo (..) -- * Table configuration @@ -21,6 +23,8 @@ module Database.LSMTree.Internal.Config ( , bloomFilterAllocForLevel -- * Fence pointer index , FencePointerIndex (..) + , SomeFencePointerIndex (..) + , someFencePointerIndex -- * Disk cache policy , DiskCachePolicy (..) 
, diskCachePolicyForLevel @@ -36,8 +40,14 @@ import Data.Word (Word64) import Database.LSMTree.Internal.Assertions (assert, fromIntegralChecked) import Database.LSMTree.Internal.Entry (NumEntries (..)) +import Database.LSMTree.Internal.Index +import Database.LSMTree.Internal.Index.CompactAcc as IndexCompactAcc + (IndexCompactAcc) +import Database.LSMTree.Internal.Index.OrdinaryAcc as IndexOrdinaryAcc + (IndexOrdinaryAcc) import Database.LSMTree.Internal.Run (RunDataCaching (..)) import Database.LSMTree.Internal.RunAcc (RunBloomFilterAlloc (..)) +import GHC.Exts import qualified Monkey newtype LevelNo = LevelNo Int @@ -281,9 +291,6 @@ bloomFilterAllocForLevel conf (LevelNo l) = -------------------------------------------------------------------------------} -- | Configure the type of fence pointer index. --- --- TODO: this configuration option currently has no effect: 'CompactIndex' is --- always used. data FencePointerIndex = -- | Use a compact fence pointer index. -- @@ -301,7 +308,8 @@ data FencePointerIndex = -- to test this law. CompactIndex -- | Use an ordinary fence pointer index, without any constraints on - -- serialised keys. + -- serialised keys other than that their serialised forms may not be 64 KiB + -- or more in size. | OrdinaryIndex deriving stock (Show, Eq) @@ -309,6 +317,13 @@ instance NFData FencePointerIndex where rnf CompactIndex = () rnf OrdinaryIndex = () +data SomeFencePointerIndex where + SomeFencePointerIndex :: forall j. IndexAcc j => Proxy# j -> SomeFencePointerIndex + +someFencePointerIndex :: FencePointerIndex -> SomeFencePointerIndex +someFencePointerIndex CompactIndex = SomeFencePointerIndex (proxy# @IndexCompactAcc) +someFencePointerIndex OrdinaryIndex = SomeFencePointerIndex (proxy# @IndexOrdinaryAcc) + {------------------------------------------------------------------------------- Disk cache policy -------------------------------------------------------------------------------} diff --git a/src/Database/LSMTree/Internal/Index.hs b/src/Database/LSMTree/Internal/Index.hs index a22a98161..5b4e81c86 100644 --- a/src/Database/LSMTree/Internal/Index.hs +++ b/src/Database/LSMTree/Internal/Index.hs @@ -7,8 +7,22 @@ -} module Database.LSMTree.Internal.Index ( - Index (search, sizeInPages, headerLBS, finalLBS, fromSBS), - IndexAcc (ResultingIndex, appendSingle, appendMulti, unsafeEnd) + Index + ( + search, + sizeInPages, + headerLBS, + finalLBS, + fromSBS + ), + IndexAcc + ( + ResultingIndex, + newWithDefaults, + appendSingle, + appendMulti, + unsafeEnd + ) ) where @@ -100,6 +114,9 @@ class Index (ResultingIndex j) => IndexAcc j where -- | The type of indexes constructed by accumulators of a certain type type ResultingIndex j + -- | Create a new index accumulator with a default configuration. + newWithDefaults :: ST s (j s) + {-| Adds information about a single page that fully comprises one or more key–value pairs to an index and outputs newly available chunks. 
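The 'newWithDefaults' method added above is what keeps the rest of this patch polymorphic in the accumulator type: callers such as 'Merge.new' and 'flushWriteBuffer' further down construct an accumulator without naming 'IndexCompactAcc' or 'IndexOrdinaryAcc'. A minimal sketch of that usage pattern, using only the class methods exported above ('buildIndex' is a hypothetical helper, not part of the patch, and it discards the serialisation chunks that 'appendSingle' emits):

{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}

import Control.Monad.ST.Strict (ST)
import Database.LSMTree.Internal.Index (IndexAcc (..))
import Database.LSMTree.Internal.Serialise (SerialisedKey)

-- | Accumulate an index over a run of pages, given the minimum and maximum
-- serialised key of each page, for any index type in the 'IndexAcc' class.
buildIndex ::
     forall j s. IndexAcc j
  => [(SerialisedKey, SerialisedKey)]
  -> ST s (j s)
buildIndex pages = do
    acc <- newWithDefaults @j          -- no index-specific size hints needed
    mapM_ (`appendSingle` acc) pages   -- emitted chunks are ignored in this sketch
    pure acc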
diff --git a/src/Database/LSMTree/Internal/Index/CompactAcc.hs b/src/Database/LSMTree/Internal/Index/CompactAcc.hs index 795d628b8..3361597f8 100644 --- a/src/Database/LSMTree/Internal/Index/CompactAcc.hs +++ b/src/Database/LSMTree/Internal/Index/CompactAcc.hs @@ -47,7 +47,7 @@ import Database.LSMTree.Internal.BitMath import Database.LSMTree.Internal.Chunk (Chunk) import Database.LSMTree.Internal.Index (IndexAcc, ResultingIndex) import qualified Database.LSMTree.Internal.Index as Index (appendMulti, - appendSingle, unsafeEnd) + appendSingle, newWithDefaults, unsafeEnd) import Database.LSMTree.Internal.Index.Compact import Database.LSMTree.Internal.Page import Database.LSMTree.Internal.Serialise @@ -257,6 +257,9 @@ instance IndexAcc IndexCompactAcc where type ResultingIndex IndexCompactAcc = IndexCompact + newWithDefaults :: ST s (IndexCompactAcc s) + newWithDefaults = new 1024 + appendSingle :: (SerialisedKey, SerialisedKey) -> IndexCompactAcc s -> ST s (Maybe Chunk) diff --git a/src/Database/LSMTree/Internal/Index/OrdinaryAcc.hs b/src/Database/LSMTree/Internal/Index/OrdinaryAcc.hs index b8b454237..8428b4171 100644 --- a/src/Database/LSMTree/Internal/Index/OrdinaryAcc.hs +++ b/src/Database/LSMTree/Internal/Index/OrdinaryAcc.hs @@ -26,7 +26,7 @@ import Data.Word (Word16, Word32, Word8) import Database.LSMTree.Internal.Chunk (Baler, Chunk, createBaler, feedBaler, unsafeEndBaler) import Database.LSMTree.Internal.Index - (IndexAcc (ResultingIndex, appendMulti, appendSingle, unsafeEnd)) + (IndexAcc (ResultingIndex, appendMulti, appendSingle, newWithDefaults, unsafeEnd)) import Database.LSMTree.Internal.Index.Ordinary (IndexOrdinary (IndexOrdinary)) import Database.LSMTree.Internal.Serialise @@ -73,6 +73,9 @@ instance IndexAcc IndexOrdinaryAcc where type ResultingIndex IndexOrdinaryAcc = IndexOrdinary + newWithDefaults :: ST s (IndexOrdinaryAcc s) + newWithDefaults = new 1024 4096 + appendSingle :: (SerialisedKey, SerialisedKey) -> IndexOrdinaryAcc s -> ST s (Maybe Chunk) diff --git a/src/Database/LSMTree/Internal/Lookup.hs b/src/Database/LSMTree/Internal/Lookup.hs index c69d0a8c3..448cb6fd2 100644 --- a/src/Database/LSMTree/Internal/Lookup.hs +++ b/src/Database/LSMTree/Internal/Lookup.hs @@ -41,8 +41,8 @@ import Control.RefCount import Database.LSMTree.Internal.BlobRef (WeakBlobRef (..)) import Database.LSMTree.Internal.Entry +import Database.LSMTree.Internal.Index (Index) import qualified Database.LSMTree.Internal.Index as Index (search) -import Database.LSMTree.Internal.Index.Compact (IndexCompact) import Database.LSMTree.Internal.Page (PageSpan (..), getNumPages, pageSpanSize, unPageNo) import Database.LSMTree.Internal.RawBytes (RawBytes (..)) @@ -70,9 +70,10 @@ import Database.LSMTree.Internal.BloomFilterQuery1 (RunIxKeyIx (..), -- both of which are the same length. The indexes record the run and key -- associated with each 'IOOp'. prepLookups :: - Arena s + Index i + => Arena s -> V.Vector (Bloom SerialisedKey) - -> V.Vector IndexCompact + -> V.Vector i -> V.Vector (Handle h) -> V.Vector SerialisedKey -> ST s (VP.Vector RunIxKeyIx, V.Vector (IOOp s h)) @@ -89,8 +90,9 @@ type RunIx = Int -- @VP.Vector RunIxKeyIx@ argument, because index searching always returns a -- positive search result. 
indexSearches - :: Arena s - -> V.Vector IndexCompact + :: Index i + => Arena s + -> V.Vector i -> V.Vector (Handle h) -> V.Vector SerialisedKey -> VP.Vector RunIxKeyIx -- ^ Result of 'bloomQueries' @@ -155,14 +157,15 @@ data ByteCountDiscrepancy = ByteCountDiscrepancy { deriving anyclass (Exception) {-# SPECIALIZE lookupsIO :: - HasBlockIO IO h + Index i + => HasBlockIO IO h -> ArenaManager RealWorld -> ResolveSerialisedValue -> WB.WriteBuffer -> Ref (WBB.WriteBufferBlobs IO h) - -> V.Vector (Ref (Run IO h)) + -> V.Vector (Ref (Run IO h i)) -> V.Vector (Bloom SerialisedKey) - -> V.Vector IndexCompact + -> V.Vector i -> V.Vector (Handle h) -> V.Vector SerialisedKey -> IO (V.Vector (Maybe (Entry SerialisedValue (WeakBlobRef IO h)))) @@ -174,15 +177,15 @@ data ByteCountDiscrepancy = ByteCountDiscrepancy { -- PRECONDITION: the vectors of bloom filters, indexes and file handles -- should pointwise match with the vectors of runs. lookupsIO :: - forall m h. (MonadThrow m, MonadST m) + forall m h i. (MonadThrow m, MonadST m, Index i) => HasBlockIO m h -> ArenaManager (PrimState m) -> ResolveSerialisedValue -> WB.WriteBuffer -> Ref (WBB.WriteBufferBlobs m h) - -> V.Vector (Ref (Run m h)) -- ^ Runs @rs@ + -> V.Vector (Ref (Run m h i)) -- ^ Runs @rs@ -> V.Vector (Bloom SerialisedKey) -- ^ The bloom filters inside @rs@ - -> V.Vector IndexCompact -- ^ The indexes inside @rs@ + -> V.Vector i -- ^ The indexes inside @rs@ -> V.Vector (Handle h) -- ^ The file handles to the key\/value files inside @rs@ -> V.Vector SerialisedKey -> m (V.Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))) @@ -205,7 +208,7 @@ lookupsIO !hbio !mgr !resolveV !wb !wbblobs !rs !blooms !indexes !kopsFiles !ks ResolveSerialisedValue -> WB.WriteBuffer -> Ref (WBB.WriteBufferBlobs IO h) - -> V.Vector (Ref (Run IO h)) + -> V.Vector (Ref (Run IO h i)) -> V.Vector SerialisedKey -> VP.Vector RunIxKeyIx -> V.Vector (IOOp RealWorld h) @@ -220,11 +223,11 @@ lookupsIO !hbio !mgr !resolveV !wb !wbblobs !rs !blooms !indexes !kopsFiles !ks -- key in multiple runs. -- intraPageLookups :: - forall m h. (PrimMonad m, MonadThrow m) + forall m h i. (PrimMonad m, MonadThrow m) => ResolveSerialisedValue -> WB.WriteBuffer -> Ref (WBB.WriteBufferBlobs m h) - -> V.Vector (Ref (Run m h)) + -> V.Vector (Ref (Run m h i)) -> V.Vector SerialisedKey -> VP.Vector RunIxKeyIx -> V.Vector (IOOp (PrimState m) h) diff --git a/src/Database/LSMTree/Internal/Merge.hs b/src/Database/LSMTree/Internal/Merge.hs index a5f30903c..c208b59d7 100644 --- a/src/Database/LSMTree/Internal/Merge.hs +++ b/src/Database/LSMTree/Internal/Merge.hs @@ -21,12 +21,14 @@ import Control.Monad.Class.MonadST (MonadST) import Control.Monad.Class.MonadSTM (MonadSTM (..)) import Control.Monad.Class.MonadThrow (MonadMask, MonadThrow) import Control.Monad.Primitive (PrimState) +import Control.Monad.ST.Strict (RealWorld, ST) import Control.RefCount import Data.Primitive.MutVar import Data.Traversable (for) import qualified Data.Vector as V import Database.LSMTree.Internal.BlobRef (RawBlobRef) import Database.LSMTree.Internal.Entry +import Database.LSMTree.Internal.Index (IndexAcc (ResultingIndex)) import Database.LSMTree.Internal.Run (Run, RunDataCaching) import qualified Database.LSMTree.Internal.Run as Run import Database.LSMTree.Internal.RunAcc (RunBloomFilterAlloc (..)) @@ -44,11 +46,11 @@ import System.FS.BlockIO.API (HasBlockIO) -- -- Since we always resolve all entries of the same key in one go, there is no -- need to store incompletely-resolved entries. 
-data Merge m h = Merge { +data Merge m h j = Merge { mergeLevel :: !Level , mergeMappend :: !Mappend , mergeReaders :: {-# UNPACK #-} !(Readers m h) - , mergeBuilder :: !(RunBuilder m h) + , mergeBuilder :: !(RunBuilder m h j) -- | The caching policy to use for the output Run. , mergeCaching :: !RunDataCaching -- | The result of the latest call to 'steps'. This is used to determine @@ -77,35 +79,38 @@ data Level = MidLevel | LastLevel type Mappend = SerialisedValue -> SerialisedValue -> SerialisedValue {-# SPECIALISE new :: - HasFS IO h + IndexAcc j + => HasFS IO h -> HasBlockIO IO h -> RunDataCaching -> RunBloomFilterAlloc + -> ST RealWorld (j RealWorld) -> Level -> Mappend -> Run.RunFsPaths - -> V.Vector (Ref (Run IO h)) - -> IO (Maybe (Merge IO h)) #-} + -> V.Vector (Ref (Run IO h (ResultingIndex j))) + -> IO (Maybe (Merge IO h j)) #-} -- | Returns 'Nothing' if no input 'Run' contains any entries. -- The list of runs should be sorted from new to old. new :: - (MonadMask m, MonadSTM m, MonadST m) + (MonadMask m, MonadSTM m, MonadST m, IndexAcc j) => HasFS m h -> HasBlockIO m h -> RunDataCaching -> RunBloomFilterAlloc + -> ST (PrimState m) (j (PrimState m)) -> Level -> Mappend -> Run.RunFsPaths - -> V.Vector (Ref (Run m h)) - -> m (Maybe (Merge m h)) -new fs hbio mergeCaching alloc mergeLevel mergeMappend targetPaths runs = do + -> V.Vector (Ref (Run m h (ResultingIndex j))) + -> m (Maybe (Merge m h j)) +new fs hbio mergeCaching alloc newIndex mergeLevel mergeMappend targetPaths runs = do -- no offset, no write buffer mreaders <- Readers.new Readers.NoOffsetKey Nothing runs for mreaders $ \mergeReaders -> do -- calculate upper bounds based on input runs let numEntries = V.foldMap' Run.size runs - mergeBuilder <- Builder.new fs hbio targetPaths numEntries alloc + mergeBuilder <- Builder.new fs hbio targetPaths numEntries alloc newIndex mergeState <- newMutVar $! Merging return Merge { mergeHasFS = fs @@ -113,13 +118,13 @@ new fs hbio mergeCaching alloc mergeLevel mergeMappend targetPaths runs = do , .. } -{-# SPECIALISE abort :: Merge IO (FS.Handle h) -> IO () #-} +{-# SPECIALISE abort :: Merge IO (FS.Handle h) j -> IO () #-} -- | This function should be called when discarding a 'Merge' before it -- was done (i.e. returned 'MergeComplete'). This removes the incomplete files -- created for the new run so far and avoids leaking file handles. -- -- Once it has been called, do not use the 'Merge' any more! -abort :: (MonadMask m, MonadSTM m, MonadST m) => Merge m h -> m () +abort :: (MonadMask m, MonadSTM m, MonadST m) => Merge m h j -> m () abort Merge {..} = do readMutVar mergeState >>= \case Merging -> do @@ -135,8 +140,9 @@ abort Merge {..} = do writeMutVar mergeState $! Closed {-# SPECIALISE complete :: - Merge IO h - -> IO (Ref (Run IO h)) #-} + IndexAcc j + => Merge IO h j + -> IO (Ref (Run IO h (ResultingIndex j))) #-} -- | Complete a 'Merge', returning a new 'Run' as the result of merging the -- input runs. -- @@ -154,9 +160,9 @@ abort Merge {..} = do -- leak. And it must eventually be released with 'releaseRef'. 
-- complete :: - (MonadSTM m, MonadST m, MonadMask m) - => Merge m h - -> m (Ref (Run m h)) + (MonadSTM m, MonadST m, MonadMask m, IndexAcc j) + => Merge m h j + -> m (Ref (Run m h (ResultingIndex j))) complete Merge{..} = do readMutVar mergeState >>= \case Merging -> error "complete: Merge is not done" @@ -169,17 +175,18 @@ complete Merge{..} = do Closed -> error "complete: Merge is closed" {-# SPECIALISE stepsToCompletion :: - Merge IO h + IndexAcc j + => Merge IO h j -> Int - -> IO (Ref (Run IO h)) #-} + -> IO (Ref (Run IO h (ResultingIndex j))) #-} -- | Like 'steps', but calling 'complete' once the merge is finished. -- -- Note: run with async exceptions masked. See 'complete'. stepsToCompletion :: - (MonadMask m, MonadSTM m, MonadST m) - => Merge m h + (MonadMask m, MonadSTM m, MonadST m, IndexAcc j) + => Merge m h j -> Int - -> m (Ref (Run m h)) + -> m (Ref (Run m h (ResultingIndex j))) stepsToCompletion m stepBatchSize = go where go = do @@ -188,17 +195,18 @@ stepsToCompletion m stepBatchSize = go (_, MergeDone) -> complete m {-# SPECIALISE stepsToCompletionCounted :: - Merge IO h + IndexAcc j + => Merge IO h j -> Int - -> IO (Int, Ref (Run IO h)) #-} + -> IO (Int, Ref (Run IO h (ResultingIndex j))) #-} -- | Like 'steps', but calling 'complete' once the merge is finished. -- -- Note: run with async exceptions masked. See 'complete'. stepsToCompletionCounted :: - (MonadMask m, MonadSTM m, MonadST m) - => Merge m h + (MonadMask m, MonadSTM m, MonadST m, IndexAcc j) + => Merge m h j -> Int - -> m (Int, Ref (Run m h)) + -> m (Int, Ref (Run m h (ResultingIndex j))) stepsToCompletionCounted m stepBatchSize = go 0 where go !stepsSum = do @@ -216,7 +224,8 @@ stepsInvariant requestedSteps = \case _ -> True {-# SPECIALISE steps :: - Merge IO h + IndexAcc j + => Merge IO h j -> Int -> IO (Int, StepResult) #-} -- | Do at least a given number of steps of merging. Each step reads a single @@ -229,9 +238,9 @@ stepsInvariant requestedSteps = \case -- -- Returns an error if the merge was already completed or closed. steps :: - forall m h. - (MonadMask m, MonadSTM m, MonadST m) - => Merge m h + forall m h j. 
+ (MonadMask m, MonadSTM m, MonadST m, IndexAcc j) + => Merge m h j -> Int -- ^ How many input entries to consume (at least) -> m (Int, StepResult) steps Merge {..} requestedSteps = assertStepsInvariant <$> do @@ -312,15 +321,16 @@ steps Merge {..} requestedSteps = assertStepsInvariant <$> do pure (n + dropped, MergeDone) {-# SPECIALISE writeReaderEntry :: - Level - -> RunBuilder IO h + IndexAcc j + => Level + -> RunBuilder IO h j -> SerialisedKey -> Reader.Entry IO h -> IO () #-} writeReaderEntry :: - (MonadSTM m, MonadST m, MonadThrow m) + (MonadSTM m, MonadST m, MonadThrow m, IndexAcc j) => Level - -> RunBuilder m h + -> RunBuilder m h j -> SerialisedKey -> Reader.Entry m h -> m () @@ -352,15 +362,16 @@ writeReaderEntry level builder key entry@(Reader.EntryOverflow prefix page _ ove Builder.addLargeSerialisedKeyOp builder key page overflowPages {-# SPECIALISE writeSerialisedEntry :: - Level - -> RunBuilder IO h + IndexAcc j + => Level + -> RunBuilder IO h j -> SerialisedKey -> Entry SerialisedValue (RawBlobRef IO h) -> IO () #-} writeSerialisedEntry :: - (MonadSTM m, MonadST m, MonadThrow m) + (MonadSTM m, MonadST m, MonadThrow m, IndexAcc j) => Level - -> RunBuilder m h + -> RunBuilder m h j -> SerialisedKey -> Entry SerialisedValue (RawBlobRef m h) -> m () diff --git a/src/Database/LSMTree/Internal/MergeSchedule.hs b/src/Database/LSMTree/Internal/MergeSchedule.hs index a6bf2058b..368aa08b5 100644 --- a/src/Database/LSMTree/Internal/MergeSchedule.hs +++ b/src/Database/LSMTree/Internal/MergeSchedule.hs @@ -51,6 +51,7 @@ import Control.Monad.Class.MonadSTM (MonadSTM (..)) import Control.Monad.Class.MonadThrow (MonadCatch (bracketOnError), MonadMask, MonadThrow (..)) import Control.Monad.Primitive +import Control.Monad.ST.Strict (ST) import Control.RefCount import Control.TempRegistry import Control.Tracer @@ -63,7 +64,8 @@ import Database.LSMTree.Internal.Assertions (assert) import Database.LSMTree.Internal.Config import Database.LSMTree.Internal.Entry (Entry, NumEntries (..), unNumEntries) -import Database.LSMTree.Internal.Index.Compact (IndexCompact) +import Database.LSMTree.Internal.Index + (IndexAcc (ResultingIndex, newWithDefaults)) import Database.LSMTree.Internal.Lookup (ResolveSerialisedValue) import Database.LSMTree.Internal.Merge (Merge, StepResult (..)) import qualified Database.LSMTree.Internal.Merge as Merge @@ -124,34 +126,34 @@ data MergeTrace = Table content -------------------------------------------------------------------------------} -data TableContent m h = TableContent { +data TableContent m h j = TableContent { --TODO: probably less allocation to make this a MutVar tableWriteBuffer :: !WriteBuffer -- | The blob storage for entries in the write buffer , tableWriteBufferBlobs :: !(Ref (WriteBufferBlobs m h)) -- | A hierarchy of levels. The vector indexes double as level numbers. - , tableLevels :: !(Levels m h) + , tableLevels :: !(Levels m h j) -- | Cache of flattened 'levels'. 
- , tableCache :: !(LevelsCache m h) + , tableCache :: !(LevelsCache m h (ResultingIndex j)) } -{-# SPECIALISE duplicateTableContent :: TempRegistry IO -> TableContent IO h -> IO (TableContent IO h) #-} +{-# SPECIALISE duplicateTableContent :: TempRegistry IO -> TableContent IO h j -> IO (TableContent IO h j) #-} duplicateTableContent :: (PrimMonad m, MonadMask m, MonadMVar m) => TempRegistry m - -> TableContent m h - -> m (TableContent m h) + -> TableContent m h j + -> m (TableContent m h j) duplicateTableContent reg (TableContent wb wbb levels cache) = do wbb' <- allocateTemp reg (dupRef wbb) releaseRef levels' <- duplicateLevels reg levels cache' <- duplicateLevelsCache reg cache return $! TableContent wb wbb' levels' cache' -{-# SPECIALISE releaseTableContent :: TempRegistry IO -> TableContent IO h -> IO () #-} +{-# SPECIALISE releaseTableContent :: TempRegistry IO -> TableContent IO h j -> IO () #-} releaseTableContent :: (PrimMonad m, MonadMask m, MonadMVar m) => TempRegistry m - -> TableContent m h + -> TableContent m h j -> m () releaseTableContent reg (TableContent _wb wbb levels cache) = do freeTemp reg (releaseRef wbb) @@ -176,25 +178,25 @@ releaseTableContent reg (TableContent _wb wbb levels cache) = do -- lookups. This does mean that a cache can keep runs open for longer than -- necessary, so caches should be rebuilt using, e.g., 'rebuildCache', in a -- timely manner. -data LevelsCache m h = LevelsCache_ { - cachedRuns :: !(V.Vector (Ref (Run m h))) +data LevelsCache m h i = LevelsCache_ { + cachedRuns :: !(V.Vector (Ref (Run m h i))) , cachedFilters :: !(V.Vector (Bloom SerialisedKey)) - , cachedIndexes :: !(V.Vector IndexCompact) + , cachedIndexes :: !(V.Vector i) , cachedKOpsFiles :: !(V.Vector (FS.Handle h)) } {-# SPECIALISE mkLevelsCache :: TempRegistry IO - -> Levels IO h - -> IO (LevelsCache IO h) #-} + -> Levels IO h j + -> IO (LevelsCache IO h (ResultingIndex j)) #-} -- | Flatten the argument 'Level's into a single vector of runs, including all -- runs that are inputs to an ongoing merge. Use that to populate the -- 'LevelsCache'. The cache will take a reference for each of its runs. mkLevelsCache :: - forall m h. (PrimMonad m, MonadMVar m, MonadMask m) + forall m h j. (PrimMonad m, MonadMVar m, MonadMask m) => TempRegistry m - -> Levels m h - -> m (LevelsCache m h) + -> Levels m h j + -> m (LevelsCache m h (ResultingIndex j)) mkLevelsCache reg lvls = do rs <- foldRunAndMergeM (fmap V.singleton . dupRun) @@ -213,9 +215,9 @@ mkLevelsCache reg lvls = do -- going to need this in the end. We might get rid of the LevelsCache. foldRunAndMergeM :: Monoid a - => (Ref (Run m h) -> m a) - -> (Ref (MergingRun m h) -> m a) - -> Levels m h + => (Ref (Run m h (ResultingIndex j)) -> m a) + -> (Ref (MergingRun m h j) -> m a) + -> Levels m h j -> m a foldRunAndMergeM k1 k2 ls = fmap fold $ V.forM ls $ \(Level ir rs) -> do @@ -226,9 +228,9 @@ mkLevelsCache reg lvls = do {-# SPECIALISE rebuildCache :: TempRegistry IO - -> LevelsCache IO h - -> Levels IO h - -> IO (LevelsCache IO h) #-} + -> LevelsCache IO h i + -> Levels IO h j + -> IO (LevelsCache IO h (ResultingIndex j)) #-} -- | Remove references to runs in the old cache, and create a new cache with -- fresh references taken for the runs in the new levels. 
-- @@ -252,22 +254,22 @@ mkLevelsCache reg lvls = do rebuildCache :: (PrimMonad m, MonadMVar m, MonadMask m) => TempRegistry m - -> LevelsCache m h -- ^ old cache - -> Levels m h -- ^ new levels - -> m (LevelsCache m h) -- ^ new cache + -> LevelsCache m h i -- ^ old cache + -> Levels m h j -- ^ new levels + -> m (LevelsCache m h (ResultingIndex j)) -- ^ new cache rebuildCache reg oldCache newLevels = do releaseLevelsCache reg oldCache mkLevelsCache reg newLevels {-# SPECIALISE duplicateLevelsCache :: TempRegistry IO - -> LevelsCache IO h - -> IO (LevelsCache IO h) #-} + -> LevelsCache IO h i + -> IO (LevelsCache IO h i) #-} duplicateLevelsCache :: (PrimMonad m, MonadMask m, MonadMVar m) => TempRegistry m - -> LevelsCache m h - -> m (LevelsCache m h) + -> LevelsCache m h i + -> m (LevelsCache m h i) duplicateLevelsCache reg cache = do rs' <- V.forM (cachedRuns cache) $ \r -> allocateTemp reg (dupRef r) releaseRef @@ -275,12 +277,12 @@ duplicateLevelsCache reg cache = do {-# SPECIALISE releaseLevelsCache :: TempRegistry IO - -> LevelsCache IO h + -> LevelsCache IO h i -> IO () #-} releaseLevelsCache :: (PrimMonad m, MonadMVar m, MonadMask m) => TempRegistry m - -> LevelsCache m h + -> LevelsCache m h i -> m () releaseLevelsCache reg cache = V.forM_ (cachedRuns cache) $ \r -> @@ -290,23 +292,23 @@ releaseLevelsCache reg cache = Levels, runs and ongoing merges -------------------------------------------------------------------------------} -type Levels m h = V.Vector (Level m h) +type Levels m h j = V.Vector (Level m h j) -- | Runs in order from newer to older -data Level m h = Level { - incomingRun :: !(IncomingRun m h) - , residentRuns :: !(V.Vector (Ref (Run m h))) +data Level m h j = Level { + incomingRun :: !(IncomingRun m h j) + , residentRuns :: !(V.Vector (Ref (Run m h (ResultingIndex j)))) } -- | An incoming run is either a single run, or a merge. -data IncomingRun m h = - Single !(Ref (Run m h)) - | Merging !(Ref (MergingRun m h)) +data IncomingRun m h j = + Single !(Ref (Run m h (ResultingIndex j))) + | Merging !(Ref (MergingRun m h j)) -- | A merging of multiple runs. -- -- TODO: Move to a separate module. -data MergingRun m h = MergingRun { +data MergingRun m h j = MergingRun { mergePolicy :: !MergePolicyForLevel , mergeNumRuns :: !NumRuns -- | Sum of number of entries in the input runs @@ -320,12 +322,12 @@ data MergingRun m h = MergingRun { -- completed, otherwise if 'MergeMaybeCompleted' we have to check the -- 'MergingRunState'. 
, mergeKnownCompleted :: !(MutVar (PrimState m) MergeKnownCompleted) - , mergeState :: !(StrictMVar m (MergingRunState m h)) + , mergeState :: !(StrictMVar m (MergingRunState m h j)) , mergeRefCounter :: !(RefCounter m) } -instance RefCounted (MergingRun m h) where - type FinaliserM (MergingRun m h) = m +instance RefCounted (MergingRun m h j) where + type FinaliserM (MergingRun m h j) = m getRefCounter = mergeRefCounter {-# SPECIALISE newMergingRun :: @@ -333,8 +335,8 @@ instance RefCounted (MergingRun m h) where -> NumRuns -> NumEntries -> MergeKnownCompleted - -> MergingRunState IO h - -> IO (Ref (MergingRun IO h)) + -> MergingRunState IO h j + -> IO (Ref (MergingRun IO h j)) #-} -- | This allows constructing ill-formed MergingRuns, but the flexibility is -- needed for creating a merging run that is already Completed, as well as @@ -351,8 +353,8 @@ newMergingRun :: -> NumRuns -> NumEntries -> MergeKnownCompleted - -> MergingRunState m h - -> m (Ref (MergingRun m h)) + -> MergingRunState m h j + -> m (Ref (MergingRun m h j)) newMergingRun mergePolicy mergeNumRuns mergeNumEntries knownCompleted state = do mergeUnspentCredits <- UnspentCreditsVar <$> newPrimVar 0 mergeStepsPerformed <- TotalStepsVar <$> newPrimVar 0 @@ -385,8 +387,8 @@ newMergingRun mergePolicy mergeNumRuns mergeNumEntries knownCompleted state = do duplicateMergingRunRuns :: (PrimMonad m, MonadMVar m, MonadMask m) => TempRegistry m - -> Ref (MergingRun m h) - -> m (V.Vector (Ref (Run m h))) + -> Ref (MergingRun m h j) + -> m (V.Vector (Ref (Run m h (ResultingIndex j)))) duplicateMergingRunRuns reg (DeRef mr) = -- We take the references while holding the MVar to make sure the MergingRun -- does not get completed concurrently before we are done. @@ -399,7 +401,7 @@ duplicateMergingRunRuns reg (DeRef mr) = data MergePolicyForLevel = LevelTiering | LevelLevelling deriving stock (Show, Eq) -mergePolicyForLevel :: MergePolicy -> LevelNo -> Levels m h -> MergePolicyForLevel +mergePolicyForLevel :: MergePolicy -> LevelNo -> Levels m h j -> MergePolicyForLevel mergePolicyForLevel MergePolicyLazyLevelling (LevelNo n) nextLevels | n == 1 , V.null nextLevels @@ -412,16 +414,16 @@ newtype NumRuns = NumRuns { unNumRuns :: Int } newtype UnspentCreditsVar s = UnspentCreditsVar { getUnspentCreditsVar :: PrimVar s Int } -data MergingRunState m h = +data MergingRunState m h j = CompletedMerge - !(Ref (Run m h)) + !(Ref (Run m h (ResultingIndex j))) -- ^ Output run | OngoingMerge - !(V.Vector (Ref (Run m h))) + !(V.Vector (Ref (Run m h (ResultingIndex j)))) -- ^ Input runs !(SpentCreditsVar (PrimState m)) -- ^ The total number of spent credits. 
- !(Merge m h) + !(Merge m h j) newtype TotalStepsVar s = TotalStepsVar { getTotalStepsVar :: PrimVar s Int } @@ -430,12 +432,12 @@ newtype SpentCreditsVar s = SpentCreditsVar { getSpentCreditsVar :: PrimVar s In data MergeKnownCompleted = MergeKnownCompleted | MergeMaybeCompleted deriving stock (Show, Eq, Read) -{-# SPECIALISE duplicateLevels :: TempRegistry IO -> Levels IO h -> IO (Levels IO h) #-} +{-# SPECIALISE duplicateLevels :: TempRegistry IO -> Levels IO h j -> IO (Levels IO h j) #-} duplicateLevels :: (PrimMonad m, MonadMVar m, MonadMask m) => TempRegistry m - -> Levels m h - -> m (Levels m h) + -> Levels m h j + -> m (Levels m h j) duplicateLevels reg levels = V.forM levels $ \Level {incomingRun, residentRuns} -> do incomingRun' <- duplicateIncomingRun reg incomingRun @@ -446,39 +448,39 @@ duplicateLevels reg levels = residentRuns = residentRuns' } -{-# SPECIALISE releaseLevels :: TempRegistry IO -> Levels IO h -> IO () #-} +{-# SPECIALISE releaseLevels :: TempRegistry IO -> Levels IO h j -> IO () #-} releaseLevels :: (PrimMonad m, MonadMVar m, MonadMask m) => TempRegistry m - -> Levels m h + -> Levels m h j -> m () releaseLevels reg levels = V.forM_ levels $ \Level {incomingRun, residentRuns} -> do releaseIncomingRun reg incomingRun V.mapM_ (freeTemp reg . releaseRef) residentRuns -{-# SPECIALISE duplicateIncomingRun :: TempRegistry IO -> IncomingRun IO h -> IO (IncomingRun IO h) #-} +{-# SPECIALISE duplicateIncomingRun :: TempRegistry IO -> IncomingRun IO h j -> IO (IncomingRun IO h j) #-} duplicateIncomingRun :: (PrimMonad m, MonadMask m, MonadMVar m) => TempRegistry m - -> IncomingRun m h - -> m (IncomingRun m h) + -> IncomingRun m h j + -> m (IncomingRun m h j) duplicateIncomingRun reg (Single r) = Single <$> allocateTemp reg (dupRef r) releaseRef duplicateIncomingRun reg (Merging mr) = Merging <$> allocateTemp reg (dupRef mr) releaseRef -{-# SPECIALISE releaseIncomingRun :: TempRegistry IO -> IncomingRun IO h -> IO () #-} +{-# SPECIALISE releaseIncomingRun :: TempRegistry IO -> IncomingRun IO h j -> IO () #-} releaseIncomingRun :: (PrimMonad m, MonadMask m, MonadMVar m) => TempRegistry m - -> IncomingRun m h -> m () + -> IncomingRun m h j -> m () releaseIncomingRun reg (Single r) = freeTemp reg (releaseRef r) releaseIncomingRun reg (Merging mr) = freeTemp reg (releaseRef mr) -{-# SPECIALISE iforLevelM_ :: Levels IO h -> (LevelNo -> Level IO h -> IO ()) -> IO () #-} -iforLevelM_ :: Monad m => Levels m h -> (LevelNo -> Level m h -> m ()) -> m () +{-# SPECIALISE iforLevelM_ :: Levels IO h j -> (LevelNo -> Level IO h j -> IO ()) -> IO () #-} +iforLevelM_ :: Monad m => Levels m h j -> (LevelNo -> Level m h j -> m ()) -> m () iforLevelM_ lvls k = V.iforM_ lvls $ \i lvl -> k (LevelNo (i + 1)) lvl {------------------------------------------------------------------------------- @@ -486,7 +488,8 @@ iforLevelM_ lvls k = V.iforM_ lvls $ \i lvl -> k (LevelNo (i + 1)) lvl -------------------------------------------------------------------------------} {-# SPECIALISE updatesWithInterleavedFlushes :: - Tracer IO (AtLevel MergeTrace) + IndexAcc j + => Tracer IO (AtLevel MergeTrace) -> TableConfig -> ResolveSerialisedValue -> HasFS IO h @@ -495,8 +498,8 @@ iforLevelM_ lvls k = V.iforM_ lvls $ \i lvl -> k (LevelNo (i + 1)) lvl -> UniqCounter IO -> V.Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob) -> TempRegistry IO - -> TableContent IO h - -> IO (TableContent IO h) #-} + -> TableContent IO h j + -> IO (TableContent IO h j) #-} -- | A single batch of updates can fill up the write 
buffer multiple times. We -- flush the write buffer each time it fills up before trying to fill it up -- again. @@ -522,8 +525,8 @@ iforLevelM_ lvls k = V.iforM_ lvls $ \i lvl -> k (LevelNo (i + 1)) lvl -- and write those to disk. Of course, any remainder that did not fit into a -- whole run should then end up in a fresh write buffer. updatesWithInterleavedFlushes :: - forall m h. - (MonadMask m, MonadMVar m, MonadSTM m, MonadST m) + forall m h j. + (MonadMask m, MonadMVar m, MonadSTM m, MonadST m, IndexAcc j) => Tracer m (AtLevel MergeTrace) -> TableConfig -> ResolveSerialisedValue @@ -533,8 +536,8 @@ updatesWithInterleavedFlushes :: -> UniqCounter m -> V.Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob) -> TempRegistry m - -> TableContent m h - -> m (TableContent m h) + -> TableContent m h j + -> m (TableContent m h j) updatesWithInterleavedFlushes tr conf resolve hfs hbio root uc es reg tc = do let wb = tableWriteBuffer tc wbblobs = tableWriteBufferBlobs tc @@ -604,7 +607,8 @@ addWriteBufferEntries hfs f wbblobs maxn = {-# SPECIALISE flushWriteBuffer :: - Tracer IO (AtLevel MergeTrace) + IndexAcc j + => Tracer IO (AtLevel MergeTrace) -> TableConfig -> ResolveSerialisedValue -> HasFS IO h @@ -612,14 +616,14 @@ addWriteBufferEntries hfs f wbblobs maxn = -> SessionRoot -> UniqCounter IO -> TempRegistry IO - -> TableContent IO h - -> IO (TableContent IO h) #-} + -> TableContent IO h j + -> IO (TableContent IO h j) #-} -- | Flush the write buffer to disk, regardless of whether it is full or not. -- -- The returned table content contains an updated set of levels, where the write -- buffer is inserted into level 1. flushWriteBuffer :: - (MonadMask m, MonadMVar m, MonadST m, MonadSTM m) + forall m h j. (MonadMask m, MonadMVar m, MonadST m, MonadSTM m, IndexAcc j) => Tracer m (AtLevel MergeTrace) -> TableConfig -> ResolveSerialisedValue @@ -628,8 +632,8 @@ flushWriteBuffer :: -> SessionRoot -> UniqCounter m -> TempRegistry m - -> TableContent m h - -> m (TableContent m h) + -> TableContent m h j + -> m (TableContent m h j) flushWriteBuffer tr conf@TableConfig{confDiskCachePolicy} resolve hfs hbio root uc reg tc | WB.null (tableWriteBuffer tc) = pure tc @@ -641,10 +645,12 @@ flushWriteBuffer tr conf@TableConfig{confDiskCachePolicy} !alloc = bloomFilterAllocForLevel conf l !path = Paths.runPath root (uniqueToRunNumber n) traceWith tr $ AtLevel l $ TraceFlushWriteBuffer size (runNumber path) cache alloc + r <- allocateTemp reg (Run.fromWriteBuffer hfs hbio cache alloc + (newWithDefaults @j) path (tableWriteBuffer tc) (tableWriteBufferBlobs tc)) @@ -668,17 +674,17 @@ flushWriteBuffer tr conf@TableConfig{confDiskCachePolicy} -- NOTE: @_levelsInvariant@ is based on the @ScheduledMerges.invariant@ -- prototype. See @ScheduledMerges.invariant@ for documentation about the merge -- algorithm. -_levelsInvariant :: forall m h. TableConfig -> Levels m h -> ST (PrimState m) Bool +_levelsInvariant :: forall m h j. 
TableConfig -> Levels m h j -> ST (PrimState m) Bool _levelsInvariant conf levels = go (LevelNo 1) levels >>= \ !_ -> pure True where sr = confSizeRatio conf wba = confWriteBufferAlloc conf - go :: LevelNo -> Levels m h -> ST (PrimState m) () + go :: LevelNo -> Levels m h j -> ST (PrimState m) () go !_ (V.uncons -> Nothing) = pure () go !ln (V.uncons -> Just (Level mr rs, ls)) = do mrs <- case mr of SingleRun r -> pure $ CompletedMerge r MergingRun var -> readMutVar var @@ -742,24 +748,25 @@ {-# SPECIALISE addRunToLevels :: - Tracer IO (AtLevel MergeTrace) + IndexAcc j + => Tracer IO (AtLevel MergeTrace) -> TableConfig -> ResolveSerialisedValue -> HasFS IO h -> HasBlockIO IO h -> SessionRoot -> UniqCounter IO - -> Ref (Run IO h) + -> Ref (Run IO h (ResultingIndex j)) -> TempRegistry IO - -> Levels IO h - -> IO (Levels IO h) #-} + -> Levels IO h j + -> IO (Levels IO h j) #-} -- | Add a run to the levels, and propagate merges. -- -- NOTE: @go@ is based on the @ScheduledMerges.increment@ prototype. See @ScheduledMerges.increment@ -- for documentation about the merge algorithm. addRunToLevels :: - forall m h. - (MonadMask m, MonadMVar m, MonadST m, MonadSTM m) + forall m h j. + (MonadMask m, MonadMVar m, MonadST m, MonadSTM m, IndexAcc j) => Tracer m (AtLevel MergeTrace) -> TableConfig -> ResolveSerialisedValue @@ -767,10 +774,10 @@ addRunToLevels :: -> HasBlockIO m h -> SessionRoot -> UniqCounter m - -> Ref (Run m h) + -> Ref (Run m h (ResultingIndex j)) -> TempRegistry m - -> Levels m h - -> m (Levels m h) + -> Levels m h j + -> m (Levels m h j) addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels = do ls' <- go (LevelNo 1) (V.singleton r0) levels {- TODO: re-enable @@ -830,8 +837,8 @@ ir' <- newMerge LevelLevelling Merge.LastLevel ln (rs' `V.snoc` r) pure $! 
Level ir' V.empty `V.cons` V.empty - expectCompletedMergeTraced :: LevelNo -> IncomingRun m h - -> m (Ref (Run m h)) + expectCompletedMergeTraced :: LevelNo -> IncomingRun m h j + -> m (Ref (Run m h (ResultingIndex j))) expectCompletedMergeTraced ln ir = do r <- expectCompletedMerge reg ir traceWith tr $ AtLevel ln $ @@ -842,8 +849,8 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels = newMerge :: MergePolicyForLevel -> Merge.Level -> LevelNo - -> V.Vector (Ref (Run m h)) - -> m (IncomingRun m h) + -> V.Vector (Ref (Run m h (ResultingIndex j))) + -> m (IncomingRun m h j) newMerge mergePolicy mergelast ln rs | Just (r, rest) <- V.uncons rs , V.null rest = do @@ -856,6 +863,7 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels = !n <- incrUniqCounter uc let !caching = diskCachePolicyForLevel confDiskCachePolicy ln !alloc = bloomFilterAllocForLevel conf ln + !newIndex = newWithDefaults @j !runPaths = Paths.runPath root (uniqueToRunNumber n) traceWith tr $ AtLevel ln $ TraceNewMerge (V.map Run.size rs) (runNumber runPaths) caching alloc mergePolicy mergelast @@ -864,7 +872,7 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels = case confMergeSchedule of OneShot -> do r <- allocateTemp reg - (mergeRuns resolve hfs hbio caching alloc runPaths mergelast rs) + (mergeRuns resolve hfs hbio caching alloc newIndex runPaths mergelast rs) releaseRef traceWith tr $ AtLevel ln $ TraceCompletedMerge (Run.size r) @@ -874,7 +882,7 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels = Incremental -> do mergeMaybe <- allocateMaybeTemp reg - (Merge.new hfs hbio caching alloc mergelast resolve runPaths rs) + (Merge.new hfs hbio caching alloc newIndex mergelast resolve runPaths rs) Merge.abort case mergeMaybe of Nothing -> error "newMerge: merges can not be empty" @@ -922,7 +930,7 @@ maxRunSize' :: TableConfig -> MergePolicyForLevel -> LevelNo -> NumEntries maxRunSize' config policy ln = maxRunSize (confSizeRatio config) (confWriteBufferAlloc config) policy ln -mergeLastForLevel :: Levels m h -> Merge.Level +mergeLastForLevel :: Levels m h j -> Merge.Level mergeLastForLevel levels | V.null levels = Merge.LastLevel | otherwise = Merge.MidLevel @@ -930,20 +938,21 @@ mergeLastForLevel levels levelIsFull :: SizeRatio -> V.Vector run -> Bool levelIsFull sr rs = V.length rs + 1 >= (sizeRatioInt sr) -{-# SPECIALISE mergeRuns :: ResolveSerialisedValue -> HasFS IO h -> HasBlockIO IO h -> RunDataCaching -> RunBloomFilterAlloc -> RunFsPaths -> Merge.Level -> V.Vector (Ref (Run IO h)) -> IO (Ref (Run IO h)) #-} +{-# SPECIALISE mergeRuns :: IndexAcc j => ResolveSerialisedValue -> HasFS IO h -> HasBlockIO IO h -> RunDataCaching -> RunBloomFilterAlloc -> ST RealWorld (j RealWorld) -> RunFsPaths -> Merge.Level -> V.Vector (Ref (Run IO h (ResultingIndex j))) -> IO (Ref (Run IO h (ResultingIndex j))) #-} mergeRuns :: - (MonadMask m, MonadST m, MonadSTM m) + (MonadMask m, MonadST m, MonadSTM m, IndexAcc j) => ResolveSerialisedValue -> HasFS m h -> HasBlockIO m h -> RunDataCaching -> RunBloomFilterAlloc + -> ST (PrimState m) (j (PrimState m)) -> RunFsPaths -> Merge.Level - -> V.Vector (Ref (Run m h)) - -> m (Ref (Run m h)) -mergeRuns resolve hfs hbio caching alloc runPaths mergeLevel runs = do - Merge.new hfs hbio caching alloc mergeLevel resolve runPaths runs >>= \case + -> V.Vector (Ref (Run m h (ResultingIndex j))) + -> m (Ref (Run m h (ResultingIndex j))) +mergeRuns resolve hfs hbio caching alloc newIndex runPaths 
mergeLevel runs = do + Merge.new hfs hbio caching alloc newIndex mergeLevel resolve runPaths runs >>= \case Nothing -> error "mergeRuns: no inputs" Just m -> Merge.stepsToCompletion m 1024 @@ -1048,18 +1057,19 @@ mergeRuns resolve hfs hbio caching alloc runPaths mergeLevel runs = do newtype Credit = Credit Int {-# SPECIALISE supplyCredits :: - TableConfig + IndexAcc j + => TableConfig -> Credit - -> Levels IO h + -> Levels IO h j -> IO () #-} -- | Supply the given amount of credits to each merge in the levels structure. -- This /may/ cause some merges to progress. supplyCredits :: - (MonadSTM m, MonadST m, MonadMVar m, MonadMask m) + (MonadSTM m, MonadST m, MonadMVar m, MonadMask m, IndexAcc j) => TableConfig -> Credit - -> Levels m h + -> Levels m h j -> m () supplyCredits conf c levels = iforLevelM_ levels $ \ln (Level ir _rs) -> @@ -1077,7 +1087,7 @@ newtype ScaledCredits = ScaledCredits Int -- Initially, 1 update supplies 1 credit. However, since merging runs have -- different numbers of input runs/entries, we may have to do more or less -- merging work than 1 merge step for each credit. -scaleCreditsForMerge :: IncomingRun m h -> Credit -> ScaledCredits +scaleCreditsForMerge :: IncomingRun m h j -> Credit -> ScaledCredits -- A single run is a trivially completed merge, so it requires no credits. scaleCreditsForMerge (Single _) _ = ScaledCredits 0 scaleCreditsForMerge (Merging (DeRef MergingRun {..})) (Credit c) = @@ -1101,14 +1111,14 @@ scaleCreditsForMerge (Merging (DeRef MergingRun {..})) (Credit c) = -- same as division rounding up: ceiling (c * n / 4) in ScaledCredits ((c * n + 3) `div` 4) -{-# SPECIALISE supplyMergeCredits :: ScaledCredits -> CreditThreshold -> IncomingRun IO h -> IO () #-} +{-# SPECIALISE supplyMergeCredits :: IndexAcc j => ScaledCredits -> CreditThreshold -> IncomingRun IO h j -> IO () #-} -- | Supply the given amount of credits to a merging run. This /may/ cause an -- ongoing merge to progress. supplyMergeCredits :: - forall m h. (MonadSTM m, MonadST m, MonadMVar m, MonadMask m) + forall m h j. (MonadSTM m, MonadST m, MonadMVar m, MonadMask m, IndexAcc j) => ScaledCredits -> CreditThreshold - -> IncomingRun m h + -> IncomingRun m h j -> m () supplyMergeCredits _ _ Single{} = pure () supplyMergeCredits (ScaledCredits c) creditsThresh @@ -1233,10 +1243,10 @@ takeAllUnspentCredits (UnspentCreditsVar !unspentCreditsVar) = do else casLoop prev' -{-# SPECIALISE stepMerge :: StrictMVar IO (MergingRunState IO h) -> TotalStepsVar RealWorld -> Credit -> IO Bool #-} +{-# SPECIALISE stepMerge :: IndexAcc j => StrictMVar IO (MergingRunState IO h j) -> TotalStepsVar RealWorld -> Credit -> IO Bool #-} stepMerge :: - (MonadMVar m, MonadMask m, MonadSTM m, MonadST m) - => StrictMVar m (MergingRunState m h) + (MonadMVar m, MonadMask m, MonadSTM m, MonadST m, IndexAcc j) + => StrictMVar m (MergingRunState m h j) -> TotalStepsVar (PrimState m) -> Credit -> m Bool @@ -1281,13 +1291,14 @@ stepMerge mergeVar (TotalStepsVar totalStepsVar) (Credit c) = pure $ stepResult == MergeDone {-# SPECIALISE completeMerge :: - StrictMVar IO (MergingRunState IO h) + IndexAcc j + => StrictMVar IO (MergingRunState IO h j) -> MutVar RealWorld MergeKnownCompleted -> IO () #-} -- | Convert an 'OngoingMerge' to a 'CompletedMerge'. 
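The credit scaling in 'scaleCreditsForMerge' above is integer ceiling division in disguise: supplying c credits to a levelling merge is scaled by a factor of n/4, rounded up. A standalone sketch of the arithmetic (the function name is illustrative, not part of the patch):

  scaledCreditsSketch :: Int -> Int -> Int
  scaledCreditsSketch c n = (c * n + 3) `div` 4
    -- e.g. c = 5 credits, n = 3 input runs:
    -- (5 * 3 + 3) `div` 4 == 4 == ceiling (15 / 4)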
completeMerge :: - (MonadSTM m, MonadST m, MonadMVar m, MonadMask m) - => StrictMVar m (MergingRunState m h) + (MonadSTM m, MonadST m, MonadMVar m, MonadMask m, IndexAcc j) + => StrictMVar m (MergingRunState m h j) -> MutVar (PrimState m) MergeKnownCompleted -> m () completeMerge mergeVar mergeKnownCompletedVar = do @@ -1302,10 +1313,10 @@ completeMerge mergeVar mergeKnownCompletedVar = do writeMutVar mergeKnownCompletedVar MergeKnownCompleted pure $! CompletedMerge r -{-# SPECIALISE expectCompletedMerge :: TempRegistry IO -> IncomingRun IO h -> IO (Ref (Run IO h)) #-} +{-# SPECIALISE expectCompletedMerge :: IndexAcc j => TempRegistry IO -> IncomingRun IO h j -> IO (Ref (Run IO h (ResultingIndex j))) #-} expectCompletedMerge :: - (MonadMVar m, MonadSTM m, MonadST m, MonadMask m) - => TempRegistry m -> IncomingRun m h -> m (Ref (Run m h)) + (MonadMVar m, MonadSTM m, MonadST m, MonadMask m, IndexAcc j) + => TempRegistry m -> IncomingRun m h j -> m (Ref (Run m h (ResultingIndex j))) expectCompletedMerge _ (Single r) = pure r expectCompletedMerge reg (Merging (mr@(DeRef MergingRun {..}))) = do knownCompleted <- readMutVar mergeKnownCompleted diff --git a/src/Database/LSMTree/Internal/Run.hs b/src/Database/LSMTree/Internal/Run.hs index c066ab4bc..74cf5bbdf 100644 --- a/src/Database/LSMTree/Internal/Run.hs +++ b/src/Database/LSMTree/Internal/Run.hs @@ -33,6 +33,7 @@ import Control.Monad.Class.MonadST (MonadST) import Control.Monad.Class.MonadSTM (MonadSTM (..)) import Control.Monad.Class.MonadThrow import Control.Monad.Primitive +import Control.Monad.ST.Strict (ST) import Control.RefCount import Data.BloomFilter (Bloom) import qualified Data.ByteString.Short as SBS @@ -44,8 +45,9 @@ import qualified Database.LSMTree.Internal.BlobRef as BlobRef import Database.LSMTree.Internal.BloomFilter (bloomFilterFromSBS) import qualified Database.LSMTree.Internal.CRC32C as CRC import Database.LSMTree.Internal.Entry (NumEntries (..)) +import Database.LSMTree.Internal.Index (Index, + IndexAcc (ResultingIndex)) import qualified Database.LSMTree.Internal.Index as Index (fromSBS, sizeInPages) -import Database.LSMTree.Internal.Index.Compact (IndexCompact) import Database.LSMTree.Internal.Page (NumPages) import Database.LSMTree.Internal.Paths as Paths import Database.LSMTree.Internal.RunAcc (RunBloomFilterAlloc) @@ -64,7 +66,7 @@ import System.FS.BlockIO.API (HasBlockIO) -- | The in-memory representation of a completed LSM run. -- -data Run m h = Run { +data Run m h i = Run { runNumEntries :: !NumEntries -- | The reference count for the LSM run. This counts the -- number of references from LSM handles to this run. When @@ -77,7 +79,7 @@ data Run m h = Run { -- | The in-memory index mapping keys to page numbers in the -- Key\/Ops file. In future we may support alternative index -- representations. - , runIndex :: !IndexCompact + , runIndex :: !i -- | The file handle for the Key\/Ops file. This file is opened -- read-only and is accessed in a page-oriented way, i.e. only -- reading whole pages, at page offsets. It will be opened with @@ -93,37 +95,37 @@ data Run m h = Run { } -- | Shows only the 'runRunFsPaths' field. -instance Show (Run m h) where +instance Show (Run m h i) where showsPrec _ run = showString "Run { runRunFsPaths = " . showsPrec 0 (runRunFsPaths run) . 
showString " }" -instance NFData h => NFData (Run m h) where +instance (NFData i, NFData h) => NFData (Run m h i) where rnf (Run a b c d e f g h i j) = rnf a `seq` rwhnf b `seq` rnf c `seq` rnf d `seq` rnf e `seq` rnf f `seq` rnf g `seq` rnf h `seq` rwhnf i `seq` rwhnf j -instance RefCounted (Run m h) where - type FinaliserM (Run m h) = m +instance RefCounted (Run m h i) where + type FinaliserM (Run m h i) = m getRefCounter = runRefCounter -size :: Ref (Run m h) -> NumEntries +size :: Ref (Run m h i) -> NumEntries size (DeRef run) = runNumEntries run -sizeInPages :: Ref (Run m h) -> NumPages +sizeInPages :: Index i => Ref (Run m h i) -> NumPages sizeInPages (DeRef run) = Index.sizeInPages (runIndex run) -runFsPaths :: Ref (Run m h) -> RunFsPaths +runFsPaths :: Ref (Run m h i) -> RunFsPaths runFsPaths (DeRef r) = runRunFsPaths r -runFsPathsNumber :: Ref (Run m h) -> RunNumber +runFsPathsNumber :: Ref (Run m h i) -> RunNumber runFsPathsNumber = Paths.runNumber . runFsPaths -- | Helper function to make a 'RawBlobRef' that points into a 'Run'. -mkRawBlobRef :: Run m h -> BlobSpan -> RawBlobRef m h +mkRawBlobRef :: Run m h i -> BlobSpan -> RawBlobRef m h mkRawBlobRef Run{runBlobFile} blobspan = BlobRef.mkRawBlobRef runBlobFile blobspan -- | Helper function to make a 'WeakBlobRef' that points into a 'Run'. -mkWeakBlobRef :: Ref (Run m h) -> BlobSpan -> WeakBlobRef m h +mkWeakBlobRef :: Ref (Run m h i) -> BlobSpan -> WeakBlobRef m h mkWeakBlobRef (DeRef Run{runBlobFile}) blobspan = BlobRef.mkWeakBlobRef runBlobFile blobspan @@ -181,14 +183,15 @@ setRunDataCaching hbio runKOpsFile NoCacheRunData = do FS.hSetNoCache hbio runKOpsFile True {-# SPECIALISE fromMutable :: - RunDataCaching - -> RunBuilder IO h - -> IO (Ref (Run IO h)) #-} + IndexAcc j + => RunDataCaching + -> RunBuilder IO h j + -> IO (Ref (Run IO h (ResultingIndex j))) #-} fromMutable :: - (MonadST m, MonadSTM m, MonadMask m) + (MonadST m, MonadSTM m, MonadMask m, IndexAcc j) => RunDataCaching - -> RunBuilder m h - -> m (Ref (Run m h)) + -> RunBuilder m h j + -> m (Ref (Run m h (ResultingIndex j))) fromMutable runRunDataCaching builder = do (runHasFS, runHasBlockIO, runRunFsPaths, runFilter, runIndex, runNumEntries) <- Builder.unsafeFinalise (runRunDataCaching == NoCacheRunData) builder @@ -199,14 +202,16 @@ fromMutable runRunDataCaching builder = do (\runRefCounter -> Run { .. }) {-# SPECIALISE fromWriteBuffer :: - HasFS IO h + IndexAcc j + => HasFS IO h -> HasBlockIO IO h -> RunDataCaching -> RunBloomFilterAlloc + -> ST RealWorld (j RealWorld) -> RunFsPaths -> WriteBuffer -> Ref (WriteBufferBlobs IO h) - -> IO (Ref (Run IO h)) #-} + -> IO (Ref (Run IO h (ResultingIndex j))) #-} -- | Write a write buffer to disk, including the blobs it contains. -- -- This creates a new 'Run' which must eventually be released with 'releaseRef'. -- -- TODO: As a possible optimisation, blobs could be written to a blob file -- immediately when they are added to the write buffer, avoiding the need to do -- it here. 
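For orientation, this is how a caller now chooses the index type: the new 'ST' argument is the only place where the accumulator type 'j' is fixed, and everything else is inferred from it. A sketch under the signatures in this patch ('flushSketch' and the 10-bits-per-entry allocation are illustrative only; TypeApplications and ScopedTypeVariables assumed):

  flushSketch ::
       forall m h j. (MonadST m, MonadSTM m, MonadMask m, IndexAcc j)
    => HasFS m h
    -> HasBlockIO m h
    -> RunDataCaching
    -> RunFsPaths
    -> WriteBuffer
    -> Ref (WriteBufferBlobs m h)
    -> m (Ref (Run m h (ResultingIndex j)))
  flushSketch hfs hbio caching paths buffer blobs =
      -- 'newWithDefaults @j' picks the index accumulator; compare the
      -- call site added to 'flushWriteBuffer' in MergeSchedule above.
      fromWriteBuffer hfs hbio caching (RunAllocFixed 10)
        (newWithDefaults @j) paths buffer blobs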
fromWriteBuffer :: - (MonadST m, MonadSTM m, MonadMask m) + (MonadST m, MonadSTM m, MonadMask m, IndexAcc j) => HasFS m h -> HasBlockIO m h -> RunDataCaching -> RunBloomFilterAlloc + -> ST (PrimState m) (j (PrimState m)) -> RunFsPaths -> WriteBuffer -> Ref (WriteBufferBlobs m h) - -> m (Ref (Run m h)) -fromWriteBuffer fs hbio caching alloc fsPaths buffer blobs = do - builder <- Builder.new fs hbio fsPaths (WB.numEntries buffer) alloc + -> m (Ref (Run m h (ResultingIndex j))) +fromWriteBuffer fs hbio caching alloc newIndex fsPaths buffer blobs = do + builder <- Builder.new fs hbio fsPaths (WB.numEntries buffer) alloc newIndex for_ (WB.toList buffer) $ \(k, e) -> Builder.addKeyOp builder k (fmap (WBB.mkRawBlobRef blobs) e) --TODO: the fmap entry here reallocates even when there are no blobs @@ -244,11 +250,12 @@ data FileFormatError = FileFormatError FS.FsPath String deriving anyclass Exception {-# SPECIALISE openFromDisk :: - HasFS IO h + Index i + => HasFS IO h -> HasBlockIO IO h -> RunDataCaching -> RunFsPaths - -> IO (Ref (Run IO h)) #-} + -> IO (Ref (Run IO h i)) #-} -- | Load a previously written run from disk, checking each file's checksum -- against the checksum file. -- @@ -257,13 +264,13 @@ data FileFormatError = FileFormatError FS.FsPath String -- Exceptions will be raised when any of the files' contents don't match their -- checksum ('ChecksumError') or can't be parsed ('FileFormatError'). openFromDisk :: - forall m h. - (MonadSTM m, MonadMask m, PrimMonad m) + forall m h i. + (MonadSTM m, MonadMask m, PrimMonad m, Index i) => HasFS m h -> HasBlockIO m h -> RunDataCaching -> RunFsPaths - -> m (Ref (Run m h)) + -> m (Ref (Run m h i)) openFromDisk fs hbio runRunDataCaching runRunFsPaths = do expectedChecksums <- expectValidFile (runChecksumsPath runRunFsPaths) . fromChecksumsFile diff --git a/src/Database/LSMTree/Internal/RunAcc.hs b/src/Database/LSMTree/Internal/RunAcc.hs index 1f833ba1b..febd35b0f 100644 --- a/src/Database/LSMTree/Internal/RunAcc.hs +++ b/src/Database/LSMTree/Internal/RunAcc.hs @@ -44,11 +44,9 @@ import Database.LSMTree.Internal.Assertions (fromIntegralChecked) import Database.LSMTree.Internal.BlobRef (BlobSpan (..)) import Database.LSMTree.Internal.Chunk (Chunk) import Database.LSMTree.Internal.Entry (Entry (..), NumEntries (..)) +import Database.LSMTree.Internal.Index (IndexAcc, ResultingIndex) import qualified Database.LSMTree.Internal.Index as Index (appendMulti, appendSingle, unsafeEnd) -import Database.LSMTree.Internal.Index.Compact (IndexCompact) -import Database.LSMTree.Internal.Index.CompactAcc (IndexCompactAcc) -import qualified Database.LSMTree.Internal.Index.CompactAcc as IndexCompact import Database.LSMTree.Internal.PageAcc (PageAcc) import qualified Database.LSMTree.Internal.PageAcc as PageAcc import qualified Database.LSMTree.Internal.PageAcc1 as PageAcc @@ -70,9 +68,9 @@ import qualified Monkey -- Use 'new' to start run construction, add new key\/operation pairs to the run -- by using 'addKeyOp' and co, and complete run construction using -- 'unsafeFinalise'. -data RunAcc s = RunAcc { +data RunAcc j s = RunAcc { mbloom :: !(MBloom s SerialisedKey) - , mindex :: !(IndexCompactAcc s) + , mindex :: !(j s) , mpageacc :: !(PageAcc s) , entryCount :: !(PrimVar s Int) } @@ -90,8 +88,8 @@ data RunBloomFilterAlloc = -- -- @nentries@ should be an upper bound on the expected number of entries in the -- output run. 
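What the extra 'ST s (j s)' argument buys: the caller, not 'new', decides which index accumulator to allocate. Recovering the old hard-wired behaviour is a one-liner, assuming the imports this patch removes are brought back:

  import Database.LSMTree.Internal.Index.CompactAcc (IndexCompactAcc)
  import qualified Database.LSMTree.Internal.Index.CompactAcc as IndexCompact

  newCompact :: NumEntries -> RunBloomFilterAlloc -> ST s (RunAcc IndexCompactAcc s)
  newCompact nentries alloc =
      -- 1024 is the chunk size the removed 'IndexCompact.new 1024' call used
      new nentries alloc (IndexCompact.new 1024)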
-new :: NumEntries -> RunBloomFilterAlloc -> ST s (RunAcc s) -new (NumEntries nentries) alloc = do +new :: NumEntries -> RunBloomFilterAlloc -> ST s (j s) -> ST s (RunAcc j s) +new (NumEntries nentries) alloc newIndexAcc = do mbloom <- case alloc of RunAllocFixed !bitsPerEntry -> let !nbits = fromIntegral bitsPerEntry * fromIntegral nentries @@ -104,7 +102,7 @@ new (NumEntries nentries) alloc = do MBloom.new (fromIntegralChecked $ Monkey.numHashFunctions (fromIntegral nbits) (fromIntegral nentries)) nbits - mindex <- IndexCompact.new 1024 -- TODO(optimise): tune chunk size + mindex <- newIndexAcc mpageacc <- PageAcc.newPageAcc entryCount <- newPrimVar 0 pure RunAcc{..} @@ -115,12 +113,13 @@ new (NumEntries nentries) alloc = do -- The frozen bloom filter and compact index will be returned, along with the -- final page of the run (if necessary), and the remaining chunks of the -- incrementally constructed compact index. -unsafeFinalise :: - RunAcc s +unsafeFinalise + :: IndexAcc j + => RunAcc j s -> ST s ( Maybe RawPage , Maybe Chunk , Bloom SerialisedKey - , IndexCompact + , ResultingIndex j , NumEntries ) unsafeFinalise racc@RunAcc {..} = do @@ -153,7 +152,8 @@ unsafeFinalise racc@RunAcc {..} = do -- pre-serialised, use 'addLargeSerialisedKeyOp'. -- addKeyOp - :: RunAcc s + :: IndexAcc j + => RunAcc j s -> SerialisedKey -> Entry SerialisedValue BlobSpan -- ^ the full value, not just a prefix -> ST s ([RawPage], [RawOverflowPage], [Chunk]) @@ -178,7 +178,8 @@ addKeyOp racc k e -- This is guaranteed to add the key\/op, and it may yield (at most one) page. -- addSmallKeyOp - :: RunAcc s + :: IndexAcc j + => RunAcc j s -> SerialisedKey -> Entry SerialisedValue BlobSpan -> ST s (Maybe (RawPage, Maybe Chunk)) @@ -224,7 +225,8 @@ addSmallKeyOp racc@RunAcc{..} k e = -- 'RawOverflowPage's. -- addLargeKeyOp - :: RunAcc s + :: IndexAcc j + => RunAcc j s -> SerialisedKey -> Entry SerialisedValue BlobSpan -- ^ the full value, not just a prefix -> ST s ([RawPage], [RawOverflowPage], [Chunk]) @@ -272,7 +274,8 @@ addLargeKeyOp racc@RunAcc{..} k e = -- Otherwise, use 'addLargeKeyOp' or 'addSmallKeyOp' as appropriate. -- addLargeSerialisedKeyOp - :: RunAcc s + :: IndexAcc j + => RunAcc j s -> SerialisedKey -- ^ The key -> RawPage -- ^ The page that this key\/op is in, which must be the -- first page of a multi-page representation of a single @@ -301,7 +304,10 @@ addLargeSerialisedKeyOp racc@RunAcc{..} k page overflowPages = -- -- Returns @Nothing@ if the page accumulator was empty. 
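The 'IndexAcc' interface that these functions program against is not shown in this diff. Judging from the call sites ('Index.appendSingle', 'Index.appendMulti', 'Index.unsafeEnd', 'newWithDefaults', and the 'ResultingIndex' associated type), it is roughly of the following shape; this is a reconstruction for orientation only, and the exact member signatures live in Database.LSMTree.Internal.Index:

  class IndexAcc j where
    type ResultingIndex j
    -- construct a fresh accumulator with default tuning parameters
    newWithDefaults :: ST s (j s)
    -- append a key\/page entry that fits on one page, resp. one that
    -- spans several pages
    appendSingle    :: (SerialisedKey, PageNo) -> j s -> ST s (Maybe Chunk)
    appendMulti     :: (SerialisedKey, Word32) -> j s -> ST s [Chunk]
    -- freeze the accumulator into the final, immutable index
    unsafeEnd       :: j s -> ST s (Maybe Chunk, ResultingIndex j)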
-- -flushPageIfNonEmpty :: RunAcc s -> ST s (Maybe (RawPage, Maybe Chunk)) +flushPageIfNonEmpty + :: IndexAcc j + => RunAcc j s + -> ST s (Maybe (RawPage, Maybe Chunk)) flushPageIfNonEmpty RunAcc{mpageacc, mindex} = do nkeys <- PageAcc.keysCountPageAcc mpageacc if nkeys > 0 diff --git a/src/Database/LSMTree/Internal/RunBuilder.hs b/src/Database/LSMTree/Internal/RunBuilder.hs index eb45fbbf2..690272697 100644 --- a/src/Database/LSMTree/Internal/RunBuilder.hs +++ b/src/Database/LSMTree/Internal/RunBuilder.hs @@ -17,6 +17,7 @@ import qualified Control.Monad.Class.MonadST as ST import Control.Monad.Class.MonadSTM (MonadSTM (..)) import Control.Monad.Class.MonadThrow (MonadThrow) import Control.Monad.Primitive +import Control.Monad.ST.Strict (ST) import Data.BloomFilter (Bloom) import Data.Foldable (for_, traverse_) import Data.Primitive.PrimVar @@ -25,7 +26,7 @@ import Database.LSMTree.Internal.BlobRef (RawBlobRef) import Database.LSMTree.Internal.ChecksumHandle import qualified Database.LSMTree.Internal.CRC32C as CRC import Database.LSMTree.Internal.Entry -import Database.LSMTree.Internal.Index.Compact (IndexCompact) +import Database.LSMTree.Internal.Index (IndexAcc (ResultingIndex)) import Database.LSMTree.Internal.Paths import Database.LSMTree.Internal.RawOverflowPage (RawOverflowPage) import Database.LSMTree.Internal.RawPage (RawPage) @@ -49,7 +50,7 @@ import System.FS.BlockIO.API (HasBlockIO) -- -- __Not suitable for concurrent construction from multiple threads!__ -- -data RunBuilder m h = RunBuilder { +data RunBuilder m h j = RunBuilder { -- | The file system paths for all the files used by the run. runBuilderFsPaths :: !RunFsPaths @@ -57,7 +58,7 @@ data RunBuilder m h = RunBuilder { -- morally pure subset of the run construction functionality. In -- particular it contains the (mutable) index, bloom filter and buffered -- pending output for the key\/ops file. - , runBuilderAcc :: !(RunAcc (PrimState m)) + , runBuilderAcc :: !(RunAcc j (PrimState m)) -- | The byte offset within the blob file for the next blob to be written. , runBuilderBlobOffset :: !(PrimVar (PrimState m) Word64) @@ -68,36 +69,42 @@ data RunBuilder m h = RunBuilder { , runBuilderHasBlockIO :: !(HasBlockIO m h) } -{-# SPECIALISE new :: - HasFS IO h +{-# SPECIALISE new + :: IndexAcc j + => HasFS IO h -> HasBlockIO IO h -> RunFsPaths -> NumEntries -> RunBloomFilterAlloc - -> IO (RunBuilder IO h) #-} + -> ST RealWorld (j RealWorld) + -> IO (RunBuilder IO h j) #-} -- | Create a 'RunBuilder' to start building a run. -- -- NOTE: 'new' assumes that 'runDir', the directory the run is created in, exists. -new :: - (MonadST m, MonadSTM m) +new + :: forall m h j . (MonadST m, MonadSTM m, IndexAcc j) => HasFS m h -> HasBlockIO m h -> RunFsPaths -> NumEntries -- ^ an upper bound of the number of entries to be added -> RunBloomFilterAlloc - -> m (RunBuilder m h) -new hfs hbio runBuilderFsPaths numEntries alloc = do - runBuilderAcc <- ST.stToIO $ RunAcc.new numEntries alloc + -> ST (PrimState m) (j (PrimState m)) + -> m (RunBuilder m h j) +new hfs hbio runBuilderFsPaths numEntries alloc newIndexAcc = do + runBuilderAcc <- ST.stToIO $ RunAcc.new numEntries alloc newIndexAcc runBuilderBlobOffset <- newPrimVar 0 runBuilderHandles <- traverse (makeHandle hfs) (pathsForRunFiles runBuilderFsPaths) let builder = RunBuilder { runBuilderHasFS = hfs, runBuilderHasBlockIO = hbio, .. 
} - writeIndexHeader hfs (forRunIndex runBuilderHandles) (proxy# @IndexCompact) + writeIndexHeader hfs + (forRunIndex runBuilderHandles) + (proxy# @(ResultingIndex j)) return builder -{-# SPECIALISE addKeyOp :: - RunBuilder IO h +{-# SPECIALISE addKeyOp + :: IndexAcc j + => RunBuilder IO h j -> SerialisedKey -> Entry SerialisedValue (RawBlobRef IO h) -> IO () #-} @@ -115,8 +122,8 @@ new hfs hbio runBuilderFsPaths numEntries alloc = do -- everything else only at the end when 'unsafeFinalise' is called. -- addKeyOp :: - (MonadST m, MonadSTM m, MonadThrow m) - => RunBuilder m h + (MonadST m, MonadSTM m, MonadThrow m, IndexAcc j) + => RunBuilder m h j -> SerialisedKey -> Entry SerialisedValue (RawBlobRef m h) -> m () @@ -142,8 +149,9 @@ addKeyOp RunBuilder{..} key op = do writeRawOverflowPages runBuilderHasFS (forRunKOps runBuilderHandles) overflowPages for_ chunks $ writeIndexChunk runBuilderHasFS (forRunIndex runBuilderHandles) -{-# SPECIALISE addLargeSerialisedKeyOp :: - RunBuilder IO h +{-# SPECIALISE addLargeSerialisedKeyOp + :: IndexAcc j + => RunBuilder IO h j -> SerialisedKey -> RawPage -> [RawOverflowPage] @@ -151,8 +159,8 @@ addKeyOp RunBuilder{..} key op = do -- | See 'RunAcc.addLargeSerialisedKeyOp' for details. -- addLargeSerialisedKeyOp :: - (MonadST m, MonadSTM m) - => RunBuilder m h + (MonadST m, MonadSTM m, IndexAcc j) + => RunBuilder m h j -> SerialisedKey -> RawPage -> [RawOverflowPage] @@ -165,10 +173,11 @@ addLargeSerialisedKeyOp RunBuilder{..} key page overflowPages = do writeRawOverflowPages runBuilderHasFS (forRunKOps runBuilderHandles) overflowPages' for_ chunks $ writeIndexChunk runBuilderHasFS (forRunIndex runBuilderHandles) -{-# SPECIALISE unsafeFinalise :: - Bool - -> RunBuilder IO h - -> IO (HasFS IO h, HasBlockIO IO h, RunFsPaths, Bloom SerialisedKey, IndexCompact, NumEntries) #-} +{-# SPECIALISE unsafeFinalise + :: IndexAcc j + => Bool + -> RunBuilder IO h j + -> IO (HasFS IO h, HasBlockIO IO h, RunFsPaths, Bloom SerialisedKey, ResultingIndex j, NumEntries) #-} -- | Finish construction of the run. -- Writes the filter and index to file and leaves all written files on disk. -- @@ -176,10 +185,10 @@ addLargeSerialisedKeyOp RunBuilder{..} key page overflowPages = do -- -- TODO: Ensure proper cleanup even in presence of exceptions. unsafeFinalise :: - (MonadST m, MonadSTM m, MonadThrow m) + (MonadST m, MonadSTM m, MonadThrow m, IndexAcc j) => Bool -- ^ drop caches - -> RunBuilder m h - -> m (HasFS m h, HasBlockIO m h, RunFsPaths, Bloom SerialisedKey, IndexCompact, NumEntries) + -> RunBuilder m h j + -> m (HasFS m h, HasBlockIO m h, RunFsPaths, Bloom SerialisedKey, ResultingIndex j, NumEntries) unsafeFinalise dropCaches RunBuilder {..} = do -- write final bits (mPage, mChunk, runFilter, runIndex, numEntries) <- @@ -204,13 +213,13 @@ unsafeFinalise dropCaches RunBuilder {..} = do mapM_ (closeHandle runBuilderHasFS) runBuilderHandles return (runBuilderHasFS, runBuilderHasBlockIO, runBuilderFsPaths, runFilter, runIndex, numEntries) -{-# SPECIALISE close :: RunBuilder IO h -> IO () #-} +{-# SPECIALISE close :: RunBuilder IO h j -> IO () #-} -- | Close a run that is being constructed (has not been finalised yet), -- removing all files associated with it from disk. -- After calling this operation, the run must not be used anymore. -- -- TODO: Ensure proper cleanup even in presence of exceptions. 
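Putting the pieces together, the generalised builder is used in the same new/add/finalise sequence as before, with the index choice threaded through. A sketch assuming the signatures above ('buildRun' is illustrative, the fixed bloom-filter allocation is arbitrary, and exception safety via 'close' is elided):

  buildRun ::
       forall m h j. (MonadST m, MonadSTM m, MonadThrow m, IndexAcc j)
    => HasFS m h
    -> HasBlockIO m h
    -> RunFsPaths
    -> [(SerialisedKey, Entry SerialisedValue (RawBlobRef m h))]
    -> m (Bloom SerialisedKey, ResultingIndex j, NumEntries)
  buildRun hfs hbio paths kops = do
      builder <- new hfs hbio paths (NumEntries (length kops))
                     (RunAllocFixed 10) (newWithDefaults @j)
      for_ kops $ \(k, op) -> addKeyOp builder k op
      -- 'True' drops the page cache for the written files
      (_hfs, _hbio, _paths, bloom, index, n) <- unsafeFinalise True builder
      pure (bloom, index, n)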
-close :: MonadSTM m => RunBuilder m h -> m () +close :: MonadSTM m => RunBuilder m h j -> m () close RunBuilder {..} = do traverse_ (closeHandle runBuilderHasFS) runBuilderHandles traverse_ (FS.removeFile runBuilderHasFS) (pathsForRunFiles runBuilderFsPaths) diff --git a/src/Database/LSMTree/Internal/RunReader.hs b/src/Database/LSMTree/Internal/RunReader.hs index 8a9c95995..f30e9980f 100644 --- a/src/Database/LSMTree/Internal/RunReader.hs +++ b/src/Database/LSMTree/Internal/RunReader.hs @@ -33,6 +33,7 @@ import Database.LSMTree.Internal.BitMath (ceilDivPageSize, import Database.LSMTree.Internal.BlobFile as BlobFile import Database.LSMTree.Internal.BlobRef as BlobRef import qualified Database.LSMTree.Internal.Entry as E +import Database.LSMTree.Internal.Index (Index) import qualified Database.LSMTree.Internal.Index as Index (search) import Database.LSMTree.Internal.Page (PageNo (..), PageSpan (..), getNumPages, nextPageNo) @@ -86,13 +87,14 @@ data RunReader m h = RunReader { data OffsetKey = NoOffsetKey | OffsetKey !SerialisedKey {-# SPECIALISE new :: - OffsetKey - -> Ref (Run.Run IO h) + Index i + => OffsetKey + -> Ref (Run.Run IO h i) -> IO (RunReader IO h) #-} -new :: forall m h. - (MonadMask m, MonadSTM m, PrimMonad m) +new :: forall m h i. + (MonadMask m, MonadSTM m, PrimMonad m, Index i) => OffsetKey - -> Ref (Run.Run m h) + -> Ref (Run.Run m h i) -> m (RunReader m h) new !offsetKey readerRun@(DeRef Run.Run { diff --git a/src/Database/LSMTree/Internal/RunReaders.hs b/src/Database/LSMTree/Internal/RunReaders.hs index 75863845c..5c1f07243 100644 --- a/src/Database/LSMTree/Internal/RunReaders.hs +++ b/src/Database/LSMTree/Internal/RunReaders.hs @@ -29,6 +29,7 @@ import Data.Traversable (for) import qualified Data.Vector as V import Database.LSMTree.Internal.BlobRef (RawBlobRef) import Database.LSMTree.Internal.Entry (Entry (..)) +import Database.LSMTree.Internal.Index (Index) import Database.LSMTree.Internal.Run (Run) import Database.LSMTree.Internal.RunReader (OffsetKey (..), RunReader (..)) @@ -119,15 +120,16 @@ data Reader m h = type KOp m h = (SerialisedKey, Entry SerialisedValue (RawBlobRef m h)) {-# SPECIALISE new :: - OffsetKey + Index i + => OffsetKey -> Maybe (WB.WriteBuffer, Ref (WB.WriteBufferBlobs IO h)) - -> V.Vector (Ref (Run IO h)) + -> V.Vector (Ref (Run IO h i)) -> IO (Maybe (Readers IO h)) #-} -new :: forall m h. - (MonadMask m, MonadST m, MonadSTM m) +new :: forall m h i. 
+ (MonadMask m, MonadST m, MonadSTM m, Index i) => OffsetKey -> Maybe (WB.WriteBuffer, Ref (WB.WriteBufferBlobs m h)) - -> V.Vector (Ref (Run m h)) + -> V.Vector (Ref (Run m h i)) -> m (Maybe (Readers m h)) new !offsetKey wbs runs = do wBuffer <- maybe (pure Nothing) (uncurry fromWB) wbs @@ -151,7 +153,7 @@ new !offsetKey wbs runs = do NoOffsetKey -> id OffsetKey k -> Map.dropWhileAntitone (< k) - fromRun :: ReaderNumber -> Ref (Run m h) -> m (Maybe (ReadCtx m h)) + fromRun :: ReaderNumber -> Ref (Run m h i) -> m (Maybe (ReadCtx m h)) fromRun n run = do reader <- Reader.new offsetKey run nextReadCtx n (ReadRun reader) diff --git a/src/Database/LSMTree/Internal/Snapshot.hs b/src/Database/LSMTree/Internal/Snapshot.hs index 9a20482fe..65fdbf385 100644 --- a/src/Database/LSMTree/Internal/Snapshot.hs +++ b/src/Database/LSMTree/Internal/Snapshot.hs @@ -37,6 +37,8 @@ import Data.Traversable (for) import qualified Data.Vector as V import Database.LSMTree.Internal.Config import Database.LSMTree.Internal.Entry +import Database.LSMTree.Internal.Index (Index, + IndexAcc (ResultingIndex, newWithDefaults)) import Database.LSMTree.Internal.Lookup (ResolveSerialisedValue) import qualified Database.LSMTree.Internal.Merge as Merge import Database.LSMTree.Internal.MergeSchedule @@ -132,27 +134,27 @@ newtype SpentCredits = SpentCredits { getSpentCredits :: Int } Conversion to levels snapshot format -------------------------------------------------------------------------------} -{-# SPECIALISE toSnapLevels :: Levels IO h -> IO (SnapLevels (Ref (Run IO h))) #-} +{-# SPECIALISE toSnapLevels :: Levels IO h j -> IO (SnapLevels (Ref (Run IO h (ResultingIndex j)))) #-} toSnapLevels :: (PrimMonad m, MonadMVar m) - => Levels m h - -> m (SnapLevels (Ref (Run m h))) + => Levels m h j + -> m (SnapLevels (Ref (Run m h (ResultingIndex j)))) toSnapLevels levels = SnapLevels <$> V.mapM toSnapLevel levels -{-# SPECIALISE toSnapLevel :: Level IO h -> IO (SnapLevel (Ref (Run IO h))) #-} +{-# SPECIALISE toSnapLevel :: Level IO h j -> IO (SnapLevel (Ref (Run IO h (ResultingIndex j)))) #-} toSnapLevel :: (PrimMonad m, MonadMVar m) - => Level m h - -> m (SnapLevel (Ref (Run m h))) + => Level m h j + -> m (SnapLevel (Ref (Run m h (ResultingIndex j)))) toSnapLevel Level{..} = do sir <- toSnapIncomingRun incomingRun pure (SnapLevel sir residentRuns) -{-# SPECIALISE toSnapIncomingRun :: IncomingRun IO h -> IO (SnapIncomingRun (Ref (Run IO h))) #-} +{-# SPECIALISE toSnapIncomingRun :: IncomingRun IO h j -> IO (SnapIncomingRun (Ref (Run IO h (ResultingIndex j)))) #-} toSnapIncomingRun :: (PrimMonad m, MonadMVar m) - => IncomingRun m h - -> m (SnapIncomingRun (Ref (Run m h))) + => IncomingRun m h j + -> m (SnapIncomingRun (Ref (Run m h (ResultingIndex j)))) toSnapIncomingRun (Single r) = pure (SnapSingleRun r) -- We need to know how many credits were yet unspent so we can restore merge -- work on snapshot load. 
No need to snapshot the contents of totalStepsVar @@ -171,12 +173,12 @@ toSnapIncomingRun (Merging (DeRef MergingRun {..})) = do smrs {-# SPECIALISE toSnapMergingRunState :: - MergingRunState IO h - -> IO (SnapMergingRunState (Ref (Run IO h))) #-} + MergingRunState IO h j + -> IO (SnapMergingRunState (Ref (Run IO h (ResultingIndex j)))) #-} toSnapMergingRunState :: PrimMonad m - => MergingRunState m h - -> m (SnapMergingRunState (Ref (Run m h))) + => MergingRunState m h j + -> m (SnapMergingRunState (Ref (Run m h (ResultingIndex j)))) toSnapMergingRunState (CompletedMerge r) = pure (SnapCompletedMerge r) -- We need to know how many credits were spent already so we can restore merge -- work on snapshot load. @@ -191,7 +193,7 @@ toSnapMergingRunState (OngoingMerge rs (SpentCreditsVar spentCreditsVar) m) = do {-# SPECIALISE snapshotRuns :: TempRegistry IO -> NamedSnapshotDir - -> SnapLevels (Ref (Run IO h)) + -> SnapLevels (Ref (Run IO h i)) -> IO (SnapLevels RunNumber) #-} -- | @'snapshotRuns' _ targetDir levels@ creates hard links for all run files -- associated with the runs in @levels@, and puts the new directory entries in @@ -200,7 +202,7 @@ snapshotRuns :: (MonadMask m, MonadMVar m) => TempRegistry m -> NamedSnapshotDir - -> SnapLevels (Ref (Run m h)) + -> SnapLevels (Ref (Run m h i)) -> m (SnapLevels RunNumber) snapshotRuns reg (NamedSnapshotDir targetDir) levels = for levels $ \run@(DeRef Run.Run { Run.runHasFS = hfs, @@ -211,7 +213,8 @@ snapshotRuns reg (NamedSnapshotDir targetDir) levels = pure (runNumber targetPaths) {-# SPECIALISE openRuns :: - TempRegistry IO + Index i + => TempRegistry IO -> HasFS IO h -> HasBlockIO IO h -> TableConfig @@ -219,14 +222,14 @@ snapshotRuns reg (NamedSnapshotDir targetDir) levels = -> NamedSnapshotDir -> ActiveDir -> SnapLevels RunNumber - -> IO (SnapLevels (Ref (Run IO h))) #-} + -> IO (SnapLevels (Ref (Run IO h i))) #-} -- | @'openRuns' _ _ _ _ uniqCounter sourceDir targetDir levels@ takes all run -- files that are referenced by @levels@, and hard links them from @sourceDir@ -- into @targetDir@ with new, unique names (using @uniqCounter@). Each set of -- (hard linked) files that represents a run is opened and verified, returning -- 'Run's as a result. openRuns :: - (MonadMask m, MonadSTM m, MonadST m, MonadMVar m) + forall m h i. (MonadMask m, MonadSTM m, MonadST m, MonadMVar m, Index i) => TempRegistry m -> HasFS m h -> HasBlockIO m h @@ -235,7 +238,7 @@ openRuns :: -> NamedSnapshotDir -> ActiveDir -> SnapLevels RunNumber - -> m (SnapLevels (Ref (Run m h))) + -> m (SnapLevels (Ref (Run m h i))) openRuns reg hfs hbio TableConfig{..} uc (NamedSnapshotDir sourceDir) (ActiveDir targetDir) (SnapLevels levels) = do @@ -259,18 +262,19 @@ openRuns -------------------------------------------------------------------------------} {-# SPECIALISE fromSnapLevels :: - TempRegistry IO + IndexAcc j + => TempRegistry IO -> HasFS IO h -> HasBlockIO IO h -> TableConfig -> UniqCounter IO -> ResolveSerialisedValue -> ActiveDir - -> SnapLevels (Ref (Run IO h)) - -> IO (Levels IO h) + -> SnapLevels (Ref (Run IO h (ResultingIndex j))) + -> IO (Levels IO h j) #-} fromSnapLevels :: - forall m h. (MonadMask m, MonadMVar m, MonadSTM m, MonadST m) + forall m h j. 
(MonadMask m, MonadMVar m, MonadSTM m, MonadST m, IndexAcc j) => TempRegistry m -> HasFS m h -> HasBlockIO m h @@ -278,14 +282,16 @@ fromSnapLevels :: -> UniqCounter m -> ResolveSerialisedValue -> ActiveDir - -> SnapLevels (Ref (Run m h)) - -> m (Levels m h) + -> SnapLevels (Ref (Run m h (ResultingIndex j))) + -> m (Levels m h j) fromSnapLevels reg hfs hbio conf@TableConfig{..} uc resolve dir (SnapLevels levels) = V.iforM levels $ \i -> fromSnapLevel (LevelNo (i+1)) where mkPath = RunFsPaths (getActiveDir dir) - fromSnapLevel :: LevelNo -> SnapLevel (Ref (Run m h)) -> m (Level m h) + fromSnapLevel :: LevelNo + -> SnapLevel (Ref (Run m h (ResultingIndex j))) + -> m (Level m h j) fromSnapLevel ln SnapLevel{..} = do (unspentCreditsMay, spentCreditsMay, incomingRun) <- fromSnapIncomingRun snapIncoming -- When a snapshot is created, merge progress is lost, so we have to @@ -310,10 +316,11 @@ fromSnapLevels reg hfs hbio conf@TableConfig{..} uc resolve dir (SnapLevels leve where caching = diskCachePolicyForLevel confDiskCachePolicy ln alloc = bloomFilterAllocForLevel conf ln + newIndex = newWithDefaults fromSnapIncomingRun :: - SnapIncomingRun (Ref (Run m h)) - -> m (Maybe UnspentCredits, Maybe SpentCredits, IncomingRun m h) + SnapIncomingRun (Ref (Run m h (ResultingIndex j))) + -> m (Maybe UnspentCredits, Maybe SpentCredits, IncomingRun m h j) fromSnapIncomingRun (SnapMergingRun mpfl nr ne unspentCredits knownCompleted smrs) = do (spentCreditsMay, mrs) <- fromSnapMergingRunState smrs (Just unspentCredits, spentCreditsMay,) . Merging <$> @@ -322,8 +329,8 @@ fromSnapLevels reg hfs hbio conf@TableConfig{..} uc resolve dir (SnapLevels leve pure (Nothing, Nothing, Single run) fromSnapMergingRunState :: - SnapMergingRunState (Ref (Run m h)) - -> m (Maybe SpentCredits, MergingRunState m h) + SnapMergingRunState (Ref (Run m h (ResultingIndex j))) + -> m (Maybe SpentCredits, MergingRunState m h j) fromSnapMergingRunState (SnapCompletedMerge run) = pure (Nothing, CompletedMerge run) fromSnapMergingRunState (SnapOngoingMerge runs spentCredits mergeLast) = do @@ -332,7 +339,7 @@ fromSnapLevels reg hfs hbio conf@TableConfig{..} uc resolve dir (SnapLevels leve spentCreditsVar <- SpentCreditsVar <$> newPrimVar 0 rn <- uniqueToRunNumber <$> incrUniqCounter uc mergeMaybe <- allocateMaybeTemp reg - (Merge.new hfs hbio caching alloc mergeLast resolve (mkPath rn) runs) + (Merge.new hfs hbio caching alloc newIndex mergeLast resolve (mkPath rn) runs) Merge.abort case mergeMaybe of Nothing -> error "openLevels: merges can not be empty" diff --git a/src/Database/LSMTree/Internal/Snapshot/Codec.hs b/src/Database/LSMTree/Internal/Snapshot/Codec.hs index 8922d3a23..66988c4c0 100644 --- a/src/Database/LSMTree/Internal/Snapshot/Codec.hs +++ b/src/Database/LSMTree/Internal/Snapshot/Codec.hs @@ -200,19 +200,21 @@ instance Decode SnapshotVersion where -- SnapshotMetaData instance Encode SnapshotMetaData where - encode (SnapshotMetaData label tableType config levels) = + encode (SnapshotMetaData label tableType someConfig levels) = encodeListLen 4 <> encode label <> encode tableType - <> encode config + <> encode someConfig <> encode levels instance DecodeVersioned SnapshotMetaData where decodeVersioned ver@V0 = do _ <- decodeListLenOf 4 - SnapshotMetaData - <$> decodeVersioned ver <*> decodeVersioned ver - <*> decodeVersioned ver <*> decodeVersioned ver + label <- decodeVersioned ver + tableType <- decodeVersioned ver + someConfig <- decodeVersioned ver + levels <- decodeVersioned ver + pure (SnapshotMetaData label tableType 
someConfig levels) -- SnapshotLabel @@ -268,10 +270,22 @@ instance Encode TableConfig where instance DecodeVersioned TableConfig where decodeVersioned v@V0 = do _ <- decodeListLenOf 7 - TableConfig - <$> decodeVersioned v <*> decodeVersioned v <*> decodeVersioned v - <*> decodeVersioned v <*> decodeVersioned v <*> decodeVersioned v - <*> decodeVersioned v + mergePolicy <- decodeVersioned v + sizeRatio <- decodeVersioned v + writeBufferAlloc <- decodeVersioned v + bloomFilterAlloc <- decodeVersioned v + fencePointerIndex <- decodeVersioned v + diskCachePolicy <- decodeVersioned v + mergeSchedule <- decodeVersioned v + pure $ + TableConfig + mergePolicy + sizeRatio + writeBufferAlloc + bloomFilterAlloc + fencePointerIndex + diskCachePolicy + mergeSchedule -- MergePolicy @@ -351,8 +365,8 @@ instance DecodeVersioned BloomFilterAlloc where -- FencePointerIndex instance Encode FencePointerIndex where - encode CompactIndex = encodeWord 0 - encode OrdinaryIndex = encodeWord 1 + encode CompactIndex = encodeWord 0 + encode OrdinaryIndex = encodeWord 1 instance DecodeVersioned FencePointerIndex where decodeVersioned V0 = do diff --git a/src/Database/LSMTree/Monoidal.hs b/src/Database/LSMTree/Monoidal.hs index ec91de933..e98b44d50 100644 --- a/src/Database/LSMTree/Monoidal.hs +++ b/src/Database/LSMTree/Monoidal.hs @@ -1,3 +1,5 @@ +{-# LANGUAGE MagicHash #-} + -- | On disk key-value tables, implemented as Log Structured Merge (LSM) trees. -- -- This module is the API for \"monoidal\" tables, as opposed to \"normal\" @@ -120,7 +122,6 @@ import Control.DeepSeq import Control.Exception (assert) import Control.Monad ((<$!>)) import Data.Bifunctor (Bifunctor (..)) -import Data.Coerce (coerce) import Data.Kind (Type) import Data.Monoid (Sum (..)) import Data.Proxy (Proxy (Proxy)) @@ -130,11 +131,13 @@ import Database.LSMTree.Common (IOLike, Range (..), SerialiseKey, deleteSnapshot, listSnapshots, openSession, withSession) import qualified Database.LSMTree.Common as Common import qualified Database.LSMTree.Internal as Internal +import qualified Database.LSMTree.Internal.Config as Internal import qualified Database.LSMTree.Internal.Entry as Entry import Database.LSMTree.Internal.RawBytes (RawBytes) import qualified Database.LSMTree.Internal.Serialise as Internal import qualified Database.LSMTree.Internal.Snapshot as Internal import qualified Database.LSMTree.Internal.Vector as V +import GHC.Exts -- $resource-management -- See "Database.LSMTree.Normal#g:resource" @@ -174,8 +177,9 @@ withTable :: forall m k v a. -> (Table m k v -> m a) -> m a withTable (Internal.Session' sesh) conf action = - Internal.withTable sesh conf $ - action . Internal.MonoidalTable + case Internal.someFencePointerIndex (Internal.confFencePointerIndex conf) of + Internal.SomeFencePointerIndex (p :: Proxy# j) -> + Internal.withTable p sesh conf (action . Internal.MonoidalTable) {-# SPECIALISE new :: Session IO @@ -191,7 +195,10 @@ new :: forall m k v. => Session m -> Common.TableConfig -> m (Table m k v) -new (Internal.Session' sesh) conf = Internal.MonoidalTable <$> Internal.new sesh conf +new (Internal.Session' sesh) conf = + case Internal.someFencePointerIndex (Internal.confFencePointerIndex conf) of + Internal.SomeFencePointerIndex (p :: Proxy# j) -> + Internal.MonoidalTable <$> Internal.new p sesh conf {-# SPECIALISE close :: Table IO k v @@ -590,8 +597,8 @@ openSnapshot :: forall m k v. 
-> SnapshotName -> m (Table m k v) openSnapshot (Internal.Session' sesh) override label snap = - Internal.MonoidalTable <$> Internal.openSnapshot + Internal.openSnapshot + Internal.MonoidalTable sesh label Internal.SnapMonoidalTable diff --git a/src/Database/LSMTree/Normal.hs b/src/Database/LSMTree/Normal.hs index f1c2140cf..699cac511 100644 --- a/src/Database/LSMTree/Normal.hs +++ b/src/Database/LSMTree/Normal.hs @@ -1,3 +1,5 @@ +{-# LANGUAGE MagicHash #-} + -- | On disk key-value tables, implemented as Log Structured Merge (LSM) trees. -- -- This module is the API for \"normal\" tables, as opposed to \"monoidal\" @@ -124,10 +126,12 @@ import Database.LSMTree.Common (BlobRef (BlobRef), IOLike, Range (..), import qualified Database.LSMTree.Common as Common import qualified Database.LSMTree.Internal as Internal import qualified Database.LSMTree.Internal.BlobRef as Internal +import qualified Database.LSMTree.Internal.Config as Internal import qualified Database.LSMTree.Internal.Entry as Entry import qualified Database.LSMTree.Internal.Serialise as Internal import qualified Database.LSMTree.Internal.Snapshot as Internal import qualified Database.LSMTree.Internal.Vector as V +import GHC.Exts -- $resource-management -- Sessions, tables and cursors use resources and as such need to be @@ -252,7 +256,9 @@ withTable :: -> (Table m k v b -> m a) -> m a withTable (Internal.Session' sesh) conf action = - Internal.withTable sesh conf (action . Internal.NormalTable) + case Internal.someFencePointerIndex (Internal.confFencePointerIndex conf) of + Internal.SomeFencePointerIndex (p :: Proxy# j) -> + Internal.withTable p sesh conf (action . Internal.NormalTable) {-# SPECIALISE new :: Session IO @@ -264,11 +270,14 @@ withTable (Internal.Session' sesh) conf action = -- closed using 'close' as soon as they are no longer used. -- new :: - IOLike m + forall m k v b. IOLike m => Session m -> Common.TableConfig -> m (Table m k v b) -new (Internal.Session' sesh) conf = Internal.NormalTable <$> Internal.new sesh conf +new (Internal.Session' sesh) conf = + case Internal.someFencePointerIndex (Internal.confFencePointerIndex conf) of + Internal.SomeFencePointerIndex (p :: Proxy# j) -> + Internal.NormalTable <$> Internal.new p sesh conf {-# SPECIALISE close :: Table IO k v b @@ -710,14 +719,14 @@ openSnapshot :: forall m k v b. -> SnapshotName -> m (Table m k v b) openSnapshot (Internal.Session' sesh) override label snap = - Internal.NormalTable <$!> - Internal.openSnapshot - sesh - label - Internal.SnapNormalTable - override - snap - const + Internal.openSnapshot + Internal.NormalTable + sesh + label + Internal.SnapNormalTable + override + snap + const {------------------------------------------------------------------------------- Multiple writable tables
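All three public modules now repeat the same dispatch: the runtime 'FencePointerIndex' configuration value is reified into a type-level choice of index accumulator, and the existentials in 'Table'', 'NormalTable' and 'MonoidalTable' capture that choice. 'Internal.someFencePointerIndex' itself is not part of this diff; from its use sites it plausibly looks like the following, where the accumulator types 'IndexCompactAcc' and 'IndexOrdinaryAcc' are assumptions (GADTs, MagicHash and TypeApplications assumed):

  data SomeFencePointerIndex where
    SomeFencePointerIndex :: IndexAcc j => Proxy# j -> SomeFencePointerIndex

  someFencePointerIndex :: FencePointerIndex -> SomeFencePointerIndex
  someFencePointerIndex CompactIndex  = SomeFencePointerIndex (proxy# @IndexCompactAcc)
  someFencePointerIndex OrdinaryIndex = SomeFencePointerIndex (proxy# @IndexOrdinaryAcc)

Pattern matching on the result brings the 'IndexAcc j' dictionary into scope, which is exactly what the 'case' expressions in 'withTable', 'new' and 'openSnapshot' rely on.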