Skip to content

feature: redis storage client #1634

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ require (
github.com/Azure/go-autorest v11.5.1+incompatible // indirect
github.com/Masterminds/squirrel v0.0.0-20161115235646-20f192218cf5
github.com/NYTimes/gziphandler v1.1.1
github.com/alicebob/gopher-json v0.0.0-20180125190556-5a6b3ba71ee6 // indirect
github.com/alicebob/miniredis v2.5.0+incompatible
github.com/aws/aws-sdk-go v1.15.90
github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 // indirect
github.com/blang/semver v3.5.0+incompatible
Expand All @@ -28,6 +30,7 @@ require (
github.com/gogo/status v1.0.3
github.com/golang/protobuf v1.3.1
github.com/golang/snappy v0.0.1
github.com/gomodule/redigo v2.0.0+incompatible
github.com/gophercloud/gophercloud v0.0.0-20190307220656-fe1ba5ce12dd // indirect
github.com/gorilla/mux v1.6.2
github.com/gorilla/websocket v1.4.0 // indirect
Expand Down Expand Up @@ -70,6 +73,7 @@ require (
github.com/weaveworks/billing-client v0.0.0-20171006123215-be0d55e547b1
github.com/weaveworks/common v0.0.0-20190822150010-afb9996716e4
github.com/weaveworks/promrus v1.2.0 // indirect
github.com/yuin/gopher-lua v0.0.0-20190514113301-1cd887cd7036 // indirect
go.etcd.io/bbolt v1.3.3
go.etcd.io/etcd v0.0.0-20190709142735-eb7dd97135a5
go.uber.org/zap v1.10.0 // indirect
Expand Down
12 changes: 12 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmx
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf h1:qet1QNfXsQxTZqLG4oE62mJzwPIB8+Tee4RNCL9ulrY=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alicebob/gopher-json v0.0.0-20180125190556-5a6b3ba71ee6 h1:45bxf7AZMwWcqkLzDAQugVEwedisr5nRJ1r+7LYnv0U=
github.com/alicebob/gopher-json v0.0.0-20180125190556-5a6b3ba71ee6/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=
github.com/alicebob/miniredis v2.5.0+incompatible h1:yBHoLpsyjupjz3NL3MhKMVkR41j82Yjf3KFv7ApYzUI=
github.com/alicebob/miniredis v2.5.0+incompatible/go.mod h1:8HZjEj4yU0dwhYHky+DxYx+6BMjkBbe5ONFIF1MXffk=
github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c=
github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
Expand Down Expand Up @@ -68,6 +72,9 @@ github.com/census-instrumentation/opencensus-proto v0.2.0/go.mod h1:f6KPmirojxKA
github.com/certifi/gocertifi v0.0.0-20180905225744-ee1a9a0726d2/go.mod h1:GJKEexRPVJrBSOjoqN5VNOIKJ5Q3RViH6eu3puDRwx4=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ=
github.com/cockroachdb/cmux v0.0.0-20170110192607-30d10be49292/go.mod h1:qRiX68mZX1lGBkTWyp3CLcenw9I94W2dLeRvMzcn9N4=
Expand Down Expand Up @@ -184,6 +191,8 @@ github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/gomodule/redigo v2.0.0+incompatible h1:K/R+8tc58AaqLkqG2Ol3Qk+DR/TlNuhuh457pBFPtt0=
github.com/gomodule/redigo v2.0.0+incompatible/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4=
github.com/google/btree v0.0.0-20180124185431-e89373fe6b4a/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c h1:964Od4U6p2jUkFxvCydnIczKteheJEzHRToSGK3Bnlw=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
Expand Down Expand Up @@ -513,6 +522,8 @@ github.com/weaveworks/promrus v1.2.0 h1:jOLf6pe6/vss4qGHjXmGz4oDJQA+AOCqEL3FvvZG
github.com/weaveworks/promrus v1.2.0/go.mod h1:SaE82+OJ91yqjrE1rsvBWVzNZKcHYFtMUyS1+Ogs/KA=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/yuin/gopher-lua v0.0.0-20190514113301-1cd887cd7036 h1:1b6PAtenNyhsmo/NKXVe34h7JEZKva1YB/ne7K7mqKM=
github.com/yuin/gopher-lua v0.0.0-20190514113301-1cd887cd7036/go.mod h1:gqRgreBUhTSL0GeU64rtZ3Uq3wtjOa/TB2YfrtkCbVQ=
go.etcd.io/bbolt v1.3.3 h1:MUGmc65QhB3pIlaQ5bB4LwqSj6GIonVJXpZiaKNyaKk=
go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/etcd v0.0.0-20190709142735-eb7dd97135a5 h1:3unozPyUjPcbSbfhBb4EgA3O1/yBYHNgRr4ZGjO9iyQ=
Expand Down Expand Up @@ -586,6 +597,7 @@ golang.org/x/sys v0.0.0-20181029174526-d69651ed3497/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181122145206-62eef0e2fa9b/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190209173611-3b5209105503/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
161 changes: 161 additions & 0 deletions pkg/chunk/redis/conn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
package redis

import (
"context"
"hash/fnv"
"time"

"github.com/go-kit/kit/log/level"
"github.com/gomodule/redigo/redis"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"

"github.com/cortexproject/cortex/pkg/util"
)

var (
requestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "cortex",
Name: "redis_request_duration_seconds",
Help: "Time spent doing redis requests.",
Buckets: []float64{.025, .05, .1, .25, .5, 1, 2},
}, []string{"operation", "server"})
)

func init() {
prometheus.MustRegister(requestDuration)
}

// Pool holds *redis.Pool instances, hashed consistently.
type Pool struct {
addr []string
Conns map[string]*redis.Pool
}

// NewPool creates a new Pool instance taking the config from this dialer object.
func NewPool(cfg Config) *Pool {
p := &Pool{
addr: cfg.Redis,
Conns: map[string]*redis.Pool{},
}

for _, addr := range p.addr {
if _, ok := p.Conns[addr]; ok {
continue
}

p.Conns[addr] = &redis.Pool{
MaxIdle: cfg.MaxIdle,
MaxActive: cfg.MaxActive,
Wait: true,
IdleTimeout: cfg.IdleTimeout,
Dial: dialerFuncFor(addr),
TestOnBorrow: testFunc,
}
}

return p
}

// MetricConn is a redis.Conn that has been wrapped with a metric instrument
type MetricConn struct {
c redis.Conn
a string
}

// Err implements the redis.Conn interface
func (r *MetricConn) Err() error { return r.c.Err() }

// Close implements the redis.Conn interface
func (r *MetricConn) Close() error { return r.c.Close() }

// Send implements the redis.Conn interface
func (r *MetricConn) Send(c string, args ...interface{}) error { return r.c.Send(c, args...) }

// Flush implements the redis.Conn interface
func (r *MetricConn) Flush() error { return r.c.Flush() }

// Receive implements the redis.Conn interface
func (r *MetricConn) Receive() (interface{}, error) { return r.c.Receive() }

// Do implements the redis.Conn interface but wraps a histogram metric around the call
func (r *MetricConn) Do(c string, args ...interface{}) (interface{}, error) {
defer func(start time.Time) {
requestDuration.WithLabelValues(c, r.a).Observe(time.Since(start).Seconds())
}(time.Now())
return r.c.Do(c, args...)
}

// Get a connection for the given key (hashed off of Addresses)
func (r Pool) Get(ctx context.Context, key string) (redis.Conn, error) {
addr := r.getAddr(key)
c, err := r.Conns[addr].GetContext(ctx)
if err != nil {
level.Error(util.Logger).Log("msg", "failed to hash redis key", "key", key, "err", err)
return nil, err
}

return &MetricConn{
a: addr,
c: c,
}, nil
}

// getAddr will hash the key and return the node it maps to
func (r Pool) getAddr(key string) string {
hasher := fnv.New64a()
_, err := hasher.Write([]byte(key))
if err != nil {
level.Error(util.Logger).Log("msg", "failed to hash redis key", "key", key, "err", err)
return r.addr[0]
}

k := hasher.Sum64()
return r.addr[jumpHash(k, len(r.addr))]
}

// Close closes all connections
func (r Pool) Close() error {
for _, c := range r.Conns {
c.Close()
}
return nil
}

func dialerFuncFor(addr string) func() (redis.Conn, error) {
return func() (redis.Conn, error) {
level.Debug(util.Logger).Log("msg", "creating redis connection", "addr", addr)
c, err := redis.Dial("tcp", addr)
if err != nil {
level.Error(util.Logger).Log("msg", "failed to create redis connection", "err", err)
}
return c, err
}
}

func testFunc(c redis.Conn, t time.Time) error {
if time.Since(t) < time.Minute {
return nil
}

_, err := c.Do("PING")
if err != nil {
return errors.Wrap(err, "could not validate Pool connection")
}

return nil
}

// jumpHash described in A Fast, Minimal Memory, Consistent Hash Algorithm
// https://arxiv.org/pdf/1406.2294.pdf
// Takes a key and the number of buckets(nodes) to hash into. Returns the node key should be placed in
func jumpHash(key uint64, numBuckets int) int32 {
var b int64 = -1
for j := int64(0); j < int64(numBuckets); {
b = j
key = key*2862933555777941757 + 1
j = int64(float64(b+1) * (float64(int64(1)<<31) / float64((key>>33)+1)))
}

return int32(b)
}
162 changes: 162 additions & 0 deletions pkg/chunk/redis/storage_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package redis

import (
"context"
"flag"
"fmt"
"strings"
"time"

"github.com/alicebob/miniredis"
"github.com/go-kit/kit/log/level"
"github.com/gomodule/redigo/redis"
"github.com/prometheus/common/model"

"github.com/cortexproject/cortex/pkg/chunk"
chunkutil "github.com/cortexproject/cortex/pkg/chunk/util"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/flagext"
)

// override for testing purposes
var nowFunc = model.Now

// Config values for redis connections
type Config struct {
Redis flagext.Strings
MaxIdle int
MaxActive int
IdleTimeout time.Duration
ChunkRetention time.Duration
}

// RegisterFlags registers CLI flags for Config
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.Var(&cfg.Redis, "redis.url", "Redis endpoint URL(s). When multiple redis addresses are provided keys will be sharded "+
"across the instances using a consistent hash. Use inmemory:///localhost to use a mock, in-memory implementation.")

f.IntVar(&cfg.MaxIdle, "redis.max-idle", 32, "maximum idle connections to redis")
f.IntVar(&cfg.MaxActive, "redis.max-active", 64, "maximum active connections to redis")
f.DurationVar(&cfg.IdleTimeout, "redis.idle-timeout", time.Second, "redis connection idle timeout")
f.DurationVar(&cfg.ChunkRetention, "redis.chunk-retention", 0, "how long chunks should live in redis. 0 means forever.")
}

