@@ -249,7 +249,23 @@ impl AppTrait for App {
249249 } ;
250250 let mut interrupt = self . interrupt . clone ( ) ;
251251 tokio:: select! {
252- res = self . process_sender_session( sender_state, & persister) => return res,
252+ res = self . process_sender_session( sender_state, & persister) => {
253+ match res {
254+ Ok ( ( ) ) => return Ok ( ( ) ) ,
255+ Err ( err) => {
256+ match self . maybe_broadcast_sender_fallback( & persister) {
257+ Ok ( true ) => return Ok ( ( ) ) ,
258+ Ok ( false ) => return Err ( err) ,
259+ Err ( broadcast_err) => {
260+ return Err ( err. context( format!(
261+ "Failed to auto-broadcast fallback after sender session error: {}" ,
262+ broadcast_err
263+ ) ) ) ;
264+ }
265+ }
266+ }
267+ }
268+ } ,
253269 _ = interrupt. changed( ) => {
254270 println!( "Interrupted. Call `send` with the same arguments to resume this session or `resume` to resume all sessions." ) ;
255271 return Err ( anyhow!( "Interrupted" ) )
@@ -472,6 +488,20 @@ impl AppTrait for App {
472488}
473489
474490impl App {
491+ fn maybe_broadcast_sender_fallback ( & self , persister : & SenderPersister ) -> Result < bool > {
492+ let ( session, history) = replay_sender_event_log ( persister) ?;
493+ match session {
494+ SendSession :: Closed ( SenderSessionOutcome :: Failure )
495+ | SendSession :: Closed ( SenderSessionOutcome :: Cancel ) => {
496+ let fallback_tx = history. fallback_tx ( ) ;
497+ self . wallet ( ) . broadcast_tx ( & fallback_tx) ?;
498+ println ! ( "Broadcasted fallback transaction txid: {}" , fallback_tx. compute_txid( ) ) ;
499+ Ok ( true )
500+ }
501+ _ => Ok ( false ) ,
502+ }
503+ }
504+
475505 fn close_failed_session < P > ( persister : & P , session_id : & SessionId , role : & str )
476506 where
477507 P : SessionPersister ,
@@ -497,7 +527,11 @@ impl App {
497527 self . process_pj_response ( proposal) ?;
498528 return Ok ( ( ) ) ;
499529 }
500- _ => return Err ( anyhow ! ( "Unexpected sender state" ) ) ,
530+ SendSession :: Closed ( SenderSessionOutcome :: Failure )
531+ | SendSession :: Closed ( SenderSessionOutcome :: Cancel ) => {
532+ self . maybe_broadcast_sender_fallback ( persister) ?;
533+ return Ok ( ( ) ) ;
534+ }
501535 }
502536 Ok ( ( ) )
503537 }
@@ -872,3 +906,131 @@ impl App {
872906 . context ( "HTTP request failed" )
873907 }
874908}
909+
910+ #[ cfg( test) ]
911+ mod tests {
912+ use std:: sync:: { Arc , Mutex } ;
913+
914+ use payjoin:: bitcoin:: { Amount , FeeRate } ;
915+ use payjoin:: persist:: { NoopSessionPersister , SessionPersister } ;
916+ use payjoin:: receive:: v2:: ReceiverBuilder ;
917+ use payjoin:: send:: v2:: {
918+ SendSession , SenderBuilder , SessionEvent as SenderSessionEvent ,
919+ SessionOutcome as SenderSessionOutcome ,
920+ } ;
921+ use payjoin:: { PjParam , Uri , UriExt } ;
922+ use payjoin_test_utils:: { corepc_node, TestServices } ;
923+ use tempfile:: tempdir;
924+ use tokio:: sync:: watch;
925+
926+ use super :: ohttp:: RelayManager ;
927+ use super :: App ;
928+ use crate :: app:: config:: { BitcoindConfig , Config } ;
929+ use crate :: app:: wallet:: BitcoindWallet ;
930+ use crate :: app:: App as AppTrait ;
931+ use crate :: db:: v2:: SenderPersister ;
932+ use crate :: db:: Database ;
933+
934+ async fn test_app (
935+ bitcoind : & corepc_node:: Node ,
936+ wallet_name : & str ,
937+ ) -> ( App , BitcoindWallet , tempfile:: TempDir ) {
938+ let rpchost = payjoin:: Url :: parse ( & format ! (
939+ "http://{}/wallet/{}" ,
940+ bitcoind. params. rpc_socket, wallet_name
941+ ) )
942+ . expect ( "valid url" ) ;
943+ let bitcoind_config = BitcoindConfig {
944+ rpchost : rpchost. clone ( ) ,
945+ cookie : Some ( bitcoind. params . cookie_file . clone ( ) ) ,
946+ rpcuser : String :: new ( ) ,
947+ rpcpassword : String :: new ( ) ,
948+ } ;
949+ let wallet = BitcoindWallet :: new ( & bitcoind_config) . await . expect ( "wallet should connect" ) ;
950+ let temp_dir = tempdir ( ) . expect ( "temp dir" ) ;
951+ let db = Arc :: new ( Database :: create ( temp_dir. path ( ) . join ( "test.db" ) ) . expect ( "db created" ) ) ;
952+ let ( _, interrupt_rx) = watch:: channel ( ( ) ) ;
953+ let config = Config {
954+ db_path : temp_dir. path ( ) . to_path_buf ( ) ,
955+ max_fee_rate : None ,
956+ bitcoind : bitcoind_config,
957+ version : None ,
958+ #[ cfg( feature = "_manual-tls" ) ]
959+ root_certificate : None ,
960+ #[ cfg( feature = "_manual-tls" ) ]
961+ certificate_key : None ,
962+ } ;
963+ let app = App {
964+ config,
965+ db,
966+ wallet : wallet. clone ( ) ,
967+ interrupt : interrupt_rx,
968+ relay_manager : Arc :: new ( Mutex :: new ( RelayManager :: new ( ) ) ) ,
969+ } ;
970+ ( app, wallet, temp_dir)
971+ }
972+
973+ #[ tokio:: test( flavor = "multi_thread" , worker_threads = 4 ) ]
974+ async fn sender_processes_fallback_on_failure ( ) {
975+ let ( bitcoind, _sender, receiver) =
976+ payjoin_test_utils:: init_bitcoind_sender_receiver ( None , None )
977+ . expect ( "bitcoind should start" ) ;
978+ let ( app, wallet, _temp_dir) = test_app ( & bitcoind, "sender" ) . await ;
979+ let mut services = TestServices :: initialize ( ) . await . expect ( "services should initialize" ) ;
980+ services. wait_for_services_ready ( ) . await . expect ( "services should be healthy" ) ;
981+
982+ let receive_address = receiver. new_address ( ) . expect ( "receiver address should be created" ) ;
983+ let ohttp_keys = services. fetch_ohttp_keys ( ) . await . expect ( "ohttp keys should be fetched" ) ;
984+ let receiver_session =
985+ ReceiverBuilder :: new ( receive_address, services. directory_url ( ) . as_str ( ) , ohttp_keys)
986+ . expect ( "receiver builder should initialize" )
987+ . with_amount ( Amount :: from_sat ( 50_000 ) )
988+ . build ( )
989+ . save ( & NoopSessionPersister :: default ( ) )
990+ . expect ( "receiver session should persist" ) ;
991+ let pj_uri = receiver_session. pj_uri ( ) ;
992+
993+ let uri = Uri :: try_from ( pj_uri. to_string ( ) )
994+ . expect ( "pj uri string should parse" )
995+ . assume_checked ( )
996+ . check_pj_supported ( )
997+ . expect ( "pj uri should support payjoin" ) ;
998+ let ( pj_param, amount) = match ( uri. extras . pj_param ( ) , uri. amount ) {
999+ ( PjParam :: V2 ( pj_param) , Some ( amount) ) => ( pj_param, amount) ,
1000+ _ => panic ! ( "receiver should produce a v2 URI with amount" ) ,
1001+ } ;
1002+
1003+ let original_psbt = app
1004+ . create_original_psbt ( & uri. address , amount, FeeRate :: BROADCAST_MIN )
1005+ . expect ( "original psbt should be created" ) ;
1006+ let sender =
1007+ SenderBuilder :: from_parts ( original_psbt, & pj_param, & uri. address , Some ( amount) )
1008+ . build_recommended ( FeeRate :: BROADCAST_MIN )
1009+ . expect ( "sender should be created" ) ;
1010+
1011+ let sender_persister =
1012+ SenderPersister :: new ( app. db . clone ( ) , & pj_uri. to_string ( ) , pj_param. receiver_pubkey ( ) )
1013+ . expect ( "persister created" ) ;
1014+ sender. save ( & sender_persister) . expect ( "sender should persist" ) ;
1015+
1016+ // Manually mark the sender session as failed to trigger the fallback
1017+ sender_persister
1018+ . save_event ( SenderSessionEvent :: Closed ( SenderSessionOutcome :: Failure ) )
1019+ . expect ( "failure event should be saved" ) ;
1020+ app. process_sender_session (
1021+ SendSession :: Closed ( SenderSessionOutcome :: Failure ) ,
1022+ & sender_persister,
1023+ )
1024+ . await
1025+ . expect ( "sender failure should process" ) ;
1026+
1027+ let ( _, history) =
1028+ payjoin:: send:: v2:: replay_event_log ( & sender_persister) . expect ( "sender history replays" ) ;
1029+ let fallback_tx = history. fallback_tx ( ) ;
1030+ wallet
1031+ . get_raw_transaction ( & fallback_tx. compute_txid ( ) )
1032+ . expect ( "fallback tx should be broadcast" ) ;
1033+ let _ = services. take_ohttp_relay_handle ( ) ;
1034+ let _ = services. take_directory_handle ( ) ;
1035+ }
1036+ }
0 commit comments