Skip to content

Allow HTTP pushes directly to ingesters. #1491

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Feb 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
* [CHANGE] Removed unused /validate_expr endpoint. #2152
* [CHANGE] Updated Prometheus dependency to v2.16.0. This Prometheus version uses Active Query Tracker to limit concurrent queries. In order to keep `-querier.max-concurrent` working, Active Query Tracker is enabled by default, and is configured to store its data to `active-query-tracker` directory (relative to current directory when Cortex started). This can be changed by using `-querier.active-query-tracker-dir` option. Purpose of Active Query Tracker is to log queries that were running when Cortex crashes. This logging happens on next Cortex start. #2088
* [CHANGE] Experimental TSDB: TSDB head compaction interval and concurrency is now configurable (defaults to 1 min interval and 5 concurrent head compactions). New options: `-experimental.tsdb.head-compaction-interval` and `-experimental.tsdb.head-compaction-concurrency`. #2172
* [CHANGE] Remove fluentd-based billing infrastructure and flags such as `-distributor.enable-billing`. #1491
* [FEATURE] Added a read-only local alertmanager config store using files named corresponding to their tenant id. #2125
* [FEATURE] Added user sub rings to distribute users to a subset of ingesters. #1947
* `--experimental.distributor.user-subring-size`
Expand All @@ -32,6 +33,7 @@
* [FEATURE] Add ability to override YAML config file settings using environment variables. #2147
* `-config.expand-env`
* [FEATURE] Add /config HTTP endpoint which exposes the current Cortex configuration as YAML. #2165
* [FEATURE] Allow Prometheus remote write directly to ingesters. #1491
* [ENHANCEMENT] Add `status` label to `cortex_alertmanager_configs` metric to gauge the number of valid and invalid configs. #2125
* [ENHANCEMENT] Cassandra Authentication: added the `custom_authenticators` config option that allows users to authenticate with cassandra clusters using password authenticators that are not approved by default in [gocql](https://github.com/gocql/gocql/blob/81b8263d9fe526782a588ef94d3fa5c6148e5d67/conn.go#L27) #2093
* [ENHANCEMENT] Experimental TSDB: Export TSDB Syncer metrics from Compactor component, they are prefixed with `cortex_compactor_`. #2023
Expand Down
6 changes: 4 additions & 2 deletions docs/apis.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ The API for reads also accepts HTTP/protobuf/snappy, and the path is `/api/prom/

See the Prometheus documentation for [more information on the Prometheus remote write format](https://prometheus.io/docs/prometheus/latest/storage/#remote-storage-integrations).



## Alerts & Rules API

Cortex supports the Prometheus' [alerts](https://prometheus.io/docs/prometheus/latest/querying/api/#alerts) and [rules](https://prometheus.io/docs/prometheus/latest/querying/api/#rules) api endpoints. This is supported in the Ruler service and can be enabled using the `experimental.ruler.enable-api` flag.
Expand Down Expand Up @@ -166,3 +164,7 @@ Note that setting a new config will effectively "re-enable" the Rules and Alertm

- Normal Response Codes: NoContent(204)
- Error Response Codes: Unauthorized(401)

#### Testing APIs

`POST /push` - Push samples directly to ingesters. Accepts requests in Prometheus remote write format. Indended for performance testing and debugging.
17 changes: 0 additions & 17 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -229,23 +229,6 @@ The `server_config` configures the HTTP and gRPC server of the launched service(
The `distributor_config` configures the Cortex distributor.

```yaml
# Report number of ingested samples to billing system.
# CLI flag: -distributor.enable-billing
[enable_billing: <boolean> | default = false]

billing:
# Maximum number of billing events to buffer in memory
# CLI flag: -billing.max-buffered-events
[maxbufferedevents: <int> | default = 1024]

# How often to retry sending events to the billing ingester.
# CLI flag: -billing.retry-delay
[retrydelay: <duration> | default = 500ms]

# points to the billing ingester sidecar (should be on localhost)
# CLI flag: -billing.ingester
[ingesterhostport: <string> | default = "localhost:24225"]

pool:
# How frequently to clean up clients for ingesters that have gone away.
# CLI flag: -distributor.client-cleanup-period
Expand Down
4 changes: 0 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ require (
github.com/coreos/go-systemd v0.0.0-20181012123002-c6f51f82210d // indirect
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect
github.com/facette/natsort v0.0.0-20181210072756-2cd4dd1e2dcb
github.com/fluent/fluent-logger-golang v1.2.1 // indirect
github.com/fsouza/fake-gcs-server v1.7.0
github.com/go-kit/kit v0.9.0
github.com/gocql/gocql v0.0.0-20200121121104-95d072f1b5bb
Expand All @@ -47,7 +46,6 @@ require (
github.com/opentracing-contrib/go-grpc v0.0.0-20180928155321-4b5a12d3ff02
github.com/opentracing-contrib/go-stdlib v0.0.0-20190519235532-cf7a6c988dc9
github.com/opentracing/opentracing-go v1.1.1-0.20200124165624-2876d2018785
github.com/philhofer/fwd v0.0.0-20160129035939-98c11a7a6ec8 // indirect
github.com/pkg/errors v0.8.1
github.com/prometheus/alertmanager v0.19.0
github.com/prometheus/client_golang v1.2.1
Expand All @@ -59,10 +57,8 @@ require (
github.com/spf13/afero v1.2.2
github.com/stretchr/testify v1.4.0
github.com/thanos-io/thanos v0.8.1-0.20200109203923-552ffa4c1a0d
github.com/tinylib/msgp v0.0.0-20161221055906-38a6f61a768d // indirect
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 // indirect
github.com/uber/jaeger-client-go v2.20.1+incompatible
github.com/weaveworks/billing-client v0.0.0-20171006123215-be0d55e547b1
github.com/weaveworks/common v0.0.0-20200206153930-760e36ae819a
go.etcd.io/bbolt v1.3.3
go.etcd.io/etcd v0.0.0-20190709142735-eb7dd97135a5
Expand Down
8 changes: 0 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,6 @@ github.com/facette/natsort v0.0.0-20181210072756-2cd4dd1e2dcb h1:IT4JYU7k4ikYg1S
github.com/facette/natsort v0.0.0-20181210072756-2cd4dd1e2dcb/go.mod h1:bH6Xx7IW64qjjJq8M2u4dxNaBiDfKK+z/3eGDpXEQhc=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fatih/structtag v1.1.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94=
github.com/fluent/fluent-logger-golang v1.2.1 h1:CMA+mw2zMiOGEOarZtaqM3GBWT1IVLNncNi0nKELtmU=
github.com/fluent/fluent-logger-golang v1.2.1/go.mod h1:2/HCT/jTy78yGyeNGQLGQsjF3zzzAuy6Xlk6FCMV5eU=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
Expand Down Expand Up @@ -635,8 +633,6 @@ github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0Mw
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k=
github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=
github.com/philhofer/fwd v0.0.0-20160129035939-98c11a7a6ec8 h1:jkUFVqrKRttbdDqkTrvOmHxfqIsJK0Oe2WGi1ACAE+M=
github.com/philhofer/fwd v0.0.0-20160129035939-98c11a7a6ec8/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
Expand Down Expand Up @@ -763,8 +759,6 @@ github.com/thanos-io/thanos v0.8.1-0.20200109203923-552ffa4c1a0d/go.mod h1:usT/T
github.com/tidwall/pretty v0.0.0-20180105212114-65a9db5fad51/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/tinylib/msgp v0.0.0-20161221055906-38a6f61a768d h1:Ninez2SUm08xpmnw7kVxCeOc3DahF6IuMuRMCdM4wTQ=
github.com/tinylib/msgp v0.0.0-20161221055906-38a6f61a768d/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE=
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 h1:LnC5Kc/wtumK+WB441p7ynQJzVuNRJiqddSIE3IlSEQ=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
Expand All @@ -776,8 +770,6 @@ github.com/uber/jaeger-lib v1.5.1-0.20181102163054-1fc5c315e03c/go.mod h1:ComeND
github.com/uber/jaeger-lib v2.2.0+incompatible h1:MxZXOiR2JuoANZ3J6DE/U0kSFv/eJ/GfSYVCjK7dyaw=
github.com/uber/jaeger-lib v2.2.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
github.com/weaveworks/billing-client v0.0.0-20171006123215-be0d55e547b1 h1:qi+YkNiB7T3Ikw1DoDIFhdAPbDU7fUPDsKrUoZdupnQ=
github.com/weaveworks/billing-client v0.0.0-20171006123215-be0d55e547b1/go.mod h1:7gGdEUJaCrSlWi/mjd68CZv0sfqektYPDcro9cE+M9k=
github.com/weaveworks/common v0.0.0-20200206153930-760e36ae819a h1:4Sm4LnEnP1yQ2NeNgGqLTuN2xrTvcBOU+EsljpB8Ed0=
github.com/weaveworks/common v0.0.0-20200206153930-760e36ae819a/go.mod h1:6enWAqfQBFrE8X/XdJwZr8IKgh1chStuFR0mjU/UOUw=
github.com/weaveworks/promrus v1.2.0 h1:jOLf6pe6/vss4qGHjXmGz4oDJQA+AOCqEL3FvvZGz7M=
Expand Down
4 changes: 3 additions & 1 deletion pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/cortexproject/cortex/pkg/ring/kv/codec"
"github.com/cortexproject/cortex/pkg/ruler"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/push"
"github.com/cortexproject/cortex/pkg/util/runtimeconfig"
"github.com/cortexproject/cortex/pkg/util/validation"
)
Expand Down Expand Up @@ -248,7 +249,7 @@ func (t *Cortex) initDistributor(cfg *Config) (err error) {
}

t.server.HTTP.HandleFunc("/all_user_stats", t.distributor.AllUserStatsHandler)
t.server.HTTP.Handle("/api/prom/push", t.httpAuthMiddleware.Wrap(http.HandlerFunc(t.distributor.PushHandler)))
t.server.HTTP.Handle("/api/prom/push", t.httpAuthMiddleware.Wrap(push.Handler(cfg.Distributor, t.distributor.Push)))
t.server.HTTP.Handle("/ha-tracker", t.distributor.Replicas)
return
}
Expand Down Expand Up @@ -361,6 +362,7 @@ func (t *Cortex) initIngester(cfg *Config) (err error) {
t.server.HTTP.Path("/ready").Handler(http.HandlerFunc(t.ingester.ReadinessHandler))
t.server.HTTP.Path("/flush").Handler(http.HandlerFunc(t.ingester.FlushHandler))
t.server.HTTP.Path("/shutdown").Handler(http.HandlerFunc(t.ingester.ShutdownHandler))
t.server.HTTP.Handle("/push", t.httpAuthMiddleware.Wrap(push.Handler(cfg.Distributor, t.ingester.Push)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two things:

  1. I'm a bit concerned about the inconsistency in the path between distributors (/api/prom/push) and ingesters (/push). Have you thought about it?
  2. Could you document it to docs/apis.md, please?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit concerned about the inconsistency in the path between distributors (/api/prom/push) and ingesters (/push). Have you thought about it?

Yeah, we can have them at the same path as the paths will collide in the single binary case.

I put it at /push to indicate its a private API; the /api/prom is for public endpoints.

return
}

Expand Down
37 changes: 0 additions & 37 deletions pkg/distributor/billing.go

This file was deleted.

18 changes: 1 addition & 17 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
billing "github.com/weaveworks/billing-client"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/instrument"
"github.com/weaveworks/common/user"
Expand Down Expand Up @@ -99,7 +98,6 @@ type Distributor struct {
ingestersRing ring.ReadRing
ingesterPool *ingester_client.Pool
limits *validation.Overrides
billingClient *billing.Client

// The global rate limiter requires a distributors ring to count
// the number of healthy instances
Expand All @@ -115,9 +113,7 @@ type Distributor struct {
// Config contains the configuration require to
// create a Distributor
type Config struct {
EnableBilling bool `yaml:"enable_billing,omitempty"`
BillingConfig billing.Config `yaml:"billing,omitempty"`
PoolConfig ingester_client.PoolConfig `yaml:"pool,omitempty"`
PoolConfig ingester_client.PoolConfig `yaml:"pool,omitempty"`

HATrackerConfig HATrackerConfig `yaml:"ha_tracker,omitempty"`

Expand All @@ -136,12 +132,10 @@ type Config struct {

// RegisterFlags adds the flags required to config this to the given FlagSet
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.BillingConfig.RegisterFlags(f)
cfg.PoolConfig.RegisterFlags(f)
cfg.HATrackerConfig.RegisterFlags(f)
cfg.DistributorRing.RegisterFlags(f)

f.BoolVar(&cfg.EnableBilling, "distributor.enable-billing", false, "Report number of ingested samples to billing system.")
f.IntVar(&cfg.MaxRecvMsgSize, "distributor.max-recv-msg-size", 100<<20, "remote_write API max receive message size (bytes).")
f.DurationVar(&cfg.RemoteTimeout, "distributor.remote-timeout", 2*time.Second, "Timeout for downstream ingesters.")
f.DurationVar(&cfg.ExtraQueryDelay, "distributor.extra-query-delay", 0, "Time to wait before sending more than the minimum successful query requests.")
Expand All @@ -162,15 +156,6 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
}
}

var billingClient *billing.Client
if cfg.EnableBilling {
var err error
billingClient, err = billing.NewClient(cfg.BillingConfig)
if err != nil {
return nil, err
}
}

replicationFactor.Set(float64(ingestersRing.ReplicationFactor()))
cfg.PoolConfig.RemoteTimeout = cfg.RemoteTimeout

Expand Down Expand Up @@ -204,7 +189,6 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
cfg: cfg,
ingestersRing: ingestersRing,
ingesterPool: ingester_client.NewPool(cfg.PoolConfig, ingestersRing, cfg.ingesterClientFactory, util.Logger),
billingClient: billingClient,
distributorsRing: distributorsRing,
limits: limits,
ingestionRateLimiter: limiter.NewRateLimiter(ingestionRateStrategy, 10*time.Second),
Expand Down
40 changes: 0 additions & 40 deletions pkg/distributor/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,49 +3,9 @@ package distributor
import (
"net/http"

"github.com/go-kit/kit/log/level"
"github.com/weaveworks/common/httpgrpc"

"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/util"
)

// PushHandler is a http.Handler which accepts WriteRequests.
func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) {
compressionType := util.CompressionTypeFor(r.Header.Get("X-Prometheus-Remote-Write-Version"))
var req client.PreallocWriteRequest
req.Source = client.API
buf, err := util.ParseProtoReader(r.Context(), r.Body, int(r.ContentLength), d.cfg.MaxRecvMsgSize, &req, compressionType)
logger := util.WithContext(r.Context(), util.Logger)
if err != nil {
level.Error(logger).Log("err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

if d.cfg.EnableBilling {
var samples int64
for _, ts := range req.Timeseries {
samples += int64(len(ts.Samples))
}
if err := d.emitBillingRecord(r.Context(), buf, samples); err != nil {
level.Error(logger).Log("msg", "error emitting billing record", "err", err)
}
}

if _, err := d.Push(r.Context(), &req.WriteRequest); err != nil {
resp, ok := httpgrpc.HTTPResponseFromError(err)
if !ok {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if resp.GetCode() != 202 {
level.Error(logger).Log("msg", "push error", "err", err)
}
http.Error(w, string(resp.Body), int(resp.Code))
}
}

// UserStats models ingestion statistics for one user.
type UserStats struct {
IngestionRate float64 `json:"ingestionRate"`
Expand Down
41 changes: 41 additions & 0 deletions pkg/util/push/push.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package push

import (
"context"
"net/http"

"github.com/go-kit/kit/log/level"
"github.com/weaveworks/common/httpgrpc"

"github.com/cortexproject/cortex/pkg/distributor"
"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/util"
)

// Handler is a http.Handler which accepts WriteRequests.
func Handler(cfg distributor.Config, push func(context.Context, *client.WriteRequest) (*client.WriteResponse, error)) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
compressionType := util.CompressionTypeFor(r.Header.Get("X-Prometheus-Remote-Write-Version"))
var req client.PreallocWriteRequest
req.Source = client.API
_, err := util.ParseProtoReader(r.Context(), r.Body, int(r.ContentLength), cfg.MaxRecvMsgSize, &req, compressionType)
logger := util.WithContext(r.Context(), util.Logger)
if err != nil {
level.Error(logger).Log("err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

if _, err := push(r.Context(), &req.WriteRequest); err != nil {
resp, ok := httpgrpc.HTTPResponseFromError(err)
if !ok {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if resp.GetCode() != 202 {
level.Error(logger).Log("msg", "push error", "err", err)
}
http.Error(w, string(resp.Body), int(resp.Code))
}
})
}
Loading