From 2615afe379c8a979154ab7fd86e63755c6b1f159 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Fri, 26 Jun 2020 10:35:20 +0200 Subject: [PATCH 1/3] Switched store-gateway query concurrency limit from per-tenant to shared Signed-off-by: Marco Pracucci --- CHANGELOG.md | 4 + go.mod | 2 +- go.sum | 6 +- pkg/ingester/ingester_v2.go | 5 +- pkg/storage/tsdb/config.go | 2 +- pkg/storegateway/bucket_store_metrics_test.go | 6 - pkg/storegateway/bucket_stores.go | 17 ++- pkg/storegateway/bucket_stores_test.go | 32 ++++- .../thanos-io/thanos/pkg/shipper/shipper.go | 122 +++++++++--------- .../thanos-io/thanos/pkg/store/bucket.go | 42 +++--- vendor/modules.txt | 2 +- 11 files changed, 139 insertions(+), 101 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 734a224121..4b42ea4fc9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,10 @@ ## master / unreleased * [CHANGE] Experimental Delete Series: Change target flag for purger from `data-purger` to `purger`. #2777 +* [CHANGE] Experimental TSDB: The max concurrent queries against the long-term storage, configured via `-experimental.tsdb.bucket-store.max-concurrent`, is now a limit shared across all tenants and not a per-tenant limit anymore. The default value has changed from `20` to `100` and the following new metrics have been added: + * `cortex_bucket_stores_gate_queries_concurrent_max` + * `cortex_bucket_stores_gate_queries_in_flight` + * `cortex_bucket_stores_gate_duration_seconds` * [FEATURE] Introduced `ruler.for-outage-tolerance`, Max time to tolerate outage for restoring "for" state of alert. #2783 * [FEATURE] Introduced `ruler.for-grace-period`, Minimum duration between alert and restored "for" state. This is maintained only for alerts with configured "for" time greater than grace period. #2783 * [FEATURE] Introduced `ruler.for-resend-delay`, Minimum amount of time to wait before resending an alert to Alertmanager. 
#2783 diff --git a/go.mod b/go.mod index 41762e0d71..5e02ff0e95 100644 --- a/go.mod +++ b/go.mod @@ -50,7 +50,7 @@ require ( github.com/segmentio/fasthash v0.0.0-20180216231524-a72b379d632e github.com/spf13/afero v1.2.2 github.com/stretchr/testify v1.5.1 - github.com/thanos-io/thanos v0.12.3-0.20200618165043-6c513e5f5c5f + github.com/thanos-io/thanos v0.13.1-0.20200625180332-f078faed1b96 github.com/uber/jaeger-client-go v2.23.1+incompatible github.com/weaveworks/common v0.0.0-20200512154658-384f10054ec5 go.etcd.io/bbolt v1.3.5-0.20200615073812-232d8fc87f50 diff --git a/go.sum b/go.sum index 19685ef9ea..ada1b3b908 100644 --- a/go.sum +++ b/go.sum @@ -960,7 +960,7 @@ github.com/prometheus/prometheus v0.0.0-20180315085919-58e2a31db8de/go.mod h1:oA github.com/prometheus/prometheus v0.0.0-20190818123050-43acd0e2e93f/go.mod h1:rMTlmxGCvukf2KMu3fClMDKLLoJ5hl61MhcJ7xKakf0= github.com/prometheus/prometheus v1.8.2-0.20200107122003-4708915ac6ef/go.mod h1:7U90zPoLkWjEIQcy/rweQla82OCTUzxVHE51G3OhJbI= github.com/prometheus/prometheus v1.8.2-0.20200213233353-b90be6f32a33/go.mod h1:fkIPPkuZnkXyopYHmXPxf9rgiPkVgZCN8w9o8+UgBlY= -github.com/prometheus/prometheus v1.8.2-0.20200609165731-66dfb951c4ca/go.mod h1:CwaXafRa0mm72de2GQWtfQxjGytbSKIGivWxQvjpRZs= +github.com/prometheus/prometheus v1.8.2-0.20200619100132-74207c04655e/go.mod h1:QV6T0PPQi5UFmqcLBJw3JiyIR8r1O7KEv9qlVw4VV40= github.com/prometheus/prometheus v1.8.2-0.20200622142935-153f859b7499 h1:q+yGm39CmSV1S7oxCz36nlvx9ugRoEodwuHusgJw+iU= github.com/prometheus/prometheus v1.8.2-0.20200622142935-153f859b7499/go.mod h1:QV6T0PPQi5UFmqcLBJw3JiyIR8r1O7KEv9qlVw4VV40= github.com/rafaeljusto/redigomock v0.0.0-20190202135759-257e089e14a1 h1:+kGqA4dNN5hn7WwvKdzHl0rdN5AEkbNZd0VjRltAiZg= @@ -1046,8 +1046,8 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/thanos-io/thanos v0.8.1-0.20200109203923-552ffa4c1a0d/go.mod h1:usT/TxtJQ7DzinTt+G9kinDQmRS5sxwu0unVKZ9vdcw= -github.com/thanos-io/thanos v0.12.3-0.20200618165043-6c513e5f5c5f h1:UnMVEOejh6tWKUag5tuC0WjKfKmGwJ2+ky0MV4KM52I= -github.com/thanos-io/thanos v0.12.3-0.20200618165043-6c513e5f5c5f/go.mod h1:CPqrM/ibNtlraee0to4dSRiTs+KLI1c3agMS2lmJpz0= +github.com/thanos-io/thanos v0.13.1-0.20200625180332-f078faed1b96 h1:McsluZ8fXVwGbdXsZ20uZNGukmPycDU9m6df64S2bqQ= +github.com/thanos-io/thanos v0.13.1-0.20200625180332-f078faed1b96/go.mod h1:VuNcGvUE0u57S1XXqYhf0dQzUO3wUnw2B5IKsju+1z4= github.com/tidwall/pretty v0.0.0-20180105212114-65a9db5fad51/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index 02a7531efb..610cd79dee 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -898,7 +898,10 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) { tsdbPromReg, udir, cortex_tsdb.NewUserBucketClient(userID, i.TSDBState.bucket), - func() labels.Labels { return l }, metadata.ReceiveSource) + func() labels.Labels { return l }, + metadata.ReceiveSource, + true, // Allow out of order uploads. It's fine in Cortex's context. 
+ ) } i.TSDBState.tsdbMetrics.setRegistryForUser(userID, tsdbPromReg) diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index d9e9f65084..f0beff5ad9 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -207,7 +207,7 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.SyncInterval, "experimental.tsdb.bucket-store.sync-interval", 5*time.Minute, "How frequently scan the bucket to look for changes (new blocks shipped by ingesters and blocks removed by retention or compaction). 0 disables it.") f.Uint64Var(&cfg.MaxChunkPoolBytes, "experimental.tsdb.bucket-store.max-chunk-pool-bytes", uint64(2*units.Gibibyte), "Max size - in bytes - of a per-tenant chunk pool, used to reduce memory allocations.") f.Uint64Var(&cfg.MaxSampleCount, "experimental.tsdb.bucket-store.max-sample-count", 0, "Max number of samples per query when loading series from the long-term storage. 0 disables the limit.") - f.IntVar(&cfg.MaxConcurrent, "experimental.tsdb.bucket-store.max-concurrent", 20, "Max number of concurrent queries to execute against the long-term storage on a per-tenant basis.") + f.IntVar(&cfg.MaxConcurrent, "experimental.tsdb.bucket-store.max-concurrent", 100, "Max number of concurrent queries to execute against the long-term storage. The limit is shared across all tenants.") f.IntVar(&cfg.TenantSyncConcurrency, "experimental.tsdb.bucket-store.tenant-sync-concurrency", 10, "Maximum number of concurrent tenants synching blocks.") f.IntVar(&cfg.BlockSyncConcurrency, "experimental.tsdb.bucket-store.block-sync-concurrency", 20, "Maximum number of concurrent blocks synching per tenant.") f.IntVar(&cfg.MetaSyncConcurrency, "experimental.tsdb.bucket-store.meta-sync-concurrency", 20, "Number of Go routines to use when syncing block meta files from object storage per tenant.") diff --git a/pkg/storegateway/bucket_store_metrics_test.go b/pkg/storegateway/bucket_store_metrics_test.go index fc4a712afe..b2aaea16a7 100644 --- a/pkg/storegateway/bucket_store_metrics_test.go +++ b/pkg/storegateway/bucket_store_metrics_test.go @@ -236,7 +236,6 @@ func populateMockedBucketStoreMetrics(base float64) *prometheus.Registry { m.chunkSizeBytes.Observe(30 * base) m.queriesDropped.Add(31 * base) - m.queriesLimit.Add(32 * base) m.seriesRefetches.Add(33 * base) @@ -273,7 +272,6 @@ type mockedBucketStoreMetrics struct { resultSeriesCount prometheus.Summary chunkSizeBytes prometheus.Histogram queriesDropped prometheus.Counter - queriesLimit prometheus.Gauge cachedPostingsCompressions *prometheus.CounterVec cachedPostingsCompressionErrors *prometheus.CounterVec @@ -355,10 +353,6 @@ func newMockedBucketStoreMetrics(reg prometheus.Registerer) *mockedBucketStoreMe Name: "thanos_bucket_store_queries_dropped_total", Help: "Number of queries that were dropped due to the sample limit.", }) - m.queriesLimit = promauto.With(reg).NewGauge(prometheus.GaugeOpts{ - Name: "thanos_bucket_store_queries_concurrent_max", - Help: "Number of maximum concurrent queries.", - }) m.seriesRefetches = promauto.With(reg).NewCounter(prometheus.CounterOpts{ Name: "thanos_bucket_store_series_refetches_total", Help: fmt.Sprintf("Total number of cases where %v bytes was not enough was to fetch series from index, resulting in refetch.", 64*1024), diff --git a/pkg/storegateway/bucket_stores.go b/pkg/storegateway/bucket_stores.go index 820c951802..b480c41a70 100644 --- a/pkg/storegateway/bucket_stores.go +++ b/pkg/storegateway/bucket_stores.go @@ -18,6 +18,7 @@ import ( 
"github.com/thanos-io/thanos/pkg/block" thanos_metadata "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/extprom" + "github.com/thanos-io/thanos/pkg/gate" "github.com/thanos-io/thanos/pkg/objstore" "github.com/thanos-io/thanos/pkg/store" storecache "github.com/thanos-io/thanos/pkg/store/cache" @@ -43,6 +44,9 @@ type BucketStores struct { // Index cache shared across all tenants. indexCache storecache.IndexCache + // Gate used to limit query concurrency across all tenants. + queryGate gate.Gate + // Keeps a bucket store for each tenant. storesMu sync.RWMutex stores map[string]*store.BucketStore @@ -59,6 +63,14 @@ func NewBucketStores(cfg tsdb.Config, filters []block.MetadataFilter, bucketClie return nil, errors.Wrapf(err, "create caching bucket") } + // The number of concurrent queries against the tenants BucketStores are limited. + queryGateReg := extprom.WrapRegistererWithPrefix("cortex_bucket_stores_", reg) + queryGate := gate.NewKeeper(queryGateReg).NewGate(cfg.BucketStore.MaxConcurrent) + promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Name: "cortex_bucket_stores_gate_queries_concurrent_max", + Help: "Number of maximum concurrent queries allowed.", + }).Set(float64(cfg.BucketStore.MaxConcurrent)) + u := &BucketStores{ logger: logger, cfg: cfg, @@ -68,6 +80,7 @@ func NewBucketStores(cfg tsdb.Config, filters []block.MetadataFilter, bucketClie logLevel: logLevel, bucketStoreMetrics: NewBucketStoreMetrics(), metaFetcherMetrics: NewMetadataFetcherMetrics(), + queryGate: queryGate, syncTimes: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ Name: "cortex_bucket_stores_blocks_sync_seconds", Help: "The total time it takes to perform a sync stores", @@ -268,9 +281,9 @@ func (u *BucketStores) getOrCreateStore(userID string) (*store.BucketStore, erro fetcher, filepath.Join(u.cfg.BucketStore.SyncDir, userID), u.indexCache, - uint64(u.cfg.BucketStore.MaxChunkPoolBytes), + u.queryGate, + u.cfg.BucketStore.MaxChunkPoolBytes, u.cfg.BucketStore.MaxSampleCount, - u.cfg.BucketStore.MaxConcurrent, u.logLevel.String() == "debug", // Turn on debug logging, if the log level is set to debug u.cfg.BucketStore.BlockSyncConcurrency, nil, // Do not limit timerange. diff --git a/pkg/storegateway/bucket_stores_test.go b/pkg/storegateway/bucket_stores_test.go index 44921d7c70..ae4fb14fb8 100644 --- a/pkg/storegateway/bucket_stores_test.go +++ b/pkg/storegateway/bucket_stores_test.go @@ -88,7 +88,21 @@ func TestBucketStores_InitialSync(t *testing.T) { # HELP cortex_bucket_store_block_load_failures_total Total number of failed remote block loading attempts. # TYPE cortex_bucket_store_block_load_failures_total counter cortex_bucket_store_block_load_failures_total 0 - `), "cortex_bucket_store_blocks_loaded", "cortex_bucket_store_block_loads_total", "cortex_bucket_store_block_load_failures_total")) + + # HELP cortex_bucket_stores_gate_queries_concurrent_max Number of maximum concurrent queries allowed. + # TYPE cortex_bucket_stores_gate_queries_concurrent_max gauge + cortex_bucket_stores_gate_queries_concurrent_max 100 + + # HELP cortex_bucket_stores_gate_queries_in_flight Number of queries that are currently in flight. 
+ # TYPE cortex_bucket_stores_gate_queries_in_flight gauge + cortex_bucket_stores_gate_queries_in_flight 0 + `), + "cortex_bucket_store_blocks_loaded", + "cortex_bucket_store_block_loads_total", + "cortex_bucket_store_block_load_failures_total", + "cortex_bucket_stores_gate_queries_concurrent_max", + "cortex_bucket_stores_gate_queries_in_flight", + )) assert.Greater(t, testutil.ToFloat64(stores.syncLastSuccess), float64(0)) } @@ -145,7 +159,21 @@ func TestBucketStores_SyncBlocks(t *testing.T) { # HELP cortex_bucket_store_block_load_failures_total Total number of failed remote block loading attempts. # TYPE cortex_bucket_store_block_load_failures_total counter cortex_bucket_store_block_load_failures_total 0 - `), "cortex_bucket_store_blocks_loaded", "cortex_bucket_store_block_loads_total", "cortex_bucket_store_block_load_failures_total")) + + # HELP cortex_bucket_stores_gate_queries_concurrent_max Number of maximum concurrent queries allowed. + # TYPE cortex_bucket_stores_gate_queries_concurrent_max gauge + cortex_bucket_stores_gate_queries_concurrent_max 100 + + # HELP cortex_bucket_stores_gate_queries_in_flight Number of queries that are currently in flight. + # TYPE cortex_bucket_stores_gate_queries_in_flight gauge + cortex_bucket_stores_gate_queries_in_flight 0 + `), + "cortex_bucket_store_blocks_loaded", + "cortex_bucket_store_block_loads_total", + "cortex_bucket_store_block_load_failures_total", + "cortex_bucket_stores_gate_queries_concurrent_max", + "cortex_bucket_stores_gate_queries_in_flight", + )) assert.Greater(t, testutil.ToFloat64(stores.syncLastSuccess), float64(0)) } diff --git a/vendor/github.com/thanos-io/thanos/pkg/shipper/shipper.go b/vendor/github.com/thanos-io/thanos/pkg/shipper/shipper.go index e3f1bec554..5c961f4459 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/shipper/shipper.go +++ b/vendor/github.com/thanos-io/thanos/pkg/shipper/shipper.go @@ -72,13 +72,15 @@ func newMetrics(reg prometheus.Registerer, uploadCompacted bool) *metrics { // Shipper watches a directory for matching files and directories and uploads // them to a remote data store. 
type Shipper struct { - logger log.Logger - dir string - metrics *metrics - bucket objstore.Bucket - labels func() labels.Labels - source metadata.SourceType - uploadCompacted bool + logger log.Logger + dir string + metrics *metrics + bucket objstore.Bucket + labels func() labels.Labels + source metadata.SourceType + + uploadCompacted bool + allowOutOfOrderUploads bool } // New creates a new shipper that detects new TSDB blocks in dir and uploads them @@ -90,6 +92,7 @@ func New( bucket objstore.Bucket, lbls func() labels.Labels, source metadata.SourceType, + allowOutOfOrderUploads bool, ) *Shipper { if logger == nil { logger = log.NewNopLogger() @@ -99,12 +102,13 @@ func New( } return &Shipper{ - logger: logger, - dir: dir, - bucket: bucket, - labels: lbls, - metrics: newMetrics(r, false), - source: source, + logger: logger, + dir: dir, + bucket: bucket, + labels: lbls, + metrics: newMetrics(r, false), + source: source, + allowOutOfOrderUploads: allowOutOfOrderUploads, } } @@ -118,6 +122,7 @@ func NewWithCompacted( bucket objstore.Bucket, lbls func() labels.Labels, source metadata.SourceType, + allowOutOfOrderUploads bool, ) *Shipper { if logger == nil { logger = log.NewNopLogger() @@ -127,13 +132,14 @@ func NewWithCompacted( } return &Shipper{ - logger: logger, - dir: dir, - bucket: bucket, - labels: lbls, - metrics: newMetrics(r, true), - source: source, - uploadCompacted: true, + logger: logger, + dir: dir, + bucket: bucket, + labels: lbls, + metrics: newMetrics(r, true), + source: source, + uploadCompacted: true, + allowOutOfOrderUploads: allowOutOfOrderUploads, } } @@ -153,23 +159,23 @@ func (s *Shipper) Timestamps() (minTime, maxSyncTime int64, err error) { minTime = math.MaxInt64 maxSyncTime = math.MinInt64 - if err := s.iterBlockMetas(func(m *metadata.Meta) error { + metas, err := s.blockMetasFromOldest() + if err != nil { + return 0, 0, err + } + for _, m := range metas { if m.MinTime < minTime { minTime = m.MinTime } if _, ok := hasUploaded[m.ULID]; ok && m.MaxTime > maxSyncTime { maxSyncTime = m.MaxTime } - return nil - }); err != nil { - return 0, 0, errors.Wrap(err, "iter Block metas for timestamp") } if minTime == math.MaxInt64 { // No block yet found. We cannot assume any min block size so propagate 0 minTime. minTime = 0 } - return minTime, maxSyncTime, nil } @@ -272,72 +278,78 @@ func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) { checker = newLazyOverlapChecker(s.logger, s.bucket, s.labels) uploadErrs int ) - // Sync non compacted blocks first. - if err := s.iterBlockMetas(func(m *metadata.Meta) error { + + metas, err := s.blockMetasFromOldest() + if err != nil { + return 0, err + } + for _, m := range metas { // Do not sync a block if we already uploaded or ignored it. If it's no longer found in the bucket, // it was generally removed by the compaction process. if _, uploaded := hasUploaded[m.ULID]; uploaded { meta.Uploaded = append(meta.Uploaded, m.ULID) - return nil + continue } if m.Stats.NumSamples == 0 { // Ignore empty blocks. level.Debug(s.logger).Log("msg", "ignoring empty block", "block", m.ULID) - return nil + continue } // We only ship of the first compacted block level as normal flow. if m.Compaction.Level > 1 { if !s.uploadCompacted { - return nil + continue } } // Check against bucket if the meta file for this block exists. 
ok, err := s.bucket.Exists(ctx, path.Join(m.ULID.String(), block.MetaFilename)) if err != nil { - return errors.Wrap(err, "check exists") + return 0, errors.Wrap(err, "check exists") } if ok { - return nil + continue } if m.Compaction.Level > 1 { if err := checker.IsOverlapping(ctx, m.BlockMeta); err != nil { + if !s.allowOutOfOrderUploads { + return 0, errors.Errorf("Found overlap or error during sync, cannot upload compacted block, details: %v", err) + } level.Error(s.logger).Log("msg", "found overlap or error during sync, cannot upload compacted block", "err", err) uploadErrs++ - return nil + continue } } if err := s.upload(ctx, m); err != nil { - level.Error(s.logger).Log("msg", "shipping failed", "block", m.ULID, "err", err) + if !s.allowOutOfOrderUploads { + return 0, errors.Wrapf(err, "upload %v", m.ULID) + } + // No error returned, just log line. This is because we want other blocks to be uploaded even // though this one failed. It will be retried on second Sync iteration. + level.Error(s.logger).Log("msg", "shipping failed", "block", m.ULID, "err", err) uploadErrs++ - return nil + continue } meta.Uploaded = append(meta.Uploaded, m.ULID) - uploaded++ s.metrics.uploads.Inc() - return nil - }); err != nil { - s.metrics.dirSyncFailures.Inc() - return uploaded, errors.Wrap(err, "iter local block metas") } - if err := WriteMetaFile(s.logger, s.dir, meta); err != nil { level.Warn(s.logger).Log("msg", "updating meta file failed", "err", err) } s.metrics.dirSyncs.Inc() - if uploadErrs > 0 { s.metrics.uploadFailures.Add(float64(uploadErrs)) return uploaded, errors.Errorf("failed to sync %v blocks", uploadErrs) - } else if s.uploadCompacted { + } + + if s.uploadCompacted { s.metrics.uploadedCompacted.Set(1) } return uploaded, nil @@ -380,15 +392,12 @@ func (s *Shipper) upload(ctx context.Context, meta *metadata.Meta) error { return block.Upload(ctx, s.logger, s.bucket, updir) } -// iterBlockMetas calls f with the block meta for each block found in dir -// sorted by minTime asc. It logs an error and continues if it cannot access a -// meta.json file. -// If f returns an error, the function returns with the same error. -func (s *Shipper) iterBlockMetas(f func(m *metadata.Meta) error) error { - var metas []*metadata.Meta +// blockMetasFromOldest returns the block meta of each block found in dir +// sorted by minTime asc. 
+func (s *Shipper) blockMetasFromOldest() (metas []*metadata.Meta, _ error) { fis, err := ioutil.ReadDir(s.dir) if err != nil { - return errors.Wrap(err, "read dir") + return nil, errors.Wrap(err, "read dir") } names := make([]string, 0, len(fis)) for _, fi := range fis { @@ -402,28 +411,21 @@ func (s *Shipper) iterBlockMetas(f func(m *metadata.Meta) error) error { fi, err := os.Stat(dir) if err != nil { - level.Warn(s.logger).Log("msg", "open file failed", "err", err) - continue + return nil, errors.Wrapf(err, "stat block %v", dir) } if !fi.IsDir() { continue } m, err := metadata.Read(dir) if err != nil { - level.Warn(s.logger).Log("msg", "reading meta file failed", "err", err) - continue + return nil, errors.Wrapf(err, "read metadata for block %v", dir) } metas = append(metas, m) } sort.Slice(metas, func(i, j int) bool { return metas[i].BlockMeta.MinTime < metas[j].BlockMeta.MinTime }) - for _, m := range metas { - if err := f(m); err != nil { - return err - } - } - return nil + return metas, nil } func hardlinkBlock(src, dst string) error { diff --git a/vendor/github.com/thanos-io/thanos/pkg/store/bucket.go b/vendor/github.com/thanos-io/thanos/pkg/store/bucket.go index e5793d78c0..33e9d68918 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/store/bucket.go +++ b/vendor/github.com/thanos-io/thanos/pkg/store/bucket.go @@ -40,7 +40,6 @@ import ( "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/compact/downsample" "github.com/thanos-io/thanos/pkg/component" - "github.com/thanos-io/thanos/pkg/extprom" "github.com/thanos-io/thanos/pkg/gate" "github.com/thanos-io/thanos/pkg/model" "github.com/thanos-io/thanos/pkg/objstore" @@ -101,7 +100,6 @@ type bucketStoreMetrics struct { resultSeriesCount prometheus.Summary chunkSizeBytes prometheus.Histogram queriesDropped prometheus.Counter - queriesLimit prometheus.Gauge seriesRefetches prometheus.Counter cachedPostingsCompressions *prometheus.CounterVec @@ -184,10 +182,6 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics { Name: "thanos_bucket_store_queries_dropped_total", Help: "Number of queries that were dropped due to the sample limit.", }) - m.queriesLimit = promauto.With(reg).NewGauge(prometheus.GaugeOpts{ - Name: "thanos_bucket_store_queries_concurrent_max", - Help: "Number of maximum concurrent queries.", - }) m.seriesRefetches = promauto.With(reg).NewCounter(prometheus.CounterOpts{ Name: "thanos_bucket_store_series_refetches_total", Help: fmt.Sprintf("Total number of cases where %v bytes was not enough was to fetch series from index, resulting in refetch.", maxSeriesSize), @@ -273,9 +267,9 @@ func NewBucketStore( fetcher block.MetadataFetcher, dir string, indexCache storecache.IndexCache, + queryGate gate.Gate, maxChunkPoolBytes uint64, maxSampleCount uint64, - maxConcurrent int, debugLogging bool, blockSyncConcurrency int, filterConfig *FilterConfig, @@ -288,10 +282,6 @@ func NewBucketStore( logger = log.NewNopLogger() } - if maxConcurrent < 0 { - return nil, errors.Errorf("max concurrency value cannot be lower than 0 (got %v)", maxConcurrent) - } - chunkPool, err := pool.NewBucketedBytesPool(maxChunkSize, 50e6, 2, maxChunkPoolBytes) if err != nil { return nil, errors.Wrap(err, "create chunk pool") @@ -310,7 +300,7 @@ func NewBucketStore( debugLogging: debugLogging, blockSyncConcurrency: blockSyncConcurrency, filterConfig: filterConfig, - queryGate: gate.NewKeeper(extprom.WrapRegistererWithPrefix("thanos_bucket_store_series_", reg)).NewGate(maxConcurrent), + queryGate: queryGate, 
samplesLimiter: NewLimiter(maxSampleCount, metrics.queriesDropped), partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize}, enableCompatibilityLabel: enableCompatibilityLabel, @@ -324,8 +314,6 @@ func NewBucketStore( return nil, errors.Wrap(err, "create dir") } - s.metrics.queriesLimit.Set(float64(maxConcurrent)) - return s, nil } @@ -844,14 +832,16 @@ func debugFoundBlockSetOverview(logger log.Logger, mint, maxt, maxResolutionMill // Series implements the storepb.StoreServer interface. func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) (err error) { - tracing.DoInSpan(srv.Context(), "store_query_gate_ismyturn", func(ctx context.Context) { - err = s.queryGate.Start(srv.Context()) - }) - if err != nil { - return errors.Wrapf(err, "failed to wait for turn") - } + if s.queryGate != nil { + tracing.DoInSpan(srv.Context(), "store_query_gate_ismyturn", func(ctx context.Context) { + err = s.queryGate.Start(srv.Context()) + }) + if err != nil { + return errors.Wrapf(err, "failed to wait for turn") + } - defer s.queryGate.Done() + defer s.queryGate.Done() + } matchers, err := promclient.TranslateMatchers(req.Matchers) if err != nil { @@ -1329,11 +1319,15 @@ func (b *bucketBlock) readIndexRange(ctx context.Context, off, length int64) ([] } defer runutil.CloseWithLogOnErr(b.logger, r, "readIndexRange close range reader") - c, err := ioutil.ReadAll(r) - if err != nil { + // Preallocate the buffer with the exact size so we don't waste allocations + // while progressively growing an initial small buffer. The buffer capacity + // is increased by MinRead to avoid extra allocations due to how ReadFrom() + // internally works. + buf := bytes.NewBuffer(make([]byte, 0, length+bytes.MinRead)) + if _, err := buf.ReadFrom(r); err != nil { return nil, errors.Wrap(err, "read range") } - return c, nil + return buf.Bytes(), nil } func (b *bucketBlock) readChunkRange(ctx context.Context, seq int, off, length int64) (*[]byte, error) { diff --git a/vendor/modules.txt b/vendor/modules.txt index bae1df7826..4b1ebb732c 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -587,7 +587,7 @@ github.com/stretchr/objx github.com/stretchr/testify/assert github.com/stretchr/testify/mock github.com/stretchr/testify/require -# github.com/thanos-io/thanos v0.12.3-0.20200618165043-6c513e5f5c5f +# github.com/thanos-io/thanos v0.13.1-0.20200625180332-f078faed1b96 ## explicit github.com/thanos-io/thanos/pkg/block github.com/thanos-io/thanos/pkg/block/indexheader From 0e8abfa0a3bd84692c7553e8359c3f8ce240ecdf Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Fri, 26 Jun 2020 10:37:32 +0200 Subject: [PATCH 2/3] Added PR number Signed-off-by: Marco Pracucci --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4b42ea4fc9..64f5261af1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,7 +3,7 @@ ## master / unreleased * [CHANGE] Experimental Delete Series: Change target flag for purger from `data-purger` to `purger`. #2777 -* [CHANGE] Experimental TSDB: The max concurrent queries against the long-term storage, configured via `-experimental.tsdb.bucket-store.max-concurrent`, is now a limit shared across all tenants and not a per-tenant limit anymore. 
The default value has changed from `20` to `100` and the following new metrics have been added: +* [CHANGE] Experimental TSDB: The max concurrent queries against the long-term storage, configured via `-experimental.tsdb.bucket-store.max-concurrent`, is now a limit shared across all tenants and not a per-tenant limit anymore. The default value has changed from `20` to `100` and the following new metrics have been added: #2797 * `cortex_bucket_stores_gate_queries_concurrent_max` * `cortex_bucket_stores_gate_queries_in_flight` * `cortex_bucket_stores_gate_duration_seconds` From aefcc8bdd5c486378b04e225340b03f7b4ad8e6a Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Fri, 26 Jun 2020 10:55:14 +0200 Subject: [PATCH 3/3] Updated doc Signed-off-by: Marco Pracucci --- docs/configuration/config-file-reference.md | 6 +++--- docs/operations/blocks-storage.md | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 09b1d559c0..1eb4078200 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -2805,10 +2805,10 @@ bucket_store: # CLI flag: -experimental.tsdb.bucket-store.max-sample-count [max_sample_count: | default = 0] - # Max number of concurrent queries to execute against the long-term storage on - # a per-tenant basis. + # Max number of concurrent queries to execute against the long-term storage. + # The limit is shared across all tenants. # CLI flag: -experimental.tsdb.bucket-store.max-concurrent - [max_concurrent: | default = 20] + [max_concurrent: | default = 100] # Maximum number of concurrent tenants synching blocks. # CLI flag: -experimental.tsdb.bucket-store.tenant-sync-concurrency diff --git a/docs/operations/blocks-storage.md b/docs/operations/blocks-storage.md index ce1020045f..0151ee9f7e 100644 --- a/docs/operations/blocks-storage.md +++ b/docs/operations/blocks-storage.md @@ -192,10 +192,10 @@ tsdb: # CLI flag: -experimental.tsdb.bucket-store.max-sample-count [max_sample_count: | default = 0] - # Max number of concurrent queries to execute against the long-term storage - # on a per-tenant basis. + # Max number of concurrent queries to execute against the long-term storage. + # The limit is shared across all tenants. # CLI flag: -experimental.tsdb.bucket-store.max-concurrent - [max_concurrent: | default = 20] + [max_concurrent: | default = 100] # Maximum number of concurrent tenants synching blocks. # CLI flag: -experimental.tsdb.bucket-store.tenant-sync-concurrency
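
---

Note (not part of the patch): the shared-gate wiring introduced above can be summarised with a small, self-contained Go sketch. It is illustrative only — the runQuery helper and the hard-coded limit of 100 are hypothetical — while gate.NewKeeper(...).NewGate(...), Start() and Done() are the Thanos APIs the diff itself uses in pkg/storegateway/bucket_stores.go and pkg/store/bucket.go.

    package main

    import (
        "context"

        "github.com/prometheus/client_golang/prometheus"
        "github.com/thanos-io/thanos/pkg/extprom"
        "github.com/thanos-io/thanos/pkg/gate"
    )

    // runQuery is a hypothetical stand-in for the work done inside BucketStore.Series().
    func runQuery(ctx context.Context) {}

    func main() {
        reg := prometheus.NewRegistry()

        // One gate shared by every tenant's BucketStore. The prefixed registerer yields the
        // cortex_bucket_stores_gate_queries_in_flight / _duration_seconds metrics listed in
        // the CHANGELOG; the _concurrent_max gauge is set separately in bucket_stores.go.
        queryGate := gate.NewKeeper(extprom.WrapRegistererWithPrefix("cortex_bucket_stores_", reg)).NewGate(100)

        ctx := context.Background()

        // Start() waits while the maximum number of queries is already in flight
        // (or returns an error if the context is cancelled while waiting).
        if err := queryGate.Start(ctx); err != nil {
            return
        }
        defer queryGate.Done() // Release the slot once the query has finished.

        runQuery(ctx)
    }

Because the gate now lives in BucketStores rather than in each per-tenant BucketStore, the limit bounds the total number of Series() calls executing in the whole store-gateway process rather than per tenant.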