Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
<!--
A new scriv changelog fragment.
Uncomment the section that is right (remove the HTML comment wrapper).
For top level release notes, leave all the headers commented out.
-->

<!--
### Patch
- A bullet item for the Patch category.
-->

### Non-Breaking

- Ensure uncommitted forkers do not leak Ledger tables handles.

<!--
### Breaking
- A bullet item for the Breaking category.
-->
1 change: 1 addition & 0 deletions ouroboros-consensus/ouroboros-consensus.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -751,6 +751,7 @@ test-suite storage-test
ouroboros-consensus,
ouroboros-network-api,
ouroboros-network-mock,
ouroboros-network-protocols,
pretty-show,
quickcheck-dynamic,
quickcheck-lockstep ^>=0.8,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,12 @@ module Ouroboros.Consensus.Storage.LedgerDB.V2 (mkInitDb) where

import Cardano.Ledger.BaseTypes (unNonZero)
import Control.Arrow ((>>>))
import Control.Monad (join)
import qualified Control.Monad as Monad (void, (>=>))
import qualified Control.Monad as Monad (join, void)
import Control.Monad.Except
import Control.RAWLock
import qualified Control.RAWLock as RAWLock
import Control.ResourceRegistry
import Control.Tracer
import Data.Foldable (traverse_)
import qualified Data.Foldable as Foldable
import Data.Functor.Contravariant ((>$<))
import Data.Kind (Type)
Expand Down Expand Up @@ -195,7 +193,7 @@ mkInternals bss h =
let selectWhereTo = case whereTo of
TakeAtImmutableTip -> anchorHandle
TakeAtVolatileTip -> currentHandle
withStateRef env (MkSolo . selectWhereTo) $ \(MkSolo st) ->
withStateRef env (MkSolo . selectWhereTo) $ \(MkSolo (st, _)) ->
Monad.void $
takeSnapshot
(configCodec . getExtLedgerCfg . ledgerDbCfg $ ldbCfg env)
Expand Down Expand Up @@ -249,7 +247,7 @@ mkInternals bss h =

pruneLedgerSeq :: LedgerDBEnv m (ExtLedgerState blk) blk -> m ()
pruneLedgerSeq env =
join $ atomically $ stateTVar (ldbSeq env) $ pruneToImmTipOnly
Monad.join $ atomically $ stateTVar (ldbSeq env) $ pruneToImmTipOnly

-- | Testing only! Truncate all snapshots in the DB.
implIntTruncateSnapshots :: MonadThrow m => SomeHasFS m -> m ()
Expand Down Expand Up @@ -360,7 +358,7 @@ implGarbageCollect env slotNo = do
Set.dropWhileAntitone ((< slotNo) . realPointSlot)
-- It is safe to close the handles outside of the locked region, which reduces
-- contention. See the docs of 'ldbOpenHandlesLock'.
join $ RAWLock.withWriteAccess (ldbOpenHandlesLock env) $ \() -> do
Monad.join $ RAWLock.withWriteAccess (ldbOpenHandlesLock env) $ \() -> do
close <- atomically $ stateTVar (ldbSeq env) $ prune (LedgerDbPruneBeforeSlot slotNo)
pure (close, ())

Expand All @@ -379,7 +377,7 @@ implTryTakeSnapshot ::
implTryTakeSnapshot bss env mTime nrBlocks =
if onDiskShouldTakeSnapshot (ldbSnapshotPolicy env) (uncurry (flip diffTime) <$> mTime) nrBlocks
then do
withStateRef env (MkSolo . anchorHandle) $ \(MkSolo st) ->
withStateRef env (MkSolo . anchorHandle) $ \(MkSolo (st, _)) ->
Monad.void $
takeSnapshot
(configCodec . getExtLedgerCfg . ledgerDbCfg $ ldbCfg env)
Expand Down Expand Up @@ -585,36 +583,37 @@ getVolatileLedgerSeq env =
where
k = unNonZero $ maxRollbacks $ ledgerDbCfgSecParam $ ldbCfg env