// ObjectClient is an object client that connects to Redis
type ObjectClient struct {
Pool *Pool
memoryServer *miniredis.Miniredis
retention time.Duration
}

// NewRedisObjectClient makes a new Redis-backed ObjectClient.
func NewRedisObjectClient(cfg Config, _ chunk.SchemaConfig) (*ObjectClient, error) {
if len(cfg.Redis) == 0 {
return nil, fmt.Errorf("no URL specified for Redis")
}

client := &ObjectClient{
retention: cfg.ChunkRetention,
}

if len(cfg.Redis) == 1 && strings.HasPrefix(cfg.Redis[0], "inmemory:///") {
srv, err := miniredis.Run()
if err != nil {
level.Error(util.Logger).Log("msg", "starting miniredis failed", "err", err)
return nil, fmt.Errorf("failed to start miniredis: %+v", err)
}
client.memoryServer = srv
cfg.Redis = []string{srv.Addr()}
level.Debug(util.Logger).Log("msg", "miniredis server started", "addr", cfg.Redis)
}

pool := NewPool(cfg)
client.Pool = pool
return client, nil
}

// Stop closes the redis connections and the memory server if applicable
func (a ObjectClient) Stop() {
a.Pool.Close()
if a.memoryServer != nil {
a.memoryServer.Close()
}
}

