@@ -142,17 +142,29 @@ impl Writer for VecWriter {
142142 }
143143}
144144
145+ /// The LDK API requires that any time we tell it we're done persisting a `ChannelMonitor[Update]`
146+ /// we never pass it in as the "latest" `ChannelMonitor` on startup. However, we can pass
147+ /// out-of-date monitors as long as we never told LDK we finished persisting them, which we do by
148+ /// storing both old `ChannelMonitor`s and ones that are "being persisted" here.
149+ ///
150+ /// Note that such "being persisted" `ChannelMonitor`s are stored in `ChannelManager` and will
151+ /// simply be replayed on startup.
152+ struct LatestMonitorState {
153+ /// The latest monitor id which we told LDK we've persisted
154+ persisted_monitor_id : u64 ,
155+ /// The latest serialized `ChannelMonitor` that we told LDK we persisted.
156+ persisted_monitor : Vec < u8 > ,
157+ /// A set of (monitor id, serialized `ChannelMonitor`)s which we're currently "persisting",
158+ /// from LDK's perspective.
159+ pending_monitors : Vec < ( u64 , Vec < u8 > ) > ,
160+ }
161+
145162struct TestChainMonitor {
146163 pub logger : Arc < dyn Logger > ,
147164 pub keys : Arc < KeyProvider > ,
148165 pub persister : Arc < TestPersister > ,
149166 pub chain_monitor : Arc < chainmonitor:: ChainMonitor < TestChannelSigner , Arc < dyn chain:: Filter > , Arc < TestBroadcaster > , Arc < FuzzEstimator > , Arc < dyn Logger > , Arc < TestPersister > > > ,
150- // If we reload a node with an old copy of ChannelMonitors, the ChannelManager deserialization
151- // logic will automatically force-close our channels for us (as we don't have an up-to-date
152- // monitor implying we are not able to punish misbehaving counterparties). Because this test
153- // "fails" if we ever force-close a channel, we avoid doing so, always saving the latest
154- // fully-serialized monitor state here, as well as the corresponding update_id.
155- pub latest_monitors : Mutex < HashMap < OutPoint , ( u64 , Vec < u8 > ) > > ,
167+ pub latest_monitors : Mutex < HashMap < OutPoint , LatestMonitorState > > ,
156168}
157169impl TestChainMonitor {
158170 pub fn new ( broadcaster : Arc < TestBroadcaster > , logger : Arc < dyn Logger > , feeest : Arc < FuzzEstimator > , persister : Arc < TestPersister > , keys : Arc < KeyProvider > ) -> Self {
@@ -169,22 +181,47 @@ impl chain::Watch<TestChannelSigner> for TestChainMonitor {
169181 fn watch_channel ( & self , funding_txo : OutPoint , monitor : channelmonitor:: ChannelMonitor < TestChannelSigner > ) -> Result < chain:: ChannelMonitorUpdateStatus , ( ) > {
170182 let mut ser = VecWriter ( Vec :: new ( ) ) ;
171183 monitor. write ( & mut ser) . unwrap ( ) ;
172- if let Some ( _) = self . latest_monitors . lock ( ) . unwrap ( ) . insert ( funding_txo, ( monitor. get_latest_update_id ( ) , ser. 0 ) ) {
184+ let monitor_id = monitor. get_latest_update_id ( ) ;
185+ let res = self . chain_monitor . watch_channel ( funding_txo, monitor) ;
186+ let state = match res {
187+ Ok ( chain:: ChannelMonitorUpdateStatus :: Completed ) => {
188+ LatestMonitorState {
189+ persisted_monitor_id : monitor_id, persisted_monitor : ser. 0 ,
190+ pending_monitors : Vec :: new ( ) ,
191+ }
192+ } ,
193+ Ok ( chain:: ChannelMonitorUpdateStatus :: InProgress ) =>
194+ panic ! ( "The test currently doesn't test initial-persistence via the async pipeline" ) ,
195+ Ok ( chain:: ChannelMonitorUpdateStatus :: UnrecoverableError ) => panic ! ( ) ,
196+ Err ( ( ) ) => panic ! ( ) ,
197+ } ;
198+ if self . latest_monitors . lock ( ) . unwrap ( ) . insert ( funding_txo, state) . is_some ( ) {
173199 panic ! ( "Already had monitor pre-watch_channel" ) ;
174200 }
175- self . chain_monitor . watch_channel ( funding_txo , monitor )
201+ res
176202 }
177203
178204 fn update_channel ( & self , funding_txo : OutPoint , update : & channelmonitor:: ChannelMonitorUpdate ) -> chain:: ChannelMonitorUpdateStatus {
179205 let mut map_lock = self . latest_monitors . lock ( ) . unwrap ( ) ;
180206 let map_entry = map_lock. get_mut ( & funding_txo) . expect ( "Didn't have monitor on update call" ) ;
207+ let latest_monitor_data = map_entry. pending_monitors . last ( ) . as_ref ( ) . map ( |( _, data) | data) . unwrap_or ( & map_entry. persisted_monitor ) ;
181208 let deserialized_monitor = <( BlockHash , channelmonitor:: ChannelMonitor < TestChannelSigner > ) >::
182- read ( & mut Cursor :: new ( & map_entry . 1 ) , ( & * self . keys , & * self . keys ) ) . unwrap ( ) . 1 ;
209+ read ( & mut Cursor :: new ( & latest_monitor_data ) , ( & * self . keys , & * self . keys ) ) . unwrap ( ) . 1 ;
183210 deserialized_monitor. update_monitor ( update, & & TestBroadcaster { } , & & FuzzEstimator { ret_val : atomic:: AtomicU32 :: new ( 253 ) } , & self . logger ) . unwrap ( ) ;
184211 let mut ser = VecWriter ( Vec :: new ( ) ) ;
185212 deserialized_monitor. write ( & mut ser) . unwrap ( ) ;
186- * map_entry = ( update. update_id , ser. 0 ) ;
187- self . chain_monitor . update_channel ( funding_txo, update)
213+ let res = self . chain_monitor . update_channel ( funding_txo, update) ;
214+ match res {
215+ chain:: ChannelMonitorUpdateStatus :: Completed => {
216+ map_entry. persisted_monitor_id = update. update_id ;
217+ map_entry. persisted_monitor = ser. 0 ;
218+ } ,
219+ chain:: ChannelMonitorUpdateStatus :: InProgress => {
220+ map_entry. pending_monitors . push ( ( update. update_id , ser. 0 ) ) ;
221+ } ,
222+ chain:: ChannelMonitorUpdateStatus :: UnrecoverableError => panic ! ( ) ,
223+ }
224+ res
188225 }
189226
190227 fn release_pending_monitor_events ( & self ) -> Vec < ( OutPoint , ChannelId , Vec < MonitorEvent > , Option < PublicKey > ) > {
@@ -511,9 +548,15 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
511548
512549 let mut monitors = new_hash_map( ) ;
513550 let mut old_monitors = $old_monitors. latest_monitors. lock( ) . unwrap( ) ;
514- for ( outpoint, ( update_id, monitor_ser) ) in old_monitors. drain( ) {
515- monitors. insert( outpoint, <( BlockHash , ChannelMonitor <TestChannelSigner >) >:: read( & mut Cursor :: new( & monitor_ser) , ( & * $keys_manager, & * $keys_manager) ) . expect( "Failed to read monitor" ) . 1 ) ;
516- chain_monitor. latest_monitors. lock( ) . unwrap( ) . insert( outpoint, ( update_id, monitor_ser) ) ;
551+ for ( outpoint, mut prev_state) in old_monitors. drain( ) {
552+ monitors. insert( outpoint, <( BlockHash , ChannelMonitor <TestChannelSigner >) >:: read(
553+ & mut Cursor :: new( & prev_state. persisted_monitor) , ( & * $keys_manager, & * $keys_manager)
554+ ) . expect( "Failed to read monitor" ) . 1 ) ;
555+ // Wipe any `ChannelMonitor`s which we never told LDK we finished persisting,
556+ // considering them discarded. LDK should replay these for us as they're stored in
557+ // the `ChannelManager`.
558+ prev_state. pending_monitors. clear( ) ;
559+ chain_monitor. latest_monitors. lock( ) . unwrap( ) . insert( outpoint, prev_state) ;
517560 }
518561 let mut monitor_refs = new_hash_map( ) ;
519562 for ( outpoint, monitor) in monitors. iter_mut( ) {
@@ -1040,6 +1083,43 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
10401083 } }
10411084 }
10421085
1086+ let complete_first = |v : & mut Vec < _ > | if !v. is_empty ( ) { Some ( v. remove ( 0 ) ) } else { None } ;
1087+ let complete_second = |v : & mut Vec < _ > | if v. len ( ) > 1 { Some ( v. remove ( 1 ) ) } else { None } ;
1088+ let complete_monitor_update = |
1089+ monitor : & Arc < TestChainMonitor > , chan_funding,
1090+ compl_selector : & dyn Fn ( & mut Vec < ( u64 , Vec < u8 > ) > ) -> Option < ( u64 , Vec < u8 > ) > ,
1091+ | {
1092+ if let Some ( state) = monitor. latest_monitors. lock( ) . unwrap( ) . get_mut( chan_funding) {
1093+ assert ! (
1094+ state. pending_monitors. windows( 2 ) . all( |pair| pair[ 0 ] . 0 < pair[ 1 ] . 0 ) ,
1095+ "updates should be sorted by id"
1096+ ) ;
1097+ if let Some ( ( id, data) ) = compl_selector( & mut state. pending_monitors) {
1098+ monitor. chain_monitor. channel_monitor_updated( * chan_funding, id) . unwrap( ) ;
1099+ if id > state. persisted_monitor_id {
1100+ state. persisted_monitor_id = id;
1101+ state. persisted_monitor = data;
1102+ }
1103+ }
1104+ }
1105+ } ;
1106+
1107+ let complete_all_monitor_updates = |monitor: & Arc < TestChainMonitor > , chan_funding| {
1108+ if let Some ( state) = monitor. latest_monitors. lock( ) . unwrap( ) . get_mut( chan_funding) {
1109+ assert ! (
1110+ state. pending_monitors. windows( 2 ) . all( |pair| pair[ 0 ] . 0 < pair[ 1 ] . 0 ) ,
1111+ "updates should be sorted by id"
1112+ ) ;
1113+ for ( id, data) in state. pending_monitors. drain( ..) {
1114+ monitor. chain_monitor. channel_monitor_updated( * chan_funding, id) . unwrap( ) ;
1115+ if id > state. persisted_monitor_id {
1116+ state. persisted_monitor_id = id;
1117+ state. persisted_monitor = data;
1118+ }
1119+ }
1120+ }
1121+ } ;
1122+
10431123 let v = get_slice!( 1 ) [ 0 ] ;
10441124 out. locked_write( format!( "READ A BYTE! HANDLING INPUT {:x}...........\n " , v) . as_bytes( ) ) ;
10451125 match v {
@@ -1054,30 +1134,10 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
10541134 0x05 => * monitor_b. persister. update_ret. lock( ) . unwrap( ) = ChannelMonitorUpdateStatus :: Completed ,
10551135 0x06 => * monitor_c. persister. update_ret. lock( ) . unwrap( ) = ChannelMonitorUpdateStatus :: Completed ,
10561136
1057- 0x08 => {
1058- if let Some ( ( id, _) ) = monitor_a. latest_monitors . lock ( ) . unwrap ( ) . get ( & chan_1_funding) {
1059- monitor_a. chain_monitor . force_channel_monitor_updated ( chan_1_funding, * id) ;
1060- nodes[ 0 ] . process_monitor_events ( ) ;
1061- }
1062- } ,
1063- 0x09 => {
1064- if let Some ( ( id, _) ) = monitor_b. latest_monitors . lock ( ) . unwrap ( ) . get ( & chan_1_funding) {
1065- monitor_b. chain_monitor . force_channel_monitor_updated ( chan_1_funding, * id) ;
1066- nodes[ 1 ] . process_monitor_events ( ) ;
1067- }
1068- } ,
1069- 0x0a => {
1070- if let Some ( ( id, _) ) = monitor_b. latest_monitors . lock ( ) . unwrap ( ) . get ( & chan_2_funding) {
1071- monitor_b. chain_monitor . force_channel_monitor_updated ( chan_2_funding, * id) ;
1072- nodes[ 1 ] . process_monitor_events ( ) ;
1073- }
1074- } ,
1075- 0x0b => {
1076- if let Some ( ( id, _) ) = monitor_c. latest_monitors . lock ( ) . unwrap ( ) . get ( & chan_2_funding) {
1077- monitor_c. chain_monitor . force_channel_monitor_updated ( chan_2_funding, * id) ;
1078- nodes[ 2 ] . process_monitor_events ( ) ;
1079- }
1080- } ,
1137+ 0x08 => complete_all_monitor_updates( & monitor_a, & chan_1_funding) ,
1138+ 0x09 => complete_all_monitor_updates( & monitor_b, & chan_1_funding) ,
1139+ 0x0a => complete_all_monitor_updates( & monitor_b, & chan_2_funding) ,
1140+ 0x0b => complete_all_monitor_updates( & monitor_c, & chan_2_funding) ,
10811141
10821142 0x0c => {
10831143 if !chan_a_disconnected {
@@ -1285,119 +1345,35 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
12851345 } ,
12861346 0x89 => { fee_est_c. ret_val. store( 253 , atomic:: Ordering :: Release ) ; nodes[ 2 ] . maybe_update_chan_fees( ) ; } ,
12871347
1288- 0xf0 => {
1289- let pending_updates = monitor_a. chain_monitor . list_pending_monitor_updates ( ) . remove ( & chan_1_funding) . unwrap ( ) ;
1290- if let Some ( id) = pending_updates. get ( 0 ) {
1291- monitor_a. chain_monitor . channel_monitor_updated ( chan_1_funding, * id) . unwrap ( ) ;
1292- }
1293- nodes[ 0 ] . process_monitor_events ( ) ;
1294- }
1295- 0xf1 => {
1296- let pending_updates = monitor_a. chain_monitor . list_pending_monitor_updates ( ) . remove ( & chan_1_funding) . unwrap ( ) ;
1297- if let Some ( id) = pending_updates. get ( 1 ) {
1298- monitor_a. chain_monitor . channel_monitor_updated ( chan_1_funding, * id) . unwrap ( ) ;
1299- }
1300- nodes[ 0 ] . process_monitor_events ( ) ;
1301- }
1302- 0xf2 => {
1303- let pending_updates = monitor_a. chain_monitor . list_pending_monitor_updates ( ) . remove ( & chan_1_funding) . unwrap ( ) ;
1304- if let Some ( id) = pending_updates. last ( ) {
1305- monitor_a. chain_monitor . channel_monitor_updated ( chan_1_funding, * id) . unwrap ( ) ;
1306- }
1307- nodes[ 0 ] . process_monitor_events ( ) ;
1308- }
1348+ 0xf0 => complete_monitor_update( & monitor_a, & chan_1_funding, & complete_first) ,
1349+ 0xf1 => complete_monitor_update( & monitor_a, & chan_1_funding, & complete_second) ,
1350+ 0xf2 => complete_monitor_update( & monitor_a, & chan_1_funding, & Vec :: pop) ,
13091351
1310- 0xf4 => {
1311- let pending_updates = monitor_b. chain_monitor . list_pending_monitor_updates ( ) . remove ( & chan_1_funding) . unwrap ( ) ;
1312- if let Some ( id) = pending_updates. get ( 0 ) {
1313- monitor_b. chain_monitor . channel_monitor_updated ( chan_1_funding, * id) . unwrap ( ) ;
1314- }
1315- nodes[ 1 ] . process_monitor_events ( ) ;
1316- }
1317- 0xf5 => {
1318- let pending_updates = monitor_b. chain_monitor . list_pending_monitor_updates ( ) . remove ( & chan_1_funding) . unwrap ( ) ;
1319- if let Some ( id) = pending_updates. get ( 1 ) {
1320- monitor_b. chain_monitor . channel_monitor_updated ( chan_1_funding, * id) . unwrap ( ) ;
1321- }
1322- nodes[ 1 ] . process_monitor_events ( ) ;
1323- }
1324- 0xf6 => {
1325- let pending_updates = monitor_b. chain_monitor . list_pending_monitor_updates ( ) . remove ( & chan_1_funding) . unwrap ( ) ;
1326- if let Some ( id) = pending_updates. last ( ) {
1327- monitor_b. chain_monitor . channel_monitor_updated ( chan_1_funding, * id) . unwrap ( ) ;
1328- }
1329- nodes[ 1 ] . process_monitor_events ( ) ;
1330- }
1352+ 0xf4 => complete_monitor_update( & monitor_b, & chan_1_funding, & complete_first) ,
1353+ 0xf5 => complete_monitor_update( & monitor_b, & chan_1_funding, & complete_second) ,
1354+ 0xf6 => complete_monitor_update( & monitor_b, & chan_1_funding, & Vec :: pop) ,
13311355
1332- 0xf8 => {
1333- let pending_updates = monitor_b. chain_monitor . list_pending_monitor_updates ( ) . remove ( & chan_2_funding) . unwrap ( ) ;
1334- if let Some ( id) = pending_updates. get ( 0 ) {
1335- monitor_b. chain_monitor . channel_monitor_updated ( chan_2_funding, * id) . unwrap ( ) ;
1336- }
1337- nodes[ 1 ] . process_monitor_events ( ) ;
1338- }
1339- 0xf9 => {
1340- let pending_updates = monitor_b. chain_monitor . list_pending_monitor_updates ( ) . remove ( & chan_2_funding) . unwrap ( ) ;
1341- if let Some ( id) = pending_updates. get ( 1 ) {
1342- monitor_b. chain_monitor . channel_monitor_updated ( chan_2_funding, * id) . unwrap ( ) ;
1343- }
1344- nodes[ 1 ] . process_monitor_events ( ) ;
1345- }
1346- 0xfa => {
1347- let pending_updates = monitor_b. chain_monitor . list_pending_monitor_updates ( ) . remove ( & chan_2_funding) . unwrap ( ) ;
1348- if let Some ( id) = pending_updates. last ( ) {
1349- monitor_b. chain_monitor . channel_monitor_updated ( chan_2_funding, * id) . unwrap ( ) ;
1350- }
1351- nodes[ 1 ] . process_monitor_events ( ) ;
1352- }
1356+ 0xf8 => complete_monitor_update( & monitor_b, & chan_2_funding, & complete_first) ,
1357+ 0xf9 => complete_monitor_update( & monitor_b, & chan_2_funding, & complete_second) ,
1358+ 0xfa => complete_monitor_update( & monitor_b, & chan_2_funding, & Vec :: pop) ,
13531359
1354- 0xfc => {
1355- let pending_updates = monitor_c. chain_monitor . list_pending_monitor_updates ( ) . remove ( & chan_2_funding) . unwrap ( ) ;
1356- if let Some ( id) = pending_updates. get ( 0 ) {
1357- monitor_c. chain_monitor . channel_monitor_updated ( chan_2_funding, * id) . unwrap ( ) ;
1358- }
1359- nodes[ 2 ] . process_monitor_events ( ) ;
1360- }
1361- 0xfd => {
1362- let pending_updates = monitor_c. chain_monitor . list_pending_monitor_updates ( ) . remove ( & chan_2_funding) . unwrap ( ) ;
1363- if let Some ( id) = pending_updates. get ( 1 ) {
1364- monitor_c. chain_monitor . channel_monitor_updated ( chan_2_funding, * id) . unwrap ( ) ;
1365- }
1366- nodes[ 2 ] . process_monitor_events ( ) ;
1367- }
1368- 0xfe => {
1369- let pending_updates = monitor_c. chain_monitor . list_pending_monitor_updates ( ) . remove ( & chan_2_funding) . unwrap ( ) ;
1370- if let Some ( id) = pending_updates. last ( ) {
1371- monitor_c. chain_monitor . channel_monitor_updated ( chan_2_funding, * id) . unwrap ( ) ;
1372- }
1373- nodes[ 2 ] . process_monitor_events ( ) ;
1374- }
1360+ 0xfc => complete_monitor_update( & monitor_c, & chan_2_funding, & complete_first) ,
1361+ 0xfd => complete_monitor_update( & monitor_c, & chan_2_funding, & complete_second) ,
1362+ 0xfe => complete_monitor_update( & monitor_c, & chan_2_funding, & Vec :: pop) ,
13751363
13761364 0xff => {
13771365 // Test that no channel is in a stuck state where neither party can send funds even
13781366 // after we resolve all pending events.
1379- // First make sure there are no pending monitor updates, resetting the error state
1380- // and calling force_channel_monitor_updated for each monitor .
1367+ // First make sure there are no pending monitor updates and further update
1368+ // operations complete .
13811369 * monitor_a. persister. update_ret. lock( ) . unwrap( ) = ChannelMonitorUpdateStatus :: Completed ;
13821370 * monitor_b. persister. update_ret. lock( ) . unwrap( ) = ChannelMonitorUpdateStatus :: Completed ;
13831371 * monitor_c. persister. update_ret. lock( ) . unwrap( ) = ChannelMonitorUpdateStatus :: Completed ;
13841372
1385- if let Some ( ( id, _) ) = monitor_a. latest_monitors . lock ( ) . unwrap ( ) . get ( & chan_1_funding) {
1386- monitor_a. chain_monitor . force_channel_monitor_updated ( chan_1_funding, * id) ;
1387- nodes[ 0 ] . process_monitor_events ( ) ;
1388- }
1389- if let Some ( ( id, _) ) = monitor_b. latest_monitors . lock ( ) . unwrap ( ) . get ( & chan_1_funding) {
1390- monitor_b. chain_monitor . force_channel_monitor_updated ( chan_1_funding, * id) ;
1391- nodes[ 1 ] . process_monitor_events ( ) ;
1392- }
1393- if let Some ( ( id, _) ) = monitor_b. latest_monitors . lock ( ) . unwrap ( ) . get ( & chan_2_funding) {
1394- monitor_b. chain_monitor . force_channel_monitor_updated ( chan_2_funding, * id) ;
1395- nodes[ 1 ] . process_monitor_events ( ) ;
1396- }
1397- if let Some ( ( id, _) ) = monitor_c. latest_monitors . lock ( ) . unwrap ( ) . get ( & chan_2_funding) {
1398- monitor_c. chain_monitor . force_channel_monitor_updated ( chan_2_funding, * id) ;
1399- nodes[ 2 ] . process_monitor_events ( ) ;
1400- }
1373+ complete_all_monitor_updates( & monitor_a, & chan_1_funding) ;
1374+ complete_all_monitor_updates( & monitor_b, & chan_1_funding) ;
1375+ complete_all_monitor_updates( & monitor_b, & chan_2_funding) ;
1376+ complete_all_monitor_updates( & monitor_c, & chan_2_funding) ;
14011377
14021378 // Next, make sure peers are all connected to each other
14031379 if chan_a_disconnected {
0 commit comments