diff --git a/pkg/chunk/schema.go b/pkg/chunk/schema.go index 58e23e8556f..4e3142c5e50 100644 --- a/pkg/chunk/schema.go +++ b/pkg/chunk/schema.go @@ -34,9 +34,9 @@ type Schema interface { GetWriteEntries(from, through model.Time, userID string, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error) // Should only be used with the seriesStore. TODO: Make seriesStore implement a different interface altogether. - GetLabelWriteEntries(from, through model.Time, userID string, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error) + // returns cache key string and []IndexEntry per bucket, matched in order + GetCacheKeysAndLabelWriteEntries(from, through model.Time, userID string, metricName string, labels labels.Labels, chunkID string) ([]string, [][]IndexEntry, error) GetChunkWriteEntries(from, through model.Time, userID string, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error) - GetLabelEntryCacheKeys(from, through model.Time, userID string, labels labels.Labels) []string // When doing a read, use these methods to return the list of entries you should query GetReadQueriesForMetric(from, through model.Time, userID string, metricName string) ([]IndexQuery, error) @@ -97,17 +97,31 @@ func (s schema) GetWriteEntries(from, through model.Time, userID string, metricN return result, nil } -func (s schema) GetLabelWriteEntries(from, through model.Time, userID string, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error) { - var result []IndexEntry +// returns cache key string and []IndexEntry per bucket, matched in order +func (s schema) GetCacheKeysAndLabelWriteEntries(from, through model.Time, userID string, metricName string, labels labels.Labels, chunkID string) ([]string, [][]IndexEntry, error) { + var keys []string + var indexEntries [][]IndexEntry for _, bucket := range s.buckets(from, through, userID) { + key := strings.Join([]string{ + bucket.tableName, + bucket.hashKey, + string(labelsSeriesID(labels)), + }, + "-", + ) + // This is just encoding to remove invalid characters so that we can put them in memcache. + // We're not hashing them as the length of the key is well within memcache bounds. tableName + userid + day + 32Byte(seriesID) + key = hex.EncodeToString([]byte(key)) + keys = append(keys, key) + entries, err := s.entries.GetLabelWriteEntries(bucket, metricName, labels, chunkID) if err != nil { - return nil, err + return nil, nil, err } - result = append(result, entries...) + indexEntries = append(indexEntries, entries) } - return result, nil + return keys, indexEntries, nil } func (s schema) GetChunkWriteEntries(from, through model.Time, userID string, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error) { @@ -124,27 +138,6 @@ func (s schema) GetChunkWriteEntries(from, through model.Time, userID string, me } -// Should only used for v9Schema -func (s schema) GetLabelEntryCacheKeys(from, through model.Time, userID string, labels labels.Labels) []string { - var result []string - for _, bucket := range s.buckets(from, through, userID) { - key := strings.Join([]string{ - bucket.tableName, - bucket.hashKey, - string(labelsSeriesID(labels)), - }, - "-", - ) - // This is just encoding to remove invalid characters so that we can put them in memcache. - // We're not hashing them as the length of the key is well within memcache bounds. tableName + userid + day + 32Byte(seriesID) - key = hex.EncodeToString([]byte(key)) - - result = append(result, key) - } - - return result -} - func (s schema) GetReadQueriesForMetric(from, through model.Time, userID string, metricName string) ([]IndexQuery, error) { var result []IndexQuery diff --git a/pkg/chunk/series_store.go b/pkg/chunk/series_store.go index af7343bbfda..11ac736a144 100644 --- a/pkg/chunk/series_store.go +++ b/pkg/chunk/series_store.go @@ -372,23 +372,25 @@ func (c *seriesStore) PutOne(ctx context.Context, from, through model.Time, chun func (c *seriesStore) calculateIndexEntries(from, through model.Time, chunk Chunk) (WriteBatch, []string, error) { seenIndexEntries := map[string]struct{}{} entries := []IndexEntry{} - keysToCache := []string{} metricName := chunk.Metric.Get(labels.MetricName) if metricName == "" { return nil, nil, fmt.Errorf("no MetricNameLabel for chunk") } - keys := c.schema.GetLabelEntryCacheKeys(from, through, chunk.UserID, chunk.Metric) + keys, labelEntries, err := c.schema.GetCacheKeysAndLabelWriteEntries(from, through, chunk.UserID, metricName, chunk.Metric, chunk.ExternalKey()) + if err != nil { + return nil, nil, err + } _, _, missing := c.writeDedupeCache.Fetch(context.Background(), keys) - if len(missing) != 0 { - labelEntries, err := c.schema.GetLabelWriteEntries(from, through, chunk.UserID, metricName, chunk.Metric, chunk.ExternalKey()) - if err != nil { - return nil, nil, err + // keys and labelEntries are matched in order, but Fetch() may + // return missing keys in any order so check against all of them. + for _, missingKey := range missing { + for i, key := range keys { + if key == missingKey { + entries = append(entries, labelEntries[i]...) + } } - - entries = append(entries, labelEntries...) - keysToCache = missing } chunkEntries, err := c.schema.GetChunkWriteEntries(from, through, chunk.UserID, metricName, chunk.Metric, chunk.ExternalKey()) @@ -410,5 +412,5 @@ func (c *seriesStore) calculateIndexEntries(from, through model.Time, chunk Chun } } - return result, keysToCache, nil + return result, missing, nil }