diff --git a/CHANGELOG.md b/CHANGELOG.md index bd5118c3aac..7b3e8f30541 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ## master / unreleased * [FEATURE] Compactor: Added `-compactor.block-files-concurrency` allowing to configure number of go routines for download/upload block files during compaction. #4784 +* [ENHANCEMENT] Querier/Ruler: Retry store-gateway in case of unexpected failure, instead of failing the query. #4532 ## 1.13.0 in progress * [CHANGE] Changed default for `-ingester.min-ready-duration` from 1 minute to 15 seconds. #4539 diff --git a/integration/querier_test.go b/integration/querier_test.go index 4a1166ee824..14e4161d60f 100644 --- a/integration/querier_test.go +++ b/integration/querier_test.go @@ -734,18 +734,19 @@ func TestQuerierWithBlocksStorageOnMissingBlocksFromStorage(t *testing.T) { require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_memory_series_removed_total")) require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_memory_series")) - // Start the querier and store-gateway, and configure them to not frequently sync blocks. + // Start the querier and store-gateway, and configure them to frequently sync blocks fast enough to trigger consistency check. storeGateway := e2ecortex.NewStoreGateway("store-gateway", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{ - "-blocks-storage.bucket-store.sync-interval": "1m", + "-blocks-storage.bucket-store.sync-interval": "5s", }), "") querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{ - "-blocks-storage.bucket-store.sync-interval": "1m", + "-blocks-storage.bucket-store.sync-interval": "5s", }), "") require.NoError(t, s.StartAndWaitReady(querier, storeGateway)) - // Wait until the querier and store-gateway have updated the ring. + // Wait until the querier and store-gateway have updated the ring, and wait until the blocks are old enough for consistency check require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512*2), "cortex_ring_tokens_total")) require.NoError(t, storeGateway.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) + require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(4), []string{"cortex_querier_blocks_scan_duration_seconds"}, e2e.WithMetricCount)) // Query back the series. c, err = e2ecortex.NewClient("", querier.HTTPEndpoint(), "", "", "user-1") diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index 7526a4f4b2b..f33be24fec5 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -9,6 +9,9 @@ import ( "sync" "time" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/gogo/protobuf/types" @@ -419,6 +422,7 @@ func (q *blocksStoreQuerier) selectSorted(sp *storage.SelectHints, matchers ...* queryFunc := func(clients map[BlocksStoreClient][]ulid.ULID, minT, maxT int64) ([]ulid.ULID, error) { seriesSets, queriedBlocks, warnings, numChunks, err := q.fetchSeriesFromStores(spanCtx, sp, clients, minT, maxT, matchers, convertedMatchers, maxChunksLimit, leftChunksLimit) if err != nil { + return nil, err } @@ -586,6 +590,9 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( // TODO(goutham): we should ideally be passing the hints down to the storage layer // and let the TSDB return us data with no chunks as in prometheus#8050. // But this is an acceptable workaround for now. + + // Only fail the function if we have validation error. We should return blocks that were successfully + // retrieved. skipChunks := sp != nil && sp.Func == "series" req, err := createSeriesRequest(minT, maxT, convertedMatchers, skipChunks, blockIDs) @@ -595,6 +602,10 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( stream, err := c.Series(gCtx, req) if err != nil { + if isRetryableError(err) { + level.Warn(spanLog).Log("err", errors.Wrapf(err, "failed to fetch series from %s due to retryable error", c.RemoteAddress())) + return nil + } return errors.Wrapf(err, "failed to fetch series from %s", c.RemoteAddress()) } @@ -725,6 +736,10 @@ func (q *blocksStoreQuerier) fetchLabelNamesFromStore( namesResp, err := c.LabelNames(gCtx, req) if err != nil { + if isRetryableError(err) { + level.Warn(spanLog).Log("err", errors.Wrapf(err, "failed to fetch series from %s due to retryable error", c.RemoteAddress())) + return nil + } return errors.Wrapf(err, "failed to fetch series from %s", c.RemoteAddress()) } @@ -802,6 +817,10 @@ func (q *blocksStoreQuerier) fetchLabelValuesFromStore( valuesResp, err := c.LabelValues(gCtx, req) if err != nil { + if isRetryableError(err) { + level.Warn(spanLog).Log("err", errors.Wrapf(err, "failed to fetch series from %s due to retryable error", c.RemoteAddress())) + return nil + } return errors.Wrapf(err, "failed to fetch series from %s", c.RemoteAddress()) } @@ -967,3 +986,13 @@ func countChunkBytes(series ...*storepb.Series) (count int) { return count } + +// only retry connection issues +func isRetryableError(err error) bool { + switch status.Code(err) { + case codes.Unavailable: + return true + default: + return false + } +} diff --git a/pkg/querier/blocks_store_queryable_test.go b/pkg/querier/blocks_store_queryable_test.go index 0dc97f80106..7af2a271373 100644 --- a/pkg/querier/blocks_store_queryable_test.go +++ b/pkg/querier/blocks_store_queryable_test.go @@ -9,6 +9,9 @@ import ( "testing" "time" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "github.com/go-kit/log" "github.com/gogo/protobuf/types" "github.com/oklog/ulid" @@ -584,6 +587,35 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { queryLimiter: limiter.NewQueryLimiter(0, 8, 0), expectedErr: validation.LimitError(fmt.Sprintf(limiter.ErrMaxChunkBytesHit, 8)), }, + "multiple store-gateways has the block, but one of them fails to return": { + finderResult: bucketindex.Blocks{ + {ID: block1}, + }, + storeSetResponses: []interface{}{ + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{ + remoteAddr: "1.1.1.1", + mockedSeriesErr: status.Error(codes.Unavailable, "unavailable"), + }: {block1}, + }, + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedSeriesResponses: []*storepb.SeriesResponse{ + mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT, 2), + mockHintsResponse(block1), + }}: {block1}, + }, + }, + limits: &blocksStoreLimitsMock{}, + queryLimiter: noOpQueryLimiter, + expectedSeries: []seriesResult{ + { + lbls: labels.New(metricNameLabel, series1Label), + values: []valueResult{ + {t: minT, v: 2}, + }, + }, + }, + }, } for testName, testData := range tests { @@ -1059,6 +1091,41 @@ func TestBlocksStoreQuerier_Labels(t *testing.T) { cortex_querier_storegateway_refetches_per_query_count 1 `, }, + "multiple store-gateways has the block, but one of them fails to return": { + finderResult: bucketindex.Blocks{ + {ID: block1}, + }, + storeSetResponses: []interface{}{ + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{ + remoteAddr: "1.1.1.1", + mockedLabelNamesResponse: &storepb.LabelNamesResponse{ + Names: namesFromSeries(series1), + Warnings: []string{}, + Hints: mockNamesHints(block1), + }, + mockedLabelValuesErr: status.Error(codes.Unavailable, "unavailable"), + }: {block1}, + }, + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{ + remoteAddr: "2.2.2.2", + mockedLabelNamesResponse: &storepb.LabelNamesResponse{ + Names: namesFromSeries(series1), + Warnings: []string{}, + Hints: mockNamesHints(block1), + }, + mockedLabelValuesResponse: &storepb.LabelValuesResponse{ + Values: valuesFromSeries(labels.MetricName, series1), + Warnings: []string{}, + Hints: mockValuesHints(block1), + }, + }: {block1}, + }, + }, + expectedLabelNames: namesFromSeries(series1), + expectedLabelValues: valuesFromSeries(labels.MetricName, series1), + }, } for testName, testData := range tests { @@ -1361,8 +1428,10 @@ func (m *blocksFinderMock) GetBlocks(ctx context.Context, userID string, minT, m type storeGatewayClientMock struct { remoteAddr string mockedSeriesResponses []*storepb.SeriesResponse + mockedSeriesErr error mockedLabelNamesResponse *storepb.LabelNamesResponse mockedLabelValuesResponse *storepb.LabelValuesResponse + mockedLabelValuesErr error } func (m *storeGatewayClientMock) Series(ctx context.Context, in *storepb.SeriesRequest, opts ...grpc.CallOption) (storegatewaypb.StoreGateway_SeriesClient, error) { @@ -1370,7 +1439,7 @@ func (m *storeGatewayClientMock) Series(ctx context.Context, in *storepb.SeriesR mockedResponses: m.mockedSeriesResponses, } - return seriesClient, nil + return seriesClient, m.mockedSeriesErr } func (m *storeGatewayClientMock) LabelNames(context.Context, *storepb.LabelNamesRequest, ...grpc.CallOption) (*storepb.LabelNamesResponse, error) { @@ -1378,7 +1447,7 @@ func (m *storeGatewayClientMock) LabelNames(context.Context, *storepb.LabelNames } func (m *storeGatewayClientMock) LabelValues(context.Context, *storepb.LabelValuesRequest, ...grpc.CallOption) (*storepb.LabelValuesResponse, error) { - return m.mockedLabelValuesResponse, nil + return m.mockedLabelValuesResponse, m.mockedLabelValuesErr } func (m *storeGatewayClientMock) RemoteAddress() string {