Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions bindings/ldk_node.udl
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ dictionary Config {
u64 probing_liquidity_limit_multiplier;
AnchorChannelsConfig? anchor_channels_config;
RouteParametersConfig? route_parameters;
boolean async_payment_services_enabled;
};

dictionary AnchorChannelsConfig {
Expand Down Expand Up @@ -209,6 +210,12 @@ interface Bolt12Payment {
Bolt12Invoice request_refund_payment([ByRef]Refund refund);
[Throws=NodeError]
Refund initiate_refund(u64 amount_msat, u32 expiry_secs, u64? quantity, string? payer_note);
[Throws=NodeError]
Offer receive_async();
[Throws=NodeError]
void set_paths_to_static_invoice_server(bytes paths);
[Throws=NodeError]
bytes blinded_paths_for_async_recipient(bytes recipient_id);
};

interface SpontaneousPayment {
Expand Down Expand Up @@ -311,6 +318,8 @@ enum NodeError {
"InsufficientFunds",
"LiquiditySourceUnavailable",
"LiquidityFeeTooHigh",
"InvalidBlindedPaths",
"AsyncPaymentServicesDisabled",
};

dictionary NodeStatus {
Expand Down
2 changes: 1 addition & 1 deletion src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1455,7 +1455,7 @@ fn build_with_store_internal(
Arc::clone(&channel_manager),
message_router,
Arc::clone(&channel_manager),
IgnoringMessageHandler {},
Arc::clone(&channel_manager),
IgnoringMessageHandler {},
IgnoringMessageHandler {},
));
Expand Down
3 changes: 3 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,8 @@ pub struct Config {
/// **Note:** If unset, default parameters will be used, and you will be able to override the
/// parameters on a per-payment basis in the corresponding method calls.
pub route_parameters: Option<RouteParametersConfig>,
/// Whether to enable the static invoice service to support async payment reception for clients.
pub async_payment_services_enabled: bool,
}

impl Default for Config {
Expand All @@ -193,6 +195,7 @@ impl Default for Config {
anchor_channels_config: Some(AnchorChannelsConfig::default()),
route_parameters: None,
node_alias: None,
async_payment_services_enabled: false,
}
}
}
Expand Down
8 changes: 8 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ pub enum Error {
LiquiditySourceUnavailable,
/// The given operation failed due to the LSP's required opening fee being too high.
LiquidityFeeTooHigh,
/// The given blinded paths are invalid.
InvalidBlindedPaths,
/// Asynchronous payment services are disabled.
AsyncPaymentServicesDisabled,
}

impl fmt::Display for Error {
Expand Down Expand Up @@ -193,6 +197,10 @@ impl fmt::Display for Error {
Self::LiquidityFeeTooHigh => {
write!(f, "The given operation failed due to the LSP's required opening fee being too high.")
},
Self::InvalidBlindedPaths => write!(f, "The given blinded paths are invalid."),
Self::AsyncPaymentServicesDisabled => {
write!(f, "Asynchronous payment services are disabled.")
},
}
}
}
Expand Down
63 changes: 55 additions & 8 deletions src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
// accordance with one or both of these licenses.

use crate::types::{CustomTlvRecord, DynStore, PaymentStore, Sweeper, Wallet};

use crate::{
hex_utils, BumpTransactionEventHandler, ChannelManager, Error, Graph, PeerInfo, PeerStore,
UserChannelId,
Expand All @@ -19,6 +18,7 @@ use crate::fee_estimator::ConfirmationTarget;
use crate::liquidity::LiquiditySource;
use crate::logger::Logger;

use crate::payment::asynchronous::static_invoice_store::StaticInvoiceStore;
use crate::payment::store::{
PaymentDetails, PaymentDetailsUpdate, PaymentDirection, PaymentKind, PaymentStatus,
};
Expand All @@ -27,7 +27,7 @@ use crate::io::{
EVENT_QUEUE_PERSISTENCE_KEY, EVENT_QUEUE_PERSISTENCE_PRIMARY_NAMESPACE,
EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE,
};
use crate::logger::{log_debug, log_error, log_info, LdkLogger};
use crate::logger::{log_debug, log_error, log_info, log_trace, LdkLogger};

use crate::runtime::Runtime;

Expand Down Expand Up @@ -458,6 +458,7 @@ where
runtime: Arc<Runtime>,
logger: L,
config: Arc<Config>,
static_invoice_store: Option<StaticInvoiceStore>,
}

