Skip to content

Commit a92b613

Browse files
pracuccipstibrany
andauthored
Add blocks storage consistency check in the querier (#2593)
* Added blocks storage consistency check in the querier Signed-off-by: Marco Pracucci <[email protected]> * Log each time a consistency check fails Signed-off-by: Marco Pracucci <[email protected]> * Log the trace ID too when logging consistency check failure Signed-off-by: Marco Pracucci <[email protected]> * Update pkg/querier/blocks_consistency_checker.go Signed-off-by: Marco Pracucci <[email protected]> Co-authored-by: Peter Štibraný <[email protected]> * Update pkg/querier/blocks_consistency_checker.go Signed-off-by: Marco Pracucci <[email protected]> Co-authored-by: Peter Štibraný <[email protected]> * Update pkg/querier/blocks_consistency_checker.go Signed-off-by: Marco Pracucci <[email protected]> Co-authored-by: Peter Štibraný <[email protected]> * Update pkg/querier/blocks_consistency_checker.go Signed-off-by: Marco Pracucci <[email protected]> Co-authored-by: Peter Štibraný <[email protected]> * Merged TimeMillisecond() and TimeToMillis() utility functions together Signed-off-by: Marco Pracucci <[email protected]> * Disabled consistency check by default Signed-off-by: Marco Pracucci <[email protected]> Co-authored-by: Peter Štibraný <[email protected]>
1 parent f738229 commit a92b613

19 files changed

+722
-231
lines changed

development/tsdb-blocks-storage-s3/config/prometheus.yaml

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -39,16 +39,16 @@ scrape_configs:
3939
- targets: ['query-frontend:8007']
4040
labels:
4141
container: 'query-frontend'
42-
- job_name: cortex-store-gateway-1
43-
static_configs:
44-
- targets: ['store-gateway-1:8008']
45-
labels:
46-
container: 'store-gateway-1'
47-
- job_name: cortex-store-gateway-2
48-
static_configs:
49-
- targets: ['store-gateway-2:8009']
50-
labels:
51-
container: 'store-gateway-2'
42+
- job_name: cortex-store-gateway-1
43+
static_configs:
44+
- targets: ['store-gateway-1:8008']
45+
labels:
46+
container: 'store-gateway-1'
47+
- job_name: cortex-store-gateway-2
48+
static_configs:
49+
- targets: ['store-gateway-2:8009']
50+
labels:
51+
container: 'store-gateway-2'
5252

5353
remote_write:
5454
- url: http://distributor:8001/api/prom/push

docs/configuration/config-file-reference.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -664,6 +664,19 @@ store_gateway_client:
664664
# TLS CA path for the client
665665
# CLI flag: -experimental.querier.store-gateway-client.tls-ca-path
666666
[tls_ca_path: <string> | default = ""]
667+
668+
# Configures the consistency check done by the querier on queried blocks when
669+
# running the experimental blocks storage.
670+
blocks_consistency_check:
671+
# Whether the querier should run a consistency check to ensure all expected
672+
# blocks have been queried.
673+
# CLI flag: -experimental.querier.blocks-consistency-check.enabled
674+
[enabled: <boolean> | default = false]
675+
676+
# The grace period allowed before a new block is included in the consistency
677+
# check.
678+
# CLI flag: -experimental.querier.blocks-consistency-check.upload-grace-period
679+
[upload_grace_period: <duration> | default = 1h]
667680
```
668681

669682
### `query_frontend_config`
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
package querier
2+
3+
import (
4+
"fmt"
5+
"strings"
6+
"time"
7+
8+
"github.com/oklog/ulid"
9+
"github.com/prometheus/client_golang/prometheus"
10+
"github.com/prometheus/client_golang/prometheus/promauto"
11+
"github.com/thanos-io/thanos/pkg/block/metadata"
12+
"github.com/thanos-io/thanos/pkg/store/hintspb"
13+
)
14+
15+
type BlocksConsistencyChecker struct {
16+
uploadGracePeriod time.Duration
17+
deletionGracePeriod time.Duration
18+
19+
checksTotal prometheus.Counter
20+
checksFailed prometheus.Counter
21+
}
22+
23+
func NewBlocksConsistencyChecker(uploadGracePeriod, deletionGracePeriod time.Duration, reg prometheus.Registerer) *BlocksConsistencyChecker {
24+
return &BlocksConsistencyChecker{
25+
uploadGracePeriod: uploadGracePeriod,
26+
deletionGracePeriod: deletionGracePeriod,
27+
checksTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
28+
Name: "cortex_querier_blocks_consistency_checks_total",
29+
Help: "Total number of consistency checks run on queried blocks.",
30+
}),
31+
checksFailed: promauto.With(reg).NewCounter(prometheus.CounterOpts{
32+
Name: "cortex_querier_blocks_consistency_checks_failed_total",
33+
Help: "Total number of consistency checks failed on queried blocks.",
34+
}),
35+
}
36+
}
37+
38+
func (c *BlocksConsistencyChecker) Check(expectedBlocks []ulid.ULID, knownDeletionMarks map[ulid.ULID]*metadata.DeletionMark, queriedBlocks map[string][]hintspb.Block) error {
39+
c.checksTotal.Inc()
40+
41+
// Reverse the map of queried blocks, so that we can easily look for missing ones
42+
// while keeping the information about which store-gateways have already been queried
43+
// for that block.
44+
actualBlocks := map[string][]string{}
45+
for gatewayAddr, blocks := range queriedBlocks {
46+
for _, b := range blocks {
47+
actualBlocks[b.Id] = append(actualBlocks[b.Id], gatewayAddr)
48+
}
49+
}
50+
51+
// Look for any missing block.
52+
missingBlocks := map[string][]string{}
53+
var missingBlockIDs []string
54+
55+
for _, blockID := range expectedBlocks {
56+
// Some recently uploaded blocks, already discovered by the querier, may not have been discovered
57+
// and loaded by the store-gateway yet. In order to avoid false positives, we grant some time
58+
// to the store-gateway to discover them. It's safe to exclude recently uploaded blocks because:
59+
// - Blocks uploaded by ingesters: we will continue querying them from ingesters for a while (depends
60+
// on the configured retention period).
61+
// - Blocks uploaded by compactor: the source blocks are marked for deletion but will continue to be
62+
// queried by store-gateways for a while (depends on the configured deletion marks delay).
63+
if ulid.Now()-blockID.Time() < uint64(c.uploadGracePeriod.Milliseconds()) {
64+
continue
65+
}
66+
67+
// The store-gateway may offload blocks before the querier. If that happens, the querier will run a consistency check
68+
// on blocks that can't be queried because they were offloaded. For this reason, we don't run the consistency check on any block
69+
// which has been marked for deletion more then "grace period" time ago. Basically, the grace period is the time
70+
// we still expect a block marked for deletion to be still queried.
71+
if mark := knownDeletionMarks[blockID]; mark != nil {
72+
if time.Since(time.Unix(mark.DeletionTime, 0)) > c.deletionGracePeriod {
73+
continue
74+
}
75+
}
76+
77+
id := blockID.String()
78+
if gatewayAddrs, ok := actualBlocks[id]; !ok {
79+
missingBlocks[id] = gatewayAddrs
80+
missingBlockIDs = append(missingBlockIDs, id)
81+
}
82+
}
83+
84+
if len(missingBlocks) == 0 {
85+
return nil
86+
}
87+
88+
c.checksFailed.Inc()
89+
return fmt.Errorf("consistency check failed because some blocks were not queried: %s", strings.Join(missingBlockIDs, " "))
90+
}
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
package querier
2+
3+
import (
4+
"fmt"
5+
"testing"
6+
"time"
7+
8+
"github.com/oklog/ulid"
9+
"github.com/prometheus/client_golang/prometheus"
10+
"github.com/prometheus/client_golang/prometheus/testutil"
11+
"github.com/stretchr/testify/assert"
12+
"github.com/thanos-io/thanos/pkg/block/metadata"
13+
"github.com/thanos-io/thanos/pkg/store/hintspb"
14+
15+
"github.com/cortexproject/cortex/pkg/util"
16+
)
17+
18+
func TestBlocksConsistencyChecker_Check(t *testing.T) {
19+
now := time.Now()
20+
uploadGracePeriod := 10 * time.Minute
21+
deletionGracePeriod := 5 * time.Minute
22+
23+
block1 := ulid.MustNew(uint64(util.TimeToMillis(now.Add(-uploadGracePeriod*2))), nil)
24+
block2 := ulid.MustNew(uint64(util.TimeToMillis(now.Add(-uploadGracePeriod*3))), nil)
25+
block3 := ulid.MustNew(uint64(util.TimeToMillis(now.Add(-uploadGracePeriod*4))), nil)
26+
blockRecentlyUploaded := ulid.MustNew(uint64(util.TimeToMillis(now)), nil)
27+
28+
tests := map[string]struct {
29+
expectedBlocks []ulid.ULID
30+
knownDeletionMarks map[ulid.ULID]*metadata.DeletionMark
31+
queriedBlocks map[string][]hintspb.Block
32+
expectedErr error
33+
}{
34+
"no expected blocks": {
35+
expectedBlocks: []ulid.ULID{},
36+
knownDeletionMarks: map[ulid.ULID]*metadata.DeletionMark{},
37+
queriedBlocks: map[string][]hintspb.Block{},
38+
},
39+
"all expected blocks have been queried from a single store-gateway": {
40+
expectedBlocks: []ulid.ULID{block1, block2},
41+
knownDeletionMarks: map[ulid.ULID]*metadata.DeletionMark{},
42+
queriedBlocks: map[string][]hintspb.Block{
43+
"1.1.1.1": {{Id: block1.String()}, {Id: block2.String()}},
44+
},
45+
},
46+
"all expected blocks have been queried from multiple store-gateway": {
47+
expectedBlocks: []ulid.ULID{block1, block2},
48+
knownDeletionMarks: map[ulid.ULID]*metadata.DeletionMark{},
49+
queriedBlocks: map[string][]hintspb.Block{
50+
"1.1.1.1": {{Id: block1.String()}},
51+
"2.2.2.2": {{Id: block2.String()}},
52+
},
53+
},
54+
"store-gateway has queried more blocks than expected": {
55+
expectedBlocks: []ulid.ULID{block1, block2},
56+
knownDeletionMarks: map[ulid.ULID]*metadata.DeletionMark{},
57+
queriedBlocks: map[string][]hintspb.Block{
58+
"1.1.1.1": {{Id: block1.String()}},
59+
"2.2.2.2": {{Id: block2.String()}, {Id: block3.String()}},
60+
},
61+
},
62+
"store-gateway has queried less blocks than expected": {
63+
expectedBlocks: []ulid.ULID{block1, block2, block3},
64+
knownDeletionMarks: map[ulid.ULID]*metadata.DeletionMark{},
65+
queriedBlocks: map[string][]hintspb.Block{
66+
"1.1.1.1": {{Id: block1.String()}},
67+
"2.2.2.2": {{Id: block3.String()}},
68+
},
69+
expectedErr: fmt.Errorf("consistency check failed because some blocks were not queried: %s", block2.String()),
70+
},
71+
"store-gateway has queried less blocks than expected, but the missing block has been recently uploaded": {
72+
expectedBlocks: []ulid.ULID{block1, block2, blockRecentlyUploaded},
73+
knownDeletionMarks: map[ulid.ULID]*metadata.DeletionMark{},
74+
queriedBlocks: map[string][]hintspb.Block{
75+
"1.1.1.1": {{Id: block1.String()}},
76+
"2.2.2.2": {{Id: block2.String()}},
77+
},
78+
},
79+
"store-gateway has queried less blocks than expected and the missing block has been recently marked for deletion": {
80+
expectedBlocks: []ulid.ULID{block1, block2, block3},
81+
knownDeletionMarks: map[ulid.ULID]*metadata.DeletionMark{
82+
block3: {DeletionTime: now.Add(-deletionGracePeriod / 2).Unix()},
83+
},
84+
queriedBlocks: map[string][]hintspb.Block{
85+
"1.1.1.1": {{Id: block1.String()}},
86+
"2.2.2.2": {{Id: block2.String()}},
87+
},
88+
expectedErr: fmt.Errorf("consistency check failed because some blocks were not queried: %s", block3.String()),
89+
},
90+
"store-gateway has queried less blocks than expected and the missing block has been marked for deletion long time ago": {
91+
expectedBlocks: []ulid.ULID{block1, block2, block3},
92+
knownDeletionMarks: map[ulid.ULID]*metadata.DeletionMark{
93+
block3: {DeletionTime: now.Add(-deletionGracePeriod * 2).Unix()},
94+
},
95+
queriedBlocks: map[string][]hintspb.Block{
96+
"1.1.1.1": {{Id: block1.String()}},
97+
"2.2.2.2": {{Id: block2.String()}},
98+
},
99+
},
100+
}
101+
102+
for testName, testData := range tests {
103+
t.Run(testName, func(t *testing.T) {
104+
reg := prometheus.NewPedanticRegistry()
105+
c := NewBlocksConsistencyChecker(uploadGracePeriod, deletionGracePeriod, reg)
106+
107+
err := c.Check(testData.expectedBlocks, testData.knownDeletionMarks, testData.queriedBlocks)
108+
assert.Equal(t, testData.expectedErr, err)
109+
assert.Equal(t, float64(1), testutil.ToFloat64(c.checksTotal))
110+
111+
if testData.expectedErr != nil {
112+
assert.Equal(t, float64(1), testutil.ToFloat64(c.checksFailed))
113+
} else {
114+
assert.Equal(t, float64(0), testutil.ToFloat64(c.checksFailed))
115+
}
116+
})
117+
}
118+
}

0 commit comments

Comments
 (0)