diff --git a/config_builder.go b/config_builder.go index e47b29b7d8..a413369ab9 100644 --- a/config_builder.go +++ b/config_builder.go @@ -1034,6 +1034,9 @@ func (d *DefaultDatabaseBuilder) BuildDatabase( chanGraphOpts := []graphdb.ChanGraphOption{ graphdb.WithUseGraphCache(!cfg.DB.NoGraphCache), + graphdb.WithAsyncGraphCachePopulation( + !cfg.DB.SyncGraphCacheLoad, + ), } // We want to pre-allocate the channel graph cache according to what we diff --git a/docs/release-notes/release-notes-0.20.0.md b/docs/release-notes/release-notes-0.20.0.md index b1e73f0988..a00076c193 100644 --- a/docs/release-notes/release-notes-0.20.0.md +++ b/docs/release-notes/release-notes-0.20.0.md @@ -128,8 +128,16 @@ reader of a payment request. to improve readability and maintainability of the code. ## Breaking Changes + ## Performance Improvements +* Let the [channel graph cache be populated + asynchronously](https://github.com/lightningnetwork/lnd/pull/10065) on + startup. While the cache is being populated, the graph is still available for + queries, but all read queries will be served from the database until the cache + is fully populated. This new behaviour can be opted out of via the new + `--db.sync-graph-cache-load` option. + ## Deprecations ### ⚠️ **Warning:** The following RPCs will be removed in release version **0.21**: diff --git a/graph/db/graph.go b/graph/db/graph.go index a507157745..4bb8c62b7b 100644 --- a/graph/db/graph.go +++ b/graph/db/graph.go @@ -31,7 +31,13 @@ type ChannelGraph struct { started atomic.Bool stopped atomic.Bool - graphCache *GraphCache + opts *chanGraphOptions + + // cacheLoaded is true if the initial graphCache population has + // finished. We use this to ensure that when performing any reads, + // we only read from the graphCache if it has been fully populated. 
+ cacheLoaded atomic.Bool + graphCache *GraphCache V1Store *topologyManager @@ -50,6 +56,7 @@ func NewChannelGraph(v1Store V1Store, } g := &ChannelGraph{ + opts: opts, V1Store: v1Store, topologyManager: newTopologyManager(), quit: make(chan struct{}), @@ -74,8 +81,19 @@ func (c *ChannelGraph) Start() error { log.Debugf("ChannelGraph starting") defer log.Debug("ChannelGraph started") - if c.graphCache != nil { - if err := c.populateCache(context.TODO()); err != nil { + ctx := context.TODO() + if c.opts.asyncGraphCachePopulation { + c.wg.Add(1) + go func() { + defer c.wg.Done() + + if err := c.populateCache(ctx); err != nil { + log.Errorf("Could not populate the graph "+ + "cache: %v", err) + } + }() + } else { + if err := c.populateCache(ctx); err != nil { return fmt.Errorf("could not populate the graph "+ "cache: %w", err) } @@ -157,9 +175,13 @@ func (c *ChannelGraph) handleTopologySubscriptions() { } // populateCache loads the entire channel graph into the in-memory graph cache. -// -// NOTE: This should only be called if the graphCache has been constructed. 
func (c *ChannelGraph) populateCache(ctx context.Context) error { + if c.graphCache == nil { + log.Info("In-memory channel graph cache disabled") + + return nil + } + startTime := time.Now() log.Info("Populating in-memory channel graph, this might take a " + "while...") @@ -188,6 +210,8 @@ func (c *ChannelGraph) populateCache(ctx context.Context) error { return err } + c.cacheLoaded.Store(true) + log.Infof("Finished populating in-memory channel graph (took %v, %s)", time.Since(startTime), c.graphCache.Stats()) @@ -207,7 +231,7 @@ func (c *ChannelGraph) populateCache(ctx context.Context) error { func (c *ChannelGraph) ForEachNodeDirectedChannel(node route.Vertex, cb func(channel *DirectedChannel) error, reset func()) error { - if c.graphCache != nil { + if c.graphCache != nil && c.cacheLoaded.Load() { return c.graphCache.ForEachChannel(node, cb) } @@ -223,7 +247,7 @@ func (c *ChannelGraph) ForEachNodeDirectedChannel(node route.Vertex, func (c *ChannelGraph) FetchNodeFeatures(node route.Vertex) ( *lnwire.FeatureVector, error) { - if c.graphCache != nil { + if c.graphCache != nil && c.cacheLoaded.Load() { return c.graphCache.GetFeatures(node), nil } @@ -237,7 +261,7 @@ func (c *ChannelGraph) FetchNodeFeatures(node route.Vertex) ( func (c *ChannelGraph) GraphSession(cb func(graph NodeTraverser) error, reset func()) error { - if c.graphCache != nil { + if c.graphCache != nil && c.cacheLoaded.Load() { return cb(c) } @@ -252,7 +276,7 @@ func (c *ChannelGraph) ForEachNodeCached(ctx context.Context, cb func(node route.Vertex, chans map[uint64]*DirectedChannel) error, reset func()) error { - if c.graphCache != nil { + if c.graphCache != nil && c.cacheLoaded.Load() { return c.graphCache.ForEachNode(cb) } diff --git a/graph/db/graph_test.go b/graph/db/graph_test.go index 25ce1edf72..fb50aca6ac 100644 --- a/graph/db/graph_test.go +++ b/graph/db/graph_test.go @@ -26,6 +26,7 @@ import ( "github.com/lightningnetwork/lnd/fn/v2" "github.com/lightningnetwork/lnd/graph/db/models" 
"github.com/lightningnetwork/lnd/kvdb" + "github.com/lightningnetwork/lnd/lntest/wait" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/routing/route" "github.com/stretchr/testify/require" @@ -102,7 +103,7 @@ func TestNodeInsertionAndDeletion(t *testing.T) { t.Parallel() ctx := context.Background() - graph := MakeTestGraph(t) + graph := MakeTestGraph(t, WithSyncGraphCachePopulation()) // We'd like to test basic insertion/deletion for vertexes from the // graph, so we'll create a test vertex to start with. @@ -260,7 +261,7 @@ func TestPartialNode(t *testing.T) { t.Parallel() ctx := context.Background() - graph := MakeTestGraph(t) + graph := MakeTestGraph(t, WithSyncGraphCachePopulation()) // To insert a partial node, we need to add a channel edge that has // node keys for nodes we are not yet aware @@ -386,7 +387,7 @@ func TestEdgeInsertionDeletion(t *testing.T) { t.Parallel() ctx := context.Background() - graph := MakeTestGraph(t) + graph := MakeTestGraph(t, WithSyncGraphCachePopulation()) // We'd like to test the insertion/deletion of edges, so we create two // vertexes to connect. @@ -511,7 +512,7 @@ func TestDisconnectBlockAtHeight(t *testing.T) { t.Parallel() ctx := context.Background() - graph := MakeTestGraph(t) + graph := MakeTestGraph(t, WithSyncGraphCachePopulation()) sourceNode := createTestVertex(t) if err := graph.SetSourceNode(ctx, sourceNode); err != nil { @@ -802,7 +803,7 @@ func TestEdgeInfoUpdates(t *testing.T) { t.Parallel() ctx := context.Background() - graph := MakeTestGraph(t) + graph := MakeTestGraph(t, WithSyncGraphCachePopulation()) // We'd like to test the update of edges inserted into the database, so // we create two vertexes to connect. @@ -987,9 +988,6 @@ func assertNodeNotInCache(t *testing.T, g *ChannelGraph, n route.Vertex) { _, ok := g.graphCache.nodeFeatures[n] require.False(t, ok) - _, ok = g.graphCache.nodeChannels[n] - require.False(t, ok) - // We should get the default features for this node. 
features := g.graphCache.GetFeatures(n) require.Equal(t, lnwire.EmptyFeatureVector(), features) @@ -1336,11 +1334,16 @@ func TestForEachSourceNodeChannel(t *testing.T) { require.Empty(t, expectedSrcChans) } +// TestGraphTraversal tests that we can traverse the graph and find all +// nodes and channels that we expect to find. func TestGraphTraversal(t *testing.T) { t.Parallel() ctx := context.Background() - graph := MakeTestGraph(t) + // If we turn the channel graph cache _off_, then iterate through the + // set of channels (to force the fall back), we should find all the + // channel as well as the nodes included. + graph := MakeTestGraph(t, WithUseGraphCache(false)) // We'd like to test some of the graph traversal capabilities within // the DB, so we'll create a series of fake nodes to insert into the @@ -1355,10 +1358,6 @@ func TestGraphTraversal(t *testing.T) { nodeIndex[node.PubKeyBytes] = struct{}{} } - // If we turn the channel graph cache _off_, then iterate through the - // set of channels (to force the fall back), we should find all the - // channel as well as the nodes included. - graph.graphCache = nil err := graph.ForEachNodeCached(ctx, func(node route.Vertex, chans map[uint64]*DirectedChannel) error { @@ -1507,10 +1506,13 @@ func TestGraphTraversalCacheable(t *testing.T) { require.Len(t, chanIndex2, 0) } +// TestGraphCacheTraversal tests traversal of the graph via the graph cache. func TestGraphCacheTraversal(t *testing.T) { t.Parallel() - graph := MakeTestGraph(t) + // Explicitly enable the graph cache so that the + // ForEachNodeDirectedChannel call below will use the cache. 
+ graph := MakeTestGraph(t, WithUseGraphCache(true)) // We'd like to test some of the graph traversal capabilities within // the DB, so we'll create a series of fake nodes to insert into the @@ -1526,7 +1528,7 @@ func TestGraphCacheTraversal(t *testing.T) { for _, node := range nodeList { node := node - err := graph.graphCache.ForEachChannel( + err := graph.ForEachNodeDirectedChannel( node.PubKeyBytes, func(d *DirectedChannel) error { delete(chanIndex, d.ChannelID) @@ -1548,6 +1550,8 @@ func TestGraphCacheTraversal(t *testing.T) { numNodeChans++ return nil + }, func() { + numNodeChans = 0 }, ) require.NoError(t, err) @@ -4257,17 +4261,16 @@ func BenchmarkForEachChannel(b *testing.B) { } } -// TestGraphCacheForEachNodeChannel tests that the forEachNodeDirectedChannel +// TestForEachNodeDirectedChannel tests that the ForEachNodeDirectedChannel // method works as expected, and is able to handle nil self edges. -func TestGraphCacheForEachNodeChannel(t *testing.T) { +func TestForEachNodeDirectedChannel(t *testing.T) { t.Parallel() ctx := context.Background() - graph := MakeTestGraph(t) - // Unset the channel graph cache to simulate the user running with the - // option turned off. - graph.graphCache = nil + // option turned off. This forces the V1Store ForEachNodeDirectedChannel + // to be queried instead of the graph cache's ForEachChannel method. + graph := MakeTestGraph(t, WithUseGraphCache(false)) node1 := createTestVertex(t) require.NoError(t, graph.AddLightningNode(ctx, node1)) @@ -4351,7 +4354,9 @@ func TestGraphLoading(t *testing.T) { // Next, create the graph for the first time. graphStore := NewTestDB(t) - graph, err := NewChannelGraph(graphStore) + graph, err := NewChannelGraph( + graphStore, WithSyncGraphCachePopulation(), + ) require.NoError(t, err) require.NoError(t, graph.Start()) t.Cleanup(func() { @@ -4365,7 +4370,9 @@ func TestGraphLoading(t *testing.T) { // Recreate the graph. This should cause the graph cache to be // populated. 
- graphReloaded, err := NewChannelGraph(graphStore) + graphReloaded, err := NewChannelGraph( + graphStore, WithSyncGraphCachePopulation(), + ) require.NoError(t, err) require.NoError(t, graphReloaded.Start()) t.Cleanup(func() { @@ -4384,6 +4391,105 @@ func TestGraphLoading(t *testing.T) { ) } +// TestAsyncGraphCache tests the behaviour of the ChannelGraph when the graph +// cache is populated asynchronously. +func TestAsyncGraphCache(t *testing.T) { + t.Parallel() + ctx := context.Background() + + const ( + numNodes = 100 + numChannels = 3 + ) + + // Next, create the graph for the first time. + graphStore := NewTestDB(t) + + // The first time we spin up the graph, we Start it as normal and fill + // it with test data. This will ensure that the graph cache has + // something to load on the next Start. + graph, err := NewChannelGraph(graphStore) + require.NoError(t, err) + require.NoError(t, graph.Start()) + channels, nodes := fillTestGraph(t, graph, numNodes, numChannels) + + assertGraphState := func() { + var ( + numNodes int + chanIndex = make(map[uint64]struct{}, numChannels) + ) + // We query the graph for all nodes and channels, and + // assert that we get the expected number of nodes and + // channels. + err := graph.ForEachNodeCached( + ctx, func(node route.Vertex, + chans map[uint64]*DirectedChannel) error { + + numNodes++ + for chanID := range chans { + chanIndex[chanID] = struct{}{} + } + + return nil + }, func() { + numNodes = 0 + chanIndex = make( + map[uint64]struct{}, numChannels, + ) + }, + ) + require.NoError(t, err) + + require.Equal(t, len(nodes), numNodes) + require.Equal(t, len(channels), len(chanIndex)) + } + + assertGraphState() + + // Now we stop the graph. + require.NoError(t, graph.Stop()) + + // Recreate it but don't start it yet. + graph, err = NewChannelGraph(graphStore) + require.NoError(t, err) + + // Spin off a goroutine that starts to make queries to the ChannelGraph. 
+ // We start this before we start the graph, so that we can ensure that + // the queries are made while the graph cache is being populated. + var ( + wg sync.WaitGroup + numRuns = 10 + ) + for i := 0; i < numRuns; i++ { + wg.Add(1) + go func() { + defer wg.Done() + + assertGraphState() + }() + } + + require.NoError(t, graph.Start()) + t.Cleanup(func() { + require.NoError(t, graph.Stop()) + }) + + wg.Wait() + + // Wait for the cache to be fully populated. + err = wait.Predicate(func() bool { + return graph.cacheLoaded.Load() + }, wait.DefaultTimeout) + require.NoError(t, err) + + // And then assert that all the expected nodes and channels are + // present in the graph cache. + for _, node := range nodes { + _, ok := graph.graphCache.nodeChannels[node.PubKeyBytes] + require.True(t, ok) + } +} + // TestClosedScid tests that we can correctly insert a SCID into the index of // closed short channel ids. func TestClosedScid(t *testing.T) { diff --git a/graph/db/options.go b/graph/db/options.go index 3edda66021..377671d680 100644 --- a/graph/db/options.go +++ b/graph/db/options.go @@ -31,14 +31,20 @@ type chanGraphOptions struct { // preAllocCacheNumNodes is the number of nodes we expect to be in the // graph cache, so we can pre-allocate the map accordingly. preAllocCacheNumNodes int + + // asyncGraphCachePopulation indicates whether the graph cache + // should be populated asynchronously or if the Start method should + // block until the cache is fully populated. + asyncGraphCachePopulation bool } // defaultChanGraphOptions returns a new chanGraphOptions instance populated // with default values. 
func defaultChanGraphOptions() *chanGraphOptions { return &chanGraphOptions{ - useGraphCache: true, - preAllocCacheNumNodes: DefaultPreAllocCacheNumNodes, + useGraphCache: true, + asyncGraphCachePopulation: true, + preAllocCacheNumNodes: DefaultPreAllocCacheNumNodes, } } @@ -61,6 +67,25 @@ func WithPreAllocCacheNumNodes(n int) ChanGraphOption { } } +// WithAsyncGraphCachePopulation sets whether the graph cache should be +// populated asynchronously or if the Start method should block until the +// cache is fully populated. +func WithAsyncGraphCachePopulation(async bool) ChanGraphOption { + return func(o *chanGraphOptions) { + o.asyncGraphCachePopulation = async + } +} + +// WithSyncGraphCachePopulation will cause the ChannelGraph to block +// until the graph cache is fully populated before returning from the Start +// method. This is useful for tests that need to ensure the graph cache is +// fully populated before proceeding with further operations. +func WithSyncGraphCachePopulation() ChanGraphOption { + return func(o *chanGraphOptions) { + o.asyncGraphCachePopulation = false + } +} + // StoreOptions holds parameters for tuning and customizing a graph DB. type StoreOptions struct { // RejectCacheSize is the maximum number of rejectCacheEntries to hold diff --git a/lncfg/db.go b/lncfg/db.go index 3da1f4d3b8..68940ed287 100644 --- a/lncfg/db.go +++ b/lncfg/db.go @@ -92,6 +92,8 @@ type DB struct { NoGraphCache bool `long:"no-graph-cache" description:"Don't use the in-memory graph cache for path finding. Much slower but uses less RAM. Can only be used with a bolt database backend."` + SyncGraphCacheLoad bool `long:"sync-graph-cache-load" description:"Force synchronous loading of the graph cache. This will block the startup until the graph cache is fully loaded into memory. 
This is useful if any bugs appear with the new async loading feature of the graph cache."` + PruneRevocation bool `long:"prune-revocation" description:"Run the optional migration that prunes the revocation logs to save disk space."` NoRevLogAmtData bool `long:"no-rev-log-amt-data" description:"If set, the to-local and to-remote output amounts of revoked commitment transactions will not be stored in the revocation log. Note that once this data is lost, a watchtower client will not be able to back up the revoked state."` diff --git a/sample-lnd.conf b/sample-lnd.conf index 7e105de27f..78a28c2102 100644 --- a/sample-lnd.conf +++ b/sample-lnd.conf @@ -1471,6 +1471,12 @@ ; less RAM. Can only be used with a bolt database backend. ; db.no-graph-cache=false +; Block the start-up of LND until the graph cache has been fully populated. +; If not set, the graph cache will be populated asynchronously and any read +; calls made before the cache is fully populated will fall back to the +; database. +; db.sync-graph-cache-load=false + ; Specify whether the optional migration for pruning old revocation logs ; should be applied. This migration will only save disk space if there are open ; channels prior to lnd@v0.15.0.