@@ -102,6 +102,7 @@ struct Peer {
102
102
pending_read_buffer : Vec < u8 > ,
103
103
pending_read_buffer_pos : usize ,
104
104
pending_read_is_header : bool ,
105
+ last_synced_channel : i64 ,
105
106
}
106
107
107
108
struct PeerHolder < Descriptor : SocketDescriptor > {
@@ -221,6 +222,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
221
222
pending_read_buffer : pending_read_buffer,
222
223
pending_read_buffer_pos : 0 ,
223
224
pending_read_is_header : false ,
225
+ last_synced_channel : -1 ,
224
226
} ) . is_some ( ) {
225
227
panic ! ( "PeerManager driver duplicated descriptors!" ) ;
226
228
} ;
@@ -255,22 +257,42 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
255
257
pending_read_buffer : pending_read_buffer,
256
258
pending_read_buffer_pos : 0 ,
257
259
pending_read_is_header : false ,
260
+ last_synced_channel : -1 ,
258
261
} ) . is_some ( ) {
259
262
panic ! ( "PeerManager driver duplicated descriptors!" ) ;
260
263
} ;
261
264
Ok ( ( ) )
262
265
}
263
266
264
- fn do_attempt_write_data ( descriptor : & mut Descriptor , peer : & mut Peer ) {
267
+ fn do_attempt_write_data ( & self , descriptor : & mut Descriptor , peer : & mut Peer ) {
268
+ macro_rules! encode_and_send_msg {
269
+ ( $msg: expr, $msg_code: expr) => {
270
+ {
271
+ log_trace!( self , "Encoding and sending message of type {} to {}" , $msg_code, log_pubkey!( peer. their_node_id. unwrap( ) ) ) ;
272
+ peer. pending_outbound_buffer. push_back( peer. channel_encryptor. encrypt_message( & encode_msg!( $msg, $msg_code) [ ..] ) ) ;
273
+ }
274
+ }
275
+ }
265
276
while !peer. awaiting_write_event {
266
277
if {
278
+ let should_be_reading = peer. pending_outbound_buffer . len ( ) < 10 ;
279
+ if ( peer. last_synced_channel > -1 ) &&( should_be_reading) {
280
+ let all_messages_tuple = self . message_handler . route_handler . get_next_announcements ( peer. last_synced_channel as u64 , ( 10 -peer. pending_outbound_buffer . len ( ) ) as u8 ) ;
281
+ for tuple in all_messages_tuple. 0 . iter ( ) {
282
+ encode_and_send_msg ! ( tuple. 0 , 256 ) ;
283
+ encode_and_send_msg ! ( tuple. 1 , 257 ) ;
284
+ encode_and_send_msg ! ( tuple. 2 , 257 ) ;
285
+ encode_and_send_msg ! ( tuple. 3 , 258 ) ;
286
+ encode_and_send_msg ! ( tuple. 4 , 258 ) ;
287
+ }
288
+ peer. last_synced_channel = all_messages_tuple. 1 ;
289
+ }
267
290
let next_buff = match peer. pending_outbound_buffer . front ( ) {
268
291
None => return ,
269
292
Some ( buff) => buff,
270
293
} ;
271
- let should_be_reading = peer. pending_outbound_buffer . len ( ) < 10 ;
272
294
273
- let data_sent = descriptor. send_data ( next_buff, peer. pending_outbound_buffer_first_msg_offset , should_be_reading) ;
295
+ let data_sent = descriptor. send_data ( & next_buff, peer. pending_outbound_buffer_first_msg_offset , should_be_reading) ;
274
296
peer. pending_outbound_buffer_first_msg_offset += data_sent;
275
297
if peer. pending_outbound_buffer_first_msg_offset == next_buff. len ( ) { true } else { false }
276
298
} {
@@ -297,7 +319,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
297
319
None => panic ! ( "Descriptor for write_event is not already known to PeerManager" ) ,
298
320
Some ( peer) => {
299
321
peer. awaiting_write_event = false ;
300
- Self :: do_attempt_write_data ( descriptor, peer) ;
322
+ self . do_attempt_write_data ( descriptor, peer) ;
301
323
}
302
324
} ;
303
325
Ok ( ( ) )
@@ -511,6 +533,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
511
533
if msg. local_features. supports_unknown_bits( ) { "present" } else { "none" } ,
512
534
if msg. global_features. supports_unknown_bits( ) { "present" } else { "none" } ) ;
513
535
536
+ let do_they_require_sync = msg. local_features . initial_routing_sync ( ) ;
514
537
peer. their_global_features = Some ( msg. global_features ) ;
515
538
peer. their_local_features = Some ( msg. local_features ) ;
516
539
@@ -520,6 +543,10 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
520
543
self . initial_syncs_sent . fetch_add ( 1 , Ordering :: AcqRel ) ;
521
544
local_features. set_initial_routing_sync ( ) ;
522
545
}
546
+
547
+ if do_they_require_sync {
548
+ peer. last_synced_channel = 0 ; //set to larger than -1, means next sent message will trigger slow trickle of sync data
549
+ }
523
550
encode_and_send_msg ! ( msgs:: Init {
524
551
global_features: msgs:: GlobalFeatures :: new( ) ,
525
552
local_features,
@@ -667,7 +694,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
667
694
}
668
695
}
669
696
670
- Self :: do_attempt_write_data ( peer_descriptor, peer) ;
697
+ self . do_attempt_write_data ( peer_descriptor, peer) ;
671
698
672
699
peer. pending_outbound_buffer . len ( ) > 10 // pause_read
673
700
}
@@ -724,7 +751,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
724
751
//TODO: Drop the pending channel? (or just let it timeout, but that sucks)
725
752
} ) ;
726
753
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 33 ) ) ) ;
727
- Self :: do_attempt_write_data ( & mut descriptor, peer) ;
754
+ self . do_attempt_write_data ( & mut descriptor, peer) ;
728
755
} ,
729
756
MessageSendEvent :: SendOpenChannel { ref node_id, ref msg } => {
730
757
log_trace ! ( self , "Handling SendOpenChannel event in peer_handler for node {} for channel {}" ,
@@ -734,7 +761,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
734
761
//TODO: Drop the pending channel? (or just let it timeout, but that sucks)
735
762
} ) ;
736
763
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 32 ) ) ) ;
737
- Self :: do_attempt_write_data ( & mut descriptor, peer) ;
764
+ self . do_attempt_write_data ( & mut descriptor, peer) ;
738
765
} ,
739
766
MessageSendEvent :: SendFundingCreated { ref node_id, ref msg } => {
740
767
log_trace ! ( self , "Handling SendFundingCreated event in peer_handler for node {} for channel {} (which becomes {})" ,
@@ -746,7 +773,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
746
773
//they should just throw away this funding transaction
747
774
} ) ;
748
775
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 34 ) ) ) ;
749
- Self :: do_attempt_write_data ( & mut descriptor, peer) ;
776
+ self . do_attempt_write_data ( & mut descriptor, peer) ;
750
777
} ,
751
778
MessageSendEvent :: SendFundingSigned { ref node_id, ref msg } => {
752
779
log_trace ! ( self , "Handling SendFundingSigned event in peer_handler for node {} for channel {}" ,
@@ -757,7 +784,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
757
784
//they should just throw away this funding transaction
758
785
} ) ;
759
786
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 35 ) ) ) ;
760
- Self :: do_attempt_write_data ( & mut descriptor, peer) ;
787
+ self . do_attempt_write_data ( & mut descriptor, peer) ;
761
788
} ,
762
789
MessageSendEvent :: SendFundingLocked { ref node_id, ref msg } => {
763
790
log_trace ! ( self , "Handling SendFundingLocked event in peer_handler for node {} for channel {}" ,
@@ -767,7 +794,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
767
794
//TODO: Do whatever we're gonna do for handling dropped messages
768
795
} ) ;
769
796
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 36 ) ) ) ;
770
- Self :: do_attempt_write_data ( & mut descriptor, peer) ;
797
+ self . do_attempt_write_data ( & mut descriptor, peer) ;
771
798
} ,
772
799
MessageSendEvent :: SendAnnouncementSignatures { ref node_id, ref msg } => {
773
800
log_trace ! ( self , "Handling SendAnnouncementSignatures event in peer_handler for node {} for channel {})" ,
@@ -778,7 +805,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
778
805
//they should just throw away this funding transaction
779
806
} ) ;
780
807
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 259 ) ) ) ;
781
- Self :: do_attempt_write_data ( & mut descriptor, peer) ;
808
+ self . do_attempt_write_data ( & mut descriptor, peer) ;
782
809
} ,
783
810
MessageSendEvent :: UpdateHTLCs { ref node_id, updates : msgs:: CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
784
811
log_trace ! ( self , "Handling UpdateHTLCs event in peer_handler for node {} with {} adds, {} fulfills, {} fails for channel {}" ,
@@ -806,7 +833,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
806
833
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 134 ) ) ) ;
807
834
}
808
835
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( commitment_signed, 132 ) ) ) ;
809
- Self :: do_attempt_write_data ( & mut descriptor, peer) ;
836
+ self . do_attempt_write_data ( & mut descriptor, peer) ;
810
837
} ,
811
838
MessageSendEvent :: SendRevokeAndACK { ref node_id, ref msg } => {
812
839
log_trace ! ( self , "Handling SendRevokeAndACK event in peer_handler for node {} for channel {}" ,
@@ -816,7 +843,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
816
843
//TODO: Do whatever we're gonna do for handling dropped messages
817
844
} ) ;
818
845
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 133 ) ) ) ;
819
- Self :: do_attempt_write_data ( & mut descriptor, peer) ;
846
+ self . do_attempt_write_data ( & mut descriptor, peer) ;
820
847
} ,
821
848
MessageSendEvent :: SendClosingSigned { ref node_id, ref msg } => {
822
849
log_trace ! ( self , "Handling SendClosingSigned event in peer_handler for node {} for channel {}" ,
@@ -826,7 +853,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
826
853
//TODO: Do whatever we're gonna do for handling dropped messages
827
854
} ) ;
828
855
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 39 ) ) ) ;
829
- Self :: do_attempt_write_data ( & mut descriptor, peer) ;
856
+ self . do_attempt_write_data ( & mut descriptor, peer) ;
830
857
} ,
831
858
MessageSendEvent :: SendShutdown { ref node_id, ref msg } => {
832
859
log_trace ! ( self , "Handling Shutdown event in peer_handler for node {} for channel {}" ,
@@ -836,7 +863,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
836
863
//TODO: Do whatever we're gonna do for handling dropped messages
837
864
} ) ;
838
865
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 38 ) ) ) ;
839
- Self :: do_attempt_write_data ( & mut descriptor, peer) ;
866
+ self . do_attempt_write_data ( & mut descriptor, peer) ;
840
867
} ,
841
868
MessageSendEvent :: SendChannelReestablish { ref node_id, ref msg } => {
842
869
log_trace ! ( self , "Handling SendChannelReestablish event in peer_handler for node {} for channel {}" ,
@@ -846,7 +873,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
846
873
//TODO: Do whatever we're gonna do for handling dropped messages
847
874
} ) ;
848
875
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 136 ) ) ) ;
849
- Self :: do_attempt_write_data ( & mut descriptor, peer) ;
876
+ self . do_attempt_write_data ( & mut descriptor, peer) ;
850
877
} ,
851
878
MessageSendEvent :: BroadcastChannelAnnouncement { ref msg, ref update_msg } => {
852
879
log_trace ! ( self , "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}" , msg. contents. short_channel_id) ;
@@ -868,7 +895,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
868
895
}
869
896
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encoded_msg[ ..] ) ) ;
870
897
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encoded_update_msg[ ..] ) ) ;
871
- Self :: do_attempt_write_data ( & mut ( * descriptor) . clone ( ) , peer) ;
898
+ self . do_attempt_write_data ( & mut ( * descriptor) . clone ( ) , peer) ;
872
899
}
873
900
}
874
901
} ,
@@ -882,7 +909,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
882
909
continue
883
910
}
884
911
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encoded_msg[ ..] ) ) ;
885
- Self :: do_attempt_write_data ( & mut ( * descriptor) . clone ( ) , peer) ;
912
+ self . do_attempt_write_data ( & mut ( * descriptor) . clone ( ) , peer) ;
886
913
}
887
914
}
888
915
} ,
@@ -903,7 +930,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
903
930
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 17 ) ) ) ;
904
931
// This isn't guaranteed to work, but if there is enough free
905
932
// room in the send buffer, put the error message there...
906
- Self :: do_attempt_write_data ( & mut descriptor, & mut peer) ;
933
+ self . do_attempt_write_data ( & mut descriptor, & mut peer) ;
907
934
} else {
908
935
log_trace ! ( self , "Handling DisconnectPeer HandleError event in peer_handler for node {} with no message" , log_pubkey!( node_id) ) ;
909
936
}
@@ -921,7 +948,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
921
948
//TODO: Do whatever we're gonna do for handling dropped messages
922
949
} ) ;
923
950
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 17 ) ) ) ;
924
- Self :: do_attempt_write_data ( & mut descriptor, peer) ;
951
+ self . do_attempt_write_data ( & mut descriptor, peer) ;
925
952
} ,
926
953
}
927
954
} else {
@@ -933,7 +960,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
933
960
934
961
for mut descriptor in peers. peers_needing_send . drain ( ) {
935
962
match peers. peers . get_mut ( & descriptor) {
936
- Some ( peer) => Self :: do_attempt_write_data ( & mut descriptor, peer) ,
963
+ Some ( peer) => self . do_attempt_write_data ( & mut descriptor, peer) ,
937
964
None => panic ! ( "Inconsistent peers set state!" ) ,
938
965
}
939
966
}
0 commit comments