Skip to content

Commit 63dc59a

Browse files
committed
Fix healthcheck interceptors
Signed-off-by: alanprot <[email protected]>
1 parent d08f93b commit 63dc59a

File tree

3 files changed

+42
-23
lines changed

3 files changed

+42
-23
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
* [ENHANCEMENT] Distributor: Add new `cortex_reduced_resolution_histogram_samples_total` metric to track the number of histogram samples which resolution was reduced. #6182
1919
* [ENHANCEMENT] StoreGateway: Implement metadata API limit in queryable. #6195
2020
* [ENHANCEMENT] Ingester: Add matchers to ingester LabelNames() and LabelNamesStream() RPC. #6209
21-
* [ENHANCEMENT] Ingester/Store Gateway Clients: Introduce an experimental HealthCheck handler to quickly fail requests directed to unhealthy targets. #6225
21+
* [ENHANCEMENT] Ingester/Store Gateway Clients: Introduce an experimental HealthCheck handler to quickly fail requests directed to unhealthy targets. #6225 #6257
2222
* [BUGFIX] Runtime-config: Handle absolute file paths when working directory is not / #6224
2323

2424
## 1.18.0 2024-09-03

pkg/util/grpcclient/health_check.go

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"errors"
66
"flag"
77
"fmt"
8+
"io"
89
"sync"
910
"time"
1011

@@ -55,15 +56,17 @@ type HealthCheckInterceptors struct {
5556
activeInstances map[string]*healthCheckEntry
5657

5758
instanceGcTimeout time.Duration
58-
healthClientFactory func(cc grpc.ClientConnInterface) grpc_health_v1.HealthClient
59+
healthClientFactory func(cc *grpc.ClientConn) (grpc_health_v1.HealthClient, io.Closer)
5960
}
6061

