From b0d0f3d749485a89ec65b5994cf3df541759d4f3 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Mon, 31 Oct 2022 16:07:41 -0700 Subject: [PATCH 1/7] Apply network graph updates through NetworkUpdate's instead of Event's --- lightning-background-processor/src/lib.rs | 12 +- lightning/src/routing/gossip.rs | 142 +++++++--------------- 2 files changed, 52 insertions(+), 102 deletions(-) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index a481ad3bd92..a8102c370bb 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -192,6 +192,16 @@ where } } +fn handle_network_graph_update( + network_graph: &NetworkGraph, event: &Event +) where L::Target: Logger { + if let Event::PaymentPathFailed { ref network_update, .. } = event { + if let Some(network_update) = network_update { + network_graph.handle_network_update(&network_update); + } + } +} + /// Decorates an [`EventHandler`] with common functionality provided by standard [`EventHandler`]s. struct DecoratingEventHandler< 'a, @@ -219,7 +229,7 @@ impl< where A::Target: chain::Access, L::Target: Logger { fn handle_event(&self, event: &Event) { if let Some(network_graph) = self.gossip_sync.network_graph() { - network_graph.handle_event(event); + handle_network_graph_update(network_graph, &event) } self.event_handler.handle_event(event); } diff --git a/lightning/src/routing/gossip.rs b/lightning/src/routing/gossip.rs index 00846a7ba79..947df9edae0 100644 --- a/lightning/src/routing/gossip.rs +++ b/lightning/src/routing/gossip.rs @@ -29,7 +29,7 @@ use crate::ln::msgs::{QueryChannelRange, ReplyChannelRange, QueryShortChannelIds use crate::ln::msgs; use crate::util::ser::{Readable, ReadableArgs, Writeable, Writer, MaybeReadable}; use crate::util::logger::{Logger, Level}; -use crate::util::events::{Event, EventHandler, MessageSendEvent, MessageSendEventsProvider}; +use crate::util::events::{MessageSendEvent, MessageSendEventsProvider}; use crate::util::scid_utils::{block_from_scid, scid_from_parts, MAX_SCID_BLOCK}; use crate::util::string::PrintableString; @@ -213,9 +213,6 @@ impl_writeable_tlv_based_enum_upgradable!(NetworkUpdate, /// This network graph is then used for routing payments. /// Provides interface to help with initial routing sync by /// serving historical announcements. -/// -/// Serves as an [`EventHandler`] for applying updates from [`Event::PaymentPathFailed`] to the -/// [`NetworkGraph`]. pub struct P2PGossipSync>, C: Deref, L: Deref> where C::Target: chain::Access, L::Target: Logger { @@ -275,32 +272,31 @@ where C::Target: chain::Access, L::Target: Logger } } -impl EventHandler for NetworkGraph where L::Target: Logger { - fn handle_event(&self, event: &Event) { - if let Event::PaymentPathFailed { network_update, .. } = event { - if let Some(network_update) = network_update { - match *network_update { - NetworkUpdate::ChannelUpdateMessage { ref msg } => { - let short_channel_id = msg.contents.short_channel_id; - let is_enabled = msg.contents.flags & (1 << 1) != (1 << 1); - let status = if is_enabled { "enabled" } else { "disabled" }; - log_debug!(self.logger, "Updating channel with channel_update from a payment failure. Channel {} is {}.", short_channel_id, status); - let _ = self.update_channel(msg); - }, - NetworkUpdate::ChannelFailure { short_channel_id, is_permanent } => { - let action = if is_permanent { "Removing" } else { "Disabling" }; - log_debug!(self.logger, "{} channel graph entry for {} due to a payment failure.", action, short_channel_id); - self.channel_failed(short_channel_id, is_permanent); - }, - NetworkUpdate::NodeFailure { ref node_id, is_permanent } => { - if is_permanent { - log_debug!(self.logger, - "Removed node graph entry for {} due to a payment failure.", log_pubkey!(node_id)); - self.node_failed_permanent(node_id); - }; - }, - } - } +impl NetworkGraph where L::Target: Logger { + /// Handles any network updates originating from [`Event`]s. + /// + /// [`Event`]: crate::util::events::Event + pub fn handle_network_update(&self, network_update: &NetworkUpdate) { + match *network_update { + NetworkUpdate::ChannelUpdateMessage { ref msg } => { + let short_channel_id = msg.contents.short_channel_id; + let is_enabled = msg.contents.flags & (1 << 1) != (1 << 1); + let status = if is_enabled { "enabled" } else { "disabled" }; + log_debug!(self.logger, "Updating channel with channel_update from a payment failure. Channel {} is {}.", short_channel_id, status); + let _ = self.update_channel(msg); + }, + NetworkUpdate::ChannelFailure { short_channel_id, is_permanent } => { + let action = if is_permanent { "Removing" } else { "Disabling" }; + log_debug!(self.logger, "{} channel graph entry for {} due to a payment failure.", action, short_channel_id); + self.channel_failed(short_channel_id, is_permanent); + }, + NetworkUpdate::NodeFailure { ref node_id, is_permanent } => { + if is_permanent { + log_debug!(self.logger, + "Removed node graph entry for {} due to a payment failure.", log_pubkey!(node_id)); + self.node_failed_permanent(node_id); + }; + }, } } } @@ -1931,7 +1927,6 @@ mod tests { use crate::chain; use crate::ln::channelmanager; use crate::ln::chan_utils::make_funding_redeemscript; - use crate::ln::PaymentHash; use crate::ln::features::InitFeatures; use crate::routing::gossip::{P2PGossipSync, NetworkGraph, NetworkUpdate, NodeAlias, MAX_EXCESS_BYTES_FOR_RELAY, NodeId, RoutingFees, ChannelUpdateInfo, ChannelInfo, NodeAnnouncementInfo, NodeInfo}; use crate::ln::msgs::{RoutingMessageHandler, UnsignedNodeAnnouncement, NodeAnnouncement, @@ -1939,7 +1934,7 @@ mod tests { ReplyChannelRange, QueryChannelRange, QueryShortChannelIds, MAX_VALUE_MSAT}; use crate::util::test_utils; use crate::util::ser::{ReadableArgs, Writeable}; - use crate::util::events::{Event, EventHandler, MessageSendEvent, MessageSendEventsProvider}; + use crate::util::events::{MessageSendEvent, MessageSendEventsProvider}; use crate::util::scid_utils::scid_from_parts; use crate::routing::gossip::REMOVED_ENTRIES_TRACKING_AGE_LIMIT_SECS; @@ -2383,19 +2378,8 @@ mod tests { let valid_channel_update = get_signed_channel_update(|_| {}, node_1_privkey, &secp_ctx); assert!(network_graph.read_only().channels().get(&short_channel_id).unwrap().one_to_two.is_none()); - network_graph.handle_event(&Event::PaymentPathFailed { - payment_id: None, - payment_hash: PaymentHash([0; 32]), - payment_failed_permanently: false, - all_paths_failed: true, - path: vec![], - network_update: Some(NetworkUpdate::ChannelUpdateMessage { - msg: valid_channel_update, - }), - short_channel_id: None, - retry: None, - error_code: None, - error_data: None, + network_graph.handle_network_update(&NetworkUpdate::ChannelUpdateMessage { + msg: valid_channel_update, }); assert!(network_graph.read_only().channels().get(&short_channel_id).unwrap().one_to_two.is_some()); @@ -2410,20 +2394,9 @@ mod tests { } }; - network_graph.handle_event(&Event::PaymentPathFailed { - payment_id: None, - payment_hash: PaymentHash([0; 32]), - payment_failed_permanently: false, - all_paths_failed: true, - path: vec![], - network_update: Some(NetworkUpdate::ChannelFailure { - short_channel_id, - is_permanent: false, - }), - short_channel_id: None, - retry: None, - error_code: None, - error_data: None, + network_graph.handle_network_update(&NetworkUpdate::ChannelFailure { + short_channel_id, + is_permanent: false, }); match network_graph.read_only().channels().get(&short_channel_id) { @@ -2435,20 +2408,9 @@ mod tests { } // Permanent closing deletes a channel - network_graph.handle_event(&Event::PaymentPathFailed { - payment_id: None, - payment_hash: PaymentHash([0; 32]), - payment_failed_permanently: false, - all_paths_failed: true, - path: vec![], - network_update: Some(NetworkUpdate::ChannelFailure { - short_channel_id, - is_permanent: true, - }), - short_channel_id: None, - retry: None, - error_code: None, - error_data: None, + network_graph.handle_network_update(&NetworkUpdate::ChannelFailure { + short_channel_id, + is_permanent: true, }); assert_eq!(network_graph.read_only().channels().len(), 0); @@ -2467,40 +2429,18 @@ mod tests { assert!(network_graph.read_only().channels().get(&short_channel_id).is_some()); // Non-permanent node failure does not delete any nodes or channels - network_graph.handle_event(&Event::PaymentPathFailed { - payment_id: None, - payment_hash: PaymentHash([0; 32]), - payment_failed_permanently: false, - all_paths_failed: true, - path: vec![], - network_update: Some(NetworkUpdate::NodeFailure { - node_id: node_2_id, - is_permanent: false, - }), - short_channel_id: None, - retry: None, - error_code: None, - error_data: None, + network_graph.handle_network_update(&NetworkUpdate::NodeFailure { + node_id: node_2_id, + is_permanent: false, }); assert!(network_graph.read_only().channels().get(&short_channel_id).is_some()); assert!(network_graph.read_only().nodes().get(&NodeId::from_pubkey(&node_2_id)).is_some()); // Permanent node failure deletes node and its channels - network_graph.handle_event(&Event::PaymentPathFailed { - payment_id: None, - payment_hash: PaymentHash([0; 32]), - payment_failed_permanently: false, - all_paths_failed: true, - path: vec![], - network_update: Some(NetworkUpdate::NodeFailure { - node_id: node_2_id, - is_permanent: true, - }), - short_channel_id: None, - retry: None, - error_code: None, - error_data: None, + network_graph.handle_network_update(&NetworkUpdate::NodeFailure { + node_id: node_2_id, + is_permanent: true, }); assert_eq!(network_graph.read_only().nodes().len(), 0); From f0059f5083f900bfab319fd87ca0f4e6779c0a91 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Mon, 7 Nov 2022 18:09:16 -0800 Subject: [PATCH 2/7] Use BaseEventHandler to expose async event handling on InvoicePayer We introduce a new sealed trait BaseEventHandler that has a blanket implementation for any T. Since the trait cannot be implemented outside of the crate, this allow us to expose specific implementations of InvoicePayer that allow for synchronous and asynchronous event handling. --- lightning-invoice/src/payment.rs | 113 ++++++++++++++++++++++--------- 1 file changed, 80 insertions(+), 33 deletions(-) diff --git a/lightning-invoice/src/payment.rs b/lightning-invoice/src/payment.rs index 96b20958fb7..08000b4dcf8 100644 --- a/lightning-invoice/src/payment.rs +++ b/lightning-invoice/src/payment.rs @@ -157,6 +157,7 @@ use secp256k1::PublicKey; use core::fmt; use core::fmt::{Debug, Display, Formatter}; +use core::future::Future; use core::ops::Deref; use core::time::Duration; #[cfg(feature = "std")] @@ -176,9 +177,21 @@ use crate::time_utils; #[cfg(feature = "no-std")] type ConfiguredTime = time_utils::Eternity; +/// Sealed trait with a blanket implementation to allow both sync and async implementations of event +/// handling to exist within the InvoicePayer. +mod sealed { + pub trait BaseEventHandler {} + impl BaseEventHandler for T {} +} + /// (C-not exported) generally all users should use the [`InvoicePayer`] type alias. -pub struct InvoicePayerUsingTime -where +pub struct InvoicePayerUsingTime< + P: Deref, + R: ScoringRouter, + L: Deref, + E: sealed::BaseEventHandler, + T: Time +> where P::Target: Payer, L::Target: Logger, { @@ -342,7 +355,8 @@ pub enum PaymentError { Sending(PaymentSendFailure), } -impl InvoicePayerUsingTime +impl + InvoicePayerUsingTime where P::Target: Payer, L::Target: Logger, @@ -744,12 +758,15 @@ fn has_expired(route_params: &RouteParameters) -> bool { } else { false } } -impl EventHandler for InvoicePayerUsingTime +impl + InvoicePayerUsingTime where P::Target: Payer, L::Target: Logger, { - fn handle_event(&self, event: &Event) { + /// Returns a bool indicating whether the processed event should be forwarded to a user-provided + /// event handler. + fn handle_event_internal(&self, event: &Event) -> bool { match event { Event::PaymentPathFailed { payment_hash, path, .. } | Event::PaymentPathSuccessful { path, payment_hash: Some(payment_hash), .. } @@ -779,7 +796,7 @@ where self.payer.abandon_payment(payment_id.unwrap()); } else if self.retry_payment(payment_id.unwrap(), *payment_hash, retry.as_ref().unwrap()).is_ok() { // We retried at least somewhat, don't provide the PaymentPathFailed event to the user. - return; + return false; } else { self.payer.abandon_payment(payment_id.unwrap()); } @@ -814,7 +831,37 @@ where } // Delegate to the decorated event handler unless the payment is retried. - self.event_handler.handle_event(event) + true + } +} + +impl + EventHandler for InvoicePayerUsingTime +where + P::Target: Payer, + L::Target: Logger, +{ + fn handle_event(&self, event: &Event) { + let should_forward = self.handle_event_internal(&event); + if should_forward { + self.event_handler.handle_event(&event) + } + } +} + +impl F> + InvoicePayerUsingTime +where + P::Target: Payer, + L::Target: Logger, +{ + /// Intercepts events required by the [`InvoicePayer`] and forwards them to the underlying event + /// handler, if necessary, to handle them asynchronously. + pub async fn handle_event_async(&self, event: Event) { + let should_forward = self.handle_event_internal(&event); + if should_forward { + (self.event_handler)(event).await; + } } } @@ -913,7 +960,7 @@ mod tests { #[test] fn pays_invoice_on_first_attempt() { let event_handled = core::cell::RefCell::new(false); - let event_handler = |_: &_| { *event_handled.borrow_mut() = true; }; + let event_handler = |_: &Event| { *event_handled.borrow_mut() = true; }; let payment_preimage = PaymentPreimage([1; 32]); let invoice = invoice(payment_preimage); @@ -939,7 +986,7 @@ mod tests { #[test] fn pays_invoice_on_retry() { let event_handled = core::cell::RefCell::new(false); - let event_handler = |_: &_| { *event_handled.borrow_mut() = true; }; + let event_handler = |_: &Event| { *event_handled.borrow_mut() = true; }; let payment_preimage = PaymentPreimage([1; 32]); let invoice = invoice(payment_preimage); @@ -980,7 +1027,7 @@ mod tests { #[test] fn pays_invoice_on_partial_failure() { - let event_handler = |_: &_| { panic!() }; + let event_handler = |_: &Event| { panic!() }; let payment_preimage = PaymentPreimage([1; 32]); let invoice = invoice(payment_preimage); @@ -1004,7 +1051,7 @@ mod tests { #[test] fn retries_payment_path_for_unknown_payment() { let event_handled = core::cell::RefCell::new(false); - let event_handler = |_: &_| { *event_handled.borrow_mut() = true; }; + let event_handler = |_: &Event| { *event_handled.borrow_mut() = true; }; let payment_preimage = PaymentPreimage([1; 32]); let invoice = invoice(payment_preimage); @@ -1048,7 +1095,7 @@ mod tests { #[test] fn fails_paying_invoice_after_max_retry_counts() { let event_handled = core::cell::RefCell::new(false); - let event_handler = |_: &_| { *event_handled.borrow_mut() = true; }; + let event_handler = |_: &Event| { *event_handled.borrow_mut() = true; }; let payment_preimage = PaymentPreimage([1; 32]); let invoice = invoice(payment_preimage); @@ -1105,7 +1152,7 @@ mod tests { #[test] fn fails_paying_invoice_after_max_retry_timeout() { let event_handled = core::cell::RefCell::new(false); - let event_handler = |_: &_| { *event_handled.borrow_mut() = true; }; + let event_handler = |_: &Event| { *event_handled.borrow_mut() = true; }; let payment_preimage = PaymentPreimage([1; 32]); let invoice = invoice(payment_preimage); @@ -1149,7 +1196,7 @@ mod tests { #[test] fn fails_paying_invoice_with_missing_retry_params() { let event_handled = core::cell::RefCell::new(false); - let event_handler = |_: &_| { *event_handled.borrow_mut() = true; }; + let event_handler = |_: &Event| { *event_handled.borrow_mut() = true; }; let payment_preimage = PaymentPreimage([1; 32]); let invoice = invoice(payment_preimage); @@ -1184,7 +1231,7 @@ mod tests { #[test] fn fails_paying_invoice_after_expiration() { let event_handled = core::cell::RefCell::new(false); - let event_handler = |_: &_| { *event_handled.borrow_mut() = true; }; + let event_handler = |_: &Event| { *event_handled.borrow_mut() = true; }; let payer = TestPayer::new(); let router = TestRouter::new(TestScorer::new()); @@ -1204,7 +1251,7 @@ mod tests { #[test] fn fails_retrying_invoice_after_expiration() { let event_handled = core::cell::RefCell::new(false); - let event_handler = |_: &_| { *event_handled.borrow_mut() = true; }; + let event_handler = |_: &Event| { *event_handled.borrow_mut() = true; }; let payment_preimage = PaymentPreimage([1; 32]); let invoice = invoice(payment_preimage); @@ -1241,7 +1288,7 @@ mod tests { #[test] fn fails_paying_invoice_after_retry_error() { let event_handled = core::cell::RefCell::new(false); - let event_handler = |_: &_| { *event_handled.borrow_mut() = true; }; + let event_handler = |_: &Event| { *event_handled.borrow_mut() = true; }; let payment_preimage = PaymentPreimage([1; 32]); let invoice = invoice(payment_preimage); @@ -1277,7 +1324,7 @@ mod tests { #[test] fn fails_paying_invoice_after_rejected_by_payee() { let event_handled = core::cell::RefCell::new(false); - let event_handler = |_: &_| { *event_handled.borrow_mut() = true; }; + let event_handler = |_: &Event| { *event_handled.borrow_mut() = true; }; let payment_preimage = PaymentPreimage([1; 32]); let invoice = invoice(payment_preimage); @@ -1310,7 +1357,7 @@ mod tests { #[test] fn fails_repaying_invoice_with_pending_payment() { let event_handled = core::cell::RefCell::new(false); - let event_handler = |_: &_| { *event_handled.borrow_mut() = true; }; + let event_handler = |_: &Event| { *event_handled.borrow_mut() = true; }; let payment_preimage = PaymentPreimage([1; 32]); let invoice = invoice(payment_preimage); @@ -1360,7 +1407,7 @@ mod tests { let router = FailingRouter {}; let logger = TestLogger::new(); let invoice_payer = - InvoicePayer::new(&payer, router, &logger, |_: &_| {}, Retry::Attempts(0)); + InvoicePayer::new(&payer, router, &logger, |_: &Event| {}, Retry::Attempts(0)); let payment_preimage = PaymentPreimage([1; 32]); let invoice = invoice(payment_preimage); @@ -1383,7 +1430,7 @@ mod tests { let router = TestRouter::new(TestScorer::new()); let logger = TestLogger::new(); let invoice_payer = - InvoicePayer::new(&payer, router, &logger, |_: &_| {}, Retry::Attempts(0)); + InvoicePayer::new(&payer, router, &logger, |_: &Event| {}, Retry::Attempts(0)); match invoice_payer.pay_invoice(&invoice) { Err(PaymentError::Sending(_)) => {}, @@ -1395,7 +1442,7 @@ mod tests { #[test] fn pays_zero_value_invoice_using_amount() { let event_handled = core::cell::RefCell::new(false); - let event_handler = |_: &_| { *event_handled.borrow_mut() = true; }; + let event_handler = |_: &Event| { *event_handled.borrow_mut() = true; }; let payment_preimage = PaymentPreimage([1; 32]); let invoice = zero_value_invoice(payment_preimage); @@ -1422,7 +1469,7 @@ mod tests { #[test] fn fails_paying_zero_value_invoice_with_amount() { let event_handled = core::cell::RefCell::new(false); - let event_handler = |_: &_| { *event_handled.borrow_mut() = true; }; + let event_handler = |_: &Event| { *event_handled.borrow_mut() = true; }; let payer = TestPayer::new(); let router = TestRouter::new(TestScorer::new()); @@ -1444,7 +1491,7 @@ mod tests { #[test] fn pays_pubkey_with_amount() { let event_handled = core::cell::RefCell::new(false); - let event_handler = |_: &_| { *event_handled.borrow_mut() = true; }; + let event_handler = |_: &Event| { *event_handled.borrow_mut() = true; }; let pubkey = pubkey(); let payment_preimage = PaymentPreimage([1; 32]); @@ -1494,7 +1541,7 @@ mod tests { #[test] fn scores_failed_channel() { let event_handled = core::cell::RefCell::new(false); - let event_handler = |_: &_| { *event_handled.borrow_mut() = true; }; + let event_handler = |_: &Event| { *event_handled.borrow_mut() = true; }; let payment_preimage = PaymentPreimage([1; 32]); let invoice = invoice(payment_preimage); @@ -1532,7 +1579,7 @@ mod tests { #[test] fn scores_successful_channels() { let event_handled = core::cell::RefCell::new(false); - let event_handler = |_: &_| { *event_handled.borrow_mut() = true; }; + let event_handler = |_: &Event| { *event_handled.borrow_mut() = true; }; let payment_preimage = PaymentPreimage([1; 32]); let invoice = invoice(payment_preimage); @@ -1564,7 +1611,7 @@ mod tests { #[test] fn generates_correct_inflight_map_data() { let event_handled = core::cell::RefCell::new(false); - let event_handler = |_: &_| { *event_handled.borrow_mut() = true; }; + let event_handler = |_: &Event| { *event_handled.borrow_mut() = true; }; let payment_preimage = PaymentPreimage([1; 32]); let invoice = invoice(payment_preimage); @@ -1610,7 +1657,7 @@ mod tests { fn considers_inflight_htlcs_between_invoice_payments_when_path_succeeds() { // First, let's just send a payment through, but only make sure one of the path completes let event_handled = core::cell::RefCell::new(false); - let event_handler = |_: &_| { *event_handled.borrow_mut() = true; }; + let event_handler = |_: &Event| { *event_handled.borrow_mut() = true; }; let payment_preimage = PaymentPreimage([1; 32]); let payment_invoice = invoice(payment_preimage); @@ -1661,7 +1708,7 @@ mod tests { fn considers_inflight_htlcs_between_retries() { // First, let's just send a payment through, but only make sure one of the path completes let event_handled = core::cell::RefCell::new(false); - let event_handler = |_: &_| { *event_handled.borrow_mut() = true; }; + let event_handler = |_: &Event| { *event_handled.borrow_mut() = true; }; let payment_preimage = PaymentPreimage([1; 32]); let payment_invoice = invoice(payment_preimage); @@ -1732,7 +1779,7 @@ mod tests { #[test] fn accounts_for_some_inflight_htlcs_sent_during_partial_failure() { let event_handled = core::cell::RefCell::new(false); - let event_handler = |_: &_| { *event_handled.borrow_mut() = true; }; + let event_handler = |_: &Event| { *event_handled.borrow_mut() = true; }; let payment_preimage = PaymentPreimage([1; 32]); let invoice_to_pay = invoice(payment_preimage); @@ -1763,7 +1810,7 @@ mod tests { #[test] fn accounts_for_all_inflight_htlcs_sent_during_partial_failure() { let event_handled = core::cell::RefCell::new(false); - let event_handler = |_: &_| { *event_handled.borrow_mut() = true; }; + let event_handler = |_: &Event| { *event_handled.borrow_mut() = true; }; let payment_preimage = PaymentPreimage([1; 32]); let invoice_to_pay = invoice(payment_preimage); @@ -2260,7 +2307,7 @@ mod tests { route.paths[1][0].fee_msat = 50_000_000; router.expect_find_route(Ok(route.clone())); - let event_handler = |_: &_| { panic!(); }; + let event_handler = |_: &Event| { panic!(); }; let invoice_payer = InvoicePayer::new(nodes[0].node, router, nodes[0].logger, event_handler, Retry::Attempts(1)); assert!(invoice_payer.pay_invoice(&create_invoice_from_channelmanager_and_duration_since_epoch( @@ -2305,7 +2352,7 @@ mod tests { route.paths[1][0].fee_msat = 50_000_001; router.expect_find_route(Ok(route.clone())); - let event_handler = |_: &_| { panic!(); }; + let event_handler = |_: &Event| { panic!(); }; let invoice_payer = InvoicePayer::new(nodes[0].node, router, nodes[0].logger, event_handler, Retry::Attempts(1)); assert!(invoice_payer.pay_invoice(&create_invoice_from_channelmanager_and_duration_since_epoch( From 05cb46723489a91cf14e6e1ada27589a30bd9040 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Mon, 31 Oct 2022 10:36:12 -0700 Subject: [PATCH 3/7] Consume events by value in EventHandler's handle_event --- lightning-background-processor/src/lib.rs | 26 ++--- lightning-invoice/src/payment.rs | 120 +++++++++++----------- lightning-net-tokio/src/lib.rs | 4 +- lightning/src/chain/chainmonitor.rs | 10 +- lightning/src/ln/channelmanager.rs | 8 +- lightning/src/util/events.rs | 8 +- 6 files changed, 88 insertions(+), 88 deletions(-) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index a8102c370bb..b394a2311c3 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -227,7 +227,7 @@ impl< L: Deref, > EventHandler for DecoratingEventHandler<'a, E, PGS, RGS, G, A, L> where A::Target: chain::Access, L::Target: Logger { - fn handle_event(&self, event: &Event) { + fn handle_event(&self, event: Event) { if let Some(network_graph) = self.gossip_sync.network_graph() { handle_network_graph_update(network_graph, &event) } @@ -779,7 +779,7 @@ mod tests { begin_open_channel!($node_a, $node_b, $channel_value); let events = $node_a.node.get_and_clear_pending_events(); assert_eq!(events.len(), 1); - let (temporary_channel_id, tx) = handle_funding_generation_ready!(&events[0], $channel_value); + let (temporary_channel_id, tx) = handle_funding_generation_ready!(events[0], $channel_value); end_open_channel!($node_a, $node_b, temporary_channel_id, tx); tx }} @@ -796,7 +796,7 @@ mod tests { macro_rules! handle_funding_generation_ready { ($event: expr, $channel_value: expr) => {{ match $event { - &Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => { + Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => { assert_eq!(channel_value_satoshis, $channel_value); assert_eq!(user_channel_id, 42); @@ -857,7 +857,7 @@ mod tests { // Initiate the background processors to watch each node. let data_dir = nodes[0].persister.get_data_dir(); let persister = Arc::new(Persister::new(data_dir)); - let event_handler = |_: &_| {}; + let event_handler = |_: _| {}; let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); macro_rules! check_persisted_data { @@ -919,7 +919,7 @@ mod tests { let nodes = create_nodes(1, "test_timer_tick_called".to_string()); let data_dir = nodes[0].persister.get_data_dir(); let persister = Arc::new(Persister::new(data_dir)); - let event_handler = |_: &_| {}; + let event_handler = |_: _| {}; let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); loop { let log_entries = nodes[0].logger.lines.lock().unwrap(); @@ -942,7 +942,7 @@ mod tests { let data_dir = nodes[0].persister.get_data_dir(); let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test")); - let event_handler = |_: &_| {}; + let event_handler = |_: _| {}; let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); match bg_processor.join() { Ok(_) => panic!("Expected error persisting manager"), @@ -959,7 +959,7 @@ mod tests { let nodes = create_nodes(2, "test_persist_network_graph_error".to_string()); let data_dir = nodes[0].persister.get_data_dir(); let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test")); - let event_handler = |_: &_| {}; + let event_handler = |_: _| {}; let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); match bg_processor.stop() { @@ -977,7 +977,7 @@ mod tests { let nodes = create_nodes(2, "test_persist_scorer_error".to_string()); let data_dir = nodes[0].persister.get_data_dir(); let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test")); - let event_handler = |_: &_| {}; + let event_handler = |_: _| {}; let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); match bg_processor.stop() { @@ -998,7 +998,7 @@ mod tests { // Set up a background event handler for FundingGenerationReady events. let (sender, receiver) = std::sync::mpsc::sync_channel(1); - let event_handler = move |event: &Event| match event { + let event_handler = move |event: Event| match event { Event::FundingGenerationReady { .. } => sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap(), Event::ChannelReady { .. } => {}, _ => panic!("Unexpected event: {:?}", event), @@ -1027,7 +1027,7 @@ mod tests { // Set up a background event handler for SpendableOutputs events. let (sender, receiver) = std::sync::mpsc::sync_channel(1); - let event_handler = move |event: &Event| match event { + let event_handler = move |event: Event| match event { Event::SpendableOutputs { .. } => sender.send(event.clone()).unwrap(), Event::ChannelReady { .. } => {}, Event::ChannelClosed { .. } => {}, @@ -1057,7 +1057,7 @@ mod tests { let nodes = create_nodes(2, "test_scorer_persistence".to_string()); let data_dir = nodes[0].persister.get_data_dir(); let persister = Arc::new(Persister::new(data_dir)); - let event_handler = |_: &_| {}; + let event_handler = |_: _| {}; let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); loop { @@ -1085,7 +1085,7 @@ mod tests { assert!(original_graph_description.contains("42: features: 0000, node_one:")); assert_eq!(network_graph.read_only().channels().len(), 1); - let event_handler = |_: &_| {}; + let event_handler = |_: _| {}; let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); loop { @@ -1138,7 +1138,7 @@ mod tests { let data_dir = nodes[0].persister.get_data_dir(); let persister = Arc::new(Persister::new(data_dir)); let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes, Arc::clone(&nodes[0].scorer)); - let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].logger), |_: &_| {}, Retry::Attempts(2))); + let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].logger), |_: _| {}, Retry::Attempts(2))); let event_handler = Arc::clone(&invoice_payer); let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); assert!(bg_processor.stop().is_ok()); diff --git a/lightning-invoice/src/payment.rs b/lightning-invoice/src/payment.rs index 08000b4dcf8..cbd760c37d0 100644 --- a/lightning-invoice/src/payment.rs +++ b/lightning-invoice/src/payment.rs @@ -105,7 +105,7 @@ //! # } //! # //! # fn main() { -//! let event_handler = |event: &Event| { +//! let event_handler = |event: Event| { //! match event { //! Event::PaymentPathFailed { .. } => println!("payment failed after retries"), //! Event::PaymentSent { .. } => println!("payment successful"), @@ -841,10 +841,10 @@ where P::Target: Payer, L::Target: Logger, { - fn handle_event(&self, event: &Event) { + fn handle_event(&self, event: Event) { let should_forward = self.handle_event_internal(&event); if should_forward { - self.event_handler.handle_event(&event) + self.event_handler.handle_event(event) } } } @@ -960,7 +960,7 @@ mod tests { #[test] fn pays_invoice_on_first_attempt() { let event_handled = core::cell::RefCell::new(false); - let event_handler = |_: &Event| { *event_handled.borrow_mut() = true; }; + let event_handler = |_: Event| { *event_handled.borrow_mut() = true; }; let payment_preimage = PaymentPreimage([1; 32]); let invoice = invoice(payment_preimage); @@ -976,7 +976,7 @@ mod tests { let payment_id = Some(invoice_payer.pay_invoice(&invoice).unwrap()); assert_eq!(*payer.attempts.borrow(), 1); - invoice_payer.handle_event(&Event::PaymentSent { + invoice_payer.handle_event(Event::PaymentSent { payment_id, payment_preimage, payment_hash, fee_paid_msat: None }); assert_eq!(*event_handled.borrow(), true); @@ -986,7 +986,7 @@ mod tests { #[test] fn pays_invoice_on_retry() { let event_handled = core::cell::RefCell::new(false); - let event_handler = |_: &Event| { *event_handled.borrow_mut() = true; }; + let event_handler = |_: Event| { *event_handled.borrow_mut() = true; }; let payment_preimage = PaymentPreimage([1; 32]); let invoice = invoice(payment_preimage); @@ -1014,11 +1014,11 @@ mod tests { short_channel_id: None, retry: Some(TestRouter::retry_for_invoice(&invoice)), }; - invoice_payer.handle_event(&event); + invoice_payer.handle_event(event); assert_eq!(*event_handled.borrow(), false); assert_eq!(*payer.attempts.borrow(), 2); - invoice_payer.handle_event(&Event::PaymentSent { + invoice_payer.handle_event(Event::PaymentSent { payment_id, payment_preimage, payment_hash, fee_paid_msat: None }); assert_eq!(*event_handled.borrow(), true); @@ -1027,7 +1027,7 @@ mod tests { #[test] fn pays_invoice_on_partial_failure() { - let event_handler = |_: &Event| { panic!() }; + let event_handler = |_: Event| { panic!() }; let payment_preimage = PaymentPreimage([1; 32]); let invoice = invoice(payment_preimage); @@ -1051,7 +1051,7 @@ mod tests { #[test] fn retries_payment_path_for_unknown_payment() { let event_handled = core::cell::RefCell::new(false); - let event_handler = |_: &Event| { *event_handled.borrow_mut() = true; }; + let event_handler = |_: Event| { *event_handled.borrow_mut() = true; }; let payment_preimage = PaymentPreimage([1; 32]); let invoice = invoice(payment_preimage); @@ -1077,15 +1077,15 @@ mod tests { short_channel_id: None, retry: Some(TestRouter::retry_for_invoice(&invoice)), }; - invoice_payer.handle_event(&event); + invoice_payer.handle_event(event.clone()); assert_eq!(*event_handled.borrow(), false); assert_eq!(*payer.attempts.borrow(), 1); - invoice_payer.handle_event(&event); + invoice_payer.handle_event(event.clone()); assert_eq!(*event_handled.borrow(), false); assert_eq!(*payer.attempts.borrow(), 2); - invoice_payer.handle_event(&Event::PaymentSent { + invoice_payer.handle_event(Event::PaymentSent { payment_id, payment_preimage, payment_hash, fee_paid_msat: None }); assert_eq!(*event_handled.borrow(), true); @@ -1095,7 +1095,7 @@ mod tests { #[test] fn fails_paying_invoice_after_max_retry_counts() { let event_handled = core::cell::RefCell::new(false); - let event_handler = |_: &Event| { *event_handled.borrow_mut() = true; }; + let event_handler = |_: Event| { *event_handled.borrow_mut() = true; }; let payment_preimage = PaymentPreimage([1; 32]); let invoice = invoice(payment_preimage); @@ -1123,7 +1123,7 @@ mod tests { short_channel_id: None, retry: Some(TestRouter::retry_for_invoice(&invoice)), }; - invoice_payer.handle_event(&event); + invoice_payer.handle_event(event); assert_eq!(*event_handled.borrow(), false); assert_eq!(*payer.attempts.borrow(), 2); @@ -1139,11 +1139,11 @@ mod tests { final_value_msat: final_value_msat / 2, ..TestRouter::retry_for_invoice(&invoice) }), }; - invoice_payer.handle_event(&event); + invoice_payer.handle_event(event.clone()); assert_eq!(*event_handled.borrow(), false); assert_eq!(*payer.attempts.borrow(), 3); - invoice_payer.handle_event(&event); + invoice_payer.handle_event(event.clone()); assert_eq!(*event_handled.borrow(), true); assert_eq!(*payer.attempts.borrow(), 3); } @@ -1152,7 +1152,7 @@ mod tests { #[test] fn fails_paying_invoice_after_max_retry_timeout() { let event_handled = core::cell::RefCell::new(false); - let event_handler = |_: &Event| { *event_handled.borrow_mut() = true; }; + let event_handler = |_: Event| { *event_handled.borrow_mut() = true; }; let payment_preimage = PaymentPreimage([1; 32]); let invoice = invoice(payment_preimage); @@ -1182,13 +1182,13 @@ mod tests { short_channel_id: None, retry: Some(TestRouter::retry_for_invoice(&invoice)), }; - invoice_payer.handle_event(&event); + invoice_payer.handle_event(event.clone()); assert_eq!(*event_handled.borrow(), false); assert_eq!(*payer.attempts.borrow(), 2); SinceEpoch::advance(Duration::from_secs(121)); - invoice_payer.handle_event(&event); + invoice_payer.handle_event(event.clone()); assert_eq!(*event_handled.borrow(), true); assert_eq!(*payer.attempts.borrow(), 2); } @@ -1196,7 +1196,7 @@ mod tests { #[test] fn fails_paying_invoice_with_missing_retry_params() { let event_handled = core::cell::RefCell::new(false); - let event_handler = |_: &Event| { *event_handled.borrow_mut() = true; }; + let event_handler = |_: Event| { *event_handled.borrow_mut() = true; }; let payment_preimage = PaymentPreimage([1; 32]); let invoice = invoice(payment_preimage); @@ -1221,7 +1221,7 @@ mod tests { short_channel_id: None, retry: None, }; - invoice_payer.handle_event(&event); + invoice_payer.handle_event(event); assert_eq!(*event_handled.borrow(), true); assert_eq!(*payer.attempts.borrow(), 1); } @@ -1231,7 +1231,7 @@ mod tests { #[test] fn fails_paying_invoice_after_expiration() { let event_handled = core::cell::RefCell::new(false); - let event_handler = |_: &Event| { *event_handled.borrow_mut() = true; }; + let event_handler = |_: Event| { *event_handled.borrow_mut() = true; }; let payer = TestPayer::new(); let router = TestRouter::new(TestScorer::new()); @@ -1251,7 +1251,7 @@ mod tests { #[test] fn fails_retrying_invoice_after_expiration() { let event_handled = core::cell::RefCell::new(false); - let event_handler = |_: &Event| { *event_handled.borrow_mut() = true; }; + let event_handler = |_: Event| { *event_handled.borrow_mut() = true; }; let payment_preimage = PaymentPreimage([1; 32]); let invoice = invoice(payment_preimage); @@ -1280,7 +1280,7 @@ mod tests { short_channel_id: None, retry: Some(retry_data), }; - invoice_payer.handle_event(&event); + invoice_payer.handle_event(event); assert_eq!(*event_handled.borrow(), true); assert_eq!(*payer.attempts.borrow(), 1); } @@ -1288,7 +1288,7 @@ mod tests { #[test] fn fails_paying_invoice_after_retry_error() { let event_handled = core::cell::RefCell::new(false); - let event_handler = |_: &Event| { *event_handled.borrow_mut() = true; }; + let event_handler = |_: Event| { *event_handled.borrow_mut() = true; }; let payment_preimage = PaymentPreimage([1; 32]); let invoice = invoice(payment_preimage); @@ -1316,7 +1316,7 @@ mod tests { short_channel_id: None, retry: Some(TestRouter::retry_for_invoice(&invoice)), }; - invoice_payer.handle_event(&event); + invoice_payer.handle_event(event); assert_eq!(*event_handled.borrow(), true); assert_eq!(*payer.attempts.borrow(), 2); } @@ -1324,7 +1324,7 @@ mod tests { #[test] fn fails_paying_invoice_after_rejected_by_payee() { let event_handled = core::cell::RefCell::new(false); - let event_handler = |_: &Event| { *event_handled.borrow_mut() = true; }; + let event_handler = |_: Event| { *event_handled.borrow_mut() = true; }; let payment_preimage = PaymentPreimage([1; 32]); let invoice = invoice(payment_preimage); @@ -1349,7 +1349,7 @@ mod tests { short_channel_id: None, retry: Some(TestRouter::retry_for_invoice(&invoice)), }; - invoice_payer.handle_event(&event); + invoice_payer.handle_event(event); assert_eq!(*event_handled.borrow(), true); assert_eq!(*payer.attempts.borrow(), 1); } @@ -1357,7 +1357,7 @@ mod tests { #[test] fn fails_repaying_invoice_with_pending_payment() { let event_handled = core::cell::RefCell::new(false); - let event_handler = |_: &Event| { *event_handled.borrow_mut() = true; }; + let event_handler = |_: Event| { *event_handled.borrow_mut() = true; }; let payment_preimage = PaymentPreimage([1; 32]); let invoice = invoice(payment_preimage); @@ -1397,7 +1397,7 @@ mod tests { short_channel_id: None, retry: Some(TestRouter::retry_for_invoice(&invoice)), }; - invoice_payer.handle_event(&event); + invoice_payer.handle_event(event); assert_eq!(*event_handled.borrow(), true); } @@ -1407,7 +1407,7 @@ mod tests { let router = FailingRouter {}; let logger = TestLogger::new(); let invoice_payer = - InvoicePayer::new(&payer, router, &logger, |_: &Event| {}, Retry::Attempts(0)); + InvoicePayer::new(&payer, router, &logger, |_: Event| {}, Retry::Attempts(0)); let payment_preimage = PaymentPreimage([1; 32]); let invoice = invoice(payment_preimage); @@ -1430,7 +1430,7 @@ mod tests { let router = TestRouter::new(TestScorer::new()); let logger = TestLogger::new(); let invoice_payer = - InvoicePayer::new(&payer, router, &logger, |_: &Event| {}, Retry::Attempts(0)); + InvoicePayer::new(&payer, router, &logger, |_: Event| {}, Retry::Attempts(0)); match invoice_payer.pay_invoice(&invoice) { Err(PaymentError::Sending(_)) => {}, @@ -1442,7 +1442,7 @@ mod tests { #[test] fn pays_zero_value_invoice_using_amount() { let event_handled = core::cell::RefCell::new(false); - let event_handler = |_: &Event| { *event_handled.borrow_mut() = true; }; + let event_handler = |_: Event| { *event_handled.borrow_mut() = true; }; let payment_preimage = PaymentPreimage([1; 32]); let invoice = zero_value_invoice(payment_preimage); @@ -1459,7 +1459,7 @@ mod tests { Some(invoice_payer.pay_zero_value_invoice(&invoice, final_value_msat).unwrap()); assert_eq!(*payer.attempts.borrow(), 1); - invoice_payer.handle_event(&Event::PaymentSent { + invoice_payer.handle_event(Event::PaymentSent { payment_id, payment_preimage, payment_hash, fee_paid_msat: None }); assert_eq!(*event_handled.borrow(), true); @@ -1469,7 +1469,7 @@ mod tests { #[test] fn fails_paying_zero_value_invoice_with_amount() { let event_handled = core::cell::RefCell::new(false); - let event_handler = |_: &Event| { *event_handled.borrow_mut() = true; }; + let event_handler = |_: Event| { *event_handled.borrow_mut() = true; }; let payer = TestPayer::new(); let router = TestRouter::new(TestScorer::new()); @@ -1491,7 +1491,7 @@ mod tests { #[test] fn pays_pubkey_with_amount() { let event_handled = core::cell::RefCell::new(false); - let event_handler = |_: &Event| { *event_handled.borrow_mut() = true; }; + let event_handler = |_: Event| { *event_handled.borrow_mut() = true; }; let pubkey = pubkey(); let payment_preimage = PaymentPreimage([1; 32]); @@ -1527,11 +1527,11 @@ mod tests { short_channel_id: None, retry: Some(retry), }; - invoice_payer.handle_event(&event); + invoice_payer.handle_event(event); assert_eq!(*event_handled.borrow(), false); assert_eq!(*payer.attempts.borrow(), 2); - invoice_payer.handle_event(&Event::PaymentSent { + invoice_payer.handle_event(Event::PaymentSent { payment_id, payment_preimage, payment_hash, fee_paid_msat: None }); assert_eq!(*event_handled.borrow(), true); @@ -1541,7 +1541,7 @@ mod tests { #[test] fn scores_failed_channel() { let event_handled = core::cell::RefCell::new(false); - let event_handler = |_: &Event| { *event_handled.borrow_mut() = true; }; + let event_handler = |_: Event| { *event_handled.borrow_mut() = true; }; let payment_preimage = PaymentPreimage([1; 32]); let invoice = invoice(payment_preimage); @@ -1573,13 +1573,13 @@ mod tests { short_channel_id, retry: Some(TestRouter::retry_for_invoice(&invoice)), }; - invoice_payer.handle_event(&event); + invoice_payer.handle_event(event); } #[test] fn scores_successful_channels() { let event_handled = core::cell::RefCell::new(false); - let event_handler = |_: &Event| { *event_handled.borrow_mut() = true; }; + let event_handler = |_: Event| { *event_handled.borrow_mut() = true; }; let payment_preimage = PaymentPreimage([1; 32]); let invoice = invoice(payment_preimage); @@ -1601,17 +1601,17 @@ mod tests { let event = Event::PaymentPathSuccessful { payment_id, payment_hash, path: route.paths[0].clone() }; - invoice_payer.handle_event(&event); + invoice_payer.handle_event(event); let event = Event::PaymentPathSuccessful { payment_id, payment_hash, path: route.paths[1].clone() }; - invoice_payer.handle_event(&event); + invoice_payer.handle_event(event); } #[test] fn generates_correct_inflight_map_data() { let event_handled = core::cell::RefCell::new(false); - let event_handler = |_: &Event| { *event_handled.borrow_mut() = true; }; + let event_handler = |_: Event| { *event_handled.borrow_mut() = true; }; let payment_preimage = PaymentPreimage([1; 32]); let invoice = invoice(payment_preimage); @@ -1638,7 +1638,7 @@ mod tests { assert_eq!(inflight_map.0.get(&(3, false)).unwrap().clone(), 74); assert_eq!(inflight_map.0.get(&(4, false)).unwrap().clone(), 64); - invoice_payer.handle_event(&Event::PaymentPathSuccessful { + invoice_payer.handle_event(Event::PaymentPathSuccessful { payment_id, payment_hash, path: route.paths[0].clone() }); @@ -1657,7 +1657,7 @@ mod tests { fn considers_inflight_htlcs_between_invoice_payments_when_path_succeeds() { // First, let's just send a payment through, but only make sure one of the path completes let event_handled = core::cell::RefCell::new(false); - let event_handler = |_: &Event| { *event_handled.borrow_mut() = true; }; + let event_handler = |_: Event| { *event_handled.borrow_mut() = true; }; let payment_preimage = PaymentPreimage([1; 32]); let payment_invoice = invoice(payment_preimage); @@ -1691,7 +1691,7 @@ mod tests { // Succeed 1st path, leave 2nd path inflight let payment_id = invoice_payer.pay_invoice(&payment_invoice).unwrap(); - invoice_payer.handle_event(&Event::PaymentPathSuccessful { + invoice_payer.handle_event(Event::PaymentPathSuccessful { payment_id, payment_hash, path: route.paths[0].clone() }); @@ -1708,7 +1708,7 @@ mod tests { fn considers_inflight_htlcs_between_retries() { // First, let's just send a payment through, but only make sure one of the path completes let event_handled = core::cell::RefCell::new(false); - let event_handler = |_: &Event| { *event_handled.borrow_mut() = true; }; + let event_handler = |_: Event| { *event_handled.borrow_mut() = true; }; let payment_preimage = PaymentPreimage([1; 32]); let payment_invoice = invoice(payment_preimage); @@ -1749,7 +1749,7 @@ mod tests { // Fail 1st path, leave 2nd path inflight let payment_id = Some(invoice_payer.pay_invoice(&payment_invoice).unwrap()); - invoice_payer.handle_event(&Event::PaymentPathFailed { + invoice_payer.handle_event(Event::PaymentPathFailed { payment_id, payment_hash, network_update: None, @@ -1761,7 +1761,7 @@ mod tests { }); // Fails again the 1st path of our retry - invoice_payer.handle_event(&Event::PaymentPathFailed { + invoice_payer.handle_event(Event::PaymentPathFailed { payment_id, payment_hash, network_update: None, @@ -1779,7 +1779,7 @@ mod tests { #[test] fn accounts_for_some_inflight_htlcs_sent_during_partial_failure() { let event_handled = core::cell::RefCell::new(false); - let event_handler = |_: &Event| { *event_handled.borrow_mut() = true; }; + let event_handler = |_: Event| { *event_handled.borrow_mut() = true; }; let payment_preimage = PaymentPreimage([1; 32]); let invoice_to_pay = invoice(payment_preimage); @@ -1810,7 +1810,7 @@ mod tests { #[test] fn accounts_for_all_inflight_htlcs_sent_during_partial_failure() { let event_handled = core::cell::RefCell::new(false); - let event_handler = |_: &Event| { *event_handled.borrow_mut() = true; }; + let event_handler = |_: Event| { *event_handled.borrow_mut() = true; }; let payment_preimage = PaymentPreimage([1; 32]); let invoice_to_pay = invoice(payment_preimage); @@ -2307,7 +2307,7 @@ mod tests { route.paths[1][0].fee_msat = 50_000_000; router.expect_find_route(Ok(route.clone())); - let event_handler = |_: &Event| { panic!(); }; + let event_handler = |_: Event| { panic!(); }; let invoice_payer = InvoicePayer::new(nodes[0].node, router, nodes[0].logger, event_handler, Retry::Attempts(1)); assert!(invoice_payer.pay_invoice(&create_invoice_from_channelmanager_and_duration_since_epoch( @@ -2352,7 +2352,7 @@ mod tests { route.paths[1][0].fee_msat = 50_000_001; router.expect_find_route(Ok(route.clone())); - let event_handler = |_: &Event| { panic!(); }; + let event_handler = |_: Event| { panic!(); }; let invoice_payer = InvoicePayer::new(nodes[0].node, router, nodes[0].logger, event_handler, Retry::Attempts(1)); assert!(invoice_payer.pay_invoice(&create_invoice_from_channelmanager_and_duration_since_epoch( @@ -2429,8 +2429,8 @@ mod tests { route.paths.remove(1); router.expect_find_route(Ok(route.clone())); - let expected_events: RefCell> = RefCell::new(VecDeque::new()); - let event_handler = |event: &Event| { + let expected_events: RefCell> = RefCell::new(VecDeque::new()); + let event_handler = |event: Event| { let event_checker = expected_events.borrow_mut().pop_front().unwrap(); event_checker(event); }; @@ -2505,7 +2505,7 @@ mod tests { // `PaymentPathFailed` being passed up to the user (us, in this case). Previously, we'd // treated this as "HTLC complete" and dropped the retry counter, causing us to retry again // if the final HTLC failed. - expected_events.borrow_mut().push_back(&|ev: &Event| { + expected_events.borrow_mut().push_back(&|ev: Event| { if let Event::PaymentPathFailed { payment_failed_permanently, all_paths_failed, .. } = ev { assert!(!payment_failed_permanently); assert!(all_paths_failed); @@ -2523,13 +2523,13 @@ mod tests { nodes[0].node.handle_update_fail_htlc(&nodes[1].node.get_our_node_id(), &bs_fail_update.update_fail_htlcs[0]); commitment_signed_dance!(nodes[0], nodes[1], &bs_fail_update.commitment_signed, false, true); - expected_events.borrow_mut().push_back(&|ev: &Event| { + expected_events.borrow_mut().push_back(&|ev: Event| { if let Event::PaymentPathFailed { payment_failed_permanently, all_paths_failed, .. } = ev { assert!(!payment_failed_permanently); assert!(all_paths_failed); } else { panic!("Unexpected event"); } }); - expected_events.borrow_mut().push_back(&|ev: &Event| { + expected_events.borrow_mut().push_back(&|ev: Event| { if let Event::PaymentFailed { .. } = ev { } else { panic!("Unexpected event"); } }); diff --git a/lightning-net-tokio/src/lib.rs b/lightning-net-tokio/src/lib.rs index 3fbe6aab949..7a7cc4bb099 100644 --- a/lightning-net-tokio/src/lib.rs +++ b/lightning-net-tokio/src/lib.rs @@ -43,7 +43,7 @@ //! async fn connect_to_node(peer_manager: PeerManager, chain_monitor: Arc, channel_manager: ChannelManager, their_node_id: PublicKey, addr: SocketAddr) { //! lightning_net_tokio::connect_outbound(peer_manager, their_node_id, addr).await; //! loop { -//! let event_handler = |event: &Event| { +//! let event_handler = |event: Event| { //! // Handle the event! //! }; //! channel_manager.await_persistable_update(); @@ -56,7 +56,7 @@ //! async fn accept_socket(peer_manager: PeerManager, chain_monitor: Arc, channel_manager: ChannelManager, socket: TcpStream) { //! lightning_net_tokio::setup_inbound(peer_manager, socket); //! loop { -//! let event_handler = |event: &Event| { +//! let event_handler = |event: Event| { //! // Handle the event! //! }; //! channel_manager.await_persistable_update(); diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index fc4caef8afd..c70eed1a1c8 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -492,7 +492,7 @@ where C::Target: chain::Filter, pub fn get_and_clear_pending_events(&self) -> Vec { use crate::util::events::EventsProvider; let events = core::cell::RefCell::new(Vec::new()); - let event_handler = |event: &events::Event| events.borrow_mut().push(event.clone()); + let event_handler = |event: events::Event| events.borrow_mut().push(event); self.process_pending_events(&event_handler); events.into_inner() } @@ -736,8 +736,8 @@ impl even for monitor_state in self.monitors.read().unwrap().values() { pending_events.append(&mut monitor_state.monitor.get_and_clear_pending_events()); } - for event in pending_events.drain(..) { - handler.handle_event(&event); + for event in pending_events { + handler.handle_event(event); } } #[cfg(anchors)] @@ -759,8 +759,8 @@ impl even for monitor_state in self.monitors.read().unwrap().values() { pending_events.append(&mut monitor_state.monitor.get_and_clear_pending_events()); } - for event in pending_events.drain(..) { - handler.handle_event(&event); + for event in pending_events { + handler.handle_event(event); } } } diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 1458d0c50df..7f8c82b41bd 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -5714,7 +5714,7 @@ impl ChannelManager Vec { let events = core::cell::RefCell::new(Vec::new()); - let event_handler = |event: &events::Event| events.borrow_mut().push(event.clone()); + let event_handler = |event: events::Event| events.borrow_mut().push(event); self.process_pending_events(&event_handler); events.into_inner() } @@ -5791,13 +5791,13 @@ where result = NotifyOption::DoPersist; } - let mut pending_events = mem::replace(&mut *self.pending_events.lock().unwrap(), vec![]); + let pending_events = mem::replace(&mut *self.pending_events.lock().unwrap(), vec![]); if !pending_events.is_empty() { result = NotifyOption::DoPersist; } - for event in pending_events.drain(..) { - handler.handle_event(&event); + for event in pending_events { + handler.handle_event(event); } result diff --git a/lightning/src/util/events.rs b/lightning/src/util/events.rs index a9687d28802..c369a376a87 100644 --- a/lightning/src/util/events.rs +++ b/lightning/src/util/events.rs @@ -1438,17 +1438,17 @@ pub trait EventHandler { /// Handles the given [`Event`]. /// /// See [`EventsProvider`] for details that must be considered when implementing this method. - fn handle_event(&self, event: &Event); + fn handle_event(&self, event: Event); } -impl EventHandler for F where F: Fn(&Event) { - fn handle_event(&self, event: &Event) { +impl EventHandler for F where F: Fn(Event) { + fn handle_event(&self, event: Event) { self(event) } } impl EventHandler for Arc { - fn handle_event(&self, event: &Event) { + fn handle_event(&self, event: Event) { self.deref().handle_event(event) } } From 55b714c01d059b637bd9d913dd4f5d8c0dc24c3e Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Wed, 2 Nov 2022 12:39:07 -0700 Subject: [PATCH 4/7] Implement async versions of process_pending_events --- lightning/src/chain/chainmonitor.rs | 20 ++++++++++++++++- lightning/src/ln/channelmanager.rs | 35 ++++++++++++++++++++++++++++- 2 files changed, 53 insertions(+), 2 deletions(-) diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index c70eed1a1c8..17fe69182ee 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -36,7 +36,7 @@ use crate::util::atomic_counter::AtomicCounter; use crate::util::logger::Logger; use crate::util::errors::APIError; use crate::util::events; -use crate::util::events::EventHandler; +use crate::util::events::{Event, EventHandler}; use crate::ln::channelmanager::ChannelDetails; use crate::prelude::*; @@ -496,6 +496,24 @@ where C::Target: chain::Filter, self.process_pending_events(&event_handler); events.into_inner() } + + /// Processes any events asynchronously in the order they were generated since the last call + /// using the given event handler. + /// + /// See the trait-level documentation of [`EventsProvider`] for requirements. + /// + /// [`EventsProvider`]: crate::util::events::EventsProvider + pub async fn process_pending_events_async Future>( + &self, handler: H + ) { + let mut pending_events = Vec::new(); + for monitor_state in self.monitors.read().unwrap().values() { + pending_events.append(&mut monitor_state.monitor.get_and_clear_pending_events()); + } + for event in pending_events { + handler(event).await; + } + } } impl diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 7f8c82b41bd..06ef2aa0dc3 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -53,7 +53,7 @@ use crate::ln::msgs::{ChannelMessageHandler, DecodeError, LightningError, MAX_VA use crate::ln::wire::Encode; use crate::chain::keysinterface::{Sign, KeysInterface, KeysManager, Recipient}; use crate::util::config::{UserConfig, ChannelConfig}; -use crate::util::events::{EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider, ClosureReason, HTLCDestination}; +use crate::util::events::{Event, EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider, ClosureReason, HTLCDestination}; use crate::util::{byte_utils, events}; use crate::util::wakers::{Future, Notifier}; use crate::util::scid_utils::fake_scid; @@ -5728,6 +5728,39 @@ impl ChannelManager Future>( + &self, handler: H + ) { + // We'll acquire our total consistency lock until the returned future completes so that + // we can be sure no other persists happen while processing events. + let _read_guard = self.total_consistency_lock.read().unwrap(); + + let mut result = NotifyOption::SkipPersist; + + // TODO: This behavior should be documented. It's unintuitive that we query + // ChannelMonitors when clearing other events. + if self.process_pending_monitor_events() { + result = NotifyOption::DoPersist; + } + + let pending_events = mem::replace(&mut *self.pending_events.lock().unwrap(), vec![]); + if !pending_events.is_empty() { + result = NotifyOption::DoPersist; + } + + for event in pending_events { + handler(event).await; + } + + if result == NotifyOption::DoPersist { + self.persistence_notifier.notify(); + } + } } impl MessageSendEventsProvider for ChannelManager From 8d20ebc376fa4c821b9b7e3f63164b7143986bbc Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Thu, 20 Oct 2022 15:51:37 -0700 Subject: [PATCH 5/7] Handle events asynchronously in the BackgroundProcessor's async variant --- lightning-background-processor/src/lib.rs | 38 ++++++++++++++++------- 1 file changed, 26 insertions(+), 12 deletions(-) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index b394a2311c3..9e2c7203e9e 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -236,15 +236,11 @@ where A::Target: chain::Access, L::Target: Logger { } macro_rules! define_run_body { - ($persister: ident, $event_handler: ident, $chain_monitor: ident, $channel_manager: ident, + ($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr, + $channel_manager: ident, $process_channel_manager_events: expr, $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr) => { { - let event_handler = DecoratingEventHandler { - event_handler: $event_handler, - gossip_sync: &$gossip_sync, - }; - log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup"); $channel_manager.timer_tick_occurred(); @@ -255,8 +251,8 @@ macro_rules! define_run_body { let mut have_pruned = false; loop { - $channel_manager.process_pending_events(&event_handler); - $chain_monitor.process_pending_events(&event_handler); + $process_channel_manager_events; + $process_chain_monitor_events; // Note that the PeerManager::process_events may block on ChannelManager's locks, // hence it comes last here. When the ChannelManager finishes whatever it's doing, @@ -389,7 +385,8 @@ pub async fn process_events_async< CMH: 'static + Deref + Send + Sync, RMH: 'static + Deref + Send + Sync, OMH: 'static + Deref + Send + Sync, - EH: 'static + EventHandler + Send, + EventHandlerFuture: core::future::Future, + EventHandler: Fn(Event) -> EventHandlerFuture, PS: 'static + Deref + Send, M: 'static + Deref> + Send + Sync, CM: 'static + Deref> + Send + Sync, @@ -402,7 +399,7 @@ pub async fn process_events_async< SleepFuture: core::future::Future, Sleeper: Fn(Duration) -> SleepFuture >( - persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM, + persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM, gossip_sync: GossipSync, peer_manager: PM, logger: L, scorer: Option, sleeper: Sleeper, ) -> Result<(), std::io::Error> @@ -422,7 +419,19 @@ where PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>, { let mut should_break = true; - define_run_body!(persister, event_handler, chain_monitor, channel_manager, + let async_event_handler = |event| { + let network_graph = gossip_sync.network_graph(); + let event_handler = &event_handler; + async move { + if let Some(network_graph) = network_graph { + handle_network_graph_update(network_graph, &event) + } + event_handler(event).await; + } + }; + define_run_body!(persister, + chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await, + channel_manager, channel_manager.process_pending_events_async(async_event_handler).await, gossip_sync, peer_manager, logger, scorer, should_break, { select_biased! { _ = channel_manager.get_persistable_update_future().fuse() => true, @@ -527,7 +536,12 @@ impl BackgroundProcessor { let stop_thread = Arc::new(AtomicBool::new(false)); let stop_thread_clone = stop_thread.clone(); let handle = thread::spawn(move || -> Result<(), std::io::Error> { - define_run_body!(persister, event_handler, chain_monitor, channel_manager, + let event_handler = DecoratingEventHandler { + event_handler, + gossip_sync: &gossip_sync, + }; + define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler), + channel_manager, channel_manager.process_pending_events(&event_handler), gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire), channel_manager.await_persistable_update_timeout(Duration::from_millis(100))) }); From 9787b28e89d9d988a5c2cc7593a6dde58b00227a Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Wed, 2 Nov 2022 16:29:00 -0700 Subject: [PATCH 6/7] Drop DecoratingEventHandler in favor of inline closure --- lightning-background-processor/src/lib.rs | 42 ++++------------------- 1 file changed, 6 insertions(+), 36 deletions(-) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 9e2c7203e9e..ec7d29256e6 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -202,39 +202,6 @@ fn handle_network_graph_update( } } -/// Decorates an [`EventHandler`] with common functionality provided by standard [`EventHandler`]s. -struct DecoratingEventHandler< - 'a, - E: EventHandler, - PGS: Deref>, - RGS: Deref>, - G: Deref>, - A: Deref, - L: Deref, -> -where A::Target: chain::Access, L::Target: Logger { - event_handler: E, - gossip_sync: &'a GossipSync, -} - -impl< - 'a, - E: EventHandler, - PGS: Deref>, - RGS: Deref>, - G: Deref>, - A: Deref, - L: Deref, -> EventHandler for DecoratingEventHandler<'a, E, PGS, RGS, G, A, L> -where A::Target: chain::Access, L::Target: Logger { - fn handle_event(&self, event: Event) { - if let Some(network_graph) = self.gossip_sync.network_graph() { - handle_network_graph_update(network_graph, &event) - } - self.event_handler.handle_event(event); - } -} - macro_rules! define_run_body { ($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr, $channel_manager: ident, $process_channel_manager_events: expr, @@ -536,9 +503,12 @@ impl BackgroundProcessor { let stop_thread = Arc::new(AtomicBool::new(false)); let stop_thread_clone = stop_thread.clone(); let handle = thread::spawn(move || -> Result<(), std::io::Error> { - let event_handler = DecoratingEventHandler { - event_handler, - gossip_sync: &gossip_sync, + let event_handler = |event| { + let network_graph = gossip_sync.network_graph(); + if let Some(network_graph) = network_graph { + handle_network_graph_update(network_graph, &event) + } + event_handler.handle_event(event); }; define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler), channel_manager, channel_manager.process_pending_events(&event_handler), From d3f1e25f65bd67305a6e3fd3e824cc750dec1ad9 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Wed, 2 Nov 2022 16:29:17 -0700 Subject: [PATCH 7/7] Note async versions of event handling within EventsProvider --- lightning/src/util/events.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/lightning/src/util/events.rs b/lightning/src/util/events.rs index c369a376a87..9818bfc4c8d 100644 --- a/lightning/src/util/events.rs +++ b/lightning/src/util/events.rs @@ -1400,6 +1400,10 @@ pub trait OnionMessageProvider { /// /// Events are processed by passing an [`EventHandler`] to [`process_pending_events`]. /// +/// Implementations of this trait may also feature an async version of event handling, as shown with +/// [`ChannelManager::process_pending_events_async`] and +/// [`ChainMonitor::process_pending_events_async`]. +/// /// # Requirements /// /// When using this trait, [`process_pending_events`] will call [`handle_event`] for each pending @@ -1426,6 +1430,8 @@ pub trait OnionMessageProvider { /// [`handle_event`]: EventHandler::handle_event /// [`ChannelManager::process_pending_events`]: crate::ln::channelmanager::ChannelManager#method.process_pending_events /// [`ChainMonitor::process_pending_events`]: crate::chain::chainmonitor::ChainMonitor#method.process_pending_events +/// [`ChannelManager::process_pending_events_async`]: crate::ln::channelmanager::ChannelManager::process_pending_events_async +/// [`ChainMonitor::process_pending_events_async`]: crate::chain::chainmonitor::ChainMonitor::process_pending_events_async pub trait EventsProvider { /// Processes any events generated since the last call using the given event handler. /// @@ -1434,6 +1440,10 @@ pub trait EventsProvider { } /// A trait implemented for objects handling events from [`EventsProvider`]. +/// +/// An async variation also exists for implementations of [`EventsProvider`] that support async +/// event handling. The async event handler should satisfy the generic bounds: `F: +/// core::future::Future, H: Fn(Event) -> F`. pub trait EventHandler { /// Handles the given [`Event`]. ///