diff --git a/CHANGELOG.md b/CHANGELOG.md index 8670b99fb0a..ac4b73c18d9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ * [FEATURE] Compactor: Add `-compactor.skip-blocks-with-out-of-order-chunks-enabled` configuration to mark blocks containing index with out-of-order chunks for no compact instead of halting the compaction * [FEATURE] Querier/Query-Frontend: Add `-querier.per-step-stats-enabled` and `-frontend.cache-queryable-samples-stats` configurations to enable query sample statistics * [FEATURE] Add shuffle sharding for the compactor #4433 +* [BUGFIX] Distributor: Fix race condition on `/series` introduced by #4683. #4716 ## 1.12.0 in progress diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 95e3650002f..25fede59ec2 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -980,12 +980,14 @@ func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through return nil, err } + mutex.Lock() result := make([]metric.Metric, 0, len(metrics)) for _, m := range metrics { result = append(result, metric.Metric{ Metric: m, }) } + mutex.Unlock() return result, nil } diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index b01855a8180..63cbefb930e 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -1841,6 +1841,36 @@ func TestSlowQueries(t *testing.T) { } } +func TestDistributor_MetricsForLabelMatchers_SingleSlowIngester(t *testing.T) { + // Create distributor + ds, ing, _, _ := prepare(t, prepConfig{ + numIngesters: 3, + happyIngesters: 3, + numDistributors: 1, + shardByAllLabels: true, + shuffleShardEnabled: true, + shuffleShardSize: 3, + replicationFactor: 3, + }) + + ing[2].queryDelay = 50 * time.Millisecond + + ctx := user.InjectOrgID(context.Background(), "test") + + now := model.Now() + + for i := 0; i < 100; i++ { + req := mockWriteRequest([]labels.Labels{{{Name: labels.MetricName, Value: "test"}, {Name: "app", Value: "m"}, {Name: "uniq8", Value: strconv.Itoa(i)}}}, 1, now.Unix()) + _, err := ds[0].Push(ctx, req) + require.NoError(t, err) + } + + for i := 0; i < 50; i++ { + _, err := ds[0].MetricsForLabelMatchers(ctx, now, now, mustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "test")) + require.NoError(t, err) + } +} + func TestDistributor_MetricsForLabelMatchers(t *testing.T) { const numIngesters = 5 @@ -2192,10 +2222,10 @@ type prepConfig struct { errFail error } -func prepare(tb testing.TB, cfg prepConfig) ([]*Distributor, []mockIngester, []*prometheus.Registry, *ring.Ring) { - ingesters := []mockIngester{} +func prepare(tb testing.TB, cfg prepConfig) ([]*Distributor, []*mockIngester, []*prometheus.Registry, *ring.Ring) { + ingesters := []*mockIngester{} for i := 0; i < cfg.happyIngesters; i++ { - ingesters = append(ingesters, mockIngester{ + ingesters = append(ingesters, &mockIngester{ happy: *atomic.NewBool(true), queryDelay: cfg.queryDelay, }) @@ -2206,7 +2236,7 @@ func prepare(tb testing.TB, cfg prepConfig) ([]*Distributor, []mockIngester, []* miError = cfg.errFail } - ingesters = append(ingesters, mockIngester{ + ingesters = append(ingesters, &mockIngester{ queryDelay: cfg.queryDelay, failResp: *atomic.NewError(miError), }) @@ -2225,7 +2255,7 @@ func prepare(tb testing.TB, cfg prepConfig) ([]*Distributor, []mockIngester, []* RegisteredTimestamp: time.Now().Add(-2 * time.Hour).Unix(), Tokens: []uint32{uint32((math.MaxUint32 / cfg.numIngesters) * i)}, } - ingestersByAddr[addr] = &ingesters[i] + ingestersByAddr[addr] = ingesters[i] } kvStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil) @@ -2637,6 +2667,7 @@ func (i *mockIngester) QueryStream(ctx context.Context, req *client.QueryRequest } func (i *mockIngester) MetricsForLabelMatchers(ctx context.Context, req *client.MetricsForLabelMatchersRequest, opts ...grpc.CallOption) (*client.MetricsForLabelMatchersResponse, error) { + time.Sleep(i.queryDelay) i.Lock() defer i.Unlock() @@ -3098,7 +3129,7 @@ func TestDistributor_Push_RelabelDropWillExportMetricOfDroppedSamples(t *testing require.NoError(t, testutil.GatherAndCompare(regs[0], strings.NewReader(expectedMetrics), metrics...)) } -func countMockIngestersCalls(ingesters []mockIngester, name string) int { +func countMockIngestersCalls(ingesters []*mockIngester, name string) int { count := 0 for i := 0; i < len(ingesters); i++ { if ingesters[i].countCalls(name) > 0 {