Skip to content

Handle initial_routing_sync requests from peers in their Init messages (issue 148) #202

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions src/ln/msgs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ pub struct AnnouncementSignatures {
}

/// An address which can be used to connect to a remote peer
#[derive(Clone)]
#[derive(Clone, PartialEq)]
pub enum NetAddress {
/// An IPv4 address/port on which the peer is listenting.
IPv4 {
Expand Down Expand Up @@ -381,7 +381,7 @@ impl NetAddress {
}
}
}

#[derive(Clone, PartialEq)]
// Only exposed as broadcast of node_announcement should be filtered by node_id
/// The unsigned part of a node_announcement
pub struct UnsignedNodeAnnouncement {
Expand All @@ -398,6 +398,7 @@ pub struct UnsignedNodeAnnouncement {
pub(crate) excess_address_data: Vec<u8>,
pub(crate) excess_data: Vec<u8>,
}
#[derive(Clone, PartialEq)]
/// A node_announcement message to be sent or received from a peer
pub struct NodeAnnouncement {
pub(crate) signature: Signature,
Expand Down Expand Up @@ -574,6 +575,18 @@ pub trait ChannelMessageHandler : events::MessageSendEventsProvider + Send + Syn
/// Handle an incoming error message from the given peer.
fn handle_error(&self, their_node_id: &PublicKey, msg: &ErrorMessage);
}
#[derive(PartialEq)]
///Enum used to keep track of syncing information/state of peer and if a sync is required
pub enum InitSyncTracker{
///This indicates if a sync is required or not, false is no sync required, true is sync required but not started
Sync(bool),
///This is the last synced node's public key
///During this state it is syncing nodes
NodeCounter(PublicKey),
///This is the last synced channel _id
///During this state it is syncing nodes
ChannelCounter(u64),
}

/// A trait to describe an object which can receive routing messages.
pub trait RoutingMessageHandler : Send + Sync {
Expand All @@ -588,6 +601,12 @@ pub trait RoutingMessageHandler : Send + Sync {
fn handle_channel_update(&self, msg: &ChannelUpdate) -> Result<bool, HandleError>;
/// Handle some updates to the route graph that we learned due to an outbound failed payment.
fn handle_htlc_fail_channel_update(&self, update: &HTLCFailChannelUpdate);
///Gets a subset of the channel announcements and updates required to dump our routing table to a remote node, starting at the short_channel_id indicated by starting_point.channelcounter and including batch_amount entries
/// This function will start iterating at 0 if the starting_point is < 0.
fn get_next_channel_announcements(&self, starting_point: &mut InitSyncTracker, batch_amount: u8)->(Vec<(ChannelAnnouncement, ChannelUpdate,ChannelUpdate)>);
///Gets a subset of the node announcements required to dump our routing table to a remote node, starting at the PublicKey indicated by starting_point.nodecounter and including batch_amount entries
/// This function will start iterating at 0 if the starting_point is < 0.
fn get_next_node_announcements(&self, starting_point: &mut InitSyncTracker, batch_amount: u8)->(Vec<NodeAnnouncement>);
}

pub(crate) struct OnionRealm0HopData {
Expand Down
95 changes: 72 additions & 23 deletions src/ln/peer_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,24 @@ struct Peer {
pending_read_buffer: Vec<u8>,
pending_read_buffer_pos: usize,
pending_read_is_header: bool,
sync_status : msgs::InitSyncTracker,
}

impl Peer {
pub fn require_sync(&self)->bool{
if let msgs::InitSyncTracker::Sync(i) = self.sync_status {i} else {false}
}

/// this function checks if the the channel announcements and updates are allowed to be forwarded to a specific peer.
/// If the peer is in syncing state and the channel_id has not been synced then the function returns false as this info will forward at a later stage and
/// we dont want to send duplicate messages. If the channel was already synced then we can forward those messages and the function will then return true.
pub fn is_channel_allowed_to_forward(&self, channel_id : u64)->bool{
match self.sync_status {
msgs::InitSyncTracker::Sync(i) => !i,
msgs::InitSyncTracker::NodeCounter(_i) => false,
msgs::InitSyncTracker::ChannelCounter(i) => (i < channel_id),
}
}
}

struct PeerHolder<Descriptor: SocketDescriptor> {
Expand Down Expand Up @@ -221,6 +239,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
pending_read_buffer: pending_read_buffer,
pending_read_buffer_pos: 0,
pending_read_is_header: false,
sync_status : msgs::InitSyncTracker::Sync(false),
}).is_some() {
panic!("PeerManager driver duplicated descriptors!");
};
Expand Down Expand Up @@ -255,22 +274,47 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
pending_read_buffer: pending_read_buffer,
pending_read_buffer_pos: 0,
pending_read_is_header: false,
sync_status : msgs::InitSyncTracker::Sync(false),
}).is_some() {
panic!("PeerManager driver duplicated descriptors!");
};
Ok(())
}

