Skip to content

Commit f93d0ae

Browse files
terminal: separate session RPC server init & start
In the upcoming actions migration, we need fetch LNDs macaroons prior to initializing the stores, as the macaroons will be required during the migration. This requires that we setup the connection to LND before we initialize the stores, as we can only fetch the macaroons after the LND connection is established. In preparation of doing so, we cannot reference the stores when initializing/creating the sessions RPC server object, as we do so prior to setting up the LND connection. This commit therefore refactors the sessions RPC server so that we separate the initializing from the starting of the RPC server, and only require the store reference during the actual startup of the RPC server. Note that while this commit adds an `active atomic int32` to the RPC server which must be set toggled before the server will process requests, the RPC proxy and status manager will ensure that no requests get sent to the RPC prior to LiT's subserver being set to running. This is added to ensure that we dont introduce any nil pointer panics in future updates though, as the RPC server would panic in requests were actually processed prior to it having the dependencies injected during the `Start` function. Also note that we still keep the init of the sessions RPC server reference prior to setting up the LND connection, as not doing so would require that we'd refactor the registering of `GrpcSubserver`s in a more complex and in a less elegant way.
1 parent f3428cb commit f93d0ae

File tree

2 files changed

+133
-76
lines changed

2 files changed

+133
-76
lines changed

session_rpcserver.go

Lines changed: 100 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"fmt"
99
"strings"
1010
"sync"
11+
"sync/atomic"
1112
"time"
1213

