Skip to content

Commit d8af0ab

Browse files
thorfourgouthamve
authored andcommitted
Feat/blocks (#1695)
* ingester: v2 add tsdb block storage per user Signed-off-by: Thor <[email protected]> * ingester: transfer tsdb on shutdown Signed-off-by: Thor <[email protected]> * querier: query s3 tsdb blocks Signed-off-by: Thor <[email protected]> * vendor Signed-off-by: Thor <[email protected]> * ingester.v2.enable Signed-off-by: Thor <[email protected]> * split v2 into separate file Signed-off-by: Thor <[email protected]> * log error Signed-off-by: Thor <[email protected]> * check for creation of db in-between locks Signed-off-by: Thor <[email protected]> * anchor regexp Signed-off-by: Thor <[email protected]> * configurable ship interval Signed-off-by: Thor <[email protected]> * fixed transfer_test flake Signed-off-by: Thor <[email protected]> * skip store init when v2 enabled Signed-off-by: Thor <[email protected]> * s3insecure flag Signed-off-by: Thor <[email protected]> * ignore eof errors on block sync Signed-off-by: Thor <[email protected]> * moved transfer function to common wrapper Signed-off-by: Thor <[email protected]> * don't stop stores if nil Signed-off-by: Thor <[email protected]> * use TSDB base dir for querier directories Signed-off-by: Thor <[email protected]> * ignore table manager in v2 Signed-off-by: Thor <[email protected]> * Refactored TSDB config file structure and CLI flags Signed-off-by: Thor <[email protected]> * fixed lint errors, mark flags as experimental Signed-off-by: Thor <[email protected]> * use separate directory for syncing block data Signed-off-by: Thor <[email protected]> * fixed unit test with new config changes Signed-off-by: Thor <[email protected]> * sync-dir -> sync_dir Signed-off-by: Thor <[email protected]> * validate configs Signed-off-by: Thor <[email protected]> * fixed rebase overflow changes Signed-off-by: Thor <[email protected]>
1 parent a74df64 commit d8af0ab

File tree

227 files changed

+55761
-176
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

227 files changed

+55761
-176
lines changed

go.mod

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,19 +8,18 @@ require (
88
github.com/Azure/go-autorest v11.5.1+incompatible // indirect
99
github.com/Masterminds/squirrel v0.0.0-20161115235646-20f192218cf5
1010
github.com/NYTimes/gziphandler v1.1.1
11+
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4
1112
github.com/aws/aws-sdk-go v1.23.12
1213
github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 // indirect
1314
github.com/blang/semver v3.5.0+incompatible
1415
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect
1516
github.com/bradfitz/gomemcache v0.0.0-20190329173943-551aad21a668
1617
github.com/cenkalti/backoff v1.0.0 // indirect
1718
github.com/cespare/xxhash v1.1.0
18-
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd // indirect
1919
github.com/coreos/go-semver v0.3.0 // indirect
2020
github.com/coreos/go-systemd v0.0.0-20181012123002-c6f51f82210d // indirect
2121
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect
2222
github.com/cznic/ql v1.2.0 // indirect
23-
github.com/dustin/go-humanize v1.0.0 // indirect
2423
github.com/facette/natsort v0.0.0-20181210072756-2cd4dd1e2dcb
2524
github.com/fluent/fluent-logger-golang v1.2.1 // indirect
2625
github.com/fsouza/fake-gcs-server v1.3.0
@@ -48,6 +47,7 @@ require (
4847
github.com/lib/pq v1.0.0
4948
github.com/mattes/migrate v1.3.1
5049
github.com/mattn/go-sqlite3 v1.10.0 // indirect
50+
github.com/oklog/ulid v1.3.1
5151
github.com/opentracing-contrib/go-grpc v0.0.0-20180928155321-4b5a12d3ff02
5252
github.com/opentracing-contrib/go-stdlib v0.0.0-20190519235532-cf7a6c988dc9
5353
github.com/opentracing/opentracing-go v1.1.0
@@ -62,11 +62,10 @@ require (
6262
github.com/segmentio/fasthash v0.0.0-20180216231524-a72b379d632e
6363
github.com/sercand/kuberesolver v2.1.0+incompatible // indirect
6464
github.com/stretchr/testify v1.4.0
65+
github.com/thanos-io/thanos v0.7.0
6566
github.com/tinylib/msgp v0.0.0-20161221055906-38a6f61a768d // indirect
6667
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 // indirect
67-
github.com/uber-go/atomic v1.3.2 // indirect
6868
github.com/uber/jaeger-client-go v2.16.0+incompatible
69-
github.com/uber/jaeger-lib v2.0.0+incompatible // indirect
7069
github.com/weaveworks/billing-client v0.0.0-20171006123215-be0d55e547b1
7170
github.com/weaveworks/common v0.0.0-20190822150010-afb9996716e4
7271
github.com/weaveworks/promrus v1.2.0 // indirect

go.sum

Lines changed: 58 additions & 2 deletions
Large diffs are not rendered by default.

pkg/chunk/storage/factory.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,12 @@ import (
1818
"github.com/pkg/errors"
1919
)
2020

21+
// Supported storage engines
22+
const (
23+
StorageEngineChunks = "chunks"
24+
StorageEngineTSDB = "tsdb"
25+
)
26+
2127
// StoreLimits helps get Limits specific to Queries for Stores
2228
type StoreLimits interface {
2329
CardinalityLimit(userID string) int
@@ -27,6 +33,7 @@ type StoreLimits interface {
2733

2834
// Config chooses which storage client to use.
2935
type Config struct {
36+
Engine string `yaml:"engine"`
3037
AWSStorageConfig aws.StorageConfig `yaml:"aws"`
3138
GCPStorageConfig gcp.Config `yaml:"bigtable"`
3239
GCSConfig gcp.GCSConfig `yaml:"gcs"`
@@ -48,10 +55,20 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
4855
cfg.BoltDBConfig.RegisterFlags(f)
4956
cfg.FSConfig.RegisterFlags(f)
5057

58+
f.StringVar(&cfg.Engine, "store.engine", "chunks", "The storage engine to use: chunks or tsdb. Be aware tsdb is experimental and shouldn't be used in production.")
5159
cfg.IndexQueriesCacheConfig.RegisterFlagsWithPrefix("store.index-cache-read.", "Cache config for index entry reading. ", f)
5260
f.DurationVar(&cfg.IndexCacheValidity, "store.index-cache-validity", 5*time.Minute, "Cache validity for active index entries. Should be no higher than -ingester.max-chunk-idle.")
5361
}
5462

63+
// Validate config and returns error on failure
64+
func (cfg *Config) Validate() error {
65+
if cfg.Engine != StorageEngineChunks && cfg.Engine != StorageEngineTSDB {
66+
return errors.New("unsupported storage engine")
67+
}
68+
69+
return nil
70+
}
71+
5572
// NewStore makes the storage clients based on the configuration.
5673
func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConfig, limits StoreLimits) (chunk.Store, error) {
5774
tieredCache, err := cache.New(cfg.IndexQueriesCacheConfig)

pkg/cortex/cortex.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"github.com/cortexproject/cortex/pkg/querier/queryrange"
2929
"github.com/cortexproject/cortex/pkg/ring"
3030
"github.com/cortexproject/cortex/pkg/ruler"
31+
"github.com/cortexproject/cortex/pkg/storage/tsdb"
3132
"github.com/cortexproject/cortex/pkg/util"
3233
"github.com/cortexproject/cortex/pkg/util/validation"
3334
)
@@ -71,6 +72,7 @@ type Config struct {
7172
QueryRange queryrange.Config `yaml:"query_range,omitempty"`
7273
TableManager chunk.TableManagerConfig `yaml:"table_manager,omitempty"`
7374
Encoding encoding.Config `yaml:"-"` // No yaml for this, it only works with flags.
75+
TSDB tsdb.Config `yaml:"tsdb"`
7476

7577
Ruler ruler.Config `yaml:"ruler,omitempty"`
7678
ConfigDB db.Config `yaml:"configdb,omitempty"`
@@ -103,6 +105,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {
103105
c.QueryRange.RegisterFlags(f)
104106
c.TableManager.RegisterFlags(f)
105107
c.Encoding.RegisterFlags(f)
108+
c.TSDB.RegisterFlags(f)
106109

107110
c.Ruler.RegisterFlags(f)
108111
c.ConfigDB.RegisterFlags(f)
@@ -122,6 +125,14 @@ func (c *Config) Validate() error {
122125
if err := c.Encoding.Validate(); err != nil {
123126
return errors.Wrap(err, "invalid encoding config")
124127
}
128+
129+
if err := c.Storage.Validate(); err != nil {
130+
return errors.Wrap(err, "invalid storage config")
131+
}
132+
133+
if err := c.TSDB.Validate(); err != nil {
134+
return errors.Wrap(err, "invalid TSDB config")
135+
}
125136
return nil
126137
}
127138

pkg/cortex/modules.go

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/prometheus/common/route"
1313
"github.com/prometheus/prometheus/config"
1414
v1 "github.com/prometheus/prometheus/web/api/v1"
15+
"github.com/thanos-io/thanos/pkg/objstore/s3"
1516
httpgrpc_server "github.com/weaveworks/common/httpgrpc/server"
1617
"github.com/weaveworks/common/middleware"
1718
"github.com/weaveworks/common/server"
@@ -194,7 +195,25 @@ func (t *Cortex) initQuerier(cfg *Config) (err error) {
194195
return
195196
}
196197

197-
queryable, engine := querier.New(cfg.Querier, t.distributor, t.store)
198+
var store querier.ChunkStore
199+
200+
if cfg.Storage.Engine == storage.StorageEngineTSDB {
201+
s3cfg := s3.Config{
202+
Bucket: cfg.TSDB.S3.BucketName,
203+
Endpoint: cfg.TSDB.S3.Endpoint,
204+
AccessKey: cfg.TSDB.S3.AccessKeyID,
205+
SecretKey: cfg.TSDB.S3.SecretAccessKey,
206+
Insecure: cfg.TSDB.S3.Insecure,
207+
}
208+
store, err = querier.NewBlockQuerier(s3cfg, cfg.TSDB.SyncDir, prometheus.DefaultRegisterer)
209+
if err != nil {
210+
return err
211+
}
212+
} else {
213+
store = t.store
214+
}
215+
216+
queryable, engine := querier.New(cfg.Querier, t.distributor, store)
198217
api := v1.NewAPI(
199218
engine,
200219
queryable,
@@ -229,6 +248,9 @@ func (t *Cortex) stopQuerier() error {
229248

230249
func (t *Cortex) initIngester(cfg *Config) (err error) {
231250
cfg.Ingester.LifecyclerConfig.ListenPort = &cfg.Server.GRPCListenPort
251+
cfg.Ingester.TSDBEnabled = cfg.Storage.Engine == storage.StorageEngineTSDB
252+
cfg.Ingester.TSDBConfig = cfg.TSDB
253+
232254
t.ingester, err = ingester.New(cfg.Ingester, cfg.IngesterClient, t.overrides, t.store, prometheus.DefaultRegisterer)
233255
if err != nil {
234256
return
@@ -247,6 +269,9 @@ func (t *Cortex) stopIngester() error {
247269
}
248270

249271
func (t *Cortex) initStore(cfg *Config) (err error) {
272+
if cfg.Storage.Engine == storage.StorageEngineTSDB {
273+
return nil
274+
}
250275
err = cfg.Schema.Load()
251276
if err != nil {
252277
return
@@ -257,7 +282,9 @@ func (t *Cortex) initStore(cfg *Config) (err error) {
257282
}
258283

259284
func (t *Cortex) stopStore() error {
260-
t.store.Stop()
285+
if t.store != nil {
286+
t.store.Stop()
287+
}
261288
return nil
262289
}
263290

@@ -287,6 +314,10 @@ func (t *Cortex) stopQueryFrontend() (err error) {
287314
}
288315

289316
func (t *Cortex) initTableManager(cfg *Config) error {
317+
if cfg.Storage.Engine == storage.StorageEngineTSDB {
318+
return nil // table manager isn't used in v2
319+
}
320+
290321
err := cfg.Schema.Load()
291322
if err != nil {
292323
return err
@@ -325,7 +356,10 @@ func (t *Cortex) initTableManager(cfg *Config) error {
325356
}
326357

327358
func (t *Cortex) stopTableManager() error {
328-
t.tableManager.Stop()
359+
if t.tableManager != nil {
360+
t.tableManager.Stop()
361+
}
362+
329363
return nil
330364
}
331365

pkg/ingester/bucket.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package ingester
2+
3+
import (
4+
"fmt"
5+
"io"
6+
"strings"
7+
8+
"github.com/thanos-io/thanos/pkg/objstore"
9+
"golang.org/x/net/context"
10+
)
11+
12+
// Bucket is a wrapper around a objstore.Bucket that prepends writes with a userID
13+
type Bucket struct {
14+
UserID string
15+
Bucket objstore.Bucket
16+
}
17+
18+
func (b *Bucket) fullName(name string) string {
19+
return fmt.Sprintf("%s/%s", b.UserID, name)
20+
}
21+
22+
// Close implements io.Closer
23+
func (b *Bucket) Close() error { return b.Bucket.Close() }
24+
25+
// Upload the contents of the reader as an object into the bucket.
26+
func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {
27+
return b.Bucket.Upload(ctx, b.fullName(name), r)
28+
}
29+
30+
// Delete removes the object with the given name.
31+
func (b *Bucket) Delete(ctx context.Context, name string) error {
32+
return b.Bucket.Delete(ctx, b.fullName(name))
33+
}
34+
35+
// Name returns the bucket name for the provider.
36+
func (b *Bucket) Name() string { return b.Bucket.Name() }
37+
38+
// Iter calls f for each entry in the given directory (not recursive.). The argument to f is the full
39+
// object name including the prefix of the inspected directory.
40+
func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error) error {
41+
return b.Bucket.Iter(ctx, b.fullName(dir), func(s string) error {
42+
/*
43+
Since all objects are prefixed with the userID we need to strip the userID
44+
upon passing to the processing function
45+
*/
46+
return f(strings.Join(strings.Split(s, "/")[1:], "/"))
47+
})
48+
}
49+
50+
// Get returns a reader for the given object name.
51+
func (b *Bucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
52+
return b.Bucket.Get(ctx, b.fullName(name))
53+
}
54+
55+
// GetRange returns a new range reader for the given object name and range.
56+
func (b *Bucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) {
57+
return b.Bucket.GetRange(ctx, b.fullName(name), off, length)
58+
}
59+
60+
// Exists checks if the given object exists in the bucket.
61+
func (b *Bucket) Exists(ctx context.Context, name string) (bool, error) {
62+
return b.Bucket.Exists(ctx, b.fullName(name))
63+
}
64+
65+
// IsObjNotFoundErr returns true if error means that object is not found. Relevant to Get operations.
66+
func (b *Bucket) IsObjNotFoundErr(err error) bool {
67+
return b.Bucket.IsObjNotFoundErr(err)
68+
}

0 commit comments

Comments
 (0)