Skip to content
This repository was archived by the owner on Apr 30, 2025. It is now read-only.

Extract ipAddress from context #26

Open
wants to merge 21 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* [CHANGE] Histogram `cortex_memcache_request_duration_seconds` `method` label value changes from `Memcached.Get` to `Memcached.GetBatched` for batched lookups, and is not reported for non-batched lookups (label value `Memcached.GetMulti` remains, and had exactly the same value as `Get` in nonbatched lookups). The same change applies to tracing spans. #3046
* [CHANGE] TLS server validation is now enabled by default, a new parameter `tls_insecure_skip_verify` can be set to true to skip validation optionally. #3030
* [CHANGE] `cortex_ruler_config_update_failures_total` has been removed in favor of `cortex_ruler_config_last_reload_successful`. #3056
* [FEATURE] Logging of the source IP passed along by a reverse proxy is now supported by setting the `-server.log-source-ips-enabled`. For non standard headers the settings `-server.log-source-ips-header` and `-server.log-source-ips-regex` can be used. #2985
* [ENHANCEMENT] Add support for azure storage in China, German and US Government environments. #2988
* [ENHANCEMENT] Query-tee: added a small tolerance to floating point sample values comparison. #2994
* [ENHANCEMENT] Query-tee: add support for doing a passthrough of requests to preferred backend for unregistered routes #3018
Expand Down
17 changes: 17 additions & 0 deletions docs/configuration/arguments.md
Original file line number Diff line number Diff line change
Expand Up @@ -487,3 +487,20 @@ The DNS service discovery, inspired from Thanos DNS SD, supports different disco
The domain name after the prefix is looked up as a SRV query, and then each SRV record is resolved as an A/AAAA record. For example: `dnssrv+memcached.namespace.svc.cluster.local`
- **`dnssrvnoa+`**<br />
The domain name after the prefix is looked up as a SRV query, with no A/AAAA lookup made after that. For example: `dnssrvnoa+memcached.namespace.svc.cluster.local`

## Logging of IP of reverse proxy

If a reverse proxy is used in front of Cortex it might be diffult to troubleshoot errors. The following 3 settings can be used to log the IP address passed along by the reverse proxy in headers like X-Forwarded-For.

- `-server.log_source_ips_enabled`

Set this to `true` to add logging of the IP when a Forwarded, X-Real-IP or X-Forwarded-For header is used. A field called `sourceIPs` will be added to error logs when data is pushed into Cortex.

- `-server.log-source-ips-header`

Header field storing the source IPs. It is only used if `-server.log-source-ips-enabled` is true and if `-server.log-source-ips-regex` is set. If not set the default Forwarded, X-Real-IP or X-Forwarded-For headers are searched.

- `-server.log-source-ips-regex`

Regular expression for matching the source IPs. It should contain at least one capturing group the first of which will be returned. Only used if `-server.log-source-ips-enabled` is true and if `-server.log-source-ips-header` is set. If not set the default Forwarded, X-Real-IP or X-Forwarded-For headers are searched.

22 changes: 17 additions & 5 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,17 +65,29 @@ type API struct {
authMiddleware middleware.Interface
server *server.Server
logger log.Logger
sourceIPs *middleware.SourceIPExtractor
}