1314
"github.com/btcsuite/btcd/btcec/v2"
@@ -40,8 +41,17 @@ import (
4041
// other special cases.
4142
const readOnlyAction = "***readonly***"
4243

44+
var (
45+
// ErrServerNotActive indicates that the server has started but hasn't
46+
// fully finished the startup process.
47+
ErrServerNotActive = errors.New("session server is still in the " +
48+
"process of starting")
49+
)
50+
4351
// sessionRpcServer is the gRPC server for the Session RPC interface.
4452
type sessionRpcServer struct {
53+
active int32 // atomic
54+
4555
litrpc.UnimplementedSessionsServer
4656
litrpc.UnimplementedFirewallServer
4757
litrpc.UnimplementedAutopilotServer
@@ -70,41 +80,10 @@ type sessionRpcServerConfig struct {
7080
privMap firewalldb.PrivacyMapper
7181
}
7282

73-
// newSessionRPCServer creates a new sessionRpcServer using the passed config.
74-
func newSessionRPCServer(cfg *sessionRpcServerConfig) (*sessionRpcServer,
75-
error) {
76-
77-
// Create the gRPC server that handles adding/removing sessions and the
78-
// actual mailbox server that spins up the Terminal Connect server
79-
// interface.
80-
server := session.NewServer(
81-
func(id session.ID, opts ...grpc.ServerOption) *grpc.Server {
82-
// Add the session ID injector interceptors first so
83-
// that the session ID is available in the context of
84-
// all interceptors that come after.
85-
allOpts := []grpc.ServerOption{
86-
addSessionIDToStreamCtx(id),
87-
addSessionIDToUnaryCtx(id),
88-
}
89-
90-
allOpts = append(allOpts, cfg.grpcOptions...)
91-
allOpts = append(allOpts, opts...)
92-
93-
// Construct the gRPC server with the options.
94-
grpcServer := grpc.NewServer(allOpts...)
95-
96-
// Register various grpc servers with the LNC session
97-
// server.
98-
cfg.registerGrpcServers(grpcServer)
99-
100-
return grpcServer
101-
},
102-
)
103-
83+
// newSessionRPCServer creates a new sessionRpcServer.
84+
func newSessionRPCServer() (*sessionRpcServer, error) {
10485
return &sessionRpcServer{
105-
cfg: cfg,
106-
sessionServer: server,
107-
quit: make(chan struct{}),
86+
quit: make(chan struct{}),
10887
}, nil
10988
}
11089

@@ -164,9 +143,52 @@ func addSessionIDToUnaryCtx(id session.ID) grpc.ServerOption {
164143
})
165144
}
166145

167-
// start all the components necessary for the sessionRpcServer to start serving
168-
// requests. This includes resuming all non-revoked sessions.
169-
func (s *sessionRpcServer) start(ctx context.Context) error {
146+
// started returns true if the server has been started, and false otherwise.
147+
// NOTE: This function is safe for concurrent access.
148+
func (s *sessionRpcServer) started() bool {
149+
return atomic.LoadInt32(&s.active) != 0
150+
}
151+
152+
// start starts a new sessionRpcServer using the passed config, and adds all
153+
// components necessary for the sessionRpcServer to start serving requests. This
154+
// includes resuming all non-revoked sessions.
155+
func (s *sessionRpcServer) start(ctx context.Context,
156+
cfg *sessionRpcServerConfig) error {
157+
158+
if s.started() {
159+
return errors.New("session rpc server is already started")
160+
}
161+
162+
// Create the gRPC server that handles adding/removing sessions and the
163+
// actual mailbox server that spins up the Terminal Connect server
164+
// interface.
165+
server := session.NewServer(
166+
func(id session.ID, opts ...grpc.ServerOption) *grpc.Server {
167+
// Add the session ID injector interceptors first so
168+
// that the session ID is available in the context of
169+
// all interceptors that come after.
170+
allOpts := []grpc.ServerOption{
171+
addSessionIDToStreamCtx(id),
172+
addSessionIDToUnaryCtx(id),
173+
}
174+
175+
allOpts = append(allOpts, cfg.grpcOptions...)
176+
allOpts = append(allOpts, opts...)
177+
178+
// Construct the gRPC server with the options.
179+
grpcServer := grpc.NewServer(allOpts...)
180+
181+
// Register various grpc servers with the LNC session
182+
// server.
183+
cfg.registerGrpcServers(grpcServer)
184+
185+
return grpcServer
186+
},
187+
)
188+
189+
s.cfg = cfg
190+
s.sessionServer = server
191+
170192
// Delete all sessions in the Reserved state.
171193
err := s.cfg.db.DeleteReservedSessions(ctx)
172194
if err != nil {
@@ -248,14 +270,18 @@ func (s *sessionRpcServer) start(ctx context.Context) error {
248270
}
249271
}
250272

273+
atomic.StoreInt32(&s.active, 1)
274+
251275
return nil
252276
}
253277

254278
// stop cleans up any sessionRpcServer resources.
255279
func (s *sessionRpcServer) stop() error {
256280
var returnErr error
257281
s.stopOnce.Do(func() {
258-
s.sessionServer.Stop()
282+
if s.sessionServer != nil {
283+
s.sessionServer.Stop()
284+
}
259285

260286
close(s.quit)
261287
s.wg.Wait()
@@ -268,6 +294,10 @@ func (s *sessionRpcServer) stop() error {
268294
func (s *sessionRpcServer) AddSession(ctx context.Context,
269295
req *litrpc.AddSessionRequest) (*litrpc.AddSessionResponse, error) {
270296

297+
if !s.started() {
298+
return nil, ErrServerNotActive
299+
}
300+
271301
expiry := time.Unix(int64(req.ExpiryTimestampSeconds), 0)
272302
if time.Now().After(expiry) {
273303
return nil, fmt.Errorf("expiry must be in the future")
@@ -618,6 +648,10 @@ func (s *sessionRpcServer) resumeSession(ctx context.Context,
618648
func (s *sessionRpcServer) ListSessions(ctx context.Context,
619649
_ *litrpc.ListSessionsRequest) (*litrpc.ListSessionsResponse, error) {
620650

651+
if !s.started() {
652+
return nil, ErrServerNotActive
653+
}
654+
621655
sessions, err := s.cfg.db.ListAllSessions(ctx)
622656
if err != nil {
623657
return nil, fmt.Errorf("error fetching sessions: %v", err)
@@ -642,6 +676,10 @@ func (s *sessionRpcServer) ListSessions(ctx context.Context,
642676
func (s *sessionRpcServer) RevokeSession(ctx context.Context,
643677
req *litrpc.RevokeSessionRequest) (*litrpc.RevokeSessionResponse, error) {
644678

679+
if !s.started() {
680+
return nil, ErrServerNotActive
681+
}
682+
645683
pubKey, err := btcec.ParsePubKey(req.LocalPublicKey)
646684
if err != nil {
647685
return nil, fmt.Errorf("error parsing public key: %v", err)
@@ -676,6 +714,10 @@ func (s *sessionRpcServer) PrivacyMapConversion(ctx context.Context,
676714
req *litrpc.PrivacyMapConversionRequest) (
677715
*litrpc.PrivacyMapConversionResponse, error) {
678716

717+
if !s.started() {
718+
return nil, ErrServerNotActive
719+
}
720+
679721
var (
680722
groupID session.ID
681723
err error
@@ -733,6 +775,10 @@ func (s *sessionRpcServer) PrivacyMapConversion(ctx context.Context,
733775
func (s *sessionRpcServer) ListActions(ctx context.Context,
734776
req *litrpc.ListActionsRequest) (*litrpc.ListActionsResponse, error) {
735777

778+
if !s.started() {
779+
return nil, ErrServerNotActive
780+
}
781+
736782
// If no maximum number of actions is given, use a default of 100.
737783
if req.MaxNumActions == 0 {
738784
req.MaxNumActions = 100
@@ -841,6 +887,10 @@ func (s *sessionRpcServer) ListAutopilotFeatures(ctx context.Context,
841887
_ *litrpc.ListAutopilotFeaturesRequest) (
842888
*litrpc.ListAutopilotFeaturesResponse, error) {
843889

890+
if !s.started() {
891+
return nil, ErrServerNotActive
892+
}
893+
844894
fs, err := s.cfg.autopilot.ListFeatures(ctx)
845895
if err != nil {
846896
return nil, err
@@ -884,6 +934,10 @@ func (s *sessionRpcServer) AddAutopilotSession(ctx context.Context,
884934
req *litrpc.AddAutopilotSessionRequest) (
885935
*litrpc.AddAutopilotSessionResponse, error) {
886936

937+
if !s.started() {
938+
return nil, ErrServerNotActive
939+
}
940+
887941
if len(req.Features) == 0 {
888942
return nil, fmt.Errorf("must include at least one feature")
889943
}
@@ -1325,6 +1379,10 @@ func (s *sessionRpcServer) ListAutopilotSessions(ctx context.Context,
13251379
_ *litrpc.ListAutopilotSessionsRequest) (
13261380
*litrpc.ListAutopilotSessionsResponse, error) {
13271381

1382+
if !s.started() {
1383+
return nil, ErrServerNotActive
1384+
}
1385+
13281386
sessions, err := s.cfg.db.ListSessionsByType(ctx, session.TypeAutopilot)
13291387
if err != nil {
13301388
return nil, fmt.Errorf("error fetching sessions: %v", err)
@@ -1349,6 +1407,10 @@ func (s *sessionRpcServer) RevokeAutopilotSession(ctx context.Context,
13491407
req *litrpc.RevokeAutopilotSessionRequest) (
13501408
*litrpc.RevokeAutopilotSessionResponse, error) {
13511409

1410+
if !s.started() {
1411+
return nil, ErrServerNotActive
1412+
}
1413+
13521414
pubKey, err := btcec.ParsePubKey(req.LocalPublicKey)
13531415
if err != nil {
13541416
return nil, fmt.Errorf("error parsing public key: %v", err)

terminal.go

Lines changed: 33 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -463,14 +463,6 @@ func (g *LightningTerminal) start(ctx context.Context) error {
463463
return fmt.Errorf("error creating account service: %v", err)
464464
}
465465

466-
superMacBaker := func(ctx context.Context, rootKeyID uint64,
467-
perms []bakery.Op, caveats []macaroon.Caveat) (string, error) {
468-
469-
return litmac.BakeSuperMacaroon(
470-
ctx, g.basicClient, rootKeyID, perms, caveats,
471-
)
472-
}
473-
474466
g.accountRpcServer = accounts.NewRPCServer()
475467

476468
g.ruleMgrs = rules.NewRuleManagerSet()
@@ -504,35 +496,7 @@ func (g *LightningTerminal) start(ctx context.Context) error {
504496
}
505497
}
506498

507-
g.sessionRpcServer, err = newSessionRPCServer(&sessionRpcServerConfig{
508-
db: g.stores.sessions,
509-
basicAuth: g.rpcProxy.basicAuth,
510-
grpcOptions: []grpc.ServerOption{
511-
grpc.CustomCodec(grpcProxy.Codec()), // nolint: staticcheck,
512-
grpc.ChainStreamInterceptor(
513-
g.rpcProxy.StreamServerInterceptor,
514-
),
515-
grpc.ChainUnaryInterceptor(
516-
g.rpcProxy.UnaryServerInterceptor,
517-
),
518-
grpc.UnknownServiceHandler(
519-
grpcProxy.TransparentHandler(
520-
// Don't allow calls to litrpc.
521-
g.rpcProxy.makeDirector(false),
522-
),
523-
),
524-
},
525-
registerGrpcServers: func(server *grpc.Server) {
526-
g.registerSubDaemonGrpcServers(server, true)
527-
},
528-
superMacBaker: superMacBaker,
529-
firstConnectionDeadline: g.cfg.FirstLNCConnDeadline,
530-
permMgr: g.permsMgr,
531-
actionsDB: g.stores.firewall,
532-
autopilot: g.autopilotClient,
533-
ruleMgrs: g.ruleMgrs,
534-
privMap: g.stores.firewall,
535-
})
499+
g.sessionRpcServer, err = newSessionRPCServer()
536500
if err != nil {
537501
return fmt.Errorf("could not create new session rpc "+
538502
"server: %v", err)
@@ -1055,7 +1019,38 @@ func (g *LightningTerminal) startInternalSubServers(ctx context.Context,
10551019
}
10561020

10571021
log.Infof("Starting LiT session server")
1058-
if err = g.sessionRpcServer.start(ctx); err != nil {
1022+
1023+
sessionCfg := &sessionRpcServerConfig{
1024+
db: g.stores.sessions,
1025+
basicAuth: g.rpcProxy.basicAuth,
1026+
grpcOptions: []grpc.ServerOption{
1027+
grpc.CustomCodec(grpcProxy.Codec()), // nolint: staticcheck,
1028+
grpc.ChainStreamInterceptor(
1029+
g.rpcProxy.StreamServerInterceptor,
1030+
),
1031+
grpc.ChainUnaryInterceptor(
1032+
g.rpcProxy.UnaryServerInterceptor,
1033+
),
1034+
grpc.UnknownServiceHandler(
1035+
grpcProxy.TransparentHandler(
1036+
// Don't allow calls to litrpc.
1037+
g.rpcProxy.makeDirector(false),
1038+
),
1039+
),
1040+
},
1041+
registerGrpcServers: func(server *grpc.Server) {
1042+
g.registerSubDaemonGrpcServers(server, true)
1043+
},
1044+
superMacBaker: superMacBaker,
1045+
firstConnectionDeadline: g.cfg.FirstLNCConnDeadline,
1046+
permMgr: g.permsMgr,
1047+
actionsDB: g.stores.firewall,
1048+
autopilot: g.autopilotClient,
1049+
ruleMgrs: g.ruleMgrs,
1050+
privMap: g.stores.firewall,
1051+
}
1052+
1053+
if err = g.sessionRpcServer.start(ctx, sessionCfg); err != nil {
10591054
return err
10601055
}
10611056
g.sessionRpcServerStarted = true

0 commit comments

Comments
 (0)