Skip to content

Commit a792c60

Browse files
committed
lightningclient: add custom message subscription
1 parent d24db47 commit a792c60

File tree

2 files changed

+75
-0
lines changed

2 files changed

+75
-0
lines changed

lightning_client.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,11 @@ type LightningClient interface {
201201

202202
// SendCustomMessage sends a custom message to a peer.
203203
SendCustomMessage(ctx context.Context, msg CustomMessage) error
204+
205+
// SubscribeCustomMessages creates a subscription to custom messages
206+
// received from our peers.
207+
SubscribeCustomMessages(ctx context.Context) (<-chan CustomMessage,
208+
<-chan error, error)
204209
}
205210

206211
// Info contains info about the connected lnd node.
@@ -3757,3 +3762,65 @@ func (s *lightningClient) SendCustomMessage(ctx context.Context,
37573762
_, err := s.client.SendCustomMessage(rpcCtx, rpcReq)
37583763
return err
37593764
}
3765+
3766+
// SubscribeCustomMessages subscribes to a stream of custom messages, optionally
3767+
// filtering by peer and message type. The channels returned will be closed
3768+
// when the subscription exits.
3769+
func (s *lightningClient) SubscribeCustomMessages(ctx context.Context) (
3770+
<-chan CustomMessage, <-chan error, error) {
3771+
3772+
rpcCtx := s.adminMac.WithMacaroonAuth(ctx)
3773+
rpcReq := &lnrpc.SubscribeCustomMessagesRequest{}
3774+
3775+
client, err := s.client.SubscribeCustomMessages(rpcCtx, rpcReq)
3776+
if err != nil {
3777+
return nil, nil, err
3778+
}
3779+
3780+
var (
3781+
// Buffer error channel by 1 so that consumer reading from this
3782+
// channel does not block our exit.
3783+
errChan = make(chan error, 1)
3784+
msgChan = make(chan CustomMessage)
3785+
)
3786+
3787+
s.wg.Add(1)
3788+
go func() {
3789+
defer func() {
3790+
// Close channels on exit so that callers know the
3791+
// subscription has finished.
3792+
close(errChan)
3793+
close(msgChan)
3794+
3795+
s.wg.Done()
3796+
}()
3797+
3798+
for {
3799+
msg, err := client.Recv()
3800+
if err != nil {
3801+
errChan <- fmt.Errorf("receive failed: %w", err)
3802+
return
3803+
}
3804+
3805+
peer, err := route.NewVertexFromBytes(msg.Peer)
3806+
if err != nil {
3807+
errChan <- fmt.Errorf("invalid peer: %w", err)
3808+
return
3809+
}
3810+
3811+
customMsg := CustomMessage{
3812+
Peer: peer,
3813+
Data: msg.Data,
3814+
MsgType: msg.Type,
3815+
}
3816+
3817+
select {
3818+
case msgChan <- customMsg:
3819+
case <-ctx.Done():
3820+
return
3821+
}
3822+
}
3823+
}()
3824+
3825+
return msgChan, errChan, nil
3826+
}

testdata/permissions.json

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -540,6 +540,14 @@
540540
}
541541
]
542542
},
543+
"/lnrpc.Lightning/SubscribeCustomMessages": {
544+
"permissions": [
545+
{
546+
"entity": "peers",
547+
"action": "read"
548+
}
549+
]
550+
},
543551
"/lnrpc.Lightning/SubscribeChannelEvents": {
544552
"permissions": [
545553
{

0 commit comments

Comments
 (0)