func New(cfg Config, s *server.Server, logger log.Logger) (*API, error) {
func New(cfg Config, serverCfg server.Config, s *server.Server, logger log.Logger) (*API, error) {
// Ensure the encoded path is used. Required for the rules API
s.HTTP.UseEncodedPath()

var sourceIPs *middleware.SourceIPExtractor
if serverCfg.LogSourceIPs {
var err error
sourceIPs, err = middleware.NewSourceIPs(serverCfg.LogSourceIPsHeader, serverCfg.LogSourceIPsRegex)
if err != nil {
// This should have already been caught in the Server creation
return nil, err
}
}

api := &API{
cfg: cfg,
authMiddleware: cfg.HTTPAuthMiddleware,
server: s,
logger: logger,
sourceIPs: sourceIPs,
}

// If no authentication middleware is present in the config, use the default authentication middleware.
Expand Down Expand Up @@ -161,12 +173,12 @@ func (a *API) RegisterAPI(cfg interface{}) {

// RegisterDistributor registers the endpoints associated with the distributor.
func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distributor.Config) {
a.RegisterRoute("/api/v1/push", push.Handler(pushConfig, d.Push), true)
a.RegisterRoute("/api/v1/push", push.Handler(pushConfig, a.sourceIPs, d.Push), true)
a.RegisterRoute("/distributor/all_user_stats", http.HandlerFunc(d.AllUserStatsHandler), false)
a.RegisterRoute("/distributor/ha_tracker", d.HATracker, false)

// Legacy Routes
a.RegisterRoute(a.cfg.LegacyHTTPPrefix+"/push", push.Handler(pushConfig, d.Push), true)
a.RegisterRoute(a.cfg.LegacyHTTPPrefix+"/push", push.Handler(pushConfig, a.sourceIPs, d.Push), true)
a.RegisterRoute("/all_user_stats", http.HandlerFunc(d.AllUserStatsHandler), false)
a.RegisterRoute("/ha-tracker", d.HATracker, false)
}
Expand All @@ -177,12 +189,12 @@ func (a *API) RegisterIngester(i *ingester.Ingester, pushConfig distributor.Conf

a.RegisterRoute("/ingester/flush", http.HandlerFunc(i.FlushHandler), false)
a.RegisterRoute("/ingester/shutdown", http.HandlerFunc(i.ShutdownHandler), false)
a.RegisterRoute("/ingester/push", push.Handler(pushConfig, i.Push), true) // For testing and debugging.
a.RegisterRoute("/ingester/push", push.Handler(pushConfig, a.sourceIPs, i.Push), true) // For testing and debugging.

// Legacy Routes
a.RegisterRoute("/flush", http.HandlerFunc(i.FlushHandler), false)
a.RegisterRoute("/shutdown", http.HandlerFunc(i.ShutdownHandler), false)
a.RegisterRoute("/push", push.Handler(pushConfig, i.Push), true) // For testing and debugging.
a.RegisterRoute("/push", push.Handler(pushConfig, a.sourceIPs, i.Push), true) // For testing and debugging.
}

// RegisterPurger registers the endpoints associated with the Purger/DeleteStore. They do not exactly
Expand Down
61 changes: 61 additions & 0 deletions pkg/api/api_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package api

import (
"testing"

"github.com/gorilla/mux"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/server"
)

type FakeLogger struct{}

func (fl *FakeLogger) Log(keyvals ...interface{}) error {
return nil
}

func TestNewApiWithoutSourceIPExtractor(t *testing.T) {
cfg := Config{}
serverCfg := server.Config{
MetricsNamespace: "without_source_ip_extractor",
}
server, err := server.New(serverCfg)
require.NoError(t, err)

api, err := New(cfg, serverCfg, server, &FakeLogger{})

require.NoError(t, err)
require.Nil(t, api.sourceIPs)
}

func TestNewApiWithSourceIPExtractor(t *testing.T) {
cfg := Config{}
serverCfg := server.Config{
LogSourceIPs: true,
MetricsNamespace: "with_source_ip_extractor",
}
server, err := server.New(serverCfg)
require.NoError(t, err)

api, err := New(cfg, serverCfg, server, &FakeLogger{})

require.NoError(t, err)
require.NotNil(t, api.sourceIPs)
}

func TestNewApiWithInvalidSourceIPExtractor(t *testing.T) {
cfg := Config{}
s := server.Server{
HTTP: &mux.Router{},
}
serverCfg := server.Config{
LogSourceIPs: true,
LogSourceIPsHeader: "SomeHeader",
LogSourceIPsRegex: "[*",
MetricsNamespace: "with_invalid_source_ip_extractor",
}

api, err := New(cfg, serverCfg, &s, &FakeLogger{})
require.Error(t, err)
require.Nil(t, api)
}
2 changes: 1 addition & 1 deletion pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (t *Cortex) initAPI() (services.Service, error) {
t.Cfg.API.ServerPrefix = t.Cfg.Server.PathPrefix
t.Cfg.API.LegacyHTTPPrefix = t.Cfg.HTTPPrefix

a, err := api.New(t.Cfg.API, t.Server, util.Logger)
a, err := api.New(t.Cfg.API, t.Cfg.Server, t.Server, util.Logger)
if err != nil {
return nil, err
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,7 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie
if err != nil {
return nil, err
}
source := util.GetSourceIPsFromOutgoingCtx(ctx)

var firstPartialErr error
removeReplica := false
Expand Down Expand Up @@ -538,6 +539,10 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie
if sp := opentracing.SpanFromContext(ctx); sp != nil {
localCtx = opentracing.ContextWithSpan(localCtx, sp)
}

// Get clientIP(s) from Context and add it to localCtx
localCtx = util.AddSourceIPsToOutgoingContext(localCtx, source)

return d.send(localCtx, ingester, timeseries, metadata, req.Source)
}, func() { client.ReuseSlice(req.Timeseries) })
if err != nil {
Expand Down
53 changes: 53 additions & 0 deletions pkg/util/extract_forwarded.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package util

import (
"context"

"google.golang.org/grpc/metadata"
)

// ipAddressesKey is key for the GRPC metadata where the IP addresses are stored
const ipAddressesKey = "github.com/cortexproject/cortex/util/extract_forwarded/x-forwarded-for"

// GetSourceIPsFromOutgoingCtx extracts the source field from the GRPC context
func GetSourceIPsFromOutgoingCtx(ctx context.Context) string {
md, ok := metadata.FromOutgoingContext(ctx)
if !ok {
return ""
}
ipAddresses, ok := md[ipAddressesKey]
if !ok {
return ""
}
return ipAddresses[0]
}

// GetSourceIPsFromIncomingCtx extracts the source field from the GRPC context
func GetSourceIPsFromIncomingCtx(ctx context.Context) string {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return ""
}
ipAddresses, ok := md[ipAddressesKey]
if !ok {
return ""
}
return ipAddresses[0]
}

// AddSourceIPsToOutgoingContext adds the given source to the GRPC context
func AddSourceIPsToOutgoingContext(ctx context.Context, source string) context.Context {
if source != "" {
ctx = metadata.AppendToOutgoingContext(ctx, ipAddressesKey, source)
}
return ctx
}

// AddSourceIPsToIncomingContext adds the given source to the GRPC context
func AddSourceIPsToIncomingContext(ctx context.Context, source string) context.Context {
if source != "" {
md := metadata.Pairs(ipAddressesKey, source)
ctx = metadata.NewIncomingContext(ctx, md)
}
return ctx
}
58 changes: 58 additions & 0 deletions pkg/util/extract_forwarded_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package util

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"google.golang.org/grpc/metadata"
)

func TestGetSourceFromOutgoingCtx(t *testing.T) {
tests := []struct {
name string
key string
value string
want string
}{
{
name: "No value in key",
key: ipAddressesKey,
value: "",
want: "",
},
{
name: "Value in key",
key: ipAddressesKey,
value: "172.16.1.1",
want: "172.16.1.1",
},
{
name: "Stored under wrong key",
key: "wrongkey",
value: "172.16.1.1",
want: "",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Test extracting from incoming context
ctx := context.Background()
if tt.value != "" {
md := metadata.Pairs(tt.key, tt.value)
ctx = metadata.NewIncomingContext(ctx, md)
}
got := GetSourceIPsFromIncomingCtx(ctx)
assert.Equal(t, tt.want, got)

// Test extracting from outgoing context
ctx = context.Background()
if tt.value != "" {
md := metadata.Pairs(tt.key, tt.value)
ctx = metadata.NewOutgoingContext(ctx, md)
}
got = GetSourceIPsFromOutgoingCtx(ctx)
assert.Equal(t, tt.want, got)
})
}
}
6 changes: 6 additions & 0 deletions pkg/util/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,12 @@ func WithTraceID(traceID string, l log.Logger) log.Logger {
return log.With(l, "traceID", traceID)
}

