Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
cmd/cortex/cortex
cmd/cortex_table_manager/cortex_table_manager
cmd/distributor/distributor
cmd/ingester/ingester
cmd/querier/querier
cmd/ruler/ruler
cmd/table-manager/table-manager
.uptodate
.pkg
*.pb.go
Expand Down
55 changes: 29 additions & 26 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

# Boiler plate for bulding Docker containers.
# All this must go at top of file I'm afraid.
IMAGE_PREFIX := weaveworks
IMAGE_PREFIX := weaveworks/cortex-
IMAGE_TAG := $(shell ./tools/image-tag)
UPTODATE := .uptodate

Expand All @@ -12,40 +12,43 @@ UPTODATE := .uptodate
# Dependencies (i.e. things that go in the image) still need to be explicitly
# declared.
%/$(UPTODATE): %/Dockerfile
$(SUDO) docker build -t $(IMAGE_PREFIX)/$(shell basename $(@D)) $(@D)/
$(SUDO) docker tag $(IMAGE_PREFIX)/$(shell basename $(@D)) $(IMAGE_PREFIX)/$(shell basename $(@D)):$(IMAGE_TAG)
$(SUDO) docker build -t $(IMAGE_PREFIX)$(shell basename $(@D)) $(@D)/
$(SUDO) docker tag $(IMAGE_PREFIX)$(shell basename $(@D)) $(IMAGE_PREFIX)$(shell basename $(@D)):$(IMAGE_TAG)
touch $@

# Get a list of directories containing Dockerfiles
DOCKERFILES := $(shell find . -type f -name Dockerfile ! -path "./tools/*" ! -path "./vendor/*")
UPTODATE_FILES := $(patsubst %/Dockerfile,%/$(UPTODATE),$(DOCKERFILES))
DOCKER_IMAGE_DIRS=$(patsubst %/Dockerfile,%,$(DOCKERFILES))
IMAGE_NAMES=$(foreach dir,$(DOCKER_IMAGE_DIRS),$(patsubst %,$(IMAGE_PREFIX)/%,$(shell basename $(dir))))
DOCKER_IMAGE_DIRS := $(patsubst %/Dockerfile,%,$(DOCKERFILES))
IMAGE_NAMES := $(foreach dir,$(DOCKER_IMAGE_DIRS),$(patsubst %,$(IMAGE_PREFIX)%,$(shell basename $(dir))))
images:
$(info $(IMAGE_NAMES))
@echo > /dev/null

# Generating proto code is automated.
PROTO_DEFS := $(shell find . -type f -name "*.proto" ! -path "./tools/*" ! -path "./vendor/*")
PROTO_GOS := $(patsubst %.proto,%.pb.go,$(PROTO_DEFS))

# List of exes please
CORTEX_EXE := ./cmd/cortex/cortex
CORTEX_TABLE_MANAGER_EXE := ./cmd/cortex_table_manager/cortex_table_manager
EXES = $(CORTEX_EXE) $(CORTEX_TABLE_MANAGER_EXE)

# Building binaries is now automated. The convention is to build a binary
# for every directory with main.go in it, in the ./cmd directory.
MAIN_GO := $(shell find ./cmd -type f -name main.go ! -path "./tools/*" ! -path "./vendor/*")
EXES := $(foreach exe, $(patsubst ./cmd/%/main.go, %, $(MAIN_GO)), ./cmd/$(exe)/$(exe))
GO_FILES := $(shell find . -name '*.go' ! -path "./cmd/*" ! -path "./tools/*" ! -path "./vendor/*")
define dep_exe
$(1): $(dir $(1))/main.go $(GO_FILES) ui/bindata.go $(PROTO_GOS)
$(dir $(1))$(UPTODATE): $(1)
endef
$(foreach exe, $(EXES), $(eval $(call dep_exe, $(exe))))

# Manually declared dependancies And what goes into each exe
%.pb.go: %.proto
all: $(UPTODATE_FILES)
test: $(PROTO_GOS)

# And what goes into each exe
$(CORTEX_EXE): $(shell find . -name '*.go' ! -path "./tools/*" ! -path "./vendor/*") ui/bindata.go $(PROTO_GOS)
$(CORTEX_TABLE_MANAGER_EXE): $(shell find ./chunk/ -name '*.go') cmd/cortex_table_manager/main.go
%.pb.go: %.proto
ui/bindata.go: $(shell find ui/static ui/templates)
test: $(PROTO_GOS)

