@@ -75,11 +75,11 @@ type Network interface {
75
75
76
76
// Should only be called once, will run until either a fatal error occurs,
77
77
// or the network is closed.
78
- Dispatch () error
78
+ Dispatch (ctx context. Context ) error
79
79
80
80
// Attempt to connect to this IP. The network will never stop attempting to
81
81
// connect to this ID.
82
- ManuallyTrack (nodeID ids.NodeID , ip netip.AddrPort )
82
+ ManuallyTrack (ctx context. Context , nodeID ids.NodeID , ip netip.AddrPort )
83
83
84
84
// PeerInfo returns information about peers. If [nodeIDs] is empty, returns
85
85
// info about all peers that have finished the handshake. Otherwise, returns
@@ -506,10 +506,13 @@ func (n *network) AllowConnection(nodeID ids.NodeID) bool {
506
506
return areWeAPrimaryNetworkAValidator || n .ipTracker .WantsConnection (nodeID )
507
507
}
508
508
509
- func (n * network ) Track (claimedIPPorts []* ips.ClaimedIPPort ) error {
509
+ func (n * network ) Track (
510
+ ctx context.Context ,
511
+ claimedIPPorts []* ips.ClaimedIPPort ,
512
+ ) error {
510
513
_ , areWeAPrimaryNetworkAValidator := n .config .Validators .GetValidator (constants .PrimaryNetworkID , n .config .MyNodeID )
511
514
for _ , ip := range claimedIPPorts {
512
- if err := n .track (ip , areWeAPrimaryNetworkAValidator ); err != nil {
515
+ if err := n .track (ctx , ip , areWeAPrimaryNetworkAValidator ); err != nil {
513
516
return err
514
517
}
515
518
}
@@ -521,17 +524,17 @@ func (n *network) Track(claimedIPPorts []*ips.ClaimedIPPort) error {
521
524
// It is guaranteed that [Connected] will not be called with [nodeID] after this
522
525
// call. Note that this is from the perspective of a single peer object, because
523
526
// a peer with the same ID can reconnect to this network instance.
524
- func (n * network ) Disconnected (nodeID ids.NodeID ) {
527
+ func (n * network ) Disconnected (ctx context. Context , nodeID ids.NodeID ) {
525
528
n .peersLock .RLock ()
526
529
_ , connecting := n .connectingPeers .GetByID (nodeID )
527
530
peer , connected := n .connectedPeers .GetByID (nodeID )
528
531
n .peersLock .RUnlock ()
529
532
530
533
if connecting {
531
- n .disconnectedFromConnecting (nodeID )
534
+ n .disconnectedFromConnecting (ctx , nodeID )
532
535
}
533
536
if connected {
534
- n .disconnectedFromConnected (peer , nodeID )
537
+ n .disconnectedFromConnected (ctx , peer , nodeID )
535
538
}
536
539
}
537
540
@@ -599,7 +602,7 @@ func (n *network) Peers(
599
602
600
603
// Dispatch starts accepting connections from other nodes attempting to connect
601
604
// to this node.
602
- func (n * network ) Dispatch () error {
605
+ func (n * network ) Dispatch (ctx context. Context ) error {
603
606
go n .runTimers () // Periodically perform operations
604
607
go n .inboundConnUpgradeThrottler .Dispatch ()
605
608
for n .onCloseCtx .Err () == nil { // Continuously accept new connections
@@ -647,7 +650,7 @@ func (n *network) Dispatch() error {
647
650
zap .Stringer ("peerIP" , ip ),
648
651
)
649
652
650
- if err := n .upgrade (conn , n .serverUpgrader , true ); err != nil {
653
+ if err := n .upgrade (ctx , conn , n .serverUpgrader , true ); err != nil {
651
654
n .peerConfig .Log .Verbo ("failed to upgrade connection" ,
652
655
zap .String ("direction" , "inbound" ),
653
656
zap .Error (err ),
@@ -670,7 +673,11 @@ func (n *network) Dispatch() error {
670
673
return errs .Err
671
674
}
672
675
673
- func (n * network ) ManuallyTrack (nodeID ids.NodeID , ip netip.AddrPort ) {
676
+ func (n * network ) ManuallyTrack (
677
+ ctx context.Context ,
678
+ nodeID ids.NodeID ,
679
+ ip netip.AddrPort ,
680
+ ) {
674
681
n .ipTracker .ManuallyTrack (nodeID )
675
682
676
683
n .peersLock .Lock ()
@@ -688,11 +695,15 @@ func (n *network) ManuallyTrack(nodeID ids.NodeID, ip netip.AddrPort) {
688
695
if ! isTracked {
689
696
tracked := newTrackedIP (ip )
690
697
n .trackedIPs [nodeID ] = tracked
691
- n .dial (nodeID , tracked )
698
+ n .dial (ctx , nodeID , tracked )
692
699
}
693
700
}
694
701
695
- func (n * network ) track (ip * ips.ClaimedIPPort , trackAllSubnets bool ) error {
702
+ func (n * network ) track (
703
+ ctx context.Context ,
704
+ ip * ips.ClaimedIPPort ,
705
+ trackAllSubnets bool ,
706
+ ) error {
696
707
// To avoid signature verification when the IP isn't needed, we
697
708
// optimistically filter out IPs. This can result in us not tracking an IP
698
709
// that we otherwise would have. This case can only happen if the node
@@ -741,7 +752,7 @@ func (n *network) track(ip *ips.ClaimedIPPort, trackAllSubnets bool) error {
741
752
tracked = newTrackedIP (ip .AddrPort )
742
753
}
743
754
n .trackedIPs [ip .NodeID ] = tracked
744
- n .dial (ip .NodeID , tracked )
755
+ n .dial (ctx , ip .NodeID , tracked )
745
756
return nil
746
757
}
747
758
@@ -834,7 +845,10 @@ func (n *network) samplePeers(
834
845
)
835
846
}
836
847
837
- func (n * network ) disconnectedFromConnecting (nodeID ids.NodeID ) {
848
+ func (n * network ) disconnectedFromConnecting (
849
+ ctx context.Context ,
850
+ nodeID ids.NodeID ,
851
+ ) {
838
852
n .peersLock .Lock ()
839
853
defer n .peersLock .Unlock ()
840
854
@@ -846,7 +860,7 @@ func (n *network) disconnectedFromConnecting(nodeID ids.NodeID) {
846
860
if n .ipTracker .WantsConnection (nodeID ) {
847
861
tracked := tracked .trackNewIP (tracked .ip )
848
862
n .trackedIPs [nodeID ] = tracked
849
- n .dial (nodeID , tracked )
863
+ n .dial (ctx , nodeID , tracked )
850
864
} else {
851
865
tracked .stopTracking ()
852
866
delete (n .trackedIPs , nodeID )
@@ -856,7 +870,11 @@ func (n *network) disconnectedFromConnecting(nodeID ids.NodeID) {
856
870
n .metrics .disconnected .Inc ()
857
871
}
858
872
859
- func (n * network ) disconnectedFromConnected (peer peer.Peer , nodeID ids.NodeID ) {
873
+ func (n * network ) disconnectedFromConnected (
874
+ ctx context.Context ,
875
+ peer peer.Peer ,
876
+ nodeID ids.NodeID ,
877
+ ) {
860
878
n .ipTracker .Disconnected (nodeID )
861
879
n .router .Disconnected (nodeID )
862
880
@@ -869,7 +887,7 @@ func (n *network) disconnectedFromConnected(peer peer.Peer, nodeID ids.NodeID) {
869
887
if ip , wantsConnection := n .ipTracker .GetIP (nodeID ); wantsConnection {
870
888
tracked := newTrackedIP (ip .AddrPort )
871
889
n .trackedIPs [nodeID ] = tracked
872
- n .dial (nodeID , tracked )
890
+ n .dial (ctx , nodeID , tracked )
873
891
}
874
892
875
893
n .metrics .markDisconnected (peer )
@@ -894,7 +912,7 @@ func (n *network) disconnectedFromConnected(peer peer.Peer, nodeID ids.NodeID) {
894
912
// If initiating a connection to [ip] fails, then dial will reattempt. However,
895
913
// there is a randomized exponential backoff to avoid spamming connection
896
914
// attempts.
897
- func (n * network ) dial (nodeID ids.NodeID , ip * trackedIP ) {
915
+ func (n * network ) dial (ctx context. Context , nodeID ids.NodeID , ip * trackedIP ) {
898
916
n .peerConfig .Log .Verbo ("attempting to dial node" ,
899
917
zap .Stringer ("nodeID" , nodeID ),
900
918
zap .Stringer ("ip" , ip .ip ),
@@ -994,7 +1012,7 @@ func (n *network) dial(nodeID ids.NodeID, ip *trackedIP) {
994
1012
zap .Stringer ("peerIP" , ip .ip ),
995
1013
)
996
1014
997
- err = n .upgrade (conn , n .clientUpgrader , false )
1015
+ err = n .upgrade (ctx , conn , n .clientUpgrader , false )
998
1016
if err != nil {
999
1017
n .peerConfig .Log .Verbo (
1000
1018
"failed to upgrade, attempting again" ,
@@ -1017,7 +1035,12 @@ func (n *network) dial(nodeID ids.NodeID, ip *trackedIP) {
1017
1035
// If the connection is desired by the node, then the resulting upgraded
1018
1036
// connection will be used to create a new peer. Otherwise the connection will
1019
1037
// be immediately closed.
1020
- func (n * network ) upgrade (conn net.Conn , upgrader peer.Upgrader , isIngress bool ) error {
1038
+ func (n * network ) upgrade (
1039
+ ctx context.Context ,
1040
+ conn net.Conn ,
1041
+ upgrader peer.Upgrader ,
1042
+ isIngress bool ,
1043
+ ) error {
1021
1044
upgradeTimeout := n .peerConfig .Clock .Time ().Add (n .config .ReadHandshakeTimeout )
1022
1045
if err := conn .SetReadDeadline (upgradeTimeout ); err != nil {
1023
1046
_ = conn .Close ()
@@ -1027,7 +1050,7 @@ func (n *network) upgrade(conn net.Conn, upgrader peer.Upgrader, isIngress bool)
1027
1050
return err
1028
1051
}
1029
1052
1030
- nodeID , tlsConn , cert , err := upgrader .Upgrade (conn )
1053
+ nodeID , tlsConn , cert , err := upgrader .Upgrade (ctx , conn )
1031
1054
if err != nil {
1032
1055
_ = conn .Close ()
1033
1056
n .peerConfig .Log .Verbo ("failed to upgrade connection" ,
@@ -1107,6 +1130,7 @@ func (n *network) upgrade(conn net.Conn, upgrader peer.Upgrader, isIngress bool)
1107
1130
// same [peerConfig.InboundMsgThrottler]. This is guaranteed by the above
1108
1131
// de-duplications for [connectingPeers] and [connectedPeers].
1109
1132
peer := peer .Start (
1133
+ ctx ,
1110
1134
n .peerConfig ,
1111
1135
tlsConn ,
1112
1136
cert ,
0 commit comments