Skip to content

Commit 09776f0

Browse files
committed
feat(client): HttpConnector::tcp_keepalive_interval and HttpConnector::tcp_keepalive_retries (#2991)
If the platform supports setting the options, otherwise it's a noop. Port from hyperium/hyper#2991 #9
1 parent 85aade4 commit 09776f0

File tree

1 file changed

+136
-11
lines changed

1 file changed

+136
-11
lines changed

src/client/connect/http.rs

Lines changed: 136 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use std::time::Duration;
1212
use futures_util::future::Either;
1313
use http::uri::{Scheme, Uri};
1414
use pin_project_lite::pin_project;
15+
use socket2::TcpKeepalive;
1516
use tokio::net::{TcpSocket, TcpStream};
1617
use tokio::time::Sleep;
1718
use tracing::{debug, trace, warn};
@@ -67,7 +68,7 @@ struct Config {
6768
connect_timeout: Option<Duration>,
6869
enforce_http: bool,
6970
happy_eyeballs_timeout: Option<Duration>,
70-
keep_alive_timeout: Option<Duration>,
71+
tcp_keepalive_config: TcpKeepaliveConfig,
7172
local_address_ipv4: Option<Ipv4Addr>,
7273
local_address_ipv6: Option<Ipv6Addr>,
7374
nodelay: bool,
@@ -76,6 +77,68 @@ struct Config {
7677
recv_buffer_size: Option<usize>,
7778
}
7879

80+
#[derive(Default, Debug, Clone, Copy)]
81+
struct TcpKeepaliveConfig {
82+
time: Option<Duration>,
83+
interval: Option<Duration>,
84+
retries: Option<u32>,
85+
}
86+
87+
impl TcpKeepaliveConfig {
88+
/// Converts into a `socket2::TcpKeealive` if there is any keep alive configuration.
89+
fn into_tcpkeepalive(self) -> Option<TcpKeepalive> {
90+
let mut dirty = false;
91+
let mut ka = TcpKeepalive::new();
92+
if let Some(time) = self.time {
93+
ka = ka.with_time(time);
94+
dirty = true
95+
}
96+
if let Some(interval) = self.interval {
97+
ka = Self::ka_with_interval(ka, interval, &mut dirty)
98+
};
99+
if let Some(retries) = self.retries {
100+
ka = Self::ka_with_retries(ka, retries, &mut dirty)
101+
};
102+
if dirty {
103+
Some(ka)
104+
} else {
105+
None
106+
}
107+
}
108+
109+
#[cfg(not(any(target_os = "openbsd", target_os = "redox", target_os = "solaris")))]
110+
fn ka_with_interval(ka: TcpKeepalive, interval: Duration, dirty: &mut bool) -> TcpKeepalive {
111+
*dirty = true;
112+
ka.with_interval(interval)
113+
}
114+
115+
#[cfg(any(target_os = "openbsd", target_os = "redox", target_os = "solaris"))]
116+
fn ka_with_interval(ka: TcpKeepalive, _: Duration, _: &mut bool) -> TcpKeepalive {
117+
ka // no-op as keepalive interval is not supported on this platform
118+
}
119+
120+
#[cfg(not(any(
121+
target_os = "openbsd",
122+
target_os = "redox",
123+
target_os = "solaris",
124+
target_os = "windows"
125+
)))]
126+
fn ka_with_retries(ka: TcpKeepalive, retries: u32, dirty: &mut bool) -> TcpKeepalive {
127+
*dirty = true;
128+
ka.with_retries(retries)
129+
}
130+
131+
#[cfg(any(
132+
target_os = "openbsd",
133+
target_os = "redox",
134+
target_os = "solaris",
135+
target_os = "windows"
136+
))]
137+
fn ka_with_retries(ka: TcpKeepalive, _: u32, _: &mut bool) -> TcpKeepalive {
138+
ka // no-op as keepalive retries is not supported on this platform
139+
}
140+
}
141+
79142
// ===== impl HttpConnector =====
80143