# And now what goes into each image
cortex-build/$(UPTODATE): cortex-build/*
cmd/cortex/$(UPTODATE): $(CORTEX_EXE)
cmd/cortex_table_manager/$(UPTODATE): $(CORTEX_TABLE_MANAGER_EXE)
build/$(UPTODATE): build/*

# All the boiler plate for building golang follows:
SUDO := $(shell docker info >/dev/null 2>&1 || echo "sudo -E")
Expand All @@ -63,32 +66,32 @@ NETGO_CHECK = @strings $@ | grep cgo_stub\\\.go >/dev/null || { \

ifeq ($(BUILD_IN_CONTAINER),true)

$(EXES) $(PROTO_GOS) ui/bindata.go lint test shell: cortex-build/$(UPTODATE)
$(EXES) $(PROTO_GOS) ui/bindata.go lint test shell: build/$(UPTODATE)
@mkdir -p $(shell pwd)/.pkg
$(SUDO) docker run $(RM) -ti \
-v $(shell pwd)/.pkg:/go/pkg \
-v $(shell pwd):/go/src/github.com/weaveworks/cortex \
$(IMAGE_PREFIX)/cortex-build $@
$(IMAGE_PREFIX)build $@

else

$(EXES): cortex-build/$(UPTODATE)
$(EXES): build/$(UPTODATE)
go build $(GO_FLAGS) -o $@ ./$(@D)
$(NETGO_CHECK)

%.pb.go: cortex-build/$(UPTODATE)
%.pb.go: build/$(UPTODATE)
protoc -I ./vendor:./$(@D) --go_out=plugins=grpc:./$(@D) ./$(patsubst %.pb.go,%.proto,$@)

ui/bindata.go: cortex-build/$(UPTODATE)
ui/bindata.go: build/$(UPTODATE)
go-bindata -pkg ui -o ui/bindata.go -ignore '(.*\.map|bootstrap\.js|bootstrap-theme\.css|bootstrap\.css)' ui/templates/... ui/static/...

lint: cortex-build/$(UPTODATE)
lint: build/$(UPTODATE)
./tools/lint -notestpackage -ignorespelling queriers -ignorespelling Queriers .

test: cortex-build/$(UPTODATE)
test: build/$(UPTODATE)
./tools/test -netgo

shell: cortex-build/$(UPTODATE)
shell: build/$(UPTODATE)
bash

endif
Expand Down
File renamed without changes.
File renamed without changes.
43 changes: 38 additions & 5 deletions chunk/chunk_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package chunk

import (
"bytes"
"flag"
"fmt"
"io/ioutil"
"time"
Expand Down Expand Up @@ -47,10 +48,34 @@ type Memcache interface {
Set(item *memcache.Item) error
}

// CacheConfig is config to make a Cache
type CacheConfig struct {
Expiration time.Duration
memcacheConfig MemcacheConfig
}

// RegisterFlags adds the flags required to config this to the given FlagSet
func (cfg *CacheConfig) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.Expiration, "memcached.expiration", 0, "How long chunks stay in the memcache.")
cfg.memcacheConfig.RegisterFlags(f)
}

// Cache type caches chunks
type Cache struct {
Memcache Memcache
Expiration time.Duration
cfg CacheConfig
memcache Memcache
}

// NewCache makes a new Cache
func NewCache(cfg CacheConfig) *Cache {
var memcache Memcache
if cfg.memcacheConfig.Host != "" {
memcache = NewMemcacheClient(cfg.memcacheConfig)
}
return &Cache{
cfg: cfg,
memcache: memcache,
}
}

func memcacheStatusCode(err error) string {
Expand All @@ -73,6 +98,10 @@ func memcacheKey(userID, chunkID string) string {

// FetchChunkData gets chunks from the chunk cache.
func (c *Cache) FetchChunkData(ctx context.Context, userID string, chunks []Chunk) (found []Chunk, missing []Chunk, err error) {
if c.memcache == nil {
return nil, chunks, nil
}

memcacheRequests.Add(float64(len(chunks)))

keys := make([]string, 0, len(chunks))
Expand All @@ -83,7 +112,7 @@ func (c *Cache) FetchChunkData(ctx context.Context, userID string, chunks []Chun
var items map[string]*memcache.Item
err = instrument.TimeRequestHistogramStatus(ctx, "Memcache.Get", memcacheRequestDuration, memcacheStatusCode, func(_ context.Context) error {
var err error
items, err = c.Memcache.GetMulti(keys)
items, err = c.memcache.GetMulti(keys)
return err
})
if err != nil {
Expand Down Expand Up @@ -111,6 +140,10 @@ func (c *Cache) FetchChunkData(ctx context.Context, userID string, chunks []Chun

// StoreChunkData serializes and stores a chunk in the chunk cache.
func (c *Cache) StoreChunkData(ctx context.Context, userID string, chunk *Chunk) error {
if c.memcache == nil {
return nil
}

reader, err := chunk.reader()
if err != nil {
return err
Expand All @@ -125,9 +158,9 @@ func (c *Cache) StoreChunkData(ctx context.Context, userID string, chunk *Chunk)
item := memcache.Item{
Key: memcacheKey(userID, chunk.ID),
Value: buf,
Expiration: int32(c.Expiration.Seconds()),
Expiration: int32(c.cfg.Expiration.Seconds()),
}
return c.Memcache.Set(&item)
return c.memcache.Set(&item)
})
}

Expand Down
69 changes: 41 additions & 28 deletions chunk/chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"encoding/base64"
"encoding/json"
"flag"
"fmt"
"sort"
"strconv"
Expand All @@ -21,6 +22,7 @@ import (
"golang.org/x/net/context"

"github.com/weaveworks/cortex/user"
"github.com/weaveworks/cortex/util"
)

const (
Expand Down Expand Up @@ -89,20 +91,28 @@ type Store interface {

// StoreConfig specifies config for a ChunkStore
type StoreConfig struct {
S3 S3Client
BucketName string
DynamoDB DynamoDBClient
TableName string
ChunkCache *Cache
PeriodicTableConfig
CacheConfig
S3 S3ClientValue
DynamoDB DynamoDBClientValue

// After midnight on this day, we start bucketing indexes by day instead of by
// hour. Only the day matters, not the time within the day.
DailyBucketsFrom model.Time
DailyBucketsFrom util.DayValue

// After this time, we will only query for base64-encoded label values.
Base64ValuesFrom model.Time
Base64ValuesFrom util.DayValue
}

PeriodicTableConfig
// RegisterFlags adds the flags required to config this to the given FlagSet
func (cfg *StoreConfig) RegisterFlags(f *flag.FlagSet) {
cfg.PeriodicTableConfig.RegisterFlags(f)
cfg.CacheConfig.RegisterFlags(f)

f.Var(&cfg.S3, "s3.url", "S3 endpoint URL.")
f.Var(&cfg.DynamoDB, "dynamodb.url", "DynamoDB endpoint URL.")
f.Var(&cfg.DailyBucketsFrom, "dynamodb.daily-buckets-from", "The date in the format YYYY-MM-DD of the first day for which DynamoDB index buckets should be day-sized vs. hour-sized.")
f.Var(&cfg.Base64ValuesFrom, "dynamodb.base64-buckets-from", "The date in the format YYYY-MM-DD after which we will stop querying to non-base64 encoded values.")
}

// PeriodicTableConfig for the use of periodic tables (ie, weekly talbes). Can
Expand All @@ -112,20 +122,29 @@ type PeriodicTableConfig struct {
UsePeriodicTables bool
TablePrefix string
TablePeriod time.Duration
PeriodicTableStartAt time.Time
PeriodicTableStartAt util.DayValue
}

// RegisterFlags adds the flags required to config this to the given FlagSet
func (cfg *PeriodicTableConfig) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.UsePeriodicTables, "dynamodb.use-periodic-tables", true, "Should we user periodic tables.")
f.StringVar(&cfg.TablePrefix, "dynamodb.periodic-table.prefix", "cortex_", "DynamoDB table prefix for the periodic tables.")
f.DurationVar(&cfg.TablePeriod, "dynamodb.periodic-table.period", 7*24*time.Hour, "DynamoDB periodic tables period.")
f.Var(&cfg.PeriodicTableStartAt, "dynamodb.periodic-table.start", "DynamoDB periodic tables start time.")
}

// AWSStore implements ChunkStore for AWS
type AWSStore struct {
cfg StoreConfig

cfg StoreConfig
cache *Cache
dynamo *dynamoDBBackoffClient
}

// NewAWSStore makes a new ChunkStore
func NewAWSStore(cfg StoreConfig) *AWSStore {
return &AWSStore{
cfg: cfg,
cache: NewCache(cfg.CacheConfig),
dynamo: newDynamoDBBackoffClient(cfg.DynamoDB),
}
}
Expand Down Expand Up @@ -184,7 +203,7 @@ func (c *AWSStore) bigBuckets(from, through model.Time) []bucketSpec {

func (c *AWSStore) tableForBucket(bucketStart int64) string {
if !c.cfg.UsePeriodicTables || bucketStart < (c.cfg.PeriodicTableStartAt.Unix()) {
return c.cfg.TableName
return c.cfg.DynamoDB.TableName
}
return c.cfg.TablePrefix + strconv.Itoa(int(bucketStart/int64(c.cfg.TablePeriod/time.Second)))
}
Expand Down Expand Up @@ -356,7 +375,7 @@ func (c *AWSStore) putChunk(ctx context.Context, userID string, chunk *Chunk) er
var err error
_, err = c.cfg.S3.PutObject(&s3.PutObjectInput{
Body: body,
Bucket: aws.String(c.cfg.BucketName),
Bucket: aws.String(c.cfg.S3.BucketName),
Key: aws.String(chunkName(userID, chunk.ID)),
})
return err
Expand All @@ -365,10 +384,8 @@ func (c *AWSStore) putChunk(ctx context.Context, userID string, chunk *Chunk) er
return err
}

if c.cfg.ChunkCache != nil {
if err = c.cfg.ChunkCache.StoreChunkData(ctx, userID, chunk); err != nil {
log.Warnf("Could not store %v in chunk cache: %v", chunk.ID, err)
}
if err = c.cache.StoreChunkData(ctx, userID, chunk); err != nil {
log.Warnf("Could not store %v in chunk cache: %v", chunk.ID, err)
}
return nil
}
Expand Down Expand Up @@ -433,22 +450,18 @@ func (c *AWSStore) Get(ctx context.Context, from, through model.Time, matchers .
queryChunks.Observe(float64(len(missing)))

var fromCache []Chunk
if c.cfg.ChunkCache != nil {
fromCache, missing, err = c.cfg.ChunkCache.FetchChunkData(ctx, userID, missing)
if err != nil {
log.Warnf("Error fetching from cache: %v", err)
}
fromCache, missing, err = c.cache.FetchChunkData(ctx, userID, missing)
if err != nil {
log.Warnf("Error fetching from cache: %v", err)
}

fromS3, err := c.fetchChunkData(ctx, userID, missing)
if err != nil {
return nil, err
}

if c.cfg.ChunkCache != nil {
if err = c.cfg.ChunkCache.StoreChunks(ctx, userID, fromS3); err != nil {
log.Warnf("Could not store chunks in chunk cache: %v", err)
}
if err = c.cache.StoreChunks(ctx, userID, fromS3); err != nil {
log.Warnf("Could not store chunks in chunk cache: %v", err)
}

// TODO instead of doing this sort, propagate an index and assign chunks
Expand Down Expand Up @@ -611,7 +624,7 @@ func (c *AWSStore) lookupChunksForMatcher(ctx context.Context, userID string, bu
return nil, err
}

if matcher.Type == metric.Equal && bucket.startTime.Before(c.cfg.Base64ValuesFrom) {
if matcher.Type == metric.Equal && bucket.startTime.Before(c.cfg.Base64ValuesFrom.Time) {
legacyRangePrefix, err := rangeValueKeyAndValueOnly(matcher.Name, matcher.Value)
if err != nil {
return nil, err
Expand Down Expand Up @@ -715,7 +728,7 @@ func (c *AWSStore) fetchChunkData(ctx context.Context, userID string, chunkSet [
err := instrument.TimeRequestHistogram(ctx, "S3.GetObject", s3RequestDuration, func(_ context.Context) error {
var err error
resp, err = c.cfg.S3.GetObject(&s3.GetObjectInput{
Bucket: aws.String(c.cfg.BucketName),
Bucket: aws.String(c.cfg.S3.BucketName),
Key: aws.String(chunkName(userID, chunk.ID)),
})
return err
Expand Down
Loading