Skip to content

Commit 9762dc8

Browse files
committed
Apply backpressure when we have too many gossip checks in-flight
Now that the `RoutingMessageHandler` can signal that it needs to apply message backpressure, we implement it here in the `PeerManager`. There's nothing very complicated here, aside from noting that we need the ability to call `send_data` with no data to indicate that reading should resume (and that we must track when such calls may be needed while updating the routing-backpressure state).
1 parent a326b99 commit 9762dc8

File tree

1 file changed

+42
-8
lines changed

1 file changed

+42
-8
lines changed

lightning/src/ln/peer_handler.rs

Lines changed: 42 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -563,6 +563,9 @@ pub struct PeerManager<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: D
563563

564564
peer_counter: AtomicCounter,
565565

566+
gossip_processing_backlogged: AtomicBool,
567+
gossip_processing_backlog_lifted: AtomicBool,
568+
566569
node_signer: NS,
567570

568571
logger: L,
@@ -721,6 +724,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
721724
blocked_event_processors: AtomicBool::new(false),
722725
ephemeral_key_midstate,
723726
peer_counter: AtomicCounter::new(),
727+
gossip_processing_backlogged: AtomicBool::new(false),
728+
gossip_processing_backlog_lifted: AtomicBool::new(false),
724729
last_node_announcement_serial: AtomicU32::new(current_time),
725730
logger,
726731
custom_message_handler,
@@ -847,7 +852,20 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
847852
Ok(())
848853
}
849854

850-
fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer) {
855+
fn peer_should_read(&self, peer: &Peer) -> bool {
856+
!self.gossip_processing_backlogged.load(Ordering::Relaxed) && peer.should_read()
857+
}
858+
859+
fn update_gossip_backlogged(&self) {
860+
let new_state = self.message_handler.route_handler.processing_queue_high();
861+
let prev_state = self.gossip_processing_backlogged.swap(new_state, Ordering::Relaxed);
862+
if prev_state && !new_state {
863+
self.gossip_processing_backlog_lifted.store(true, Ordering::Relaxed);
864+
}
865+
}
866+
867+
fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer, force_one_write: bool) {
868+
let mut have_written = false;
851869
while !peer.awaiting_write_event {
852870
if peer.should_buffer_onion_message() {
853871
if let Some(peer_node_id) = peer.their_node_id {
@@ -905,12 +923,22 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
905923
}
906924

907925
let next_buff = match peer.pending_outbound_buffer.front() {
908-
None => return,
926+
None => {
927+
if force_one_write && !have_written {
928+
let should_read = self.peer_should_read(&peer);
929+
if should_read {
930+
let data_sent = descriptor.send_data(&[], should_read);
931+
debug_assert_eq!(data_sent, 0, "Can't write more than no data");
932+
}
933+
}
934+
return
935+
},
909936
Some(buff) => buff,
910937
};
911938

912939
let pending = &next_buff[peer.pending_outbound_buffer_first_msg_offset..];
913-
let data_sent = descriptor.send_data(pending, peer.should_read());
940+
let data_sent = descriptor.send_data(pending, self.peer_should_read(&peer));
941+
have_written = true;
914942
peer.pending_outbound_buffer_first_msg_offset += data_sent;
915943
if peer.pending_outbound_buffer_first_msg_offset == next_buff.len() {
916944
peer.pending_outbound_buffer_first_msg_offset = 0;
@@ -945,7 +973,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
945973
Some(peer_mutex) => {
946974
let mut peer = peer_mutex.lock().unwrap();
947975
peer.awaiting_write_event = false;
948-
self.do_attempt_write_data(descriptor, &mut peer);
976+
self.do_attempt_write_data(descriptor, &mut peer, false);
949977
}
950978
};
951979
Ok(())
@@ -1192,7 +1220,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
11921220
}
11931221
}
11941222
}
1195-
pause_read = !peer.should_read();
1223+
pause_read = !self.peer_should_read(&peer);
11961224

11971225
if let Some(message) = msg_to_handle {
11981226
match self.handle_message(&peer_mutex, peer_lock, message) {
@@ -1404,19 +1432,22 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
14041432
.map_err(|e| -> MessageHandlingError { e.into() })? {
14051433
should_forward = Some(wire::Message::ChannelAnnouncement(msg));
14061434
}
1435+
self.update_gossip_backlogged();
14071436
},
14081437
wire::Message::NodeAnnouncement(msg) => {
14091438
if self.message_handler.route_handler.handle_node_announcement(&msg)
14101439
.map_err(|e| -> MessageHandlingError { e.into() })? {
14111440
should_forward = Some(wire::Message::NodeAnnouncement(msg));
14121441
}
1442+
self.update_gossip_backlogged();
14131443
},
14141444
wire::Message::ChannelUpdate(msg) => {
14151445
self.message_handler.chan_handler.handle_channel_update(&their_node_id, &msg);
14161446
if self.message_handler.route_handler.handle_channel_update(&msg)
14171447
.map_err(|e| -> MessageHandlingError { e.into() })? {
14181448
should_forward = Some(wire::Message::ChannelUpdate(msg));
14191449
}
1450+
self.update_gossip_backlogged();
14201451
},
14211452
wire::Message::QueryShortChannelIds(msg) => {
14221453
self.message_handler.route_handler.handle_query_short_channel_ids(&their_node_id, msg)?;
@@ -1564,6 +1595,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
15641595
}
15651596
}
15661597

1598+
self.update_gossip_backlogged();
1599+
let flush_read_disabled = self.gossip_processing_backlog_lifted.swap(false, Ordering::Relaxed);
1600+
15671601
let mut peers_to_disconnect = HashMap::new();
15681602
let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events();
15691603
events_generated.append(&mut self.message_handler.route_handler.get_and_clear_pending_msg_events());
@@ -1793,7 +1827,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
17931827
}
17941828

17951829
for (descriptor, peer_mutex) in peers.iter() {
1796-
self.do_attempt_write_data(&mut (*descriptor).clone(), &mut *peer_mutex.lock().unwrap());
1830+
self.do_attempt_write_data(&mut (*descriptor).clone(), &mut *peer_mutex.lock().unwrap(), flush_read_disabled);
17971831
}
17981832
}
17991833
if !peers_to_disconnect.is_empty() {
@@ -1815,7 +1849,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
18151849
self.enqueue_message(&mut *peer, &msg);
18161850
// This isn't guaranteed to work, but if there is enough free
18171851
// room in the send buffer, put the error message there...
1818-
self.do_attempt_write_data(&mut descriptor, &mut *peer);
1852+
self.do_attempt_write_data(&mut descriptor, &mut *peer, false);
18191853
} else {
18201854
log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with no message", log_pubkey!(node_id));
18211855
}
@@ -1965,7 +1999,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
19651999
byteslen: 64,
19662000
};
19672001
self.enqueue_message(&mut *peer, &ping);
1968-
self.do_attempt_write_data(&mut (descriptor.clone()), &mut *peer);
2002+
self.do_attempt_write_data(&mut (descriptor.clone()), &mut *peer, false);
19692003
}
19702004
}
19712005

0 commit comments

Comments
 (0)