1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -2,6 +2,7 @@

## master / unreleased

* [CHANGE] Metric `cortex_kv_request_duration_seconds` now includes a `name` label to denote which client is being used, as well as a `backend` label to denote the KV backend implementation in use. #2648
* [CHANGE] Experimental Ruler: Rule groups persisted to object storage using the experimental API have an updated object key encoding to better handle special characters. Rule groups previously-stored using object storage must be renamed to the new format. #2646
* [CHANGE] Query Frontend now uses Round Robin to choose a tenant queue to service next. #2553
* [CHANGE] `-promql.lookback-delta` is now deprecated and has been replaced by `-querier.lookback-delta` along with `lookback_delta` entry under `querier` in the config file. `-promql.lookback-delta` will be removed in v1.4.0. #2604
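For context (not part of this diff), here is a sketch of what one series of the relabelled histogram could look like after the change. The backend and name values are illustrative, and any other labels on the metric are omitted:

```
cortex_kv_request_duration_seconds_count{backend="consul", name="distributor-hatracker", operation="CAS"} 4
```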
37 changes: 35 additions & 2 deletions integration/kv_test.go
@@ -4,10 +4,13 @@ package main

import (
"context"
"errors"
"sort"
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/require"

"github.com/cortexproject/cortex/integration/e2e"
@@ -28,6 +31,8 @@ func TestKV_List_Delete(t *testing.T) {

require.NoError(t, s.StartAndWaitReady(etcdSvc, consulSvc))

reg := prometheus.NewRegistry()

etcdKv, err := kv.NewClient(kv.Config{
Store: "etcd",
Prefix: "keys/",
@@ -38,7 +43,7 @@ func TestKV_List_Delete(t *testing.T) {
MaxRetries: 5,
},
},
}, stringCodec{})
}, stringCodec{}, reg)
require.NoError(t, err)

consulKv, err := kv.NewClient(kv.Config{
@@ -52,7 +57,7 @@ func TestKV_List_Delete(t *testing.T) {
WatchKeyRateLimit: 1,
},
},
}, stringCodec{})
}, stringCodec{}, reg)
require.NoError(t, err)

kvs := []struct {
@@ -98,6 +103,34 @@ func TestKV_List_Delete(t *testing.T) {
require.Nil(t, v, "object was not deleted")
})
}

// Ensure the proper histogram metrics are reported
metrics, err := reg.Gather()
require.NoError(t, err)

require.Len(t, metrics, 1)
require.Equal(t, "cortex_kv_request_duration_seconds", metrics[0].GetName())
require.Equal(t, dto.MetricType_HISTOGRAM, metrics[0].GetType())
require.Len(t, metrics[0].GetMetric(), 8)

getMetricOperation := func(labels []*dto.LabelPair) (string, error) {
for _, l := range labels {
if l.GetName() == "operation" {
return l.GetValue(), nil
}
}
return "", errors.New("no operation")
}

for _, metric := range metrics[0].GetMetric() {
op, err := getMetricOperation(metric.Label)
require.NoErrorf(t, err, "No operation label found in metric %v", metric.String())
if op == "CAS" {
require.Equal(t, uint64(4), metric.GetHistogram().GetSampleCount())
} else {
require.Equal(t, uint64(1), metric.GetHistogram().GetSampleCount())
}
}
}

type stringCodec struct{}
4 changes: 2 additions & 2 deletions pkg/compactor/compactor.go
@@ -195,12 +195,12 @@ func (c *Compactor) starting(ctx context.Context) error {
// Initialize the compactors ring if sharding is enabled.
if c.compactorCfg.ShardingEnabled {
lifecyclerCfg := c.compactorCfg.ShardingRing.ToLifecyclerConfig()
c.ringLifecycler, err = ring.NewLifecycler(lifecyclerCfg, ring.NewNoopFlushTransferer(), "compactor", ring.CompactorRingKey, false)
c.ringLifecycler, err = ring.NewLifecycler(lifecyclerCfg, ring.NewNoopFlushTransferer(), "compactor", ring.CompactorRingKey, false, c.registerer)
if err != nil {
return errors.Wrap(err, "unable to initialize compactor ring lifecycler")
}

c.ring, err = ring.New(lifecyclerCfg.RingConfig, "compactor", ring.CompactorRingKey)
c.ring, err = ring.New(lifecyclerCfg.RingConfig, "compactor", ring.CompactorRingKey, c.registerer)
if err != nil {
return errors.Wrap(err, "unable to initialize compactor ring")
}
4 changes: 2 additions & 2 deletions pkg/cortex/modules.go
@@ -107,7 +107,7 @@ func (t *Cortex) initServer() (services.Service, error) {
func (t *Cortex) initRing() (serv services.Service, err error) {
t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.RuntimeConfig)
t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
t.Ring, err = ring.New(t.Cfg.Ingester.LifecyclerConfig.RingConfig, "ingester", ring.IngesterRingKey)
t.Ring, err = ring.New(t.Cfg.Ingester.LifecyclerConfig.RingConfig, "ingester", ring.IngesterRingKey, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}
@@ -149,7 +149,7 @@ func (t *Cortex) initDistributor() (serv services.Service, err error) {
// ruler's dependency)
canJoinDistributorsRing := (t.Cfg.Target == All || t.Cfg.Target == Distributor)

t.Distributor, err = distributor.New(t.Cfg.Distributor, t.Cfg.IngesterClient, t.Overrides, t.Ring, canJoinDistributorsRing)
t.Distributor, err = distributor.New(t.Cfg.Distributor, t.Cfg.IngesterClient, t.Overrides, t.Ring, canJoinDistributorsRing, prometheus.DefaultRegisterer)
if err != nil {
return
}
6 changes: 3 additions & 3 deletions pkg/distributor/distributor.go
@@ -174,7 +174,7 @@ func (cfg *Config) Validate() error {
}

// New constructs a new Distributor
func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Overrides, ingestersRing ring.ReadRing, canJoinDistributorsRing bool) (*Distributor, error) {
func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Overrides, ingestersRing ring.ReadRing, canJoinDistributorsRing bool, reg prometheus.Registerer) (*Distributor, error) {
if cfg.ingesterClientFactory == nil {
cfg.ingesterClientFactory = func(addr string) (ring_client.PoolClient, error) {
return ingester_client.MakeIngesterClient(addr, clientConfig)
@@ -184,7 +184,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
replicationFactor.Set(float64(ingestersRing.ReplicationFactor()))
cfg.PoolConfig.RemoteTimeout = cfg.RemoteTimeout

replicas, err := newClusterTracker(cfg.HATrackerConfig)
replicas, err := newClusterTracker(cfg.HATrackerConfig, reg)
if err != nil {
return nil, err
}
@@ -201,7 +201,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
if !canJoinDistributorsRing {
ingestionRateStrategy = newInfiniteIngestionRateStrategy()
} else if limits.IngestionRateStrategy() == validation.GlobalIngestionRateStrategy {
distributorsRing, err = ring.NewLifecycler(cfg.DistributorRing.ToLifecyclerConfig(), nil, "distributor", ring.DistributorRingKey, true)
distributorsRing, err = ring.NewLifecycler(cfg.DistributorRing.ToLifecyclerConfig(), nil, "distributor", ring.DistributorRingKey, true, reg)
if err != nil {
return nil, err
}
6 changes: 3 additions & 3 deletions pkg/distributor/distributor_test.go
@@ -369,7 +369,7 @@ func TestDistributor_PushHAInstances(t *testing.T) {
KVStore: kv.Config{Mock: mock},
UpdateTimeout: 100 * time.Millisecond,
FailoverTimeout: time.Second,
})
}, nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), r))
d.HATracker = r
@@ -936,7 +936,7 @@ func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, *rin
},
HeartbeatTimeout: 60 * time.Minute,
ReplicationFactor: 3,
}, ring.IngesterRingKey, ring.IngesterRingKey)
}, ring.IngesterRingKey, ring.IngesterRingKey, nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ingestersRing))

@@ -969,7 +969,7 @@ func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, *rin
overrides, err := validation.NewOverrides(*cfg.limits, nil)
require.NoError(t, err)

d, err := New(distributorCfg, clientConfig, overrides, ingestersRing, true)
d, err := New(distributorCfg, clientConfig, overrides, ingestersRing, true, nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), d))

8 changes: 6 additions & 2 deletions pkg/distributor/ha_tracker.go
@@ -143,7 +143,7 @@ func GetReplicaDescCodec() codec.Proto {

// NewClusterTracker returns a new HA cluster tracker using either Consul
// or in-memory KV store. Tracker must be started via StartAsync().
func newClusterTracker(cfg HATrackerConfig) (*haTracker, error) {
func newClusterTracker(cfg HATrackerConfig, reg prometheus.Registerer) (*haTracker, error) {
var jitter time.Duration
if cfg.UpdateTimeoutJitterMax > 0 {
jitter = time.Duration(rand.Int63n(int64(2*cfg.UpdateTimeoutJitterMax))) - cfg.UpdateTimeoutJitterMax
@@ -157,7 +157,11 @@ func newClusterTracker(cfg HATrackerConfig) (*haTracker, error) {
}

if cfg.EnableHATracker {
client, err := kv.NewClient(cfg.KVStore, GetReplicaDescCodec())
client, err := kv.NewClient(
cfg.KVStore,
GetReplicaDescCodec(),
kv.RegistererWithKVName(reg, "distributor-hatracker"),
)
if err != nil {
return nil, err
}
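`kv.RegistererWithKVName` is called above but its definition is not part of this hunk. A minimal sketch of what such a helper presumably does, assuming it only attaches the `name` label by wrapping the registerer (the real helper lives in `pkg/ring/kv` and may differ):

```go
// RegistererWithKVName returns a prometheus.Registerer that adds a constant
// name="<name>" label to everything registered through it, so that the
// cortex_kv_request_duration_seconds series can be told apart per caller.
// Sketch only — label key and nil handling are assumptions, not taken from this PR.
func RegistererWithKVName(reg prometheus.Registerer, name string) prometheus.Registerer {
	if reg == nil {
		// Keeps the KV client unmetered, matching the nil-registerer path in createClient
		// and the tests above that pass nil.
		return nil
	}
	return prometheus.WrapRegistererWith(prometheus.Labels{"name": name}, reg)
}
```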
14 changes: 7 additions & 7 deletions pkg/distributor/ha_tracker_test.go
@@ -135,7 +135,7 @@ func TestWatchPrefixAssignment(t *testing.T) {
UpdateTimeout: time.Millisecond,
UpdateTimeoutJitterMax: 0,
FailoverTimeout: time.Millisecond * 2,
})
}, nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))
defer services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck
@@ -165,7 +165,7 @@ func TestCheckReplicaOverwriteTimeout(t *testing.T) {
UpdateTimeout: 100 * time.Millisecond,
UpdateTimeoutJitterMax: 0,
FailoverTimeout: time.Second,
})
}, nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))
defer services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck
@@ -200,7 +200,7 @@ func TestCheckReplicaMultiCluster(t *testing.T) {
UpdateTimeout: 100 * time.Millisecond,
UpdateTimeoutJitterMax: 0,
FailoverTimeout: time.Second,
})
}, nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))
defer services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck
@@ -235,7 +235,7 @@ func TestCheckReplicaMultiClusterTimeout(t *testing.T) {
UpdateTimeout: 100 * time.Millisecond,
UpdateTimeoutJitterMax: 0,
FailoverTimeout: time.Second,
})
}, nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))
defer services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck
@@ -286,7 +286,7 @@ func TestCheckReplicaUpdateTimeout(t *testing.T) {
UpdateTimeout: time.Second,
UpdateTimeoutJitterMax: 0,
FailoverTimeout: time.Second,
})
}, nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))
defer services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck
@@ -347,7 +347,7 @@ func TestCheckReplicaMultiUser(t *testing.T) {
UpdateTimeout: 100 * time.Millisecond,
UpdateTimeoutJitterMax: 0,
FailoverTimeout: time.Second,
})
}, nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))
defer services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck
@@ -430,7 +430,7 @@ func TestCheckReplicaUpdateTimeoutJitter(t *testing.T) {
UpdateTimeout: testData.updateTimeout,
UpdateTimeoutJitterMax: 0,
FailoverTimeout: time.Second,
})
}, nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))
defer services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck
2 changes: 1 addition & 1 deletion pkg/ingester/ingester.go
@@ -199,7 +199,7 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c
// During WAL recovery, it will create new user states which requires the limiter.
// Hence initialise the limiter before creating the WAL.
// The '!cfg.WALConfig.WALEnabled' argument says don't flush on shutdown if the WAL is enabled.
i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", ring.IngesterRingKey, !cfg.WALConfig.WALEnabled)
i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", ring.IngesterRingKey, !cfg.WALConfig.WALEnabled, registerer)
if err != nil {
return nil, err
}
2 changes: 1 addition & 1 deletion pkg/ingester/ingester_v2.go
@@ -146,7 +146,7 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides,
}, i.numSeriesInTSDB)
}

i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", ring.IngesterRingKey, true)
i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", ring.IngesterRingKey, true, registerer)
if err != nil {
return nil, err
}
6 changes: 5 additions & 1 deletion pkg/querier/blocks_store_queryable.go
@@ -150,7 +150,11 @@ func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegatewa

if gatewayCfg.ShardingEnabled {
storesRingCfg := gatewayCfg.ShardingRing.ToRingConfig()
storesRingBackend, err := kv.NewClient(storesRingCfg.KVStore, ring.GetCodec())
storesRingBackend, err := kv.NewClient(
storesRingCfg.KVStore,
ring.GetCodec(),
kv.RegistererWithKVName(reg, "querier-store-gateway"),
)
if err != nil {
return nil, errors.Wrap(err, "failed to create store-gateway ring backend")
}
32 changes: 22 additions & 10 deletions pkg/ring/kv/client.go
@@ -6,6 +6,8 @@ import (
"fmt"
"sync"

"github.com/prometheus/client_golang/prometheus"

"github.com/cortexproject/cortex/pkg/ring/kv/codec"
"github.com/cortexproject/cortex/pkg/ring/kv/consul"
"github.com/cortexproject/cortex/pkg/ring/kv/etcd"
@@ -97,19 +99,19 @@ type Client interface {

// NewClient creates a new Client (consul, etcd or inmemory) based on the config,
// encodes and decodes data for storage using the codec.
func NewClient(cfg Config, codec codec.Codec) (Client, error) {
func NewClient(cfg Config, codec codec.Codec, reg prometheus.Registerer) (Client, error) {
if cfg.Mock != nil {
return cfg.Mock, nil
}

return createClient(cfg.Store, cfg.Prefix, cfg.StoreConfig, codec)
return createClient(cfg.Store, cfg.Prefix, cfg.StoreConfig, codec, reg)
}

func createClient(name string, prefix string, cfg StoreConfig, codec codec.Codec) (Client, error) {
func createClient(backend string, prefix string, cfg StoreConfig, codec codec.Codec, reg prometheus.Registerer) (Client, error) {
var client Client
var err error

switch name {
switch backend {
case "consul":
client, err = consul.NewClient(cfg.Consul, codec)

@@ -135,10 +137,10 @@ func createClient(name string, prefix string, cfg StoreConfig, codec codec.Codec
}

case "multi":
client, err = buildMultiClient(cfg, codec)
client, err = buildMultiClient(cfg, codec, reg)

default:
return nil, fmt.Errorf("invalid KV store type: %s", name)
return nil, fmt.Errorf("invalid KV store type: %s", backend)
}

if err != nil {
@@ -149,10 +151,20 @@ func createClient(name string, prefix string, cfg StoreConfig, codec codec.Codec
client = PrefixClient(client, prefix)
}

return metrics{client}, nil
// If no Registerer is provided return the raw client
if reg == nil {
return client, nil
}

return newMetricsClient(backend, client, reg), nil
}

func buildMultiClient(cfg StoreConfig, codec codec.Codec) (Client, error) {
func buildMultiClient(cfg StoreConfig, codec codec.Codec, reg prometheus.Registerer) (Client, error) {
var (
primaryLabel = prometheus.Labels{"role": "primary"}
secondaryLabel = prometheus.Labels{"role": "secondary"}
)

if cfg.Multi.Primary == "" || cfg.Multi.Secondary == "" {
return nil, fmt.Errorf("primary or secondary store not set")
}
@@ -163,12 +175,12 @@ func buildMultiClient(cfg StoreConfig, codec codec.Codec) (Client, error) {
return nil, fmt.Errorf("primary and secondary stores must be different")
}

primary, err := createClient(cfg.Multi.Primary, "", cfg, codec)
primary, err := createClient(cfg.Multi.Primary, "", cfg, codec, prometheus.WrapRegistererWith(primaryLabel, reg))
if err != nil {
return nil, err
}

secondary, err := createClient(cfg.Multi.Secondary, "", cfg, codec)
secondary, err := createClient(cfg.Multi.Secondary, "", cfg, codec, prometheus.WrapRegistererWith(secondaryLabel, reg))
if err != nil {
return nil, err
}
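Taken together, a caller that wants the new per-client metrics now passes a named registerer when building a KV client, while passing nil skips the metrics wrapper entirely. A hypothetical caller (the component name, function, and wrapping are illustrative and not part of this PR) might look like:

```go
package example

import (
	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/cortexproject/cortex/pkg/ring"
	"github.com/cortexproject/cortex/pkg/ring/kv"
)

// newRingKV is a hypothetical caller: it builds a ring KV client whose
// cortex_kv_request_duration_seconds series carry name="my-component"
// alongside the backend label added by this PR.
func newRingKV(cfg kv.Config) (kv.Client, error) {
	client, err := kv.NewClient(
		cfg,
		ring.GetCodec(),
		kv.RegistererWithKVName(prometheus.DefaultRegisterer, "my-component"),
	)
	if err != nil {
		return nil, errors.Wrap(err, "failed to create KV client")
	}
	// Passing a nil registerer instead would return the raw client with no
	// request-duration instrumentation.
	return client, nil
}
```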