Skip to content

Commit 8ef3061

Browse files
authored
Merge pull request #1138 from gouthamve/up-max-size
Make the max grpc receive message size configurable, in Bigtable client
2 parents 5d861bb + 35969e6 commit 8ef3061

File tree

4 files changed

+69
-18
lines changed

4 files changed

+69
-18
lines changed

pkg/chunk/gcp/bigtable_index_client.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,12 @@ import (
1010
"cloud.google.com/go/bigtable"
1111
ot "github.com/opentracing/opentracing-go"
1212
otlog "github.com/opentracing/opentracing-go/log"
13+
"google.golang.org/api/option"
1314

1415
"github.com/cortexproject/cortex/pkg/chunk"
1516
chunk_util "github.com/cortexproject/cortex/pkg/chunk/util"
1617
"github.com/cortexproject/cortex/pkg/util"
18+
"github.com/cortexproject/cortex/pkg/util/grpcclient"
1719
"github.com/pkg/errors"
1820
)
1921

@@ -30,12 +32,21 @@ const (
3032
type Config struct {
3133
Project string `yaml:"project"`
3234
Instance string `yaml:"instance"`
35+
36+
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"`
37+
38+
ColumnKey bool
3339
}
3440

3541
// RegisterFlags adds the flags required to config this to the given FlagSet
3642
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
3743
f.StringVar(&cfg.Project, "bigtable.project", "", "Bigtable project ID.")
3844
f.StringVar(&cfg.Instance, "bigtable.instance", "", "Bigtable instance ID.")
45+
46+
cfg.GRPCClientConfig.RegisterFlags("bigtable", f)
47+
48+
// Deprecated.
49+
f.Int("bigtable.max-recv-msg-size", 100<<20, "DEPRECATED. Bigtable grpc max receive message size.")
3950
}
4051