// GetChunks retrieves the provided chunks from the store
func (a ObjectClient) GetChunks(ctx context.Context, chunks []chunk.Chunk) ([]chunk.Chunk, error) {
return chunkutil.GetParallelChunks(ctx, chunks, a.getChunk)
}

func (a ObjectClient) getChunk(ctx context.Context, decodeContext *chunk.DecodeContext, c chunk.Chunk) (chunk.Chunk, error) {
con, err := a.Pool.Get(ctx, c.ExternalKey())
if err != nil {
level.Error(util.Logger).Log("msg", "failed to get redis connection", "err", err)
return chunk.Chunk{}, err
}
defer con.Close()
resp, err := redis.Bytes(con.Do("GET", c.ExternalKey()))
if err != nil {
if err == redis.ErrNil {
return chunk.Chunk{}, nil
}
return chunk.Chunk{}, fmt.Errorf("failed to get chunk from redis: %+v", err)
}
if err := c.Decode(decodeContext, resp); err != nil {
return chunk.Chunk{}, err
}
return c, nil
}

// PutChunks stores the provided chunks into the store
func (a ObjectClient) PutChunks(ctx context.Context, chunks []chunk.Chunk) error {
incomingErrors := make(chan error)
now := nowFunc()

var putChunk = func(c chunk.Chunk) {
key := c.ExternalKey()
buf, err := c.Encoded()
if err != nil {
level.Error(util.Logger).Log("msg", "failed encoding chunk", "key", key, "err", err)
incomingErrors <- err
}

// TTL is the Through date plus the configured retention time so that the newest chunk is kept for the retention period
ttl := int(c.Through.Add(a.retention).Unix() - now.Unix())
if ttl < 0 {
level.Debug(util.Logger).Log("msg", "chunk is already expired and will be dropped", "key", key)
incomingErrors <- nil
}
incomingErrors <- a.putRedisChunk(ctx, key, buf, ttl)
}

for _, c := range chunks {
go putChunk(c)
}

var lastErr error
for range chunks {
err := <-incomingErrors
if err != nil {
lastErr = err
}
}
return lastErr
}

func (a ObjectClient) putRedisChunk(ctx context.Context, key string, buf []byte, ttl int) error {
con, err := a.Pool.Get(ctx, key)
if err != nil {
level.Error(util.Logger).Log("msg", "failed to get connection to redis", "key", key, "err", err)
return err
}
defer con.Close()
args := []interface{}{key, buf}
if ttl > 0 {
args = append(args, "EX", ttl)
}
_, err = con.Do("SET", args...)
if err != nil {
level.Error(util.Logger).Log("msg", "putRedisChunk failed", "key", key, "err", err)
}
return err
}
Loading