Skip to content

Commit a1b1954

Browse files
authored
Filter empty labels from sharding key (#5717)
* Filter empty labels from sharding key Signed-off-by: Daniel Deluiggi <[email protected]> * Update changelog Signed-off-by: Daniel Deluiggi <[email protected]> --------- Signed-off-by: Daniel Deluiggi <[email protected]>
1 parent aef8ad3 commit a1b1954

File tree

3 files changed

+94
-3
lines changed

3 files changed

+94
-3
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
* [ENHANCEMENT] Upgraded Docker base images to `alpine:3.18`. #5684
1212
* [ENHANCEMENT] Index Cache: Multi level cache adds config `max_backfill_items` to cap max items to backfill per async operation. #5686
1313
* [ENHANCEMENT] Query Frontend: Log number of split queries in `query stats` log. #5703
14+
* [BUGFIX] Distributor: Do not use label with empty values for sharding #5717
15+
1416

1517
## 1.16.0 2023-11-20
1618

pkg/distributor/distributor.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -466,8 +466,10 @@ func shardByUser(userID string) uint32 {
466466
func shardByAllLabels(userID string, labels []cortexpb.LabelAdapter) uint32 {
467467
h := shardByUser(userID)
468468
for _, label := range labels {
469-
h = ingester_client.HashAdd32(h, label.Name)
470-
h = ingester_client.HashAdd32(h, label.Value)
469+
if len(label.Value) > 0 {
470+
h = ingester_client.HashAdd32(h, label.Name)
471+
h = ingester_client.HashAdd32(h, label.Value)
472+
}
471473
}
472474
return h
473475
}

pkg/distributor/distributor_test.go

Lines changed: 88 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2391,6 +2391,7 @@ type prepConfig struct {
23912391
replicationFactor int
23922392
enableTracker bool
23932393
errFail error
2394+
tokens [][]uint32
23942395
}
23952396

23962397
func prepare(tb testing.TB, cfg prepConfig) ([]*Distributor, []*mockIngester, []*prometheus.Registry, *ring.Ring) {
@@ -2417,14 +2418,20 @@ func prepare(tb testing.TB, cfg prepConfig) ([]*Distributor, []*mockIngester, []
24172418
ingesterDescs := map[string]ring.InstanceDesc{}
24182419
ingestersByAddr := map[string]*mockIngester{}
24192420
for i := range ingesters {
2421+
var tokens []uint32
2422+
if len(cfg.tokens) > i {
2423+
tokens = cfg.tokens[i]
2424+
} else {
2425+
tokens = []uint32{uint32((math.MaxUint32 / cfg.numIngesters) * i)}
2426+
}
24202427
addr := fmt.Sprintf("%d", i)
24212428
ingesterDescs[addr] = ring.InstanceDesc{
24222429
Addr: addr,
24232430
Zone: "",
24242431
State: ring.ACTIVE,
24252432
Timestamp: time.Now().Unix(),
24262433
RegisteredTimestamp: time.Now().Add(-2 * time.Hour).Unix(),
2427-
Tokens: []uint32{uint32((math.MaxUint32 / cfg.numIngesters) * i)},
2434+
Tokens: tokens,
24282435
}
24292436
ingestersByAddr[addr] = ingesters[i]
24302437
}
@@ -3303,6 +3310,86 @@ func TestDistributor_Push_Relabel(t *testing.T) {
33033310
}
33043311
}
33053312

3313+
func TestDistributor_Push_EmptyLabel(t *testing.T) {
3314+
t.Parallel()
3315+
ctx := user.InjectOrgID(context.Background(), "pushEmptyLabel")
3316+
type testcase struct {
3317+
name string
3318+
inputSeries []labels.Labels
3319+
expectedSeries labels.Labels
3320+
}
3321+
3322+
cases := []testcase{
3323+
{
3324+
name: "with empty label",
3325+
inputSeries: []labels.Labels{
3326+
{ //Token 1106054332 without filtering
3327+
{Name: "__name__", Value: "foo"},
3328+
{Name: "empty", Value: ""},
3329+
},
3330+
{ //Token 3827924124 without filtering
3331+
{Name: "__name__", Value: "foo"},
3332+
{Name: "changHash", Value: ""},
3333+
},
3334+
},
3335+
expectedSeries: labels.Labels{
3336+
//Token 1797290973
3337+
{Name: "__name__", Value: "foo"},
3338+
},
3339+
},
3340+
}
3341+
3342+
for _, tc := range cases {
3343+
tc := tc
3344+
t.Run(tc.name, func(t *testing.T) {
3345+
t.Parallel()
3346+
var err error
3347+
var limits validation.Limits
3348+
flagext.DefaultValues(&limits)
3349+
3350+
token := [][]uint32{
3351+
{1},
3352+
{2},
3353+
{3},
3354+
{1106054333},
3355+
{5},
3356+
{6},
3357+
{7},
3358+
{8},
3359+
{9},
3360+
{3827924125},
3361+
}
3362+
3363+
ds, ingesters, _, _ := prepare(t, prepConfig{
3364+
numIngesters: 10,
3365+
happyIngesters: 10,
3366+
numDistributors: 1,
3367+
shardByAllLabels: true,
3368+
limits: &limits,
3369+
replicationFactor: 1,
3370+
shuffleShardSize: 10,
3371+
tokens: token,
3372+
})
3373+
3374+
// Push the series to the distributor
3375+
req := mockWriteRequest(tc.inputSeries, 1, 1)
3376+
_, err = ds[0].Push(ctx, req)
3377+
require.NoError(t, err)
3378+
3379+
// Since each test pushes only 1 series, we do expect the ingester
3380+
// to have received exactly 1 series
3381+
ingesterWithSeries := 0
3382+
for i := range ingesters {
3383+
timeseries := ingesters[i].series()
3384+
if len(timeseries) > 0 {
3385+
ingesterWithSeries++
3386+
}
3387+
}
3388+
assert.Equal(t, 1, ingesterWithSeries)
3389+
})
3390+
}
3391+
}
3392+
33063393
func TestDistributor_Push_RelabelDropWillExportMetricOfDroppedSamples(t *testing.T) {
33073394
t.Parallel()
33083395
metricRelabelConfigs := []*relabel.Config{

0 commit comments

Comments
 (0)