From dc6768532793704ccacf4d2358555aba8f8cf1c7 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Mon, 13 Jan 2025 12:45:24 +0100 Subject: [PATCH 1/7] Prefactor: Introduce `PaymentQueueEntry` .. which streamlines the `PaymentQueue` API a bit, but most importantly can more easily get persisted using macros in the next step. --- .../src/lsps2/payment_queue.rs | 76 +++++++++++-------- lightning-liquidity/src/lsps2/service.rs | 18 ++--- 2 files changed, 49 insertions(+), 45 deletions(-) diff --git a/lightning-liquidity/src/lsps2/payment_queue.rs b/lightning-liquidity/src/lsps2/payment_queue.rs index 30413537a9c..aff9f5103bc 100644 --- a/lightning-liquidity/src/lsps2/payment_queue.rs +++ b/lightning-liquidity/src/lsps2/payment_queue.rs @@ -8,7 +8,7 @@ use lightning_types::payment::PaymentHash; /// remaining payments forwarded. #[derive(Clone, Default, PartialEq, Eq, Debug)] pub(crate) struct PaymentQueue { - payments: Vec<(PaymentHash, Vec)>, + payments: Vec, } impl PaymentQueue { @@ -17,37 +17,48 @@ impl PaymentQueue { } pub(crate) fn add_htlc(&mut self, new_htlc: InterceptedHTLC) -> (u64, usize) { - let payment = self.payments.iter_mut().find(|(p, _)| p == &new_htlc.payment_hash); - if let Some((payment_hash, htlcs)) = payment { + let payment = + self.payments.iter_mut().find(|entry| entry.payment_hash == new_htlc.payment_hash); + if let Some(entry) = payment { // HTLCs within a payment should have the same payment hash. - debug_assert!(htlcs.iter().all(|htlc| htlc.payment_hash == *payment_hash)); + debug_assert!(entry.htlcs.iter().all(|htlc| htlc.payment_hash == entry.payment_hash)); // The given HTLC should not already be present. - debug_assert!(htlcs.iter().all(|htlc| htlc.intercept_id != new_htlc.intercept_id)); - htlcs.push(new_htlc); + debug_assert!(entry + .htlcs + .iter() + .all(|htlc| htlc.intercept_id != new_htlc.intercept_id)); + entry.htlcs.push(new_htlc); let total_expected_outbound_amount_msat = - htlcs.iter().map(|htlc| htlc.expected_outbound_amount_msat).sum(); - (total_expected_outbound_amount_msat, htlcs.len()) + entry.htlcs.iter().map(|htlc| htlc.expected_outbound_amount_msat).sum(); + (total_expected_outbound_amount_msat, entry.htlcs.len()) } else { let expected_outbound_amount_msat = new_htlc.expected_outbound_amount_msat; - self.payments.push((new_htlc.payment_hash, vec![new_htlc])); + let entry = + PaymentQueueEntry { payment_hash: new_htlc.payment_hash, htlcs: vec![new_htlc] }; + self.payments.push(entry); (expected_outbound_amount_msat, 1) } } - pub(crate) fn pop_greater_than_msat( - &mut self, amount_msat: u64, - ) -> Option<(PaymentHash, Vec)> { - let position = self.payments.iter().position(|(_payment_hash, htlcs)| { - htlcs.iter().map(|htlc| htlc.expected_outbound_amount_msat).sum::() >= amount_msat + pub(crate) fn pop_greater_than_msat(&mut self, amount_msat: u64) -> Option { + let position = self.payments.iter().position(|entry| { + entry.htlcs.iter().map(|htlc| htlc.expected_outbound_amount_msat).sum::() + >= amount_msat }); position.map(|position| self.payments.remove(position)) } pub(crate) fn clear(&mut self) -> Vec { - self.payments.drain(..).map(|(_k, v)| v).flatten().collect() + self.payments.drain(..).map(|entry| entry.htlcs).flatten().collect() } } +#[derive(Clone, PartialEq, Eq, Debug)] +pub(crate) struct PaymentQueueEntry { + pub(crate) payment_hash: PaymentHash, + pub(crate) htlcs: Vec, +} + #[derive(Copy, Clone, PartialEq, Eq, Debug)] pub(crate) struct InterceptedHTLC { pub(crate) intercept_id: InterceptId, @@ -90,24 +101,23 @@ mod tests { }), (500_000_000, 2), ); - assert_eq!( - payment_queue.pop_greater_than_msat(500_000_000), - Some(( - PaymentHash([100; 32]), - vec![ - InterceptedHTLC { - intercept_id: InterceptId([0; 32]), - expected_outbound_amount_msat: 200_000_000, - payment_hash: PaymentHash([100; 32]), - }, - InterceptedHTLC { - intercept_id: InterceptId([2; 32]), - expected_outbound_amount_msat: 300_000_000, - payment_hash: PaymentHash([100; 32]), - }, - ] - )) - ); + + let expected_entry = PaymentQueueEntry { + payment_hash: PaymentHash([100; 32]), + htlcs: vec![ + InterceptedHTLC { + intercept_id: InterceptId([0; 32]), + expected_outbound_amount_msat: 200_000_000, + payment_hash: PaymentHash([100; 32]), + }, + InterceptedHTLC { + intercept_id: InterceptId([2; 32]), + expected_outbound_amount_msat: 300_000_000, + payment_hash: PaymentHash([100; 32]), + }, + ], + }; + assert_eq!(payment_queue.pop_greater_than_msat(500_000_000), Some(expected_entry),); assert_eq!( payment_queue.clear(), vec![InterceptedHTLC { diff --git a/lightning-liquidity/src/lsps2/service.rs b/lightning-liquidity/src/lsps2/service.rs index 309d7ae1755..114ed8b250d 100644 --- a/lightning-liquidity/src/lsps2/service.rs +++ b/lightning-liquidity/src/lsps2/service.rs @@ -242,12 +242,10 @@ impl OutboundJITChannelState { } => { let mut payment_queue = core::mem::take(payment_queue); payment_queue.add_htlc(htlc); - if let Some((_payment_hash, htlcs)) = - payment_queue.pop_greater_than_msat(*opening_fee_msat) - { + if let Some(entry) = payment_queue.pop_greater_than_msat(*opening_fee_msat) { let forward_payment = HTLCInterceptedAction::ForwardPayment( *channel_id, - FeePayment { htlcs, opening_fee_msat: *opening_fee_msat }, + FeePayment { htlcs: entry.htlcs, opening_fee_msat: *opening_fee_msat }, ); *self = OutboundJITChannelState::PendingPaymentForward { payment_queue, @@ -277,12 +275,10 @@ impl OutboundJITChannelState { ) -> Result { match self { OutboundJITChannelState::PendingChannelOpen { payment_queue, opening_fee_msat } => { - if let Some((_payment_hash, htlcs)) = - payment_queue.pop_greater_than_msat(*opening_fee_msat) - { + if let Some(entry) = payment_queue.pop_greater_than_msat(*opening_fee_msat) { let forward_payment = ForwardPaymentAction( channel_id, - FeePayment { opening_fee_msat: *opening_fee_msat, htlcs }, + FeePayment { htlcs: entry.htlcs, opening_fee_msat: *opening_fee_msat }, ); *self = OutboundJITChannelState::PendingPaymentForward { payment_queue: core::mem::take(payment_queue), @@ -311,12 +307,10 @@ impl OutboundJITChannelState { opening_fee_msat, channel_id, } => { - if let Some((_payment_hash, htlcs)) = - payment_queue.pop_greater_than_msat(*opening_fee_msat) - { + if let Some(entry) = payment_queue.pop_greater_than_msat(*opening_fee_msat) { let forward_payment = ForwardPaymentAction( *channel_id, - FeePayment { htlcs, opening_fee_msat: *opening_fee_msat }, + FeePayment { htlcs: entry.htlcs, opening_fee_msat: *opening_fee_msat }, ); *self = OutboundJITChannelState::PendingPaymentForward { payment_queue: core::mem::take(payment_queue), From e6ca20855a7391e9e48c3b437cc7ace9250ba258 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Mon, 4 Aug 2025 10:27:28 +0200 Subject: [PATCH 2/7] Prefactor: Rename `StoredWebhook` to `Webhook` --- lightning-liquidity/src/lsps5/service.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lightning-liquidity/src/lsps5/service.rs b/lightning-liquidity/src/lsps5/service.rs index 2b86ad3ac08..bb02509b480 100644 --- a/lightning-liquidity/src/lsps5/service.rs +++ b/lightning-liquidity/src/lsps5/service.rs @@ -47,7 +47,7 @@ pub const PRUNE_STALE_WEBHOOKS_INTERVAL_DAYS: Duration = Duration::from_secs(24 /// A stored webhook. #[derive(Debug, Clone)] -struct StoredWebhook { +struct Webhook { _app_name: LSPS5AppName, url: LSPS5WebhookUrl, _counterparty_node_id: PublicKey, @@ -117,7 +117,7 @@ where TP::Target: TimeProvider, { config: LSPS5ServiceConfig, - webhooks: Mutex>>, + webhooks: Mutex>>, event_queue: Arc, pending_messages: Arc, time_provider: TP, @@ -189,7 +189,7 @@ where } else { (now, new_hash_map()) }; - entry.insert(StoredWebhook { + entry.insert(Webhook { _app_name: params.app_name.clone(), url: params.webhook.clone(), _counterparty_node_id: counterparty_node_id, @@ -212,7 +212,7 @@ where }); } - entry.insert(StoredWebhook { + entry.insert(Webhook { _app_name: params.app_name.clone(), url: params.webhook.clone(), _counterparty_node_id: counterparty_node_id, From bf91c4edc51220a2d073957dce5f90ade08affac Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Tue, 12 Aug 2025 09:58:33 +0200 Subject: [PATCH 3/7] Prefactor: Simplify `last_notification_sent` tracking While bLIP-55 describes that the service should wait at least some cooldown between sending notifications per individual `method`, there is nothing that keeps us from simplifying our approach to apply the cooldown to *any* notifications sent, especially since we just reduced the cooldown period to 1 minute elsewhere. Here, we therefore simplify the `last_notification_sent` field to just be a `Option`. --- lightning-liquidity/src/lsps5/service.rs | 29 +++++------- .../tests/lsps5_integration_tests.rs | 47 ++++++++++--------- 2 files changed, 36 insertions(+), 40 deletions(-) diff --git a/lightning-liquidity/src/lsps5/service.rs b/lightning-liquidity/src/lsps5/service.rs index bb02509b480..e956ebe2cd4 100644 --- a/lightning-liquidity/src/lsps5/service.rs +++ b/lightning-liquidity/src/lsps5/service.rs @@ -54,9 +54,9 @@ struct Webhook { // Timestamp used for tracking when the webhook was created / updated, or when the last notification was sent. // This is used to determine if the webhook is stale and should be pruned. last_used: LSPSDateTime, - // Map of last notification sent timestamps for each notification method. - // This is used to enforce notification cooldowns. - last_notification_sent: HashMap, + // Timestamp when we last sent a notification to the client. This is used to enforce + // notification cooldowns. + last_notification_sent: Option, } /// Server-side configuration options for LSPS5 Webhook Registration. @@ -184,11 +184,8 @@ where match client_webhooks.entry(params.app_name.clone()) { Entry::Occupied(mut entry) => { no_change = entry.get().url == params.webhook; - let (last_used, last_notification_sent) = if no_change { - (entry.get().last_used, entry.get().last_notification_sent.clone()) - } else { - (now, new_hash_map()) - }; + let last_used = if no_change { entry.get().last_used } else { now }; + let last_notification_sent = entry.get().last_notification_sent; entry.insert(Webhook { _app_name: params.app_name.clone(), url: params.webhook.clone(), @@ -217,7 +214,7 @@ where url: params.webhook.clone(), _counterparty_node_id: counterparty_node_id, last_used: now, - last_notification_sent: new_hash_map(), + last_notification_sent: None, }); }, } @@ -425,11 +422,9 @@ where // (other than lsps5.webhook_registered) close in time. if notification.method != WebhookNotificationMethod::LSPS5WebhookRegistered { let rate_limit_applies = client_webhooks.iter().any(|(_, webhook)| { - webhook - .last_notification_sent - .get(¬ification.method) - .map(|last_sent| now.duration_since(&last_sent)) - .map_or(false, |duration| duration < NOTIFICATION_COOLDOWN_TIME) + webhook.last_notification_sent.as_ref().map_or(false, |last_sent| { + now.duration_since(&last_sent) < NOTIFICATION_COOLDOWN_TIME + }) }); if rate_limit_applies { @@ -438,14 +433,14 @@ where } for (app_name, webhook) in client_webhooks.iter_mut() { - webhook.last_notification_sent.insert(notification.method.clone(), now); - webhook.last_used = now; self.send_notification( client_id, app_name.clone(), webhook.url.clone(), notification.clone(), )?; + webhook.last_used = now; + webhook.last_notification_sent = Some(now); } Ok(()) } @@ -527,7 +522,7 @@ where let mut webhooks = self.webhooks.lock().unwrap(); if let Some(client_webhooks) = webhooks.get_mut(counterparty_node_id) { for webhook in client_webhooks.values_mut() { - webhook.last_notification_sent.clear(); + webhook.last_notification_sent = None; } } } diff --git a/lightning-liquidity/tests/lsps5_integration_tests.rs b/lightning-liquidity/tests/lsps5_integration_tests.rs index 9035755e89a..7b24826f57a 100644 --- a/lightning-liquidity/tests/lsps5_integration_tests.rs +++ b/lightning-liquidity/tests/lsps5_integration_tests.rs @@ -411,11 +411,13 @@ fn webhook_error_handling_test() { #[test] fn webhook_notification_delivery_test() { + let mock_time_provider = Arc::new(MockTimeProvider::new(1000)); + let time_provider = Arc::::clone(&mock_time_provider); let chanmon_cfgs = create_chanmon_cfgs(2); let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); let nodes = create_network(2, &node_cfgs, &node_chanmgrs); - let (lsps_nodes, validator) = lsps5_test_setup(nodes, Arc::new(DefaultTimeProvider)); + let (lsps_nodes, validator) = lsps5_test_setup(nodes, time_provider); let LSPSNodes { service_node, client_node } = lsps_nodes; let service_node_id = service_node.inner.node.get_our_node_id(); let client_node_id = client_node.inner.node.get_our_node_id(); @@ -499,6 +501,8 @@ fn webhook_notification_delivery_test() { "No event should be emitted due to cooldown" ); + mock_time_provider.advance_time(NOTIFICATION_COOLDOWN_TIME.as_secs() + 1); + let timeout_block = 700000; // Some future block height let _ = service_handler.notify_expiry_soon(client_node_id, timeout_block); @@ -719,11 +723,13 @@ fn idempotency_set_webhook_test() { #[test] fn replay_prevention_test() { + let mock_time_provider = Arc::new(MockTimeProvider::new(1000)); + let time_provider = Arc::::clone(&mock_time_provider); let chanmon_cfgs = create_chanmon_cfgs(2); let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); let nodes = create_network(2, &node_cfgs, &node_chanmgrs); - let (lsps_nodes, validator) = lsps5_test_setup(nodes, Arc::new(DefaultTimeProvider)); + let (lsps_nodes, validator) = lsps5_test_setup(nodes, time_provider); let LSPSNodes { service_node, client_node } = lsps_nodes; let service_node_id = service_node.inner.node.get_our_node_id(); let client_node_id = client_node.inner.node.get_our_node_id(); @@ -774,6 +780,9 @@ fn replay_prevention_test() { // Fill up the validator's signature cache to push out the original signature. for i in 0..MAX_RECENT_SIGNATURES { + // Advance time, allowing for another notification + mock_time_provider.advance_time(NOTIFICATION_COOLDOWN_TIME.as_secs() + 1); + let timeout_block = 700000 + i as u32; let _ = service_handler.notify_expiry_soon(client_node_id, timeout_block); let event = service_node.liquidity_manager.next_event().unwrap(); @@ -871,11 +880,13 @@ fn stale_webhooks() { #[test] fn test_all_notifications() { + let mock_time_provider = Arc::new(MockTimeProvider::new(1000)); + let time_provider = Arc::::clone(&mock_time_provider); let chanmon_cfgs = create_chanmon_cfgs(2); let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); let nodes = create_network(2, &node_cfgs, &node_chanmgrs); - let (lsps_nodes, validator) = lsps5_test_setup(nodes, Arc::new(DefaultTimeProvider)); + let (lsps_nodes, validator) = lsps5_test_setup(nodes, time_provider); let LSPSNodes { service_node, client_node } = lsps_nodes; let service_node_id = service_node.inner.node.get_our_node_id(); let client_node_id = client_node.inner.node.get_our_node_id(); @@ -894,9 +905,16 @@ fn test_all_notifications() { // consume initial SendWebhookNotification let _ = service_node.liquidity_manager.next_event().unwrap(); + mock_time_provider.advance_time(NOTIFICATION_COOLDOWN_TIME.as_secs() + 1); let _ = service_handler.notify_onion_message_incoming(client_node_id); + + mock_time_provider.advance_time(NOTIFICATION_COOLDOWN_TIME.as_secs() + 1); let _ = service_handler.notify_payment_incoming(client_node_id); + + mock_time_provider.advance_time(NOTIFICATION_COOLDOWN_TIME.as_secs() + 1); let _ = service_handler.notify_expiry_soon(client_node_id, 1000); + + mock_time_provider.advance_time(NOTIFICATION_COOLDOWN_TIME.as_secs() + 1); let _ = service_handler.notify_liquidity_management_request(client_node_id); let expected_notifications = vec![ @@ -1101,24 +1119,7 @@ fn test_send_notifications_and_peer_connected_resets_cooldown() { "Should not emit event due to cooldown" ); - // 3. Notification of a different method CAN be sent - let timeout_block = 424242; - let _ = service_handler.notify_expiry_soon(client_node_id, timeout_block); - let event = service_node.liquidity_manager.next_event().unwrap(); - match event { - LiquidityEvent::LSPS5Service(LSPS5ServiceEvent::SendWebhookNotification { - notification, - .. - }) => { - assert!(matches!( - notification.method, - WebhookNotificationMethod::LSPS5ExpirySoon { timeout } if timeout == timeout_block - )); - }, - _ => panic!("Expected SendWebhookNotification event for expiry_soon"), - } - - // 4. Advance time past cooldown and ensure payment_incoming can be sent again + // 3. Advance time past cooldown and ensure payment_incoming can be sent again mock_time_provider.advance_time(NOTIFICATION_COOLDOWN_TIME.as_secs() + 1); let _ = service_handler.notify_payment_incoming(client_node_id); @@ -1133,7 +1134,7 @@ fn test_send_notifications_and_peer_connected_resets_cooldown() { _ => panic!("Expected SendWebhookNotification event after cooldown"), } - // 5. Can't send payment_incoming notification again immediately after cooldown + // 4. Can't send payment_incoming notification again immediately after cooldown let result = service_handler.notify_payment_incoming(client_node_id); let error = result.unwrap_err(); @@ -1144,7 +1145,7 @@ fn test_send_notifications_and_peer_connected_resets_cooldown() { "Should not emit event due to cooldown" ); - // 6. After peer_connected, notification should be sent again immediately + // 5. After peer_connected, notification should be sent again immediately let init_msg = Init { features: lightning_types::features::InitFeatures::empty(), remote_network_address: None, From b3e9bc6fdb1a2b1f6edfd7fa25ce8b5b78bea233 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Tue, 12 Aug 2025 10:02:10 +0200 Subject: [PATCH 4/7] Also reset notification cooldown on peer disconnection If we happened to send a notification while the client is connected to us, we would previously only reset the cooldown once the client connects again. While theoretically it would be preferable to never set the `last_notification_sent` field to begin with if the client is connected to us, allowing the service handler to query the peer connection state would be unnecessarily complex. Here, we therefore simply opt to also reset the `last_notification_sent` state once the peer disconnects from us. --- lightning-liquidity/src/lsps5/service.rs | 9 +++++++++ lightning-liquidity/src/manager.rs | 4 ++++ 2 files changed, 13 insertions(+) diff --git a/lightning-liquidity/src/lsps5/service.rs b/lightning-liquidity/src/lsps5/service.rs index e956ebe2cd4..5d492ff5d79 100644 --- a/lightning-liquidity/src/lsps5/service.rs +++ b/lightning-liquidity/src/lsps5/service.rs @@ -526,6 +526,15 @@ where } } } + + pub(crate) fn peer_disconnected(&self, counterparty_node_id: &PublicKey) { + let mut webhooks = self.webhooks.lock().unwrap(); + if let Some(client_webhooks) = webhooks.get_mut(counterparty_node_id) { + for webhook in client_webhooks.values_mut() { + webhook.last_notification_sent = None; + } + } + } } impl LSPSProtocolMessageHandler for LSPS5ServiceHandler diff --git a/lightning-liquidity/src/manager.rs b/lightning-liquidity/src/manager.rs index 6452bd32df3..835cc9d30c1 100644 --- a/lightning-liquidity/src/manager.rs +++ b/lightning-liquidity/src/manager.rs @@ -712,6 +712,10 @@ where if let Some(lsps2_service_handler) = self.lsps2_service_handler.as_ref() { lsps2_service_handler.peer_disconnected(counterparty_node_id); } + + if let Some(lsps5_service_handler) = self.lsps5_service_handler.as_ref() { + lsps5_service_handler.peer_disconnected(&counterparty_node_id); + } } fn peer_connected( &self, counterparty_node_id: bitcoin::secp256k1::PublicKey, _: &lightning::ln::msgs::Init, From ce89e6b34e052516f2965d2fafa2c64eb2d4edbf Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Tue, 12 Aug 2025 12:25:42 +0200 Subject: [PATCH 5/7] Refactor `LSPS5ServiceHandler` to hold a `PeerState` Going forward, we'll add serialization logic for LSPS5 types. To contain the persisted state a bit better (and to align the model with LSPS1/2), we refactor the `LSPS5ServiceHandler` to hold a `PeerState` object. --- lightning-liquidity/src/lsps5/service.rs | 245 +++++++++++------- .../tests/lsps5_integration_tests.rs | 62 +++++ 2 files changed, 207 insertions(+), 100 deletions(-) diff --git a/lightning-liquidity/src/lsps5/service.rs b/lightning-liquidity/src/lsps5/service.rs index 5d492ff5d79..189953a2897 100644 --- a/lightning-liquidity/src/lsps5/service.rs +++ b/lightning-liquidity/src/lsps5/service.rs @@ -17,9 +17,8 @@ use crate::lsps5::msgs::{ SetWebhookRequest, SetWebhookResponse, WebhookNotification, WebhookNotificationMethod, }; use crate::message_queue::MessageQueue; -use crate::prelude::hash_map::Entry; use crate::prelude::*; -use crate::sync::{Arc, Mutex}; +use crate::sync::{Arc, Mutex, RwLock, RwLockWriteGuard}; use crate::utils::time::TimeProvider; use bitcoin::secp256k1::PublicKey; @@ -117,7 +116,7 @@ where TP::Target: TimeProvider, { config: LSPS5ServiceConfig, - webhooks: Mutex>>, + per_peer_state: RwLock>, event_queue: Arc, pending_messages: Arc, time_provider: TP, @@ -140,7 +139,7 @@ where assert!(config.max_webhooks_per_client > 0, "`max_webhooks_per_client` must be > 0"); Self { config, - webhooks: Mutex::new(new_hash_map()), + per_peer_state: RwLock::new(new_hash_map()), event_queue, pending_messages, time_provider, @@ -150,18 +149,26 @@ where } } - fn check_prune_stale_webhooks(&self) { + fn check_prune_stale_webhooks<'a>( + &self, outer_state_lock: &mut RwLockWriteGuard<'a, HashMap>, + ) { + let mut last_pruning = self.last_pruning.lock().unwrap(); let now = LSPSDateTime::new_from_duration_since_epoch(self.time_provider.duration_since_epoch()); - let should_prune = { - let last_pruning = self.last_pruning.lock().unwrap(); - last_pruning.as_ref().map_or(true, |last_time| { - now.duration_since(&last_time) > PRUNE_STALE_WEBHOOKS_INTERVAL_DAYS - }) - }; + + let should_prune = last_pruning.as_ref().map_or(true, |last_time| { + now.duration_since(&last_time) > PRUNE_STALE_WEBHOOKS_INTERVAL_DAYS + }); if should_prune { - self.prune_stale_webhooks(); + outer_state_lock.retain(|client_id, peer_state| { + if self.client_has_open_channel(client_id) { + // Don't prune clients with open channels + return true; + } + !peer_state.prune_stale_webhooks(now) + }); + *last_pruning = Some(now); } } @@ -171,58 +178,56 @@ where ) -> Result<(), LightningError> { let mut message_queue_notifier = self.pending_messages.notifier(); - self.check_prune_stale_webhooks(); + let mut outer_state_lock = self.per_peer_state.write().unwrap(); + self.check_prune_stale_webhooks(&mut outer_state_lock); - let mut webhooks = self.webhooks.lock().unwrap(); + let peer_state = + outer_state_lock.entry(counterparty_node_id).or_insert_with(PeerState::default); - let client_webhooks = webhooks.entry(counterparty_node_id).or_insert_with(new_hash_map); let now = LSPSDateTime::new_from_duration_since_epoch(self.time_provider.duration_since_epoch()); - let num_webhooks = client_webhooks.len(); + let num_webhooks = peer_state.webhooks_len(); let mut no_change = false; - match client_webhooks.entry(params.app_name.clone()) { - Entry::Occupied(mut entry) => { - no_change = entry.get().url == params.webhook; - let last_used = if no_change { entry.get().last_used } else { now }; - let last_notification_sent = entry.get().last_notification_sent; - entry.insert(Webhook { - _app_name: params.app_name.clone(), - url: params.webhook.clone(), - _counterparty_node_id: counterparty_node_id, - last_used, - last_notification_sent, - }); - }, - Entry::Vacant(entry) => { - if num_webhooks >= self.config.max_webhooks_per_client as usize { - let error = LSPS5ProtocolError::TooManyWebhooks; - let msg = LSPS5Message::Response( - request_id, - LSPS5Response::SetWebhookError(error.clone().into()), - ) - .into(); - message_queue_notifier.enqueue(&counterparty_node_id, msg); - return Err(LightningError { - err: error.message().into(), - action: ErrorAction::IgnoreAndLog(Level::Info), - }); - } - entry.insert(Webhook { - _app_name: params.app_name.clone(), - url: params.webhook.clone(), - _counterparty_node_id: counterparty_node_id, - last_used: now, - last_notification_sent: None, + if let Some(webhook) = peer_state.webhook_mut(¶ms.app_name) { + no_change = webhook.url == params.webhook; + if !no_change { + // The URL was updated. + webhook.url = params.webhook.clone(); + webhook.last_used = now; + webhook.last_notification_sent = None; + } + } else { + if num_webhooks >= self.config.max_webhooks_per_client as usize { + let error = LSPS5ProtocolError::TooManyWebhooks; + let msg = LSPS5Message::Response( + request_id, + LSPS5Response::SetWebhookError(error.clone().into()), + ) + .into(); + message_queue_notifier.enqueue(&counterparty_node_id, msg); + return Err(LightningError { + err: error.message().into(), + action: ErrorAction::IgnoreAndLog(Level::Info), }); - }, + } + + let webhook = Webhook { + _app_name: params.app_name.clone(), + url: params.webhook.clone(), + _counterparty_node_id: counterparty_node_id, + last_used: now, + last_notification_sent: None, + }; + + peer_state.insert_webhook(params.app_name.clone(), webhook); } if !no_change { self.send_webhook_registered_notification( counterparty_node_id, - params.app_name, + params.app_name.clone(), params.webhook, ) .map_err(|e| { @@ -242,7 +247,7 @@ where let msg = LSPS5Message::Response( request_id, LSPS5Response::SetWebhook(SetWebhookResponse { - num_webhooks: client_webhooks.len() as u32, + num_webhooks: peer_state.webhooks_len() as u32, max_webhooks: self.config.max_webhooks_per_client, no_change, }), @@ -258,14 +263,11 @@ where ) -> Result<(), LightningError> { let mut message_queue_notifier = self.pending_messages.notifier(); - self.check_prune_stale_webhooks(); - - let webhooks = self.webhooks.lock().unwrap(); + let mut outer_state_lock = self.per_peer_state.write().unwrap(); + self.check_prune_stale_webhooks(&mut outer_state_lock); - let app_names = webhooks - .get(&counterparty_node_id) - .map(|client_webhooks| client_webhooks.keys().cloned().collect::>()) - .unwrap_or_else(Vec::new); + let app_names = + outer_state_lock.get(&counterparty_node_id).map(|p| p.app_names()).unwrap_or_default(); let max_webhooks = self.config.max_webhooks_per_client; @@ -282,12 +284,11 @@ where ) -> Result<(), LightningError> { let mut message_queue_notifier = self.pending_messages.notifier(); - self.check_prune_stale_webhooks(); + let mut outer_state_lock = self.per_peer_state.write().unwrap(); + self.check_prune_stale_webhooks(&mut outer_state_lock); - let mut webhooks = self.webhooks.lock().unwrap(); - - if let Some(client_webhooks) = webhooks.get_mut(&counterparty_node_id) { - if client_webhooks.remove(¶ms.app_name).is_some() { + if let Some(peer_state) = outer_state_lock.get_mut(&counterparty_node_id) { + if peer_state.remove_webhook(¶ms.app_name) { let response = RemoveWebhookResponse {}; let msg = LSPS5Message::Response(request_id, LSPS5Response::RemoveWebhook(response)) @@ -408,11 +409,13 @@ where fn send_notifications_to_client_webhooks( &self, client_id: PublicKey, notification: WebhookNotification, ) -> Result<(), LSPS5ProtocolError> { - let mut webhooks = self.webhooks.lock().unwrap(); + let mut outer_state_lock = self.per_peer_state.write().unwrap(); + self.check_prune_stale_webhooks(&mut outer_state_lock); - let client_webhooks = match webhooks.get_mut(&client_id) { - Some(webhooks) if !webhooks.is_empty() => webhooks, - _ => return Ok(()), + let peer_state = if let Some(peer_state) = outer_state_lock.get_mut(&client_id) { + peer_state + } else { + return Ok(()); }; let now = @@ -421,7 +424,7 @@ where // We must avoid sending multiple notifications of the same method // (other than lsps5.webhook_registered) close in time. if notification.method != WebhookNotificationMethod::LSPS5WebhookRegistered { - let rate_limit_applies = client_webhooks.iter().any(|(_, webhook)| { + let rate_limit_applies = peer_state.webhooks().iter().any(|(_, webhook)| { webhook.last_notification_sent.as_ref().map_or(false, |last_sent| { now.duration_since(&last_sent) < NOTIFICATION_COOLDOWN_TIME }) @@ -432,7 +435,7 @@ where } } - for (app_name, webhook) in client_webhooks.iter_mut() { + for (app_name, webhook) in peer_state.webhooks_mut().iter_mut() { self.send_notification( client_id, app_name.clone(), @@ -490,26 +493,6 @@ where .map_err(|_| LSPS5ProtocolError::UnknownError) } - fn prune_stale_webhooks(&self) { - let now = - LSPSDateTime::new_from_duration_since_epoch(self.time_provider.duration_since_epoch()); - let mut webhooks = self.webhooks.lock().unwrap(); - - webhooks.retain(|client_id, client_webhooks| { - if !self.client_has_open_channel(client_id) { - client_webhooks.retain(|_, webhook| { - now.duration_since(&webhook.last_used) < MIN_WEBHOOK_RETENTION_DAYS - }); - !client_webhooks.is_empty() - } else { - true - } - }); - - let mut last_pruning = self.last_pruning.lock().unwrap(); - *last_pruning = Some(now); - } - fn client_has_open_channel(&self, client_id: &PublicKey) -> bool { self.channel_manager .get_cm() @@ -519,20 +502,16 @@ where } pub(crate) fn peer_connected(&self, counterparty_node_id: &PublicKey) { - let mut webhooks = self.webhooks.lock().unwrap(); - if let Some(client_webhooks) = webhooks.get_mut(counterparty_node_id) { - for webhook in client_webhooks.values_mut() { - webhook.last_notification_sent = None; - } + let mut outer_state_lock = self.per_peer_state.write().unwrap(); + if let Some(peer_state) = outer_state_lock.get_mut(counterparty_node_id) { + peer_state.reset_notification_cooldown(); } } pub(crate) fn peer_disconnected(&self, counterparty_node_id: &PublicKey) { - let mut webhooks = self.webhooks.lock().unwrap(); - if let Some(client_webhooks) = webhooks.get_mut(counterparty_node_id) { - for webhook in client_webhooks.values_mut() { - webhook.last_notification_sent = None; - } + let mut outer_state_lock = self.per_peer_state.write().unwrap(); + if let Some(peer_state) = outer_state_lock.get_mut(counterparty_node_id) { + peer_state.reset_notification_cooldown(); } } } @@ -578,3 +557,69 @@ where } } } + +#[derive(Debug, Default)] +struct PeerState { + webhooks: Vec<(LSPS5AppName, Webhook)>, +} + +impl PeerState { + fn webhook_mut(&mut self, name: &LSPS5AppName) -> Option<&mut Webhook> { + self.webhooks.iter_mut().find_map(|(n, h)| if n == name { Some(h) } else { None }) + } + + fn webhooks(&self) -> &Vec<(LSPS5AppName, Webhook)> { + &self.webhooks + } + + fn webhooks_mut(&mut self) -> &mut Vec<(LSPS5AppName, Webhook)> { + &mut self.webhooks + } + + fn webhooks_len(&self) -> usize { + self.webhooks.len() + } + + fn app_names(&self) -> Vec { + self.webhooks.iter().map(|(n, _)| n).cloned().collect() + } + + fn insert_webhook(&mut self, name: LSPS5AppName, hook: Webhook) { + for (n, h) in self.webhooks.iter_mut() { + if *n == name { + *h = hook; + return; + } + } + + self.webhooks.push((name, hook)); + } + + fn remove_webhook(&mut self, name: &LSPS5AppName) -> bool { + let mut removed = false; + self.webhooks.retain(|(n, _)| { + if n != name { + true + } else { + removed = true; + false + } + }); + removed + } + + fn reset_notification_cooldown(&mut self) { + for (_, h) in self.webhooks.iter_mut() { + h.last_notification_sent = None; + } + } + + // Returns whether the entire state is empty and can be pruned. + fn prune_stale_webhooks(&mut self, now: LSPSDateTime) -> bool { + self.webhooks.retain(|(_, webhook)| { + now.duration_since(&webhook.last_used) < MIN_WEBHOOK_RETENTION_DAYS + }); + + self.webhooks.is_empty() + } +} diff --git a/lightning-liquidity/tests/lsps5_integration_tests.rs b/lightning-liquidity/tests/lsps5_integration_tests.rs index 7b24826f57a..9f6e5201bcd 100644 --- a/lightning-liquidity/tests/lsps5_integration_tests.rs +++ b/lightning-liquidity/tests/lsps5_integration_tests.rs @@ -1164,3 +1164,65 @@ fn test_send_notifications_and_peer_connected_resets_cooldown() { _ => panic!("Expected SendWebhookNotification event after peer_connected"), } } + +#[test] +fn webhook_update_affects_future_notifications() { + let mock_time_provider = Arc::new(MockTimeProvider::new(1000)); + let time_provider = Arc::::clone(&mock_time_provider); + let chanmon_cfgs = create_chanmon_cfgs(2); + let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); + let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); + let nodes = create_network(2, &node_cfgs, &node_chanmgrs); + let (lsps_nodes, _) = lsps5_test_setup(nodes, time_provider); + let LSPSNodes { service_node, client_node } = lsps_nodes; + let service_node_id = service_node.inner.node.get_our_node_id(); + let client_node_id = client_node.inner.node.get_our_node_id(); + let client_handler = client_node.liquidity_manager.lsps5_client_handler().unwrap(); + let service_handler = service_node.liquidity_manager.lsps5_service_handler().unwrap(); + + let app = "UpdateTestApp".to_string(); + let url_v1 = "https://example.org/v1".to_string(); + let url_v2 = "https://example.org/v2".to_string(); + + // register v1 + client_handler.set_webhook(service_node_id, app.clone(), url_v1).unwrap(); + let req = get_lsps_message!(client_node, service_node_id); + service_node.liquidity_manager.handle_custom_message(req, client_node_id).unwrap(); + let _ = service_node.liquidity_manager.next_event().unwrap(); // initial webhook_registered + let resp = get_lsps_message!(service_node, client_node_id); + client_node.liquidity_manager.handle_custom_message(resp, service_node_id).unwrap(); + let _ = client_node.liquidity_manager.next_event().unwrap(); + + // update to v2 + client_handler.set_webhook(service_node_id, app, url_v2.clone()).unwrap(); + let upd_req = get_lsps_message!(client_node, service_node_id); + service_node.liquidity_manager.handle_custom_message(upd_req, client_node_id).unwrap(); + let update_event = service_node.liquidity_manager.next_event().unwrap(); + match update_event { + LiquidityEvent::LSPS5Service(LSPS5ServiceEvent::SendWebhookNotification { + url, .. + }) => { + assert_eq!(url.as_str(), url_v2); + }, + _ => panic!("Expected webhook_registered for update"), + } + let upd_resp = get_lsps_message!(service_node, client_node_id); + client_node.liquidity_manager.handle_custom_message(upd_resp, service_node_id).unwrap(); + let _ = client_node.liquidity_manager.next_event().unwrap(); + + // Advance past cooldown and send a notification again + mock_time_provider.advance_time(NOTIFICATION_COOLDOWN_TIME.as_secs() + 1); + service_handler.notify_payment_incoming(client_node_id).unwrap(); + let ev = service_node.liquidity_manager.next_event().unwrap(); + match ev { + LiquidityEvent::LSPS5Service(LSPS5ServiceEvent::SendWebhookNotification { + url, + notification, + .. + }) => { + assert_eq!(notification.method, WebhookNotificationMethod::LSPS5PaymentIncoming); + assert_eq!(url.as_str(), url_v2, "Should target updated URL"); + }, + _ => panic!("Expected SendWebhookNotification after update"), + } +} From 9065f3114bd084136bae290f24a30ce45c19fd25 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Tue, 12 Aug 2025 12:38:46 +0200 Subject: [PATCH 6/7] Only prune on `peer_{dis}connected` Previously, we'd constantly check whether or not we can prune stale webhooks. While not wrong, it lead to a bunch of ~unnecessary operations, especially given that we only prune once a day currently. Here we move pruning to `peer_connected`/`peer_disconnected`, which is similar to what we do for LSPS2, and should still be more than enough. --- lightning-liquidity/src/lsps5/service.rs | 10 +++------- lightning-liquidity/tests/lsps5_integration_tests.rs | 12 ++++++++++-- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/lightning-liquidity/src/lsps5/service.rs b/lightning-liquidity/src/lsps5/service.rs index 189953a2897..72c3d83b3fe 100644 --- a/lightning-liquidity/src/lsps5/service.rs +++ b/lightning-liquidity/src/lsps5/service.rs @@ -179,7 +179,6 @@ where let mut message_queue_notifier = self.pending_messages.notifier(); let mut outer_state_lock = self.per_peer_state.write().unwrap(); - self.check_prune_stale_webhooks(&mut outer_state_lock); let peer_state = outer_state_lock.entry(counterparty_node_id).or_insert_with(PeerState::default); @@ -263,9 +262,7 @@ where ) -> Result<(), LightningError> { let mut message_queue_notifier = self.pending_messages.notifier(); - let mut outer_state_lock = self.per_peer_state.write().unwrap(); - self.check_prune_stale_webhooks(&mut outer_state_lock); - + let outer_state_lock = self.per_peer_state.read().unwrap(); let app_names = outer_state_lock.get(&counterparty_node_id).map(|p| p.app_names()).unwrap_or_default(); @@ -285,7 +282,6 @@ where let mut message_queue_notifier = self.pending_messages.notifier(); let mut outer_state_lock = self.per_peer_state.write().unwrap(); - self.check_prune_stale_webhooks(&mut outer_state_lock); if let Some(peer_state) = outer_state_lock.get_mut(&counterparty_node_id) { if peer_state.remove_webhook(¶ms.app_name) { @@ -410,8 +406,6 @@ where &self, client_id: PublicKey, notification: WebhookNotification, ) -> Result<(), LSPS5ProtocolError> { let mut outer_state_lock = self.per_peer_state.write().unwrap(); - self.check_prune_stale_webhooks(&mut outer_state_lock); - let peer_state = if let Some(peer_state) = outer_state_lock.get_mut(&client_id) { peer_state } else { @@ -506,6 +500,7 @@ where if let Some(peer_state) = outer_state_lock.get_mut(counterparty_node_id) { peer_state.reset_notification_cooldown(); } + self.check_prune_stale_webhooks(&mut outer_state_lock); } pub(crate) fn peer_disconnected(&self, counterparty_node_id: &PublicKey) { @@ -513,6 +508,7 @@ where if let Some(peer_state) = outer_state_lock.get_mut(counterparty_node_id) { peer_state.reset_notification_cooldown(); } + self.check_prune_stale_webhooks(&mut outer_state_lock); } } diff --git a/lightning-liquidity/tests/lsps5_integration_tests.rs b/lightning-liquidity/tests/lsps5_integration_tests.rs index 9f6e5201bcd..e526d3eda5e 100644 --- a/lightning-liquidity/tests/lsps5_integration_tests.rs +++ b/lightning-liquidity/tests/lsps5_integration_tests.rs @@ -861,7 +861,15 @@ fn stale_webhooks() { MIN_WEBHOOK_RETENTION_DAYS.as_secs() + PRUNE_STALE_WEBHOOKS_INTERVAL_DAYS.as_secs(), ); - // LIST calls prune before executing -> should be empty after advancing time + // LIST should be empty after advancing time and reconnection + service_node.liquidity_manager.peer_disconnected(client_node_id); + let init_msg = Init { + features: lightning_types::features::InitFeatures::empty(), + remote_network_address: None, + networks: None, + }; + service_node.liquidity_manager.peer_connected(client_node_id, &init_msg, false).unwrap(); + let _ = client_handler.list_webhooks(service_node_id); let list_req2 = get_lsps_message!(client_node, service_node_id); service_node.liquidity_manager.handle_custom_message(list_req2, client_node_id).unwrap(); @@ -1068,7 +1076,7 @@ fn test_notify_without_webhooks_does_nothing() { } #[test] -fn test_send_notifications_and_peer_connected_resets_cooldown() { +fn test_notifications_and_peer_connected_resets_cooldown() { let mock_time_provider = Arc::new(MockTimeProvider::new(1000)); let time_provider = Arc::::clone(&mock_time_provider); let chanmon_cfgs = create_chanmon_cfgs(2); From f95ba6945a7872e011c7233636c589039518fb51 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Wed, 13 Aug 2025 14:07:29 +0200 Subject: [PATCH 7/7] Add missing license headers We add the license header to all files in `lightning-liquidity` where it was absent. --- lightning-liquidity/src/lsps1/msgs.rs | 9 +++++++++ lightning-liquidity/src/lsps2/client.rs | 3 ++- lightning-liquidity/src/lsps2/msgs.rs | 9 +++++++++ lightning-liquidity/src/lsps2/payment_queue.rs | 9 +++++++++ lightning-liquidity/src/lsps2/utils.rs | 7 +++++++ lightning-liquidity/src/manager.rs | 9 +++++++++ lightning-liquidity/src/message_queue.rs | 9 +++++++++ 7 files changed, 54 insertions(+), 1 deletion(-) diff --git a/lightning-liquidity/src/lsps1/msgs.rs b/lightning-liquidity/src/lsps1/msgs.rs index 9b9b94d7cd2..8402827a4a6 100644 --- a/lightning-liquidity/src/lsps1/msgs.rs +++ b/lightning-liquidity/src/lsps1/msgs.rs @@ -1,3 +1,12 @@ +// This file is Copyright its original authors, visible in version control +// history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license +// , at your option. +// You may not use this file except in accordance with one or both of these +// licenses. + //! Message, request, and other primitive types used to implement bLIP-51 / LSPS1. use alloc::string::String; diff --git a/lightning-liquidity/src/lsps2/client.rs b/lightning-liquidity/src/lsps2/client.rs index fa08093108b..7008d42e345 100644 --- a/lightning-liquidity/src/lsps2/client.rs +++ b/lightning-liquidity/src/lsps2/client.rs @@ -3,7 +3,8 @@ // // This file is licensed under the Apache License, Version 2.0 or the MIT license -// , at your option. You may not use this file except in accordance with one or both of these +// , at your option. +// You may not use this file except in accordance with one or both of these // licenses. //! Contains the main bLIP-52 / LSPS2 client object, [`LSPS2ClientHandler`]. diff --git a/lightning-liquidity/src/lsps2/msgs.rs b/lightning-liquidity/src/lsps2/msgs.rs index 8fb9536b6d4..2a01d6ee32f 100644 --- a/lightning-liquidity/src/lsps2/msgs.rs +++ b/lightning-liquidity/src/lsps2/msgs.rs @@ -1,3 +1,12 @@ +// This file is Copyright its original authors, visible in version control +// history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license +// , at your option. +// You may not use this file except in accordance with one or both of these +// licenses. + //! Message, request, and other primitive types used to implement bLIP-52 / LSPS2. use alloc::string::String; diff --git a/lightning-liquidity/src/lsps2/payment_queue.rs b/lightning-liquidity/src/lsps2/payment_queue.rs index aff9f5103bc..d6474dc97a0 100644 --- a/lightning-liquidity/src/lsps2/payment_queue.rs +++ b/lightning-liquidity/src/lsps2/payment_queue.rs @@ -1,3 +1,12 @@ +// This file is Copyright its original authors, visible in version control +// history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license +// , at your option. +// You may not use this file except in accordance with one or both of these +// licenses. + use alloc::vec::Vec; use lightning::ln::channelmanager::InterceptId; diff --git a/lightning-liquidity/src/lsps2/utils.rs b/lightning-liquidity/src/lsps2/utils.rs index a2c4d65936d..e4620043424 100644 --- a/lightning-liquidity/src/lsps2/utils.rs +++ b/lightning-liquidity/src/lsps2/utils.rs @@ -1,3 +1,10 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. +// You may not use this file except in accordance with one or both of these licenses. + //! Utilities for implementing the bLIP-52 / LSPS2 standard. use crate::lsps2::msgs::LSPS2OpeningFeeParams; diff --git a/lightning-liquidity/src/manager.rs b/lightning-liquidity/src/manager.rs index 835cc9d30c1..4cf97786d02 100644 --- a/lightning-liquidity/src/manager.rs +++ b/lightning-liquidity/src/manager.rs @@ -1,3 +1,12 @@ +// This file is Copyright its original authors, visible in version control +// history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license +// , at your option. +// You may not use this file except in accordance with one or both of these +// licenses. + use alloc::string::ToString; use alloc::vec::Vec; diff --git a/lightning-liquidity/src/message_queue.rs b/lightning-liquidity/src/message_queue.rs index 2e99d545438..d097573cf04 100644 --- a/lightning-liquidity/src/message_queue.rs +++ b/lightning-liquidity/src/message_queue.rs @@ -1,3 +1,12 @@ +// This file is Copyright its original authors, visible in version control +// history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license +// , at your option. +// You may not use this file except in accordance with one or both of these +// licenses. + //! Holds types and traits used to implement message queues for [`LSPSMessage`]s. use alloc::collections::VecDeque;