Skip to content

Commit 9ab2d27

Browse files
Use VecDeque on LSPS5/Client to limit amount of pending requests
1 parent d2c7c66 commit 9ab2d27

File tree

2 files changed

+151
-36
lines changed

2 files changed

+151
-36
lines changed

lightning-liquidity/src/lsps5/client.rs

Lines changed: 149 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -31,26 +31,68 @@ use lightning::ln::msgs::{ErrorAction, LightningError};
3131
use lightning::sign::EntropySource;
3232
use lightning::util::logger::Level;
3333

34+
use alloc::collections::VecDeque;
3435
use alloc::string::String;
3536

3637
use core::ops::Deref;
3738

39+
impl PartialEq<LSPSRequestId> for (LSPSRequestId, (LSPS5AppName, LSPS5WebhookUrl)) {
40+
fn eq(&self, other: &LSPSRequestId) -> bool {
41+
&self.0 == other
42+
}
43+
}
44+
45+
impl PartialEq<LSPSRequestId> for (LSPSRequestId, LSPS5AppName) {
46+
fn eq(&self, other: &LSPSRequestId) -> bool {
47+
&self.0 == other
48+
}
49+
}
50+
3851
#[derive(Debug, Clone, Copy, Default)]
3952
/// Configuration for the LSPS5 client
4053
pub struct LSPS5ClientConfig {}
4154

4255
struct PeerState {
43-
pending_set_webhook_requests: HashMap<LSPSRequestId, (LSPS5AppName, LSPS5WebhookUrl)>,
44-
pending_list_webhooks_requests: HashMap<LSPSRequestId, ()>,
45-
pending_remove_webhook_requests: HashMap<LSPSRequestId, LSPS5AppName>,
56+
pending_set_webhook_requests: VecDeque<(LSPSRequestId, (LSPS5AppName, LSPS5WebhookUrl))>,
57+
pending_list_webhooks_requests: VecDeque<LSPSRequestId>,
58+
pending_remove_webhook_requests: VecDeque<(LSPSRequestId, LSPS5AppName)>,
4659
}
4760

61+
const MAX_PENDING_REQUESTS: usize = 5;
62+
4863
impl PeerState {
4964
fn new() -> Self {
5065
Self {
51-
pending_set_webhook_requests: new_hash_map(),
52-
pending_list_webhooks_requests: new_hash_map(),
53-
pending_remove_webhook_requests: new_hash_map(),
66+
pending_set_webhook_requests: VecDeque::with_capacity(MAX_PENDING_REQUESTS),
67+
pending_list_webhooks_requests: VecDeque::with_capacity(MAX_PENDING_REQUESTS),
68+
pending_remove_webhook_requests: VecDeque::with_capacity(MAX_PENDING_REQUESTS),
69+
}
70+
}
71+
72+
fn add_request<T, F>(&mut self, item: T, queue_selector: F)
73+
where
74+
F: FnOnce(&mut Self) -> &mut VecDeque<T>,
75+
{
76+
let queue = queue_selector(self);
77+
if queue.len() == MAX_PENDING_REQUESTS {
78+
queue.pop_front();
79+
}
80+
queue.push_back(item);
81+
}
82+
83+
fn find_and_remove_request<T, F>(
84+
&mut self, queue_selector: F, request_id: &LSPSRequestId,
85+
) -> Option<T>
86+
where
87+
F: FnOnce(&mut Self) -> &mut VecDeque<T>,
88+
T: Clone,
89+
for<'a> &'a T: PartialEq<&'a LSPSRequestId>,
90+
{
91+
let queue = queue_selector(self);
92+
if let Some(pos) = queue.iter().position(|item| item == request_id) {
93+
queue.remove(pos)
94+
} else {
95+
None
5496
}
5597
}
5698
}
@@ -153,9 +195,10 @@ where
153195
let request_id = generate_request_id(&self.entropy_source);
154196

155197
self.with_peer_state(counterparty_node_id, |peer_state| {
156-
peer_state
157-
.pending_set_webhook_requests
158-
.insert(request_id.clone(), (app_name.clone(), lsps_webhook_url.clone()));
198+
peer_state.add_request(
199+
(request_id.clone(), (app_name.clone(), lsps_webhook_url.clone())),
200+
|s| &mut s.pending_set_webhook_requests,
201+
);
159202
});
160203

161204
let request =
@@ -187,7 +230,7 @@ where
187230
let request_id = generate_request_id(&self.entropy_source);
188231

