Skip to content

Commit 45b1f53

Browse files
committed
Add tests to distributor Push including metadata
Signed-off-by: gotjosh <[email protected]>
1 parent 05188b5 commit 45b1f53

File tree

6 files changed

+77
-33
lines changed

6 files changed

+77
-33
lines changed

pkg/distributor/distributor_test.go

Lines changed: 53 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ func TestDistributor_Push(t *testing.T) {
5656
happyIngesters int
5757
samples int
5858
startTimestampMs int64
59+
metadata int
5960
expectedResponse *client.WriteResponse
6061
expectedError error
6162
expectedMetrics string
@@ -68,7 +69,8 @@ func TestDistributor_Push(t *testing.T) {
6869
"A push to 3 happy ingesters should succeed": {
6970
numIngesters: 3,
7071
happyIngesters: 3,
71-
samples: 10,
72+
samples: 5,
73+
metadata: 5,
7274
expectedResponse: success,
7375
startTimestampMs: 123456789000,
7476
expectedMetrics: `
@@ -80,7 +82,7 @@ func TestDistributor_Push(t *testing.T) {
8082
"A push to 2 happy ingesters should succeed": {
8183
numIngesters: 3,
8284
happyIngesters: 2,
83-
samples: 10,
85+
samples: 5,
8486
expectedResponse: success,
8587
startTimestampMs: 123456789000,
8688
expectedMetrics: `
@@ -116,8 +118,9 @@ func TestDistributor_Push(t *testing.T) {
116118
"A push exceeding burst size should fail": {
117119
numIngesters: 3,
118120
happyIngesters: 3,
119-
samples: 30,
120-
expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (20) exceeded while adding 30 samples and 0 metadata"),
121+
samples: 25,
122+
metadata: 5,
123+
expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (20) exceeded while adding 25 samples and 5 metadata"),
121124
startTimestampMs: 123456789000,
122125
expectedMetrics: `
123126
# HELP cortex_distributor_latest_seen_sample_timestamp_seconds Unix timestamp of latest received sample per user.
@@ -138,7 +141,7 @@ func TestDistributor_Push(t *testing.T) {
138141
d, _ := prepare(t, tc.numIngesters, tc.happyIngesters, 0, shardByAllLabels, limits, nil)
139142
defer services.StopAndAwaitTerminated(context.Background(), d) //nolint:errcheck
140143

141-
request := makeWriteRequest(tc.startTimestampMs, tc.samples)
144+
request := makeWriteRequest(tc.startTimestampMs, tc.samples, tc.metadata)
142145
response, err := d.Push(ctx, request)
143146
assert.Equal(t, tc.expectedResponse, response)
144147
assert.Equal(t, tc.expectedError, err)
@@ -156,6 +159,7 @@ func TestDistributor_Push(t *testing.T) {
156159
func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
157160
type testPush struct {
158161
samples int
162+
metadata int
159163
expectedError error
160164
}
161165

@@ -172,10 +176,12 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
172176
ingestionRate: 10,
173177
ingestionBurstSize: 10,
174178
pushes: []testPush{
175-
{samples: 5, expectedError: nil},
179+
{samples: 4, expectedError: nil},
180+
{metadata: 1, expectedError: nil},
176181
{samples: 6, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (10) exceeded while adding 6 samples and 0 metadata")},
177-
{samples: 5, expectedError: nil},
182+
{samples: 4, metadata: 1, expectedError: nil},
178183
{samples: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (10) exceeded while adding 1 samples and 0 metadata")},
184+
{metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (10) exceeded while adding 0 samples and 1 metadata")},
179185
},
180186
},
181187
"global strategy: limit should be evenly shared across distributors": {
@@ -184,10 +190,12 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
184190
ingestionRate: 10,
185191
ingestionBurstSize: 5,
186192
pushes: []testPush{
187-
{samples: 3, expectedError: nil},
188-
{samples: 3, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5) exceeded while adding 3 samples and 0 metadata")},
193+
{samples: 2, expectedError: nil},
194+
{samples: 1, expectedError: nil},
195+
{samples: 2, metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5) exceeded while adding 2 samples and 1 metadata")},
189196
{samples: 2, expectedError: nil},
190197
{samples: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5) exceeded while adding 1 samples and 0 metadata")},
198+
{metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5) exceeded while adding 0 samples and 1 metadata")},
191199
},
192200
},
193201
"global strategy: burst should set to each distributor": {
@@ -196,10 +204,12 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
196204
ingestionRate: 10,
197205
ingestionBurstSize: 20,
198206
pushes: []testPush{
199-
{samples: 15, expectedError: nil},
200-
{samples: 6, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5) exceeded while adding 6 samples and 0 metadata")},
207+
{samples: 10, expectedError: nil},
208+
{samples: 5, expectedError: nil},
209+
{samples: 5, metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5) exceeded while adding 5 samples and 1 metadata")},
201210
{samples: 5, expectedError: nil},
202211
{samples: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5) exceeded while adding 1 samples and 0 metadata")},
212+
{metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5) exceeded while adding 0 samples and 1 metadata")},
203213
},
204214
},
205215
}
@@ -234,7 +244,7 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
234244

235245
// Push samples in multiple requests to the first distributor
236246
for _, push := range testData.pushes {
237-
request := makeWriteRequest(0, push.samples)
247+
request := makeWriteRequest(0, push.samples, push.metadata)
238248
response, err := distributors[0].Push(ctx, request)
239249

240250
if push.expectedError == nil {
@@ -337,6 +347,7 @@ func TestDistributor_PushQuery(t *testing.T) {
337347
numIngesters int
338348
happyIngesters int
339349
samples int
350+
metadata int
340351
matchers []*labels.Matcher
341352
expectedResponse model.Matrix
342353
expectedError error
@@ -420,7 +431,7 @@ func TestDistributor_PushQuery(t *testing.T) {
420431
d, _ := prepare(t, tc.numIngesters, tc.happyIngesters, 0, tc.shardByAllLabels, nil, nil)
421432
defer services.StopAndAwaitTerminated(context.Background(), d) //nolint:errcheck
422433

423-
request := makeWriteRequest(0, tc.samples)
434+
request := makeWriteRequest(0, tc.samples, tc.metadata)
424435
writeResponse, err := d.Push(ctx, request)
425436
assert.Equal(t, &client.WriteResponse{}, writeResponse)
426437
assert.Nil(t, err)
@@ -750,7 +761,7 @@ func mockWriteRequest(lbls labels.Labels, value float64, timestampMs int64) *cli
750761
},
751762
}
752763

753-
return client.ToWriteRequest([]labels.Labels{lbls}, samples, client.API)
764+
return client.ToWriteRequest([]labels.Labels{lbls}, samples, nil, client.API)
754765
}
755766

756767
func prepare(t *testing.T, numIngesters, happyIngesters int, queryDelay time.Duration, shardByAllLabels bool, limits *validation.Limits, kvStore kv.Client) (*Distributor, []mockIngester) {
@@ -817,7 +828,7 @@ func prepare(t *testing.T, numIngesters, happyIngesters int, queryDelay time.Dur
817828
return d, ingesters
818829
}
819830

820-
func makeWriteRequest(startTimestampMs int64, samples int) *client.WriteRequest {
831+
func makeWriteRequest(startTimestampMs int64, samples int, metadata int) *client.WriteRequest {
821832
request := &client.WriteRequest{}
822833
for i := 0; i < samples; i++ {
823834
ts := client.PreallocTimeseries{
@@ -837,6 +848,16 @@ func makeWriteRequest(startTimestampMs int64, samples int) *client.WriteRequest
837848
}
838849
request.Timeseries = append(request.Timeseries, ts)
839850
}
851+
852+
for i := 0; i < metadata; i++ {
853+
m := &client.MetricMetadata{
854+
MetricName: fmt.Sprintf("metric_%d", i),
855+
Type: client.COUNTER,
856+
Help: fmt.Sprintf("a help for metric_%d", i),
857+
}
858+
request.Metadata = append(request.Metadata, m)
859+
}
860+
840861
return request
841862
}
842863

@@ -1142,19 +1163,20 @@ func TestDistributorValidation(t *testing.T) {
11421163
future, past := now.Add(5*time.Hour), now.Add(-25*time.Hour)
11431164

11441165
for i, tc := range []struct {
1145-
labels []labels.Labels
1146-
samples []client.Sample
1147-
err error
1166+
metadata []*client.MetricMetadata
1167+
labels []labels.Labels
1168+
samples []client.Sample
1169+
err error
11481170
}{
11491171
// Test validation passes.
11501172
{
1151-
labels: []labels.Labels{{{Name: labels.MetricName, Value: "testmetric"}, {Name: "foo", Value: "bar"}}},
1173+
metadata: []*client.MetricMetadata{{MetricName: "testmetric", Help: "a test metric.", Unit: "", Type: client.COUNTER}},
1174+
labels: []labels.Labels{{{Name: labels.MetricName, Value: "testmetric"}, {Name: "foo", Value: "bar"}}},
11521175
samples: []client.Sample{{
11531176
TimestampMs: int64(now),
11541177
Value: 1,
11551178
}},
11561179
},
1157-
11581180
// Test validation fails for very old samples.
11591181
{
11601182
labels: []labels.Labels{{{Name: labels.MetricName, Value: "testmetric"}, {Name: "foo", Value: "bar"}}},
@@ -1196,6 +1218,16 @@ func TestDistributorValidation(t *testing.T) {
11961218
},
11971219
err: httpgrpc.Errorf(http.StatusBadRequest, `sample for 'testmetric{foo2="bar2", foo="bar"}' has 3 label names; limit 2`),
11981220
},
1221+
// Test metadata validation fails
1222+
{
1223+
metadata: []*client.MetricMetadata{{MetricName: "", Help: "a test metric.", Unit: "", Type: client.COUNTER}},
1224+
labels: []labels.Labels{{{Name: labels.MetricName, Value: "testmetric"}, {Name: "foo", Value: "bar"}}},
1225+
samples: []client.Sample{{
1226+
TimestampMs: int64(now),
1227+
Value: 1,
1228+
}},
1229+
err: httpgrpc.Errorf(http.StatusBadRequest, `metadata missing metric name`),
1230+
},
11991231
} {
12001232
t.Run(strconv.Itoa(i), func(t *testing.T) {
12011233
var limits validation.Limits
@@ -1209,7 +1241,7 @@ func TestDistributorValidation(t *testing.T) {
12091241
d, _ := prepare(t, 3, 3, 0, true, &limits, nil)
12101242
defer services.StopAndAwaitTerminated(context.Background(), d) //nolint:errcheck
12111243

1212-
_, err := d.Push(ctx, client.ToWriteRequest(tc.labels, tc.samples, client.API))
1244+
_, err := d.Push(ctx, client.ToWriteRequest(tc.labels, tc.samples, tc.metadata, client.API))
12131245
require.Equal(t, tc.err, err)
12141246
})
12151247
}

pkg/ingester/client/compat.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,12 @@ import (
1818

1919
var json = jsoniter.ConfigCompatibleWithStandardLibrary
2020

21-
// ToWriteRequest converts matched slices of Labels and Samples into a WriteRequest proto.
21+
// ToWriteRequest converts matched slices of Labels, Samples and Metadata into a WriteRequest proto.
2222
// It gets timeseries from the pool, so ReuseSlice() should be called when done.
23-
func ToWriteRequest(lbls []labels.Labels, samples []Sample, source WriteRequest_SourceEnum) *WriteRequest {
23+
func ToWriteRequest(lbls []labels.Labels, samples []Sample, metadata []*MetricMetadata, source WriteRequest_SourceEnum) *WriteRequest {
2424
req := &WriteRequest{
2525
Timeseries: slicePool.Get().([]PreallocTimeseries),
26+
Metadata: metadata,
2627
Source: source,
2728
}
2829

pkg/ingester/ingester_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ func pushTestSamples(t *testing.T, ing *Ingester, numSeries, samplesPerSeries, o
171171
// Append samples.
172172
for _, userID := range userIDs {
173173
ctx := user.InjectOrgID(context.Background(), userID)
174-
_, err := ing.Push(ctx, client.ToWriteRequest(matrixToLables(testData[userID]), matrixToSamples(testData[userID]), client.API))
174+
_, err := ing.Push(ctx, client.ToWriteRequest(matrixToLables(testData[userID]), matrixToSamples(testData[userID]), nil, client.API))
175175
require.NoError(t, err)
176176
}
177177

@@ -387,11 +387,11 @@ func TestIngesterUserSeriesLimitExceeded(t *testing.T) {
387387

388388
// Append only one series first, expect no error.
389389
ctx := user.InjectOrgID(context.Background(), userID)
390-
_, err := ing.Push(ctx, client.ToWriteRequest([]labels.Labels{labels1}, []client.Sample{sample1}, client.API))
390+
_, err := ing.Push(ctx, client.ToWriteRequest([]labels.Labels{labels1}, []client.Sample{sample1}, nil, client.API))
391391
require.NoError(t, err)
392392

393393
// Append to two series, expect series-exceeded error.
394-
_, err = ing.Push(ctx, client.ToWriteRequest([]labels.Labels{labels1, labels3}, []client.Sample{sample2, sample3}, client.API))
394+
_, err = ing.Push(ctx, client.ToWriteRequest([]labels.Labels{labels1, labels3}, []client.Sample{sample2, sample3}, nil, client.API))
395395
if resp, ok := httpgrpc.HTTPResponseFromError(err); !ok || resp.Code != http.StatusTooManyRequests {
396396
t.Fatalf("expected error about exceeding metrics per user, got %v", err)
397397
}
@@ -444,11 +444,11 @@ func TestIngesterMetricSeriesLimitExceeded(t *testing.T) {
444444

445445
// Append only one series first, expect no error.
446446
ctx := user.InjectOrgID(context.Background(), userID)
447-
_, err := ing.Push(ctx, client.ToWriteRequest([]labels.Labels{labels1}, []client.Sample{sample1}, client.API))
447+
_, err := ing.Push(ctx, client.ToWriteRequest([]labels.Labels{labels1}, []client.Sample{sample1}, nil, client.API))
448448
require.NoError(t, err)
449449

450450
// Append to two series, expect series-exceeded error.
451-
_, err = ing.Push(ctx, client.ToWriteRequest([]labels.Labels{labels1, labels3}, []client.Sample{sample2, sample3}, client.API))
451+
_, err = ing.Push(ctx, client.ToWriteRequest([]labels.Labels{labels1, labels3}, []client.Sample{sample2, sample3}, nil, client.API))
452452
if resp, ok := httpgrpc.HTTPResponseFromError(err); !ok || resp.Code != http.StatusTooManyRequests {
453453
t.Fatalf("expected error about exceeding series per metric, got %v", err)
454454
}
@@ -507,7 +507,7 @@ func TestIngesterValidation(t *testing.T) {
507507
},
508508
} {
509509
t.Run(tc.desc, func(t *testing.T) {
510-
_, err := ing.Push(ctx, client.ToWriteRequest(tc.lbls, tc.samples, client.API))
510+
_, err := ing.Push(ctx, client.ToWriteRequest(tc.lbls, tc.samples, nil, client.API))
511511
require.Equal(t, tc.err, err)
512512
})
513513
}
@@ -617,7 +617,7 @@ func benchmarkIngesterPush(b *testing.B, limits validation.Limits, errorsExpecte
617617
for i := range allSamples {
618618
allSamples[i].TimestampMs = int64(j + 1)
619619
}
620-
_, err := ing.Push(ctx, client.ToWriteRequest(allLabels, allSamples, client.API))
620+
_, err := ing.Push(ctx, client.ToWriteRequest(allLabels, allSamples, nil, client.API))
621621
if !errorsExpected {
622622
require.NoError(b, err)
623623
}

pkg/ingester/ingester_v2_test.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,12 @@ func TestIngester_v2Push(t *testing.T) {
6060
client.ToWriteRequest(
6161
[]labels.Labels{metricLabels},
6262
[]client.Sample{{Value: 1, TimestampMs: 9}},
63+
nil,
6364
client.API),
6465
client.ToWriteRequest(
6566
[]labels.Labels{metricLabels},
6667
[]client.Sample{{Value: 2, TimestampMs: 10}},
68+
nil,
6769
client.API),
6870
},
6971
expectedErr: nil,
@@ -96,10 +98,12 @@ func TestIngester_v2Push(t *testing.T) {
9698
client.ToWriteRequest(
9799
[]labels.Labels{metricLabels},
98100
[]client.Sample{{Value: 2, TimestampMs: 10}},
101+
nil,
99102
client.API),
100103
client.ToWriteRequest(
101104
[]labels.Labels{metricLabels},
102105
[]client.Sample{{Value: 1, TimestampMs: 9}},
106+
nil,
103107
client.API),
104108
},
105109
expectedErr: httpgrpc.Errorf(http.StatusBadRequest, wrapWithUser(errors.Wrapf(tsdb.ErrOutOfOrderSample, "series=%s, timestamp=%s", metricLabels.String(), model.Time(9).Time().Format(time.RFC3339Nano)), userID).Error()),
@@ -135,10 +139,12 @@ func TestIngester_v2Push(t *testing.T) {
135139
client.ToWriteRequest(
136140
[]labels.Labels{metricLabels},
137141
[]client.Sample{{Value: 2, TimestampMs: 1575043969}},
142+
nil,
138143
client.API),
139144
client.ToWriteRequest(
140145
[]labels.Labels{metricLabels},
141146
[]client.Sample{{Value: 1, TimestampMs: 1575043969 - (86400 * 1000)}},
147+
nil,
142148
client.API),
143149
},
144150
expectedErr: httpgrpc.Errorf(http.StatusBadRequest, wrapWithUser(errors.Wrapf(tsdb.ErrOutOfBounds, "series=%s, timestamp=%s", metricLabels.String(), model.Time(1575043969-(86400*1000)).Time().Format(time.RFC3339Nano)), userID).Error()),
@@ -174,10 +180,12 @@ func TestIngester_v2Push(t *testing.T) {
174180
client.ToWriteRequest(
175181
[]labels.Labels{metricLabels},
176182
[]client.Sample{{Value: 2, TimestampMs: 1575043969}},
183+
nil,
177184
client.API),
178185
client.ToWriteRequest(
179186
[]labels.Labels{metricLabels},
180187
[]client.Sample{{Value: 1, TimestampMs: 1575043969}},
188+
nil,
181189
client.API),
182190
},
183191
expectedErr: httpgrpc.Errorf(http.StatusBadRequest, wrapWithUser(errors.Wrapf(tsdb.ErrAmendSample, "series=%s, timestamp=%s", metricLabels.String(), model.Time(1575043969).Time().Format(time.RFC3339Nano)), userID).Error()),
@@ -297,6 +305,7 @@ func TestIngester_v2Push_ShouldHandleTheCaseTheCachedReferenceIsInvalid(t *testi
297305
req := client.ToWriteRequest(
298306
[]labels.Labels{metricLabels},
299307
[]client.Sample{{Value: float64(j), TimestampMs: int64(j)}},
308+
nil,
300309
client.API)
301310

302311
_, err := i.v2Push(ctx, req)
@@ -361,10 +370,12 @@ func TestIngester_v2Push_ShouldCorrectlyTrackMetricsInMultiTenantScenario(t *tes
361370
client.ToWriteRequest(
362371
[]labels.Labels{metricLabels},
363372
[]client.Sample{{Value: 1, TimestampMs: 9}},
373+
nil,
364374
client.API),
365375
client.ToWriteRequest(
366376
[]labels.Labels{metricLabels},
367377
[]client.Sample{{Value: 2, TimestampMs: 10}},
378+
nil,
368379
client.API),
369380
}
370381

@@ -989,7 +1000,7 @@ func mockWriteRequest(lbls labels.Labels, value float64, timestampMs int64) (*cl
9891000
},
9901001
}
9911002

992-
req := client.ToWriteRequest([]labels.Labels{lbls}, samples, client.API)
1003+
req := client.ToWriteRequest([]labels.Labels{lbls}, samples, nil, client.API)
9931004

9941005
// Generate the expected response
9951006
expectedQueryRes := &client.QueryResponse{

pkg/ingester/lifecycle_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -366,7 +366,7 @@ func TestIngesterFlush(t *testing.T) {
366366
}
367367
)
368368
ctx := user.InjectOrgID(context.Background(), userID)
369-
_, err := ing.Push(ctx, client.ToWriteRequest(lbls, sampleData, client.API))
369+
_, err := ing.Push(ctx, client.ToWriteRequest(lbls, sampleData, nil, client.API))
370370
require.NoError(t, err)
371371

372372
// We add a 100ms sleep into the flush loop, such that we can reliably detect

pkg/ruler/compat.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ func (a *appender) AddFast(l labels.Labels, ref uint64, t int64, v float64) erro
3636
}
3737

3838
func (a *appender) Commit() error {
39-
_, err := a.pusher.Push(user.InjectOrgID(context.Background(), a.userID), client.ToWriteRequest(a.labels, a.samples, client.RULE))
39+
_, err := a.pusher.Push(user.InjectOrgID(context.Background(), a.userID), client.ToWriteRequest(a.labels, a.samples, nil, client.RULE))
4040
a.labels = nil
4141
a.samples = nil
4242
return err

0 commit comments

Comments
 (0)