4152
// storageClientColumnKey implements chunk.storageClient for GCP.
@@ -53,7 +64,10 @@ type storageClientV1 struct {
5364

5465
// NewStorageClientV1 returns a new v1 StorageClient.
5566
func NewStorageClientV1(ctx context.Context, cfg Config, schemaCfg chunk.SchemaConfig) (chunk.IndexClient, error) {
56-
client, err := bigtable.NewClient(ctx, cfg.Project, cfg.Instance, instrumentation()...)
67+
opts := instrumentation()
68+
opts = append(opts, option.WithGRPCDialOption(cfg.GRPCClientConfig.DialOption()))
69+
70+
client, err := bigtable.NewClient(ctx, cfg.Project, cfg.Instance, opts...)
5771
if err != nil {
5872
return nil, err
5973
}

pkg/ingester/client/client.go

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
_ "google.golang.org/grpc/encoding/gzip" // get gzip compressor registered
1313
"google.golang.org/grpc/health/grpc_health_v1"
1414

15+
"github.com/cortexproject/cortex/pkg/util/grpcclient"
1516
cortex_middleware "github.com/cortexproject/cortex/pkg/util/middleware"
1617
"github.com/weaveworks/common/middleware"
1718
)
@@ -49,13 +50,7 @@ func MakeIngesterClient(addr string, cfg Config) (HealthAndIngesterClient, error
4950
middleware.StreamClientUserHeaderInterceptor,
5051
cortex_middleware.PrometheusGRPCStreamInstrumentation(ingesterClientRequestDuration),
5152
)),
52-
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(cfg.MaxRecvMsgSize)),
53-
}
54-
if cfg.legacyCompressToIngester {
55-
cfg.CompressToIngester = true
56-
}
57-
if cfg.CompressToIngester {
58-
opts = append(opts, grpc.WithDefaultCallOptions(grpc.UseCompressor("gzip")))
53+
cfg.GRPCClientConfig.DialOption(),
5954
}
6055
conn, err := grpc.Dial(addr, opts...)
6156
if err != nil {
@@ -74,16 +69,15 @@ func (c *closableHealthAndIngesterClient) Close() error {
7469

7570
// Config is the configuration struct for the ingester client
7671
type Config struct {
77-
MaxRecvMsgSize int
78-
CompressToIngester bool
79-
legacyCompressToIngester bool
72+
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"`
8073
}
8174

8275
// RegisterFlags registers configuration settings used by the ingester client config.
8376
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
84-
// We have seen 20MB returns from queries - add a bit of headroom
85-
f.IntVar(&cfg.MaxRecvMsgSize, "ingester.client.max-recv-message-size", 64*1024*1024, "Maximum message size, in bytes, this client will receive.")
86-
f.BoolVar(&cfg.CompressToIngester, "ingester.client.compress-to-ingester", false, "Compress data in calls to ingesters.")
87-
// moved from distributor pkg, but flag prefix left as back compat fallback for existing users.
88-
f.BoolVar(&cfg.legacyCompressToIngester, "distributor.compress-to-ingester", false, "Compress data in calls to ingesters. (DEPRECATED: use ingester.client.compress-to-ingester instead")
77+
cfg.GRPCClientConfig.RegisterFlags("ingester.client", f)
78+
79+
// Deprecated.
80+
f.Int("ingester.client.max-recv-message-size", 64*1024*1024, "DEPRECATED. Maximum message size, in bytes, this client will receive.")
81+
f.Bool("ingester.client.compress-to-ingester", false, "DEPRECATED. Compress data in calls to ingesters.")
82+
f.Bool("distributor.compress-to-ingester", false, "DEPRECATED. Compress data in calls to ingesters. (DEPRECATED: use ingester.client.compress-to-ingester instead")
8983
}

pkg/querier/frontend/worker.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"google.golang.org/grpc/naming"
1515

1616
"github.com/cortexproject/cortex/pkg/util"
17+
"github.com/cortexproject/cortex/pkg/util/grpcclient"
1718
"github.com/weaveworks/common/httpgrpc"
1819
"github.com/weaveworks/common/httpgrpc/server"
1920
"github.com/weaveworks/common/middleware"
@@ -31,13 +32,17 @@ type WorkerConfig struct {
3132
Address string
3233
Parallelism int
3334
DNSLookupDuration time.Duration
35+
36+
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"`
3437
}
3538

3639
// RegisterFlags adds the flags required to config this to the given FlagSet.
3740
func (cfg *WorkerConfig) RegisterFlags(f *flag.FlagSet) {
3841
f.StringVar(&cfg.Address, "querier.frontend-address", "", "Address of query frontend service.")
3942
f.IntVar(&cfg.Parallelism, "querier.worker-parallelism", 10, "Number of simultaneous queries to process.")
4043
f.DurationVar(&cfg.DNSLookupDuration, "querier.dns-lookup-period", 10*time.Second, "How often to query DNS.")
44+
45+
cfg.GRPCClientConfig.RegisterFlags("querier.frontend-client", f)
4146
}
4247

4348
// Worker is the counter-part to the frontend, actually processing requests.
@@ -142,7 +147,7 @@ func (w *worker) watchDNSLoop() {
142147

143148
// runMany starts N runOne loops for a given address.
144149
func (w *worker) runMany(ctx context.Context, address string) {
145-
client, err := connect(address)
150+
client, err := w.connect(address)
146151
if err != nil {
147152
level.Error(w.log).Log("msg", "error connecting", "addr", address, "err", err)
148153
return
@@ -206,13 +211,14 @@ func (w *worker) process(ctx context.Context, c Frontend_ProcessClient) error {
206211
}
207212
}
208213

209-
func connect(address string) (FrontendClient, error) {
214+
func (w *worker) connect(address string) (FrontendClient, error) {
210215
conn, err := grpc.Dial(
211216
address,
212217
grpc.WithInsecure(),
213218
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
214219
middleware.ClientUserHeaderInterceptor,
215220
)),
221+
w.cfg.GRPCClientConfig.DialOption(),
216222
)
217223
if err != nil {
218224
return nil, err

pkg/util/grpcclient/grpcclient.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package grpcclient
2+
3+
import (
4+
"flag"
5+
6+
"google.golang.org/grpc"
7+
)
8+
9+
// Config for a gRPC client.
10+
type Config struct {
11+
MaxRecvMsgSize int `yaml:"max_recv_msg_size"`
12+
MaxSendMsgSize int `yaml:"max_send_msg_size"`
13+
UseGzipCompression bool `yaml:"use_gzip_compression"`
14+
}
15+
16+
// RegisterFlags registers flags.
17+
func (cfg *Config) RegisterFlags(prefix string, f *flag.FlagSet) {
18+
f.IntVar(&cfg.MaxRecvMsgSize, prefix+".grpc-max-recv-msg-size", 100<<20, "gRPC client max receive message size (bytes).")
19+
f.IntVar(&cfg.MaxSendMsgSize, prefix+".grpc-max-send-msg-size", 16<<20, "gRPC client max send message size (bytes).")
20+
f.BoolVar(&cfg.UseGzipCompression, prefix+".grpc-use-gzip-compression", false, "Use compression when sending messages.")
21+
}
22+
23+
// CallOptions returns the config in terms of CallOptions.
24+
func (cfg *Config) CallOptions() []grpc.CallOption {
25+
var opts []grpc.CallOption
26+
opts = append(opts, grpc.MaxCallRecvMsgSize(cfg.MaxRecvMsgSize))
27+
opts = append(opts, grpc.MaxCallSendMsgSize(cfg.MaxSendMsgSize))
28+
if cfg.UseGzipCompression {
29+
opts = append(opts, grpc.UseCompressor("gzip"))
30+
}
31+
return opts
32+
}
33+
34+
// DialOption returns the config as a grpc.DialOptions.
35+
func (cfg *Config) DialOption() grpc.DialOption {
36+
return grpc.WithDefaultCallOptions(cfg.CallOptions()...)
37+
}

0 commit comments

Comments
 (0)