Commit 83eb9f6

Add TCP probe to target port if no readiness probe is found in the API spec (#2379)
1 parent 2203a92 commit 83eb9f6

3 files changed: +39 -15 lines changed

cmd/proxy/main.go (+18 -12)

@@ -19,6 +19,7 @@ package main
 import (
     "context"
     "flag"
+    "fmt"
     "net"
     "net/http"
     "os"
@@ -48,6 +49,7 @@ func main() {
         userContainerPort int
         maxConcurrency    int
         maxQueueLength    int
+        hasTCPProbe       bool
         clusterConfigPath string
     )

@@ -56,6 +58,7 @@ func main() {
     flag.IntVar(&userContainerPort, "user-port", 8080, "port where the proxy will redirect to the traffic to")
     flag.IntVar(&maxConcurrency, "max-concurrency", 0, "max concurrency allowed for user container")
     flag.IntVar(&maxQueueLength, "max-queue-length", 0, "max request queue length for user container")
+    flag.BoolVar(&hasTCPProbe, "has-tcp-probe", false, "tcp probe to the user-provided container port")
     flag.StringVar(&clusterConfigPath, "cluster-config", "", "cluster config path")
     flag.Parse()

@@ -142,7 +145,7 @@ func main() {

     adminHandler := http.NewServeMux()
     adminHandler.Handle("/metrics", promStats)
-    adminHandler.Handle("/healthz", readinessTCPHandler(userContainerPort, log))
+    adminHandler.Handle("/healthz", readinessTCPHandler(userContainerPort, hasTCPProbe, log))

     servers := map[string]*http.Server{
         "proxy": {
@@ -201,19 +204,22 @@ func exit(log *zap.SugaredLogger, err error, wrapStrs ...string) {
     os.Exit(1)
 }

-func readinessTCPHandler(port int, logger *zap.SugaredLogger) http.HandlerFunc {
+func readinessTCPHandler(port int, enableTCPProbe bool, logger *zap.SugaredLogger) http.HandlerFunc {
     return func(w http.ResponseWriter, r *http.Request) {
-        timeout := time.Duration(1) * time.Second
-        address := net.JoinHostPort("localhost", strconv.FormatInt(int64(port), 10))
-
-        conn, err := net.DialTimeout("tcp", address, timeout)
-        if err != nil {
-            logger.Warn(errors.Wrap(err, "TCP probe to user-provided container port failed"))
-            w.WriteHeader(http.StatusInternalServerError)
-            _, _ = w.Write([]byte("unhealthy"))
-            return
+        if enableTCPProbe {
+            ctx := r.Context()
+            address := net.JoinHostPort("localhost", fmt.Sprintf("%d", port))
+
+            var d net.Dialer
+            conn, err := d.DialContext(ctx, "tcp", address)
+            if err != nil {
+                logger.Warn(errors.Wrap(err, "TCP probe to user-provided container port failed"))
+                w.WriteHeader(http.StatusInternalServerError)
+                _, _ = w.Write([]byte("unhealthy"))
+                return
+            }
+            _ = conn.Close()
         }
-        _ = conn.Close()

         w.WriteHeader(http.StatusOK)
         _, _ = w.Write([]byte("healthy"))
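
The behavior of the reworked handler can be checked in isolation with net/http/httptest. The sketch below is not part of the commit; it assumes it lives in the same package as cmd/proxy/main.go and uses a no-op zap logger:

// proxy_readiness_sketch_test.go (hypothetical file, same package as cmd/proxy/main.go)
package main

import (
    "net"
    "net/http"
    "net/http/httptest"
    "testing"

    "go.uber.org/zap"
)

func TestReadinessTCPHandler(t *testing.T) {
    // Throwaway TCP listener standing in for the user-provided container port.
    ln, err := net.Listen("tcp", "localhost:0")
    if err != nil {
        t.Fatal(err)
    }
    defer ln.Close()
    port := ln.Addr().(*net.TCPAddr).Port

    logger := zap.NewNop().Sugar()

    // Probe enabled and the port is listening: /healthz returns 200.
    handler := readinessTCPHandler(port, true, logger)
    rec := httptest.NewRecorder()
    handler(rec, httptest.NewRequest(http.MethodGet, "/healthz", nil))
    if rec.Code != http.StatusOK {
        t.Fatalf("expected 200 with TCP probe enabled, got %d", rec.Code)
    }

    // Probe disabled: /healthz returns 200 without dialing the user container,
    // because a container-level readiness probe already covers the target port.
    handler = readinessTCPHandler(port, false, logger)
    rec = httptest.NewRecorder()
    handler(rec, httptest.NewRequest(http.MethodGet, "/healthz", nil))
    if rec.Code != http.StatusOK {
        t.Fatalf("expected 200 with TCP probe disabled, got %d", rec.Code)
    }
}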

pkg/workloads/helpers.go (+15 -1)

@@ -83,7 +83,6 @@ func GetReadinessProbesFromContainers(containers []*userconfig.Container) map[st
         if container == nil {
             continue
         }
-
         if container.ReadinessProbe != nil {
             probes[container.Name] = *GetProbeSpec(container.ReadinessProbe)
         }
@@ -92,6 +91,21 @@ func GetReadinessProbesFromContainers(containers []*userconfig.Container) map[st
     return probes
 }

+func HasReadinessProbesTargetingPort(containers []*userconfig.Container, targetPort int32) bool {
+    for _, container := range containers {
+        if container == nil || container.ReadinessProbe == nil {
+            continue
+        }
+
+        probe := container.ReadinessProbe
+        if (probe.TCPSocket != nil && probe.TCPSocket.Port == targetPort) ||
+            probe.HTTPGet != nil && probe.HTTPGet.Port == targetPort {
+            return true
+        }
+    }
+    return false
+}
+
 func BaseClusterEnvVars() []kcore.EnvFromSource {
     envVars := []kcore.EnvFromSource{
         {
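
The matching rule is: a container counts as covering the target port if its readiness probe points either a TCPSocket or an HTTPGet check at that port (Go's && binds tighter than ||, so the two checks are evaluated independently). Below is a self-contained sketch of the same rule using simplified stand-in types; the real userconfig structs have more fields and these type names are illustrative only:

package main

import "fmt"

// Simplified stand-ins for the userconfig probe types (names are illustrative only).
type portCheck struct{ Port int32 }

type probe struct {
    TCPSocket *portCheck
    HTTPGet   *portCheck
}

type container struct {
    Name           string
    ReadinessProbe *probe
}

// Same matching rule as HasReadinessProbesTargetingPort in the commit.
func hasProbeTargetingPort(containers []*container, targetPort int32) bool {
    for _, c := range containers {
        if c == nil || c.ReadinessProbe == nil {
            continue
        }
        p := c.ReadinessProbe
        if (p.TCPSocket != nil && p.TCPSocket.Port == targetPort) ||
            (p.HTTPGet != nil && p.HTTPGet.Port == targetPort) {
            return true
        }
    }
    return false
}

func main() {
    containers := []*container{
        {Name: "api", ReadinessProbe: &probe{HTTPGet: &portCheck{Port: 8080}}},
        {Name: "sidecar"}, // no readiness probe
    }
    fmt.Println(hasProbeTargetingPort(containers, 8080)) // true:  proxy skips its own TCP probe
    fmt.Println(hasProbeTargetingPort(containers, 9090)) // false: proxy falls back to a TCP probe
}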

pkg/workloads/k8s.go (+6 -2)

@@ -172,6 +172,8 @@ func batchDequeuerProxyContainer(api spec.API, jobID, queueURL string) (kcore.Co
 }

 func realtimeProxyContainer(api spec.API) (kcore.Container, kcore.Volume) {
+    proxyHasTCPProbe := !HasReadinessProbesTargetingPort(api.Pod.Containers, *api.Pod.Port)
+
     return kcore.Container{
         Name:  ProxyContainerName,
         Image: config.ClusterConfig.ImageProxy,
@@ -189,6 +191,8 @@ func realtimeProxyContainer(api spec.API) (kcore.Container, kcore.Volume) {
             s.Int32(int32(api.Pod.MaxConcurrency)),
             "--max-queue-length",
             s.Int32(int32(api.Pod.MaxQueueLength)),
+            "--has-tcp-probe",
+            s.Bool(proxyHasTCPProbe),
         },
         Ports: []kcore.ContainerPort{
             {Name: consts.AdminPortName, ContainerPort: consts.AdminPortInt32},
@@ -213,10 +217,10 @@ func realtimeProxyContainer(api spec.API) (kcore.Container, kcore.Volume) {
             },
         },
         InitialDelaySeconds: 1,
-        TimeoutSeconds:      1,
+        TimeoutSeconds:      3,
         PeriodSeconds:       10,
         SuccessThreshold:    1,
-        FailureThreshold:    1,
+        FailureThreshold:    3,
     },
 }, ClusterConfigVolume()
 }
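
For context, the proxy container's readiness probe after this commit ends up roughly like the sketch below. Only the timing fields are taken directly from the diff; the handler (an HTTP GET to /healthz on the admin port) is an assumption consistent with the main.go change, not visible in this hunk, and the snippet assumes a k8s.io/api version (v0.23+) where Probe embeds ProbeHandler:

package main

import (
    "fmt"

    kcore "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/util/intstr"
)

func main() {
    // Approximate shape of the proxy readiness probe after this commit:
    // timeout relaxed from 1s to 3s and failure threshold from 1 to 3.
    readiness := &kcore.Probe{
        ProbeHandler: kcore.ProbeHandler{
            HTTPGet: &kcore.HTTPGetAction{
                Path: "/healthz",                 // served by the proxy's admin mux
                Port: intstr.FromString("admin"), // assumed admin port name
            },
        },
        InitialDelaySeconds: 1,
        TimeoutSeconds:      3,
        PeriodSeconds:       10,
        SuccessThreshold:    1,
        FailureThreshold:    3,
    }
    fmt.Printf("%+v\n", readiness)
}

With PeriodSeconds 10 and FailureThreshold 3, the proxy is marked unready only after three consecutive failed checks rather than a single failed 1-second check.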
