4
4
"bytes"
5
5
"encoding/base64"
6
6
"encoding/json"
7
+ "flag"
7
8
"fmt"
8
9
"sort"
9
10
"strconv"
@@ -21,6 +22,7 @@ import (
21
22
"golang.org/x/net/context"
22
23
23
24
"github.com/weaveworks/cortex/user"
25
+ "github.com/weaveworks/cortex/util"
24
26
)
25
27
26
28
const (
@@ -89,20 +91,28 @@ type Store interface {
89
91
90
92
// StoreConfig specifies config for a ChunkStore
91
93
type StoreConfig struct {
92
- S3 S3Client
93
- BucketName string
94
- DynamoDB DynamoDBClient
95
- TableName string
96
- ChunkCache * Cache
94
+ PeriodicTableConfig
95
+ CacheConfig
96
+ S3 S3ClientValue
97
+ DynamoDB DynamoDBClientValue
97
98
98
99
// After midnight on this day, we start bucketing indexes by day instead of by
99
100
// hour. Only the day matters, not the time within the day.
100
- DailyBucketsFrom model. Time
101
+ DailyBucketsFrom util. DayValue
101
102
102
103
// After this time, we will only query for base64-encoded label values.
103
- Base64ValuesFrom model.Time
104
+ Base64ValuesFrom util.DayValue
105
+ }
104
106
105
- PeriodicTableConfig
107
+ // RegisterFlags adds the flags required to config this to the given FlagSet
108
+ func (cfg * StoreConfig ) RegisterFlags (f * flag.FlagSet ) {
109
+ cfg .PeriodicTableConfig .RegisterFlags (f )
110
+ cfg .CacheConfig .RegisterFlags (f )
111
+
112
+ f .Var (& cfg .S3 , "s3.url" , "S3 endpoint URL." )
113
+ f .Var (& cfg .DynamoDB , "dynamodb.url" , "DynamoDB endpoint URL." )
114
+ f .Var (& cfg .DailyBucketsFrom , "dynamodb.daily-buckets-from" , "The date in the format YYYY-MM-DD of the first day for which DynamoDB index buckets should be day-sized vs. hour-sized." )
115
+ f .Var (& cfg .Base64ValuesFrom , "dynamodb.base64-buckets-from" , "The date in the format YYYY-MM-DD after which we will stop querying to non-base64 encoded values." )
106
116
}
107
117
108
118
// PeriodicTableConfig for the use of periodic tables (ie, weekly talbes). Can
@@ -112,20 +122,29 @@ type PeriodicTableConfig struct {
112
122
UsePeriodicTables bool
113
123
TablePrefix string
114
124
TablePeriod time.Duration
115
- PeriodicTableStartAt time.Time
125
+ PeriodicTableStartAt util.DayValue
126
+ }
127
+
128
+ // RegisterFlags adds the flags required to config this to the given FlagSet
129
+ func (cfg * PeriodicTableConfig ) RegisterFlags (f * flag.FlagSet ) {
130
+ f .BoolVar (& cfg .UsePeriodicTables , "dynamodb.use-periodic-tables" , true , "Should we user periodic tables." )
131
+ f .StringVar (& cfg .TablePrefix , "dynamodb.periodic-table.prefix" , "cortex_" , "DynamoDB table prefix for the periodic tables." )
132
+ f .DurationVar (& cfg .TablePeriod , "dynamodb.periodic-table.period" , 7 * 24 * time .Hour , "DynamoDB periodic tables period." )
133
+ f .Var (& cfg .PeriodicTableStartAt , "dynamodb.periodic-table.start" , "DynamoDB periodic tables start time." )
116
134
}
117
135
118
136
// AWSStore implements ChunkStore for AWS
119
137
type AWSStore struct {
120
- cfg StoreConfig
121
-
138
+ cfg StoreConfig
139
+ cache * Cache
122
140
dynamo * dynamoDBBackoffClient
123
141
}
124
142
125
143
// NewAWSStore makes a new ChunkStore
126
144
func NewAWSStore (cfg StoreConfig ) * AWSStore {
127
145
return & AWSStore {
128
146
cfg : cfg ,
147
+ cache : NewCache (cfg .CacheConfig ),
129
148
dynamo : newDynamoDBBackoffClient (cfg .DynamoDB ),
130
149
}
131
150
}
@@ -184,7 +203,7 @@ func (c *AWSStore) bigBuckets(from, through model.Time) []bucketSpec {
184
203
185
204
func (c * AWSStore ) tableForBucket (bucketStart int64 ) string {
186
205
if ! c .cfg .UsePeriodicTables || bucketStart < (c .cfg .PeriodicTableStartAt .Unix ()) {
187
- return c .cfg .TableName
206
+ return c .cfg .DynamoDB . TableName
188
207
}
189
208
return c .cfg .TablePrefix + strconv .Itoa (int (bucketStart / int64 (c .cfg .TablePeriod / time .Second )))
190
209
}
@@ -356,7 +375,7 @@ func (c *AWSStore) putChunk(ctx context.Context, userID string, chunk *Chunk) er
356
375
var err error
357
376
_ , err = c .cfg .S3 .PutObject (& s3.PutObjectInput {
358
377
Body : body ,
359
- Bucket : aws .String (c .cfg .BucketName ),
378
+ Bucket : aws .String (c .cfg .S3 . BucketName ),
360
379
Key : aws .String (chunkName (userID , chunk .ID )),
361
380
})
362
381
return err
@@ -365,10 +384,8 @@ func (c *AWSStore) putChunk(ctx context.Context, userID string, chunk *Chunk) er
365
384
return err
366
385
}
367
386
368
- if c .cfg .ChunkCache != nil {
369
- if err = c .cfg .ChunkCache .StoreChunkData (ctx , userID , chunk ); err != nil {
370
- log .Warnf ("Could not store %v in chunk cache: %v" , chunk .ID , err )
371
- }
387
+ if err = c .cache .StoreChunkData (ctx , userID , chunk ); err != nil {
388
+ log .Warnf ("Could not store %v in chunk cache: %v" , chunk .ID , err )
372
389
}
373
390
return nil
374
391
}
@@ -433,22 +450,18 @@ func (c *AWSStore) Get(ctx context.Context, from, through model.Time, matchers .
433
450
queryChunks .Observe (float64 (len (missing )))
434
451
435
452
var fromCache []Chunk
436
- if c .cfg .ChunkCache != nil {
437
- fromCache , missing , err = c .cfg .ChunkCache .FetchChunkData (ctx , userID , missing )
438
- if err != nil {
439
- log .Warnf ("Error fetching from cache: %v" , err )
440
- }
453
+ fromCache , missing , err = c .cache .FetchChunkData (ctx , userID , missing )
454
+ if err != nil {
455
+ log .Warnf ("Error fetching from cache: %v" , err )
441
456
}
442
457
443
458
fromS3 , err := c .fetchChunkData (ctx , userID , missing )
444
459
if err != nil {
445
460
return nil , err
446
461
}
447
462
448
- if c .cfg .ChunkCache != nil {
449
- if err = c .cfg .ChunkCache .StoreChunks (ctx , userID , fromS3 ); err != nil {
450
- log .Warnf ("Could not store chunks in chunk cache: %v" , err )
451
- }
463
+ if err = c .cache .StoreChunks (ctx , userID , fromS3 ); err != nil {
464
+ log .Warnf ("Could not store chunks in chunk cache: %v" , err )
452
465
}
453
466
454
467
// TODO instead of doing this sort, propagate an index and assign chunks
@@ -611,7 +624,7 @@ func (c *AWSStore) lookupChunksForMatcher(ctx context.Context, userID string, bu
611
624
return nil , err
612
625
}
613
626
614
- if matcher .Type == metric .Equal && bucket .startTime .Before (c .cfg .Base64ValuesFrom ) {
627
+ if matcher .Type == metric .Equal && bucket .startTime .Before (c .cfg .Base64ValuesFrom . Time ) {
615
628
legacyRangePrefix , err := rangeValueKeyAndValueOnly (matcher .Name , matcher .Value )
616
629
if err != nil {
617
630
return nil , err
@@ -715,7 +728,7 @@ func (c *AWSStore) fetchChunkData(ctx context.Context, userID string, chunkSet [
715
728
err := instrument .TimeRequestHistogram (ctx , "S3.GetObject" , s3RequestDuration , func (_ context.Context ) error {
716
729
var err error
717
730
resp , err = c .cfg .S3 .GetObject (& s3.GetObjectInput {
718
- Bucket : aws .String (c .cfg .BucketName ),
731
+ Bucket : aws .String (c .cfg .S3 . BucketName ),
719
732
Key : aws .String (chunkName (userID , chunk .ID )),
720
733
})
721
734
return err
0 commit comments