From 928a75761b979989f5ed4b33e703a23cf5c7a60d Mon Sep 17 00:00:00 2001 From: Jacob Lisi Date: Thu, 28 May 2020 13:15:49 -0400 Subject: [PATCH 01/10] initial attempt Signed-off-by: Jacob Lisi --- .circleci/config.yml | 2 +- integration/kv_test.go | 41 ++++++++++++-- pkg/distributor/distributor.go | 4 +- pkg/distributor/distributor_ring.go | 4 ++ pkg/distributor/distributor_test.go | 2 +- pkg/distributor/ha_tracker.go | 4 +- pkg/distributor/ha_tracker_test.go | 15 ++--- pkg/querier/blocks_store_queryable.go | 2 +- pkg/ring/kv/client.go | 22 +++---- pkg/ring/kv/consul/client.go | 19 +++---- pkg/ring/kv/consul/metrics.go | 82 --------------------------- pkg/ring/kv/metrics.go | 50 ++++++++++------ pkg/ring/lifecycler.go | 3 +- pkg/ring/ring.go | 3 +- pkg/ruler/ruler.go | 2 +- pkg/storegateway/gateway.go | 2 +- 16 files changed, 113 insertions(+), 144 deletions(-) delete mode 100644 pkg/ring/kv/consul/metrics.go diff --git a/.circleci/config.yml b/.circleci/config.yml index 844bc65fdcf..fec8ffa4419 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -165,7 +165,7 @@ jobs: export CORTEX_IMAGE="${CORTEX_IMAGE_PREFIX}cortex:${CIRCLE_TAG:-$(./tools/image-tag)}" export CORTEX_CHECKOUT_DIR="/home/circleci/.go_workspace/src/github.com/cortexproject/cortex" echo "Running integration tests with image: $CORTEX_IMAGE" - go test -tags=requires_docker -timeout 1200s -v -count=1 ./integration/... + go test -mod=vendor -tags=requires_docker -timeout 1500s -v -count=1 ./integration/... no_output_timeout: 20m build: diff --git a/integration/kv_test.go b/integration/kv_test.go index 5114908d8f2..fdd17c6229c 100644 --- a/integration/kv_test.go +++ b/integration/kv_test.go @@ -4,10 +4,13 @@ package main import ( "context" + "errors" "sort" "testing" "time" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" "github.com/stretchr/testify/require" "github.com/cortexproject/cortex/integration/e2e" @@ -28,7 +31,9 @@ func TestKV_List_Delete(t *testing.T) { require.NoError(t, s.StartAndWaitReady(etcdSvc, consulSvc)) - etcdKv, err := kv.NewClient(kv.Config{ + reg := prometheus.NewRegistry() + + etcdKv, err := kv.NewClient("test-etcd", kv.Config{ Store: "etcd", Prefix: "keys/", StoreConfig: kv.StoreConfig{ @@ -38,10 +43,10 @@ func TestKV_List_Delete(t *testing.T) { MaxRetries: 5, }, }, - }, stringCodec{}) + }, stringCodec{}, reg) require.NoError(t, err) - consulKv, err := kv.NewClient(kv.Config{ + consulKv, err := kv.NewClient("test-consul", kv.Config{ Store: "consul", Prefix: "keys/", StoreConfig: kv.StoreConfig{ @@ -52,7 +57,7 @@ func TestKV_List_Delete(t *testing.T) { WatchKeyRateLimit: 1, }, }, - }, stringCodec{}) + }, stringCodec{}, reg) require.NoError(t, err) kvs := []struct { @@ -98,6 +103,34 @@ func TestKV_List_Delete(t *testing.T) { require.Nil(t, v, "object was not deleted") }) } + + // Ensure the proper histogram metrics are reported + metrics, err := reg.Gather() + require.NoError(t, err) + + require.Len(t, metrics, 1) + require.Equal(t, "cortex_kv_request_duration_seconds", metrics[0].GetName()) + require.Equal(t, dto.MetricType_HISTOGRAM, metrics[0].GetType()) + require.Len(t, metrics[0].GetMetric(), 8) + + getMetricOperation := func(labels []*dto.LabelPair) (string, error) { + for _, l := range labels { + if l.GetName() == "operation" { + return l.GetValue(), nil + } + } + return "", errors.New("no operation") + } + + for _, metric := range metrics[0].GetMetric() { + op, err := getMetricOperation(metric.Label) + require.NoErrorf(t, err, "No operation label found in metric %v", metric.String()) + if op == "CAS" { + require.Equal(t, uint64(4), metric.GetHistogram().GetSampleCount()) + } else { + require.Equal(t, uint64(1), metric.GetHistogram().GetSampleCount()) + } + } } type stringCodec struct{} diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 28e66213c5a..3a3efd889e5 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -184,7 +184,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove replicationFactor.Set(float64(ingestersRing.ReplicationFactor())) cfg.PoolConfig.RemoteTimeout = cfg.RemoteTimeout - replicas, err := newClusterTracker(cfg.HATrackerConfig) + replicas, err := newClusterTracker(cfg.HATrackerConfig, nil) if err != nil { return nil, err } @@ -201,7 +201,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove if !canJoinDistributorsRing { ingestionRateStrategy = newInfiniteIngestionRateStrategy() } else if limits.IngestionRateStrategy() == validation.GlobalIngestionRateStrategy { - distributorsRing, err = ring.NewLifecycler(cfg.DistributorRing.ToLifecyclerConfig(), nil, "distributor", ring.DistributorRingKey, true) + distributorsRing, err = ring.NewLifecycler(cfg.DistributorRing.ToLifecyclerConfig(), nil, RingNameForClient, ring.DistributorRingKey, true) if err != nil { return nil, err } diff --git a/pkg/distributor/distributor_ring.go b/pkg/distributor/distributor_ring.go index 2f3fba14b21..5cb6ce9b757 100644 --- a/pkg/distributor/distributor_ring.go +++ b/pkg/distributor/distributor_ring.go @@ -13,6 +13,10 @@ import ( "github.com/cortexproject/cortex/pkg/util/flagext" ) +var ( + RingNameForClient = "distributor" +) + // RingConfig masks the ring lifecycler config which contains // many options not really required by the distributors ring. This config // is used to strip down the config to the minimum, and avoid confusion diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 972eb0d59c2..c2de9d9d7d5 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -369,7 +369,7 @@ func TestDistributor_PushHAInstances(t *testing.T) { KVStore: kv.Config{Mock: mock}, UpdateTimeout: 100 * time.Millisecond, FailoverTimeout: time.Second, - }) + }, prometheus.NewRegistry()) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), r)) d.HATracker = r diff --git a/pkg/distributor/ha_tracker.go b/pkg/distributor/ha_tracker.go index fce5e72ed94..944ee00fac3 100644 --- a/pkg/distributor/ha_tracker.go +++ b/pkg/distributor/ha_tracker.go @@ -143,7 +143,7 @@ func GetReplicaDescCodec() codec.Proto { // NewClusterTracker returns a new HA cluster tracker using either Consul // or in-memory KV store. Tracker must be started via StartAsync(). -func newClusterTracker(cfg HATrackerConfig) (*haTracker, error) { +func newClusterTracker(cfg HATrackerConfig, reg prometheus.Registerer) (*haTracker, error) { var jitter time.Duration if cfg.UpdateTimeoutJitterMax > 0 { jitter = time.Duration(rand.Int63n(int64(2*cfg.UpdateTimeoutJitterMax))) - cfg.UpdateTimeoutJitterMax @@ -157,7 +157,7 @@ func newClusterTracker(cfg HATrackerConfig) (*haTracker, error) { } if cfg.EnableHATracker { - client, err := kv.NewClient(cfg.KVStore, GetReplicaDescCodec()) + client, err := kv.NewClient(RingNameForClient, cfg.KVStore, GetReplicaDescCodec(), reg) if err != nil { return nil, err } diff --git a/pkg/distributor/ha_tracker_test.go b/pkg/distributor/ha_tracker_test.go index 4767650a56a..02c31823a82 100644 --- a/pkg/distributor/ha_tracker_test.go +++ b/pkg/distributor/ha_tracker_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/pkg/timestamp" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -135,7 +136,7 @@ func TestWatchPrefixAssignment(t *testing.T) { UpdateTimeout: time.Millisecond, UpdateTimeoutJitterMax: 0, FailoverTimeout: time.Millisecond * 2, - }) + }, prometheus.NewRegistry()) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), c)) defer services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck @@ -165,7 +166,7 @@ func TestCheckReplicaOverwriteTimeout(t *testing.T) { UpdateTimeout: 100 * time.Millisecond, UpdateTimeoutJitterMax: 0, FailoverTimeout: time.Second, - }) + }, prometheus.NewRegistry()) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), c)) defer services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck @@ -200,7 +201,7 @@ func TestCheckReplicaMultiCluster(t *testing.T) { UpdateTimeout: 100 * time.Millisecond, UpdateTimeoutJitterMax: 0, FailoverTimeout: time.Second, - }) + }, prometheus.NewRegistry()) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), c)) defer services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck @@ -235,7 +236,7 @@ func TestCheckReplicaMultiClusterTimeout(t *testing.T) { UpdateTimeout: 100 * time.Millisecond, UpdateTimeoutJitterMax: 0, FailoverTimeout: time.Second, - }) + }, prometheus.NewRegistry()) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), c)) defer services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck @@ -286,7 +287,7 @@ func TestCheckReplicaUpdateTimeout(t *testing.T) { UpdateTimeout: time.Second, UpdateTimeoutJitterMax: 0, FailoverTimeout: time.Second, - }) + }, prometheus.NewRegistry()) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), c)) defer services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck @@ -347,7 +348,7 @@ func TestCheckReplicaMultiUser(t *testing.T) { UpdateTimeout: 100 * time.Millisecond, UpdateTimeoutJitterMax: 0, FailoverTimeout: time.Second, - }) + }, prometheus.NewRegistry()) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), c)) defer services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck @@ -430,7 +431,7 @@ func TestCheckReplicaUpdateTimeoutJitter(t *testing.T) { UpdateTimeout: testData.updateTimeout, UpdateTimeoutJitterMax: 0, FailoverTimeout: time.Second, - }) + }, prometheus.NewRegistry()) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), c)) defer services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index 04cc8ade4fe..35ab8be6567 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -150,7 +150,7 @@ func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegatewa if gatewayCfg.ShardingEnabled { storesRingCfg := gatewayCfg.ShardingRing.ToRingConfig() - storesRingBackend, err := kv.NewClient(storesRingCfg.KVStore, ring.GetCodec()) + storesRingBackend, err := kv.NewClient(storegateway.RingNameForClient, storesRingCfg.KVStore, ring.GetCodec(), reg) if err != nil { return nil, errors.Wrap(err, "failed to create store-gateway ring backend") } diff --git a/pkg/ring/kv/client.go b/pkg/ring/kv/client.go index 9b5c9188e03..8852746a2a5 100644 --- a/pkg/ring/kv/client.go +++ b/pkg/ring/kv/client.go @@ -6,6 +6,8 @@ import ( "fmt" "sync" + "github.com/prometheus/client_golang/prometheus" + "github.com/cortexproject/cortex/pkg/ring/kv/codec" "github.com/cortexproject/cortex/pkg/ring/kv/consul" "github.com/cortexproject/cortex/pkg/ring/kv/etcd" @@ -97,19 +99,19 @@ type Client interface { // NewClient creates a new Client (consul, etcd or inmemory) based on the config, // encodes and decodes data for storage using the codec. -func NewClient(cfg Config, codec codec.Codec) (Client, error) { +func NewClient(name string, cfg Config, codec codec.Codec, reg prometheus.Registerer) (Client, error) { if cfg.Mock != nil { return cfg.Mock, nil } - return createClient(cfg.Store, cfg.Prefix, cfg.StoreConfig, codec) + return createClient(name, cfg.Store, cfg.Prefix, cfg.StoreConfig, codec, reg) } -func createClient(name string, prefix string, cfg StoreConfig, codec codec.Codec) (Client, error) { +func createClient(name string, backend string, prefix string, cfg StoreConfig, codec codec.Codec, reg prometheus.Registerer) (Client, error) { var client Client var err error - switch name { + switch backend { case "consul": client, err = consul.NewClient(cfg.Consul, codec) @@ -135,10 +137,10 @@ func createClient(name string, prefix string, cfg StoreConfig, codec codec.Codec } case "multi": - client, err = buildMultiClient(cfg, codec) + client, err = buildMultiClient(name, cfg, codec, reg) default: - return nil, fmt.Errorf("invalid KV store type: %s", name) + return nil, fmt.Errorf("invalid KV store type: %s", backend) } if err != nil { @@ -149,10 +151,10 @@ func createClient(name string, prefix string, cfg StoreConfig, codec codec.Codec client = PrefixClient(client, prefix) } - return metrics{client}, nil + return newMetricsClient(name, backend, client, reg), nil } -func buildMultiClient(cfg StoreConfig, codec codec.Codec) (Client, error) { +func buildMultiClient(name string, cfg StoreConfig, codec codec.Codec, reg prometheus.Registerer) (Client, error) { if cfg.Multi.Primary == "" || cfg.Multi.Secondary == "" { return nil, fmt.Errorf("primary or secondary store not set") } @@ -163,12 +165,12 @@ func buildMultiClient(cfg StoreConfig, codec codec.Codec) (Client, error) { return nil, fmt.Errorf("primary and secondary stores must be different") } - primary, err := createClient(cfg.Multi.Primary, "", cfg, codec) + primary, err := createClient(name+"-primary", cfg.Multi.Primary, "", cfg, codec, reg) if err != nil { return nil, err } - secondary, err := createClient(cfg.Multi.Secondary, "", cfg, codec) + secondary, err := createClient(name+"-secondary", cfg.Multi.Secondary, "", cfg, codec, reg) if err != nil { return nil, err } diff --git a/pkg/ring/kv/consul/client.go b/pkg/ring/kv/consul/client.go index 31bb4e58db3..7349e6a0afe 100644 --- a/pkg/ring/kv/consul/client.go +++ b/pkg/ring/kv/consul/client.go @@ -10,7 +10,6 @@ import ( "github.com/go-kit/kit/log/level" consul "github.com/hashicorp/consul/api" "github.com/hashicorp/go-cleanhttp" - "github.com/weaveworks/common/instrument" "golang.org/x/time/rate" "github.com/cortexproject/cortex/pkg/ring/kv/codec" @@ -85,7 +84,7 @@ func NewClient(cfg Config, codec codec.Codec) (*Client, error) { return nil, err } c := &Client{ - kv: consulMetrics{client.KV()}, + kv: client.KV(), codec: codec, cfg: cfg, } @@ -99,21 +98,17 @@ func (c *Client) Put(ctx context.Context, key string, value interface{}) error { return err } - return instrument.CollectedRequest(ctx, "Put", consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { - _, err := c.kv.Put(&consul.KVPair{ - Key: key, - Value: bytes, - }, nil) - return err - }) + _, err = c.kv.Put(&consul.KVPair{ + Key: key, + Value: bytes, + }, nil) + return err } // CAS atomically modifies a value in a callback. // If value doesn't exist you'll get nil as an argument to your callback. func (c *Client) CAS(ctx context.Context, key string, f func(in interface{}) (out interface{}, retry bool, err error)) error { - return instrument.CollectedRequest(ctx, "CAS loop", consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { - return c.cas(ctx, key, f) - }) + return c.cas(ctx, key, f) } func (c *Client) cas(ctx context.Context, key string, f func(in interface{}) (out interface{}, retry bool, err error)) error { diff --git a/pkg/ring/kv/consul/metrics.go b/pkg/ring/kv/consul/metrics.go deleted file mode 100644 index 99910a16a33..00000000000 --- a/pkg/ring/kv/consul/metrics.go +++ /dev/null @@ -1,82 +0,0 @@ -package consul - -import ( - "context" - - consul "github.com/hashicorp/consul/api" - "github.com/prometheus/client_golang/prometheus" - "github.com/weaveworks/common/instrument" -) - -var consulRequestDuration = instrument.NewHistogramCollector(prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Namespace: "cortex", - Name: "consul_request_duration_seconds", - Help: "Time spent on consul requests.", - Buckets: prometheus.DefBuckets, -}, []string{"operation", "status_code"})) - -func init() { - consulRequestDuration.Register() -} - -type consulMetrics struct { - kv -} - -func (c consulMetrics) CAS(p *consul.KVPair, options *consul.WriteOptions) (bool, *consul.WriteMeta, error) { - var ok bool - var result *consul.WriteMeta - err := instrument.CollectedRequest(options.Context(), "CAS", consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { - options = options.WithContext(ctx) - var err error - ok, result, err = c.kv.CAS(p, options) - return err - }) - return ok, result, err -} - -func (c consulMetrics) Get(key string, options *consul.QueryOptions) (*consul.KVPair, *consul.QueryMeta, error) { - var kvp *consul.KVPair - var meta *consul.QueryMeta - err := instrument.CollectedRequest(options.Context(), "Get", consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { - options = options.WithContext(ctx) - var err error - kvp, meta, err = c.kv.Get(key, options) - return err - }) - return kvp, meta, err -} - -func (c consulMetrics) List(path string, options *consul.QueryOptions) (consul.KVPairs, *consul.QueryMeta, error) { - var kvps consul.KVPairs - var meta *consul.QueryMeta - err := instrument.CollectedRequest(options.Context(), "List", consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { - options = options.WithContext(ctx) - var err error - kvps, meta, err = c.kv.List(path, options) - return err - }) - return kvps, meta, err -} - -func (c consulMetrics) Delete(key string, options *consul.WriteOptions) (*consul.WriteMeta, error) { - var meta *consul.WriteMeta - err := instrument.CollectedRequest(options.Context(), "Delete", consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { - options = options.WithContext(ctx) - var err error - meta, err = c.kv.Delete(key, options) - return err - }) - return meta, err -} - -func (c consulMetrics) Put(p *consul.KVPair, options *consul.WriteOptions) (*consul.WriteMeta, error) { - var result *consul.WriteMeta - err := instrument.CollectedRequest(options.Context(), "Put", consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { - options = options.WithContext(ctx) - var err error - result, err = c.kv.Put(p, options) - return err - }) - return result, err -} diff --git a/pkg/ring/kv/metrics.go b/pkg/ring/kv/metrics.go index 1e905c93551..65ca686c6b0 100644 --- a/pkg/ring/kv/metrics.go +++ b/pkg/ring/kv/metrics.go @@ -5,21 +5,11 @@ import ( "strconv" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/instrument" ) -var requestDuration = instrument.NewHistogramCollector(prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Namespace: "cortex", - Name: "kv_request_duration_seconds", - Help: "Time spent on consul requests.", - Buckets: prometheus.DefBuckets, -}, []string{"operation", "status_code"})) - -func init() { - requestDuration.Register() -} - // errorCode converts an error into an HTTP status code, modified from weaveworks/common/instrument func errorCode(err error) string { if err == nil { @@ -32,12 +22,36 @@ func errorCode(err error) string { } type metrics struct { - c Client + c Client + requestDuration *instrument.HistogramCollector +} + +func newMetricsClient(name string, backend string, c Client, reg prometheus.Registerer) Client { + // TODO: Ensure all clients are passed a valid registry and remove the following check + if reg == nil { + reg = prometheus.DefaultRegisterer + } + + return &metrics{ + c: c, + requestDuration: instrument.NewHistogramCollector( + promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "cortex", + Name: "kv_request_duration_seconds", + Help: "Time spent on kv store requests.", + Buckets: prometheus.DefBuckets, + ConstLabels: prometheus.Labels{ + "name": name, + "type": backend, + }, + }, []string{"operation", "status_code"}), + ), + } } func (m metrics) List(ctx context.Context, prefix string) ([]string, error) { var result []string - err := instrument.CollectedRequest(ctx, "List", requestDuration, instrument.ErrorCode, func(ctx context.Context) error { + err := instrument.CollectedRequest(ctx, "List", m.requestDuration, instrument.ErrorCode, func(ctx context.Context) error { var err error result, err = m.c.List(ctx, prefix) return err @@ -47,7 +61,7 @@ func (m metrics) List(ctx context.Context, prefix string) ([]string, error) { func (m metrics) Get(ctx context.Context, key string) (interface{}, error) { var result interface{} - err := instrument.CollectedRequest(ctx, "GET", requestDuration, instrument.ErrorCode, func(ctx context.Context) error { + err := instrument.CollectedRequest(ctx, "GET", m.requestDuration, instrument.ErrorCode, func(ctx context.Context) error { var err error result, err = m.c.Get(ctx, key) return err @@ -56,27 +70,27 @@ func (m metrics) Get(ctx context.Context, key string) (interface{}, error) { } func (m metrics) Delete(ctx context.Context, key string) error { - err := instrument.CollectedRequest(ctx, "Delete", requestDuration, instrument.ErrorCode, func(ctx context.Context) error { + err := instrument.CollectedRequest(ctx, "Delete", m.requestDuration, instrument.ErrorCode, func(ctx context.Context) error { return m.c.Delete(ctx, key) }) return err } func (m metrics) CAS(ctx context.Context, key string, f func(in interface{}) (out interface{}, retry bool, err error)) error { - return instrument.CollectedRequest(ctx, "CAS", requestDuration, errorCode, func(ctx context.Context) error { + return instrument.CollectedRequest(ctx, "CAS", m.requestDuration, errorCode, func(ctx context.Context) error { return m.c.CAS(ctx, key, f) }) } func (m metrics) WatchKey(ctx context.Context, key string, f func(interface{}) bool) { - _ = instrument.CollectedRequest(ctx, "WatchKey", requestDuration, instrument.ErrorCode, func(ctx context.Context) error { + _ = instrument.CollectedRequest(ctx, "WatchKey", m.requestDuration, instrument.ErrorCode, func(ctx context.Context) error { m.c.WatchKey(ctx, key, f) return nil }) } func (m metrics) WatchPrefix(ctx context.Context, prefix string, f func(string, interface{}) bool) { - _ = instrument.CollectedRequest(ctx, "WatchPrefix", requestDuration, instrument.ErrorCode, func(ctx context.Context) error { + _ = instrument.CollectedRequest(ctx, "WatchPrefix", m.requestDuration, instrument.ErrorCode, func(ctx context.Context) error { m.c.WatchPrefix(ctx, prefix, f) return nil }) diff --git a/pkg/ring/lifecycler.go b/pkg/ring/lifecycler.go index e68aed9ed14..d41c7a33500 100644 --- a/pkg/ring/lifecycler.go +++ b/pkg/ring/lifecycler.go @@ -147,7 +147,8 @@ func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer, ringNa } port := GetInstancePort(cfg.Port, cfg.ListenPort) codec := GetCodec() - store, err := kv.NewClient(cfg.RingConfig.KVStore, codec) + // Suffix all client names with "-lifecycler" to denote this kv client is used by the lifecycler + store, err := kv.NewClient(ringName+"-lifecycler", cfg.RingConfig.KVStore, codec, nil) if err != nil { return nil, err } diff --git a/pkg/ring/ring.go b/pkg/ring/ring.go index 10a113b6a2e..b362a14697d 100644 --- a/pkg/ring/ring.go +++ b/pkg/ring/ring.go @@ -118,7 +118,8 @@ type Ring struct { // New creates a new Ring. Being a service, Ring needs to be started to do anything. func New(cfg Config, name, key string) (*Ring, error) { codec := GetCodec() - store, err := kv.NewClient(cfg.KVStore, codec) + // Suffix all client names with "-ring" to denote this kv client is used by the ring + store, err := kv.NewClient(name+"-ring", cfg.KVStore, codec, nil) if err != nil { return nil, err } diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go index a5192a4738a..a80b99a0f98 100644 --- a/pkg/ruler/ruler.go +++ b/pkg/ruler/ruler.go @@ -190,7 +190,7 @@ func NewRuler(cfg Config, engine *promql.Engine, queryable promStorage.Queryable } if cfg.EnableSharding { - ringStore, err := kv.NewClient(cfg.Ring.KVStore, ring.GetCodec()) + ringStore, err := kv.NewClient(ring.RulerRingKey, cfg.Ring.KVStore, ring.GetCodec(), reg) if err != nil { return nil, errors.Wrap(err, "create KV store client") } diff --git a/pkg/storegateway/gateway.go b/pkg/storegateway/gateway.go index 02ec9be63d0..b1be787a808 100644 --- a/pkg/storegateway/gateway.go +++ b/pkg/storegateway/gateway.go @@ -81,7 +81,7 @@ func NewStoreGateway(gatewayCfg Config, storageCfg cortex_tsdb.Config, logLevel } if gatewayCfg.ShardingEnabled { - ringStore, err = kv.NewClient(gatewayCfg.ShardingRing.KVStore, ring.GetCodec()) + ringStore, err = kv.NewClient(RingNameForClient, gatewayCfg.ShardingRing.KVStore, ring.GetCodec(), reg) if err != nil { return nil, errors.Wrap(err, "create KV store client") } From 0f4b13495ea56eb70a584c8b9d7278d55cba4d38 Mon Sep 17 00:00:00 2001 From: Jacob Lisi Date: Thu, 28 May 2020 18:24:20 -0400 Subject: [PATCH 02/10] different name for block store queryable Signed-off-by: Jacob Lisi --- pkg/querier/blocks_store_queryable.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index 35ab8be6567..388d8181ebc 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -150,7 +150,7 @@ func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegatewa if gatewayCfg.ShardingEnabled { storesRingCfg := gatewayCfg.ShardingRing.ToRingConfig() - storesRingBackend, err := kv.NewClient(storegateway.RingNameForClient, storesRingCfg.KVStore, ring.GetCodec(), reg) + storesRingBackend, err := kv.NewClient(storegateway.RingNameForClient+"-block-store", storesRingCfg.KVStore, ring.GetCodec(), reg) if err != nil { return nil, errors.Wrap(err, "failed to create store-gateway ring backend") } From 6ea374a1da8d5bd32f761426fd8028043893da2f Mon Sep 17 00:00:00 2001 From: Jacob Lisi Date: Thu, 28 May 2020 23:15:16 -0400 Subject: [PATCH 03/10] revert circleci config Signed-off-by: Jacob Lisi --- .circleci/config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index fec8ffa4419..844bc65fdcf 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -165,7 +165,7 @@ jobs: export CORTEX_IMAGE="${CORTEX_IMAGE_PREFIX}cortex:${CIRCLE_TAG:-$(./tools/image-tag)}" export CORTEX_CHECKOUT_DIR="/home/circleci/.go_workspace/src/github.com/cortexproject/cortex" echo "Running integration tests with image: $CORTEX_IMAGE" - go test -mod=vendor -tags=requires_docker -timeout 1500s -v -count=1 ./integration/... + go test -tags=requires_docker -timeout 1200s -v -count=1 ./integration/... no_output_timeout: 20m build: From fc91e8fbf0837156610aaaa2052fad50b9f86108 Mon Sep 17 00:00:00 2001 From: Jacob Lisi Date: Mon, 1 Jun 2020 12:32:42 -0400 Subject: [PATCH 04/10] don't register nil reg in kv metrics Signed-off-by: Jacob Lisi --- pkg/distributor/ha_tracker_test.go | 15 +++++++-------- pkg/ring/kv/metrics.go | 4 ++-- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/pkg/distributor/ha_tracker_test.go b/pkg/distributor/ha_tracker_test.go index 02c31823a82..c4a3c78bd46 100644 --- a/pkg/distributor/ha_tracker_test.go +++ b/pkg/distributor/ha_tracker_test.go @@ -6,7 +6,6 @@ import ( "testing" "time" - "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/pkg/timestamp" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -136,7 +135,7 @@ func TestWatchPrefixAssignment(t *testing.T) { UpdateTimeout: time.Millisecond, UpdateTimeoutJitterMax: 0, FailoverTimeout: time.Millisecond * 2, - }, prometheus.NewRegistry()) + }, nil) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), c)) defer services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck @@ -166,7 +165,7 @@ func TestCheckReplicaOverwriteTimeout(t *testing.T) { UpdateTimeout: 100 * time.Millisecond, UpdateTimeoutJitterMax: 0, FailoverTimeout: time.Second, - }, prometheus.NewRegistry()) + }, nil) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), c)) defer services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck @@ -201,7 +200,7 @@ func TestCheckReplicaMultiCluster(t *testing.T) { UpdateTimeout: 100 * time.Millisecond, UpdateTimeoutJitterMax: 0, FailoverTimeout: time.Second, - }, prometheus.NewRegistry()) + }, nil) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), c)) defer services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck @@ -236,7 +235,7 @@ func TestCheckReplicaMultiClusterTimeout(t *testing.T) { UpdateTimeout: 100 * time.Millisecond, UpdateTimeoutJitterMax: 0, FailoverTimeout: time.Second, - }, prometheus.NewRegistry()) + }, nil) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), c)) defer services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck @@ -287,7 +286,7 @@ func TestCheckReplicaUpdateTimeout(t *testing.T) { UpdateTimeout: time.Second, UpdateTimeoutJitterMax: 0, FailoverTimeout: time.Second, - }, prometheus.NewRegistry()) + }, nil) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), c)) defer services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck @@ -348,7 +347,7 @@ func TestCheckReplicaMultiUser(t *testing.T) { UpdateTimeout: 100 * time.Millisecond, UpdateTimeoutJitterMax: 0, FailoverTimeout: time.Second, - }, prometheus.NewRegistry()) + }, nil) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), c)) defer services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck @@ -431,7 +430,7 @@ func TestCheckReplicaUpdateTimeoutJitter(t *testing.T) { UpdateTimeout: testData.updateTimeout, UpdateTimeoutJitterMax: 0, FailoverTimeout: time.Second, - }, prometheus.NewRegistry()) + }, nil) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), c)) defer services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck diff --git a/pkg/ring/kv/metrics.go b/pkg/ring/kv/metrics.go index 65ca686c6b0..ebb0667b329 100644 --- a/pkg/ring/kv/metrics.go +++ b/pkg/ring/kv/metrics.go @@ -27,9 +27,9 @@ type metrics struct { } func newMetricsClient(name string, backend string, c Client, reg prometheus.Registerer) Client { - // TODO: Ensure all clients are passed a valid registry and remove the following check + // If no Registerer is provided return the raw client if reg == nil { - reg = prometheus.DefaultRegisterer + return c } return &metrics{ From e1b95f90fdea35dc2ee92d86efd7ba841ceb1b1b Mon Sep 17 00:00:00 2001 From: Jacob Lisi Date: Mon, 1 Jun 2020 13:03:43 -0400 Subject: [PATCH 05/10] refactor per PR comments Signed-off-by: Jacob Lisi --- integration/kv_test.go | 4 +-- pkg/compactor/compactor.go | 4 +-- pkg/cortex/modules.go | 4 +-- pkg/distributor/distributor.go | 6 ++-- pkg/distributor/distributor_test.go | 4 +-- pkg/distributor/ha_tracker.go | 6 +++- pkg/ingester/ingester.go | 2 +- pkg/ingester/ingester_v2.go | 2 +- pkg/querier/blocks_store_queryable.go | 6 +++- pkg/ring/kv/client.go | 16 +++++------ pkg/ring/kv/metrics.go | 8 ++++-- pkg/ring/lifecycler.go | 8 ++++-- pkg/ring/lifecycler_test.go | 40 +++++++++++++-------------- pkg/ring/ring.go | 8 ++++-- pkg/ruler/ruler.go | 6 +++- pkg/storegateway/gateway.go | 6 +++- 16 files changed, 79 insertions(+), 51 deletions(-) diff --git a/integration/kv_test.go b/integration/kv_test.go index fdd17c6229c..c74185de2dc 100644 --- a/integration/kv_test.go +++ b/integration/kv_test.go @@ -33,7 +33,7 @@ func TestKV_List_Delete(t *testing.T) { reg := prometheus.NewRegistry() - etcdKv, err := kv.NewClient("test-etcd", kv.Config{ + etcdKv, err := kv.NewClient(kv.Config{ Store: "etcd", Prefix: "keys/", StoreConfig: kv.StoreConfig{ @@ -46,7 +46,7 @@ func TestKV_List_Delete(t *testing.T) { }, stringCodec{}, reg) require.NoError(t, err) - consulKv, err := kv.NewClient("test-consul", kv.Config{ + consulKv, err := kv.NewClient(kv.Config{ Store: "consul", Prefix: "keys/", StoreConfig: kv.StoreConfig{ diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index 902120532eb..ea682547d29 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -195,12 +195,12 @@ func (c *Compactor) starting(ctx context.Context) error { // Initialize the compactors ring if sharding is enabled. if c.compactorCfg.ShardingEnabled { lifecyclerCfg := c.compactorCfg.ShardingRing.ToLifecyclerConfig() - c.ringLifecycler, err = ring.NewLifecycler(lifecyclerCfg, ring.NewNoopFlushTransferer(), "compactor", ring.CompactorRingKey, false) + c.ringLifecycler, err = ring.NewLifecycler(lifecyclerCfg, ring.NewNoopFlushTransferer(), "compactor", ring.CompactorRingKey, false, c.registerer) if err != nil { return errors.Wrap(err, "unable to initialize compactor ring lifecycler") } - c.ring, err = ring.New(lifecyclerCfg.RingConfig, "compactor", ring.CompactorRingKey) + c.ring, err = ring.New(lifecyclerCfg.RingConfig, "compactor", ring.CompactorRingKey, c.registerer) if err != nil { return errors.Wrap(err, "unable to initialize compactor ring") } diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index ab50a693257..694aacd4747 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -107,7 +107,7 @@ func (t *Cortex) initServer() (services.Service, error) { func (t *Cortex) initRing() (serv services.Service, err error) { t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.RuntimeConfig) t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV - t.Ring, err = ring.New(t.Cfg.Ingester.LifecyclerConfig.RingConfig, "ingester", ring.IngesterRingKey) + t.Ring, err = ring.New(t.Cfg.Ingester.LifecyclerConfig.RingConfig, "ingester", ring.IngesterRingKey, prometheus.DefaultRegisterer) if err != nil { return nil, err } @@ -149,7 +149,7 @@ func (t *Cortex) initDistributor() (serv services.Service, err error) { // ruler's dependency) canJoinDistributorsRing := (t.Cfg.Target == All || t.Cfg.Target == Distributor) - t.Distributor, err = distributor.New(t.Cfg.Distributor, t.Cfg.IngesterClient, t.Overrides, t.Ring, canJoinDistributorsRing) + t.Distributor, err = distributor.New(t.Cfg.Distributor, t.Cfg.IngesterClient, t.Overrides, t.Ring, canJoinDistributorsRing, nil) if err != nil { return } diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 3a3efd889e5..08e42d84c97 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -174,7 +174,7 @@ func (cfg *Config) Validate() error { } // New constructs a new Distributor -func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Overrides, ingestersRing ring.ReadRing, canJoinDistributorsRing bool) (*Distributor, error) { +func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Overrides, ingestersRing ring.ReadRing, canJoinDistributorsRing bool, reg prometheus.Registerer) (*Distributor, error) { if cfg.ingesterClientFactory == nil { cfg.ingesterClientFactory = func(addr string) (ring_client.PoolClient, error) { return ingester_client.MakeIngesterClient(addr, clientConfig) @@ -184,7 +184,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove replicationFactor.Set(float64(ingestersRing.ReplicationFactor())) cfg.PoolConfig.RemoteTimeout = cfg.RemoteTimeout - replicas, err := newClusterTracker(cfg.HATrackerConfig, nil) + replicas, err := newClusterTracker(cfg.HATrackerConfig, reg) if err != nil { return nil, err } @@ -201,7 +201,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove if !canJoinDistributorsRing { ingestionRateStrategy = newInfiniteIngestionRateStrategy() } else if limits.IngestionRateStrategy() == validation.GlobalIngestionRateStrategy { - distributorsRing, err = ring.NewLifecycler(cfg.DistributorRing.ToLifecyclerConfig(), nil, RingNameForClient, ring.DistributorRingKey, true) + distributorsRing, err = ring.NewLifecycler(cfg.DistributorRing.ToLifecyclerConfig(), nil, RingNameForClient, ring.DistributorRingKey, true, reg) if err != nil { return nil, err } diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index c2de9d9d7d5..d6eb5f75d80 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -936,7 +936,7 @@ func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, *rin }, HeartbeatTimeout: 60 * time.Minute, ReplicationFactor: 3, - }, ring.IngesterRingKey, ring.IngesterRingKey) + }, ring.IngesterRingKey, ring.IngesterRingKey, nil) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), ingestersRing)) @@ -969,7 +969,7 @@ func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, *rin overrides, err := validation.NewOverrides(*cfg.limits, nil) require.NoError(t, err) - d, err := New(distributorCfg, clientConfig, overrides, ingestersRing, true) + d, err := New(distributorCfg, clientConfig, overrides, ingestersRing, true, nil) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), d)) diff --git a/pkg/distributor/ha_tracker.go b/pkg/distributor/ha_tracker.go index 944ee00fac3..56e0afb1d35 100644 --- a/pkg/distributor/ha_tracker.go +++ b/pkg/distributor/ha_tracker.go @@ -157,7 +157,11 @@ func newClusterTracker(cfg HATrackerConfig, reg prometheus.Registerer) (*haTrack } if cfg.EnableHATracker { - client, err := kv.NewClient(RingNameForClient, cfg.KVStore, GetReplicaDescCodec(), reg) + client, err := kv.NewClient( + cfg.KVStore, + GetReplicaDescCodec(), + prometheus.WrapRegistererWith(prometheus.Labels{"name": RingNameForClient}, reg), + ) if err != nil { return nil, err } diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 5bc1a1c35e1..a80126a52bb 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -199,7 +199,7 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c // During WAL recovery, it will create new user states which requires the limiter. // Hence initialise the limiter before creating the WAL. // The '!cfg.WALConfig.WALEnabled' argument says don't flush on shutdown if the WAL is enabled. - i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", ring.IngesterRingKey, !cfg.WALConfig.WALEnabled) + i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", ring.IngesterRingKey, !cfg.WALConfig.WALEnabled, registerer) if err != nil { return nil, err } diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index 32ce43c7a6f..25b7e1faf7d 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -146,7 +146,7 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides, }, i.numSeriesInTSDB) } - i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", ring.IngesterRingKey, true) + i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", ring.IngesterRingKey, true, registerer) if err != nil { return nil, err } diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index 388d8181ebc..a3067fc0a9f 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -150,7 +150,11 @@ func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegatewa if gatewayCfg.ShardingEnabled { storesRingCfg := gatewayCfg.ShardingRing.ToRingConfig() - storesRingBackend, err := kv.NewClient(storegateway.RingNameForClient+"-block-store", storesRingCfg.KVStore, ring.GetCodec(), reg) + storesRingBackend, err := kv.NewClient( + storesRingCfg.KVStore, + ring.GetCodec(), + prometheus.WrapRegistererWith(prometheus.Labels{"name": storegateway.RingNameForClient + "-block-store"}, reg), + ) if err != nil { return nil, errors.Wrap(err, "failed to create store-gateway ring backend") } diff --git a/pkg/ring/kv/client.go b/pkg/ring/kv/client.go index 8852746a2a5..c417fc4432f 100644 --- a/pkg/ring/kv/client.go +++ b/pkg/ring/kv/client.go @@ -99,15 +99,15 @@ type Client interface { // NewClient creates a new Client (consul, etcd or inmemory) based on the config, // encodes and decodes data for storage using the codec. -func NewClient(name string, cfg Config, codec codec.Codec, reg prometheus.Registerer) (Client, error) { +func NewClient(cfg Config, codec codec.Codec, reg prometheus.Registerer) (Client, error) { if cfg.Mock != nil { return cfg.Mock, nil } - return createClient(name, cfg.Store, cfg.Prefix, cfg.StoreConfig, codec, reg) + return createClient(cfg.Store, cfg.Prefix, cfg.StoreConfig, codec, reg) } -func createClient(name string, backend string, prefix string, cfg StoreConfig, codec codec.Codec, reg prometheus.Registerer) (Client, error) { +func createClient(backend string, prefix string, cfg StoreConfig, codec codec.Codec, reg prometheus.Registerer) (Client, error) { var client Client var err error @@ -137,7 +137,7 @@ func createClient(name string, backend string, prefix string, cfg StoreConfig, c } case "multi": - client, err = buildMultiClient(name, cfg, codec, reg) + client, err = buildMultiClient(cfg, codec, reg) default: return nil, fmt.Errorf("invalid KV store type: %s", backend) @@ -151,10 +151,10 @@ func createClient(name string, backend string, prefix string, cfg StoreConfig, c client = PrefixClient(client, prefix) } - return newMetricsClient(name, backend, client, reg), nil + return newMetricsClient(backend, client, reg), nil } -func buildMultiClient(name string, cfg StoreConfig, codec codec.Codec, reg prometheus.Registerer) (Client, error) { +func buildMultiClient(cfg StoreConfig, codec codec.Codec, reg prometheus.Registerer) (Client, error) { if cfg.Multi.Primary == "" || cfg.Multi.Secondary == "" { return nil, fmt.Errorf("primary or secondary store not set") } @@ -165,12 +165,12 @@ func buildMultiClient(name string, cfg StoreConfig, codec codec.Codec, reg prome return nil, fmt.Errorf("primary and secondary stores must be different") } - primary, err := createClient(name+"-primary", cfg.Multi.Primary, "", cfg, codec, reg) + primary, err := createClient(cfg.Multi.Primary, "", cfg, codec, prometheus.WrapRegistererWith(primaryLabel, reg)) if err != nil { return nil, err } - secondary, err := createClient(name+"-secondary", cfg.Multi.Secondary, "", cfg, codec, reg) + secondary, err := createClient(cfg.Multi.Secondary, "", cfg, codec, prometheus.WrapRegistererWith(secondaryLabel, reg)) if err != nil { return nil, err } diff --git a/pkg/ring/kv/metrics.go b/pkg/ring/kv/metrics.go index ebb0667b329..7a6318278df 100644 --- a/pkg/ring/kv/metrics.go +++ b/pkg/ring/kv/metrics.go @@ -10,6 +10,11 @@ import ( "github.com/weaveworks/common/instrument" ) +var ( + primaryLabel = prometheus.Labels{"role": "primary"} + secondaryLabel = prometheus.Labels{"role": "secondary"} +) + // errorCode converts an error into an HTTP status code, modified from weaveworks/common/instrument func errorCode(err error) string { if err == nil { @@ -26,7 +31,7 @@ type metrics struct { requestDuration *instrument.HistogramCollector } -func newMetricsClient(name string, backend string, c Client, reg prometheus.Registerer) Client { +func newMetricsClient(backend string, c Client, reg prometheus.Registerer) Client { // If no Registerer is provided return the raw client if reg == nil { return c @@ -41,7 +46,6 @@ func newMetricsClient(name string, backend string, c Client, reg prometheus.Regi Help: "Time spent on kv store requests.", Buckets: prometheus.DefBuckets, ConstLabels: prometheus.Labels{ - "name": name, "type": backend, }, }, []string{"operation", "status_code"}), diff --git a/pkg/ring/lifecycler.go b/pkg/ring/lifecycler.go index d41c7a33500..6a012323fed 100644 --- a/pkg/ring/lifecycler.go +++ b/pkg/ring/lifecycler.go @@ -140,7 +140,7 @@ type Lifecycler struct { } // NewLifecycler creates new Lifecycler. It must be started via StartAsync. -func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer, ringName, ringKey string, flushOnShutdown bool) (*Lifecycler, error) { +func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer, ringName, ringKey string, flushOnShutdown bool, reg prometheus.Registerer) (*Lifecycler, error) { addr, err := GetInstanceAddr(cfg.Addr, cfg.InfNames) if err != nil { return nil, err @@ -148,7 +148,11 @@ func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer, ringNa port := GetInstancePort(cfg.Port, cfg.ListenPort) codec := GetCodec() // Suffix all client names with "-lifecycler" to denote this kv client is used by the lifecycler - store, err := kv.NewClient(ringName+"-lifecycler", cfg.RingConfig.KVStore, codec, nil) + store, err := kv.NewClient( + cfg.RingConfig.KVStore, + codec, + prometheus.WrapRegistererWith(prometheus.Labels{"name": ringName + "-lifecycler"}, reg), + ) if err != nil { return nil, err } diff --git a/pkg/ring/lifecycler_test.go b/pkg/ring/lifecycler_test.go index ed9187c9149..179340d3b69 100644 --- a/pkg/ring/lifecycler_test.go +++ b/pkg/ring/lifecycler_test.go @@ -58,7 +58,7 @@ func TestLifecycler_HealthyInstancesCount(t *testing.T) { flagext.DefaultValues(&ringConfig) ringConfig.KVStore.Mock = consul.NewInMemoryClient(GetCodec()) - r, err := New(ringConfig, "ingester", IngesterRingKey) + r, err := New(ringConfig, "ingester", IngesterRingKey, nil) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), r)) defer services.StopAndAwaitTerminated(context.Background(), r) //nolint:errcheck @@ -68,7 +68,7 @@ func TestLifecycler_HealthyInstancesCount(t *testing.T) { lifecyclerConfig1.HeartbeatPeriod = 100 * time.Millisecond lifecyclerConfig1.JoinAfter = 100 * time.Millisecond - lifecycler1, err := NewLifecycler(lifecyclerConfig1, &flushTransferer{}, "ingester", IngesterRingKey, true) + lifecycler1, err := NewLifecycler(lifecyclerConfig1, &flushTransferer{}, "ingester", IngesterRingKey, true, nil) require.NoError(t, err) assert.Equal(t, 0, lifecycler1.HealthyInstancesCount()) @@ -84,7 +84,7 @@ func TestLifecycler_HealthyInstancesCount(t *testing.T) { lifecyclerConfig2.HeartbeatPeriod = 100 * time.Millisecond lifecyclerConfig2.JoinAfter = 100 * time.Millisecond - lifecycler2, err := NewLifecycler(lifecyclerConfig2, &flushTransferer{}, "ingester", IngesterRingKey, true) + lifecycler2, err := NewLifecycler(lifecyclerConfig2, &flushTransferer{}, "ingester", IngesterRingKey, true, nil) require.NoError(t, err) assert.Equal(t, 0, lifecycler2.HealthyInstancesCount()) @@ -108,7 +108,7 @@ func TestLifecycler_NilFlushTransferer(t *testing.T) { lifecyclerConfig := testLifecyclerConfig(ringConfig, "ing1") // Create a lifecycler with nil FlushTransferer to make sure it operates correctly - lifecycler, err := NewLifecycler(lifecyclerConfig, nil, "ingester", IngesterRingKey, true) + lifecycler, err := NewLifecycler(lifecyclerConfig, nil, "ingester", IngesterRingKey, true, nil) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), lifecycler)) @@ -132,12 +132,12 @@ func TestLifecycler_TwoRingsWithDifferentKeysOnTheSameKVStore(t *testing.T) { lifecyclerConfig1 := testLifecyclerConfig(ringConfig, "instance-1") lifecyclerConfig2 := testLifecyclerConfig(ringConfig, "instance-2") - lifecycler1, err := NewLifecycler(lifecyclerConfig1, nil, "service-1", "ring-1", true) + lifecycler1, err := NewLifecycler(lifecyclerConfig1, nil, "service-1", "ring-1", true, nil) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), lifecycler1)) defer services.StopAndAwaitTerminated(context.Background(), lifecycler1) //nolint:errcheck - lifecycler2, err := NewLifecycler(lifecyclerConfig2, nil, "service-2", "ring-2", true) + lifecycler2, err := NewLifecycler(lifecyclerConfig2, nil, "service-2", "ring-2", true, nil) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), lifecycler2)) defer services.StopAndAwaitTerminated(context.Background(), lifecycler2) //nolint:errcheck @@ -166,14 +166,14 @@ func TestRingRestart(t *testing.T) { c := GetCodec() ringConfig.KVStore.Mock = consul.NewInMemoryClient(c) - r, err := New(ringConfig, "ingester", IngesterRingKey) + r, err := New(ringConfig, "ingester", IngesterRingKey, nil) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), r)) defer services.StopAndAwaitTerminated(context.Background(), r) //nolint:errcheck // Add an 'ingester' with normalised tokens. lifecyclerConfig1 := testLifecyclerConfig(ringConfig, "ing1") - l1, err := NewLifecycler(lifecyclerConfig1, &nopFlushTransferer{}, "ingester", IngesterRingKey, true) + l1, err := NewLifecycler(lifecyclerConfig1, &nopFlushTransferer{}, "ingester", IngesterRingKey, true, nil) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), l1)) @@ -187,7 +187,7 @@ func TestRingRestart(t *testing.T) { token := l1.tokens[0] // Add a second ingester with the same settings, so it will think it has restarted - l2, err := NewLifecycler(lifecyclerConfig1, &nopFlushTransferer{}, "ingester", IngesterRingKey, true) + l2, err := NewLifecycler(lifecyclerConfig1, &nopFlushTransferer{}, "ingester", IngesterRingKey, true, nil) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), l2)) @@ -261,7 +261,7 @@ func TestCheckReady(t *testing.T) { flagext.DefaultValues(&ringConfig) ringConfig.KVStore.Mock = &MockClient{} - r, err := New(ringConfig, "ingester", IngesterRingKey) + r, err := New(ringConfig, "ingester", IngesterRingKey, nil) require.NoError(t, err) require.NoError(t, r.StartAsync(context.Background())) // This is very atypical, but if we used AwaitRunning, that would fail, because of how quickly service terminates ... @@ -271,7 +271,7 @@ func TestCheckReady(t *testing.T) { cfg := testLifecyclerConfig(ringConfig, "ring1") cfg.MinReadyDuration = 1 * time.Nanosecond - l1, err := NewLifecycler(cfg, &nopFlushTransferer{}, "ingester", IngesterRingKey, true) + l1, err := NewLifecycler(cfg, &nopFlushTransferer{}, "ingester", IngesterRingKey, true, nil) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), l1)) @@ -293,7 +293,7 @@ func TestTokensOnDisk(t *testing.T) { flagext.DefaultValues(&ringConfig) ringConfig.KVStore.Mock = consul.NewInMemoryClient(GetCodec()) - r, err := New(ringConfig, "ingester", IngesterRingKey) + r, err := New(ringConfig, "ingester", IngesterRingKey, nil) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), r)) defer services.StopAndAwaitTerminated(context.Background(), r) //nolint:errcheck @@ -309,7 +309,7 @@ func TestTokensOnDisk(t *testing.T) { lifecyclerConfig.TokensFilePath = tokenDir + "/tokens" // Start first ingester. - l1, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester", IngesterRingKey, true) + l1, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester", IngesterRingKey, true, nil) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), l1)) @@ -333,7 +333,7 @@ func TestTokensOnDisk(t *testing.T) { // Start new ingester at same token directory. lifecyclerConfig.ID = "ing2" - l2, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester", IngesterRingKey, true) + l2, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester", IngesterRingKey, true, nil) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), l2)) defer services.StopAndAwaitTerminated(context.Background(), l2) //nolint:errcheck @@ -368,7 +368,7 @@ func TestJoinInLeavingState(t *testing.T) { c := GetCodec() ringConfig.KVStore.Mock = consul.NewInMemoryClient(c) - r, err := New(ringConfig, "ingester", IngesterRingKey) + r, err := New(ringConfig, "ingester", IngesterRingKey, nil) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), r)) defer services.StopAndAwaitTerminated(context.Background(), r) //nolint:errcheck @@ -395,7 +395,7 @@ func TestJoinInLeavingState(t *testing.T) { }) require.NoError(t, err) - l1, err := NewLifecycler(cfg, &nopFlushTransferer{}, "ingester", IngesterRingKey, true) + l1, err := NewLifecycler(cfg, &nopFlushTransferer{}, "ingester", IngesterRingKey, true, nil) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), l1)) @@ -420,7 +420,7 @@ func TestJoinInJoiningState(t *testing.T) { c := GetCodec() ringConfig.KVStore.Mock = consul.NewInMemoryClient(c) - r, err := New(ringConfig, "ingester", IngesterRingKey) + r, err := New(ringConfig, "ingester", IngesterRingKey, nil) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), r)) defer services.StopAndAwaitTerminated(context.Background(), r) //nolint:errcheck @@ -447,7 +447,7 @@ func TestJoinInJoiningState(t *testing.T) { }) require.NoError(t, err) - l1, err := NewLifecycler(cfg, &nopFlushTransferer{}, "ingester", IngesterRingKey, true) + l1, err := NewLifecycler(cfg, &nopFlushTransferer{}, "ingester", IngesterRingKey, true, nil) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), l1)) @@ -476,7 +476,7 @@ func TestRestoreOfZoneWhenOverwritten(t *testing.T) { codec := GetCodec() ringConfig.KVStore.Mock = consul.NewInMemoryClient(codec) - r, err := New(ringConfig, "ingester", IngesterRingKey) + r, err := New(ringConfig, "ingester", IngesterRingKey, nil) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), r)) defer services.StopAndAwaitTerminated(context.Background(), r) //nolint:errcheck @@ -502,7 +502,7 @@ func TestRestoreOfZoneWhenOverwritten(t *testing.T) { }) require.NoError(t, err) - l1, err := NewLifecycler(cfg, &nopFlushTransferer{}, "ingester", IngesterRingKey, true) + l1, err := NewLifecycler(cfg, &nopFlushTransferer{}, "ingester", IngesterRingKey, true, nil) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), l1)) diff --git a/pkg/ring/ring.go b/pkg/ring/ring.go index b362a14697d..660bfa27e53 100644 --- a/pkg/ring/ring.go +++ b/pkg/ring/ring.go @@ -116,10 +116,14 @@ type Ring struct { } // New creates a new Ring. Being a service, Ring needs to be started to do anything. -func New(cfg Config, name, key string) (*Ring, error) { +func New(cfg Config, name, key string, reg prometheus.Registerer) (*Ring, error) { codec := GetCodec() // Suffix all client names with "-ring" to denote this kv client is used by the ring - store, err := kv.NewClient(name+"-ring", cfg.KVStore, codec, nil) + store, err := kv.NewClient( + cfg.KVStore, + codec, + prometheus.WrapRegistererWith(prometheus.Labels{"name": name + "-ring"}, reg), + ) if err != nil { return nil, err } diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go index a80b99a0f98..feb548f882f 100644 --- a/pkg/ruler/ruler.go +++ b/pkg/ruler/ruler.go @@ -190,7 +190,11 @@ func NewRuler(cfg Config, engine *promql.Engine, queryable promStorage.Queryable } if cfg.EnableSharding { - ringStore, err := kv.NewClient(ring.RulerRingKey, cfg.Ring.KVStore, ring.GetCodec(), reg) + ringStore, err := kv.NewClient( + cfg.Ring.KVStore, + ring.GetCodec(), + prometheus.WrapRegistererWith(prometheus.Labels{"name": ring.RulerRingKey}, reg), + ) if err != nil { return nil, errors.Wrap(err, "create KV store client") } diff --git a/pkg/storegateway/gateway.go b/pkg/storegateway/gateway.go index b1be787a808..684efc2a913 100644 --- a/pkg/storegateway/gateway.go +++ b/pkg/storegateway/gateway.go @@ -81,7 +81,11 @@ func NewStoreGateway(gatewayCfg Config, storageCfg cortex_tsdb.Config, logLevel } if gatewayCfg.ShardingEnabled { - ringStore, err = kv.NewClient(RingNameForClient, gatewayCfg.ShardingRing.KVStore, ring.GetCodec(), reg) + ringStore, err = kv.NewClient( + gatewayCfg.ShardingRing.KVStore, + ring.GetCodec(), + prometheus.WrapRegistererWith(prometheus.Labels{"name": RingNameForClient}, reg), + ) if err != nil { return nil, errors.Wrap(err, "create KV store client") } From c8516c2a9bfaf09c44e5fed85a62a13ac5ce02bd Mon Sep 17 00:00:00 2001 From: Jacob Lisi Date: Mon, 1 Jun 2020 13:47:17 -0400 Subject: [PATCH 06/10] ensure nil registry is not wrapped and provided to KV store Signed-off-by: Jacob Lisi --- pkg/distributor/ha_tracker.go | 2 +- pkg/querier/blocks_store_queryable.go | 2 +- pkg/ring/kv/metrics.go | 10 ++++++++++ pkg/ring/lifecycler.go | 2 +- pkg/ring/ring.go | 2 +- pkg/ruler/ruler.go | 2 +- pkg/storegateway/gateway.go | 2 +- 7 files changed, 16 insertions(+), 6 deletions(-) diff --git a/pkg/distributor/ha_tracker.go b/pkg/distributor/ha_tracker.go index 56e0afb1d35..a3944cb6748 100644 --- a/pkg/distributor/ha_tracker.go +++ b/pkg/distributor/ha_tracker.go @@ -160,7 +160,7 @@ func newClusterTracker(cfg HATrackerConfig, reg prometheus.Registerer) (*haTrack client, err := kv.NewClient( cfg.KVStore, GetReplicaDescCodec(), - prometheus.WrapRegistererWith(prometheus.Labels{"name": RingNameForClient}, reg), + kv.RegistererWithKVName(reg, RingNameForClient), ) if err != nil { return nil, err diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index a3067fc0a9f..1ae5950da61 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -153,7 +153,7 @@ func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegatewa storesRingBackend, err := kv.NewClient( storesRingCfg.KVStore, ring.GetCodec(), - prometheus.WrapRegistererWith(prometheus.Labels{"name": storegateway.RingNameForClient + "-block-store"}, reg), + kv.RegistererWithKVName(reg, storegateway.RingNameForClient+"-block-store"), ) if err != nil { return nil, errors.Wrap(err, "failed to create store-gateway ring backend") diff --git a/pkg/ring/kv/metrics.go b/pkg/ring/kv/metrics.go index 7a6318278df..242b7ba9872 100644 --- a/pkg/ring/kv/metrics.go +++ b/pkg/ring/kv/metrics.go @@ -15,6 +15,16 @@ var ( secondaryLabel = prometheus.Labels{"role": "secondary"} ) +// RegistererWithKVName wraps the provided Registerer with the KV name label. If a nil reg +// is provided, a nil registry is returned +func RegistererWithKVName(reg prometheus.Registerer, name string) prometheus.Registerer { + if reg == nil { + return nil + } + + return prometheus.WrapRegistererWith(prometheus.Labels{"name": name}, reg) +} + // errorCode converts an error into an HTTP status code, modified from weaveworks/common/instrument func errorCode(err error) string { if err == nil { diff --git a/pkg/ring/lifecycler.go b/pkg/ring/lifecycler.go index 6a012323fed..6e9e4e4b347 100644 --- a/pkg/ring/lifecycler.go +++ b/pkg/ring/lifecycler.go @@ -151,7 +151,7 @@ func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer, ringNa store, err := kv.NewClient( cfg.RingConfig.KVStore, codec, - prometheus.WrapRegistererWith(prometheus.Labels{"name": ringName + "-lifecycler"}, reg), + kv.RegistererWithKVName(reg, ringName+"-lifecycler"), ) if err != nil { return nil, err diff --git a/pkg/ring/ring.go b/pkg/ring/ring.go index 660bfa27e53..0b52c4a09a0 100644 --- a/pkg/ring/ring.go +++ b/pkg/ring/ring.go @@ -122,7 +122,7 @@ func New(cfg Config, name, key string, reg prometheus.Registerer) (*Ring, error) store, err := kv.NewClient( cfg.KVStore, codec, - prometheus.WrapRegistererWith(prometheus.Labels{"name": name + "-ring"}, reg), + kv.RegistererWithKVName(reg, name+"-ring"), ) if err != nil { return nil, err diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go index feb548f882f..ff59d53f186 100644 --- a/pkg/ruler/ruler.go +++ b/pkg/ruler/ruler.go @@ -193,7 +193,7 @@ func NewRuler(cfg Config, engine *promql.Engine, queryable promStorage.Queryable ringStore, err := kv.NewClient( cfg.Ring.KVStore, ring.GetCodec(), - prometheus.WrapRegistererWith(prometheus.Labels{"name": ring.RulerRingKey}, reg), + kv.RegistererWithKVName(reg, ring.RulerRingKey), ) if err != nil { return nil, errors.Wrap(err, "create KV store client") diff --git a/pkg/storegateway/gateway.go b/pkg/storegateway/gateway.go index 684efc2a913..0e67e8bc8e8 100644 --- a/pkg/storegateway/gateway.go +++ b/pkg/storegateway/gateway.go @@ -84,7 +84,7 @@ func NewStoreGateway(gatewayCfg Config, storageCfg cortex_tsdb.Config, logLevel ringStore, err = kv.NewClient( gatewayCfg.ShardingRing.KVStore, ring.GetCodec(), - prometheus.WrapRegistererWith(prometheus.Labels{"name": RingNameForClient}, reg), + kv.RegistererWithKVName(reg, RingNameForClient), ) if err != nil { return nil, errors.Wrap(err, "create KV store client") From a62c493b8852dcf32d5fb8928bce902fa77ab8ae Mon Sep 17 00:00:00 2001 From: Jacob Lisi Date: Mon, 1 Jun 2020 13:49:37 -0400 Subject: [PATCH 07/10] update distributor ha-tracker client name Signed-off-by: Jacob Lisi --- pkg/distributor/ha_tracker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/distributor/ha_tracker.go b/pkg/distributor/ha_tracker.go index a3944cb6748..5bfbd8a1eb8 100644 --- a/pkg/distributor/ha_tracker.go +++ b/pkg/distributor/ha_tracker.go @@ -160,7 +160,7 @@ func newClusterTracker(cfg HATrackerConfig, reg prometheus.Registerer) (*haTrack client, err := kv.NewClient( cfg.KVStore, GetReplicaDescCodec(), - kv.RegistererWithKVName(reg, RingNameForClient), + kv.RegistererWithKVName(reg, "distributor-hatracker"), ) if err != nil { return nil, err From 5f3671e74a4066b76352e61c3230bbb79713c5e8 Mon Sep 17 00:00:00 2001 From: Jacob Lisi Date: Mon, 1 Jun 2020 14:01:16 -0400 Subject: [PATCH 08/10] re-add consul instrumentation Signed-off-by: Jacob Lisi --- pkg/ring/kv/consul/client.go | 19 +++++--- pkg/ring/kv/consul/metrics.go | 82 +++++++++++++++++++++++++++++++++++ 2 files changed, 94 insertions(+), 7 deletions(-) create mode 100644 pkg/ring/kv/consul/metrics.go diff --git a/pkg/ring/kv/consul/client.go b/pkg/ring/kv/consul/client.go index 7349e6a0afe..31bb4e58db3 100644 --- a/pkg/ring/kv/consul/client.go +++ b/pkg/ring/kv/consul/client.go @@ -10,6 +10,7 @@ import ( "github.com/go-kit/kit/log/level" consul "github.com/hashicorp/consul/api" "github.com/hashicorp/go-cleanhttp" + "github.com/weaveworks/common/instrument" "golang.org/x/time/rate" "github.com/cortexproject/cortex/pkg/ring/kv/codec" @@ -84,7 +85,7 @@ func NewClient(cfg Config, codec codec.Codec) (*Client, error) { return nil, err } c := &Client{ - kv: client.KV(), + kv: consulMetrics{client.KV()}, codec: codec, cfg: cfg, } @@ -98,17 +99,21 @@ func (c *Client) Put(ctx context.Context, key string, value interface{}) error { return err } - _, err = c.kv.Put(&consul.KVPair{ - Key: key, - Value: bytes, - }, nil) - return err + return instrument.CollectedRequest(ctx, "Put", consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + _, err := c.kv.Put(&consul.KVPair{ + Key: key, + Value: bytes, + }, nil) + return err + }) } // CAS atomically modifies a value in a callback. // If value doesn't exist you'll get nil as an argument to your callback. func (c *Client) CAS(ctx context.Context, key string, f func(in interface{}) (out interface{}, retry bool, err error)) error { - return c.cas(ctx, key, f) + return instrument.CollectedRequest(ctx, "CAS loop", consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + return c.cas(ctx, key, f) + }) } func (c *Client) cas(ctx context.Context, key string, f func(in interface{}) (out interface{}, retry bool, err error)) error { diff --git a/pkg/ring/kv/consul/metrics.go b/pkg/ring/kv/consul/metrics.go new file mode 100644 index 00000000000..99910a16a33 --- /dev/null +++ b/pkg/ring/kv/consul/metrics.go @@ -0,0 +1,82 @@ +package consul + +import ( + "context" + + consul "github.com/hashicorp/consul/api" + "github.com/prometheus/client_golang/prometheus" + "github.com/weaveworks/common/instrument" +) + +var consulRequestDuration = instrument.NewHistogramCollector(prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "cortex", + Name: "consul_request_duration_seconds", + Help: "Time spent on consul requests.", + Buckets: prometheus.DefBuckets, +}, []string{"operation", "status_code"})) + +func init() { + consulRequestDuration.Register() +} + +type consulMetrics struct { + kv +} + +func (c consulMetrics) CAS(p *consul.KVPair, options *consul.WriteOptions) (bool, *consul.WriteMeta, error) { + var ok bool + var result *consul.WriteMeta + err := instrument.CollectedRequest(options.Context(), "CAS", consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + options = options.WithContext(ctx) + var err error + ok, result, err = c.kv.CAS(p, options) + return err + }) + return ok, result, err +} + +func (c consulMetrics) Get(key string, options *consul.QueryOptions) (*consul.KVPair, *consul.QueryMeta, error) { + var kvp *consul.KVPair + var meta *consul.QueryMeta + err := instrument.CollectedRequest(options.Context(), "Get", consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + options = options.WithContext(ctx) + var err error + kvp, meta, err = c.kv.Get(key, options) + return err + }) + return kvp, meta, err +} + +func (c consulMetrics) List(path string, options *consul.QueryOptions) (consul.KVPairs, *consul.QueryMeta, error) { + var kvps consul.KVPairs + var meta *consul.QueryMeta + err := instrument.CollectedRequest(options.Context(), "List", consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + options = options.WithContext(ctx) + var err error + kvps, meta, err = c.kv.List(path, options) + return err + }) + return kvps, meta, err +} + +func (c consulMetrics) Delete(key string, options *consul.WriteOptions) (*consul.WriteMeta, error) { + var meta *consul.WriteMeta + err := instrument.CollectedRequest(options.Context(), "Delete", consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + options = options.WithContext(ctx) + var err error + meta, err = c.kv.Delete(key, options) + return err + }) + return meta, err +} + +func (c consulMetrics) Put(p *consul.KVPair, options *consul.WriteOptions) (*consul.WriteMeta, error) { + var result *consul.WriteMeta + err := instrument.CollectedRequest(options.Context(), "Put", consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + options = options.WithContext(ctx) + var err error + result, err = c.kv.Put(p, options) + return err + }) + return result, err +} From 40c71c7836fad43a89a762655f16341c8d089d56 Mon Sep 17 00:00:00 2001 From: Jacob Lisi Date: Mon, 1 Jun 2020 16:06:51 -0400 Subject: [PATCH 09/10] update changelog Signed-off-by: Jacob Lisi --- CHANGELOG.md | 1 + pkg/cortex/modules.go | 2 +- pkg/querier/blocks_store_queryable.go | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cc91c4c20c2..25fc82cd21f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ## master / unreleased +* [CHANGE] Metric `cortex_kv_request_duration_seconds` now includes `name` label to denote which client is being used as well as the `backend` label to denote the KV backend implemnetation in use. #2648 * [CHANGE] Experimental Ruler: Rule groups persisted to object storage using the experimental API have an updated object key encoding to better handle special characters. Rule groups previously-stored using object storage must be renamed to the new format. #2646 * [CHANGE] Query Frontend now uses Round Robin to choose a tenant queue to service next. #2553 * [CHANGE] `-promql.lookback-delta` is now deprecated and has been replaced by `-querier.lookback-delta` along with `lookback_delta` entry under `querier` in the config file. `-promql.lookback-delta` will be removed in v1.4.0. #2604 diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 694aacd4747..a82de089990 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -149,7 +149,7 @@ func (t *Cortex) initDistributor() (serv services.Service, err error) { // ruler's dependency) canJoinDistributorsRing := (t.Cfg.Target == All || t.Cfg.Target == Distributor) - t.Distributor, err = distributor.New(t.Cfg.Distributor, t.Cfg.IngesterClient, t.Overrides, t.Ring, canJoinDistributorsRing, nil) + t.Distributor, err = distributor.New(t.Cfg.Distributor, t.Cfg.IngesterClient, t.Overrides, t.Ring, canJoinDistributorsRing, prometheus.DefaultRegisterer) if err != nil { return } diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index 1ae5950da61..c052d8134a0 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -153,7 +153,7 @@ func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegatewa storesRingBackend, err := kv.NewClient( storesRingCfg.KVStore, ring.GetCodec(), - kv.RegistererWithKVName(reg, storegateway.RingNameForClient+"-block-store"), + kv.RegistererWithKVName(reg, "querier-store-gateway"), ) if err != nil { return nil, errors.Wrap(err, "failed to create store-gateway ring backend") From 9097f3dde850a97a0217a275329792c7583cce2e Mon Sep 17 00:00:00 2001 From: Jacob Lisi Date: Wed, 3 Jun 2020 10:32:57 -0400 Subject: [PATCH 10/10] refactor per PR comments Signed-off-by: Jacob Lisi --- CHANGELOG.md | 2 +- pkg/distributor/distributor.go | 2 +- pkg/distributor/distributor_ring.go | 4 ---- pkg/distributor/distributor_test.go | 2 +- pkg/ring/kv/client.go | 10 ++++++++++ pkg/ring/kv/metrics.go | 12 +----------- pkg/ruler/ruler.go | 2 +- pkg/storegateway/gateway.go | 2 +- 8 files changed, 16 insertions(+), 20 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 25fc82cd21f..02c7a07af1e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,7 @@ ## master / unreleased -* [CHANGE] Metric `cortex_kv_request_duration_seconds` now includes `name` label to denote which client is being used as well as the `backend` label to denote the KV backend implemnetation in use. #2648 +* [CHANGE] Metric `cortex_kv_request_duration_seconds` now includes `name` label to denote which client is being used as well as the `backend` label to denote the KV backend implementation in use. #2648 * [CHANGE] Experimental Ruler: Rule groups persisted to object storage using the experimental API have an updated object key encoding to better handle special characters. Rule groups previously-stored using object storage must be renamed to the new format. #2646 * [CHANGE] Query Frontend now uses Round Robin to choose a tenant queue to service next. #2553 * [CHANGE] `-promql.lookback-delta` is now deprecated and has been replaced by `-querier.lookback-delta` along with `lookback_delta` entry under `querier` in the config file. `-promql.lookback-delta` will be removed in v1.4.0. #2604 diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 08e42d84c97..165887c7fbd 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -201,7 +201,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove if !canJoinDistributorsRing { ingestionRateStrategy = newInfiniteIngestionRateStrategy() } else if limits.IngestionRateStrategy() == validation.GlobalIngestionRateStrategy { - distributorsRing, err = ring.NewLifecycler(cfg.DistributorRing.ToLifecyclerConfig(), nil, RingNameForClient, ring.DistributorRingKey, true, reg) + distributorsRing, err = ring.NewLifecycler(cfg.DistributorRing.ToLifecyclerConfig(), nil, "distributor", ring.DistributorRingKey, true, reg) if err != nil { return nil, err } diff --git a/pkg/distributor/distributor_ring.go b/pkg/distributor/distributor_ring.go index 5cb6ce9b757..2f3fba14b21 100644 --- a/pkg/distributor/distributor_ring.go +++ b/pkg/distributor/distributor_ring.go @@ -13,10 +13,6 @@ import ( "github.com/cortexproject/cortex/pkg/util/flagext" ) -var ( - RingNameForClient = "distributor" -) - // RingConfig masks the ring lifecycler config which contains // many options not really required by the distributors ring. This config // is used to strip down the config to the minimum, and avoid confusion diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index d6eb5f75d80..8aa2ed240cf 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -369,7 +369,7 @@ func TestDistributor_PushHAInstances(t *testing.T) { KVStore: kv.Config{Mock: mock}, UpdateTimeout: 100 * time.Millisecond, FailoverTimeout: time.Second, - }, prometheus.NewRegistry()) + }, nil) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), r)) d.HATracker = r diff --git a/pkg/ring/kv/client.go b/pkg/ring/kv/client.go index c417fc4432f..2c334e7d997 100644 --- a/pkg/ring/kv/client.go +++ b/pkg/ring/kv/client.go @@ -151,10 +151,20 @@ func createClient(backend string, prefix string, cfg StoreConfig, codec codec.Co client = PrefixClient(client, prefix) } + // If no Registerer is provided return the raw client + if reg == nil { + return client, nil + } + return newMetricsClient(backend, client, reg), nil } func buildMultiClient(cfg StoreConfig, codec codec.Codec, reg prometheus.Registerer) (Client, error) { + var ( + primaryLabel = prometheus.Labels{"role": "primary"} + secondaryLabel = prometheus.Labels{"role": "secondary"} + ) + if cfg.Multi.Primary == "" || cfg.Multi.Secondary == "" { return nil, fmt.Errorf("primary or secondary store not set") } diff --git a/pkg/ring/kv/metrics.go b/pkg/ring/kv/metrics.go index 242b7ba9872..6fa84503648 100644 --- a/pkg/ring/kv/metrics.go +++ b/pkg/ring/kv/metrics.go @@ -10,11 +10,6 @@ import ( "github.com/weaveworks/common/instrument" ) -var ( - primaryLabel = prometheus.Labels{"role": "primary"} - secondaryLabel = prometheus.Labels{"role": "secondary"} -) - // RegistererWithKVName wraps the provided Registerer with the KV name label. If a nil reg // is provided, a nil registry is returned func RegistererWithKVName(reg prometheus.Registerer, name string) prometheus.Registerer { @@ -22,7 +17,7 @@ func RegistererWithKVName(reg prometheus.Registerer, name string) prometheus.Reg return nil } - return prometheus.WrapRegistererWith(prometheus.Labels{"name": name}, reg) + return prometheus.WrapRegistererWith(prometheus.Labels{"kv_name": name}, reg) } // errorCode converts an error into an HTTP status code, modified from weaveworks/common/instrument @@ -42,11 +37,6 @@ type metrics struct { } func newMetricsClient(backend string, c Client, reg prometheus.Registerer) Client { - // If no Registerer is provided return the raw client - if reg == nil { - return c - } - return &metrics{ c: c, requestDuration: instrument.NewHistogramCollector( diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go index ff59d53f186..faec4731a2d 100644 --- a/pkg/ruler/ruler.go +++ b/pkg/ruler/ruler.go @@ -193,7 +193,7 @@ func NewRuler(cfg Config, engine *promql.Engine, queryable promStorage.Queryable ringStore, err := kv.NewClient( cfg.Ring.KVStore, ring.GetCodec(), - kv.RegistererWithKVName(reg, ring.RulerRingKey), + kv.RegistererWithKVName(reg, "ruler"), ) if err != nil { return nil, errors.Wrap(err, "create KV store client") diff --git a/pkg/storegateway/gateway.go b/pkg/storegateway/gateway.go index 0e67e8bc8e8..49609b0af81 100644 --- a/pkg/storegateway/gateway.go +++ b/pkg/storegateway/gateway.go @@ -84,7 +84,7 @@ func NewStoreGateway(gatewayCfg Config, storageCfg cortex_tsdb.Config, logLevel ringStore, err = kv.NewClient( gatewayCfg.ShardingRing.KVStore, ring.GetCodec(), - kv.RegistererWithKVName(reg, RingNameForClient), + kv.RegistererWithKVName(reg, "store-gateway"), ) if err != nil { return nil, errors.Wrap(err, "create KV store client")