@@ -198,6 +198,14 @@ type LightningClient interface {
198
198
RegisterRPCMiddleware (ctx context.Context , middlewareName ,
199
199
customCaveatName string , readOnly bool , timeout time.Duration ,
200
200
intercept InterceptFunction ) (chan error , error )
201
+
202
+ // SendCustomMessage sends a custom message to a peer.
203
+ SendCustomMessage (ctx context.Context , msg CustomMessage ) error
204
+
205
+ // SubscribeCustomMessages creates a subscription to custom messages
206
+ // received from our peers.
207
+ SubscribeCustomMessages (ctx context.Context ) (<- chan CustomMessage ,
208
+ <- chan error , error )
201
209
}
202
210
203
211
// Info contains info about the connected lnd node.
@@ -1076,6 +1084,18 @@ type QueryRoutesResponse struct {
1076
1084
TotalAmtMsat lnwire.MilliSatoshi
1077
1085
}
1078
1086
1087
+ // CustomMessage describes custom messages exchanged with peers.
1088
+ type CustomMessage struct {
1089
+ // Peer is the peer that the message was exchanged with.
1090
+ Peer route.Vertex
1091
+
1092
+ // MsgType is the protocol message type number for the custom message.
1093
+ MsgType uint32
1094
+
1095
+ // Data is the data exchanged.
1096
+ Data []byte
1097
+ }
1098
+
1079
1099
var (
1080
1100
// ErrNoRouteFound is returned if we can't find a path with the passed
1081
1101
// parameters.
@@ -3723,3 +3743,84 @@ func (s *lightningClient) RegisterRPCMiddleware(ctx context.Context,
3723
3743
3724
3744
return errChan , nil
3725
3745
}
3746
+
3747
+ // SendCustomMessage sends a custom message to one of our existing peers. Note
3748
+ // that lnd must already be connected to a peer to send it messages.
3749
+ func (s * lightningClient ) SendCustomMessage (ctx context.Context ,
3750
+ msg CustomMessage ) error {
3751
+
3752
+ rpcCtx , cancel := context .WithTimeout (ctx , s .timeout )
3753
+ defer cancel ()
3754
+
3755
+ rpcCtx = s .adminMac .WithMacaroonAuth (rpcCtx )
3756
+ rpcReq := & lnrpc.SendCustomMessageRequest {
3757
+ Peer : msg .Peer [:],
3758
+ Type : msg .MsgType ,
3759
+ Data : msg .Data ,
3760
+ }
3761
+
3762
+ _ , err := s .client .SendCustomMessage (rpcCtx , rpcReq )
3763
+ return err
3764
+ }
3765
+
3766
+ // SubscribeCustomMessages subscribes to a stream of custom messages, optionally
3767
+ // filtering by peer and message type. The channels returned will be closed
3768
+ // when the subscription exits.
3769
+ func (s * lightningClient ) SubscribeCustomMessages (ctx context.Context ) (
3770
+ <- chan CustomMessage , <- chan error , error ) {
3771
+
3772
+ rpcCtx := s .adminMac .WithMacaroonAuth (ctx )
3773
+ rpcReq := & lnrpc.SubscribeCustomMessagesRequest {}
3774
+
3775
+ client , err := s .client .SubscribeCustomMessages (rpcCtx , rpcReq )
3776
+ if err != nil {
3777
+ return nil , nil , err
3778
+ }
3779
+
3780
+ var (
3781
+ // Buffer error channel by 1 so that consumer reading from this
3782
+ // channel does not block our exit.
3783
+ errChan = make (chan error , 1 )
3784
+ msgChan = make (chan CustomMessage )
3785
+ )
3786
+
3787
+ s .wg .Add (1 )
3788
+ go func () {
3789
+ defer func () {
3790
+ // Close channels on exit so that callers know the
3791
+ // subscription has finished.
3792
+ close (errChan )
3793
+ close (msgChan )
3794
+
3795
+ s .wg .Done ()
3796
+ }()
3797
+
3798
+ for {
3799
+ msg , err := client .Recv ()
3800
+ if err != nil {
3801
+ errChan <- fmt .Errorf ("receive failed: %w" , err )
3802
+ return
3803
+ }
3804
+
3805
+ peer , err := route .NewVertexFromBytes (msg .Peer )
3806
+ if err != nil {
3807
+ errChan <- fmt .Errorf ("invalid peer: %w" , err )
3808
+ return
3809
+ }
3810
+
3811
+ customMsg := CustomMessage {
3812
+ Peer : peer ,
3813
+ Data : msg .Data ,
3814
+ MsgType : msg .Type ,
3815
+ }
3816
+
3817
+ select {
3818
+ case msgChan <- customMsg :
3819
+ case <- ctx .Done ():
3820
+ return
3821
+ }
3822
+ }
3823
+ }()
3824
+
3825
+ return msgChan , errChan , nil
3826
+ }
0 commit comments