189232
self.with_peer_state(counterparty_node_id, |peer_state| {
190-
peer_state.pending_list_webhooks_requests.insert(request_id.clone(), ());
233+
peer_state.add_request(request_id.clone(), |s| &mut s.pending_list_webhooks_requests);
191234
});
192235

193236
let request = LSPS5Request::ListWebhooks(ListWebhooksRequest {});
@@ -224,7 +267,9 @@ where
224267
let request_id = generate_request_id(&self.entropy_source);
225268

226269
self.with_peer_state(counterparty_node_id, |peer_state| {
227-
peer_state.pending_remove_webhook_requests.insert(request_id.clone(), app_name.clone());
270+
peer_state.add_request((request_id.clone(), app_name.clone()), |s| {
271+
&mut s.pending_remove_webhook_requests
272+
});
228273
});
229274

230275
let request = LSPS5Request::RemoveWebhook(RemoveWebhookRequest { app_name });
@@ -255,8 +300,8 @@ where
255300
});
256301
let event_queue_notifier = self.pending_events.notifier();
257302
let handle_response = |peer_state: &mut PeerState| {
258-
if let Some((app_name, webhook_url)) =
259-
peer_state.pending_set_webhook_requests.remove(&request_id)
303+
if let Some((_, (app_name, webhook_url))) = peer_state
304+
.find_and_remove_request(|s| &mut s.pending_set_webhook_requests, &request_id)
260305
{
261306
match &response {
262307
LSPS5Response::SetWebhook(r) => {
@@ -288,7 +333,9 @@ where
288333
});
289334
},
290335
}
291-
} else if peer_state.pending_list_webhooks_requests.remove(&request_id).is_some() {
336+
} else if let Some(_) = peer_state
337+
.find_and_remove_request(|s| &mut s.pending_list_webhooks_requests, &request_id)
338+
{
292339
match &response {
293340
LSPS5Response::ListWebhooks(r) => {
294341
event_queue_notifier.enqueue(LSPS5ClientEvent::WebhooksListed {
@@ -306,8 +353,8 @@ where
306353
});
307354
},
308355
}
309-
} else if let Some(app_name) =
310-
peer_state.pending_remove_webhook_requests.remove(&request_id)
356+
} else if let Some((_, app_name)) = peer_state
357+
.find_and_remove_request(|s| &mut s.pending_remove_webhook_requests, &request_id)
311358
{
312359
match &response {
313360
LSPS5Response::RemoveWebhook(_) => {
@@ -364,19 +411,31 @@ where
364411
mod tests {
365412

366413
use super::*;
367-
use crate::{
368-
lsps0::ser::LSPSRequestId, lsps5::msgs::SetWebhookResponse, tests::utils::TestEntropy,
369-
};
414+
use crate::{lsps0::ser::LSPSRequestId, lsps5::msgs::SetWebhookResponse};
370415
use bitcoin::{key::Secp256k1, secp256k1::SecretKey};
416+
use core::sync::atomic::{AtomicU64, Ordering};
417+
418+
struct UniqueTestEntropy {
419+
counter: AtomicU64,
420+
}
421+
422+
impl EntropySource for UniqueTestEntropy {
423+
fn get_secure_random_bytes(&self) -> [u8; 32] {
424+
let counter = self.counter.fetch_add(1, Ordering::SeqCst);
425+
let mut bytes = [0u8; 32];
426+
bytes[0..8].copy_from_slice(&counter.to_be_bytes());
427+
bytes
428+
}
429+
}
371430

372431
fn setup_test_client() -> (
373-
LSPS5ClientHandler<Arc<TestEntropy>>,
432+
LSPS5ClientHandler<Arc<UniqueTestEntropy>>,
374433
Arc<MessageQueue>,
375434
Arc<EventQueue>,
376435
PublicKey,
377436
PublicKey,
378437
) {
379-
let test_entropy_source = Arc::new(TestEntropy {});
438+
let test_entropy_source = Arc::new(UniqueTestEntropy { counter: AtomicU64::new(2) });
380439
let message_queue = Arc::new(MessageQueue::new());
381440
let event_queue = Arc::new(EventQueue::new());
382441
let client = LSPS5ClientHandler::new(
@@ -410,10 +469,16 @@ mod tests {
410469
let outer_state_lock = client.per_peer_state.read().unwrap();
411470

412471
let peer_1_state = outer_state_lock.get(&peer_1).unwrap().lock().unwrap();
413-
assert!(peer_1_state.pending_set_webhook_requests.contains_key(&req_id_1));
472+
assert!(peer_1_state
473+
.pending_set_webhook_requests
474+
.iter()
475+
.any(|(id, _)| id == &req_id_1));
414476

415477
let peer_2_state = outer_state_lock.get(&peer_2).unwrap().lock().unwrap();
416-
assert!(peer_2_state.pending_set_webhook_requests.contains_key(&req_id_2));
478+
assert!(peer_2_state
479+
.pending_set_webhook_requests
480+
.iter()
481+
.any(|(id, _)| id == &req_id_2));
417482
}
418483
}
419484

@@ -432,17 +497,21 @@ mod tests {
432497
{
433498
let outer_state_lock = client.per_peer_state.read().unwrap();
434499
let peer_state = outer_state_lock.get(&peer).unwrap().lock().unwrap();
435-
assert_eq!(
436-
peer_state.pending_set_webhook_requests.get(&set_req_id).unwrap(),
437-
&(lsps5_app_name.clone(), lsps5_webhook_url)
438-
);
439-
440-
assert!(peer_state.pending_list_webhooks_requests.contains_key(&list_req_id));
441-
442-
assert_eq!(
443-
peer_state.pending_remove_webhook_requests.get(&remove_req_id).unwrap(),
444-
&lsps5_app_name
445-
);
500+
let set_request = peer_state
501+
.pending_set_webhook_requests
502+
.iter()
503+
.find(|(id, _)| id == &set_req_id)
504+
.unwrap();
505+
assert_eq!(&set_request.1, &(lsps5_app_name.clone(), lsps5_webhook_url));
506+
507+
assert!(peer_state.pending_list_webhooks_requests.contains(&list_req_id));
508+
509+
let remove_request = peer_state
510+
.pending_remove_webhook_requests
511+
.iter()
512+
.find(|(id, _)| id == &remove_req_id)
513+
.unwrap();
514+
assert_eq!(&remove_request.1, &lsps5_app_name);
446515
}
447516
}
448517

@@ -497,4 +566,50 @@ mod tests {
497566
let error = result.unwrap_err();
498567
assert!(error.err.to_lowercase().contains("unknown request id"));
499568
}
569+
570+
#[test]
571+
fn test_pending_request_eviction() {
572+
let (client, _, _, peer, _) = setup_test_client();
573+
574+
let mut request_ids = Vec::new();
575+
for i in 0..MAX_PENDING_REQUESTS {
576+
let req_id = client
577+
.set_webhook(peer, format!("app-{}", i), format!("https://example.com/hook{}", i))
578+
.unwrap();
579+
request_ids.push(req_id);
580+
}
581+
582+
{
583+
let outer_state_lock = client.per_peer_state.read().unwrap();
584+
let peer_state = outer_state_lock.get(&peer).unwrap().lock().unwrap();
585+
for req_id in &request_ids {
586+
assert!(peer_state.pending_set_webhook_requests.iter().any(|(id, _)| id == req_id));
587+
}
588+
assert_eq!(peer_state.pending_set_webhook_requests.len(), MAX_PENDING_REQUESTS);
589+
}
590+
591+
let new_req_id = client
592+
.set_webhook(peer, "app-new".to_string(), "https://example.com/hook-new".to_string())
593+
.unwrap();
594+
595+
{
596+
let outer_state_lock = client.per_peer_state.read().unwrap();
597+
let peer_state = outer_state_lock.get(&peer).unwrap().lock().unwrap();
598+
assert_eq!(peer_state.pending_set_webhook_requests.len(), MAX_PENDING_REQUESTS);
599+
600+
assert!(!peer_state
601+
.pending_set_webhook_requests
602+
.iter()
603+
.any(|(id, _)| id == &request_ids[0]));
604+
605+
for req_id in &request_ids[1..] {
606+
assert!(peer_state.pending_set_webhook_requests.iter().any(|(id, _)| id == req_id));
607+
}
608+
609+
assert!(peer_state
610+
.pending_set_webhook_requests
611+
.iter()
612+
.any(|(id, _)| id == &new_req_id));
613+
}
614+
}
500615
}

lightning-liquidity/src/utils/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ use lightning::sign::EntropySource;
77

88
use crate::lsps0::ser::LSPSRequestId;
99

10+
pub mod time;
11+
1012
/// Converts a human-readable string representation of a short channel ID (SCID)
1113
pub fn scid_from_human_readable_string(human_readable_scid: &str) -> Result<u64, ()> {
1214
let mut parts = human_readable_scid.split('x');
@@ -56,5 +58,3 @@ mod tests {
5658
assert_eq!(vout_from_scid(scid), vout);
5759
}
5860
}
59-
60-
pub mod time;

0 commit comments

Comments
 (0)