@@ -469,8 +469,7 @@ where
469
469
return Ok ( ( ) ) ;
470
470
}
471
471
472
- let persist_fut = {
473
- let mut state_lock = self . sweeper_state . lock ( ) . unwrap ( ) ;
472
+ self . update_state ( |state_lock| -> Result < ( ( ) , bool ) , ( ) > {
474
473
for descriptor in relevant_descriptors {
475
474
let output_info = TrackedSpendableOutput {
476
475
descriptor,
@@ -490,17 +489,12 @@ where
490
489
}
491
490
492
491
state_lock. outputs . push ( output_info) ;
492
+ state_lock. dirty = true ;
493
493
}
494
494
495
- state_lock. dirty = false ;
496
- self . persist_state ( & state_lock)
497
- } ;
498
-
499
- persist_fut. await . map_err ( |e| {
500
- self . sweeper_state . lock ( ) . unwrap ( ) . dirty = true ;
501
-
502
- log_error ! ( self . logger, "Error persisting OutputSweeper: {:?}" , e) ;
495
+ Ok ( ( ( ) , true ) )
503
496
} )
497
+ . await
504
498
}
505
499
506
500
/// Returns a list of the currently tracked spendable outputs.
@@ -556,29 +550,18 @@ where
556
550
} ;
557
551
558
552
// See if there is anything to sweep before requesting a change address.
559
- let ( persist_fut, has_respends) = {
560
- let mut sweeper_state = self . sweeper_state . lock ( ) . unwrap ( ) ;
561
-
562
- let cur_height = sweeper_state. best_block . height ;
563
- let has_respends = sweeper_state. outputs . iter ( ) . any ( |o| filter_fn ( o, cur_height) ) ;
564
- // If there is nothing to sweep, we still persist the state if it is dirty.
565
- let fut_opt = if !has_respends && sweeper_state. dirty {
566
- sweeper_state. dirty = false ;
567
- Some ( self . persist_state ( & sweeper_state) )
568
- } else {
569
- None
570
- } ;
571
-
572
- ( fut_opt, has_respends)
573
- } ;
553
+ let has_respends = self
554
+ . update_state ( |sweeper_state| -> Result < ( bool , bool ) , ( ) > {
555
+ let cur_height = sweeper_state. best_block . height ;
556
+ let has_respends = sweeper_state. outputs . iter ( ) . any ( |o| filter_fn ( o, cur_height) ) ;
574
557
575
- if let Some ( persist_fut ) = persist_fut {
576
- persist_fut . await . map_err ( |e| {
577
- self . sweeper_state . lock ( ) . unwrap ( ) . dirty = true ;
558
+ // If there are respends, we don't need to persist a dirty state already now and can postpone it until
559
+ // after the sweep.
560
+ let persist_if_dirty = !has_respends ;
578
561
579
- log_error ! ( self . logger , "Error persisting OutputSweeper: {:?}" , e ) ;
580
- } ) ? ;
581
- } ;
562
+ Ok ( ( has_respends , persist_if_dirty ) )
563
+ } )
564
+ . await ? ;
582
565
583
566
if !has_respends {
584
567
return Ok ( ( ) ) ;
@@ -589,67 +572,56 @@ where
589
572
self . change_destination_source . get_change_destination_script ( ) . await ?;
590
573
591
574
// Sweep the outputs.
592
- let spending_tx;
593
- let persist_fut = {
594
- let mut sweeper_state = self . sweeper_state . lock ( ) . unwrap ( ) ;
595
-
596
- let cur_height = sweeper_state. best_block . height ;
597
- let cur_hash = sweeper_state. best_block . block_hash ;
575
+ let spending_tx = self
576
+ . update_state ( |sweeper_state| -> Result < ( Option < Transaction > , bool ) , ( ) > {
577
+ let cur_height = sweeper_state. best_block . height ;
578
+ let cur_hash = sweeper_state. best_block . block_hash ;
598
579
599
- let respend_descriptors: Vec < & SpendableOutputDescriptor > = sweeper_state
600
- . outputs
601
- . iter ( )
602
- . filter ( |o| filter_fn ( * o, cur_height) )
603
- . map ( |o| & o. descriptor )
604
- . collect ( ) ;
605
-
606
- // Exit if there is nothing to spend anymore and there also is no need to persist the state.
607
- if respend_descriptors. is_empty ( ) && !sweeper_state. dirty {
608
- return Ok ( ( ) ) ;
609
- }
610
-
611
- // Generate the spending transaction and broadcast it.
612
- spending_tx = if !respend_descriptors. is_empty ( ) {
613
- let spending_tx = self
614
- . spend_outputs ( & sweeper_state, & respend_descriptors, change_destination_script)
615
- . map_err ( |e| {
616
- log_error ! ( self . logger, "Error spending outputs: {:?}" , e) ;
617
- } ) ?;
618
-
619
- log_debug ! (
620
- self . logger,
621
- "Generating and broadcasting sweeping transaction {}" ,
622
- spending_tx. compute_txid( )
623
- ) ;
580
+ let respend_descriptors: Vec < & SpendableOutputDescriptor > = sweeper_state
581
+ . outputs
582
+ . iter ( )
583
+ . filter ( |o| filter_fn ( * o, cur_height) )
584
+ . map ( |o| & o. descriptor )
585
+ . collect ( ) ;
586
+
587
+ // Generate the spending transaction and broadcast it.
588
+ if !respend_descriptors. is_empty ( ) {
589
+ let spending_tx = self
590
+ . spend_outputs (
591
+ & sweeper_state,
592
+ & respend_descriptors,
593
+ change_destination_script,
594
+ )
595
+ . map_err ( |e| {
596
+ log_error ! ( self . logger, "Error spending outputs: {:?}" , e) ;
597
+ } ) ?;
598
+
599
+ log_debug ! (
600
+ self . logger,
601
+ "Generating and broadcasting sweeping transaction {}" ,
602
+ spending_tx. compute_txid( )
603
+ ) ;
624
604
625
- // As we didn't modify the state so far, the same filter_fn yields the same elements as
626
- // above.
627
- let respend_outputs =
628
- sweeper_state. outputs . iter_mut ( ) . filter ( |o| filter_fn ( & * * o, cur_height) ) ;
629
- for output_info in respend_outputs {
630
- if let Some ( filter) = self . chain_data_source . as_ref ( ) {
631
- let watched_output = output_info. to_watched_output ( cur_hash) ;
632
- filter. register_output ( watched_output) ;
605
+ // As we didn't modify the state so far, the same filter_fn yields the same elements as
606
+ // above.
607
+ let respend_outputs =
608
+ sweeper_state. outputs . iter_mut ( ) . filter ( |o| filter_fn ( & * * o, cur_height) ) ;
609
+ for output_info in respend_outputs {
610
+ if let Some ( filter) = self . chain_data_source . as_ref ( ) {
611
+ let watched_output = output_info. to_watched_output ( cur_hash) ;
612
+ filter. register_output ( watched_output) ;
613
+ }
614
+
615
+ output_info. status . broadcast ( cur_hash, cur_height, spending_tx. clone ( ) ) ;
616
+ sweeper_state. dirty = true ;
633
617
}
634
618
635
- output_info. status . broadcast ( cur_hash, cur_height, spending_tx. clone ( ) ) ;
619
+ Ok ( ( Some ( spending_tx) , true ) )
620
+ } else {
621
+ Ok ( ( None , true ) )
636
622
}
637
-
638
- Some ( spending_tx)
639
- } else {
640
- None
641
- } ;
642
-
643
- // Either the state was already dirty or we modified it above, so we persist it.
644
- sweeper_state. dirty = false ;
645
- self . persist_state ( & sweeper_state)
646
- } ;
647
-
648
- persist_fut. await . map_err ( |e| {
649
- self . sweeper_state . lock ( ) . unwrap ( ) . dirty = true ;
650
-
651
- log_error ! ( self . logger, "Error persisting OutputSweeper: {:?}" , e) ;
652
- } ) ?;
623
+ } )
624
+ . await ?;
653
625
654
626
// Persistence completely successfully. If we have a spending transaction, we broadcast it.
655
627
if let Some ( spending_tx) = spending_tx {
@@ -694,6 +666,34 @@ where
694
666
)
695
667
}
696
668
669
+ /// Updates the sweeper state by executing the given callback. Persists the state afterwards if it is marked dirty,
670
+ /// but only if persist_if_dirty is also true. Returning false for persist_if_dirty allows the callback to postpone
671
+ /// persisting a potentially dirty state.
672
+ async fn update_state < X > (
673
+ & self , callback : impl FnOnce ( & mut SweeperState ) -> Result < ( X , bool ) , ( ) > ,
674
+ ) -> Result < X , ( ) > {
675
+ let ( fut, res) = {
676
+ let mut state_lock = self . sweeper_state . lock ( ) . unwrap ( ) ;
677
+
678
+ let ( res, persist_if_dirty) = callback ( & mut state_lock) ?;
679
+ if !state_lock. dirty || !persist_if_dirty {
680
+ return Ok ( res) ;
681
+ }
682
+
683
+ state_lock. dirty = false ;
684
+
685
+ ( self . persist_state ( & state_lock) , res)
686
+ } ;
687
+
688
+ fut. await . map_err ( |e| {
689
+ self . sweeper_state . lock ( ) . unwrap ( ) . dirty = true ;
690
+
691
+ log_error ! ( self . logger, "Error persisting OutputSweeper: {:?}" , e) ;
692
+ } ) ?;
693
+
694
+ Ok ( res)
695
+ }
696
+
697
697
fn spend_outputs (
698
698
& self , sweeper_state : & SweeperState , descriptors : & [ & SpendableOutputDescriptor ] ,
699
699
change_destination_script : ScriptBuf ,
0 commit comments