Skip to content

Commit 929bc66

Browse files
committed
Add async payments
1 parent a489fbf commit 929bc66

File tree

14 files changed

+623
-11
lines changed

14 files changed

+623
-11
lines changed

bindings/ldk_node.udl

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ dictionary Config {
1313
u64 probing_liquidity_limit_multiplier;
1414
AnchorChannelsConfig? anchor_channels_config;
1515
RouteParametersConfig? route_parameters;
16+
boolean async_payment_services_enabled;
1617
};
1718

1819
dictionary AnchorChannelsConfig {
@@ -209,6 +210,12 @@ interface Bolt12Payment {
209210
Bolt12Invoice request_refund_payment([ByRef]Refund refund);
210211
[Throws=NodeError]
211212
Refund initiate_refund(u64 amount_msat, u32 expiry_secs, u64? quantity, string? payer_note);
213+
[Throws=NodeError]
214+
Offer receive_async();
215+
[Throws=NodeError]
216+
void set_paths_to_static_invoice_server(bytes paths);
217+
[Throws=NodeError]
218+
bytes blinded_paths_for_async_recipient(bytes recipient_id);
212219
};
213220

214221
interface SpontaneousPayment {
@@ -311,6 +318,8 @@ enum NodeError {
311318
"InsufficientFunds",
312319
"LiquiditySourceUnavailable",
313320
"LiquidityFeeTooHigh",
321+
"InvalidBlindedPaths",
322+
"AsyncPaymentServicesDisabled",
314323
};
315324

316325
dictionary NodeStatus {

src/builder.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1455,7 +1455,7 @@ fn build_with_store_internal(
14551455
Arc::clone(&channel_manager),
14561456
message_router,
14571457
Arc::clone(&channel_manager),
1458-
IgnoringMessageHandler {},
1458+
Arc::clone(&channel_manager),
14591459
IgnoringMessageHandler {},
14601460
IgnoringMessageHandler {},
14611461
));

src/config.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,8 @@ pub struct Config {
179179
/// **Note:** If unset, default parameters will be used, and you will be able to override the
180180
/// parameters on a per-payment basis in the corresponding method calls.
181181
pub route_parameters: Option<RouteParametersConfig>,
182+
/// Whether to enable the static invoice service to support async payment reception for clients.
183+
pub async_payment_services_enabled: bool,
182184
}
183185

184186
impl Default for Config {
@@ -193,6 +195,7 @@ impl Default for Config {
193195
anchor_channels_config: Some(AnchorChannelsConfig::default()),
194196
route_parameters: None,
195197
node_alias: None,
198+
async_payment_services_enabled: false,
196199
}
197200
}
198201
}

src/error.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,10 @@ pub enum Error {
120120
LiquiditySourceUnavailable,
121121
/// The given operation failed due to the LSP's required opening fee being too high.
122122
LiquidityFeeTooHigh,
123+
/// The given blinded paths are invalid.
124+
InvalidBlindedPaths,
125+
/// Asynchronous payment services are disabled.
126+
AsyncPaymentServicesDisabled,
123127
}
124128

125129
impl fmt::Display for Error {
@@ -193,6 +197,10 @@ impl fmt::Display for Error {
193197
Self::LiquidityFeeTooHigh => {
194198
write!(f, "The given operation failed due to the LSP's required opening fee being too high.")
195199
},
200+
Self::InvalidBlindedPaths => write!(f, "The given blinded paths are invalid."),
201+
Self::AsyncPaymentServicesDisabled => {
202+
write!(f, "Asynchronous payment services are disabled.")
203+
},
196204
}
197205
}
198206
}

src/event.rs

Lines changed: 45 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@
55
// http://opensource.org/licenses/MIT>, at your option. You may not use this file except in
66
// accordance with one or both of these licenses.
77

8+
use crate::payment::asynchronous::static_invoice_store::StaticInvoiceStore;
89
use crate::types::{CustomTlvRecord, DynStore, PaymentStore, Sweeper, Wallet};
9-
1010
use crate::{
1111
hex_utils, BumpTransactionEventHandler, ChannelManager, Error, Graph, PeerInfo, PeerStore,
1212
UserChannelId,
@@ -458,6 +458,7 @@ where
458458
runtime: Arc<Runtime>,
459459
logger: L,
460460
config: Arc<Config>,
461+
static_invoice_store: Option<StaticInvoiceStore>,
461462
}
462463