fn do_attempt_write_data(descriptor: &mut Descriptor, peer: &mut Peer) {
fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer) {
macro_rules! encode_and_send_msg {
($msg: expr, $msg_code: expr) => {
{
log_trace!(self, "Encoding and sending message of type {} to {}", $msg_code, log_pubkey!(peer.their_node_id.unwrap()));
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!($msg, $msg_code)[..]));
}
}
}
while !peer.awaiting_write_event {
if {
let should_be_reading = peer.pending_outbound_buffer.len() < 10;
if (peer.require_sync()) &&(should_be_reading){
match peer.sync_status{
msgs::InitSyncTracker::ChannelCounter(_c) => {
let all_messages_tuple = self.message_handler.route_handler.get_next_channel_announcements(&mut peer.sync_status,(10-peer.pending_outbound_buffer.len()) as u8);
for tuple in all_messages_tuple.iter(){
encode_and_send_msg!(tuple.0, 256);
encode_and_send_msg!(tuple.1, 258);
encode_and_send_msg!(tuple.2, 258);
}
},
_=>{let all_messages = self.message_handler.route_handler.get_next_node_announcements(&mut peer.sync_status,(10-peer.pending_outbound_buffer.len()) as u8);
for message in all_messages.iter(){
encode_and_send_msg!(message, 256);
}},
};
}
let next_buff = match peer.pending_outbound_buffer.front() {
None => return,
Some(buff) => buff,
};
let should_be_reading = peer.pending_outbound_buffer.len() < 10;

let data_sent = descriptor.send_data(next_buff, peer.pending_outbound_buffer_first_msg_offset, should_be_reading);
let data_sent = descriptor.send_data(&next_buff, peer.pending_outbound_buffer_first_msg_offset, should_be_reading);
peer.pending_outbound_buffer_first_msg_offset += data_sent;
if peer.pending_outbound_buffer_first_msg_offset == next_buff.len() { true } else { false }
} {
Expand All @@ -297,7 +341,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
None => panic!("Descriptor for write_event is not already known to PeerManager"),
Some(peer) => {
peer.awaiting_write_event = false;
Self::do_attempt_write_data(descriptor, peer);
self.do_attempt_write_data(descriptor, peer);
}
};
Ok(())
Expand Down Expand Up @@ -522,6 +566,10 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
if msg.local_features.supports_unknown_bits() { "present" } else { "none" },
if msg.global_features.supports_unknown_bits() { "present" } else { "none" });

if msg.local_features.initial_routing_sync() {
peer.sync_status = msgs::InitSyncTracker::Sync(true);
peers.peers_needing_send.insert(peer_descriptor.clone());
}
peer.their_global_features = Some(msg.global_features);
peer.their_local_features = Some(msg.local_features);

Expand All @@ -531,6 +579,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
self.initial_syncs_sent.fetch_add(1, Ordering::AcqRel);
local_features.set_initial_routing_sync();
}

encode_and_send_msg!(msgs::Init {
global_features: msgs::GlobalFeatures::new(),
local_features,
Expand Down Expand Up @@ -678,7 +727,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
}
}

Self::do_attempt_write_data(peer_descriptor, peer);
self.do_attempt_write_data(peer_descriptor, peer);

