#2995 followups #3193


Merged 3 commits on Aug 22, 2024
4 changes: 2 additions & 2 deletions lightning-background-processor/src/lib.rs
@@ -2318,8 +2318,8 @@ mod tests {

begin_open_channel!(nodes[0], nodes[1], channel_value);
assert_eq!(
-first_event_recv.recv_timeout(Duration::from_secs(EVENT_DEADLINE)),
-second_event_recv.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
+first_event_recv.recv_timeout(Duration::from_secs(EVENT_DEADLINE)).unwrap(),
+second_event_recv.recv_timeout(Duration::from_secs(EVENT_DEADLINE)).unwrap()
);

if !std::thread::panicking() {
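For context on the test tweak above: comparing the raw `Result`s means the assertion would also pass if both receivers timed out, since two `Err(RecvTimeoutError::Timeout)` values compare equal. A minimal sketch with plain `std::sync::mpsc` channels (standing in for the test's event receivers, which are my assumption here) illustrates the difference:

```rust
use std::sync::mpsc;
use std::time::Duration;

fn main() {
	// Two channels standing in for the first/second event receivers in the test.
	let (_tx_a, rx_a) = mpsc::channel::<u32>();
	let (_tx_b, rx_b) = mpsc::channel::<u32>();

	// Nothing was ever sent, yet this assertion passes: both sides evaluate to
	// `Err(RecvTimeoutError::Timeout)`, and equal errors compare equal.
	assert_eq!(
		rx_a.recv_timeout(Duration::from_millis(10)),
		rx_b.recv_timeout(Duration::from_millis(10))
	);

	// With `.unwrap()` (as in the patch), a timeout panics instead, so the
	// assertion can only pass when both receivers actually delivered an event.
}
```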
5 changes: 3 additions & 2 deletions lightning/src/events/mod.rs
@@ -756,8 +756,9 @@ pub enum Event {
/// replies. Handlers should connect to the node otherwise any buffered messages may be lost.
///
/// # Failure Behavior and Persistence
-/// This event will eventually be replayed after failures-to-handle (i.e., the event handler
-/// returning `Err(ReplayEvent ())`), but won't be persisted across restarts.
+/// This event won't be replayed after failures-to-handle
+/// (i.e., the event handler returning `Err(ReplayEvent ())`), and also won't be persisted
+/// across restarts.
///
/// [`OnionMessage`]: msgs::OnionMessage
/// [`MessageRouter`]: crate::onion_message::messenger::MessageRouter
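To make the documented semantics concrete, here is a hedged sketch of an event handler (a free function rather than LDK's `EventHandler` trait; the `peer_node_id` field name is to the best of my knowledge) showing where `Err(ReplayEvent())` does and does not buy a replay:

```rust
use lightning::events::{Event, ReplayEvent};

fn handle_event(event: Event) -> Result<(), ReplayEvent> {
	match event {
		// Per the doc text above: best-effort only. Failing here does not
		// cause a replay, and the event is not persisted across restarts.
		Event::OnionMessagePeerConnected { peer_node_id } => {
			println!("peer {} connected, flushing buffered onion messages", peer_node_id);
			Ok(())
		},
		// For variants documented as replayable, returning `Err(ReplayEvent())`
		// asks for the event to be handed back on a later processing pass.
		_ => Err(ReplayEvent()),
	}
}
```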
67 changes: 53 additions & 14 deletions lightning/src/onion_message/messenger.rs
@@ -1047,21 +1047,25 @@ where
}
}

-macro_rules! drop_handled_events_and_abort { ($self: expr, $res: expr, $offset: expr, $event_queue: expr) => {
+macro_rules! drop_handled_events_and_abort { ($self: expr, $res_iter: expr, $event_queue: expr) => {
// We want to make sure to cleanly abort upon event handling failure. To this end, we drop all
// successfully handled events from the given queue, reset the events processing flag, and
// return, to have the events eventually replayed upon next invocation.
{
let mut queue_lock = $event_queue.lock().unwrap();

-// We skip `$offset` result entries to reach the ones relevant for the given `$event_queue`.
-let mut res_iter = $res.iter().skip($offset);
-
// Keep all events which previously error'd *or* any that have been added since we dropped
// the Mutex before.
-queue_lock.retain(|_| res_iter.next().map_or(true, |r| r.is_err()));
+let mut any_error = false;
+queue_lock.retain(|_| {
+$res_iter.next().map_or(true, |r| {
+let is_err = r.is_err();
+any_error |= is_err;
+is_err
+})
+});

-if $res.iter().any(|r| r.is_err()) {
+if any_error {
// We failed handling some events. Return to have them eventually replayed.
$self.pending_events_processor.store(false, Ordering::Release);
$self.event_notifier.notify();
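The macro change above folds the error check into the single `retain` pass instead of re-walking `$res` afterwards, which is also what lets callers pass a pre-positioned iterator rather than a slice plus offset. A standalone sketch of the same pattern, using a plain `Vec<&str>` queue and `Result<(), ()>` in place of the real event and error types:

```rust
// Drop successfully handled entries from the queue while recording, in one pass,
// whether any handling failed. Entries with no matching result (queued after the
// results were produced) are kept for a later pass.
fn drop_handled(queue: &mut Vec<&'static str>, results: &[Result<(), ()>]) -> bool {
	let mut res_iter = results.iter();
	let mut any_error = false;
	queue.retain(|_| {
		res_iter.next().map_or(true, |r| {
			let is_err = r.is_err();
			any_error |= is_err;
			is_err
		})
	});
	any_error
}

fn main() {
	let mut queue = vec!["ev0", "ev1", "ev2", "ev3"];
	// ev0 and ev2 were handled fine, ev1 failed, ev3 was queued after handling started.
	let results = [Ok(()), Err(()), Ok(())];
	let failed = drop_handled(&mut queue, &results);
	assert_eq!(queue, vec!["ev1", "ev3"]);
	assert!(failed);
}
```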
@@ -1426,7 +1430,8 @@ where
}
// Let the `OnionMessageIntercepted` events finish before moving on to peer_connecteds
let res = MultiResultFuturePoller::new(futures).await;
-drop_handled_events_and_abort!(self, res, intercepted_msgs_offset, self.pending_intercepted_msgs_events);
+let mut res_iter = res.iter().skip(intercepted_msgs_offset);
+drop_handled_events_and_abort!(self, res_iter, self.pending_intercepted_msgs_events);
}

{
@@ -1449,7 +1454,8 @@
futures.push(future);
}
let res = MultiResultFuturePoller::new(futures).await;
-drop_handled_events_and_abort!(self, res, 0, self.pending_peer_connected_events);
+let mut res_iter = res.iter();
+drop_handled_events_and_abort!(self, res_iter, self.pending_peer_connected_events);
}
}
self.pending_events_processor.store(false, Ordering::Release);
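With the macro consuming a caller-provided iterator, the offset bookkeeping moves to the call sites above: the intercepted-messages path pre-positions the iterator with `.skip(intercepted_msgs_offset)`, while the peer-connected path passes a plain `.iter()`. A small sketch of that call-site shape (hypothetical result values standing in for the real futures output):

```rust
fn main() {
	// Combined results for two queues; the first two entries belong to another queue.
	let res: Vec<Result<(), ()>> = vec![Ok(()), Ok(()), Err(()), Ok(())];
	let intercepted_msgs_offset = 2;

	// Pre-position the iterator once at the call site, then hand it to the helper,
	// which no longer needs to know about offsets at all.
	let mut res_iter = res.iter().skip(intercepted_msgs_offset);

	assert_eq!(res_iter.next(), Some(&Err(())));
	assert_eq!(res_iter.next(), Some(&Ok(())));
	assert_eq!(res_iter.next(), None);
}
```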
@@ -1508,7 +1514,7 @@ where
{
let pending_intercepted_msgs_events = self.pending_intercepted_msgs_events.lock().unwrap();
intercepted_msgs = pending_intercepted_msgs_events.clone();
-let mut pending_peer_connected_events = self.pending_peer_connected_events.lock().unwrap();
+let pending_peer_connected_events = self.pending_peer_connected_events.lock().unwrap();
peer_connecteds = pending_peer_connected_events.clone();
#[cfg(debug_assertions)] {
for ev in pending_intercepted_msgs_events.iter() {
@@ -1518,14 +1524,47 @@
if let Event::OnionMessagePeerConnected { .. } = ev {} else { panic!(); }
}
}
-pending_peer_connected_events.shrink_to(10); // Limit total heap usage
}

-let res = intercepted_msgs.into_iter().map(|ev| handler.handle_event(ev)).collect::<Vec<_>>();
-drop_handled_events_and_abort!(self, res, 0, self.pending_intercepted_msgs_events);
+let mut handling_intercepted_msgs_failed = false;
+let mut num_handled_intercepted_events = 0;
+for ev in intercepted_msgs {
+match handler.handle_event(ev) {
+Ok(()) => num_handled_intercepted_events += 1,
+Err(ReplayEvent ()) => {
+handling_intercepted_msgs_failed = true;
+break;
+}
+}
+}
+
+{
+let mut pending_intercepted_msgs_events = self.pending_intercepted_msgs_events.lock().unwrap();
+pending_intercepted_msgs_events.drain(..num_handled_intercepted_events);
+}

-let res = peer_connecteds.into_iter().map(|ev| handler.handle_event(ev)).collect::<Vec<_>>();
-drop_handled_events_and_abort!(self, res, 0, self.pending_peer_connected_events);
+if handling_intercepted_msgs_failed {
+self.pending_events_processor.store(false, Ordering::Release);
+self.event_notifier.notify();
+return;
+}
+
+let mut num_handled_peer_connecteds = 0;
+for ev in peer_connecteds {
+match handler.handle_event(ev) {
+Ok(()) => num_handled_peer_connecteds += 1,
+Err(ReplayEvent ()) => {
+self.event_notifier.notify();
+break;
+}
+}
+}
+
+{
+let mut pending_peer_connected_events = self.pending_peer_connected_events.lock().unwrap();
+pending_peer_connected_events.drain(..num_handled_peer_connecteds);
+pending_peer_connected_events.shrink_to(10); // Limit total heap usage
+}

self.pending_events_processor.store(false, Ordering::Release);
}
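The synchronous path above replaces the collect-then-macro approach with an explicit loop: handle events one at a time, count successes, stop at the first `ReplayEvent`, drain exactly the handled prefix from the shared queue, and only then shrink the allocation. A self-contained sketch of that pattern (hypothetical `&str` events and a closure in place of the real `EventHandler`):

```rust
use std::sync::Mutex;

// Returns true if any event failed to handle and should be retried later.
fn process(queue: &Mutex<Vec<&'static str>>, handler: impl Fn(&str) -> Result<(), ()>) -> bool {
	// Snapshot the queue so the lock is not held while handling events.
	let snapshot = queue.lock().unwrap().clone();

	let mut num_handled = 0;
	let mut failed = false;
	for ev in snapshot {
		match handler(ev) {
			Ok(()) => num_handled += 1,
			Err(()) => {
				failed = true;
				break;
			},
		}
	}

	// Only the successfully handled prefix is removed; the failed event and anything
	// queued while we were handling stays put for a later replay attempt.
	let mut locked = queue.lock().unwrap();
	locked.drain(..num_handled);
	locked.shrink_to(10); // Limit total heap usage, as in the patch.
	failed
}

fn main() {
	let queue = Mutex::new(vec!["a", "b", "c"]);
	let failed = process(&queue, |ev| if ev == "b" { Err(()) } else { Ok(()) });
	assert!(failed);
	// "a" was handled and dropped; "b" (failed) and "c" (never reached) remain.
	assert_eq!(*queue.lock().unwrap(), vec!["b", "c"]);
}
```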