@@ -102,6 +102,24 @@ struct Peer {
102
102
pending_read_buffer : Vec < u8 > ,
103
103
pending_read_buffer_pos : usize ,
104
104
pending_read_is_header : bool ,
105
+ sync_status : msgs:: InitSyncTracker ,
106
+ }
107
+
108
+ impl Peer {
109
+ pub fn require_sync ( & self ) ->bool {
110
+ if let msgs:: InitSyncTracker :: Sync ( i) = self . sync_status { i} else { false }
111
+ }
112
+
113
+ /// this function checks if the the channel announcements and updates are allowed to be forwarded to a specific peer.
114
+ /// If the peer is in syncing state and the channel_id has not been synced then the function returns false as this info will forward at a later stage and
115
+ /// we dont want to send duplicate messages. If the channel was already synced then we can forward those messages and the function will then return true.
116
+ pub fn is_channel_allowed_to_forward ( & self , channel_id : u64 ) ->bool {
117
+ match self . sync_status {
118
+ msgs:: InitSyncTracker :: Sync ( i) => !i,
119
+ msgs:: InitSyncTracker :: NodeCounter ( _i) => false ,
120
+ msgs:: InitSyncTracker :: ChannelCounter ( i) => ( i < channel_id) ,
121
+ }
122
+ }
105
123
}
106
124
107
125
struct PeerHolder < Descriptor : SocketDescriptor > {
@@ -221,6 +239,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
221
239
pending_read_buffer : pending_read_buffer,
222
240
pending_read_buffer_pos : 0 ,
223
241
pending_read_is_header : false ,
242
+ sync_status : msgs:: InitSyncTracker :: Sync ( false ) ,
224
243
} ) . is_some ( ) {
225
244
panic ! ( "PeerManager driver duplicated descriptors!" ) ;
226
245
} ;
@@ -255,22 +274,47 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
255
274
pending_read_buffer : pending_read_buffer,
256
275
pending_read_buffer_pos : 0 ,
257
276
pending_read_is_header : false ,
277
+ sync_status : msgs:: InitSyncTracker :: Sync ( false ) ,
258
278
} ) . is_some ( ) {
259
279
panic ! ( "PeerManager driver duplicated descriptors!" ) ;
260
280
} ;
261
281
Ok ( ( ) )
262
282
}
263
283
264
- fn do_attempt_write_data ( descriptor : & mut Descriptor , peer : & mut Peer ) {
284
+ fn do_attempt_write_data ( & self , descriptor : & mut Descriptor , peer : & mut Peer ) {
285
+ macro_rules! encode_and_send_msg {
286
+ ( $msg: expr, $msg_code: expr) => {
287
+ {
288
+ log_trace!( self , "Encoding and sending message of type {} to {}" , $msg_code, log_pubkey!( peer. their_node_id. unwrap( ) ) ) ;
289
+ peer. pending_outbound_buffer. push_back( peer. channel_encryptor. encrypt_message( & encode_msg!( $msg, $msg_code) [ ..] ) ) ;
290
+ }
291
+ }
292
+ }
265
293
while !peer. awaiting_write_event {
266
294
if {
295
+ let should_be_reading = peer. pending_outbound_buffer . len ( ) < 10 ;
296
+ if ( peer. require_sync ( ) ) &&( should_be_reading) {
297
+ match peer. sync_status {
298
+ msgs:: InitSyncTracker :: ChannelCounter ( _c) => {
299
+ let all_messages_tuple = self . message_handler . route_handler . get_next_channel_announcements ( & mut peer. sync_status , ( 10 -peer. pending_outbound_buffer . len ( ) ) as u8 ) ;
300
+ for tuple in all_messages_tuple. iter ( ) {
301
+ encode_and_send_msg ! ( tuple. 0 , 256 ) ;
302
+ encode_and_send_msg ! ( tuple. 1 , 258 ) ;
303
+ encode_and_send_msg ! ( tuple. 2 , 258 ) ;
304
+ }
305
+ } ,
306
+ _=>{ let all_messages = self . message_handler . route_handler . get_next_node_announcements ( & mut peer. sync_status , ( 10 -peer. pending_outbound_buffer . len ( ) ) as u8 ) ;
307
+ for message in all_messages. iter ( ) {
308
+ encode_and_send_msg ! ( message, 256 ) ;
309
+ } } ,
310
+ } ;
311
+ }
267
312
let next_buff = match peer. pending_outbound_buffer . front ( ) {
268
313
None => return ,
269
314
Some ( buff) => buff,
270
315
} ;
271
- let should_be_reading = peer. pending_outbound_buffer . len ( ) < 10 ;
272
316
273
- let data_sent = descriptor. send_data ( next_buff, peer. pending_outbound_buffer_first_msg_offset , should_be_reading) ;
317
+ let data_sent = descriptor. send_data ( & next_buff, peer. pending_outbound_buffer_first_msg_offset , should_be_reading) ;
274
318
peer. pending_outbound_buffer_first_msg_offset += data_sent;
275
319
if peer. pending_outbound_buffer_first_msg_offset == next_buff. len ( ) { true } else { false }
276
320
} {
@@ -297,7 +341,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
297
341
None => panic ! ( "Descriptor for write_event is not already known to PeerManager" ) ,
298
342
Some ( peer) => {
299
343
peer. awaiting_write_event = false ;
300
- Self :: do_attempt_write_data ( descriptor, peer) ;
344
+ self . do_attempt_write_data ( descriptor, peer) ;
301
345
}
302
346
} ;
303
347
Ok ( ( ) )
@@ -522,6 +566,10 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
522
566
if msg. local_features. supports_unknown_bits( ) { "present" } else { "none" } ,
523
567
if msg. global_features. supports_unknown_bits( ) { "present" } else { "none" } ) ;
524
568
569
+ if msg. local_features . initial_routing_sync ( ) {
570
+ peer. sync_status = msgs:: InitSyncTracker :: Sync ( true ) ;
571
+ peers. peers_needing_send . insert ( peer_descriptor. clone ( ) ) ;
572
+ }
525
573
peer. their_global_features = Some ( msg. global_features ) ;
526
574
peer. their_local_features = Some ( msg. local_features ) ;
527
575
@@ -531,6 +579,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
531
579
self . initial_syncs_sent . fetch_add ( 1 , Ordering :: AcqRel ) ;
532
580
local_features. set_initial_routing_sync ( ) ;
533
581
}
582
+
534
583
encode_and_send_msg ! ( msgs:: Init {
535
584
global_features: msgs:: GlobalFeatures :: new( ) ,
536
585
local_features,
@@ -678,7 +727,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
678
727
}
679
728
}
680
729
681
- Self :: do_attempt_write_data ( peer_descriptor, peer) ;
730
+ self . do_attempt_write_data ( peer_descriptor, peer) ;
682
731
683
732
peer. pending_outbound_buffer . len ( ) > 10 // pause_read
684
733
}
@@ -735,7 +784,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
735
784
//TODO: Drop the pending channel? (or just let it timeout, but that sucks)
736
785
} ) ;
737
786
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 33 ) ) ) ;
738
- Self :: do_attempt_write_data ( & mut descriptor, peer) ;
787
+ self . do_attempt_write_data ( & mut descriptor, peer) ;
739
788
} ,
740
789
MessageSendEvent :: SendOpenChannel { ref node_id, ref msg } => {
741
790
log_trace ! ( self , "Handling SendOpenChannel event in peer_handler for node {} for channel {}" ,
@@ -745,7 +794,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
745
794
//TODO: Drop the pending channel? (or just let it timeout, but that sucks)
746
795
} ) ;
747
796
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 32 ) ) ) ;
748
- Self :: do_attempt_write_data ( & mut descriptor, peer) ;
797
+ self . do_attempt_write_data ( & mut descriptor, peer) ;
749
798
} ,
750
799
MessageSendEvent :: SendFundingCreated { ref node_id, ref msg } => {
751
800
log_trace ! ( self , "Handling SendFundingCreated event in peer_handler for node {} for channel {} (which becomes {})" ,
@@ -757,7 +806,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
757
806
//they should just throw away this funding transaction
758
807
} ) ;
759
808
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 34 ) ) ) ;
760
- Self :: do_attempt_write_data ( & mut descriptor, peer) ;
809
+ self . do_attempt_write_data ( & mut descriptor, peer) ;
761
810
} ,
762
811
MessageSendEvent :: SendFundingSigned { ref node_id, ref msg } => {
763
812
log_trace ! ( self , "Handling SendFundingSigned event in peer_handler for node {} for channel {}" ,
@@ -768,7 +817,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
768
817
//they should just throw away this funding transaction
769
818
} ) ;
770
819
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 35 ) ) ) ;
771
- Self :: do_attempt_write_data ( & mut descriptor, peer) ;
820
+ self . do_attempt_write_data ( & mut descriptor, peer) ;
772
821
} ,
773
822
MessageSendEvent :: SendFundingLocked { ref node_id, ref msg } => {
774
823
log_trace ! ( self , "Handling SendFundingLocked event in peer_handler for node {} for channel {}" ,
@@ -778,7 +827,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
778
827
//TODO: Do whatever we're gonna do for handling dropped messages
779
828
} ) ;
780
829
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 36 ) ) ) ;
781
- Self :: do_attempt_write_data ( & mut descriptor, peer) ;
830
+ self . do_attempt_write_data ( & mut descriptor, peer) ;
782
831
} ,
783
832
MessageSendEvent :: SendAnnouncementSignatures { ref node_id, ref msg } => {
784
833
log_trace ! ( self , "Handling SendAnnouncementSignatures event in peer_handler for node {} for channel {})" ,
@@ -789,7 +838,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
789
838
//they should just throw away this funding transaction
790
839
} ) ;
791
840
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 259 ) ) ) ;
792
- Self :: do_attempt_write_data ( & mut descriptor, peer) ;
841
+ self . do_attempt_write_data ( & mut descriptor, peer) ;
793
842
} ,
794
843
MessageSendEvent :: UpdateHTLCs { ref node_id, updates : msgs:: CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
795
844
log_trace ! ( self , "Handling UpdateHTLCs event in peer_handler for node {} with {} adds, {} fulfills, {} fails for channel {}" ,
@@ -817,7 +866,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
817
866
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 134 ) ) ) ;
818
867
}
819
868
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( commitment_signed, 132 ) ) ) ;
820
- Self :: do_attempt_write_data ( & mut descriptor, peer) ;
869
+ self . do_attempt_write_data ( & mut descriptor, peer) ;
821
870
} ,
822
871
MessageSendEvent :: SendRevokeAndACK { ref node_id, ref msg } => {
823
872
log_trace ! ( self , "Handling SendRevokeAndACK event in peer_handler for node {} for channel {}" ,
@@ -827,7 +876,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
827
876
//TODO: Do whatever we're gonna do for handling dropped messages
828
877
} ) ;
829
878
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 133 ) ) ) ;
830
- Self :: do_attempt_write_data ( & mut descriptor, peer) ;
879
+ self . do_attempt_write_data ( & mut descriptor, peer) ;
831
880
} ,
832
881
MessageSendEvent :: SendClosingSigned { ref node_id, ref msg } => {
833
882
log_trace ! ( self , "Handling SendClosingSigned event in peer_handler for node {} for channel {}" ,
@@ -837,7 +886,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
837
886
//TODO: Do whatever we're gonna do for handling dropped messages
838
887
} ) ;
839
888
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 39 ) ) ) ;
840
- Self :: do_attempt_write_data ( & mut descriptor, peer) ;
889
+ self . do_attempt_write_data ( & mut descriptor, peer) ;
841
890
} ,
842
891
MessageSendEvent :: SendShutdown { ref node_id, ref msg } => {
843
892
log_trace ! ( self , "Handling Shutdown event in peer_handler for node {} for channel {}" ,
@@ -847,7 +896,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
847
896
//TODO: Do whatever we're gonna do for handling dropped messages
848
897
} ) ;
849
898
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 38 ) ) ) ;
850
- Self :: do_attempt_write_data ( & mut descriptor, peer) ;
899
+ self . do_attempt_write_data ( & mut descriptor, peer) ;
851
900
} ,
852
901
MessageSendEvent :: SendChannelReestablish { ref node_id, ref msg } => {
853
902
log_trace ! ( self , "Handling SendChannelReestablish event in peer_handler for node {} for channel {}" ,
@@ -857,7 +906,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
857
906
//TODO: Do whatever we're gonna do for handling dropped messages
858
907
} ) ;
859
908
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 136 ) ) ) ;
860
- Self :: do_attempt_write_data ( & mut descriptor, peer) ;
909
+ self . do_attempt_write_data ( & mut descriptor, peer) ;
861
910
} ,
862
911
MessageSendEvent :: BroadcastChannelAnnouncement { ref msg, ref update_msg } => {
863
912
log_trace ! ( self , "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}" , msg. contents. short_channel_id) ;
@@ -866,7 +915,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
866
915
let encoded_update_msg = encode_msg ! ( update_msg, 258 ) ;
867
916
868
917
for ( ref descriptor, ref mut peer) in peers. peers . iter_mut ( ) {
869
- if !peer. channel_encryptor . is_ready_for_encryption ( ) || peer. their_global_features . is_none ( ) {
918
+ if !peer. channel_encryptor . is_ready_for_encryption ( ) || peer. their_global_features . is_none ( ) ||!peer . is_channel_allowed_to_forward ( msg . contents . short_channel_id ) {
870
919
continue
871
920
}
872
921
match peer. their_node_id {
@@ -879,7 +928,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
879
928
}
880
929
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encoded_msg[ ..] ) ) ;
881
930
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encoded_update_msg[ ..] ) ) ;
882
- Self :: do_attempt_write_data ( & mut ( * descriptor) . clone ( ) , peer) ;
931
+ self . do_attempt_write_data ( & mut ( * descriptor) . clone ( ) , peer) ;
883
932
}
884
933
}
885
934
} ,
@@ -889,11 +938,11 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
889
938
let encoded_msg = encode_msg ! ( msg, 258 ) ;
890
939
891
940
for ( ref descriptor, ref mut peer) in peers. peers . iter_mut ( ) {
892
- if !peer. channel_encryptor . is_ready_for_encryption ( ) || peer. their_global_features . is_none ( ) {
941
+ if !peer. channel_encryptor . is_ready_for_encryption ( ) || peer. their_global_features . is_none ( ) || !peer . is_channel_allowed_to_forward ( msg . contents . short_channel_id ) {
893
942
continue
894
943
}
895
944
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encoded_msg[ ..] ) ) ;
896
- Self :: do_attempt_write_data ( & mut ( * descriptor) . clone ( ) , peer) ;
945
+ self . do_attempt_write_data ( & mut ( * descriptor) . clone ( ) , peer) ;
897
946
}
898
947
}
899
948
} ,
@@ -914,7 +963,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
914
963
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 17 ) ) ) ;
915
964
// This isn't guaranteed to work, but if there is enough free
916
965
// room in the send buffer, put the error message there...
917
- Self :: do_attempt_write_data ( & mut descriptor, & mut peer) ;
966
+ self . do_attempt_write_data ( & mut descriptor, & mut peer) ;
918
967
} else {
919
968
log_trace ! ( self , "Handling DisconnectPeer HandleError event in peer_handler for node {} with no message" , log_pubkey!( node_id) ) ;
920
969
}
@@ -932,7 +981,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
932
981
//TODO: Do whatever we're gonna do for handling dropped messages
933
982
} ) ;
934
983
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 17 ) ) ) ;
935
- Self :: do_attempt_write_data ( & mut descriptor, peer) ;
984
+ self . do_attempt_write_data ( & mut descriptor, peer) ;
936
985
} ,
937
986
}
938
987
} else {
@@ -944,7 +993,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
944
993
945
994
for mut descriptor in peers. peers_needing_send . drain ( ) {
946
995
match peers. peers . get_mut ( & descriptor) {
947
- Some ( peer) => Self :: do_attempt_write_data ( & mut descriptor, peer) ,
996
+ Some ( peer) => self . do_attempt_write_data ( & mut descriptor, peer) ,
948
997
None => panic ! ( "Inconsistent peers set state!" ) ,
949
998
}
950
999
}
0 commit comments