Skip to content

Commit c3be839

Browse files
committed
add local_addr binding to connector service
1 parent 8d74cf3 commit c3be839

File tree

4 files changed

+99
-13
lines changed

4 files changed

+99
-13
lines changed

actix-rt/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ pub mod net {
7676

7777
use tokio::io::{AsyncRead, AsyncWrite};
7878
pub use tokio::net::UdpSocket;
79-
pub use tokio::net::{TcpListener, TcpStream};
79+
pub use tokio::net::{TcpListener, TcpSocket, TcpStream};
8080

8181
#[cfg(unix)]
8282
pub use tokio::net::{UnixDatagram, UnixListener, UnixStream};

actix-tls/src/connect/connect.rs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::{
33
fmt,
44
iter::{self, FromIterator as _},
55
mem,
6-
net::SocketAddr,
6+
net::{IpAddr, SocketAddr},
77
};
88

99
/// Parse a host into parts (hostname and port).
@@ -67,6 +67,7 @@ pub struct Connect<T> {
6767
pub(crate) req: T,
6868
pub(crate) port: u16,
6969
pub(crate) addr: ConnectAddrs,
70+
pub(crate) local_addr: Option<IpAddr>,
7071
}
7172

7273
impl<T: Address> Connect<T> {
@@ -78,6 +79,7 @@ impl<T: Address> Connect<T> {
7879
req,
7980
port: port.unwrap_or(0),
8081
addr: ConnectAddrs::None,
82+
local_addr: None,
8183
}
8284
}
8385

@@ -88,6 +90,7 @@ impl<T: Address> Connect<T> {
8890
req,
8991
port: 0,
9092
addr: ConnectAddrs::One(addr),
93+
local_addr: None,
9194
}
9295
}
9396

@@ -119,6 +122,12 @@ impl<T: Address> Connect<T> {
119122
self
120123
}
121124

125+
/// Set local_addr of connect.
126+
pub fn set_local_addr(mut self, addr: impl Into<IpAddr>) -> Self {
127+
self.local_addr = Some(addr.into());
128+
self
129+
}
130+
122131
/// Get hostname.
123132
pub fn hostname(&self) -> &str {
124133
self.req.hostname()
@@ -285,7 +294,7 @@ fn parse_host(host: &str) -> (&str, Option<u16>) {
285294

286295
#[cfg(test)]
287296
mod tests {
288-
use std::net::{IpAddr, Ipv4Addr};
297+
use std::net::Ipv4Addr;
289298

290299
use super::*;
291300

@@ -329,4 +338,13 @@ mod tests {
329338
let mut iter = ConnectAddrsIter::None;
330339
assert_eq!(iter.next(), None);
331340
}
341+
342+
#[test]
343+
fn test_local_addr() {
344+
let conn = Connect::new("hello").set_local_addr([127, 0, 0, 1]);
345+
assert_eq!(
346+
conn.local_addr.unwrap(),
347+
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))
348+
)
349+
}
332350
}

actix-tls/src/connect/connector.rs

Lines changed: 52 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,12 @@ use std::{
22
collections::VecDeque,
33
future::Future,
44
io,
5-
net::SocketAddr,
5+
net::{IpAddr, SocketAddr, SocketAddrV4, SocketAddrV6},
66
pin::Pin,
77
task::{Context, Poll},
88
};
99

10-
use actix_rt::net::TcpStream;
10+
use actix_rt::net::{TcpSocket, TcpStream};
1111
use actix_service::{Service, ServiceFactory};
1212
use futures_core::{future::LocalBoxFuture, ready};
1313
use log::{error, trace};
@@ -54,9 +54,14 @@ impl<T: Address> Service<Connect<T>> for TcpConnector {
5454

5555
fn call(&self, req: Connect<T>) -> Self::Future {
5656
let port = req.port();
57-
let Connect { req, addr, .. } = req;
58-
59-
TcpConnectorResponse::new(req, port, addr)
57+
let Connect {
58+
req,
59+
addr,
60+
local_addr,
61+
..
62+
} = req;
63+
64+
TcpConnectorResponse::new(req, port, local_addr, addr)
6065
}
6166
}
6267

@@ -65,14 +70,20 @@ pub enum TcpConnectorResponse<T> {
6570
Response {
6671
req: Option<T>,
6772
port: u16,
73+
local_addr: Option<IpAddr>,
6874
addrs: Option<VecDeque<SocketAddr>>,
6975
stream: Option<ReusableBoxFuture<Result<TcpStream, io::Error>>>,
7076
},
7177
Error(Option<ConnectError>),
7278
}
7379

