Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
### Breaking

- Changed pruning of immutable ledger states to happen on LedgerDB garbage
collection instead of directly on every block adoption. This is purely an
internal refactoring (with breaking API changes) supporting predictable ledger
snapshotting.

- Avoid maintaining volatile ledger states during ledger replay, making replay
slightly more efficient.
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,9 @@ openDBInternal args launchBgTasks = runWithTempRegistry $ do
let testing =
Internal
{ intCopyToImmutableDB = getEnv h (withFuse copyTestFuse . Background.copyToImmutableDB)
, intGarbageCollect = getEnv1 h Background.garbageCollect
, intGarbageCollect = \slot -> getEnv h $ \e -> do
Background.garbageCollectBlocks e slot
LedgerDB.garbageCollect (cdbLedgerDB e) slot
, intTryTakeSnapshot = getEnv h $ \env' ->
void $ LedgerDB.tryTakeSnapshot (cdbLedgerDB env') Nothing maxBound
, intAddBlockRunner = getEnv h (Background.addBlockRunner addBlockTestFuse)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
Expand All @@ -19,11 +20,10 @@ module Ouroboros.Consensus.Storage.ChainDB.Impl.Background
launchBgTasks

-- * Copying blocks from the VolatileDB to the ImmutableDB
, copyAndSnapshotRunner
, copyToImmutableDB

-- * Executing garbage collection
, garbageCollect
, garbageCollectBlocks

-- * Scheduling garbage collections
, GcParams (..)
Expand Down Expand Up @@ -76,6 +76,7 @@ import qualified Ouroboros.Consensus.Storage.VolatileDB as VolatileDB
import Ouroboros.Consensus.Util
import Ouroboros.Consensus.Util.Condense
import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Consensus.Util.STM (Watcher (..), forkLinkedWatcher)
import Ouroboros.Network.AnchoredFragment (AnchoredSeq (..))
import qualified Ouroboros.Network.AnchoredFragment as AF

Expand All @@ -99,17 +100,30 @@ launchBgTasks cdb@CDB{..} replayed = do
!addBlockThread <-
launch "ChainDB.addBlockRunner" $
addBlockRunner cdbChainSelFuse cdb

ledgerDbTasksTrigger <- newLedgerDbTasksTrigger replayed
!ledgerDbMaintenaceThread <-
forkLinkedWatcher cdbRegistry "ChainDB.ledgerDbTaskWatcher" $
ledgerDbTaskWatcher cdb ledgerDbTasksTrigger

gcSchedule <- newGcSchedule
!gcThread <-
launch "ChainDB.gcScheduleRunner" $
launch "ChainDB.gcBlocksScheduleRunner" $
gcScheduleRunner gcSchedule $
garbageCollect cdb
!copyAndSnapshotThread <-
launch "ChainDB.copyAndSnapshotRunner" $
copyAndSnapshotRunner cdb gcSchedule replayed cdbCopyFuse
garbageCollectBlocks cdb

!copyToImmutableDBThread <-
launch "ChainDB.copyToImmutableDBRunner" $
copyToImmutableDBRunner cdb ledgerDbTasksTrigger gcSchedule cdbCopyFuse

atomically $
writeTVar cdbKillBgThreads $
sequence_ [addBlockThread, gcThread, copyAndSnapshotThread]
sequence_
[ addBlockThread
, cancelThread ledgerDbMaintenaceThread
, gcThread
, copyToImmutableDBThread
]
where
launch :: String -> m Void -> m (m ())
launch = fmap cancelThread .: forkLinkedThread cdbRegistry
Expand Down Expand Up @@ -198,22 +212,18 @@ copyToImmutableDB CDB{..} = electric $ do
_ -> error "header to remove not on the current chain"

{-------------------------------------------------------------------------------
Snapshotting
Copy to ImmutableDB
-------------------------------------------------------------------------------}

-- | Copy blocks from the VolatileDB to ImmutableDB and take snapshots of the
-- LedgerDB
-- | Copy blocks from the VolatileDB to ImmutableDB and trigger further tasks in
-- other threads.
--
-- We watch the chain for changes. Whenever the chain is longer than @k@, then
-- the headers older than @k@ are copied from the VolatileDB to the ImmutableDB
-- (using 'copyToImmutableDB'). Once that is complete,
--
-- * We periodically take a snapshot of the LedgerDB (depending on its config).
-- When enough blocks (depending on its config) have been replayed during
-- startup, a snapshot of the replayed LedgerDB will be written to disk at the
-- start of this function. NOTE: After this initial snapshot we do not take a
-- snapshot of the LedgerDB until the chain has changed again, irrespective of
-- the LedgerDB policy.
-- * Trigger LedgerDB maintenance tasks, namely flushing, taking snapshots and
-- garbage collection.
--
-- * Schedule GC of the VolatileDB ('scheduleGC') for the 'SlotNo' of the most
-- recent block that was copied.
Expand All @@ -228,32 +238,26 @@ copyToImmutableDB CDB{..} = electric $ do
-- GC can happen, when we restart the node and schedule the /next/ GC, it will
-- /imply/ any previously scheduled GC, since GC is driven by slot number
-- ("garbage collect anything older than @x@").
copyAndSnapshotRunner ::
copyToImmutableDBRunner ::
forall m blk.
( IOLike m
, LedgerSupportsProtocol blk
) =>
ChainDbEnv m blk ->
LedgerDbTasksTrigger m ->
GcSchedule m ->
-- | Number of immutable blocks replayed on ledger DB startup
Word64 ->
Fuse m ->
m Void
copyAndSnapshotRunner cdb@CDB{..} gcSchedule replayed fuse = do
copyToImmutableDBRunner cdb@CDB{..} ledgerDbTasksTrigger gcSchedule fuse = do
-- this first flush will persist the differences that come from the initial
-- chain selection.
LedgerDB.tryFlush cdbLedgerDB
loop =<< LedgerDB.tryTakeSnapshot cdbLedgerDB Nothing replayed
forever copyAndTrigger
where
SecurityParam k = configSecurityParam cdbTopLevelConfig

loop :: LedgerDB.SnapCounters -> m Void
loop counters = do
let LedgerDB.SnapCounters
{ prevSnapshotTime
, ntBlocksSinceLastSnap
} = counters

copyAndTrigger :: m ()
copyAndTrigger = do
-- Wait for the chain to grow larger than @k@
numToWrite <- atomically $ do
curChain <- icWithoutTime <$> readTVar cdbChain
Expand All @@ -264,14 +268,10 @@ copyAndSnapshotRunner cdb@CDB{..} gcSchedule replayed fuse = do
--
-- This is a synchronous operation: when it returns, the blocks have been
-- copied to disk (though not flushed, necessarily).
withFuse fuse (copyToImmutableDB cdb) >>= scheduleGC'
gcSlotNo <- withFuse fuse (copyToImmutableDB cdb)

LedgerDB.tryFlush cdbLedgerDB

now <- getMonotonicTime
let ntBlocksSinceLastSnap' = ntBlocksSinceLastSnap + numToWrite

loop =<< LedgerDB.tryTakeSnapshot cdbLedgerDB ((,now) <$> prevSnapshotTime) ntBlocksSinceLastSnap'
triggerLedgerDbTasks ledgerDbTasksTrigger gcSlotNo numToWrite
scheduleGC' gcSlotNo

scheduleGC' :: WithOrigin SlotNo -> m ()
scheduleGC' Origin = return ()
Expand All @@ -285,16 +285,104 @@ copyAndSnapshotRunner cdb@CDB{..} gcSchedule replayed fuse = do
}
gcSchedule

{-------------------------------------------------------------------------------
LedgerDB maintenance tasks
-------------------------------------------------------------------------------}

-- | Trigger for the LedgerDB maintenance tasks, namely whenever the immutable
-- DB tip slot advances when we finish copying blocks to it.
--
-- This wraps the shared 'LedgerDbTaskState': 'triggerLedgerDbTasks' writes the
-- new ImmutableDB tip slot into it, and 'ledgerDbTaskWatcher' reacts whenever
-- that tip slot changes.
newtype LedgerDbTasksTrigger m
= LedgerDbTasksTrigger (StrictTVar m LedgerDbTaskState)

-- | State stored inside a 'LedgerDbTasksTrigger'.
--
-- It is written by 'triggerLedgerDbTasks' and read (and partially updated) by
-- 'ledgerDbTaskWatcher'.
data LedgerDbTaskState = LedgerDbTaskState
{ ldbtsImmTip :: !(WithOrigin SlotNo)
-- ^ Tip slot of the ImmutableDB as last reported via 'triggerLedgerDbTasks';
-- 'Origin' until the first report. This is the value the watcher uses as its
-- fingerprint.
, ldbtsPrevSnapshotTime :: !(Maybe Time)
-- ^ The @prevSnapshotTime@ returned by the most recent call to
-- 'LedgerDB.tryTakeSnapshot' made by the watcher; 'Nothing' initially.
, ldbtsBlocksSinceLastSnapshot :: !Word64
-- ^ Block counter passed to 'LedgerDB.tryTakeSnapshot'. It starts at the
-- number of replayed blocks, grows by the number of blocks written on every
-- 'triggerLedgerDbTasks', and is adjusted by the watcher after each snapshot
-- attempt.
}
deriving stock Generic
deriving anyclass NoThunks

-- | Allocate a fresh 'LedgerDbTasksTrigger'.
--
-- The initial state has no ImmutableDB tip yet ('Origin'), no previous
-- snapshot time, and counts the replayed blocks as the blocks seen since the
-- last snapshot.
newLedgerDbTasksTrigger ::
  IOLike m =>
  -- | Number of blocks replayed.
  Word64 ->
  m (LedgerDbTasksTrigger m)
newLedgerDbTasksTrigger replayed = do
  varSt <- newTVarIO initialState
  pure (LedgerDbTasksTrigger varSt)
 where
  initialState =
    LedgerDbTaskState
      { ldbtsImmTip = Origin
      , ldbtsPrevSnapshotTime = Nothing
      , ldbtsBlocksSinceLastSnapshot = replayed
      }

-- | Record progress of the ImmutableDB in the given trigger.
--
-- In a single STM transaction, the stored tip slot is replaced by the new one
-- and the number of written blocks is added to the running count of blocks
-- since the last snapshot. The previous snapshot time is left untouched.
triggerLedgerDbTasks ::
  forall m.
  IOLike m =>
  LedgerDbTasksTrigger m ->
  -- | New tip of the ImmutableDB.
  WithOrigin SlotNo ->
  -- | Number of blocks written to the ImmutableDB.
  Word64 ->
  m ()
triggerLedgerDbTasks (LedgerDbTasksTrigger varSt) immTip numWritten =
  atomically (modifyTVar varSt recordProgress)
 where
  recordProgress :: LedgerDbTaskState -> LedgerDbTaskState
  recordProgress st =
    st
      { ldbtsImmTip = immTip
      , ldbtsBlocksSinceLastSnapshot = ldbtsBlocksSinceLastSnapshot st + numWritten
      }

-- | Watcher running the LedgerDB maintenance tasks whenever the ImmutableDB
-- tip slot stored in the 'LedgerDbTasksTrigger' changes.
--
-- Nothing is done while the observed tip is still 'Origin'. Otherwise the
-- following steps are executed, in this order:
--
-- 1. Flushing of differences.
-- 2. Attempting to take a snapshot, and storing the returned counters back
--    into the trigger state.
-- 3. Garbage collection of the LedgerDB for the observed tip slot.
ledgerDbTaskWatcher ::
  forall m blk.
  IOLike m =>
  ChainDbEnv m blk ->
  LedgerDbTasksTrigger m ->
  Watcher m LedgerDbTaskState (WithOrigin SlotNo)
ledgerDbTaskWatcher CDB{..} (LedgerDbTasksTrigger varSt) =
  Watcher
    { wFingerprint = ldbtsImmTip
    , wInitial = Nothing
    , wReader = readTVar varSt
    , wNotify = runMaintenance
    }
 where
  runMaintenance :: LedgerDbTaskState -> m ()
  runMaintenance
    LedgerDbTaskState
      { ldbtsImmTip = observedTip
      , ldbtsBlocksSinceLastSnapshot = countAtNotify
      , ldbtsPrevSnapshotTime = lastSnapTime
      } =
      whenJust (withOriginToMaybe observedTip) $ \slotNo -> do
        LedgerDB.tryFlush cdbLedgerDB

        now <- getMonotonicTime
        LedgerDB.SnapCounters
          { prevSnapshotTime = newSnapTime
          , ntBlocksSinceLastSnap = countAfterSnap
          } <-
          LedgerDB.tryTakeSnapshot
            cdbLedgerDB
            ((,now) <$> lastSnapTime)
            countAtNotify
        -- More blocks may have been reported while the snapshot was being
        -- taken, so only the part of the counter that was handed to
        -- 'LedgerDB.tryTakeSnapshot' is replaced by the returned count.
        atomically $ modifyTVar varSt $ \st ->
          st
            { ldbtsBlocksSinceLastSnapshot =
                ldbtsBlocksSinceLastSnapshot st - countAtNotify + countAfterSnap
            , ldbtsPrevSnapshotTime = newSnapTime
            }

        LedgerDB.garbageCollect cdbLedgerDB slotNo

{-------------------------------------------------------------------------------
Executing garbage collection
-------------------------------------------------------------------------------}

-- | Trigger a garbage collection for blocks older than the given 'SlotNo' on
-- the VolatileDB.
--
-- Also removes the corresponding cached "previously applied points" from the
-- LedgerDB.
--
-- This is thread-safe as the VolatileDB locks itself while performing a GC.
--
-- When calling this function it is __critical__ that the blocks that will be
Expand All @@ -304,11 +392,10 @@ copyAndSnapshotRunner cdb@CDB{..} gcSchedule replayed fuse = do
--
-- TODO will a long GC be a bottleneck? It will block any other calls to
-- @putBlock@ and @getBlock@.
garbageCollect :: forall m blk. IOLike m => ChainDbEnv m blk -> SlotNo -> m ()
garbageCollect CDB{..} slotNo = do
garbageCollectBlocks :: forall m blk. IOLike m => ChainDbEnv m blk -> SlotNo -> m ()
garbageCollectBlocks CDB{..} slotNo = do
VolatileDB.garbageCollect cdbVolatileDB slotNo
atomically $ do
LedgerDB.garbageCollect cdbLedgerDB slotNo
modifyTVar cdbInvalid $ fmap $ Map.filter ((>= slotNo) . invalidBlockSlotNo)
traceWith cdbTracer $ TraceGCEvent $ PerformedGC slotNo

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,9 +251,15 @@ data LedgerDB m l blk = LedgerDB
-- back as many blocks as the passed @Word64@.
, getPrevApplied :: STM m (Set (RealPoint blk))
-- ^ Get the references to blocks that have previously been applied.
, garbageCollect :: SlotNo -> STM m ()
-- ^ Garbage collect references to old blocks that have been previously
-- applied and committed.
, garbageCollect :: SlotNo -> m ()
-- ^ Garbage collect references to state that is older than the given
-- slot.
--
-- Concretely, this affects:
--
-- * Ledger states (and potentially underlying handles for on-disk storage).
--
-- * The set of previously applied points.
, tryTakeSnapshot ::
l ~ ExtLedgerState blk =>
Maybe (Time, Time) ->
Expand Down Expand Up @@ -298,7 +304,14 @@ data TestInternals m l blk = TestInternals
{ wipeLedgerDB :: m ()
, takeSnapshotNOW :: WhereToTakeSnapshot -> Maybe String -> m ()
, push :: ExtLedgerState blk DiffMK -> m ()
-- ^ Push a ledger state, and prune the 'LedgerDB' to its immutable tip.
--
-- This does not modify the set of previously applied points.
, reapplyThenPushNOW :: blk -> m ()
-- ^ Apply block to the tip ledger state (using reapplication), and prune the
-- 'LedgerDB' to its immutable tip.
--
-- This does not modify the set of previously applied points.
, truncateSnapshots :: m ()
, closeLedgerDB :: m ()
, getNumLedgerTablesHandles :: m Word64
Expand Down Expand Up @@ -456,11 +469,10 @@ data InitDB db m blk = InitDB
-- ^ Closing the database, to be reopened again with a different snapshot or
-- with the genesis state.
, initReapplyBlock :: !(LedgerDbCfg (ExtLedgerState blk) -> blk -> db -> m db)
-- ^ Reapply a block from the immutable DB when initializing the DB.
-- ^ Reapply a block from the immutable DB when initializing the DB. Prune the
-- LedgerDB such that there are no volatile states.
, currentTip :: !(db -> LedgerState blk EmptyMK)
-- ^ Getting the current tip for tracing the Ledger Events.
, pruneDb :: !(db -> m db)
-- ^ Prune the database so that no immutable states are considered volatile.
, mkLedgerDb ::
!(db -> m (LedgerDB m (ExtLedgerState blk) blk, TestInternals m (ExtLedgerState blk) blk))
-- ^ Create a LedgerDB from the initialized data structures from previous
Expand Down Expand Up @@ -545,13 +557,7 @@ initialize
Left err -> do
closeDb initDb
error $ "Invariant violation: invalid immutable chain " <> show err
Right (db, replayed) -> do
db' <- pruneDb dbIface db
return
( acc InitFromGenesis
, db'
, replayed
)
Right (db, replayed) -> return (acc InitFromGenesis, db, replayed)
tryNewestFirst acc (s : ss) = do
eInitDb <- initFromSnapshot s
case eInitDb of
Expand Down Expand Up @@ -603,9 +609,7 @@ initialize
Monad.when (diskSnapshotIsTemporary s) $ deleteSnapshot hasFS s
closeDb initDb
tryNewestFirst (acc . InitFailure s err) ss
Right (db, replayed) -> do
db' <- pruneDb dbIface db
return (acc (InitFromSnapshot s pt), db', replayed)
Right (db, replayed) -> return (acc (InitFromSnapshot s pt), db, replayed)

replayTracer' =
decorateReplayTracerWithGoal
Expand Down Expand Up @@ -775,10 +779,10 @@ type LedgerSupportsLedgerDB blk =
-------------------------------------------------------------------------------}

-- | Options for pruning the LedgerDB
--
-- Rather than using a plain `Word64` we use this to be able to distinguish that
-- we are indeed using
-- 1. @0@ in places where it is necessary
-- 2. the security parameter as is, in other places
data LedgerDbPrune = LedgerDbPruneAll | LedgerDbPruneKeeping SecurityParam
data LedgerDbPrune
= -- | Prune all states, keeping only the current tip.
LedgerDbPruneAll
| -- | Prune such that all (non-anchor) states are not older than the given
-- slot.
LedgerDbPruneBeforeSlot SlotNo
deriving Show
Loading
Loading