impl<L: Deref + Clone + Sync + Send + 'static> EventHandler<L>
Expand All @@ -470,8 +471,9 @@ where
channel_manager: Arc<ChannelManager>, connection_manager: Arc<ConnectionManager<L>>,
output_sweeper: Arc<Sweeper>, network_graph: Arc<Graph>,
liquidity_source: Option<Arc<LiquiditySource<Arc<Logger>>>>,
payment_store: Arc<PaymentStore>, peer_store: Arc<PeerStore<L>>, runtime: Arc<Runtime>,
logger: L, config: Arc<Config>,
payment_store: Arc<PaymentStore>, peer_store: Arc<PeerStore<L>>,
static_invoice_store: Option<StaticInvoiceStore>, runtime: Arc<Runtime>, logger: L,
config: Arc<Config>,
) -> Self {
Self {
event_queue,
Expand All @@ -487,6 +489,7 @@ where
logger,
runtime,
config,
static_invoice_store,
}
}

Expand Down Expand Up @@ -1494,11 +1497,55 @@ where
LdkEvent::OnionMessagePeerConnected { .. } => {
debug_assert!(false, "We currently don't support onion message interception, so this event should never be emitted.");
},
LdkEvent::PersistStaticInvoice { .. } => {
debug_assert!(false, "We currently don't support static invoice persistence, so this event should never be emitted.");

LdkEvent::PersistStaticInvoice {
invoice,
invoice_slot,
recipient_id,
invoice_persisted_path,
} => {
if let Some(store) = self.static_invoice_store.as_ref() {
match store
.handle_persist_static_invoice(invoice, invoice_slot, recipient_id)
.await
{
Ok(_) => {
self.channel_manager.static_invoice_persisted(invoice_persisted_path);
},
Err(e) => {
log_error!(self.logger, "Failed to persist static invoice: {}", e);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to return Err(ReplayEvent()) here and below in case of persistence failure to be able to retry?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Also static_invoice_persisted shouldn't be called in that case. Fixed here and below.

return Err(ReplayEvent());
},
};
}
},
LdkEvent::StaticInvoiceRequested { .. } => {
debug_assert!(false, "We currently don't support static invoice persistence, so this event should never be emitted.");
LdkEvent::StaticInvoiceRequested { recipient_id, invoice_slot, reply_path } => {
if let Some(store) = self.static_invoice_store.as_ref() {
let invoice =
store.handle_static_invoice_requested(&recipient_id, invoice_slot).await;

match invoice {
Ok(Some(invoice)) => {
if let Err(e) =
self.channel_manager.send_static_invoice(invoice, reply_path)
{
log_error!(self.logger, "Failed to send static invoice: {:?}", e);
}
},
Ok(None) => {
log_trace!(
self.logger,
"No static invoice found for recipient {} and slot {}",
hex_utils::to_string(&recipient_id),
invoice_slot
);
},
Err(e) => {
log_error!(self.logger, "Failed to retrieve static invoice: {}", e);
return Err(ReplayEvent());
},
}
}
},
LdkEvent::FundingTransactionReadyForSigning { .. } => {
debug_assert!(false, "We currently don't support interactive-tx, so this event should never be emitted.");
Expand Down
5 changes: 5 additions & 0 deletions src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,8 @@ pub(crate) const BDK_WALLET_TX_GRAPH_KEY: &str = "tx_graph";
pub(crate) const BDK_WALLET_INDEXER_PRIMARY_NAMESPACE: &str = "bdk_wallet";
pub(crate) const BDK_WALLET_INDEXER_SECONDARY_NAMESPACE: &str = "";
pub(crate) const BDK_WALLET_INDEXER_KEY: &str = "indexer";

/// [`StaticInvoice`]s will be persisted under this key.
///
/// [`StaticInvoice`]: lightning::offers::static_invoice::StaticInvoice
pub(crate) const STATIC_INVOICE_STORE_PRIMARY_NAMESPACE: &str = "static_invoices";
10 changes: 10 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ use gossip::GossipSource;
use graph::NetworkGraph;
use io::utils::write_node_metrics;
use liquidity::{LSPS1Liquidity, LiquiditySource};
use payment::asynchronous::static_invoice_store::StaticInvoiceStore;
use payment::{
Bolt11Payment, Bolt12Payment, OnchainPayment, PaymentDetails, SpontaneousPayment,
UnifiedQrPayment,
Expand Down Expand Up @@ -498,6 +499,12 @@ impl Node {
Arc::clone(&self.logger),
));

let static_invoice_store = if self.config.async_payment_services_enabled {
Some(StaticInvoiceStore::new(Arc::clone(&self.kv_store)))
} else {
None
};

let event_handler = Arc::new(EventHandler::new(
Arc::clone(&self.event_queue),
Arc::clone(&self.wallet),
Expand All @@ -509,6 +516,7 @@ impl Node {
self.liquidity_source.clone(),
Arc::clone(&self.payment_store),
Arc::clone(&self.peer_store),
static_invoice_store,
Arc::clone(&self.runtime),
Arc::clone(&self.logger),
Arc::clone(&self.config),
Expand Down Expand Up @@ -818,6 +826,7 @@ impl Node {
Bolt12Payment::new(
Arc::clone(&self.channel_manager),
Arc::clone(&self.payment_store),
Arc::clone(&self.config),
Arc::clone(&self.is_running),
Arc::clone(&self.logger),
)
Expand All @@ -831,6 +840,7 @@ impl Node {
Arc::new(Bolt12Payment::new(
Arc::clone(&self.channel_manager),
Arc::clone(&self.payment_store),
Arc::clone(&self.config),
Arc::clone(&self.is_running),
Arc::clone(&self.logger),
))
Expand Down
9 changes: 9 additions & 0 deletions src/payment/asynchronous/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// This file is Copyright its original authors, visible in version control history.
//
// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
// http://opensource.org/licenses/MIT>, at your option. You may not use this file except in
// accordance with one or both of these licenses.

mod rate_limiter;
pub(crate) mod static_invoice_store;
96 changes: 96 additions & 0 deletions src/payment/asynchronous/rate_limiter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// This file is Copyright its original authors, visible in version control history.
//
// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
// http://opensource.org/licenses/MIT>, at your option. You may not use this file except in
// accordance with one or both of these licenses.

//! [`RateLimiter`] to control the rate of requests from users.

use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Implements a leaky-bucket style rate limiter parameterized by the max capacity of the bucket, the refill interval,
/// and the max idle duration.
///
/// For every passing of the refill interval, one token is added to the bucket, up to the maximum capacity. When the
/// bucket has remained at the maximum capacity for longer than the max idle duration, it is removed to prevent memory
/// leakage.
pub(crate) struct RateLimiter {
users: HashMap<Vec<u8>, Bucket>,
capacity: u32,
refill_interval: Duration,
max_idle: Duration,
}

struct Bucket {
tokens: u32,
last_refill: Instant,
}

impl RateLimiter {
pub(crate) fn new(capacity: u32, refill_interval: Duration, max_idle: Duration) -> Self {
Self { users: HashMap::new(), capacity, refill_interval, max_idle }
}

pub(crate) fn allow(&mut self, user_id: &[u8]) -> bool {
let now = Instant::now();

let entry = self.users.entry(user_id.to_vec());
let is_new_user = matches!(entry, std::collections::hash_map::Entry::Vacant(_));

let bucket = entry.or_insert(Bucket { tokens: self.capacity, last_refill: now });

let elapsed = now.duration_since(bucket.last_refill);
let tokens_to_add = (elapsed.as_secs_f64() / self.refill_interval.as_secs_f64()) as u32;

if tokens_to_add > 0 {
bucket.tokens = (bucket.tokens + tokens_to_add).min(self.capacity);
bucket.last_refill = now;
}

let allow = if bucket.tokens > 0 {
bucket.tokens -= 1;
true
} else {
false
};

// Each time a new user is added, we take the opportunity to clean up old rate limits.
if is_new_user {
self.garbage_collect(self.max_idle);
}

allow
}

fn garbage_collect(&mut self, max_idle: Duration) {
let now = Instant::now();
self.users.retain(|_, bucket| now.duration_since(bucket.last_refill) < max_idle);
}
}

#[cfg(test)]
mod tests {
use crate::payment::asynchronous::rate_limiter::RateLimiter;

use std::time::Duration;

#[test]
fn rate_limiter_test() {
// Test
let mut rate_limiter =
RateLimiter::new(3, Duration::from_millis(100), Duration::from_secs(1));

assert!(rate_limiter.allow(b"user1"));
assert!(rate_limiter.allow(b"user1"));
assert!(rate_limiter.allow(b"user1"));
assert!(!rate_limiter.allow(b"user1"));
assert!(rate_limiter.allow(b"user2"));

std::thread::sleep(Duration::from_millis(150));

assert!(rate_limiter.allow(b"user1"));
assert!(rate_limiter.allow(b"user2"));
}
}
Loading
Loading