@@ -19,6 +19,7 @@ use std::collections::{HashMap,hash_map,LinkedList};
19
19
use std:: sync:: { Arc , Mutex } ;
20
20
use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
21
21
use std:: { cmp, error, mem, hash, fmt} ;
22
+ use std:: { thread, time} ;
22
23
23
24
/// Provides references to trait impls which handle different types of messages.
24
25
pub struct MessageHandler {
@@ -102,6 +103,7 @@ struct Peer {
102
103
pending_read_buffer : Vec < u8 > ,
103
104
pending_read_buffer_pos : usize ,
104
105
pending_read_is_header : bool ,
106
+ last_synced_channel : u64 ,
105
107
}
106
108
107
109
struct PeerHolder < Descriptor : SocketDescriptor > {
@@ -214,6 +216,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
214
216
pending_read_buffer : pending_read_buffer,
215
217
pending_read_buffer_pos : 0 ,
216
218
pending_read_is_header : false ,
219
+ last_synced_channel : 0 ,
217
220
} ) . is_some ( ) {
218
221
panic ! ( "PeerManager driver duplicated descriptors!" ) ;
219
222
} ;
@@ -248,6 +251,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
248
251
pending_read_buffer : pending_read_buffer,
249
252
pending_read_buffer_pos : 0 ,
250
253
pending_read_is_header : false ,
254
+ last_synced_channel : 0 ,
251
255
} ) . is_some ( ) {
252
256
panic ! ( "PeerManager driver duplicated descriptors!" ) ;
253
257
} ;
@@ -507,6 +511,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
507
511
if msg. local_features. supports_unknown_bits( ) { "present" } else { "none" } ,
508
512
if msg. global_features. supports_unknown_bits( ) { "present" } else { "none" } ) ;
509
513
514
+ let do_they_require_sync = msg. local_features . initial_routing_sync ( ) ;
510
515
peer. their_global_features = Some ( msg. global_features ) ;
511
516
peer. their_local_features = Some ( msg. local_features ) ;
512
517
@@ -516,10 +521,30 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
516
521
self . initial_syncs_sent . fetch_add ( 1 , Ordering :: AcqRel ) ;
517
522
local_features. set_initial_routing_sync ( ) ;
518
523
}
524
+ //we need to let them know about us, before we can send stuff to them
519
525
encode_and_send_msg ! ( msgs:: Init {
520
526
global_features: msgs:: GlobalFeatures :: new( ) ,
521
527
local_features,
522
528
} , 16 ) ;
529
+ if do_they_require_sync {
530
+ peer. last_synced_channel = 0 ;
531
+ let all_messages_tuple = self . message_handler . route_handler . get_next_announcements ( peer. last_synced_channel , 5 ) ;
532
+ while ( all_messages_tuple. 0 ) . len ( ) >0 {
533
+ for i in 0 ..all_messages_tuple. 0 . len ( ) {
534
+ encode_and_send_msg ! ( all_messages_tuple. 0 [ i] . 0 . clone( ) , 256 ) ;
535
+ encode_and_send_msg ! ( all_messages_tuple. 0 [ i] . 2 . clone( ) , 257 ) ;
536
+ encode_and_send_msg ! ( all_messages_tuple. 0 [ i] . 3 . clone( ) , 257 ) ;
537
+ encode_and_send_msg ! ( all_messages_tuple. 0 [ i] . 1 . clone( ) , 258 ) ;
538
+ }
539
+ //we wait to see if the buffer clears before we retrieve more message to send
540
+ //we push up to 15 message out, 5 remaining means they have cleared 2/3rds
541
+ while peer. pending_outbound_buffer . len ( ) >5 {
542
+ thread:: sleep ( time:: Duration :: from_millis ( 50 ) ) ;
543
+ } ;
544
+ let all_messages_tuple = self . message_handler . route_handler . get_next_announcements ( peer. last_synced_channel , 5 ) ;
545
+ peer. last_synced_channel = all_messages_tuple. 1 ;
546
+ }
547
+ }
523
548
}
524
549
525
550
for msg in self . message_handler . chan_handler . peer_connected ( & peer. their_node_id . unwrap ( ) ) {
0 commit comments