// WithSourceIPs returns a Logger that has information about the source IPs in
// its details.
func WithSourceIPs(sourceIPs string, l log.Logger) log.Logger {
return log.With(l, "sourceIPs", sourceIPs)
}

// CheckFatal prints an error and exits with error code 1 if err is non-nil
func CheckFatal(location string, err error) {
if err != nil {
Expand Down
17 changes: 13 additions & 4 deletions pkg/util/push/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,28 @@ import (

"github.com/go-kit/kit/log/level"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/middleware"

"github.com/cortexproject/cortex/pkg/distributor"
"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/util"
)

// Handler is a http.Handler which accepts WriteRequests.
func Handler(cfg distributor.Config, push func(context.Context, *client.WriteRequest) (*client.WriteResponse, error)) http.Handler {
func Handler(cfg distributor.Config, sourceIPs *middleware.SourceIPExtractor, push func(context.Context, *client.WriteRequest) (*client.WriteResponse, error)) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
logger := util.WithContext(ctx, util.Logger)
if sourceIPs != nil {
source := sourceIPs.Get(r)
if source != "" {
ctx = util.AddSourceIPsToOutgoingContext(ctx, source)
logger = util.WithSourceIPs(source, logger)
}
}
compressionType := util.CompressionTypeFor(r.Header.Get("X-Prometheus-Remote-Write-Version"))
var req client.PreallocWriteRequest
_, err := util.ParseProtoReader(r.Context(), r.Body, int(r.ContentLength), cfg.MaxRecvMsgSize, &req, compressionType)
logger := util.WithContext(r.Context(), util.Logger)
_, err := util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), cfg.MaxRecvMsgSize, &req, compressionType)
if err != nil {
level.Error(logger).Log("err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
Expand All @@ -28,7 +37,7 @@ func Handler(cfg distributor.Config, push func(context.Context, *client.WriteReq
req.Source = client.API
}

if _, err := push(r.Context(), &req.WriteRequest); err != nil {
if _, err := push(ctx, &req.WriteRequest); err != nil {
resp, ok := httpgrpc.HTTPResponseFromError(err)
if !ok {
http.Error(w, err.Error(), http.StatusInternalServerError)
Expand Down
6 changes: 4 additions & 2 deletions pkg/util/push/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/prometheus/prometheus/prompb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/middleware"

"github.com/cortexproject/cortex/pkg/distributor"
"github.com/cortexproject/cortex/pkg/ingester/client"
Expand All @@ -20,15 +21,16 @@ import (
func TestHandler_remoteWrite(t *testing.T) {
req := createRequest(t, createPrometheusRemoteWriteProtobuf(t))
resp := httptest.NewRecorder()
handler := Handler(distributor.Config{MaxRecvMsgSize: 100000}, verifyWriteRequestHandler(t, client.API))
handler := Handler(distributor.Config{MaxRecvMsgSize: 100000}, nil, verifyWriteRequestHandler(t, client.API))
handler.ServeHTTP(resp, req)
assert.Equal(t, 200, resp.Code)
}

func TestHandler_cortexWriteRequest(t *testing.T) {
req := createRequest(t, createCortexWriteRequestProtobuf(t))
resp := httptest.NewRecorder()
handler := Handler(distributor.Config{MaxRecvMsgSize: 100000}, verifyWriteRequestHandler(t, client.RULE))
sourceIPs, _ := middleware.NewSourceIPs("SomeField", "(.*)")
handler := Handler(distributor.Config{MaxRecvMsgSize: 100000}, sourceIPs, verifyWriteRequestHandler(t, client.RULE))
handler.ServeHTTP(resp, req)
assert.Equal(t, 200, resp.Code)
}
Expand Down