81144
impl HttpConnector {
@@ -95,7 +158,7 @@ impl<R> HttpConnector<R> {
95158
connect_timeout: None,
96159
enforce_http: true,
97160
happy_eyeballs_timeout: Some(Duration::from_millis(300)),
98-
keep_alive_timeout: None,
161+
tcp_keepalive_config: TcpKeepaliveConfig::default(),
99162
local_address_ipv4: None,
100163
local_address_ipv6: None,
101164
nodelay: false,
@@ -115,14 +178,28 @@ impl<R> HttpConnector<R> {
115178
self.config_mut().enforce_http = is_enforced;
116179
}
117180

118-
/// Set that all sockets have `SO_KEEPALIVE` set with the supplied duration.
181+
/// Set that all sockets have `SO_KEEPALIVE` set with the supplied duration
182+
/// to remain idle before sending TCP keepalive probes.
119183
///
120-
/// If `None`, the option will not be set.
184+
/// If `None`, keepalive is disabled.
121185
///
122186
/// Default is `None`.
123187
#[inline]
124-
pub fn set_keepalive(&mut self, dur: Option<Duration>) {
125-
self.config_mut().keep_alive_timeout = dur;
188+
pub fn set_keepalive(&mut self, time: Option<Duration>) {
189+
self.config_mut().tcp_keepalive_config.time = time;
190+
}
191+
192+
/// Set the duration between two successive TCP keepalive retransmissions,
193+
/// if acknowledgement to the previous keepalive transmission is not received.
194+
#[inline]
195+
pub fn set_keepalive_interval(&mut self, interval: Option<Duration>) {
196+
self.config_mut().tcp_keepalive_config.interval = interval;
197+
}
198+
199+
/// Set the number of retransmissions to be carried out before declaring that remote end is not available.
200+
#[inline]
201+
pub fn set_keepalive_retries(&mut self, retries: Option<u32>) {
202+
self.config_mut().tcp_keepalive_config.retries = retries;
126203
}
127204

128205
/// Set that all sockets have `SO_NODELAY` set to the supplied value `nodelay`.
@@ -577,7 +654,7 @@ fn connect(
577654
// TODO(eliza): if Tokio's `TcpSocket` gains support for setting the
578655
// keepalive timeout, it would be nice to use that instead of socket2,
579656
// and avoid the unsafe `into_raw_fd`/`from_raw_fd` dance...
580-
use socket2::{Domain, Protocol, Socket, TcpKeepalive, Type};
657+
use socket2::{Domain, Protocol, Socket, Type};
581658
use std::convert::TryInto;
582659

583660
let domain = Domain::for_address(*addr);
@@ -590,9 +667,8 @@ fn connect(
590667
.set_nonblocking(true)
591668
.map_err(ConnectError::m("tcp set_nonblocking error"))?;
592669

593-
if let Some(dur) = config.keep_alive_timeout {
594-
let conf = TcpKeepalive::new().with_time(dur);
595-
if let Err(e) = socket.set_tcp_keepalive(&conf) {
670+
if let Some(tcp_keepalive) = &config.tcp_keepalive_config.into_tcpkeepalive() {
671+
if let Err(e) = socket.set_tcp_keepalive(tcp_keepalive) {
596672
warn!("tcp set_keepalive error: {}", e);
597673
}
598674
}
@@ -701,6 +777,8 @@ mod tests {
701777

702778
use ::http::Uri;
703779

780+
use crate::client::connect::http::TcpKeepaliveConfig;
781+
704782
use super::super::sealed::{Connect, ConnectSvc};
705783
use super::{Config, ConnectError, HttpConnector};
706784

@@ -920,7 +998,7 @@ mod tests {
920998
local_address_ipv4: None,
921999
local_address_ipv6: None,
9221000
connect_timeout: None,
923-
keep_alive_timeout: None,
1001+
tcp_keepalive_config: TcpKeepaliveConfig::default(),
9241002
happy_eyeballs_timeout: Some(fallback_timeout),
9251003
nodelay: false,
9261004
reuse_address: false,
@@ -989,4 +1067,51 @@ mod tests {
9891067
(reachable, duration)
9901068
}
9911069
}
1070+
1071+
use std::time::Duration;
1072+
1073+
#[test]
1074+
fn no_tcp_keepalive_config() {
1075+
assert!(TcpKeepaliveConfig::default().into_tcpkeepalive().is_none());
1076+
}
1077+
1078+
#[test]
1079+
fn tcp_keepalive_time_config() {
1080+
let mut kac = TcpKeepaliveConfig::default();
1081+
kac.time = Some(Duration::from_secs(60));
1082+
if let Some(tcp_keepalive) = kac.into_tcpkeepalive() {
1083+
assert!(format!("{tcp_keepalive:?}").contains("time: Some(60s)"));
1084+
} else {
1085+
panic!("test failed");
1086+
}
1087+
}
1088+
1089+
#[cfg(not(any(target_os = "openbsd", target_os = "redox", target_os = "solaris")))]
1090+
#[test]
1091+
fn tcp_keepalive_interval_config() {
1092+
let mut kac = TcpKeepaliveConfig::default();
1093+
kac.interval = Some(Duration::from_secs(1));
1094+
if let Some(tcp_keepalive) = kac.into_tcpkeepalive() {
1095+
assert!(format!("{tcp_keepalive:?}").contains("interval: Some(1s)"));
1096+
} else {
1097+
panic!("test failed");
1098+
}
1099+
}
1100+
1101+
#[cfg(not(any(
1102+
target_os = "openbsd",
1103+
target_os = "redox",
1104+
target_os = "solaris",
1105+
target_os = "windows"
1106+
)))]
1107+
#[test]
1108+
fn tcp_keepalive_retries_config() {
1109+
let mut kac = TcpKeepaliveConfig::default();
1110+
kac.retries = Some(3);
1111+
if let Some(tcp_keepalive) = kac.into_tcpkeepalive() {
1112+
assert!(format!("{tcp_keepalive:?}").contains("retries: Some(3)"));
1113+
} else {
1114+
panic!("test failed");
1115+
}
1116+
}
9921117
}

0 commit comments

Comments
 (0)