@@ -264,16 +264,34 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
264
264
Ok ( ( ) )
265
265
}
266
266
267
- fn do_attempt_write_data ( descriptor : & mut Descriptor , peer : & mut Peer ) {
267
+ fn do_attempt_write_data ( & self , descriptor : & mut Descriptor , peer : & mut Peer ) {
268
+ macro_rules! encode_and_send_msg {
269
+ ( $msg: expr, $msg_code: expr) => {
270
+ {
271
+ log_trace!( self , "Encoding and sending message of type {} to {}" , $msg_code, log_pubkey!( peer. their_node_id. unwrap( ) ) ) ;
272
+ peer. pending_outbound_buffer. push_back( peer. channel_encryptor. encrypt_message( & encode_msg!( $msg, $msg_code) [ ..] ) ) ;
273
+ }
274
+ }
275
+ }
268
276
while !peer. awaiting_write_event {
269
277
if {
278
+ let should_be_reading = peer. pending_outbound_buffer . len ( ) < 10 ;
279
+ if ( peer. last_synced_channel > -1 ) &&( should_be_reading) {
280
+ let all_messages_tuple = self . message_handler . route_handler . get_next_announcements ( peer. last_synced_channel , ( peer. pending_outbound_buffer . len ( ) -10 ) as u8 ) ;
281
+ for i in 0 ..all_messages_tuple. 0 . len ( ) {
282
+ encode_and_send_msg ! ( all_messages_tuple. 0 [ i] . 0 . clone( ) , 256 ) ;
283
+ encode_and_send_msg ! ( all_messages_tuple. 0 [ i] . 2 . clone( ) , 257 ) ;
284
+ encode_and_send_msg ! ( all_messages_tuple. 0 [ i] . 3 . clone( ) , 257 ) ;
285
+ encode_and_send_msg ! ( all_messages_tuple. 0 [ i] . 1 . clone( ) , 258 ) ;
286
+ }
287
+ peer. last_synced_channel = all_messages_tuple. 1 ;
288
+ }
270
289
let next_buff = match peer. pending_outbound_buffer . front ( ) {
271
290
None => return ,
272
291
Some ( buff) => buff,
273
292
} ;
274
- let should_be_reading = peer. pending_outbound_buffer . len ( ) < 10 ;
275
293
276
- let data_sent = descriptor. send_data ( next_buff, peer. pending_outbound_buffer_first_msg_offset , should_be_reading) ;
294
+ let data_sent = descriptor. send_data ( & next_buff, peer. pending_outbound_buffer_first_msg_offset , should_be_reading) ;
277
295
peer. pending_outbound_buffer_first_msg_offset += data_sent;
278
296
if peer. pending_outbound_buffer_first_msg_offset == next_buff. len ( ) { true } else { false }
279
297
} {
@@ -300,7 +318,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
300
318
None => panic ! ( "Descriptor for write_event is not already known to PeerManager" ) ,
301
319
Some ( peer) => {
302
320
peer. awaiting_write_event = false ;
303
- Self :: do_attempt_write_data ( descriptor, peer) ;
321
+ self . do_attempt_write_data ( descriptor, peer) ;
304
322
}
305
323
} ;
306
324
Ok ( ( ) )
@@ -351,22 +369,6 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
351
369
peer. pending_read_buffer_pos = 0 ;
352
370
353
371
macro_rules! encode_and_send_msg {
354
- ( $msg: expr, $msg_code: expr) => {
355
- {
356
- encode_and_send_actual_msg!( $msg, $msg_code) ;
357
- if peer. last_synced_channel > -1 {
358
- let all_messages_tuple = self . message_handler. route_handler. get_next_announcements( peer. last_synced_channel, 5 ) ;
359
- for i in 0 ..all_messages_tuple. 0 . len( ) {
360
- encode_and_send_actual_msg!( all_messages_tuple. 0 [ i] . 0 . clone( ) , 256 ) ;
361
- encode_and_send_actual_msg!( all_messages_tuple. 0 [ i] . 2 . clone( ) , 257 ) ;
362
- encode_and_send_actual_msg!( all_messages_tuple. 0 [ i] . 3 . clone( ) , 257 ) ;
363
- encode_and_send_actual_msg!( all_messages_tuple. 0 [ i] . 1 . clone( ) , 258 ) ;
364
- }
365
- peer. last_synced_channel = all_messages_tuple. 1 ;
366
- }
367
- }
368
- }
369
- } macro_rules! encode_and_send_actual_msg {
370
372
( $msg: expr, $msg_code: expr) => {
371
373
{
372
374
log_trace!( self , "Encoding and sending message of type {} to {}" , $msg_code, log_pubkey!( peer. their_node_id. unwrap( ) ) ) ;
@@ -691,7 +693,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
691
693
}
692
694
}
693
695
694
- Self :: do_attempt_write_data ( peer_descriptor, peer) ;
696
+ self . do_attempt_write_data ( peer_descriptor, peer) ;
695
697
696
698
peer. pending_outbound_buffer . len ( ) > 10 // pause_read
697
699
}
@@ -748,7 +750,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
748
750
//TODO: Drop the pending channel? (or just let it timeout, but that sucks)
749
751
} ) ;
750
752
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 33 ) ) ) ;
751
- Self :: do_attempt_write_data ( & mut descriptor, peer) ;
753
+ self . do_attempt_write_data ( & mut descriptor, peer) ;
752
754
} ,
753
755
MessageSendEvent :: SendOpenChannel { ref node_id, ref msg } => {
754
756
log_trace ! ( self , "Handling SendOpenChannel event in peer_handler for node {} for channel {}" ,
@@ -758,7 +760,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
758
760
//TODO: Drop the pending channel? (or just let it timeout, but that sucks)
759
761
} ) ;
760
762
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 32 ) ) ) ;
761
- Self :: do_attempt_write_data ( & mut descriptor, peer) ;
763
+ self . do_attempt_write_data ( & mut descriptor, peer) ;
762
764
} ,
763
765
MessageSendEvent :: SendFundingCreated { ref node_id, ref msg } => {
764
766
log_trace ! ( self , "Handling SendFundingCreated event in peer_handler for node {} for channel {} (which becomes {})" ,
@@ -770,7 +772,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
770
772
//they should just throw away this funding transaction
771
773
} ) ;
772
774
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 34 ) ) ) ;
773
- Self :: do_attempt_write_data ( & mut descriptor, peer) ;
775
+ self . do_attempt_write_data ( & mut descriptor, peer) ;
774
776
} ,
775
777
MessageSendEvent :: SendFundingSigned { ref node_id, ref msg } => {
776
778
log_trace ! ( self , "Handling SendFundingSigned event in peer_handler for node {} for channel {}" ,
@@ -781,7 +783,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
781
783
//they should just throw away this funding transaction
782
784
} ) ;
783
785
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 35 ) ) ) ;
784
- Self :: do_attempt_write_data ( & mut descriptor, peer) ;
786
+ self . do_attempt_write_data ( & mut descriptor, peer) ;
785
787
} ,
786
788
MessageSendEvent :: SendFundingLocked { ref node_id, ref msg } => {
787
789
log_trace ! ( self , "Handling SendFundingLocked event in peer_handler for node {} for channel {}" ,
@@ -791,7 +793,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
791
793
//TODO: Do whatever we're gonna do for handling dropped messages
792
794
} ) ;
793
795
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 36 ) ) ) ;
794
- Self :: do_attempt_write_data ( & mut descriptor, peer) ;
796
+ self . do_attempt_write_data ( & mut descriptor, peer) ;
795
797
} ,
796
798
MessageSendEvent :: SendAnnouncementSignatures { ref node_id, ref msg } => {
797
799
log_trace ! ( self , "Handling SendAnnouncementSignatures event in peer_handler for node {} for channel {})" ,
@@ -802,7 +804,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
802
804
//they should just throw away this funding transaction
803
805
} ) ;
804
806
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 259 ) ) ) ;
805
- Self :: do_attempt_write_data ( & mut descriptor, peer) ;
807
+ self . do_attempt_write_data ( & mut descriptor, peer) ;
806
808
} ,
807
809
MessageSendEvent :: UpdateHTLCs { ref node_id, updates : msgs:: CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
808
810
log_trace ! ( self , "Handling UpdateHTLCs event in peer_handler for node {} with {} adds, {} fulfills, {} fails for channel {}" ,
@@ -830,7 +832,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
830
832
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 134 ) ) ) ;
831
833
}
832
834
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( commitment_signed, 132 ) ) ) ;
833
- Self :: do_attempt_write_data ( & mut descriptor, peer) ;
835
+ self . do_attempt_write_data ( & mut descriptor, peer) ;
834
836
} ,
835
837
MessageSendEvent :: SendRevokeAndACK { ref node_id, ref msg } => {
836
838
log_trace ! ( self , "Handling SendRevokeAndACK event in peer_handler for node {} for channel {}" ,
@@ -840,7 +842,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
840
842
//TODO: Do whatever we're gonna do for handling dropped messages
841
843
} ) ;
842
844
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 133 ) ) ) ;
843
- Self :: do_attempt_write_data ( & mut descriptor, peer) ;
845
+ self . do_attempt_write_data ( & mut descriptor, peer) ;
844
846
} ,
845
847
MessageSendEvent :: SendClosingSigned { ref node_id, ref msg } => {
846
848
log_trace ! ( self , "Handling SendClosingSigned event in peer_handler for node {} for channel {}" ,
@@ -850,7 +852,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
850
852
//TODO: Do whatever we're gonna do for handling dropped messages
851
853
} ) ;
852
854
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 39 ) ) ) ;
853
- Self :: do_attempt_write_data ( & mut descriptor, peer) ;
855
+ self . do_attempt_write_data ( & mut descriptor, peer) ;
854
856
} ,
855
857
MessageSendEvent :: SendShutdown { ref node_id, ref msg } => {
856
858
log_trace ! ( self , "Handling Shutdown event in peer_handler for node {} for channel {}" ,
@@ -860,7 +862,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
860
862
//TODO: Do whatever we're gonna do for handling dropped messages
861
863
} ) ;
862
864
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 38 ) ) ) ;
863
- Self :: do_attempt_write_data ( & mut descriptor, peer) ;
865
+ self . do_attempt_write_data ( & mut descriptor, peer) ;
864
866
} ,
865
867
MessageSendEvent :: SendChannelReestablish { ref node_id, ref msg } => {
866
868
log_trace ! ( self , "Handling SendChannelReestablish event in peer_handler for node {} for channel {}" ,
@@ -870,7 +872,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
870
872
//TODO: Do whatever we're gonna do for handling dropped messages
871
873
} ) ;
872
874
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 136 ) ) ) ;
873
- Self :: do_attempt_write_data ( & mut descriptor, peer) ;
875
+ self . do_attempt_write_data ( & mut descriptor, peer) ;
874
876
} ,
875
877
MessageSendEvent :: BroadcastChannelAnnouncement { ref msg, ref update_msg } => {
876
878
log_trace ! ( self , "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}" , msg. contents. short_channel_id) ;
@@ -892,7 +894,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
892
894
}
893
895
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encoded_msg[ ..] ) ) ;
894
896
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encoded_update_msg[ ..] ) ) ;
895
- Self :: do_attempt_write_data ( & mut ( * descriptor) . clone ( ) , peer) ;
897
+ self . do_attempt_write_data ( & mut ( * descriptor) . clone ( ) , peer) ;
896
898
}
897
899
}
898
900
} ,
@@ -906,7 +908,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
906
908
continue
907
909
}
908
910
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encoded_msg[ ..] ) ) ;
909
- Self :: do_attempt_write_data ( & mut ( * descriptor) . clone ( ) , peer) ;
911
+ self . do_attempt_write_data ( & mut ( * descriptor) . clone ( ) , peer) ;
910
912
}
911
913
}
912
914
} ,
@@ -927,7 +929,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
927
929
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 17 ) ) ) ;
928
930
// This isn't guaranteed to work, but if there is enough free
929
931
// room in the send buffer, put the error message there...
930
- Self :: do_attempt_write_data ( & mut descriptor, & mut peer) ;
932
+ self . do_attempt_write_data ( & mut descriptor, & mut peer) ;
931
933
} else {
932
934
log_trace ! ( self , "Handling DisconnectPeer HandleError event in peer_handler for node {} with no message" , log_pubkey!( node_id) ) ;
933
935
}
@@ -945,7 +947,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
945
947
//TODO: Do whatever we're gonna do for handling dropped messages
946
948
} ) ;
947
949
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 17 ) ) ) ;
948
- Self :: do_attempt_write_data ( & mut descriptor, peer) ;
950
+ self . do_attempt_write_data ( & mut descriptor, peer) ;
949
951
} ,
950
952
}
951
953
} else {
@@ -957,7 +959,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
957
959
958
960
for mut descriptor in peers. peers_needing_send . drain ( ) {
959
961
match peers. peers . get_mut ( & descriptor) {
960
- Some ( peer) => Self :: do_attempt_write_data ( & mut descriptor, peer) ,
962
+ Some ( peer) => self . do_attempt_write_data ( & mut descriptor, peer) ,
961
963
None => panic ! ( "Inconsistent peers set state!" ) ,
962
964
}
963
965
}
0 commit comments