diff --git a/ouroboros-consensus/changelog.d/20250626_193647_alexander.esgen_ledgerdb_garbage_collect_states.md b/ouroboros-consensus/changelog.d/20250626_193647_alexander.esgen_ledgerdb_garbage_collect_states.md new file mode 100644 index 0000000000..9dfb07f311 --- /dev/null +++ b/ouroboros-consensus/changelog.d/20250626_193647_alexander.esgen_ledgerdb_garbage_collect_states.md @@ -0,0 +1,9 @@ +### Breaking + +- Changed pruning of immutable ledger states to happen on LedgerDB garbage + collection instead of directly on every block adoption. This is purely an + internal refactoring (with breaking API changes) supporting predictable ledger + snapshotting. + +- Avoid maintaining volatile ledger states during ledger replay, making it + slightly more efficient. diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl.hs index a33055b067..3ee8da303f 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl.hs @@ -278,7 +278,9 @@ openDBInternal args launchBgTasks = runWithTempRegistry $ do let testing = Internal { intCopyToImmutableDB = getEnv h (withFuse copyTestFuse . Background.copyToImmutableDB) - , intGarbageCollect = getEnv1 h Background.garbageCollect + , intGarbageCollect = \slot -> getEnv h $ \e -> do + Background.garbageCollectBlocks e slot + LedgerDB.garbageCollect (cdbLedgerDB e) slot , intTryTakeSnapshot = getEnv h $ \env' -> void $ LedgerDB.tryTakeSnapshot (cdbLedgerDB env') Nothing maxBound , intAddBlockRunner = getEnv h (Background.addBlockRunner addBlockTestFuse) diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Background.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Background.hs index 71a4abcbbc..ed8ce9bc97 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Background.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Background.hs @@ -1,6 +1,7 @@ {-# LANGUAGE BangPatterns #-} {-# LANGUAGE DeriveAnyClass #-} {-# LANGUAGE DeriveGeneric #-} +{-# LANGUAGE DerivingStrategies #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE NamedFieldPuns #-} @@ -19,11 +20,10 @@ module Ouroboros.Consensus.Storage.ChainDB.Impl.Background launchBgTasks -- * Copying blocks from the VolatileDB to the ImmutableDB - , copyAndSnapshotRunner , copyToImmutableDB -- * Executing garbage collection - , garbageCollect + , garbageCollectBlocks -- * Scheduling garbage collections , GcParams (..) @@ -76,6 +76,7 @@ import qualified Ouroboros.Consensus.Storage.VolatileDB as VolatileDB import Ouroboros.Consensus.Util import Ouroboros.Consensus.Util.Condense import Ouroboros.Consensus.Util.IOLike +import Ouroboros.Consensus.Util.STM (Watcher (..), forkLinkedWatcher) import Ouroboros.Network.AnchoredFragment (AnchoredSeq (..)) import qualified Ouroboros.Network.AnchoredFragment as AF @@ -99,17 +100,30 @@ launchBgTasks cdb@CDB{..} replayed = do !addBlockThread <- launch "ChainDB.addBlockRunner" $ addBlockRunner cdbChainSelFuse cdb + + ledgerDbTasksTrigger <- newLedgerDbTasksTrigger replayed + !ledgerDbMaintenaceThread <- + forkLinkedWatcher cdbRegistry "ChainDB.ledgerDbTaskWatcher" $ + ledgerDbTaskWatcher cdb ledgerDbTasksTrigger + gcSchedule <- newGcSchedule !gcThread <- - launch "ChainDB.gcScheduleRunner" $ + launch "ChainDB.gcBlocksScheduleRunner" $ gcScheduleRunner gcSchedule $ - garbageCollect cdb - !copyAndSnapshotThread <- - launch "ChainDB.copyAndSnapshotRunner" $ - copyAndSnapshotRunner cdb gcSchedule replayed cdbCopyFuse + garbageCollectBlocks cdb + + !copyToImmutableDBThread <- + launch "ChainDB.copyToImmutableDBRunner" $ + copyToImmutableDBRunner cdb ledgerDbTasksTrigger gcSchedule cdbCopyFuse + atomically $ writeTVar cdbKillBgThreads $ - sequence_ [addBlockThread, gcThread, copyAndSnapshotThread] + sequence_ + [ addBlockThread + , cancelThread ledgerDbMaintenaceThread + , gcThread + , copyToImmutableDBThread + ] where launch :: String -> m Void -> m (m ()) launch = fmap cancelThread .: forkLinkedThread cdbRegistry @@ -198,22 +212,18 @@ copyToImmutableDB CDB{..} = electric $ do _ -> error "header to remove not on the current chain" {------------------------------------------------------------------------------- - Snapshotting + Copy to ImmutableDB -------------------------------------------------------------------------------} --- | Copy blocks from the VolatileDB to ImmutableDB and take snapshots of the --- LedgerDB +-- | Copy blocks from the VolatileDB to ImmutableDB and trigger further tasks in +-- other threads. -- -- We watch the chain for changes. Whenever the chain is longer than @k@, then -- the headers older than @k@ are copied from the VolatileDB to the ImmutableDB -- (using 'copyToImmutableDB'). Once that is complete, -- --- * We periodically take a snapshot of the LedgerDB (depending on its config). --- When enough blocks (depending on its config) have been replayed during --- startup, a snapshot of the replayed LedgerDB will be written to disk at the --- start of this function. NOTE: After this initial snapshot we do not take a --- snapshot of the LedgerDB until the chain has changed again, irrespective of --- the LedgerDB policy. +-- * Trigger LedgerDB maintenance tasks, namely flushing, taking snapshots and +-- garbage collection. -- -- * Schedule GC of the VolatileDB ('scheduleGC') for the 'SlotNo' of the most -- recent block that was copied. @@ -228,32 +238,26 @@ copyToImmutableDB CDB{..} = electric $ do -- GC can happen, when we restart the node and schedule the /next/ GC, it will -- /imply/ any previously scheduled GC, since GC is driven by slot number -- ("garbage collect anything older than @x@"). -copyAndSnapshotRunner :: +copyToImmutableDBRunner :: forall m blk. ( IOLike m , LedgerSupportsProtocol blk ) => ChainDbEnv m blk -> + LedgerDbTasksTrigger m -> GcSchedule m -> - -- | Number of immutable blocks replayed on ledger DB startup - Word64 -> Fuse m -> m Void -copyAndSnapshotRunner cdb@CDB{..} gcSchedule replayed fuse = do +copyToImmutableDBRunner cdb@CDB{..} ledgerDbTasksTrigger gcSchedule fuse = do -- this first flush will persist the differences that come from the initial -- chain selection. LedgerDB.tryFlush cdbLedgerDB - loop =<< LedgerDB.tryTakeSnapshot cdbLedgerDB Nothing replayed + forever copyAndTrigger where SecurityParam k = configSecurityParam cdbTopLevelConfig - loop :: LedgerDB.SnapCounters -> m Void - loop counters = do - let LedgerDB.SnapCounters - { prevSnapshotTime - , ntBlocksSinceLastSnap - } = counters - + copyAndTrigger :: m () + copyAndTrigger = do -- Wait for the chain to grow larger than @k@ numToWrite <- atomically $ do curChain <- icWithoutTime <$> readTVar cdbChain @@ -264,14 +268,10 @@ copyAndSnapshotRunner cdb@CDB{..} gcSchedule replayed fuse = do -- -- This is a synchronous operation: when it returns, the blocks have been -- copied to disk (though not flushed, necessarily). - withFuse fuse (copyToImmutableDB cdb) >>= scheduleGC' + gcSlotNo <- withFuse fuse (copyToImmutableDB cdb) - LedgerDB.tryFlush cdbLedgerDB - - now <- getMonotonicTime - let ntBlocksSinceLastSnap' = ntBlocksSinceLastSnap + numToWrite - - loop =<< LedgerDB.tryTakeSnapshot cdbLedgerDB ((,now) <$> prevSnapshotTime) ntBlocksSinceLastSnap' + triggerLedgerDbTasks ledgerDbTasksTrigger gcSlotNo numToWrite + scheduleGC' gcSlotNo scheduleGC' :: WithOrigin SlotNo -> m () scheduleGC' Origin = return () @@ -285,6 +285,97 @@ copyAndSnapshotRunner cdb@CDB{..} gcSchedule replayed fuse = do } gcSchedule +{------------------------------------------------------------------------------- + LedgerDB maintenance tasks +-------------------------------------------------------------------------------} + +-- | Trigger for the LedgerDB maintenance tasks, namely whenever the immutable +-- DB tip slot advances when we finish copying blocks to it. +newtype LedgerDbTasksTrigger m + = LedgerDbTasksTrigger (StrictTVar m LedgerDbTaskState) + +data LedgerDbTaskState = LedgerDbTaskState + { ldbtsImmTip :: !(WithOrigin SlotNo) + , ldbtsPrevSnapshotTime :: !(Maybe Time) + , ldbtsBlocksSinceLastSnapshot :: !Word64 + } + deriving stock Generic + deriving anyclass NoThunks + +newLedgerDbTasksTrigger :: + IOLike m => + -- | Number of blocks replayed. + Word64 -> + m (LedgerDbTasksTrigger m) +newLedgerDbTasksTrigger replayed = LedgerDbTasksTrigger <$> newTVarIO st + where + st = + LedgerDbTaskState + { ldbtsImmTip = Origin + , ldbtsPrevSnapshotTime = Nothing + , ldbtsBlocksSinceLastSnapshot = replayed + } + +triggerLedgerDbTasks :: + forall m. + IOLike m => + LedgerDbTasksTrigger m -> + -- | New tip of the ImmutableDB. + WithOrigin SlotNo -> + -- | Number of blocks written to the ImmutableDB. + Word64 -> + m () +triggerLedgerDbTasks (LedgerDbTasksTrigger varSt) immTip numWritten = + atomically $ modifyTVar varSt $ \st -> + st + { ldbtsImmTip = immTip + , ldbtsBlocksSinceLastSnapshot = ldbtsBlocksSinceLastSnapshot st + numWritten + } + +-- | Run LedgerDB maintenance tasks when 'LedgerDbTasksTrigger' changes. +-- +-- * Flushing of differences. +-- * Taking snapshots. +-- * Garbage collection. +ledgerDbTaskWatcher :: + forall m blk. + IOLike m => + ChainDbEnv m blk -> + LedgerDbTasksTrigger m -> + Watcher m LedgerDbTaskState (WithOrigin SlotNo) +ledgerDbTaskWatcher CDB{..} (LedgerDbTasksTrigger varSt) = + Watcher + { wFingerprint = ldbtsImmTip + , wInitial = Nothing + , wReader = readTVar varSt + , wNotify = + \LedgerDbTaskState + { ldbtsImmTip + , ldbtsBlocksSinceLastSnapshot = blocksSinceLast + , ldbtsPrevSnapshotTime = prevSnapTime + } -> + whenJust (withOriginToMaybe ldbtsImmTip) $ \slotNo -> do + LedgerDB.tryFlush cdbLedgerDB + + now <- getMonotonicTime + LedgerDB.SnapCounters + { prevSnapshotTime + , ntBlocksSinceLastSnap + } <- + LedgerDB.tryTakeSnapshot + cdbLedgerDB + ((,now) <$> prevSnapTime) + blocksSinceLast + atomically $ modifyTVar varSt $ \st -> + st + { ldbtsBlocksSinceLastSnapshot = + ldbtsBlocksSinceLastSnapshot st - blocksSinceLast + ntBlocksSinceLastSnap + , ldbtsPrevSnapshotTime = prevSnapshotTime + } + + LedgerDB.garbageCollect cdbLedgerDB slotNo + } + {------------------------------------------------------------------------------- Executing garbage collection -------------------------------------------------------------------------------} @@ -292,9 +383,6 @@ copyAndSnapshotRunner cdb@CDB{..} gcSchedule replayed fuse = do -- | Trigger a garbage collection for blocks older than the given 'SlotNo' on -- the VolatileDB. -- --- Also removes the corresponding cached "previously applied points" from the --- LedgerDB. --- -- This is thread-safe as the VolatileDB locks itself while performing a GC. -- -- When calling this function it is __critical__ that the blocks that will be @@ -304,11 +392,10 @@ copyAndSnapshotRunner cdb@CDB{..} gcSchedule replayed fuse = do -- -- TODO will a long GC be a bottleneck? It will block any other calls to -- @putBlock@ and @getBlock@. -garbageCollect :: forall m blk. IOLike m => ChainDbEnv m blk -> SlotNo -> m () -garbageCollect CDB{..} slotNo = do +garbageCollectBlocks :: forall m blk. IOLike m => ChainDbEnv m blk -> SlotNo -> m () +garbageCollectBlocks CDB{..} slotNo = do VolatileDB.garbageCollect cdbVolatileDB slotNo atomically $ do - LedgerDB.garbageCollect cdbLedgerDB slotNo modifyTVar cdbInvalid $ fmap $ Map.filter ((>= slotNo) . invalidBlockSlotNo) traceWith cdbTracer $ TraceGCEvent $ PerformedGC slotNo diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/API.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/API.hs index 3491f343da..8db64c44bc 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/API.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/API.hs @@ -251,9 +251,15 @@ data LedgerDB m l blk = LedgerDB -- back as many blocks as the passed @Word64@. , getPrevApplied :: STM m (Set (RealPoint blk)) -- ^ Get the references to blocks that have previously been applied. - , garbageCollect :: SlotNo -> STM m () - -- ^ Garbage collect references to old blocks that have been previously - -- applied and committed. + , garbageCollect :: SlotNo -> m () + -- ^ Garbage collect references to old state that is older than the given + -- slot. + -- + -- Concretely, this affects: + -- + -- * Ledger states (and potentially underlying handles for on-disk storage). + -- + -- * The set of previously applied points. , tryTakeSnapshot :: l ~ ExtLedgerState blk => Maybe (Time, Time) -> @@ -298,7 +304,14 @@ data TestInternals m l blk = TestInternals { wipeLedgerDB :: m () , takeSnapshotNOW :: WhereToTakeSnapshot -> Maybe String -> m () , push :: ExtLedgerState blk DiffMK -> m () + -- ^ Push a ledger state, and prune the 'LedgerDB' to its immutable tip. + -- + -- This does not modify the set of previously applied points. , reapplyThenPushNOW :: blk -> m () + -- ^ Apply block to the tip ledger state (using reapplication), and prune the + -- 'LedgerDB' to its immutable tip. + -- + -- This does not modify the set of previously applied points. , truncateSnapshots :: m () , closeLedgerDB :: m () , getNumLedgerTablesHandles :: m Word64 @@ -456,11 +469,10 @@ data InitDB db m blk = InitDB -- ^ Closing the database, to be reopened again with a different snapshot or -- with the genesis state. , initReapplyBlock :: !(LedgerDbCfg (ExtLedgerState blk) -> blk -> db -> m db) - -- ^ Reapply a block from the immutable DB when initializing the DB. + -- ^ Reapply a block from the immutable DB when initializing the DB. Prune the + -- LedgerDB such that there are no volatile states. , currentTip :: !(db -> LedgerState blk EmptyMK) -- ^ Getting the current tip for tracing the Ledger Events. - , pruneDb :: !(db -> m db) - -- ^ Prune the database so that no immutable states are considered volatile. , mkLedgerDb :: !(db -> m (LedgerDB m (ExtLedgerState blk) blk, TestInternals m (ExtLedgerState blk) blk)) -- ^ Create a LedgerDB from the initialized data structures from previous @@ -545,13 +557,7 @@ initialize Left err -> do closeDb initDb error $ "Invariant violation: invalid immutable chain " <> show err - Right (db, replayed) -> do - db' <- pruneDb dbIface db - return - ( acc InitFromGenesis - , db' - , replayed - ) + Right (db, replayed) -> return (acc InitFromGenesis, db, replayed) tryNewestFirst acc (s : ss) = do eInitDb <- initFromSnapshot s case eInitDb of @@ -603,9 +609,7 @@ initialize Monad.when (diskSnapshotIsTemporary s) $ deleteSnapshot hasFS s closeDb initDb tryNewestFirst (acc . InitFailure s err) ss - Right (db, replayed) -> do - db' <- pruneDb dbIface db - return (acc (InitFromSnapshot s pt), db', replayed) + Right (db, replayed) -> return (acc (InitFromSnapshot s pt), db, replayed) replayTracer' = decorateReplayTracerWithGoal @@ -775,10 +779,10 @@ type LedgerSupportsLedgerDB blk = -------------------------------------------------------------------------------} -- | Options for prunning the LedgerDB --- --- Rather than using a plain `Word64` we use this to be able to distinguish that --- we are indeed using --- 1. @0@ in places where it is necessary --- 2. the security parameter as is, in other places -data LedgerDbPrune = LedgerDbPruneAll | LedgerDbPruneKeeping SecurityParam +data LedgerDbPrune + = -- | Prune all states, keeping only the current tip. + LedgerDbPruneAll + | -- | Prune such that all (non-anchor) states are not older than the given + -- slot. + LedgerDbPruneBeforeSlot SlotNo deriving Show diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V1.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V1.hs index aa08ce0cec..d03775f0a6 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V1.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V1.hs @@ -19,6 +19,7 @@ -- module will be gone. module Ouroboros.Consensus.Storage.LedgerDB.V1 (mkInitDb) where +import Cardano.Ledger.BaseTypes.NonZero (NonZero (..)) import Control.Arrow ((>>>)) import Control.Monad import Control.Monad.Except @@ -26,6 +27,7 @@ import Control.Monad.Trans (lift) import Control.ResourceRegistry import Control.Tracer import qualified Data.Foldable as Foldable +import Data.Functor ((<&>)) import Data.Functor.Contravariant ((>$<)) import Data.Kind (Type) import Data.Map (Map) @@ -119,7 +121,6 @@ mkInitDb args bss getBlock = else pure chlog' pure (chlog'', r, bstore) , currentTip = \(ch, _, _) -> ledgerState . current $ ch - , pruneDb = \(ch, r, bs) -> pure (pruneToImmTipOnly ch, r, bs) , mkLedgerDb = \(db, ldbBackingStoreKey, ldbBackingStore) -> do (varDB, prevApplied) <- (,) <$> newTVarIO db <*> newTVarIO Set.empty @@ -185,7 +186,7 @@ implMkLedgerDb h = , getForkerAtTarget = newForkerAtTarget h , validateFork = getEnv5 h (implValidate h) , getPrevApplied = getEnvSTM h implGetPrevApplied - , garbageCollect = getEnvSTM1 h implGarbageCollect + , garbageCollect = getEnv1 h implGarbageCollect , tryTakeSnapshot = getEnv2 h implTryTakeSnapshot , tryFlush = getEnv h implTryFlush , closeDB = implCloseDB h @@ -200,10 +201,15 @@ implGetVolatileTip :: implGetVolatileTip = fmap current . readTVar . ldbChangelog implGetImmutableTip :: - MonadSTM m => + (MonadSTM m, GetTip l) => LedgerDBEnv m l blk -> STM m (l EmptyMK) -implGetImmutableTip = fmap anchor . readTVar . ldbChangelog +implGetImmutableTip env = + -- The DbChangelog might contain more than k states if they have not yet + -- been garbage-collected. + fmap (AS.anchor . AS.anchorNewest (envMaxRollbacks env) . changelogStates) + . readTVar + $ ldbChangelog env implGetPastLedgerState :: ( MonadSTM m @@ -214,7 +220,17 @@ implGetPastLedgerState :: , HeaderHash l ~ HeaderHash blk ) => LedgerDBEnv m l blk -> Point blk -> STM m (Maybe (l EmptyMK)) -implGetPastLedgerState env point = getPastLedgerAt point <$> readTVar (ldbChangelog env) +implGetPastLedgerState env point = + readTVar (ldbChangelog env) <&> \chlog -> do + -- The DbChangelog might contain more than k states if they have not yet + -- been garbage-collected, so make sure that the point is volatile (or the + -- immutable tip). + guard $ + AS.withinBounds + (pointSlot point) + ((point ==) . castPoint . either getTip getTip) + (AS.anchorNewest (envMaxRollbacks env) (changelogStates chlog)) + getPastLedgerAt point chlog implGetHeaderStateHistory :: ( MonadSTM m @@ -237,6 +253,9 @@ implGetHeaderStateHistory env = do pure . HeaderStateHistory . AS.bimap mkHeaderStateWithTime' mkHeaderStateWithTime' + -- The DbChangelog might contain more than k states if they have not yet + -- been garbage-collected, so only take the corresponding suffix. + . AS.anchorNewest (envMaxRollbacks env) $ changelogStates ldb implValidate :: @@ -274,10 +293,17 @@ implValidate h ldbEnv rr tr cache rollbacks hdrs = implGetPrevApplied :: MonadSTM m => LedgerDBEnv m l blk -> STM m (Set (RealPoint blk)) implGetPrevApplied env = readTVar (ldbPrevApplied env) --- | Remove all points with a slot older than the given slot from the set of --- previously applied points. -implGarbageCollect :: MonadSTM m => LedgerDBEnv m l blk -> SlotNo -> STM m () -implGarbageCollect env slotNo = +-- | Remove 'DbChangelog' states older than the given slot, and all points with +-- a slot older than the given slot from the set of previously applied points. +implGarbageCollect :: + ( MonadSTM m + , IsLedger (LedgerState blk) + , l ~ ExtLedgerState blk + ) => + LedgerDBEnv m l blk -> SlotNo -> m () +implGarbageCollect env slotNo = atomically $ do + modifyTVar (ldbChangelog env) $ + prune (LedgerDbPruneBeforeSlot slotNo) modifyTVar (ldbPrevApplied env) $ Set.dropWhileAntitone ((< slotNo) . realPointSlot) @@ -410,7 +436,7 @@ implIntPush :: LedgerDBEnv m l blk -> l DiffMK -> m () implIntPush env st = do chlog <- readTVarIO $ ldbChangelog env - let chlog' = prune (LedgerDbPruneKeeping (ledgerDbCfgSecParam $ ldbCfg env)) $ extend st chlog + let chlog' = pruneToImmTipOnly $ extend st chlog atomically $ writeTVar (ldbChangelog env) chlog' implIntReapplyThenPush :: @@ -558,6 +584,10 @@ deriving instance ) => NoThunks (LedgerDBEnv m l blk) +-- | Return the security parameter @k@. Convenience function. +envMaxRollbacks :: LedgerDBEnv m l blk -> Word64 +envMaxRollbacks = unNonZero . maxRollbacks . ledgerDbCfgSecParam . ldbCfg + -- | Check if the LedgerDB is open, if so, executing the given function on the -- 'LedgerDBEnv', otherwise, throw a 'CloseDBError'. getEnv :: @@ -729,27 +759,36 @@ acquireAtTarget :: ReadLocked m (Either GetForkerError (DbChangelog l)) acquireAtTarget ldbEnv target = readLocked $ runExceptT $ do dblog <- lift $ readTVarIO (ldbChangelog ldbEnv) + -- The DbChangelog might contain more than k states if they have not yet + -- been garbage-collected. + let immTip :: Point blk + immTip = castPoint $ getTip $ AS.anchor $ AS.anchorNewest k $ changelogStates dblog + + rollbackTo pt + | pointSlot pt < pointSlot immTip = throwError $ PointTooOld Nothing + | otherwise = case rollback pt dblog of + Nothing -> throwError PointNotOnChain + Just dblog' -> pure dblog' -- Get the prefix of the dblog ending in the specified target. case target of Right VolatileTip -> pure dblog - Right ImmutableTip -> pure $ rollbackToAnchor dblog - Right (SpecificPoint pt) -> do - let immTip = getTip $ anchor dblog - case rollback pt dblog of - Nothing - | pointSlot pt < pointSlot immTip -> throwError $ PointTooOld Nothing - | otherwise -> throwError PointNotOnChain - Just dblog' -> pure dblog' - Left n -> case rollbackN n dblog of - Nothing -> + Right ImmutableTip -> rollbackTo immTip + Right (SpecificPoint pt) -> rollbackTo pt + Left n -> do + let rollbackMax = maxRollback dblog `min` k + when (n > rollbackMax) $ throwError $ PointTooOld $ Just ExceededRollback - { rollbackMaximum = maxRollback dblog + { rollbackMaximum = rollbackMax , rollbackRequested = n } - Just dblog' -> pure dblog' + case rollbackN n dblog of + Nothing -> error "unreachable" + Just dblog' -> pure dblog' + where + k = envMaxRollbacks ldbEnv {------------------------------------------------------------------------------- Make forkers from consistent views @@ -761,6 +800,7 @@ newForker :: , LedgerSupportsProtocol blk , NoThunks (l EmptyMK) , GetTip l + , StandardHash l ) => LedgerDBHandle m l blk -> LedgerDBEnv m l blk -> @@ -776,7 +816,6 @@ newForker h ldbEnv rr dblog = readLocked $ do { foeBackingStoreValueHandle = forkerMVar , foeChangelog = dblogVar , foeSwitchVar = ldbChangelog ldbEnv - , foeSecurityParam = ledgerDbCfgSecParam $ ldbCfg ldbEnv , foeTracer = LedgerDBForkerEvent . TraceForkerEventWithKey forkerKey >$< ldbTracer ldbEnv } @@ -798,6 +837,7 @@ mkForker :: , HasHeader blk , HasLedgerTables l , GetTip l + , StandardHash l ) => LedgerDBHandle m l blk -> QueryBatchSize -> diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V1/DbChangelog.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V1/DbChangelog.hs index f402fde67d..bf913d6062 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V1/DbChangelog.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V1/DbChangelog.hs @@ -47,8 +47,8 @@ -- == Carrying states -- -- The 'DbChangelog' contains an instantiation of the 'AnchoredSeq' data type to --- hold the last \(k\) in-memory ledger states. This data type is impemented --- using the /finger tree/ data structure and has the following time +-- hold (at least) the last \(k\) in-memory ledger states. This data type is +-- implemented using the /finger tree/ data structure and has the following time -- complexities: -- -- - Appending a new ledger state to the end in constant time. @@ -67,16 +67,12 @@ -- -- == Appending in-memory states -- --- When a new ledger state is appended to a fully saturated 'DbChangelog' (i.e. --- that contains \(k\) states), the ledger state at the anchor is dropped and --- the oldest element in the sequence becomes the new anchor, as it has become --- immutable. Note that we only refer here to the in-memory states, as the diffs --- from the anchor will remain in the 'DbChangelog' until flushing happens. This --- maintains the invariant that only the last \(k\) in-memory ledger states are --- stored, /excluding/ the ledger state at the anchor. This means that in --- practice, \(k + 1\) ledger states will be kept in memory. When the --- 'DbChangelog' contains fewer than \(k\) elements, new ones are appended --- without shifting the anchor until it is saturated. +-- When a new ledger state is appended to a 'DbChangelog', the ledger state at +-- the anchor is now subject to pruning/garbage collection as they are +-- immutable. This means that in practice, slightly more than \(k + 1\) ledger +-- states will be kept in memory. When the 'DbChangelog' contains fewer than +-- \(k\) elements, new ones are appended without causing the ones near the +-- anchor to be pruned/garbage-collected. -- -- == Getting and appending differences -- @@ -223,7 +219,8 @@ import qualified Ouroboros.Network.AnchoredSeq as AS -- that need a 'BackingStore' as an anchor point. -- -- We illustrate its contents below, where @k = 3@ (for a state @Li@, the --- corresponding set of differences is @Di@): +-- corresponding set of differences is @Di@), assuming that we prune after every +-- step: -- -- +----------------+------------------------------------+------------------------------------------+ -- | lastFlushed | states | tableDiffs | @@ -362,27 +359,28 @@ reapplyThenPush :: DbChangelog l -> m (DbChangelog l) reapplyThenPush cfg ap ksReader db = - (\current' -> prune (LedgerDbPruneKeeping (ledgerDbCfgSecParam cfg)) $ extend current' db) + (\current' -> pruneToImmTipOnly $ extend current' db) <$> reapplyBlock (ledgerDbCfgComputeLedgerEvents cfg) (ledgerDbCfg cfg) ap ksReader db --- | Prune oldest ledger states until at we have at most @k@ in the DbChangelog, --- excluding the one stored at the anchor. +-- | Prune oldest ledger states according to the given 'LedgerDbPrune' strategy. -- -- +--------------+----------------------------+----------------------+ -- | lastFlushed | states | tableDiffs | -- +==============+============================+======================+ -- | @L0@ | @L0 :> [ L1, L2, L3, L4 ]@ | @[ D1, D2, D3, D4 ]@ | -- +--------------+----------------------------+----------------------+ --- | @>> prune (SecurityParam 3)@ | +-- | @>> prune (LedgerDbPruneBeforeSlot 3)@ | -- +--------------+----------------------------+----------------------+ -- | @L0@ | @L2 :> [ L3, L4 ]@ | @[ D1, D2, D3, D4 ]@ | -- +--------------+----------------------------+----------------------+ +-- +-- where the state @LX@ is from slot @X@. prune :: GetTip l => LedgerDbPrune -> DbChangelog l -> DbChangelog l -prune (LedgerDbPruneKeeping (SecurityParam k)) dblog = +prune LedgerDbPruneAll dblog = dblog{changelogStates = vol'} where DbChangelog{changelogStates} = dblog @@ -390,18 +388,15 @@ prune (LedgerDbPruneKeeping (SecurityParam k)) dblog = nvol = AS.length changelogStates vol' = - if toEnum nvol <= unNonZero k - then changelogStates - else snd $ AS.splitAt (nvol - fromEnum (unNonZero k)) changelogStates -prune LedgerDbPruneAll dblog = + snd $ AS.splitAt nvol changelogStates +prune (LedgerDbPruneBeforeSlot slot) dblog = dblog{changelogStates = vol'} where DbChangelog{changelogStates} = dblog - nvol = AS.length changelogStates - - vol' = - snd $ AS.splitAt nvol changelogStates + -- The anchor of @vol'@ might still have a tip slot smaller than @slot@, which + -- is fine to ignore (we will prune it later). + vol' = snd $ AS.splitAtMeasure (NotOrigin slot) changelogStates -- NOTE: we must inline 'prune' otherwise we get unexplained thunks in -- 'DbChangelog' and thus a space leak. Alternatively, we could disable the diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V1/Forker.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V1/Forker.hs index aa1e162932..a018764611 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V1/Forker.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V1/Forker.hs @@ -29,11 +29,9 @@ import qualified Data.Set as Set import GHC.Generics (Generic) import NoThunks.Class import Ouroboros.Consensus.Block -import Ouroboros.Consensus.Config import Ouroboros.Consensus.Ledger.Abstract import Ouroboros.Consensus.Ledger.SupportsProtocol import qualified Ouroboros.Consensus.Ledger.Tables.Diff as Diff -import Ouroboros.Consensus.Storage.LedgerDB.API import Ouroboros.Consensus.Storage.LedgerDB.Args import Ouroboros.Consensus.Storage.LedgerDB.Forker as Forker import Ouroboros.Consensus.Storage.LedgerDB.V1.BackingStore @@ -46,6 +44,7 @@ import Ouroboros.Consensus.Storage.LedgerDB.V1.DiffSeq import qualified Ouroboros.Consensus.Storage.LedgerDB.V1.DiffSeq as DS import Ouroboros.Consensus.Storage.LedgerDB.V1.Lock import Ouroboros.Consensus.Util.IOLike +import qualified Ouroboros.Network.AnchoredSeq as AS {------------------------------------------------------------------------------- Forkers @@ -71,8 +70,6 @@ data ForkerEnv m l blk = ForkerEnv -- -- The anchor of this and 'foeChangelog' might get out of sync if diffs are -- flushed, but 'forkerCommit' will take care of this. - , foeSecurityParam :: !SecurityParam - -- ^ Config , foeTracer :: !(Tracer m TraceForkerEvent) -- ^ Config } @@ -314,14 +311,12 @@ implForkerPush env newState = do traceWith (foeTracer env) ForkerPushStart atomically $ do chlog <- readTVar (foeChangelog env) - let chlog' = - prune (LedgerDbPruneKeeping (foeSecurityParam env)) $ - extend newState chlog + let chlog' = extend newState chlog writeTVar (foeChangelog env) chlog' traceWith (foeTracer env) ForkerPushEnd implForkerCommit :: - (MonadSTM m, GetTip l, HasLedgerTables l) => + (MonadSTM m, GetTip l, StandardHash l, HasLedgerTables l) => ForkerEnv m l blk -> STM m () implForkerCommit env = do @@ -335,9 +330,17 @@ implForkerCommit env = do . pointSlot . getTip $ changelogLastFlushedState orig + -- The 'DbChangelog' might have gotten pruned in the meantime. + splitAfterOrigAnchor = + AS.splitAfterMeasure (pointSlot origAnchor) (either sameState sameState) + where + sameState = (origAnchor ==) . getTip + origAnchor = getTip $ anchor orig in DbChangelog { changelogLastFlushedState = changelogLastFlushedState orig - , changelogStates = changelogStates dblog + , changelogStates = case splitAfterOrigAnchor (changelogStates dblog) of + Nothing -> error "Forker chain does no longer intersect with selected chain." + Just (_, suffix) -> suffix , changelogDiffs = ltliftA2 (doPrune s) (changelogDiffs orig) (changelogDiffs dblog) } diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2.hs index b7c523d469..5188460233 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2.hs @@ -16,7 +16,9 @@ module Ouroboros.Consensus.Storage.LedgerDB.V2 (mkInitDb) where +import Cardano.Ledger.BaseTypes (unNonZero) import Control.Arrow ((>>>)) +import Control.Monad (join) import qualified Control.Monad as Monad (void, (>=>)) import Control.Monad.Except import Control.RAWLock @@ -89,10 +91,6 @@ mkInitDb args flavArgs getBlock = x pure y , currentTip = ledgerState . current - , pruneDb = \lseq -> do - let (rel, dbPrunedToImmDBTip) = pruneToImmTipOnly lseq - rel - pure dbPrunedToImmDBTip , mkLedgerDb = \lseq -> do varDB <- newTVarIO lseq prevApplied <- newTVarIO Set.empty @@ -173,7 +171,7 @@ implMkLedgerDb h bss = , getForkerAtTarget = newForkerAtTarget h , validateFork = getEnv5 h (implValidate h) , getPrevApplied = getEnvSTM h implGetPrevApplied - , garbageCollect = \s -> getEnvSTM h (flip implGarbageCollect s) + , garbageCollect = \s -> getEnv h (flip implGarbageCollect s) , tryTakeSnapshot = getEnv2 h (implTryTakeSnapshot bss) , tryFlush = getEnv h implTryFlush , closeDB = implCloseDB h @@ -209,8 +207,9 @@ mkInternals bss h = eFrk <- newForkerAtTarget h reg VolatileTip case eFrk of Left{} -> error "Unreachable, Volatile tip MUST be in LedgerDB" - Right frk -> + Right frk -> do forkerPush frk st >> atomically (forkerCommit frk) >> forkerClose frk + getEnv h pruneLedgerSeq , reapplyThenPushNOW = \blk -> getEnv h $ \env -> withRegistry $ \reg -> do eFrk <- newForkerAtTarget h reg VolatileTip case eFrk of @@ -225,6 +224,7 @@ mkInternals bss h = blk (st `withLedgerTables` tables) forkerPush frk st' >> atomically (forkerCommit frk) >> forkerClose frk + pruneLedgerSeq env , wipeLedgerDB = getEnv h $ destroySnapshots . ldbHasFS , closeLedgerDB = let LDBHandle tvar = h @@ -247,6 +247,10 @@ mkInternals bss h = InMemoryHandleArgs -> InMemory.takeSnapshot LSMHandleArgs x -> absurd x + pruneLedgerSeq :: LedgerDBEnv m (ExtLedgerState blk) blk -> m () + pruneLedgerSeq env = + join $ atomically $ stateTVar (ldbSeq env) $ pruneToImmTipOnly + -- | Testing only! Truncate all snapshots in the DB. implIntTruncateSnapshots :: MonadThrow m => SomeHasFS m -> m () implIntTruncateSnapshots sfs@(SomeHasFS fs) = do @@ -268,13 +272,13 @@ implGetVolatileTip :: (MonadSTM m, GetTip l) => LedgerDBEnv m l blk -> STM m (l EmptyMK) -implGetVolatileTip = fmap current . readTVar . ldbSeq +implGetVolatileTip = fmap current . getVolatileLedgerSeq implGetImmutableTip :: - MonadSTM m => + (MonadSTM m, GetTip l) => LedgerDBEnv m l blk -> STM m (l EmptyMK) -implGetImmutableTip = fmap anchor . readTVar . ldbSeq +implGetImmutableTip = fmap anchor . getVolatileLedgerSeq implGetPastLedgerState :: ( MonadSTM m @@ -284,7 +288,8 @@ implGetPastLedgerState :: , HeaderHash l ~ HeaderHash blk ) => LedgerDBEnv m l blk -> Point blk -> STM m (Maybe (l EmptyMK)) -implGetPastLedgerState env point = getPastLedgerAt point <$> readTVar (ldbSeq env) +implGetPastLedgerState env point = + getPastLedgerAt point <$> getVolatileLedgerSeq env implGetHeaderStateHistory :: ( MonadSTM m @@ -295,7 +300,7 @@ implGetHeaderStateHistory :: ) => LedgerDBEnv m l blk -> STM m (HeaderStateHistory blk) implGetHeaderStateHistory env = do - ldb <- readTVar (ldbSeq env) + ldb <- getVolatileLedgerSeq env let currentLedgerState = ledgerState $ current ldb -- This summary can convert all tip slots of the ledger states in the -- @ledgerDb@ as these are not newer than the tip slot of the current @@ -308,7 +313,8 @@ implGetHeaderStateHistory env = do pure . HeaderStateHistory . AS.bimap mkHeaderStateWithTime' mkHeaderStateWithTime' - $ getLedgerSeq ldb + . getLedgerSeq + $ ldb implValidate :: forall m l blk. @@ -345,12 +351,18 @@ implValidate h ldbEnv rr tr cache rollbacks hdrs = implGetPrevApplied :: MonadSTM m => LedgerDBEnv m l blk -> STM m (Set (RealPoint blk)) implGetPrevApplied env = readTVar (ldbPrevApplied env) --- | Remove all points with a slot older than the given slot from the set of --- previously applied points. -implGarbageCollect :: MonadSTM m => LedgerDBEnv m l blk -> SlotNo -> STM m () -implGarbageCollect env slotNo = - modifyTVar (ldbPrevApplied env) $ - Set.dropWhileAntitone ((< slotNo) . realPointSlot) +-- | Remove 'LedgerSeq' states older than the given slot, and all points with a +-- slot older than the given slot from the set of previously applied points. +implGarbageCollect :: (IOLike m, GetTip l) => LedgerDBEnv m l blk -> SlotNo -> m () +implGarbageCollect env slotNo = do + atomically $ + modifyTVar (ldbPrevApplied env) $ + Set.dropWhileAntitone ((< slotNo) . realPointSlot) + -- It is safe to close the handles outside of the locked region, which reduces + -- contention. See the docs of 'ldbOpenHandlesLock'. + join $ RAWLock.withWriteAccess (ldbOpenHandlesLock env) $ \() -> do + close <- atomically $ stateTVar (ldbSeq env) $ prune (LedgerDbPruneBeforeSlot slotNo) + pure (close, ()) implTryTakeSnapshot :: forall m l blk. @@ -473,7 +485,7 @@ data LedgerDBEnv m l blk = LedgerDBEnv -- while holding a write lock. See e.g. 'closeForkerEnv'. -- -- * Modify 'ldbSeq' while holding a write lock, and then close the removed - -- handles without any locking. + -- handles without any locking. See e.g. 'implGarbageCollect'. } deriving Generic @@ -563,21 +575,32 @@ getEnvSTM (LDBHandle varState) f = Acquiring consistent views -------------------------------------------------------------------------------} --- | Get a 'StateRef' from the 'LedgerSeq' in the 'LedgerDBEnv', with the --- 'LedgerTablesHandle' having been duplicated (such that the original can be --- closed). The caller is responsible for closing the handle. +-- | Take the suffix of the 'ldbSeq' containing the @k@ most recent states. The +-- 'LedgerSeq' can contain more than @k@ states if we adopted new blocks, but +-- garbage collection has not yet been run. +getVolatileLedgerSeq :: + (MonadSTM m, GetTip l) => LedgerDBEnv m l blk -> STM m (LedgerSeq m l) +getVolatileLedgerSeq env = + LedgerSeq . AS.anchorNewest k . getLedgerSeq <$> readTVar (ldbSeq env) + where + k = unNonZero $ maxRollbacks $ ledgerDbCfgSecParam $ ldbCfg env + +-- | Get a 'StateRef' from the 'LedgerSeq' (via 'getVolatileLedgerSeq') in the +-- 'LedgerDBEnv', with the 'LedgerTablesHandle' having been duplicated (such +-- that the original can be closed). The caller is responsible for closing the +-- handle. -- -- For more flexibility, an arbitrary 'Traversable' of the 'StateRef' can be -- returned; for the simple use case of getting a single 'StateRef', use @t ~ -- 'Solo'@. getStateRef :: - (IOLike m, Traversable t) => + (IOLike m, Traversable t, GetTip l) => LedgerDBEnv m l blk -> (LedgerSeq m l -> t (StateRef m l)) -> m (t (StateRef m l)) getStateRef ldbEnv project = RAWLock.withReadAccess (ldbOpenHandlesLock ldbEnv) $ \() -> do - tst <- project <$> readTVarIO (ldbSeq ldbEnv) + tst <- project <$> atomically (getVolatileLedgerSeq ldbEnv) for tst $ \st -> do tables' <- duplicate $ tables st pure st{tables = tables'} @@ -585,7 +608,7 @@ getStateRef ldbEnv project = -- | Like 'StateRef', but takes care of closing the handle when the given action -- returns or errors. withStateRef :: - (IOLike m, Traversable t) => + (IOLike m, Traversable t, GetTip l) => LedgerDBEnv m l blk -> (LedgerSeq m l -> t (StateRef m l)) -> (t (StateRef m l) -> m a) -> @@ -762,7 +785,6 @@ newForker h ldbEnv rr st = do ForkerEnv { foeLedgerSeq = lseqVar , foeSwitchVar = ldbSeq ldbEnv - , foeSecurityParam = ledgerDbCfgSecParam $ ldbCfg ldbEnv , foeTracer = tr , foeResourcesToRelease = (ldbOpenHandlesLock ldbEnv, k, toRelease) } diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2/Forker.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2/Forker.hs index 3a2e7f8940..76c076ca6f 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2/Forker.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2/Forker.hs @@ -26,7 +26,6 @@ import Data.Maybe (fromMaybe) import GHC.Generics import NoThunks.Class import Ouroboros.Consensus.Block -import Ouroboros.Consensus.Config import Ouroboros.Consensus.Ledger.Abstract import Ouroboros.Consensus.Ledger.SupportsProtocol import Ouroboros.Consensus.Ledger.Tables.Utils @@ -49,8 +48,6 @@ data ForkerEnv m l blk = ForkerEnv -- ^ Local version of the LedgerSeq , foeSwitchVar :: !(StrictTVar m (LedgerSeq m l)) -- ^ This TVar is the same as the LedgerDB one - , foeSecurityParam :: !SecurityParam - -- ^ Config , foeTracer :: !(Tracer m TraceForkerEvent) -- ^ Config , foeResourcesToRelease :: !(RAWLock m (), ResourceKey m, StrictTVar m (m ())) @@ -154,10 +151,7 @@ implForkerCommit env = do (olddb', toClose) <- AS.splitAfterMeasure intersectionSlot (either predicate predicate) olddb -- Join the prefix of the selection with the sequence in the forker newdb <- AS.join (const $ const True) olddb' lseq - -- Prune the resulting sequence to keep @k@ states - let (closePruned, s) = prune (LedgerDbPruneKeeping (foeSecurityParam env)) (LedgerSeq newdb) - closeDiscarded = do - closePruned + let closeDiscarded = do -- Do /not/ close the anchor of @toClose@, as that is also the -- tip of @olddb'@ which will be used in @newdb@. case toClose of @@ -166,7 +160,7 @@ implForkerCommit env = do -- Finally, close the anchor of @lseq@ (which is a duplicate of -- the head of @olddb'@). close $ tables $ AS.anchor lseq - pure (closeDiscarded, s) + pure (closeDiscarded, LedgerSeq newdb) ) -- We are discarding the previous value in the TVar because we had accumulated diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2/LedgerSeq.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2/LedgerSeq.hs index fdf3b75207..52719cc453 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2/LedgerSeq.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2/LedgerSeq.hs @@ -14,7 +14,6 @@ {-# LANGUAGE StandaloneDeriving #-} {-# LANGUAGE TypeOperators #-} {-# LANGUAGE UndecidableInstances #-} -{-# LANGUAGE ViewPatterns #-} -- | The data structure that holds the cached ledger states. module Ouroboros.Consensus.Storage.LedgerDB.V2.LedgerSeq @@ -206,7 +205,7 @@ reapplyThenPush :: LedgerSeq m l -> m (m (), LedgerSeq m l) reapplyThenPush rr cfg ap db = - (\current' -> prune (LedgerDbPruneKeeping (ledgerDbCfgSecParam cfg)) $ extend current' db) + (\current' -> pruneToImmTipOnly $ extend current' db) <$> reapplyBlock (ledgerDbCfgComputeLedgerEvents cfg) (ledgerDbCfg cfg) ap rr db reapplyBlock :: @@ -229,32 +228,33 @@ reapplyBlock evs cfg b _rr db = do pushDiffs newtbs st st' pure (StateRef newst newtbs) --- | Prune older ledger states until at we have at most @k@ volatile states in --- the LedgerDB, plus the one stored at the anchor. +-- | Prune older ledger states according to the given 'LedgerDbPrune' strategy. -- -- The @fst@ component of the returned value is an action closing the pruned -- ledger states. -- -- >>> ldb = LedgerSeq $ AS.fromOldestFirst l0 [l1, l2, l3] -- >>> ldb' = LedgerSeq $ AS.fromOldestFirst l1 [l2, l3] --- >>> snd (prune (LedgerDbPruneKeeping (SecurityParam (unsafeNonZero 2))) ldb) == ldb' +-- >>> snd (prune (LedgerDbPruneBeforeSlot 1) ldb) == ldb' -- True +-- +-- where @lX@ is a ledger state from slot @X-1@ (or 'Origin' for @l0@). prune :: (Monad m, GetTip l) => LedgerDbPrune -> LedgerSeq m l -> (m (), LedgerSeq m l) prune howToPrune (LedgerSeq ldb) = case howToPrune of - LedgerDbPruneKeeping (SecurityParam (fromEnum . unNonZero -> k)) - | nvol <= k -> (pure (), LedgerSeq ldb) - | otherwise -> (closeButHead before, LedgerSeq after) - where - nvol = AS.length ldb - (before, after) = AS.splitAt (nvol - k) ldb LedgerDbPruneAll -> (closeButHead before, LedgerSeq after) where (before, after) = (ldb, AS.Empty (AS.headAnchor ldb)) + LedgerDbPruneBeforeSlot slot -> + (closeButHead before, LedgerSeq after) + where + -- The anchor of @vol'@ might still have a tip slot older than @slot@, which + -- is fine to ignore (we will prune it later). + (before, after) = AS.splitAtMeasure (NotOrigin slot) ldb where -- Above, we split @ldb@ into two sequences @before@ and @after@ such that -- @AS.headAnchor before == AS.anchor after@. We want to close all handles of @@ -292,15 +292,7 @@ extend newState = Reset -------------------------------------------------------------------------------} --- | When creating a new @LedgerDB@, we should load whichever snapshot we find --- and then replay the chain up to the immutable tip. When we get there, the --- @LedgerDB@ will have a @k@-long sequence of states, which all come from --- immutable blocks, so we just prune all of them and only keep the last one as --- an anchor, as it is the immutable tip. Then we can proceed with opening the --- VolatileDB. --- --- If we didn't do this step, the @LedgerDB@ would accept rollbacks into the --- immutable part of the chain, which must never be possible. +-- | Set the volatile tip as the immutable tip and prune all older states. -- -- >>> ldb = LedgerSeq $ AS.fromOldestFirst l0 [l1, l2, l3] -- >>> LedgerSeq ldb' = snd $ pruneToImmTipOnly ldb diff --git a/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/ChainDB/Model.hs b/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/ChainDB/Model.hs index 5b352d2bc3..3369265f5e 100644 --- a/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/ChainDB/Model.hs +++ b/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/ChainDB/Model.hs @@ -33,7 +33,6 @@ module Test.Ouroboros.Storage.ChainDB.Model , getBlock , getBlockByPoint , getBlockComponentByPoint - , getDbChangelog , getIsValid , getLoEFragment , getMaxSlotNo @@ -84,11 +83,7 @@ module Test.Ouroboros.Storage.ChainDB.Model , wipeVolatileDB ) where -import Cardano.Ledger.BaseTypes - ( knownNonZeroBounded - , nonZeroOr - , unNonZero - ) +import Cardano.Ledger.BaseTypes (unNonZero) import Codec.Serialise (Serialise, serialise) import Control.Monad (unless) import Control.Monad.Except (runExcept) @@ -129,11 +124,6 @@ import Ouroboros.Consensus.Storage.ChainDB.API ) import Ouroboros.Consensus.Storage.ChainDB.Impl.ChainSel (olderThanK) import Ouroboros.Consensus.Storage.Common () -import Ouroboros.Consensus.Storage.LedgerDB.API - ( LedgerDbCfgF (..) - , LedgerDbPrune (..) - ) -import qualified Ouroboros.Consensus.Storage.LedgerDB.V1.DbChangelog as DbChangelog import Ouroboros.Consensus.Util (repeatedly) import qualified Ouroboros.Consensus.Util.AnchoredFragment as Fragment import Ouroboros.Consensus.Util.IOLike (MonadSTM) @@ -375,35 +365,6 @@ isValid :: Maybe Bool isValid = flip getIsValid -getDbChangelog :: - (LedgerSupportsProtocol blk, LedgerTablesAreTrivial (LedgerState blk)) => - TopLevelConfig blk -> - Model blk -> - DbChangelog.DbChangelog' blk -getDbChangelog cfg m@Model{..} = - DbChangelog.prune tip - . DbChangelog.reapplyThenPushMany' ledgerDbCfg blks - $ DbChangelog.empty initLedger - where - blks = Chain.toOldestFirst $ currentChain m - - k = configSecurityParam cfg - - ledgerDbCfg = - LedgerDbCfg - { ledgerDbCfgSecParam = k - , ledgerDbCfg = ExtLedgerCfg cfg - , ledgerDbCfgComputeLedgerEvents = OmitLedgerEvents - } - - tip = - case maxActualRollback k m of - 0 -> LedgerDbPruneAll - n -> - -- Since we know that @`n`@ is not zero, it is impossible for `nonZeroOr` - -- to return a `Nothing` and the final result to have default value of @`1`@. - LedgerDbPruneKeeping $ SecurityParam $ nonZeroOr n $ knownNonZeroBounded @1 - getLoEFragment :: Model blk -> LoE (AnchoredFragment blk) getLoEFragment = loeFragment diff --git a/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/LedgerDB/StateMachine.hs b/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/LedgerDB/StateMachine.hs index 308141d5bb..85ce9ac039 100644 --- a/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/LedgerDB/StateMachine.hs +++ b/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/LedgerDB/StateMachine.hs @@ -302,7 +302,13 @@ instance StateModel Model where min (fromIntegral . AS.length $ chain) (BT.unNonZero $ maxRollbacks secParam) - numRollback <- QC.choose (0, maxRollback) + numRollback <- + frequency + [ (10, QC.choose (0, maxRollback)) + , -- Sometimes generate invalid 'ValidateAndCommit's for + -- negative testing. + (1, QC.choose (maxRollback + 1, maxRollback + 5)) + ] numNewBlocks <- QC.choose (numRollback, numRollback + 2) let chain' = case modelRollback numRollback model of @@ -371,6 +377,9 @@ instance StateModel Model where precondition _ Init{} = False precondition _ _ = True + validFailingAction Model{} ValidateAndCommit{} = True + validFailingAction _ _ = False + {------------------------------------------------------------------------------- Mocked ChainDB -------------------------------------------------------------------------------} @@ -527,22 +536,29 @@ data Environment (IO NumOpenHandles) (IO ()) +data LedgerDBError = ErrorValidateExceededRollback + instance RunModel Model (StateT Environment IO) where + type Error Model (StateT Environment IO) = LedgerDBError + perform _ (Init secParam) _ = do Environment _ _ chainDb mkArgs fs _ cleanup <- get (ldb, testInternals, getNumOpenHandles) <- lift $ do let args = mkArgs secParam openLedgerDB (argFlavorArgs args) chainDb (argLedgerDbCfg args) fs put (Environment ldb testInternals chainDb mkArgs fs getNumOpenHandles cleanup) + pure $ pure () perform _ WipeLedgerDB _ = do Environment _ testInternals _ _ _ _ _ <- get lift $ wipeLedgerDB testInternals + pure $ pure () perform _ GetState _ = do Environment ldb _ _ _ _ _ _ <- get - lift $ atomically $ (,) <$> getImmutableTip ldb <*> getVolatileTip ldb + lift $ fmap pure $ atomically $ (,) <$> getImmutableTip ldb <*> getVolatileTip ldb perform _ ForceTakeSnapshot _ = do Environment _ testInternals _ _ _ _ _ <- get lift $ takeSnapshotNOW testInternals TakeAtImmutableTip Nothing + pure $ pure () perform _ (ValidateAndCommit n blks) _ = do Environment ldb _ chainDb _ _ _ _ <- get lift $ do @@ -558,7 +574,8 @@ instance RunModel Model (StateT Environment IO) where (reverse (map blockRealPoint blks) ++) . drop (fromIntegral n) atomically (forkerCommit forker) forkerClose forker - ValidateExceededRollBack{} -> error "Unexpected Rollback" + pure $ pure () + ValidateExceededRollBack{} -> pure $ Left ErrorValidateExceededRollback ValidateLedgerError (AnnLedgerError forker _ _) -> forkerClose forker >> error "Unexpected ledger error" perform state@(Model _ secParam) (DropAndRestore n) lk = do Environment _ testInternals chainDb _ _ _ _ <- get @@ -569,6 +586,7 @@ instance RunModel Model (StateT Environment IO) where perform _ TruncateSnapshots _ = do Environment _ testInternals _ _ _ _ _ <- get lift $ truncateSnapshots testInternals + pure $ pure () perform UnInit _ _ = error "Uninitialized model created a command different than Init" monitoring _ (ValidateAndCommit n _) _ _ = tabulate "Rollback depths" [show n] @@ -602,6 +620,11 @@ instance RunModel Model (StateT Environment IO) where pure $ volSt == vol && immSt == imm postcondition _ _ _ _ = pure True + postconditionOnFailure _ ValidateAndCommit{} _ res = case res of + Right () -> False <$ counterexamplePost "Unexpected success on invalid ValidateAndCommit" + Left ErrorValidateExceededRollback -> pure True + postconditionOnFailure _ _ _ _ = pure True + {------------------------------------------------------------------------------- Additional checks -------------------------------------------------------------------------------} diff --git a/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/LedgerDB/V1/DbChangelog.hs b/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/LedgerDB/V1/DbChangelog.hs index c423167baa..00d5cee279 100644 --- a/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/LedgerDB/V1/DbChangelog.hs +++ b/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/LedgerDB/V1/DbChangelog.hs @@ -31,7 +31,7 @@ -- * etc. module Test.Ouroboros.Storage.LedgerDB.V1.DbChangelog (tests) where -import Cardano.Ledger.BaseTypes (NonZero (..), unsafeNonZero) +import Cardano.Ledger.BaseTypes (NonZero (..)) import Cardano.Slotting.Slot (WithOrigin (..)) import Control.Monad hiding (ap) import Control.Monad.Trans.Class (lift) @@ -85,15 +85,12 @@ tests = , testGroup "Push" [ testProperty "expectedLedger" prop_pushExpectedLedger - , testProperty "pastLedger" prop_pastLedger ] , testGroup "Rollback" [ testProperty "maxRollbackGenesisZero" prop_maxRollbackGenesisZero - , testProperty "ledgerDbMaxRollback" prop_snapshotsMaxRollback , testProperty "switchSameChain" prop_switchSameChain , testProperty "switchExpectedLedger" prop_switchExpectedLedger - , testProperty "pastAfterSwitch" prop_pastAfterSwitch ] , testProperty "flushing" $ withMaxSuccess samples $ @@ -117,8 +114,8 @@ tests = ] , testProperty "extending adds head to volatile states" $ withMaxSuccess samples prop_extendingAdvancesTipOfVolatileStates - , testProperty "pruning leaves at most maxRollback volatile states" $ - withMaxSuccess samples prop_pruningLeavesAtMostMaxRollbacksVolatileStates + , testProperty "pruning before a slot works as expected" $ + withMaxSuccess samples prop_pruningBeforeSlotCorrectness ] {------------------------------------------------------------------------------- @@ -151,28 +148,6 @@ prop_pushExpectedLedger setup@ChainSetup{..} = cfg :: LedgerConfig TestBlock.TestBlock cfg = ledgerDbCfg (csBlockConfig setup) -prop_pastLedger :: ChainSetup -> Property -prop_pastLedger setup@ChainSetup{..} = - classify (chainSetupSaturated setup) "saturated" $ - classify withinReach "within reach" $ - getPastLedgerAt tip csPushed - === if withinReach - then Just (current afterPrefix) - else Nothing - where - prefix :: [TestBlock.TestBlock] - prefix = take (fromIntegral csPrefixLen) csChain - - tip :: Point TestBlock.TestBlock - tip = maybe GenesisPoint blockPoint (lastMaybe prefix) - - afterPrefix :: DbChangelog (LedgerState TestBlock.TestBlock) - afterPrefix = reapplyThenPushMany' (csBlockConfig setup) prefix csGenSnaps - - -- See 'prop_snapshotsMaxRollback' - withinReach :: Bool - withinReach = (csNumBlocks - csPrefixLen) <= maxRollback csPushed - {------------------------------------------------------------------------------- Rollback -------------------------------------------------------------------------------} @@ -182,18 +157,6 @@ prop_maxRollbackGenesisZero = maxRollback (empty (convertMapKind TestBlock.testInitLedger)) === 0 -prop_snapshotsMaxRollback :: ChainSetup -> Property -prop_snapshotsMaxRollback setup@ChainSetup{..} = - classify (chainSetupSaturated setup) "saturated" $ - conjoin - [ if chainSetupSaturated setup - then (maxRollback csPushed) `ge` unNonZero k - else (maxRollback csPushed) `ge` (min (unNonZero k) csNumBlocks) - , (maxRollback csPushed) `le` unNonZero k - ] - where - SecurityParam k = csSecParam - prop_switchSameChain :: SwitchSetup -> Property prop_switchSameChain setup@SwitchSetup{..} = classify (switchSetupSaturated setup) "saturated" $ @@ -219,29 +182,6 @@ prop_switchExpectedLedger setup@SwitchSetup{..} = cfg :: LedgerConfig TestBlock.TestBlock cfg = ledgerDbCfg (csBlockConfig ssChainSetup) --- | Check 'prop_pastLedger' still holds after switching to a fork -prop_pastAfterSwitch :: SwitchSetup -> Property -prop_pastAfterSwitch setup@SwitchSetup{..} = - classify (switchSetupSaturated setup) "saturated" $ - classify withinReach "within reach" $ - getPastLedgerAt tip ssSwitched - === if withinReach - then Just (current afterPrefix) - else Nothing - where - prefix :: [TestBlock.TestBlock] - prefix = take (fromIntegral ssPrefixLen) ssChain - - tip :: Point TestBlock.TestBlock - tip = maybe GenesisPoint blockPoint (lastMaybe prefix) - - afterPrefix :: DbChangelog (LedgerState TestBlock.TestBlock) - afterPrefix = reapplyThenPushMany' (csBlockConfig ssChainSetup) prefix (csGenSnaps ssChainSetup) - - -- See 'prop_snapshotsMaxRollback' - withinReach :: Bool - withinReach = (ssNumBlocks - ssPrefixLen) <= maxRollback ssSwitched - {------------------------------------------------------------------------------- Test setup -------------------------------------------------------------------------------} @@ -442,7 +382,7 @@ data DbChangelogTestSetup = DbChangelogTestSetup , dbChangelogStartsAt :: WithOrigin SlotNo } -data Operation l = Extend (l DiffMK) | Prune LedgerDbPrune +data Operation l = Extend (l DiffMK) | Prune deriving instance Show (l DiffMK) => Show (Operation l) data DbChangelogTestSetupWithRollbacks = DbChangelogTestSetupWithRollbacks @@ -507,7 +447,7 @@ applyOperations :: applyOperations ops dblog = foldr' apply' dblog ops where apply' (Extend newState) dblog' = DbChangelog.extend newState dblog' - apply' (Prune sp) dblog' = DbChangelog.prune sp dblog' + apply' Prune dblog' = DbChangelog.pruneToImmTipOnly dblog' {------------------------------------------------------------------------------- Properties @@ -549,14 +489,40 @@ prop_rollbackAfterExtendIsNoop setup (Positive n) = where dblog = resultingDbChangelog setup --- | The number of volatile states left after pruning is at most the maximum number of rollbacks. -prop_pruningLeavesAtMostMaxRollbacksVolatileStates :: - DbChangelogTestSetup -> SecurityParam -> Property -prop_pruningLeavesAtMostMaxRollbacksVolatileStates setup sp@(SecurityParam k) = - property $ AS.length (DbChangelog.changelogStates dblog') <= fromIntegral (unNonZero k) +-- | When pruning after a slot, all (non-anchor) states are not older than this +-- slot, and the anchor /is/ older (unless nothing was pruned). +prop_pruningBeforeSlotCorrectness :: + DbChangelogTestSetup -> Property +prop_pruningBeforeSlotCorrectness setup = + counterexample ("dblog: " <> show dblog) $ forAll genPruneSlot $ \pruneSlot -> + let dblog' = DbChangelog.prune (LedgerDbPruneBeforeSlot pruneSlot) dblog + in counterexample ("pruned dblog: " <> show dblog') $ + conjoin + [ counterexample "State not pruned unexpectedly" $ + conjoin + [ (NotOrigin pruneSlot `le` getTipSlot st) + | (_, st) <- + DbChangelog.snapshots dblog' + ] + , counterexample "Anchor too old" $ + let nothingPruned = DbChangelog.maxRollback dblog == DbChangelog.maxRollback dblog' + in if nothingPruned + then property () + else + getTipSlot (DbChangelog.anchor dblog') `lt` NotOrigin pruneSlot + ] where dblog = resultingDbChangelog setup - dblog' = DbChangelog.prune (LedgerDbPruneKeeping sp) dblog + + genPruneSlot = chooseEnum (lb, ub) + where + jitter = 5 + lb + | anchorSlot >= jitter = anchorSlot - jitter + | otherwise = 0 + where + anchorSlot = succWithOrigin $ getTipSlot $ DbChangelog.anchor dblog + ub = succWithOrigin (pointSlot (DbChangelog.tip dblog)) + jitter -- | The rollbackToAnchor function rolls back all volatile states. prop_rollbackToAnchorIsRollingBackVolatileStates :: DbChangelogTestSetup -> Property @@ -628,19 +594,9 @@ genOperations slotNo nOps = gosOps <$> execStateT (replicateM_ nOps genOperation genOperation :: StateT GenOperationsState Gen () genOperation = do - op <- frequency' [(1, genPrune), (10, genExtend)] + op <- frequency' [(1, pure Prune), (20, genExtend)] modify' $ \st -> st{gosOps = op : gosOps st} - genPrune :: StateT GenOperationsState Gen (Operation TestLedger) - genPrune = - Prune - <$> lift - ( oneof - [ pure LedgerDbPruneAll - , LedgerDbPruneKeeping . SecurityParam . unsafeNonZero <$> chooseEnum (1, 10) - ] - ) - genExtend :: StateT GenOperationsState Gen (Operation TestLedger) genExtend = do nextSlotNo <- advanceSlotNo =<< lift (chooseEnum (1, 5))