@@ -102,6 +102,7 @@ struct Peer {
102
102
pending_read_buffer : Vec < u8 > ,
103
103
pending_read_buffer_pos : usize ,
104
104
pending_read_is_header : bool ,
105
+ last_synced_channel : i64 ,
105
106
}
106
107
107
108
struct PeerHolder < Descriptor : SocketDescriptor > {
@@ -221,6 +222,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
221
222
pending_read_buffer : pending_read_buffer,
222
223
pending_read_buffer_pos : 0 ,
223
224
pending_read_is_header : false ,
225
+ last_synced_channel : -1 ,
224
226
} ) . is_some ( ) {
225
227
panic ! ( "PeerManager driver duplicated descriptors!" ) ;
226
228
} ;
@@ -255,22 +257,41 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
255
257
pending_read_buffer : pending_read_buffer,
256
258
pending_read_buffer_pos : 0 ,
257
259
pending_read_is_header : false ,
260
+ last_synced_channel : -1 ,
258
261
} ) . is_some ( ) {
259
262
panic ! ( "PeerManager driver duplicated descriptors!" ) ;
260
263
} ;
261
264
Ok ( ( ) )
262
265
}
263
266
264
- fn do_attempt_write_data ( descriptor : & mut Descriptor , peer : & mut Peer ) {
267
+ fn do_attempt_write_data ( & self , descriptor : & mut Descriptor , peer : & mut Peer ) {
268
+ macro_rules! encode_and_send_msg {
269
+ ( $msg: expr, $msg_code: expr) => {
270
+ {
271
+ log_trace!( self , "Encoding and sending message of type {} to {}" , $msg_code, log_pubkey!( peer. their_node_id. unwrap( ) ) ) ;
272
+ peer. pending_outbound_buffer. push_back( peer. channel_encryptor. encrypt_message( & encode_msg!( $msg, $msg_code) [ ..] ) ) ;
273
+ }
274
+ }
275
+ }
265
276
while !peer. awaiting_write_event {
266
277
if {
278
+ let should_be_reading = peer. pending_outbound_buffer . len ( ) < 10 ;
279
+ if ( peer. last_synced_channel > -1 ) &&( should_be_reading) {
280
+ let all_messages_tuple = self . message_handler . route_handler . get_next_announcements ( peer. last_synced_channel , ( peer. pending_outbound_buffer . len ( ) -10 ) as u8 ) ;
281
+ for i in 0 ..all_messages_tuple. 0 . len ( ) {
282
+ encode_and_send_msg ! ( all_messages_tuple. 0 [ i] . 0 . clone( ) , 256 ) ;
283
+ encode_and_send_msg ! ( all_messages_tuple. 0 [ i] . 2 . clone( ) , 257 ) ;
284
+ encode_and_send_msg ! ( all_messages_tuple. 0 [ i] . 3 . clone( ) , 257 ) ;
285
+ encode_and_send_msg ! ( all_messages_tuple. 0 [ i] . 1 . clone( ) , 258 ) ;
286
+ }
287
+ peer. last_synced_channel = all_messages_tuple. 1 ;
288
+ }
267
289
let next_buff = match peer. pending_outbound_buffer . front ( ) {
268
290
None => return ,
269
291
Some ( buff) => buff,
270
292
} ;
271
- let should_be_reading = peer. pending_outbound_buffer . len ( ) < 10 ;
272
293
273
- let data_sent = descriptor. send_data ( next_buff, peer. pending_outbound_buffer_first_msg_offset , should_be_reading) ;
294
+ let data_sent = descriptor. send_data ( & next_buff, peer. pending_outbound_buffer_first_msg_offset , should_be_reading) ;
274
295
peer. pending_outbound_buffer_first_msg_offset += data_sent;
275
296
if peer. pending_outbound_buffer_first_msg_offset == next_buff. len ( ) { true } else { false }
276
297
} {
@@ -297,7 +318,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
297
318
None => panic ! ( "Descriptor for write_event is not already known to PeerManager" ) ,
298
319
Some ( peer) => {
299
320
peer. awaiting_write_event = false ;
300
- Self :: do_attempt_write_data ( descriptor, peer) ;
321
+ self . do_attempt_write_data ( descriptor, peer) ;
301
322
}
302
323
} ;
303
324
Ok ( ( ) )
@@ -511,6 +532,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
511
532
if msg. local_features. supports_unknown_bits( ) { "present" } else { "none" } ,
512
533
if msg. global_features. supports_unknown_bits( ) { "present" } else { "none" } ) ;
513
534
535
+ let do_they_require_sync = msg. local_features . initial_routing_sync ( ) ;
514
536
peer. their_global_features = Some ( msg. global_features ) ;
515
537
peer. their_local_features = Some ( msg. local_features ) ;
516
538
@@ -520,6 +542,10 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
520
542
self . initial_syncs_sent . fetch_add ( 1 , Ordering :: AcqRel ) ;
521
543
local_features. set_initial_routing_sync ( ) ;
522
544
}
545
+
546
+ if do_they_require_sync {
547
+ peer. last_synced_channel = 0 ; //set to larger than -1, means next sent message will trigger slow trickle of sync data
548
+ }
523
549
encode_and_send_msg ! ( msgs:: Init {
524
550
global_features: msgs:: GlobalFeatures :: new( ) ,
525
551
local_features,
@@ -667,7 +693,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
667
693
}
668
694
}
669
695
670
- Self :: do_attempt_write_data ( peer_descriptor, peer) ;
696
+ self . do_attempt_write_data ( peer_descriptor, peer) ;
671
697
672
698
peer. pending_outbound_buffer . len ( ) > 10 // pause_read
673
699
}
@@ -724,7 +750,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
724
750
//TODO: Drop the pending channel? (or just let it timeout, but that sucks)
725
751
} ) ;
726
752
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 33 ) ) ) ;
727
- Self :: do_attempt_write_data ( & mut descriptor, peer) ;
753
+ self . do_attempt_write_data ( & mut descriptor, peer) ;
728
754
} ,
729
755
MessageSendEvent :: SendOpenChannel { ref node_id, ref msg } => {
730
756
log_trace ! ( self , "Handling SendOpenChannel event in peer_handler for node {} for channel {}" ,
@@ -734,7 +760,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
734
760
//TODO: Drop the pending channel? (or just let it timeout, but that sucks)
735
761
} ) ;
736
762
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 32 ) ) ) ;
737
- Self :: do_attempt_write_data ( & mut descriptor, peer) ;
763
+ self . do_attempt_write_data ( & mut descriptor, peer) ;
738
764
} ,
739
765
MessageSendEvent :: SendFundingCreated { ref node_id, ref msg } => {
740
766
log_trace ! ( self , "Handling SendFundingCreated event in peer_handler for node {} for channel {} (which becomes {})" ,
@@ -746,7 +772,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
746
772
//they should just throw away this funding transaction
747
773
} ) ;
748
774
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 34 ) ) ) ;
749
- Self :: do_attempt_write_data ( & mut descriptor, peer) ;
775
+ self . do_attempt_write_data ( & mut descriptor, peer) ;
750
776
} ,
751
777
MessageSendEvent :: SendFundingSigned { ref node_id, ref msg } => {
752
778
log_trace ! ( self , "Handling SendFundingSigned event in peer_handler for node {} for channel {}" ,
@@ -757,7 +783,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
757
783
//they should just throw away this funding transaction
758
784
} ) ;
759
785
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 35 ) ) ) ;
760
- Self :: do_attempt_write_data ( & mut descriptor, peer) ;
786
+ self . do_attempt_write_data ( & mut descriptor, peer) ;
761
787
} ,
762
788
MessageSendEvent :: SendFundingLocked { ref node_id, ref msg } => {
763
789
log_trace ! ( self , "Handling SendFundingLocked event in peer_handler for node {} for channel {}" ,
@@ -767,7 +793,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
767
793
//TODO: Do whatever we're gonna do for handling dropped messages
768
794
} ) ;
769
795
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 36 ) ) ) ;
770
- Self :: do_attempt_write_data ( & mut descriptor, peer) ;
796
+ self . do_attempt_write_data ( & mut descriptor, peer) ;
771
797
} ,
772
798
MessageSendEvent :: SendAnnouncementSignatures { ref node_id, ref msg } => {
773
799
log_trace ! ( self , "Handling SendAnnouncementSignatures event in peer_handler for node {} for channel {})" ,
@@ -778,7 +804,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
778
804
//they should just throw away this funding transaction
779
805
} ) ;
780
806
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 259 ) ) ) ;
781
- Self :: do_attempt_write_data ( & mut descriptor, peer) ;
807
+ self . do_attempt_write_data ( & mut descriptor, peer) ;
782
808
} ,
783
809
MessageSendEvent :: UpdateHTLCs { ref node_id, updates : msgs:: CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
784
810
log_trace ! ( self , "Handling UpdateHTLCs event in peer_handler for node {} with {} adds, {} fulfills, {} fails for channel {}" ,
@@ -806,7 +832,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
806
832
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 134 ) ) ) ;
807
833
}
808
834
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( commitment_signed, 132 ) ) ) ;
809
- Self :: do_attempt_write_data ( & mut descriptor, peer) ;
835
+ self . do_attempt_write_data ( & mut descriptor, peer) ;
810
836
} ,
811
837
MessageSendEvent :: SendRevokeAndACK { ref node_id, ref msg } => {
812
838
log_trace ! ( self , "Handling SendRevokeAndACK event in peer_handler for node {} for channel {}" ,
@@ -816,7 +842,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
816
842
//TODO: Do whatever we're gonna do for handling dropped messages
817
843
} ) ;
818
844
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 133 ) ) ) ;
819
- Self :: do_attempt_write_data ( & mut descriptor, peer) ;
845
+ self . do_attempt_write_data ( & mut descriptor, peer) ;
820
846
} ,
821
847
MessageSendEvent :: SendClosingSigned { ref node_id, ref msg } => {
822
848
log_trace ! ( self , "Handling SendClosingSigned event in peer_handler for node {} for channel {}" ,
@@ -826,7 +852,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
826
852
//TODO: Do whatever we're gonna do for handling dropped messages
827
853
} ) ;
828
854
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 39 ) ) ) ;
829
- Self :: do_attempt_write_data ( & mut descriptor, peer) ;
855
+ self . do_attempt_write_data ( & mut descriptor, peer) ;
830
856
} ,
831
857
MessageSendEvent :: SendShutdown { ref node_id, ref msg } => {
832
858
log_trace ! ( self , "Handling Shutdown event in peer_handler for node {} for channel {}" ,
@@ -836,7 +862,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
836
862
//TODO: Do whatever we're gonna do for handling dropped messages
837
863
} ) ;
838
864
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 38 ) ) ) ;
839
- Self :: do_attempt_write_data ( & mut descriptor, peer) ;
865
+ self . do_attempt_write_data ( & mut descriptor, peer) ;
840
866
} ,
841
867
MessageSendEvent :: SendChannelReestablish { ref node_id, ref msg } => {
842
868
log_trace ! ( self , "Handling SendChannelReestablish event in peer_handler for node {} for channel {}" ,
@@ -846,7 +872,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
846
872
//TODO: Do whatever we're gonna do for handling dropped messages
847
873
} ) ;
848
874
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 136 ) ) ) ;
849
- Self :: do_attempt_write_data ( & mut descriptor, peer) ;
875
+ self . do_attempt_write_data ( & mut descriptor, peer) ;
850
876
} ,
851
877
MessageSendEvent :: BroadcastChannelAnnouncement { ref msg, ref update_msg } => {
852
878
log_trace ! ( self , "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}" , msg. contents. short_channel_id) ;
@@ -868,7 +894,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
868
894
}
869
895
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encoded_msg[ ..] ) ) ;
870
896
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encoded_update_msg[ ..] ) ) ;
871
- Self :: do_attempt_write_data ( & mut ( * descriptor) . clone ( ) , peer) ;
897
+ self . do_attempt_write_data ( & mut ( * descriptor) . clone ( ) , peer) ;
872
898
}
873
899
}
874
900
} ,
@@ -882,7 +908,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
882
908
continue
883
909
}
884
910
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encoded_msg[ ..] ) ) ;
885
- Self :: do_attempt_write_data ( & mut ( * descriptor) . clone ( ) , peer) ;
911
+ self . do_attempt_write_data ( & mut ( * descriptor) . clone ( ) , peer) ;
886
912
}
887
913
}
888
914
} ,
@@ -903,7 +929,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
903
929
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 17 ) ) ) ;
904
930
// This isn't guaranteed to work, but if there is enough free
905
931
// room in the send buffer, put the error message there...
906
- Self :: do_attempt_write_data ( & mut descriptor, & mut peer) ;
932
+ self . do_attempt_write_data ( & mut descriptor, & mut peer) ;
907
933
} else {
908
934
log_trace ! ( self , "Handling DisconnectPeer HandleError event in peer_handler for node {} with no message" , log_pubkey!( node_id) ) ;
909
935
}
@@ -921,7 +947,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
921
947
//TODO: Do whatever we're gonna do for handling dropped messages
922
948
} ) ;
923
949
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 17 ) ) ) ;
924
- Self :: do_attempt_write_data ( & mut descriptor, peer) ;
950
+ self . do_attempt_write_data ( & mut descriptor, peer) ;
925
951
} ,
926
952
}
927
953
} else {
@@ -933,7 +959,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
933
959
934
960
for mut descriptor in peers. peers_needing_send . drain ( ) {
935
961
match peers. peers . get_mut ( & descriptor) {
936
- Some ( peer) => Self :: do_attempt_write_data ( & mut descriptor, peer) ,
962
+ Some ( peer) => self . do_attempt_write_data ( & mut descriptor, peer) ,
937
963
None => panic ! ( "Inconsistent peers set state!" ) ,
938
964
}
939
965
}
0 commit comments