
Commit f02f1c5

graph/db: allow async cache population
We also add a functional option to opt out of this behaviour so that any tests that rely on the graph cache continue to work reliably.
1 parent 88194c7 commit f02f1c5
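
To show how the new option is intended to be used, here is a short sketch written as if it lived inside the graph/db package. NewChannelGraph, WithSyncGraphCachePopulation, Start and V1Store all come from this change or the surrounding package; the wrapper function itself is only illustrative.

// Sketch only: a helper a test (or any caller that needs the old
// blocking behaviour) might use. Production callers simply omit the
// option and get asynchronous cache population by default.
func startGraphBlocking(graphStore V1Store) (*ChannelGraph, error) {
	graph, err := NewChannelGraph(
		graphStore, WithSyncGraphCachePopulation(),
	)
	if err != nil {
		return nil, err
	}

	// With the option above, Start blocks until populateCache has
	// finished; with the default, population runs in a background
	// goroutine and Start returns immediately.
	if err := graph.Start(); err != nil {
		return nil, err
	}

	return graph, nil
}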

File tree

3 files changed: +152 -11 lines changed
graph/db/graph.go

Lines changed: 19 additions & 2 deletions

@@ -31,6 +31,8 @@ type ChannelGraph struct {
 	started atomic.Bool
 	stopped atomic.Bool
 
+	opts *chanGraphOptions
+
 	// cacheLoaded is true if the initial graphCache population has
 	// finished. We use this to ensure that when performing any reads,
 	// we only read from the graphCache if it has been fully populated.

@@ -54,6 +56,7 @@ func NewChannelGraph(v1Store V1Store,
 	}
 
 	g := &ChannelGraph{
+		opts:            opts,
 		V1Store:         v1Store,
 		topologyManager: newTopologyManager(),
 		quit:            make(chan struct{}),

@@ -78,8 +81,22 @@ func (c *ChannelGraph) Start() error {
 	log.Debugf("ChannelGraph starting")
 	defer log.Debug("ChannelGraph started")
 
-	if err := c.populateCache(context.TODO()); err != nil {
-		return fmt.Errorf("could not populate the graph cache: %w", err)
+	ctx := context.TODO()
+	if c.opts.asyncGraphCachePopulation {
+		c.wg.Add(1)
+		go func() {
+			defer c.wg.Done()
+
+			if err := c.populateCache(ctx); err != nil {
+				log.Errorf("Could not populate the graph "+
+					"cache: %v", err)
+			}
+		}()
+	} else {
+		if err := c.populateCache(ctx); err != nil {
+			return fmt.Errorf("could not populate the graph "+
+				"cache: %w", err)
+		}
 	}
 
 	c.wg.Add(1)
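
The cacheLoaded comment in the first hunk describes how readers are expected to treat the cache while it is still being filled. The read-path fallback itself is not part of this diff, so the following is only a sketch of the guard that comment implies, with readFromCache and readFromDB as hypothetical stand-ins for the real read paths:

// Sketch only, not code from this commit: readers should consult the
// in-memory graphCache only once the initial population has finished,
// and otherwise fall back to the backing store.
func (c *ChannelGraph) readChannelsSketch() ([]*DirectedChannel, error) {
	if c.graphCache != nil && c.cacheLoaded.Load() {
		return c.readFromCache() // hypothetical helper
	}

	return c.readFromDB() // hypothetical helper
}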

graph/db/graph_test.go

Lines changed: 106 additions & 7 deletions

@@ -26,6 +26,7 @@ import (
 	"github.com/lightningnetwork/lnd/fn/v2"
 	"github.com/lightningnetwork/lnd/graph/db/models"
 	"github.com/lightningnetwork/lnd/kvdb"
+	"github.com/lightningnetwork/lnd/lntest/wait"
 	"github.com/lightningnetwork/lnd/lnwire"
 	"github.com/lightningnetwork/lnd/routing/route"
 	"github.com/stretchr/testify/require"

@@ -102,7 +103,7 @@ func TestNodeInsertionAndDeletion(t *testing.T) {
 	t.Parallel()
 	ctx := context.Background()
 
-	graph := MakeTestGraph(t)
+	graph := MakeTestGraph(t, WithSyncGraphCachePopulation())
 
 	// We'd like to test basic insertion/deletion for vertexes from the
 	// graph, so we'll create a test vertex to start with.

@@ -260,7 +261,7 @@ func TestPartialNode(t *testing.T) {
 	t.Parallel()
 	ctx := context.Background()
 
-	graph := MakeTestGraph(t)
+	graph := MakeTestGraph(t, WithSyncGraphCachePopulation())
 
 	// To insert a partial node, we need to add a channel edge that has
 	// node keys for nodes we are not yet aware

@@ -386,7 +387,7 @@ func TestEdgeInsertionDeletion(t *testing.T) {
 	t.Parallel()
 	ctx := context.Background()
 
-	graph := MakeTestGraph(t)
+	graph := MakeTestGraph(t, WithSyncGraphCachePopulation())
 
 	// We'd like to test the insertion/deletion of edges, so we create two
 	// vertexes to connect.

@@ -511,7 +512,7 @@ func TestDisconnectBlockAtHeight(t *testing.T) {
 	t.Parallel()
 	ctx := context.Background()
 
-	graph := MakeTestGraph(t)
+	graph := MakeTestGraph(t, WithSyncGraphCachePopulation())
 
 	sourceNode := createTestVertex(t)
 	if err := graph.SetSourceNode(ctx, sourceNode); err != nil {

@@ -802,7 +803,7 @@ func TestEdgeInfoUpdates(t *testing.T) {
 	t.Parallel()
 	ctx := context.Background()
 
-	graph := MakeTestGraph(t)
+	graph := MakeTestGraph(t, WithSyncGraphCachePopulation())
 
 	// We'd like to test the update of edges inserted into the database, so
 	// we create two vertexes to connect.

@@ -4350,7 +4351,9 @@ func TestGraphLoading(t *testing.T) {
 	// Next, create the graph for the first time.
 	graphStore := NewTestDB(t)
 
-	graph, err := NewChannelGraph(graphStore)
+	graph, err := NewChannelGraph(
+		graphStore, WithSyncGraphCachePopulation(),
+	)
 	require.NoError(t, err)
 	require.NoError(t, graph.Start())
 	t.Cleanup(func() {

@@ -4364,7 +4367,9 @@ func TestGraphLoading(t *testing.T) {
 
 	// Recreate the graph. This should cause the graph cache to be
 	// populated.
-	graphReloaded, err := NewChannelGraph(graphStore)
+	graphReloaded, err := NewChannelGraph(
+		graphStore, WithSyncGraphCachePopulation(),
+	)
 	require.NoError(t, err)
 	require.NoError(t, graphReloaded.Start())
 	t.Cleanup(func() {

@@ -4383,6 +4388,100 @@ func TestGraphLoading(t *testing.T) {
 	)
 }
 
+// TestAsyncGraphCache tests the behaviour of the ChannelGraph when the graph
+// cache is populated asynchronously.
+func TestAsyncGraphCache(t *testing.T) {
+	t.Parallel()
+	ctx := context.Background()
+
+	const (
+		numNodes    = 100
+		numChannels = 3
+	)
+
+	// Next, create the graph for the first time.
+	graphStore := NewTestDB(t)
+
+	// The first time we spin up the graph, we Start it as normal and fill
+	// it with test data. This will ensure that the graph cache has
+	// something to load on the next Start.
+	graph, err := NewChannelGraph(graphStore)
+	require.NoError(t, err)
+	require.NoError(t, graph.Start())
+	channels, nodes := fillTestGraph(t, graph, numNodes, numChannels)
+
+	assertGraphState := func() {
+		var (
+			numNodes  int
+			chanIndex = make(map[uint64]struct{}, numChannels)
+		)
+
+		// We query the graph for all nodes and channels, and assert
+		// that we get the expected number of nodes and channels.
+		err := graph.ForEachNodeCached(
+			ctx, func(node route.Vertex,
+				chans map[uint64]*DirectedChannel) error {
+
+				numNodes++
+				for chanID := range chans {
+					chanIndex[chanID] = struct{}{}
+				}
+
+				return nil
+			},
+		)
+		require.NoError(t, err)
+
+		require.Equal(t, len(nodes), numNodes)
+		require.Equal(t, len(channels), len(chanIndex))
+	}
+
+	assertGraphState()
+
+	// Now we stop the graph.
+	require.NoError(t, graph.Stop())
+
+	// Recreate it but don't start it yet.
+	graph, err = NewChannelGraph(graphStore)
+	require.NoError(t, err)
+
+	// Spin off goroutines that start to make queries to the ChannelGraph.
+	// We start these before we start the graph, so that we can ensure
+	// that the queries are made while the graph cache is being populated.
+	var (
+		wg      sync.WaitGroup
+		numRuns = 10
+	)
+	for i := 0; i < numRuns; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+
+			assertGraphState()
+		}()
+	}
+
+	require.NoError(t, graph.Start())
+	t.Cleanup(func() {
+		require.NoError(t, graph.Stop())
+	})
+
+	wg.Wait()
+
+	// Wait for the cache to be fully populated.
+	err = wait.Predicate(func() bool {
+		return graph.cacheLoaded.Load()
+	}, wait.DefaultTimeout)
+	require.NoError(t, err)
+
+	// And then assert that all the expected nodes and channels are
+	// present in the graph cache.
+	for _, node := range nodes {
+		_, ok := graph.graphCache.nodeChannels[node.PubKeyBytes]
+		require.True(t, ok)
+	}
+}
+
 // TestClosedScid tests that we can correctly insert a SCID into the index of
 // closed short channel ids.
 func TestClosedScid(t *testing.T) {
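
The wait.Predicate call at the end of TestAsyncGraphCache is what bridges the gap between Start returning and the cache actually being loaded. As a rough illustration of the same polling idea outside the lntest/wait package, a minimal self-contained version might look like this (an assumed helper, not part of lnd):

package testutil

import (
	"errors"
	"time"
)

// waitFor polls cond until it returns true or the timeout elapses,
// mirroring what wait.Predicate does for the cacheLoaded check above.
func waitFor(cond func() bool, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if cond() {
			return nil
		}
		time.Sleep(10 * time.Millisecond)
	}

	return errors.New("condition not met before timeout")
}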

graph/db/options.go

Lines changed: 27 additions & 2 deletions

@@ -31,14 +31,20 @@ type chanGraphOptions struct {
 	// preAllocCacheNumNodes is the number of nodes we expect to be in the
 	// graph cache, so we can pre-allocate the map accordingly.
 	preAllocCacheNumNodes int
+
+	// asyncGraphCachePopulation indicates whether the graph cache
+	// should be populated asynchronously or if the Start method should
+	// block until the cache is fully populated.
+	asyncGraphCachePopulation bool
 }
 
 // defaultChanGraphOptions returns a new chanGraphOptions instance populated
 // with default values.
 func defaultChanGraphOptions() *chanGraphOptions {
 	return &chanGraphOptions{
-		useGraphCache:         true,
-		preAllocCacheNumNodes: DefaultPreAllocCacheNumNodes,
+		useGraphCache:             true,
+		asyncGraphCachePopulation: true,
+		preAllocCacheNumNodes:     DefaultPreAllocCacheNumNodes,
 	}
 }
 
@@ -61,6 +67,25 @@ func WithPreAllocCacheNumNodes(n int) ChanGraphOption {
 	}
 }
 
+// WithAsyncGraphCachePopulation sets whether the graph cache should be
+// populated asynchronously or if the Start method should block until the
+// cache is fully populated.
+func WithAsyncGraphCachePopulation(async bool) ChanGraphOption {
+	return func(o *chanGraphOptions) {
+		o.asyncGraphCachePopulation = async
+	}
+}
+
+// WithSyncGraphCachePopulation will cause the ChannelGraph to block
+// until the graph cache is fully populated before returning from the Start
+// method. This is useful for tests that need to ensure the graph cache is
+// fully populated before proceeding with further operations.
+func WithSyncGraphCachePopulation() ChanGraphOption {
+	return func(o *chanGraphOptions) {
+		o.asyncGraphCachePopulation = false
+	}
+}
+
 // StoreOptions holds parameters for tuning and customizing a graph DB.
 type StoreOptions struct {
 	// RejectCacheSize is the maximum number of rejectCacheEntries to hold
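
For completeness, these ChanGraphOption values follow the usual functional-options pattern: the constructor starts from defaultChanGraphOptions and applies each supplied option to it. The application loop itself is not part of this diff, so the snippet below is only a sketch of that pattern using the types defined in this file; the real NewChannelGraph also takes a V1Store and does considerably more setup.

// Sketch only: how a constructor typically folds ChanGraphOption values
// into the defaults before using them.
func resolveOptions(options ...ChanGraphOption) *chanGraphOptions {
	opts := defaultChanGraphOptions()
	for _, o := range options {
		o(opts)
	}

	return opts
}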
