diff --git a/CHANGELOG.md b/CHANGELOG.md index 93522b41164..470aefe5228 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ * [FEATURE] Ruler: Add `external_labels` option to tag all alerts with a given set of labels. * [CHANGE] Fix incorrectly named `cortex_cache_fetched_keys` and `cortex_cache_hits` metrics. Renamed to `cortex_cache_fetched_keys_total` and `cortex_cache_hits_total` respectively. #4686 +* [CHANGE] Enable Thanos series limiter in store-gateway. #4702 ## 1.12.0 in progress diff --git a/pkg/storegateway/bucket_stores.go b/pkg/storegateway/bucket_stores.go index 804e8b0b181..c6233aa4b6f 100644 --- a/pkg/storegateway/bucket_stores.go +++ b/pkg/storegateway/bucket_stores.go @@ -492,7 +492,7 @@ func (u *BucketStores) getOrCreateStore(userID string) (*store.BucketStore, erro fetcher, u.syncDirForUser(userID), newChunksLimiterFactory(u.limits, userID), - store.NewSeriesLimiterFactory(0), // No series limiter. + newSeriesLimiterFactory(u.limits, userID), u.partitioner, u.cfg.BucketStore.BlockSyncConcurrency, false, // No need to enable backward compatibility with Thanos pre 0.8.0 queriers @@ -605,11 +605,11 @@ func (s spanSeriesServer) Context() context.Context { return s.ctx } -type chunkLimiter struct { +type limiter struct { limiter *store.Limiter } -func (c *chunkLimiter) Reserve(num uint64) error { +func (c *limiter) Reserve(num uint64) error { err := c.limiter.Reserve(num) if err != nil { return httpgrpc.Errorf(http.StatusUnprocessableEntity, err.Error()) @@ -622,8 +622,18 @@ func newChunksLimiterFactory(limits *validation.Overrides, userID string) store. return func(failedCounter prometheus.Counter) store.ChunksLimiter { // Since limit overrides could be live reloaded, we have to get the current user's limit // each time a new limiter is instantiated. 
- return &chunkLimiter{ + return &limiter{ limiter: store.NewLimiter(uint64(limits.MaxChunksPerQueryFromStore(userID)), failedCounter), } } } + +func newSeriesLimiterFactory(limits *validation.Overrides, userID string) store.SeriesLimiterFactory { + return func(failedCounter prometheus.Counter) store.SeriesLimiter { + // Since limit overrides could be live reloaded, we have to get the current user's limit + // each time a new limiter is instantiated. + return &limiter{ + limiter: store.NewLimiter(uint64(limits.MaxFetchedSeriesPerQuery(userID)), failedCounter), + } + } +} diff --git a/pkg/storegateway/gateway_test.go b/pkg/storegateway/gateway_test.go index a43b64a10fb..acb08d698f6 100644 --- a/pkg/storegateway/gateway_test.go +++ b/pkg/storegateway/gateway_test.go @@ -973,6 +973,95 @@ func TestStoreGateway_SeriesQueryingShouldEnforceMaxChunksPerQueryLimit(t *testi } } +func TestStoreGateway_SeriesQueryingShouldEnforceMaxSeriesPerQueryLimit(t *testing.T) { + const seriesQueried = 10 + + tests := map[string]struct { + limit int + expectedErr error + }{ + "no limit enforced if zero": { + limit: 0, + expectedErr: nil, + }, + "should return NO error if the actual number of queried series is <= limit": { + limit: seriesQueried, + expectedErr: nil, + }, + "should return error if the actual number of queried series is > limit": { + limit: seriesQueried - 1, + expectedErr: status.Error(http.StatusUnprocessableEntity, fmt.Sprintf("exceeded series limit: rpc error: code = Code(422) desc = limit %d violated (got %d)", seriesQueried-1, seriesQueried)), + }, + } + + ctx := context.Background() + logger := log.NewNopLogger() + userID := "user-1" + + storageDir, err := ioutil.TempDir(os.TempDir(), "") + require.NoError(t, err) + defer os.RemoveAll(storageDir) //nolint:errcheck + + // Generate 1 TSDB block with seriesQueried series. Since each mocked series contains only 1 sample, + it will also only have 1 chunk. 
+ now := time.Now() + minT := now.Add(-1*time.Hour).Unix() * 1000 + maxT := now.Unix() * 1000 + mockTSDB(t, path.Join(storageDir, userID), seriesQueried, 0, minT, maxT) + + bucketClient, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir}) + require.NoError(t, err) + + // Prepare the request to query back all series (1 chunk per series in this test). + req := &storepb.SeriesRequest{ + MinTime: minT, + MaxTime: maxT, + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_RE, Name: "__name__", Value: ".*"}, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + // Customise the limits. + limits := defaultLimitsConfig() + limits.MaxFetchedSeriesPerQuery = testData.limit + overrides, err := validation.NewOverrides(limits, nil) + require.NoError(t, err) + + // Create a store-gateway used to query back the series from the blocks. + gatewayCfg := mockGatewayConfig() + gatewayCfg.ShardingEnabled = false + storageCfg := mockStorageConfig(t) + + g, err := newStoreGateway(gatewayCfg, storageCfg, bucketClient, nil, overrides, mockLoggingLevel(), logger, nil) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(ctx, g)) + defer services.StopAndAwaitTerminated(ctx, g) //nolint:errcheck + + // Query back all the series (1 chunk per series in this test). 
+ srv := newBucketStoreSeriesServer(setUserIDToGRPCContext(ctx, userID)) + err = g.Series(req, srv) + + if testData.expectedErr != nil { + fmt.Println("Error: ", err.Error()) + require.Error(t, err) + assert.IsType(t, testData.expectedErr, err) + s1, ok := status.FromError(errors.Cause(err)) + assert.True(t, ok) + s2, ok := status.FromError(errors.Cause(testData.expectedErr)) + assert.True(t, ok) + assert.True(t, strings.Contains(s1.Message(), s2.Message())) + assert.Equal(t, s1.Code(), s2.Code()) + } else { + require.NoError(t, err) + assert.Empty(t, srv.Warnings) + assert.Len(t, srv.SeriesSet, seriesQueried) + } + }) + } +} + func mockGatewayConfig() Config { cfg := Config{} flagext.DefaultValues(&cfg)