463464
impl<L: Deref + Clone + Sync + Send + 'static> EventHandler<L>
@@ -470,8 +471,9 @@ where
470471
channel_manager: Arc<ChannelManager>, connection_manager: Arc<ConnectionManager<L>>,
471472
output_sweeper: Arc<Sweeper>, network_graph: Arc<Graph>,
472473
liquidity_source: Option<Arc<LiquiditySource<Arc<Logger>>>>,
473-
payment_store: Arc<PaymentStore>, peer_store: Arc<PeerStore<L>>, runtime: Arc<Runtime>,
474-
logger: L, config: Arc<Config>,
474+
payment_store: Arc<PaymentStore>, peer_store: Arc<PeerStore<L>>,
475+
static_invoice_store: Option<StaticInvoiceStore>, runtime: Arc<Runtime>, logger: L,
476+
config: Arc<Config>,
475477
) -> Self {
476478
Self {
477479
event_queue,
@@ -487,6 +489,7 @@ where
487489
logger,
488490
runtime,
489491
config,
492+
static_invoice_store,
490493
}
491494
}
492495

@@ -1494,11 +1497,46 @@ where
14941497
LdkEvent::OnionMessagePeerConnected { .. } => {
14951498
debug_assert!(false, "We currently don't support onion message interception, so this event should never be emitted.");
14961499
},
1497-
LdkEvent::PersistStaticInvoice { .. } => {
1498-
debug_assert!(false, "We currently don't support static invoice persistence, so this event should never be emitted.");
1500+
1501+
LdkEvent::PersistStaticInvoice {
1502+
invoice,
1503+
invoice_slot,
1504+
recipient_id,
1505+
invoice_persisted_path,
1506+
} => {
1507+
if let Some(store) = self.static_invoice_store.as_ref() {
1508+
match store
1509+
.handle_persist_static_invoice(invoice, invoice_slot, recipient_id)
1510+
.await
1511+
{
1512+
Ok(_) => {},
1513+
Err(e) => {
1514+
log_error!(self.logger, "Failed to persist static invoice: {}", e);
1515+
},
1516+
};
1517+
1518+
self.channel_manager.static_invoice_persisted(invoice_persisted_path);
1519+
}
14991520
},
1500-
LdkEvent::StaticInvoiceRequested { .. } => {
1501-
debug_assert!(false, "We currently don't support static invoice persistence, so this event should never be emitted.");
1521+
LdkEvent::StaticInvoiceRequested { recipient_id, invoice_slot, reply_path } => {
1522+
if let Some(store) = self.static_invoice_store.as_ref() {
1523+
let invoice =
1524+
store.handle_static_invoice_requested(&recipient_id, invoice_slot).await;
1525+
1526+
match invoice {
1527+
Ok(Some(invoice)) => {
1528+
if let Err(e) =
1529+
self.channel_manager.send_static_invoice(invoice, reply_path)
1530+
{
1531+
log_error!(self.logger, "Failed to send static invoice: {:?}", e);
1532+
}
1533+
},
1534+
Ok(None) => {},
1535+
Err(e) => {
1536+
log_error!(self.logger, "Failed to retrieve static invoice: {}", e);
1537+
},
1538+
}
1539+
}
15021540
},
15031541
LdkEvent::FundingTransactionReadyForSigning { .. } => {
15041542
debug_assert!(false, "We currently don't support interactive-tx, so this event should never be emitted.");

src/io/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,3 +73,8 @@ pub(crate) const BDK_WALLET_TX_GRAPH_KEY: &str = "tx_graph";
7373
pub(crate) const BDK_WALLET_INDEXER_PRIMARY_NAMESPACE: &str = "bdk_wallet";
7474
pub(crate) const BDK_WALLET_INDEXER_SECONDARY_NAMESPACE: &str = "";
7575
pub(crate) const BDK_WALLET_INDEXER_KEY: &str = "indexer";
76+
77+
// Static invoices will be persisted at "static_invoices/<sha256(recipient_id)>/<invoice_slot>".
78+
//
79+
// Example: static_invoices/039058c6f2c0cb492c533b0a4d14ef77cc0f78abccced5287d84a1a2011cfb81/001f
80+
pub(crate) const STATIC_INVOICES_PRIMARY_NAMESPACE: &str = "static_invoices";

src/lib.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ use gossip::GossipSource;
136136
use graph::NetworkGraph;
137137
use io::utils::write_node_metrics;
138138
use liquidity::{LSPS1Liquidity, LiquiditySource};
139+
use payment::asynchronous::static_invoice_store::StaticInvoiceStore;
139140
use payment::{
140141
Bolt11Payment, Bolt12Payment, OnchainPayment, PaymentDetails, SpontaneousPayment,
141142
UnifiedQrPayment,
@@ -498,6 +499,12 @@ impl Node {
498499
Arc::clone(&self.logger),
499500
));
500501

502+
let static_invoice_store = if self.config.async_payment_services_enabled {
503+
Some(StaticInvoiceStore::new(Arc::clone(&self.kv_store)))
504+
} else {
505+
None
506+
};
507+
501508
let event_handler = Arc::new(EventHandler::new(
502509
Arc::clone(&self.event_queue),
503510
Arc::clone(&self.wallet),
@@ -509,6 +516,7 @@ impl Node {
509516
self.liquidity_source.clone(),
510517
Arc::clone(&self.payment_store),
511518
Arc::clone(&self.peer_store),
519+
static_invoice_store,
512520
Arc::clone(&self.runtime),
513521
Arc::clone(&self.logger),
514522
Arc::clone(&self.config),
@@ -818,6 +826,7 @@ impl Node {
818826
Bolt12Payment::new(
819827
Arc::clone(&self.channel_manager),
820828
Arc::clone(&self.payment_store),
829+
self.config.async_payment_services_enabled,
821830
Arc::clone(&self.is_running),
822831
Arc::clone(&self.logger),
823832
)
@@ -831,6 +840,7 @@ impl Node {
831840
Arc::new(Bolt12Payment::new(
832841
Arc::clone(&self.channel_manager),
833842
Arc::clone(&self.payment_store),
843+
self.config.async_payment_services_enabled,
834844
Arc::clone(&self.is_running),
835845
Arc::clone(&self.logger),
836846
))

src/payment/asynchronous/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
mod rate_limiter;
2+
pub(crate) mod static_invoice_store;
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
use std::collections::HashMap;
2+
use std::time::{Duration, Instant};
3+
4+
/// Implements a leaky-bucket style rate limiter parameterized by the max capacity of the bucket, the refill interval,
5+
/// and the max idle duration.
6+
///
7+
/// For every passing of the refill interval, one token is added to the bucket, up to the maximum capacity. When the
8+
/// bucket has remained at the maximum capacity for longer than the max idle duration, it is removed to prevent memory
9+
/// leakage.
10+
pub(crate) struct RateLimiter {
11+
users: HashMap<Vec<u8>, Bucket>,
12+
capacity: u32,
13+
refill_interval: Duration,
14+
max_idle: Duration,
15+
}
16+
17+
struct Bucket {
18+
tokens: u32,
19+
last_refill: Instant,
20+
}
21+
22+
impl RateLimiter {
23+
pub(crate) fn new(capacity: u32, refill_interval: Duration, max_idle: Duration) -> Self {
24+
Self { users: HashMap::new(), capacity, refill_interval, max_idle }
25+
}
26+
27+
pub(crate) fn allow(&mut self, user_id: &[u8]) -> bool {
28+
let now = Instant::now();
29+
30+
let entry = self.users.entry(user_id.to_vec());
31+
let is_new_user = matches!(entry, std::collections::hash_map::Entry::Vacant(_));
32+
33+
let bucket = entry.or_insert(Bucket { tokens: self.capacity, last_refill: now });
34+
35+
let elapsed = now.duration_since(bucket.last_refill);
36+
let tokens_to_add = (elapsed.as_secs_f64() / self.refill_interval.as_secs_f64()) as u32;
37+
38+
if tokens_to_add > 0 {
39+
bucket.tokens = (bucket.tokens + tokens_to_add).min(self.capacity);
40+
bucket.last_refill = now;
41+
}
42+
43+
let allow = if bucket.tokens > 0 {
44+
bucket.tokens -= 1;
45+
true
46+
} else {
47+
false
48+
};
49+
50+
// Each time a new user is added, we take the opportunity to clean up old rate limits.
51+
if is_new_user {
52+
self.garbage_collect(self.max_idle);
53+
}
54+
55+
allow
56+
}
57+
58+
fn garbage_collect(&mut self, max_idle: Duration) {
59+
let now = Instant::now();
60+
self.users.retain(|_, bucket| now.duration_since(bucket.last_refill) < max_idle);
61+
}
62+
}
63+
64+
#[cfg(test)]
65+
mod tests {
66+
use crate::payment::asynchronous::rate_limiter::RateLimiter;
67+
68+
use std::time::Duration;
69+
70+
#[test]
71+
fn rate_limiter_test() {
72+
// Test
73+
let mut rate_limiter =
74+
RateLimiter::new(3, Duration::from_millis(100), Duration::from_secs(1));
75+
76+
assert!(rate_limiter.allow(b"user1"));
77+
assert!(rate_limiter.allow(b"user1"));
78+
assert!(rate_limiter.allow(b"user1"));
79+
assert!(!rate_limiter.allow(b"user1"));
80+
assert!(rate_limiter.allow(b"user2"));
81+
82+
std::thread::sleep(Duration::from_millis(150));
83+
84+
assert!(rate_limiter.allow(b"user1"));
85+
assert!(rate_limiter.allow(b"user2"));
86+
}
87+
}

0 commit comments

Comments
 (0)