Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ itoa = "0.4.1"
tracing = { version = "0.1", default-features = false, features = ["log", "std"] }
pin-project = "1.0"
tower-service = "0.3"
tokio = { version = "0.3", features = ["sync", "stream"] }
tokio = { version = "0.3.4", features = ["sync", "stream"] }
want = "0.3"

# Optional
Expand Down
56 changes: 29 additions & 27 deletions src/client/connect/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -569,29 +569,30 @@ fn connect(
connect_timeout: Option<Duration>,
) -> Result<impl Future<Output = Result<TcpStream, ConnectError>>, ConnectError> {
// TODO(eliza): if Tokio's `TcpSocket` gains support for setting the
// keepalive timeout and send/recv buffer size, it would be nice to use that
// instead of socket2, and avoid the unsafe `into_raw_fd`/`from_raw_fd`
// dance...
// keepalive timeout, it would be nice to use that instead of socket2,
// and avoid the unsafe `into_raw_fd`/`from_raw_fd` dance...
use socket2::{Domain, Protocol, Socket, Type};
use std::convert::TryInto;

let domain = match *addr {
SocketAddr::V4(_) => Domain::ipv4(),
SocketAddr::V6(_) => Domain::ipv6(),
};
let socket = Socket::new(domain, Type::stream(), Some(Protocol::tcp()))
.map_err(ConnectError::m("tcp open error"))?;

if config.reuse_address {
socket
.set_reuse_address(true)
.map_err(ConnectError::m("tcp set_reuse_address error"))?;
}

// When constructing a Tokio `TcpSocket` from a raw fd/socket, the user is
// responsible for ensuring O_NONBLOCK is set.
socket
.set_nonblocking(true)
.map_err(ConnectError::m("tcp set_nonblocking error"))?;

if let Some(dur) = config.keep_alive_timeout {
socket
.set_keepalive(Some(dur))
.map_err(ConnectError::m("tcp set_keepalive error"))?;
}

Comment on lines +590 to +595
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@seanmonstar should I just remove keepalive from the hyper API instead? That way, we could just drop the socket2 dep entirely.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Humm.... maybe. It is a confusing property, but people do enable it to notice when a connection has died... 🤷

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah...it's looking like Tokio will probably not add support for more sophisticated configuration (e.g. keepalive retries/probe interval/etc). But it would be nice if it at least had an API for enabling/disabling keepalive and maybe setting the initial timeout before keepalive probes are sent, which should be more portable.

bind_local_address(
&socket,
addr,
Expand All @@ -600,24 +601,6 @@ fn connect(
)
.map_err(ConnectError::m("tcp bind local error"))?;

if let Some(dur) = config.keep_alive_timeout {
socket
.set_keepalive(Some(dur))
.map_err(ConnectError::m("tcp set_keepalive error"))?;
}

if let Some(size) = config.send_buffer_size {
socket
.set_send_buffer_size(size)
.map_err(ConnectError::m("tcp set_send_buffer_size error"))?;
}

if let Some(size) = config.recv_buffer_size {
socket
.set_recv_buffer_size(size)
.map_err(ConnectError::m("tcp set_recv_buffer_size error"))?;
}

#[cfg(unix)]
let socket = unsafe {
// Safety: `from_raw_fd` is only safe to call if ownership of the raw
Expand All @@ -636,6 +619,25 @@ fn connect(
use std::os::windows::io::{FromRawSocket, IntoRawSocket};
TcpSocket::from_raw_socket(socket.into_raw_socket())
};

if config.reuse_address {
socket
.set_reuseaddr(true)
.map_err(ConnectError::m("tcp set_reuse_address error"))?;
}

if let Some(size) = config.send_buffer_size {
socket
.set_send_buffer_size(size.try_into().unwrap_or(std::u32::MAX))
.map_err(ConnectError::m("tcp set_send_buffer_size error"))?;
}

if let Some(size) = config.recv_buffer_size {
socket
.set_recv_buffer_size(size.try_into().unwrap_or(std::u32::MAX))
.map_err(ConnectError::m("tcp set_recv_buffer_size error"))?;
}

let connect = socket.connect(*addr);
Ok(async move {
match connect_timeout {
Expand Down