Commit d397fe4

gotjosh committed

Address review comments

- Removed the checks around metadata/samples iteration; they are no longer needed.
- Changed the comment regarding WriteRequest samples/metadata handling.

Signed-off-by: gotjosh <[email protected]>

1 parent 45b1f53 commit d397fe4
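
The first bullet of the commit message relies on a basic property of Go's range loops: iterating over a nil or empty slice runs zero times, so wrapping the samples/metadata loops in length checks adds nothing. A minimal standalone sketch of that behaviour follows; the metadataLike type and its field are illustrative stand-ins, not part of the Cortex codebase.

package main

import "fmt"

// metadataLike stands in for a request's metadata entries; the name and
// field are illustrative only, not taken from the Cortex code.
type metadataLike struct{ MetricName string }

func main() {
	var empty []metadataLike // nil slice, as in a WriteRequest carrying only samples

	// Ranging over a nil or empty slice performs zero iterations, so a
	// guard like `if len(req.Metadata) > 0 { ... }` around the loop is redundant.
	for _, m := range empty {
		fmt.Println("validating", m.MetricName) // never reached
	}
	fmt.Println("iterations:", len(empty)) // iterations: 0
}

Because the loop body never runs for an empty slice, dropping the guards changes no behaviour while flattening one level of nesting.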

File tree

pkg/distributor/distributor.go
pkg/distributor/distributor_test.go

2 files changed: +79 -83 lines

pkg/distributor/distributor.go

Lines changed: 78 additions & 83 deletions
@@ -377,8 +377,7 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie
     // Count the total number of metadata in.
     incomingMetadata.WithLabelValues(userID).Add(float64(len(req.Metadata)))

-    // A WriteRequest can now include samples and metadata. However, at the moment of writing,
-    // we can only receive samples and no metadata or metadata and no samples.
+    // A WriteRequest can only contain series or metadata but not both. This might change in the future.
     // For each timeseries or samples, we compute a hash to distribute across ingesters;
     // check each sample/metadata and discard if outside limits.
     validatedTimeseries := make([]client.PreallocTimeseries, 0, len(req.Timeseries))
@@ -387,106 +386,102 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie
     seriesKeys := make([]uint32, 0, len(req.Timeseries))
     validatedSamples := 0

-    if len(req.Timeseries) > 0 {
-        if d.limits.AcceptHASamples(userID) {
-            cluster, replica := findHALabels(d.limits.HAReplicaLabel(userID), d.limits.HAClusterLabel(userID), req.Timeseries[0].Labels)
-            removeReplica, err = d.checkSample(ctx, userID, cluster, replica)
-            if err != nil {
-                if resp, ok := httpgrpc.HTTPResponseFromError(err); ok && resp.GetCode() == 202 {
-                    // These samples have been deduped.
-                    dedupedSamples.WithLabelValues(userID, cluster).Add(float64(numSamples))
-                }
+    if len(req.Timeseries) > 0 && d.limits.AcceptHASamples(userID) {
+        cluster, replica := findHALabels(d.limits.HAReplicaLabel(userID), d.limits.HAClusterLabel(userID), req.Timeseries[0].Labels)
+        removeReplica, err = d.checkSample(ctx, userID, cluster, replica)
+        if err != nil {
+            if resp, ok := httpgrpc.HTTPResponseFromError(err); ok && resp.GetCode() == 202 {
+                // These samples have been deduped.
+                dedupedSamples.WithLabelValues(userID, cluster).Add(float64(numSamples))
+            }

-                // Ensure the request slice is reused if the series get deduped.
-                client.ReuseSlice(req.Timeseries)
+            // Ensure the request slice is reused if the series get deduped.
+            client.ReuseSlice(req.Timeseries)

-                return nil, err
-            }
-            // If there wasn't an error but removeReplica is false that means we didn't find both HA labels.
-            if !removeReplica {
-                nonHASamples.WithLabelValues(userID).Add(float64(numSamples))
-            }
+            return nil, err
+        }
+        // If there wasn't an error but removeReplica is false that means we didn't find both HA labels.
+        if !removeReplica {
+            nonHASamples.WithLabelValues(userID).Add(float64(numSamples))
         }
+    }

-        latestSampleTimestampMs := int64(0)
-        defer func() {
-            // Update this metric even in case of errors.
-            if latestSampleTimestampMs > 0 {
-                latestSeenSampleTimestampPerUser.WithLabelValues(userID).Set(float64(latestSampleTimestampMs) / 1000)
-            }
-        }()
-
-        // For each timeseries, compute a hash to distribute across ingesters;
-        // check each sample and discard if outside limits.
-        for _, ts := range req.Timeseries {
-            // Use timestamp of latest sample in the series. If samples for series are not ordered, metric for user may be wrong.
-            if len(ts.Samples) > 0 {
-                latestSampleTimestampMs = util.Max64(latestSampleTimestampMs, ts.Samples[len(ts.Samples)-1].TimestampMs)
-            }
+    latestSampleTimestampMs := int64(0)
+    defer func() {
+        // Update this metric even in case of errors.
+        if latestSampleTimestampMs > 0 {
+            latestSeenSampleTimestampPerUser.WithLabelValues(userID).Set(float64(latestSampleTimestampMs) / 1000)
+        }
+    }()

-            // If we found both the cluster and replica labels, we only want to include the cluster label when
-            // storing series in Cortex. If we kept the replica label we would end up with another series for the same
-            // series we're trying to dedupe when HA tracking moves over to a different replica.
-            if removeReplica {
-                removeLabel(d.limits.HAReplicaLabel(userID), &ts.Labels)
-            }
+    // For each timeseries, compute a hash to distribute across ingesters;
+    // check each sample and discard if outside limits.
+    for _, ts := range req.Timeseries {
+        // Use timestamp of latest sample in the series. If samples for series are not ordered, metric for user may be wrong.
+        if len(ts.Samples) > 0 {
+            latestSampleTimestampMs = util.Max64(latestSampleTimestampMs, ts.Samples[len(ts.Samples)-1].TimestampMs)
+        }

-            for _, labelName := range d.limits.DropLabels(userID) {
-                removeLabel(labelName, &ts.Labels)
-            }
+        // If we found both the cluster and replica labels, we only want to include the cluster label when
+        // storing series in Cortex. If we kept the replica label we would end up with another series for the same
+        // series we're trying to dedupe when HA tracking moves over to a different replica.
+        if removeReplica {
+            removeLabel(d.limits.HAReplicaLabel(userID), &ts.Labels)
+        }

-            if len(ts.Labels) == 0 {
-                continue
-            }
+        for _, labelName := range d.limits.DropLabels(userID) {
+            removeLabel(labelName, &ts.Labels)
+        }

-            // We rely on sorted labels in different places:
-            // 1) When computing token for labels, and sharding by all labels. Here different order of labels returns
-            // different tokens, which is bad.
-            // 2) In validation code, when checking for duplicate label names. As duplicate label names are rejected
-            // later in the validation phase, we ignore them here.
-            sortLabelsIfNeeded(ts.Labels)
-
-            // Generate the sharding token based on the series labels without the HA replica
-            // label and dropped labels (if any)
-            key, err := d.tokenForLabels(userID, ts.Labels)
-            if err != nil {
-                return nil, err
-            }
+        if len(ts.Labels) == 0 {
+            continue
+        }

-            validatedSeries, err := d.validateSeries(ts, userID)
+        // We rely on sorted labels in different places:
+        // 1) When computing token for labels, and sharding by all labels. Here different order of labels returns
+        // different tokens, which is bad.
+        // 2) In validation code, when checking for duplicate label names. As duplicate label names are rejected
+        // later in the validation phase, we ignore them here.
+        sortLabelsIfNeeded(ts.Labels)

-            // Errors in validation are considered non-fatal, as one series in a request may contain
-            // invalid data but all the remaining series could be perfectly valid.
-            if err != nil && firstPartialErr == nil {
-                firstPartialErr = err
-            }
+        // Generate the sharding token based on the series labels without the HA replica
+        // label and dropped labels (if any)
+        key, err := d.tokenForLabels(userID, ts.Labels)
+        if err != nil {
+            return nil, err
+        }

-            // validateSeries would have returned an emptyPreallocSeries if there were no valid samples.
-            if validatedSeries == emptyPreallocSeries {
-                continue
-            }
+        validatedSeries, err := d.validateSeries(ts, userID)

-            seriesKeys = append(seriesKeys, key)
-            validatedTimeseries = append(validatedTimeseries, validatedSeries)
-            validatedSamples += len(ts.Samples)
+        // Errors in validation are considered non-fatal, as one series in a request may contain
+        // invalid data but all the remaining series could be perfectly valid.
+        if err != nil && firstPartialErr == nil {
+            firstPartialErr = err
+        }
+
+        // validateSeries would have returned an emptyPreallocSeries if there were no valid samples.
+        if validatedSeries == emptyPreallocSeries {
+            continue
         }
-    }

-    if len(req.Metadata) > 0 {
-        for _, m := range req.Metadata {
-            err := d.validateMetadata(m, userID)
+        seriesKeys = append(seriesKeys, key)
+        validatedTimeseries = append(validatedTimeseries, validatedSeries)
+        validatedSamples += len(ts.Samples)
+    }

-            if err != nil {
-                if firstPartialErr == nil {
-                    firstPartialErr = err
-                }
+    for _, m := range req.Metadata {
+        err := d.validateMetadata(m, userID)

-                continue
+        if err != nil {
+            if firstPartialErr == nil {
+                firstPartialErr = err
             }

-            metadataKeys = append(metadataKeys, d.tokenForMetadata(userID, m.MetricName))
-            validatedMetadata = append(validatedMetadata, m)
+            continue
         }
+
+        metadataKeys = append(metadataKeys, d.tokenForMetadata(userID, m.MetricName))
+        validatedMetadata = append(validatedMetadata, m)
     }

     receivedSamples.WithLabelValues(userID).Add(float64(validatedSamples))
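
The comments kept in the new code explain why labels are sorted before the sharding token is computed: the same label set sent in a different order must map to the same token, or the series would land on different ingesters. As a rough illustration of that idea only (FNV-1a over a sorted label list; not the distributor's actual tokenForLabels implementation), a sketch could look like this:

package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// label is a simplified stand-in for a series label pair.
type label struct{ Name, Value string }

// shardToken sorts the labels by name and hashes them, so the token depends
// only on the label set, not on the order the client sent the labels in.
// This is an illustrative sketch, not the distributor's real token function.
func shardToken(userID string, labels []label) uint32 {
	sort.Slice(labels, func(i, j int) bool { return labels[i].Name < labels[j].Name })
	h := fnv.New32a()
	h.Write([]byte(userID))
	for _, l := range labels {
		h.Write([]byte(l.Name))
		h.Write([]byte(l.Value))
	}
	return h.Sum32()
}

func main() {
	a := []label{{"__name__", "up"}, {"job", "node"}}
	b := []label{{"job", "node"}, {"__name__", "up"}}
	fmt.Println(shardToken("user-1", a) == shardToken("user-1", b)) // true
}

With the labels sorted first, both orderings of the same series hash to the identical token, which is the property the diff's comment is protecting.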

pkg/distributor/distributor_test.go

Lines changed: 1 addition & 0 deletions
@@ -83,6 +83,7 @@ func TestDistributor_Push(t *testing.T) {
     numIngesters:     3,
     happyIngesters:   2,
     samples:          5,
+    metadata:         5,
     expectedResponse: success,
     startTimestampMs: 123456789000,
     expectedMetrics: `
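
The test change adds a metadata count to an existing table-driven case, so the same push now exercises the metadata path alongside samples. A simplified, self-contained sketch of that table-driven style follows; the type, field, and test names are loosely modelled on the visible case and are not the real TestDistributor_Push harness.

package sketch

import "testing"

// pushTestCase loosely mirrors the fields visible in the diff above;
// it is an illustration, not the actual TestDistributor_Push table.
type pushTestCase struct {
	numIngesters   int
	happyIngesters int
	samples        int
	metadata       int
	expectSuccess  bool
}

func TestPushCases(t *testing.T) {
	cases := map[string]pushTestCase{
		"push of samples and metadata to 3 ingesters, 2 happy": {
			numIngesters:   3,
			happyIngesters: 2,
			samples:        5,
			metadata:       5, // the field exercised by this commit's test change
			expectSuccess:  true,
		},
	}
	for name, tc := range cases {
		tc := tc
		t.Run(name, func(t *testing.T) {
			// A real test would build a WriteRequest with tc.samples series and
			// tc.metadata metadata entries, push it through a distributor backed
			// by fake ingesters, and compare the response and exported metrics.
			if tc.happyIngesters == 0 && tc.expectSuccess {
				t.Fatal("cannot succeed with no healthy ingesters")
			}
		})
	}
}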
