Skip to content

Commit 6b349a7

Browse files
authored
Add option to limit concurrent queries to Cassandra. (#2562)
* Add option to limit concurrent queries to Cassandra. Signed-off-by: Tom Wilkie <[email protected]> * go mod vendor Signed-off-by: Tom Wilkie <[email protected]> * Add changelog entry. Signed-off-by: Tom Wilkie <[email protected]> * Review feedback. Signed-off-by: Tom Wilkie <[email protected]>
1 parent f5e6bf0 commit 6b349a7

File tree

7 files changed

+176
-8
lines changed

7 files changed

+176
-8
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
* [FEATURE] Experimental: Added a new object storage client for OpenStack Swift. #2440
2727
* [FEATURE] Update in dependency `weaveworks/common`. TLS config options added to the Server. #2535
2828
* [FEATURE] Experimental: Added support for `/api/v1/metadata` Prometheus-based endpoint. #2549
29+
* [FEATURE] Add ability to limit concurrent queries to Cassandra with `-cassandra.query-concurrency` flag. #2562
2930
* [ENHANCEMENT] Experimental TSDB: sample ingestion errors are now reported via existing `cortex_discarded_samples_total` metric. #2370
3031
* [ENHANCEMENT] Failures on samples at distributors and ingesters return the first validation error as opposed to the last. #2383
3132
* [ENHANCEMENT] Experimental TSDB: Added `cortex_querier_blocks_meta_synced`, which reflects current state of synced blocks over all tenants. #2392

docs/configuration/config-file-reference.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1685,6 +1685,10 @@ cassandra:
16851685
# CLI flag: -cassandra.retry-min-backoff
16861686
[retry_min_backoff: <duration> | default = 100ms]
16871687

1688+
# Limit number of concurrent queries to Cassandra. (Default is 0: no limit)
1689+
# CLI flag: -cassandra.query-concurrency
1690+
[query_concurrency: <int> | default = 0]
1691+
16881692
boltdb:
16891693
# Location of BoltDB index files.
16901694
# CLI flag: -boltdb.dir

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ require (
5757
go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738
5858
go.uber.org/atomic v1.5.1
5959
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b
60-
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e
60+
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a
6161
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
6262
google.golang.org/api v0.14.0
6363
google.golang.org/grpc v1.26.0

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1041,6 +1041,8 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEha
10411041
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
10421042
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e h1:vcxGaoTs7kV8m5Np9uUNQin4BrLOthgV7252N8V+FwY=
10431043
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
1044+
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a h1:WXEvlFVvvGxCJLG6REjsT03iWnKLEWinaScsxF2Vm2o=
1045+
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
10441046
golang.org/x/sys v0.0.0-20170830134202-bb24a47a89ea/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
10451047
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
10461048
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=

pkg/chunk/cassandra/storage_client.go

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212

1313
"github.com/gocql/gocql"
1414
"github.com/pkg/errors"
15+
"golang.org/x/sync/semaphore"
1516

1617
"github.com/cortexproject/cortex/pkg/chunk"
1718
"github.com/cortexproject/cortex/pkg/chunk/util"
@@ -40,6 +41,7 @@ type Config struct {
4041
Retries int `yaml:"max_retries"`
4142
MaxBackoff time.Duration `yaml:"retry_max_backoff"`
4243
MinBackoff time.Duration `yaml:"retry_min_backoff"`
44+
QueryConcurrency int `yaml:"query_concurrency"`
4345
}
4446

4547
// RegisterFlags adds the flags required to config this to the given FlagSet
@@ -63,6 +65,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
6365
f.IntVar(&cfg.Retries, "cassandra.max-retries", 0, "Number of retries to perform on a request. (Default is 0: no retries)")
6466
f.DurationVar(&cfg.MinBackoff, "cassandra.retry-min-backoff", 100*time.Millisecond, "Minimum time to wait before retrying a failed request. (Default = 100ms)")
6567
f.DurationVar(&cfg.MaxBackoff, "cassandra.retry-max-backoff", 10*time.Second, "Maximum time to wait before retrying a failed request. (Default = 10s)")
68+
f.IntVar(&cfg.QueryConcurrency, "cassandra.query-concurrency", 0, "Limit number of concurrent queries to Cassandra. (Default is 0: no limit)")
6669
}
6770

6871
func (cfg *Config) Validate() error {
@@ -192,9 +195,10 @@ func (cfg *Config) createKeyspace() error {
192195

193196
// StorageClient implements chunk.IndexClient and chunk.ObjectClient for Cassandra.
194197
type StorageClient struct {
195-
cfg Config
196-
schemaCfg chunk.SchemaConfig
197-
session *gocql.Session
198+
cfg Config
199+
schemaCfg chunk.SchemaConfig
200+
session *gocql.Session
201+
querySemaphore *semaphore.Weighted
198202
}
199203

200204
// NewStorageClient returns a new StorageClient.
@@ -206,10 +210,16 @@ func NewStorageClient(cfg Config, schemaCfg chunk.SchemaConfig) (*StorageClient,
206210
return nil, errors.WithStack(err)
207211
}
208212

213+
var querySemaphore *semaphore.Weighted
214+
if cfg.QueryConcurrency > 0 {
215+
querySemaphore = semaphore.NewWeighted(int64(cfg.QueryConcurrency))
216+
}
217+
209218
client := &StorageClient{
210-
cfg: cfg,
211-
schemaCfg: schemaCfg,
212-
session: session,
219+
cfg: cfg,
220+
schemaCfg: schemaCfg,
221+
session: session,
222+
querySemaphore: querySemaphore,
213223
}
214224
return client, nil
215225
}
@@ -277,6 +287,13 @@ func (s *StorageClient) QueryPages(ctx context.Context, queries []chunk.IndexQue
277287
}
278288

279289
func (s *StorageClient) query(ctx context.Context, query chunk.IndexQuery, callback util.Callback) error {
290+
if s.querySemaphore != nil {
291+
if err := s.querySemaphore.Acquire(ctx, 1); err != nil {
292+
return err
293+
}
294+
defer s.querySemaphore.Release(1)
295+
}
296+
280297
var q *gocql.Query
281298

282299
switch {
@@ -383,6 +400,13 @@ func (s *StorageClient) GetChunks(ctx context.Context, input []chunk.Chunk) ([]c
383400
}
384401

385402
func (s *StorageClient) getChunk(ctx context.Context, decodeContext *chunk.DecodeContext, input chunk.Chunk) (chunk.Chunk, error) {
403+
if s.querySemaphore != nil {
404+
if err := s.querySemaphore.Acquire(ctx, 1); err != nil {
405+
return input, err
406+
}
407+
defer s.querySemaphore.Release(1)
408+
}
409+
386410
tableName, err := s.schemaCfg.ChunkTableFor(input.From)
387411
if err != nil {
388412
return input, err

vendor/golang.org/x/sync/semaphore/semaphore.go

Lines changed: 136 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/modules.txt

Lines changed: 2 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)