@@ -169,9 +169,10 @@ type batchConfig struct {
169169 // initial delay completion and publishing the batch transaction.
170170 batchPublishDelay time.Duration
171171
172- // noBumping instructs sweepbatcher not to fee bump itself and rely on
173- // external source of fee rates (FeeRateProvider).
174- noBumping bool
172+ // customFeeRate provides custom min fee rate per swap. The batch uses
173+ // max of the fee rates of its swaps. In this mode confTarget is
174+ // ignored and fee bumping by sweepbatcher is disabled.
175+ customFeeRate FeeRateProvider
175176
176177 // txLabeler is a function generating a transaction label. It is called
177178 // before publishing a batch transaction. Batch ID is passed to it.
@@ -232,6 +233,10 @@ type batch struct {
232233 // spendChan is the channel over which spend notifications are received.
233234 spendChan chan * chainntnfs.SpendDetail
234235
236+ // spendErrChan is the channel over which spend notifier errors are
237+ // received.
238+ spendErrChan chan error
239+
235240 // confChan is the channel over which confirmation notifications are
236241 // received.
237242 confChan chan * chainntnfs.TxConfirmation
@@ -378,9 +383,7 @@ func NewBatch(cfg batchConfig, bk batchKit) *batch {
378383 id : - 1 ,
379384 state : Open ,
380385 sweeps : make (map [wire.OutPoint ]sweep ),
381- spendChan : make (chan * chainntnfs.SpendDetail ),
382386 confChan : make (chan * chainntnfs.TxConfirmation , 1 ),
383- reorgChan : make (chan struct {}, 1 ),
384387 testReqs : make (chan * testRequest ),
385388 errChan : make (chan error , 1 ),
386389 callEnter : make (chan struct {}),
@@ -423,9 +426,7 @@ func NewBatchFromDB(cfg batchConfig, bk batchKit) (*batch, error) {
423426 state : bk .state ,
424427 primarySweepID : bk .primaryID ,
425428 sweeps : bk .sweeps ,
426- spendChan : make (chan * chainntnfs.SpendDetail ),
427429 confChan : make (chan * chainntnfs.TxConfirmation , 1 ),
428- reorgChan : make (chan struct {}, 1 ),
429430 testReqs : make (chan * testRequest ),
430431 errChan : make (chan error , 1 ),
431432 callEnter : make (chan struct {}),
@@ -723,6 +724,9 @@ func (b *batch) addSweeps(ctx context.Context, sweeps []*sweep) (bool, error) {
723724 // lower that minFeeRate of other sweeps (so it is
724725 // applied).
725726 if b .rbfCache .FeeRate < s .minFeeRate {
727+ b .Infof ("Increasing feerate of the batch " +
728+ "from %v to %v" , b .rbfCache .FeeRate ,
729+ s .minFeeRate )
726730 b .rbfCache .FeeRate = s .minFeeRate
727731 }
728732 }
@@ -769,6 +773,9 @@ func (b *batch) addSweeps(ctx context.Context, sweeps []*sweep) (bool, error) {
769773 // Update FeeRate. Max(s.minFeeRate) for all the sweeps of
770774 // the batch is the basis for fee bumps.
771775 if b .rbfCache .FeeRate < s .minFeeRate {
776+ b .Infof ("Increasing feerate of the batch " +
777+ "from %v to %v" , b .rbfCache .FeeRate ,
778+ s .minFeeRate )
772779 b .rbfCache .FeeRate = s .minFeeRate
773780 b .rbfCache .SkipNextBump = true
774781 }
@@ -968,6 +975,12 @@ func (b *batch) Run(ctx context.Context) error {
968975 continue
969976 }
970977
978+ // Update feerate of sweeps. This is normally done by
979+ // AddSweep, but it may not be called after the sweep
980+ // is confirmed, but fresh feerate is still needed to
981+ // keep publishing in case of reorg.
982+ b .updateFeeRate (ctx )
983+
971984 err := b .publish (ctx )
972985 if err != nil {
973986 return fmt .Errorf ("publish error: %w" , err )
@@ -979,23 +992,26 @@ func (b *batch) Run(ctx context.Context) error {
979992 return fmt .Errorf ("handleSpend error: %w" , err )
980993 }
981994
995+ case err := <- b .spendErrChan :
996+ b .writeToSpendErrChan (ctx , err )
997+
998+ return fmt .Errorf ("spend notifier failed: %w" , err )
999+
9821000 case conf := <- b .confChan :
9831001 if err := b .handleConf (runCtx , conf ); err != nil {
9841002 return fmt .Errorf ("handleConf error: %w" , err )
9851003 }
9861004
9871005 return nil
9881006
1007+ // A re-org has been detected. We set the batch state back to
1008+ // open since our batch transaction is no longer present in any
1009+ // block. We can accept more sweeps and try to publish.
9891010 case <- b .reorgChan :
9901011 b .state = Open
9911012 b .Warnf ("reorg detected, batch is able to " +
9921013 "accept new sweeps" )
9931014
994- err := b .monitorSpend (ctx , b .sweeps [b .primarySweepID ])
995- if err != nil {
996- return fmt .Errorf ("monitorSpend error: %w" , err )
997- }
998-
9991015 case testReq := <- b .testReqs :
10001016 testReq .handler ()
10011017 close (testReq .quit )
@@ -1013,6 +1029,41 @@ func (b *batch) Run(ctx context.Context) error {
10131029 }
10141030}
10151031
1032+ // updateFeeRate gets fresh values of minFeeRate for sweeps and updates the
1033+ // feerate of the batch if needed. This method must be called from event loop.
1034+ func (b * batch ) updateFeeRate (ctx context.Context ) {
1035+ for outpoint , s := range b .sweeps {
1036+ minFeeRate , err := minimumSweepFeeRate (
1037+ ctx , b .cfg .customFeeRate , b .wallet ,
1038+ s .swapHash , s .outpoint , s .confTarget ,
1039+ )
1040+ if err != nil {
1041+ b .Warnf ("failed to determine feerate for sweep %v of " +
1042+ "swap %x, confTarget %d: %w" , s .outpoint ,
1043+ s .swapHash [:6 ], s .confTarget , err )
1044+ continue
1045+ }
1046+
1047+ if minFeeRate <= s .minFeeRate {
1048+ continue
1049+ }
1050+
1051+ b .Infof ("Increasing feerate of sweep %v of swap %x from %v " +
1052+ "to %v" , s .outpoint , s .swapHash [:6 ], s .minFeeRate ,
1053+ minFeeRate )
1054+ s .minFeeRate = minFeeRate
1055+ b .sweeps [outpoint ] = s
1056+
1057+ if s .minFeeRate <= b .rbfCache .FeeRate {
1058+ continue
1059+ }
1060+
1061+ b .Infof ("Increasing feerate of the batch from %v to %v" ,
1062+ b .rbfCache .FeeRate , s .minFeeRate )
1063+ b .rbfCache .FeeRate = s .minFeeRate
1064+ }
1065+ }
1066+
10161067// testRunInEventLoop runs a function in the event loop blocking until
10171068// the function returns. For unit tests only!
10181069func (b * batch ) testRunInEventLoop (ctx context.Context , handler func ()) {
@@ -1790,7 +1841,7 @@ func (b *batch) updateRbfRate(ctx context.Context) error {
17901841
17911842 // Set the initial value for our fee rate.
17921843 b .rbfCache .FeeRate = rate
1793- } else if ! b .cfg .noBumping {
1844+ } else if noBumping := b .cfg .customFeeRate != nil ; ! noBumping {
17941845 if b .rbfCache .SkipNextBump {
17951846 // Skip fee bumping, unset the flag, to bump next time.
17961847 b .rbfCache .SkipNextBump = false
@@ -1812,44 +1863,33 @@ func (b *batch) updateRbfRate(ctx context.Context) error {
18121863// of the batch transaction gets confirmed, due to the uncertainty of RBF
18131864// replacements and network propagation, we can always detect the transaction.
18141865func (b * batch ) monitorSpend (ctx context.Context , primarySweep sweep ) error {
1815- spendCtx , cancel := context .WithCancel (ctx )
1866+ if b .spendChan != nil || b .spendErrChan != nil || b .reorgChan != nil {
1867+ return fmt .Errorf ("an attempt to run monitorSpend multiple " +
1868+ "times per batch" )
1869+ }
1870+
1871+ reorgChan := make (chan struct {}, 1 )
18161872
1817- spendChan , spendErr , err := b .chainNotifier .RegisterSpendNtfn (
1818- spendCtx , & primarySweep .outpoint , primarySweep .htlc .PkScript ,
1873+ spendChan , spendErrChan , err := b .chainNotifier .RegisterSpendNtfn (
1874+ ctx , & primarySweep .outpoint , primarySweep .htlc .PkScript ,
18191875 primarySweep .initiationHeight ,
1876+ lndclient .WithReOrgChan (reorgChan ),
18201877 )
18211878 if err != nil {
1822- cancel ()
1823-
1824- return err
1879+ return fmt .Errorf ("failed to register spend notifier for " +
1880+ "primary sweep %v, pkscript %x, height %d: %w" ,
1881+ primarySweep .outpoint , primarySweep .htlc .PkScript ,
1882+ primarySweep .initiationHeight , err )
18251883 }
18261884
1827- b .wg .Add (1 )
1828- go func () {
1829- defer cancel ()
1830- defer b .wg .Done ()
1831-
1832- b .Infof ("monitoring spend for outpoint %s" ,
1833- primarySweep .outpoint .String ())
1885+ b .Infof ("monitoring spend for outpoint %s" ,
1886+ primarySweep .outpoint .String ())
18341887
1835- select {
1836- case spend := <- spendChan :
1837- select {
1838- case b .spendChan <- spend :
1839-
1840- case <- ctx .Done ():
1841- }
1842-
1843- case err := <- spendErr :
1844- b .writeToSpendErrChan (ctx , err )
1845-
1846- b .writeToErrChan (
1847- fmt .Errorf ("spend error: %w" , err ),
1848- )
1849-
1850- case <- ctx .Done ():
1851- }
1852- }()
1888+ // This is safe to do as we always call monitorSpend from the event
1889+ // loop's goroutine.
1890+ b .spendChan = spendChan
1891+ b .spendErrChan = spendErrChan
1892+ b .reorgChan = reorgChan
18531893
18541894 return nil
18551895}
@@ -1862,14 +1902,11 @@ func (b *batch) monitorConfirmations(ctx context.Context) error {
18621902 return fmt .Errorf ("can't find primarySweep" )
18631903 }
18641904
1865- reorgChan := make (chan struct {})
1866-
18671905 confCtx , cancel := context .WithCancel (ctx )
18681906
18691907 confChan , errChan , err := b .chainNotifier .RegisterConfirmationsNtfn (
18701908 confCtx , b .batchTxid , b .batchPkScript , batchConfHeight ,
18711909 primarySweep .initiationHeight ,
1872- lndclient .WithReOrgChan (reorgChan ),
18731910 )
18741911 if err != nil {
18751912 cancel ()
@@ -1895,18 +1932,6 @@ func (b *batch) monitorConfirmations(ctx context.Context) error {
18951932 b .writeToErrChan (fmt .Errorf ("confirmations " +
18961933 "monitoring error: %w" , err ))
18971934
1898- case <- reorgChan :
1899- // A re-org has been detected. We set the batch
1900- // state back to open since our batch
1901- // transaction is no longer present in any
1902- // block. We can accept more sweeps and try to
1903- // publish new transactions, at this point we
1904- // need to monitor again for a new spend.
1905- select {
1906- case b .reorgChan <- struct {}{}:
1907- case <- ctx .Done ():
1908- }
1909-
19101935 case <- ctx .Done ():
19111936 }
19121937 }()
@@ -2395,12 +2420,6 @@ func (b *batch) writeToErrChan(err error) {
23952420
23962421// writeToSpendErrChan sends an error to spend error channels of all the sweeps.
23972422func (b * batch ) writeToSpendErrChan (ctx context.Context , spendErr error ) {
2398- done , err := b .scheduleNextCall ()
2399- if err != nil {
2400- done ()
2401-
2402- return
2403- }
24042423 notifiers := make ([]* SpendNotifier , 0 , len (b .sweeps ))
24052424 for _ , s := range b .sweeps {
24062425 // If the sweep's notifier is empty then this means that a swap
@@ -2412,7 +2431,6 @@ func (b *batch) writeToSpendErrChan(ctx context.Context, spendErr error) {
24122431
24132432 notifiers = append (notifiers , s .notifier )
24142433 }
2415- done ()
24162434
24172435 for _ , notifier := range notifiers {
24182436 select {
0 commit comments