6162
func NewHealthCheckInterceptors(logger log.Logger) *HealthCheckInterceptors {
6263
h := &HealthCheckInterceptors{
63-
logger: logger,
64-
instanceGcTimeout: 2 * time.Minute,
65-
healthClientFactory: grpc_health_v1.NewHealthClient,
66-
activeInstances: make(map[string]*healthCheckEntry),
64+
logger: logger,
65+
instanceGcTimeout: 2 * time.Minute,
66+
healthClientFactory: func(cc *grpc.ClientConn) (grpc_health_v1.HealthClient, io.Closer) {
67+
return grpc_health_v1.NewHealthClient(cc), cc
68+
},
69+
activeInstances: make(map[string]*healthCheckEntry),
6770
}
6871

6972
h.Service = services.
@@ -107,16 +110,6 @@ func (h *HealthCheckInterceptors) registeredInstances() []*healthCheckEntry {
107110
func (h *HealthCheckInterceptors) iteration(ctx context.Context) error {
108111
level.Debug(h.logger).Log("msg", "Performing health check", "registeredInstances", len(h.registeredInstances()))
109112
for _, instance := range h.registeredInstances() {
110-
dialOpts, err := instance.clientConfig.Config.DialOption(nil, nil)
111-
if err != nil {
112-
return err
113-
}
114-
conn, err := grpc.NewClient(instance.address, dialOpts...)
115-
c := h.healthClientFactory(conn)
116-
if err != nil {
117-
return err
118-
}
119-
120113
if time.Since(instance.lastTickTime.Load()) >= h.instanceGcTimeout {
121114
h.Lock()
122115
delete(h.activeInstances, instance.address)
@@ -131,10 +124,23 @@ func (h *HealthCheckInterceptors) iteration(ctx context.Context) error {
131124
instance.lastCheckTime.Store(time.Now())
132125

133126
go func(i *healthCheckEntry) {
134-
if err := i.recordHealth(healthCheck(c, i.clientConfig.HealthCheckConfig.Timeout)); !i.isHealthy() {
127+
dialOpts, err := i.clientConfig.Config.DialOption(nil, nil)
128+
if err != nil {
129+
level.Error(h.logger).Log("msg", "error creating dialOpts to perform healthcheck", "address", i.address, "err", err)
130+
return
131+
}
132+
conn, err := grpc.NewClient(i.address, dialOpts...)
133+
if err != nil {
134+
level.Error(h.logger).Log("msg", "error creating client to perform healthcheck", "address", i.address, "err", err)
135+
return
136+
}
137+
138+
client, closer := h.healthClientFactory(conn)
139+
140+
if err := i.recordHealth(healthCheck(client, i.clientConfig.HealthCheckConfig.Timeout)); !i.isHealthy() {
135141
level.Warn(h.logger).Log("msg", "instance marked as unhealthy", "address", i.address, "err", err)
136142
}
137-
if err := conn.Close(); err != nil {
143+
if err := closer.Close(); err != nil {
138144
level.Warn(h.logger).Log("msg", "error closing connection", "address", i.address, "err", err)
139145
}
140146
}(instance)

pkg/util/grpcclient/health_check_test.go

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"io"
78
"testing"
89
"time"
910

@@ -20,7 +21,13 @@ import (
2021

2122
type healthClientMock struct {
2223
grpc_health_v1.HealthClient
23-
err atomic.Error
24+
err atomic.Error
25+
open atomic.Bool
26+
}
27+
28+
func (h *healthClientMock) Close() error {
29+
h.open.Store(false)
30+
return nil
2431
}
2532

2633
func (h *healthClientMock) Check(ctx context.Context, in *grpc_health_v1.HealthCheckRequest, opts ...grpc.CallOption) (*grpc_health_v1.HealthCheckResponse, error) {
@@ -36,8 +43,9 @@ func TestNewHealthCheckService(t *testing.T) {
3643
i.instanceGcTimeout = time.Second * 5
3744

3845
hMock := &healthClientMock{}
39-
i.healthClientFactory = func(cc grpc.ClientConnInterface) grpc_health_v1.HealthClient {
40-
return hMock
46+
i.healthClientFactory = func(cc *grpc.ClientConn) (grpc_health_v1.HealthClient, io.Closer) {
47+
hMock.open.Store(true)
48+
return hMock, hMock
4149
}
4250

4351
require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
@@ -79,6 +87,8 @@ func TestNewHealthCheckService(t *testing.T) {
7987
cortex_testutil.Poll(t, i.instanceGcTimeout*2, 0, func() interface{} {
8088
return len(i.registeredInstances())
8189
})
90+
91+
require.False(t, hMock.open.Load())
8292
}
8393

8494
func TestNewHealthCheckInterceptors(t *testing.T) {
@@ -92,8 +102,9 @@ func TestNewHealthCheckInterceptors(t *testing.T) {
92102
Timeout: time.Second,
93103
},
94104
}
95-
i.healthClientFactory = func(cc grpc.ClientConnInterface) grpc_health_v1.HealthClient {
96-
return hMock
105+
i.healthClientFactory = func(cc *grpc.ClientConn) (grpc_health_v1.HealthClient, io.Closer) {
106+
hMock.open.Store(true)
107+
return hMock, hMock
97108
}
98109

99110
ui := i.UnaryHealthCheckInterceptor(&cfg)
@@ -113,6 +124,7 @@ func TestNewHealthCheckInterceptors(t *testing.T) {
113124

114125
// first health check
115126
require.NoError(t, i.iteration(context.Background()))
127+
require.False(t, hMock.open.Load())
116128

117129
//Should second call even with error
118130
require.NoError(t, ui(context.Background(), "", struct{}{}, struct{}{}, ccUnhealthy, invoker))
@@ -121,6 +133,7 @@ func TestNewHealthCheckInterceptors(t *testing.T) {
121133

122134
// Second Healthcheck -> should mark as unhealthy
123135
require.NoError(t, i.iteration(context.Background()))
136+
require.False(t, hMock.open.Load())
124137

125138
cortex_testutil.Poll(t, time.Second, true, func() interface{} {
126139
return errors.Is(ui(context.Background(), "", struct{}{}, struct{}{}, ccUnhealthy, invoker), unhealthyErr)

0 commit comments

Comments
 (0)