-- | Get a 'StateRef' from the 'LedgerSeq' (via 'getVolatileLedgerSeq') in the
-- 'LedgerDBEnv', with the 'LedgerTablesHandle' having been duplicated (such
-- that the original can be closed). The caller is responsible for closing the
-- handle.
-- | Get a 'StateRef' from the 'LedgerSeq' in the 'LedgerDBEnv', with the
-- 'LedgerTablesHandle' having been duplicated (such that the original can be
-- closed). The caller should close the handle using the returned @ResourceKey@,
-- although closing the registry will also release the handle.
--
-- For more flexibility, an arbitrary 'Traversable' of the 'StateRef' can be
-- returned; for the simple use case of getting a single 'StateRef', use @t ~
-- 'Solo'@.
getStateRef ::
(IOLike m, Traversable t, GetTip l) =>
LedgerDBEnv m l blk ->
ResourceRegistry m ->
(LedgerSeq m l -> t (StateRef m l)) ->
m (t (StateRef m l))
getStateRef ldbEnv project =
m (t (StateRef m l, ResourceKey m))
getStateRef ldbEnv reg project =
RAWLock.withReadAccess (ldbOpenHandlesLock ldbEnv) $ \() -> do
tst <- project <$> atomically (getVolatileLedgerSeq ldbEnv)
for tst $ \st -> do
tables' <- duplicate $ tables st
pure st{tables = tables'}
(resKey, tables') <- allocate reg (\_ -> duplicate $ tables st) close
pure (st{tables = tables'}, resKey)

-- | Like 'StateRef', but takes care of closing the handle when the given action
-- returns or errors.
withStateRef ::
(IOLike m, Traversable t, GetTip l) =>
LedgerDBEnv m l blk ->
(LedgerSeq m l -> t (StateRef m l)) ->
(t (StateRef m l) -> m a) ->
(t (StateRef m l, ResourceKey m) -> m a) ->
m a
withStateRef ldbEnv project =
bracket (getStateRef ldbEnv project) (traverse_ (close . tables))
withStateRef ldbEnv project f =
withRegistry $ \reg -> getStateRef ldbEnv reg project >>= f

acquireAtTarget ::
( HeaderHash l ~ HeaderHash blk
Expand All @@ -625,9 +624,10 @@ acquireAtTarget ::
) =>
LedgerDBEnv m l blk ->
Either Word64 (Target (Point blk)) ->
m (Either GetForkerError (StateRef m l))
acquireAtTarget ldbEnv target =
getStateRef ldbEnv $ \l -> case target of
ResourceRegistry m ->
m (Either GetForkerError (StateRef m l, ResourceKey m))
acquireAtTarget ldbEnv target reg =
getStateRef ldbEnv reg $ \l -> case target of
Right VolatileTip -> pure $ currentHandle l
Right ImmutableTip -> pure $ anchorHandle l
Right (SpecificPoint pt) -> do
Expand Down Expand Up @@ -661,7 +661,7 @@ newForkerAtTarget ::
Target (Point blk) ->
m (Either GetForkerError (Forker m l blk))
newForkerAtTarget h rr pt = getEnv h $ \ldbEnv ->
acquireAtTarget ldbEnv (Right pt) >>= traverse (newForker h ldbEnv rr)
acquireAtTarget ldbEnv (Right pt) rr >>= traverse (newForker h ldbEnv rr)

newForkerByRollback ::
( HeaderHash l ~ HeaderHash blk
Expand All @@ -676,14 +676,14 @@ newForkerByRollback ::
Word64 ->
m (Either GetForkerError (Forker m l blk))
newForkerByRollback h rr n = getEnv h $ \ldbEnv ->
acquireAtTarget ldbEnv (Left n) >>= traverse (newForker h ldbEnv rr)
acquireAtTarget ldbEnv (Left n) rr >>= traverse (newForker h ldbEnv rr)

closeForkerEnv ::
IOLike m => ForkerEnv m l blk -> m ()
closeForkerEnv ForkerEnv{foeResourcesToRelease = (lock, key, toRelease)} =
RAWLock.withWriteAccess lock $
const $ do
id =<< atomically (swapTVar toRelease (pure ()))
Monad.join $ atomically (swapTVar toRelease (pure ()))
_ <- release key
pure ((), ())

Expand Down Expand Up @@ -773,14 +773,19 @@ newForker ::
LedgerDBHandle m l blk ->
LedgerDBEnv m l blk ->
ResourceRegistry m ->
StateRef m l ->
(StateRef m l, ResourceKey m) ->
m (Forker m l blk)
newForker h ldbEnv rr st = do
newForker h ldbEnv rr (st, rk) = do
forkerKey <- atomically $ stateTVar (ldbNextForkerKey ldbEnv) $ \r -> (r, r + 1)
let tr = LedgerDBForkerEvent . TraceForkerEventWithKey forkerKey >$< ldbTracer ldbEnv
traceWith tr ForkerOpen
lseqVar <- newTVarIO . LedgerSeq . AS.Empty $ st
(k, toRelease) <- allocate rr (\_ -> newTVarIO (pure ())) (readTVarIO Monad.>=> id)
-- The closing action that we allocate in the TVar from the start is not
-- strictly necessary if the caller uses a short-lived registry like the ones
-- in Chain selection or the forging loop. Just in case the user passes a
-- long-lived registry, we store such closing action to make sure the handle
-- is closed even under @forkerClose@ if the registry outlives the forker.
(k, toRelease) <- allocate rr (\_ -> newTVarIO (Monad.void (release rk))) (Monad.join . readTVarIO)
let forkerEnv =
ForkerEnv
{ foeLedgerSeq = lseqVar
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,17 @@ implForkerCommit env = do
_ AS.:< closeOld' -> closeLedgerSeq (LedgerSeq closeOld')
-- Finally, close the anchor of @lseq@ (which is a duplicate of
-- the head of @olddb'@).
--
-- Note if the resource registry used to create the Forker is
-- ephemeral as the one created on each Chain selection or each
-- Forging loop iteration, this first duplicated state will be
-- closed by the resource registry closing down, so this will be
-- a double release, which is fine. We prefer keeping this
-- action just in case some client passes a registry that
-- outlives the forker.
--
-- The rest of the states in the forker will be closed via
-- @foeResourcesToRelease@ instead of via the registry.
close $ tables $ AS.anchor lseq
pure (closeDiscarded, LedgerSeq newdb)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,10 @@ newInMemoryLedgerTablesHandle tracer someFS@(SomeHasFS hasFS) l = do
pure
LedgerTablesHandle
{ close = do
atomically $ writeTVar tv LedgerTablesHandleClosed
traceWith tracer V2.TraceLedgerTablesHandleClose
p <- atomically $ swapTVar tv LedgerTablesHandleClosed
case p of
LedgerTablesHandleOpen{} -> traceWith tracer V2.TraceLedgerTablesHandleClose
_ -> pure ()
, duplicate = do
hs <- readTVarIO tv
!x <- guardClosed hs $ newInMemoryLedgerTablesHandle tracer someFS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ import Ouroboros.Consensus.Util hiding (Some)
import Ouroboros.Consensus.Util.Args
import Ouroboros.Consensus.Util.IOLike
import qualified Ouroboros.Network.AnchoredSeq as AS
import Ouroboros.Network.Protocol.LocalStateQuery.Type
import qualified System.Directory as Dir
import System.FS.API
import qualified System.FS.IO as FSIO
Expand Down Expand Up @@ -280,6 +281,9 @@ instance StateModel Model where
Action Model (ExtLedgerState TestBlock EmptyMK, ExtLedgerState TestBlock EmptyMK)
Init :: SecurityParam -> Action Model ()
ValidateAndCommit :: Word64 -> [TestBlock] -> Action Model ()
-- \| This action is used only to observe the side effects of closing an
-- uncommitted forker, to ensure all handles are properly deallocated.
OpenAndCloseForker :: Action Model ()

actionName WipeLedgerDB{} = "WipeLedgerDB"
actionName TruncateSnapshots{} = "TruncateSnapshots"
Expand All @@ -288,6 +292,7 @@ instance StateModel Model where
actionName GetState{} = "GetState"
actionName Init{} = "Init"
actionName ValidateAndCommit{} = "ValidateAndCommit"
actionName OpenAndCloseForker = "OpenAndCloseForker"

arbitraryAction _ UnInit = Some . Init <$> QC.arbitrary
arbitraryAction _ model@(Model chain secParam) =
Expand Down Expand Up @@ -322,6 +327,7 @@ instance StateModel Model where
)
, (1, pure $ Some WipeLedgerDB)
, (1, pure $ Some TruncateSnapshots)
, (1, pure $ Some OpenAndCloseForker)
]

initialState = UnInit
Expand Down Expand Up @@ -363,6 +369,7 @@ instance StateModel Model where
nextState state WipeLedgerDB _var = state
nextState state TruncateSnapshots _var = state
nextState state (DropAndRestore n) _var = modelRollback n state
nextState state OpenAndCloseForker _var = state
nextState UnInit _ _ = error "Uninitialized model created a command different than Init"

precondition UnInit Init{} = True
Expand Down Expand Up @@ -583,6 +590,14 @@ instance RunModel Model (StateT Environment IO) where
atomically $ modifyTVar (dbChain chainDb) (drop (fromIntegral n))
closeLedgerDB testInternals
perform state (Init secParam) lk
perform _ OpenAndCloseForker _ = do
Environment ldb _ _ _ _ _ _ <- get
lift $ withRegistry $ \rr -> do
eFrk <- LedgerDB.getForkerAtTarget ldb rr VolatileTip
case eFrk of
Left err -> error $ "Impossible: can't acquire forker at tip: " <> show err
Right frk -> forkerClose frk
pure $ pure ()
perform _ TruncateSnapshots _ = do
Environment _ testInternals _ _ _ _ _ <- get
lift $ truncateSnapshots testInternals
Expand Down
Loading