7480
impl<T: Address> TcpConnectorResponse<T> {
75-
pub(crate) fn new(req: T, port: u16, addr: ConnectAddrs) -> TcpConnectorResponse<T> {
81+
pub(crate) fn new(
82+
req: T,
83+
port: u16,
84+
local_addr: Option<IpAddr>,
85+
addr: ConnectAddrs,
86+
) -> TcpConnectorResponse<T> {
7687
if addr.is_none() {
7788
error!("TCP connector: unresolved connection address");
7889
return TcpConnectorResponse::Error(Some(ConnectError::Unresolved));
@@ -90,15 +101,17 @@ impl<T: Address> TcpConnectorResponse<T> {
90101
ConnectAddrs::One(addr) => TcpConnectorResponse::Response {
91102
req: Some(req),
92103
port,
104+
local_addr,
93105
addrs: None,
94-
stream: Some(ReusableBoxFuture::new(TcpStream::connect(addr))),
106+
stream: Some(ReusableBoxFuture::new(connect(addr, local_addr))),
95107
},
96108

97109
// when resolver returns multiple socket addr for request they would be popped from
98110
// front end of queue and returns with the first successful tcp connection.
99111
ConnectAddrs::Multi(addrs) => TcpConnectorResponse::Response {
100112
req: Some(req),
101113
port,
114+
local_addr,
102115
addrs: Some(addrs),
103116
stream: None,
104117
},
@@ -116,6 +129,7 @@ impl<T: Address> Future for TcpConnectorResponse<T> {
116129
TcpConnectorResponse::Response {
117130
req,
118131
port,
132+
local_addr,
119133
addrs,
120134
stream,
121135
} => loop {
@@ -148,11 +162,40 @@ impl<T: Address> Future for TcpConnectorResponse<T> {
148162
// try to connect
149163
let addr = addrs.as_mut().unwrap().pop_front().unwrap();
150164

165+
let fut = connect(addr, *local_addr);
151166
match stream {
152-
Some(rbf) => rbf.set(TcpStream::connect(addr)),
153-
None => *stream = Some(ReusableBoxFuture::new(TcpStream::connect(addr))),
167+
Some(rbf) => rbf.set(fut),
168+
None => *stream = Some(ReusableBoxFuture::new(fut)),
154169
}
155170
},
156171
}
157172
}
158173
}
174+
175+
async fn connect(addr: SocketAddr, local_addr: Option<IpAddr>) -> io::Result<TcpStream> {
176+
// use local addr if connect asks for it.
177+
match local_addr {
178+
Some(ip_addr) => {
179+
let socket = match ip_addr {
180+
IpAddr::V4(ip_addr) => {
181+
let socket = TcpSocket::new_v4()?;
182+
let addr = SocketAddr::V4(SocketAddrV4::new(ip_addr, 0));
183+
socket.bind(addr)?;
184+
socket
185+
}
186+
IpAddr::V6(ip_addr) => {
187+
let socket = TcpSocket::new_v6()?;
188+
let addr = SocketAddr::V6(SocketAddrV6::new(ip_addr, 0, 0, 0));
189+
socket.bind(addr)?;
190+
socket
191+
}
192+
};
193+
194+
socket.set_reuseaddr(true)?;
195+
196+
socket.connect(addr).await
197+
}
198+
199+
None => TcpStream::connect(addr).await,
200+
}
201+
}

actix-tls/tests/test_connect.rs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
use std::io;
1+
use std::{
2+
io,
3+
net::{IpAddr, Ipv4Addr},
4+
};
25

36
use actix_codec::{BytesCodec, Framed};
47
use actix_rt::net::TcpStream;
@@ -125,3 +128,25 @@ async fn test_rustls_uri() {
125128
let con = conn.call(addr.into()).await.unwrap();
126129
assert_eq!(con.peer_addr().unwrap(), srv.addr());
127130
}
131+
132+
#[actix_rt::test]
133+
async fn test_local_addr() {
134+
let srv = TestServer::with(|| {
135+
fn_service(|io: TcpStream| async {
136+
let mut framed = Framed::new(io, BytesCodec);
137+
framed.send(Bytes::from_static(b"test")).await?;
138+
Ok::<_, io::Error>(())
139+
})
140+
});
141+
142+
let conn = actix_connect::default_connector();
143+
let local = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 3));
144+
145+
let (con, _) = conn
146+
.call(Connect::with_addr("10", srv.addr()).set_local_addr(local))
147+
.await
148+
.unwrap()
149+
.into_parts();
150+
151+
assert_eq!(con.local_addr().unwrap().ip(), local)
152+
}

0 commit comments

Comments
 (0)