peer.pending_outbound_buffer.len() > 10 // pause_read
}
Expand Down Expand Up @@ -735,7 +784,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
//TODO: Drop the pending channel? (or just let it timeout, but that sucks)
});
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 33)));
Self::do_attempt_write_data(&mut descriptor, peer);
self.do_attempt_write_data(&mut descriptor, peer);
},
MessageSendEvent::SendOpenChannel { ref node_id, ref msg } => {
log_trace!(self, "Handling SendOpenChannel event in peer_handler for node {} for channel {}",
Expand All @@ -745,7 +794,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
//TODO: Drop the pending channel? (or just let it timeout, but that sucks)
});
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 32)));
Self::do_attempt_write_data(&mut descriptor, peer);
self.do_attempt_write_data(&mut descriptor, peer);
},
MessageSendEvent::SendFundingCreated { ref node_id, ref msg } => {
log_trace!(self, "Handling SendFundingCreated event in peer_handler for node {} for channel {} (which becomes {})",
Expand All @@ -757,7 +806,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
//they should just throw away this funding transaction
});
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 34)));
Self::do_attempt_write_data(&mut descriptor, peer);
self.do_attempt_write_data(&mut descriptor, peer);
},
MessageSendEvent::SendFundingSigned { ref node_id, ref msg } => {
log_trace!(self, "Handling SendFundingSigned event in peer_handler for node {} for channel {}",
Expand All @@ -768,7 +817,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
//they should just throw away this funding transaction
});
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 35)));
Self::do_attempt_write_data(&mut descriptor, peer);
self.do_attempt_write_data(&mut descriptor, peer);
},
MessageSendEvent::SendFundingLocked { ref node_id, ref msg } => {
log_trace!(self, "Handling SendFundingLocked event in peer_handler for node {} for channel {}",
Expand All @@ -778,7 +827,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
//TODO: Do whatever we're gonna do for handling dropped messages
});
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 36)));
Self::do_attempt_write_data(&mut descriptor, peer);
self.do_attempt_write_data(&mut descriptor, peer);
},
MessageSendEvent::SendAnnouncementSignatures { ref node_id, ref msg } => {
log_trace!(self, "Handling SendAnnouncementSignatures event in peer_handler for node {} for channel {})",
Expand All @@ -789,7 +838,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
//they should just throw away this funding transaction
});
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 259)));
Self::do_attempt_write_data(&mut descriptor, peer);
self.do_attempt_write_data(&mut descriptor, peer);
},
MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
log_trace!(self, "Handling UpdateHTLCs event in peer_handler for node {} with {} adds, {} fulfills, {} fails for channel {}",
Expand Down Expand Up @@ -817,7 +866,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 134)));
}
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(commitment_signed, 132)));
Self::do_attempt_write_data(&mut descriptor, peer);
self.do_attempt_write_data(&mut descriptor, peer);
},
MessageSendEvent::SendRevokeAndACK { ref node_id, ref msg } => {
log_trace!(self, "Handling SendRevokeAndACK event in peer_handler for node {} for channel {}",
Expand All @@ -827,7 +876,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
//TODO: Do whatever we're gonna do for handling dropped messages
});
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 133)));
Self::do_attempt_write_data(&mut descriptor, peer);
self.do_attempt_write_data(&mut descriptor, peer);
},
MessageSendEvent::SendClosingSigned { ref node_id, ref msg } => {
log_trace!(self, "Handling SendClosingSigned event in peer_handler for node {} for channel {}",
Expand All @@ -837,7 +886,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
//TODO: Do whatever we're gonna do for handling dropped messages
});
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 39)));
Self::do_attempt_write_data(&mut descriptor, peer);
self.do_attempt_write_data(&mut descriptor, peer);
},
MessageSendEvent::SendShutdown { ref node_id, ref msg } => {
log_trace!(self, "Handling Shutdown event in peer_handler for node {} for channel {}",
Expand All @@ -847,7 +896,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
//TODO: Do whatever we're gonna do for handling dropped messages
});
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 38)));
Self::do_attempt_write_data(&mut descriptor, peer);
self.do_attempt_write_data(&mut descriptor, peer);
},
MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => {
log_trace!(self, "Handling SendChannelReestablish event in peer_handler for node {} for channel {}",
Expand All @@ -857,7 +906,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
//TODO: Do whatever we're gonna do for handling dropped messages
});
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 136)));
Self::do_attempt_write_data(&mut descriptor, peer);
self.do_attempt_write_data(&mut descriptor, peer);
},
MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => {
log_trace!(self, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}", msg.contents.short_channel_id);
Expand All @@ -866,7 +915,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
let encoded_update_msg = encode_msg!(update_msg, 258);

for (ref descriptor, ref mut peer) in peers.peers.iter_mut() {
if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_global_features.is_none() {
if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_global_features.is_none() ||!peer.is_channel_allowed_to_forward(msg.contents.short_channel_id) {
continue
}
match peer.their_node_id {
Expand All @@ -879,7 +928,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
}
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_update_msg[..]));
Self::do_attempt_write_data(&mut (*descriptor).clone(), peer);
self.do_attempt_write_data(&mut (*descriptor).clone(), peer);
}
}
},
Expand All @@ -889,11 +938,11 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
let encoded_msg = encode_msg!(msg, 258);

for (ref descriptor, ref mut peer) in peers.peers.iter_mut() {
if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_global_features.is_none() {
if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_global_features.is_none() || !peer.is_channel_allowed_to_forward(msg.contents.short_channel_id) {
continue
}
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
Self::do_attempt_write_data(&mut (*descriptor).clone(), peer);
self.do_attempt_write_data(&mut (*descriptor).clone(), peer);
}
}
},
Expand All @@ -914,7 +963,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 17)));
// This isn't guaranteed to work, but if there is enough free
// room in the send buffer, put the error message there...
Self::do_attempt_write_data(&mut descriptor, &mut peer);
self.do_attempt_write_data(&mut descriptor, &mut peer);
} else {
log_trace!(self, "Handling DisconnectPeer HandleError event in peer_handler for node {} with no message", log_pubkey!(node_id));
}
Expand All @@ -932,7 +981,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
//TODO: Do whatever we're gonna do for handling dropped messages
});
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 17)));
Self::do_attempt_write_data(&mut descriptor, peer);
self.do_attempt_write_data(&mut descriptor, peer);
},
}
} else {
Expand All @@ -944,7 +993,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {

for mut descriptor in peers.peers_needing_send.drain() {
match peers.peers.get_mut(&descriptor) {
Some(peer) => Self::do_attempt_write_data(&mut descriptor, peer),
Some(peer) => self.do_attempt_write_data(&mut descriptor, peer),
None => panic!("Inconsistent peers set state!"),
}
}
Expand Down
Loading