Skip to content

Commit 93c84ca

Browse files
committed
lint
Signed-off-by: alanprot <[email protected]>
1 parent 99fa6dc commit 93c84ca

File tree

5 files changed

+47
-40
lines changed

5 files changed

+47
-40
lines changed

pkg/querier/store_gateway_client.go

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import (
1717
"github.com/cortexproject/cortex/pkg/util/tls"
1818
)
1919

20-
func newStoreGatewayClientFactory(clientCfg grpcclient.Config, reg prometheus.Registerer) client.PoolFactory {
20+
func newStoreGatewayClientFactory(clientCfg grpcclient.ConfigWithHealthCheck, reg prometheus.Registerer) client.PoolFactory {
2121
requestDuration := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
2222
Namespace: "cortex",
2323
Name: "storegateway_client_request_duration_seconds",
@@ -31,7 +31,7 @@ func newStoreGatewayClientFactory(clientCfg grpcclient.Config, reg prometheus.Re
3131
}
3232
}
3333

34-
func dialStoreGatewayClient(clientCfg grpcclient.Config, addr string, requestDuration *prometheus.HistogramVec) (*storeGatewayClient, error) {
34+
func dialStoreGatewayClient(clientCfg grpcclient.ConfigWithHealthCheck, addr string, requestDuration *prometheus.HistogramVec) (*storeGatewayClient, error) {
3535
opts, err := clientCfg.DialOption(grpcclient.Instrument(requestDuration))
3636
if err != nil {
3737
return nil, err
@@ -69,16 +69,18 @@ func (c *storeGatewayClient) RemoteAddress() string {
6969

7070
func newStoreGatewayClientPool(discovery client.PoolServiceDiscovery, clientConfig ClientConfig, logger log.Logger, reg prometheus.Registerer) *client.Pool {
7171
// We prefer sane defaults instead of exposing further config options.
72-
clientCfg := grpcclient.Config{
73-
MaxRecvMsgSize: 100 << 20,
74-
MaxSendMsgSize: 16 << 20,
75-
GRPCCompression: clientConfig.GRPCCompression,
76-
HealthCheckConfig: clientConfig.HealthCheckConfig,
77-
RateLimit: 0,
78-
RateLimitBurst: 0,
79-
BackoffOnRatelimits: false,
80-
TLSEnabled: clientConfig.TLSEnabled,
81-
TLS: clientConfig.TLS,
72+
clientCfg := grpcclient.ConfigWithHealthCheck{
73+
Config: grpcclient.Config{
74+
MaxRecvMsgSize: 100 << 20,
75+
MaxSendMsgSize: 16 << 20,
76+
GRPCCompression: clientConfig.GRPCCompression,
77+
RateLimit: 0,
78+
RateLimitBurst: 0,
79+
BackoffOnRatelimits: false,
80+
TLSEnabled: clientConfig.TLSEnabled,
81+
TLS: clientConfig.TLS,
82+
},
83+
HealthCheckConfig: clientConfig.HealthCheckConfig,
8284
}
8385
poolCfg := client.PoolConfig{
8486
CheckInterval: time.Minute,

pkg/querier/store_gateway_client_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ func Test_newStoreGatewayClientFactory(t *testing.T) {
3636

3737
// Create a client factory and query back the mocked service
3838
// with different clients.
39-
cfg := grpcclient.Config{}
39+
cfg := grpcclient.ConfigWithHealthCheck{}
4040
flagext.DefaultValues(&cfg)
4141

4242
reg := prometheus.NewPedanticRegistry()

pkg/util/grpcclient/grpcclient.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,6 @@ type Config struct {
2929
BackoffOnRatelimits bool `yaml:"backoff_on_ratelimits"`
3030
BackoffConfig backoff.Config `yaml:"backoff_config"`
3131

32-
HealthCheckConfig HealthCheckConfig `yaml:"-"`
33-
3432
TLSEnabled bool `yaml:"tls_enabled"`
3533
TLS tls.ClientConfig `yaml:",inline"`
3634
SignWriteRequestsEnabled bool `yaml:"-"`
@@ -87,6 +85,15 @@ func (cfg *Config) CallOptions() []grpc.CallOption {
8785
return opts
8886
}
8987

88+
func (cfg *ConfigWithHealthCheck) DialOption(unaryClientInterceptors []grpc.UnaryClientInterceptor, streamClientInterceptors []grpc.StreamClientInterceptor) ([]grpc.DialOption, error) {
89+
if cfg.HealthCheckConfig.HealthCheckInterceptors != nil {
90+
unaryClientInterceptors = append(unaryClientInterceptors, cfg.HealthCheckConfig.UnaryHealthCheckInterceptor(cfg))
91+
streamClientInterceptors = append(streamClientInterceptors, cfg.HealthCheckConfig.StreamClientInterceptor(cfg))
92+
}
93+
94+
return cfg.Config.DialOption(unaryClientInterceptors, streamClientInterceptors)
95+
}
96+
9097
// DialOption returns the config as a grpc.DialOptions.
9198
func (cfg *Config) DialOption(unaryClientInterceptors []grpc.UnaryClientInterceptor, streamClientInterceptors []grpc.StreamClientInterceptor) ([]grpc.DialOption, error) {
9299
var opts []grpc.DialOption
@@ -103,10 +110,6 @@ func (cfg *Config) DialOption(unaryClientInterceptors []grpc.UnaryClientIntercep
103110
if cfg.RateLimit > 0 {
104111
unaryClientInterceptors = append([]grpc.UnaryClientInterceptor{NewRateLimiter(cfg)}, unaryClientInterceptors...)
105112
}
106-
if cfg.HealthCheckConfig.HealthCheckInterceptors != nil {
107-
unaryClientInterceptors = append(unaryClientInterceptors, cfg.HealthCheckConfig.UnaryHealthCheckInterceptor(*cfg))
108-
streamClientInterceptors = append(streamClientInterceptors, cfg.HealthCheckConfig.StreamClientInterceptor(*cfg))
109-
}
110113

111114
if cfg.SignWriteRequestsEnabled {
112115
unaryClientInterceptors = append(unaryClientInterceptors, UnarySigningClientInterceptor)

pkg/util/grpcclient/health_check.go

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -32,14 +32,14 @@ type HealthCheckConfig struct {
3232

3333
// RegisterFlagsWithPrefix for Config.
3434
func (cfg *HealthCheckConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
35-
f.IntVar(&cfg.UnhealthyThreshold, prefix+".unhealthy-threshold", 0, "The number of consecutive failed health checks required before considering a target unhealthy. 0 means disabled.")
35+
f.IntVar(&cfg.UnhealthyThreshold, prefix+".unhealthy-threshold", 3, "The number of consecutive failed health checks required before considering a target unhealthy. 0 means disabled.")
3636
f.DurationVar(&cfg.Timeout, prefix+".timeout", 1*time.Second, "The amount of time during which no response from a target means a failed health check.")
37-
f.DurationVar(&cfg.Interval, prefix+".interval", 1*time.Second, "The approximate amount of time between health checks of an individual target.")
37+
f.DurationVar(&cfg.Interval, prefix+".interval", 5*time.Second, "The approximate amount of time between health checks of an individual target.")
3838
}
3939

4040
type healthCheckEntry struct {
4141
address string
42-
clientConfig Config
42+
clientConfig *ConfigWithHealthCheck
4343

4444
sync.RWMutex
4545
unhealthyCount int
@@ -75,14 +75,16 @@ func (e *healthCheckEntry) isHealthy() bool {
7575
return e.unhealthyCount < e.clientConfig.HealthCheckConfig.UnhealthyThreshold
7676
}
7777

78-
func (e *healthCheckEntry) recordHealth(err error) {
78+
func (e *healthCheckEntry) recordHealth(err error) error {
7979
e.Lock()
8080
defer e.Unlock()
8181
if err != nil {
8282
e.unhealthyCount++
8383
} else {
8484
e.unhealthyCount = 0
8585
}
86+
87+
return err
8688
}
8789

8890
func (e *healthCheckEntry) tick() {
@@ -101,9 +103,9 @@ func (h *HealthCheckInterceptors) registeredInstances() []*healthCheckEntry {
101103
}
102104

103105
func (h *HealthCheckInterceptors) iteration(ctx context.Context) error {
104-
level.Warn(h.logger).Log("msg", "Performing health check")
106+
level.Warn(h.logger).Log("msg", "Performing health check", "registeredInstances", len(h.registeredInstances()))
105107
for _, instance := range h.registeredInstances() {
106-
dialOpts, err := instance.clientConfig.DialOption(nil, nil)
108+
dialOpts, err := instance.clientConfig.Config.DialOption(nil, nil)
107109
if err != nil {
108110
return err
109111
}
@@ -126,9 +128,8 @@ func (h *HealthCheckInterceptors) iteration(ctx context.Context) error {
126128
instance.lastCheckTime.Store(time.Now())
127129

128130
go func(i *healthCheckEntry) {
129-
i.recordHealth(healthCheck(c, i.clientConfig.HealthCheckConfig.Timeout))
130-
if !i.isHealthy() {
131-
level.Warn(h.logger).Log("msg", "instance marked as unhealthy", "address", i.address)
131+
if err := i.recordHealth(healthCheck(c, i.clientConfig.HealthCheckConfig.Timeout)); !i.isHealthy() {
132+
level.Warn(h.logger).Log("msg", "instance marked as unhealthy", "address", i.address, "err", err)
132133
}
133134
if err := conn.Close(); err != nil {
134135
level.Warn(h.logger).Log("msg", "error closing connection", "address", i.address, "err", err)
@@ -138,7 +139,7 @@ func (h *HealthCheckInterceptors) iteration(ctx context.Context) error {
138139
return nil
139140
}
140141

141-
func (h *HealthCheckInterceptors) getOrAddHealthCheckEntry(address string, clientConfig Config) *healthCheckEntry {
142+
func (h *HealthCheckInterceptors) getOrAddHealthCheckEntry(address string, clientConfig *ConfigWithHealthCheck) *healthCheckEntry {
142143
h.RLock()
143144
e := h.activeInstances[address]
144145
h.RUnlock()
@@ -160,7 +161,7 @@ func (h *HealthCheckInterceptors) getOrAddHealthCheckEntry(address string, clien
160161
return h.activeInstances[address]
161162
}
162163

163-
func (h *HealthCheckInterceptors) StreamClientInterceptor(clientConfig Config) grpc.StreamClientInterceptor {
164+
func (h *HealthCheckInterceptors) StreamClientInterceptor(clientConfig *ConfigWithHealthCheck) grpc.StreamClientInterceptor {
164165
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
165166
e := h.getOrAddHealthCheckEntry(cc.Target(), clientConfig)
166167
e.tick()
@@ -172,7 +173,7 @@ func (h *HealthCheckInterceptors) StreamClientInterceptor(clientConfig Config) g
172173
}
173174
}
174175

175-
func (h *HealthCheckInterceptors) UnaryHealthCheckInterceptor(clientConfig Config) grpc.UnaryClientInterceptor {
176+
func (h *HealthCheckInterceptors) UnaryHealthCheckInterceptor(clientConfig *ConfigWithHealthCheck) grpc.UnaryClientInterceptor {
176177
return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
177178
e := h.getOrAddHealthCheckEntry(cc.Target(), clientConfig)
178179
e.tick()

pkg/util/grpcclient/health_check_test.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,16 @@ import (
44
"context"
55
"errors"
66
"fmt"
7-
cortex_testutil "github.com/cortexproject/cortex/pkg/util/test"
8-
"google.golang.org/grpc"
9-
"google.golang.org/grpc/credentials/insecure"
10-
"google.golang.org/grpc/health/grpc_health_v1"
117
"testing"
128
"time"
139

1410
"github.com/stretchr/testify/require"
11+
"google.golang.org/grpc"
12+
"google.golang.org/grpc/credentials/insecure"
13+
"google.golang.org/grpc/health/grpc_health_v1"
1514

1615
utillog "github.com/cortexproject/cortex/pkg/util/log"
16+
cortex_testutil "github.com/cortexproject/cortex/pkg/util/test"
1717
)
1818

1919
type healthClientMock struct {
@@ -32,7 +32,7 @@ func TestNewHealthCheckInterceptors(t *testing.T) {
3232
hMock := &healthClientMock{
3333
err: fmt.Errorf("some error"),
3434
}
35-
cfg := Config{
35+
cfg := ConfigWithHealthCheck{
3636
HealthCheckConfig: HealthCheckConfig{
3737
UnhealthyThreshold: 2,
3838
Interval: 0,
@@ -43,8 +43,9 @@ func TestNewHealthCheckInterceptors(t *testing.T) {
4343
return hMock
4444
}
4545

46-
ui := i.UnaryHealthCheckInterceptor(cfg)
46+
ui := i.UnaryHealthCheckInterceptor(&cfg)
4747
ccUnhealthy, err := grpc.NewClient("localhost:999", grpc.WithTransportCredentials(insecure.NewCredentials()))
48+
require.NoError(t, err)
4849
ccHealthy, err := grpc.NewClient("localhost:111", grpc.WithTransportCredentials(insecure.NewCredentials()))
4950
require.NoError(t, err)
5051
invokedMap := map[string]int{}
@@ -58,15 +59,15 @@ func TestNewHealthCheckInterceptors(t *testing.T) {
5859
require.NoError(t, ui(context.Background(), "", struct{}{}, struct{}{}, ccUnhealthy, invoker))
5960

6061
// first health check
61-
i.iteration(context.Background())
62+
require.NoError(t, i.iteration(context.Background()))
6263

6364
//Should second first call
6465
require.NoError(t, ui(context.Background(), "", struct{}{}, struct{}{}, ccUnhealthy, invoker))
6566

6667
require.Equal(t, invokedMap["localhost:999"], 2)
6768

6869
// Second Healthcheck -> should mark as unhealthy
69-
i.iteration(context.Background())
70+
require.NoError(t, i.iteration(context.Background()))
7071

7172
cortex_testutil.Poll(t, time.Second, true, func() interface{} {
7273
return errors.Is(ui(context.Background(), "", struct{}{}, struct{}{}, ccUnhealthy, invoker), unhealthyErr)
@@ -77,7 +78,7 @@ func TestNewHealthCheckInterceptors(t *testing.T) {
7778

7879
// Should mark the instance back to healthy
7980
hMock.err = nil
80-
i.iteration(context.Background())
81+
require.NoError(t, i.iteration(context.Background()))
8182
cortex_testutil.Poll(t, time.Second, true, func() interface{} {
8283
return ui(context.Background(), "", struct{}{}, struct{}{}, ccUnhealthy, invoker) == nil
8384
})

0 commit comments

Comments
 (0)