Chunk time ranges are inclusive. #1576

Merged: 4 commits, Sep 28, 2019
6 changes: 3 additions & 3 deletions pkg/chunk/chunk_store_test.go
@@ -592,7 +592,7 @@ func TestChunkStoreRandom(t *testing.T) {
},
chunks[0],
ts,
ts.Add(chunkLen*time.Second),
ts.Add(chunkLen*time.Second).Add(-1*time.Second),
)
err := chunk.Encode()
require.NoError(t, err)
@@ -602,8 +602,8 @@ func TestChunkStoreRandom(t *testing.T) {

// pick two random numbers and do a query
for i := 0; i < 100; i++ {
start := rand.Int63n(100 * chunkLen)
end := start + rand.Int63n((100*chunkLen)-start)
start := rand.Int63n(99 * chunkLen)
Contributor: why do we want this change?

Contributor Author: At the end of the list of chunks there are no more chunks. It simplifies the test logic below by ensuring we never query past the end of the list of chunks (see the sketch after this hunk).

end := start + 1 + rand.Int63n((99*chunkLen)-start)
assert.True(t, start < end)

startTime := model.TimeFromUnix(start)
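To make the review reply above concrete, here is a minimal, runnable sketch of the range-picking logic. The chunkLen value is assumed purely for illustration; the test supplies its own.

```go
package main

import (
	"fmt"
	"math/rand"
)

// chunkLen is an assumed value for illustration only.
const chunkLen = 13

func main() {
	// Chunks cover [0, 99*chunkLen], so start is drawn strictly inside that range.
	for i := 0; i < 5; i++ {
		start := rand.Int63n(99 * chunkLen)
		// The +1 guarantees start < end, and the reduced bound guarantees
		// end <= 99*chunkLen, so no query reaches past the last written chunk.
		end := start + 1 + rand.Int63n((99*chunkLen)-start)
		fmt.Printf("query [%d, %d]\n", start, end)
	}
}
```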
97 changes: 39 additions & 58 deletions pkg/chunk/schema_caching.go
@@ -14,80 +14,61 @@ type schemaCaching struct {
}

func (s *schemaCaching) GetReadQueriesForMetric(from, through model.Time, userID string, metricName string) ([]IndexQuery, error) {
cFrom, cThrough, from, through := splitTimesByCacheability(from, through, model.TimeFromUnix(mtime.Now().Add(-s.cacheOlderThan).Unix()))

cacheableQueries, err := s.Schema.GetReadQueriesForMetric(cFrom, cThrough, userID, metricName)
if err != nil {
return nil, err
}

activeQueries, err := s.Schema.GetReadQueriesForMetric(from, through, userID, metricName)
if err != nil {
return nil, err
}

return mergeCacheableAndActiveQueries(cacheableQueries, activeQueries), nil
return s.splitTimesByCacheability(from, through, func(from, through model.Time) ([]IndexQuery, error) {
return s.Schema.GetReadQueriesForMetric(from, through, userID, metricName)
})
}

func (s *schemaCaching) GetReadQueriesForMetricLabel(from, through model.Time, userID string, metricName string, labelName string) ([]IndexQuery, error) {
cFrom, cThrough, from, through := splitTimesByCacheability(from, through, model.TimeFromUnix(mtime.Now().Add(-s.cacheOlderThan).Unix()))

cacheableQueries, err := s.Schema.GetReadQueriesForMetricLabel(cFrom, cThrough, userID, metricName, labelName)
if err != nil {
return nil, err
}

activeQueries, err := s.Schema.GetReadQueriesForMetricLabel(from, through, userID, metricName, labelName)
if err != nil {
return nil, err
}

return mergeCacheableAndActiveQueries(cacheableQueries, activeQueries), nil
return s.splitTimesByCacheability(from, through, func(from, through model.Time) ([]IndexQuery, error) {
return s.Schema.GetReadQueriesForMetricLabel(from, through, userID, metricName, labelName)
})
}

func (s *schemaCaching) GetReadQueriesForMetricLabelValue(from, through model.Time, userID string, metricName string, labelName string, labelValue string) ([]IndexQuery, error) {
cFrom, cThrough, from, through := splitTimesByCacheability(from, through, model.TimeFromUnix(mtime.Now().Add(-s.cacheOlderThan).Unix()))

cacheableQueries, err := s.Schema.GetReadQueriesForMetricLabelValue(cFrom, cThrough, userID, metricName, labelName, labelValue)
if err != nil {
return nil, err
}

activeQueries, err := s.Schema.GetReadQueriesForMetricLabelValue(from, through, userID, metricName, labelName, labelValue)
if err != nil {
return nil, err
}

return mergeCacheableAndActiveQueries(cacheableQueries, activeQueries), nil
return s.splitTimesByCacheability(from, through, func(from, through model.Time) ([]IndexQuery, error) {
return s.Schema.GetReadQueriesForMetricLabelValue(from, through, userID, metricName, labelName, labelValue)
})
}

// If the query resulted in series IDs, use this method to find chunks.
func (s *schemaCaching) GetChunksForSeries(from, through model.Time, userID string, seriesID []byte) ([]IndexQuery, error) {
cFrom, cThrough, from, through := splitTimesByCacheability(from, through, model.TimeFromUnix(mtime.Now().Add(-s.cacheOlderThan).Unix()))

cacheableQueries, err := s.Schema.GetChunksForSeries(cFrom, cThrough, userID, seriesID)
if err != nil {
return nil, err
}

activeQueries, err := s.Schema.GetChunksForSeries(from, through, userID, seriesID)
if err != nil {
return nil, err
}

return mergeCacheableAndActiveQueries(cacheableQueries, activeQueries), nil
return s.splitTimesByCacheability(from, through, func(from, through model.Time) ([]IndexQuery, error) {
return s.Schema.GetChunksForSeries(from, through, userID, seriesID)
})
}

func splitTimesByCacheability(from, through model.Time, cacheBefore model.Time) (model.Time, model.Time, model.Time, model.Time) {
func (s *schemaCaching) splitTimesByCacheability(from, through model.Time, f func(from, through model.Time) ([]IndexQuery, error)) ([]IndexQuery, error) {
var (
cacheableQueries []IndexQuery
activeQueries []IndexQuery
err error
cacheBefore = model.TimeFromUnix(mtime.Now().Add(-s.cacheOlderThan).Unix())
)

if from.After(cacheBefore) {
return 0, 0, from, through
}
activeQueries, err = f(from, through)
if err != nil {
return nil, err
}
} else if through.Before(cacheBefore) {
cacheableQueries, err = f(from, through)
if err != nil {
return nil, err
}
} else {
cacheableQueries, err = f(from, cacheBefore)
if err != nil {
return nil, err
}

if through.Before(cacheBefore) {
return from, through, 0, 0
activeQueries, err = f(cacheBefore, through)
if err != nil {
return nil, err
}
}

return from, cacheBefore, cacheBefore, through
return mergeCacheableAndActiveQueries(cacheableQueries, activeQueries), nil
}

func mergeCacheableAndActiveQueries(cacheableQueries []IndexQuery, activeQueries []IndexQuery) []IndexQuery {
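For readers skimming the refactor, the sketch below shows the shape of the closure-based helper that the four Get* methods now share. It is an illustrative simplification, not the Cortex implementation: the types are stand-ins, error handling is omitted, and the Immutable flag is set inline rather than during the merge step.

```go
package main

import "fmt"

// indexQuery is a simplified stand-in for the real IndexQuery type;
// Immutable marks results old enough to cache indefinitely.
type indexQuery struct {
	from, through int64
	Immutable     bool
}

// splitByCacheability mimics the refactored helper: the caller hands in the
// underlying schema lookup as a closure, and the helper decides how to cut
// the [from, through] window around the cacheBefore boundary.
func splitByCacheability(from, through, cacheBefore int64, f func(from, through int64) []indexQuery) []indexQuery {
	switch {
	case from > cacheBefore:
		// Entirely recent: nothing is safe to cache.
		return f(from, through)
	case through < cacheBefore:
		// Entirely older than the boundary: everything is cacheable.
		return markImmutable(f(from, through))
	default:
		// Straddles the boundary: one cacheable part, one active part.
		return append(markImmutable(f(from, cacheBefore)), f(cacheBefore, through)...)
	}
}

func markImmutable(qs []indexQuery) []indexQuery {
	for i := range qs {
		qs[i].Immutable = true
	}
	return qs
}

func main() {
	lookup := func(from, through int64) []indexQuery {
		return []indexQuery{{from: from, through: through}}
	}
	// A window straddling the boundary at t=150 splits into [0,150] (cacheable)
	// and [150,200] (active).
	for _, q := range splitByCacheability(0, 200, 150, lookup) {
		fmt.Printf("%+v\n", q)
	}
}
```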
26 changes: 12 additions & 14 deletions pkg/chunk/schema_caching_test.go
@@ -1,10 +1,12 @@
package chunk

import (
"strconv"
"testing"
"time"

"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/mtime"
)
@@ -26,7 +28,7 @@ func TestCachingSchema(t *testing.T) {

mtime.NowForce(baseTime)

for _, tc := range []struct {
for i, tc := range []struct {
from, through time.Time

cacheableIdx int
@@ -56,20 +58,16 @@ func TestCachingSchema(t *testing.T) {
0,
},
} {
have, err := schema.GetReadQueriesForMetric(
model.TimeFromUnix(tc.from.Unix()), model.TimeFromUnix(tc.through.Unix()),
userID, "foo",
)
if err != nil {
t.Fatal(err)
}
t.Run(strconv.Itoa(i), func(t *testing.T) {
have, err := schema.GetReadQueriesForMetric(
model.TimeFromUnix(tc.from.Unix()), model.TimeFromUnix(tc.through.Unix()),
userID, "foo",
)
require.NoError(t, err)

for i := range have {
if i <= tc.cacheableIdx {
require.True(t, have[i].Immutable)
} else {
require.False(t, have[i].Immutable)
for i := range have {
assert.Equal(t, have[i].Immutable, i <= tc.cacheableIdx, i)
}
}
})
}
}
10 changes: 0 additions & 10 deletions pkg/chunk/schema_config.go
@@ -259,11 +259,6 @@ func (cfg *PeriodConfig) hourlyBuckets(from, through model.Time, userID string)
result = []Bucket{}
)

// If through ends on the hour, don't include the upcoming hour
if through.Unix()%secondsInHour == 0 {
throughHour--
}

for i := fromHour; i <= throughHour; i++ {
relativeFrom := util.Max64(0, int64(from)-(i*millisecondsInHour))
relativeThrough := util.Min64(millisecondsInHour, int64(through)-(i*millisecondsInHour))
@@ -284,11 +279,6 @@ func (cfg *PeriodConfig) dailyBuckets(from, through model.Time, userID string) [
result = []Bucket{}
)

// If through ends on 00:00 of the day, don't include the upcoming day
if through.Unix()%secondsInDay == 0 {
throughDay--
}

for i := fromDay; i <= throughDay; i++ {
// The idea here is that the hash key contains the bucket start time (rounded to
// the nearest day). The range key can contain the offset from that, to the
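The effect of removing those two guards is easiest to see with a small sketch (simplified Bucket type, tableName omitted, constants assumed): a through value that lands exactly on an hour boundary is now treated as inclusive, so it yields one extra bucket with an empty relative range, which is what the updated test expectations below encode.

```go
package main

import "fmt"

const millisecondsInHour = int64(3600 * 1000)

// bucket is a simplified stand-in for the real Bucket type.
type bucket struct {
	hashKey       string
	from, through int64 // offsets within the hour, in ms
}

// hourlyBuckets mirrors the loop above with the guard removed: the bucket
// containing `through` is always emitted, even when `through` falls exactly
// on an hour boundary.
func hourlyBuckets(from, through int64, userID string) []bucket {
	fromHour := from / millisecondsInHour
	throughHour := through / millisecondsInHour

	result := []bucket{}
	for i := fromHour; i <= throughHour; i++ {
		relativeFrom := max64(0, from-i*millisecondsInHour)
		relativeThrough := min64(millisecondsInHour, through-i*millisecondsInHour)
		result = append(result, bucket{
			hashKey: fmt.Sprintf("%s:%d", userID, i),
			from:    relativeFrom,
			through: relativeThrough,
		})
	}
	return result
}

func max64(a, b int64) int64 {
	if a > b {
		return a
	}
	return b
}

func min64(a, b int64) int64 {
	if a < b {
		return a
	}
	return b
}

func main() {
	// A [0h, 1h] query now yields bucket "0:0" covering the full hour plus
	// bucket "0:1" with an empty [0, 0] range, matching the updated tests below.
	for _, b := range hourlyBuckets(0, millisecondsInHour, "0") {
		fmt.Printf("%+v\n", b)
	}
}
```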
36 changes: 27 additions & 9 deletions pkg/chunk/schema_config_test.go
@@ -1,11 +1,11 @@
package chunk

import (
"reflect"
"testing"
"time"

"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

@@ -34,7 +34,12 @@ func TestHourlyBuckets(t *testing.T) {
from: model.TimeFromUnix(0),
through: model.TimeFromUnix(0),
},
[]Bucket{},
[]Bucket{{
from: 0,
through: 0,
tableName: "table",
hashKey: "0:0",
}},
},
{
"30 minute window",
@@ -60,6 +65,11 @@
through: 3600 * 1000, // ms
tableName: "table",
hashKey: "0:0",
}, {
from: 0,
through: 0, // ms
tableName: "table",
hashKey: "0:1",
}},
},
{
@@ -88,9 +98,8 @@
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := cfg.hourlyBuckets(tt.args.from, tt.args.through, userID); !reflect.DeepEqual(got, tt.want) {
t.Errorf("SchemaConfig.dailyBuckets() = %v, want %v", got, tt.want)
}
got := cfg.hourlyBuckets(tt.args.from, tt.args.through, userID)
assert.Equal(t, tt.want, got)
})
}
}
@@ -120,7 +129,12 @@ func TestDailyBuckets(t *testing.T) {
from: model.TimeFromUnix(0),
through: model.TimeFromUnix(0),
},
[]Bucket{},
[]Bucket{{
from: 0,
through: 0,
tableName: "table",
hashKey: "0:d0",
}},
},
{
"6 hour window",
@@ -146,6 +160,11 @@
through: (24 * 3600) * 1000, // ms
tableName: "table",
hashKey: "0:d0",
}, {
from: 0,
through: 0,
tableName: "table",
hashKey: "0:d1",
}},
},
{
@@ -174,9 +193,8 @@
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := cfg.dailyBuckets(tt.args.from, tt.args.through, userID); !reflect.DeepEqual(got, tt.want) {
t.Errorf("SchemaConfig.dailyBuckets() = %v, want %v", got, tt.want)
}
got := cfg.dailyBuckets(tt.args.from, tt.args.through, userID)
assert.Equal(t, tt.want, got)
})
}
}