Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
7c93a11
feat: Added support for logging of HTTP Headers
zspeaks Jul 27, 2022
0926efb
Fixed test failing due to namespace overlap
zspeaks Jul 27, 2022
8ada6b0
Updates to reflect PR feedback
zspeaks Jul 28, 2022
dfa9443
Merge remote-tracking branch 'origin/HTTPHeaderLogging' into HTTPHead…
zspeaks Aug 12, 2022
2943504
Rough draft propogation to ingester + logging in ingester
zspeaks Aug 12, 2022
d9b1907
Merge branch 'cortexproject:master' into HTTPHeaderLogging
zspeaks Aug 23, 2022
81a1d95
Merge branch 'cortexproject:master' into HTTPHeaderLogging
zspeaks Aug 23, 2022
d404f94
Added Header Propogation to Ingester and Querier
zspeaks Aug 23, 2022
6429d04
Addressed whitespace and linting issues
zspeaks Aug 23, 2022
a61937c
Additional linting fixes
zspeaks Aug 23, 2022
dc22472
Fixed failing DCO check
zspeaks Aug 23, 2022
0b9fa31
Removed outdated line from prior test design
zspeaks Aug 23, 2022
ddd90b9
Updates to reflect PR feedback
zspeaks Aug 25, 2022
f183fda
Additional Updates to reflect feedback
zspeaks Aug 25, 2022
a10aefc
Improved string comparison when decoding
zspeaks Aug 25, 2022
f617e45
Added header decoding to frontend_processor.go for when scheduler ser…
zspeaks Aug 25, 2022
65effc7
Merge branch 'cortexproject:master' into HTTPHeaderLogging
zspeaks Aug 26, 2022
8e5c3ba
Added encoding/decoding interceptors for streams and updated changelog
zspeaks Aug 26, 2022
0e37794
Undo accidental updates to single-process-config-block.yaml file
zspeaks Aug 26, 2022
5800d4c
Whitespace updates
zspeaks Aug 26, 2022
162ca7f
Refactored encode/decode headers to inject/extract to better align wi…
zspeaks Aug 30, 2022
d238040
Fixed failing linter
zspeaks Aug 31, 2022
58bc638
Switched extraction to being done by a middleware on the querier API
zspeaks Aug 31, 2022
af7c250
Linting
zspeaks Aug 31, 2022
2507235
Changes to reflect PR feedback
zspeaks Sep 1, 2022
1ebee10
Updates to tests to match go conventions
zspeaks Sep 1, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
* [FEATURE] Compactor: Added configurations for Azure MSI in blocks-storage, ruler-storage and alertmanager-storage. #4818
* [FEATURE] Ruler: Add support to pass custom implementations of queryable and pusher. #4782
* [FEATURE] Create OpenTelemetry Bridge for Tracing. Now cortex can send traces to multiple destinations using OTEL Collectors. #4834
* [FEATURE] Added `-api.http-request-headers-to-log` allowing for the addition of HTTP Headers to logs #4803
* [BUGFIX] Memberlist: Add join with no retrying when starting service. #4804
* [BUGFIX] Ruler: Fix /ruler/rule_groups returns YAML with extra fields. #4767

Expand Down
4 changes: 4 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ api:
# CLI flag: -http.prometheus-http-prefix
[prometheus_http_prefix: <string> | default = "/prometheus"]

# Which HTTP Request headers to add to logs
# CLI flag: -api.http-request-headers-to-log
[http_request_headers_to_log: <list of string> | default = []]

# The server_config configures the HTTP and gRPC server of the launched
# service(s).
[server: <server_config>]
Expand Down
28 changes: 21 additions & 7 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/cortexproject/cortex/pkg/scheduler/schedulerpb"
"github.com/cortexproject/cortex/pkg/storegateway"
"github.com/cortexproject/cortex/pkg/storegateway/storegatewaypb"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/push"
)

Expand Down Expand Up @@ -61,11 +62,15 @@ type Config struct {
// initialized, the custom config handler will be used instead of
// DefaultConfigHandler.
CustomConfigHandler ConfigHandler `yaml:"-"`

// Allows and is used to configure the addition of HTTP Header fields to logs
HTTPRequestHeadersToLog flagext.StringSlice `yaml:"http_request_headers_to_log"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.ResponseCompression, "api.response-compression-enabled", false, "Use GZIP compression for API responses. Some endpoints serve large YAML or JSON blobs which can benefit from compression.")
f.Var(&cfg.HTTPRequestHeadersToLog, "api.http-request-headers-to-log", "Which HTTP Request headers to add to logs")
cfg.RegisterFlagsWithPrefix("", f)
}

Expand All @@ -85,13 +90,13 @@ func (cfg *Config) wrapDistributorPush(d *distributor.Distributor) push.Func {
}

type API struct {
AuthMiddleware middleware.Interface

cfg Config
server *server.Server
logger log.Logger
sourceIPs *middleware.SourceIPExtractor
indexPage *IndexPageContent
AuthMiddleware middleware.Interface
cfg Config
server *server.Server
logger log.Logger
sourceIPs *middleware.SourceIPExtractor
indexPage *IndexPageContent
HTTPHeaderMiddleware *HTTPHeaderMiddleware
}

func New(cfg Config, serverCfg server.Config, s *server.Server, logger log.Logger) (*API, error) {
Expand Down Expand Up @@ -121,6 +126,9 @@ func New(cfg Config, serverCfg server.Config, s *server.Server, logger log.Logge
if cfg.HTTPAuthMiddleware == nil {
api.AuthMiddleware = middleware.AuthenticateUser
}
if len(cfg.HTTPRequestHeadersToLog) > 0 {
api.HTTPHeaderMiddleware = &HTTPHeaderMiddleware{TargetHeaders: cfg.HTTPRequestHeadersToLog}
}

return api, nil
}
Expand All @@ -139,6 +147,9 @@ func (a *API) RegisterRoute(path string, handler http.Handler, auth bool, method
if a.cfg.ResponseCompression {
handler = gziphandler.GzipHandler(handler)
}
if a.HTTPHeaderMiddleware != nil {
handler = a.HTTPHeaderMiddleware.Wrap(handler)
}

if len(methods) == 0 {
a.server.HTTP.Path(path).Handler(handler)
Expand All @@ -156,6 +167,9 @@ func (a *API) RegisterRoutesWithPrefix(prefix string, handler http.Handler, auth
if a.cfg.ResponseCompression {
handler = gziphandler.GzipHandler(handler)
}
if a.HTTPHeaderMiddleware != nil {
handler = a.HTTPHeaderMiddleware.Wrap(handler)
}

if len(methods) == 0 {
a.server.HTTP.PathPrefix(prefix).Handler(handler)
Expand Down
34 changes: 34 additions & 0 deletions pkg/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,37 @@ func TestNewApiWithInvalidSourceIPExtractor(t *testing.T) {
require.Error(t, err)
require.Nil(t, api)
}

func TestNewApiWithHeaderLogging(t *testing.T) {
cfg := Config{
HTTPRequestHeadersToLog: []string{"ForTesting"},
}
serverCfg := server.Config{
HTTPListenNetwork: server.DefaultNetwork,
MetricsNamespace: "with_header_logging",
}
server, err := server.New(serverCfg)
require.NoError(t, err)

api, err := New(cfg, serverCfg, server, &FakeLogger{})
require.NoError(t, err)
require.NotNil(t, api.HTTPHeaderMiddleware)

}

func TestNewApiWithoutHeaderLogging(t *testing.T) {
cfg := Config{
HTTPRequestHeadersToLog: []string{},
}
serverCfg := server.Config{
HTTPListenNetwork: server.DefaultNetwork,
MetricsNamespace: "without_header_logging",
}
server, err := server.New(serverCfg)
require.NoError(t, err)

api, err := New(cfg, serverCfg, server, &FakeLogger{})
require.NoError(t, err)
require.Nil(t, api.HTTPHeaderMiddleware)

}
34 changes: 34 additions & 0 deletions pkg/api/middlewares.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package api

import (
"context"
"net/http"

"github.com/weaveworks/common/middleware"

"github.com/cortexproject/cortex/pkg/chunk/purger"
"github.com/cortexproject/cortex/pkg/querier/queryrange"
"github.com/cortexproject/cortex/pkg/tenant"
util_log "github.com/cortexproject/cortex/pkg/util/log"
)

// middleware for setting cache gen header to let consumer of response know all previous responses could be invalid due to delete operation
Expand All @@ -27,3 +29,35 @@ func getHTTPCacheGenNumberHeaderSetterMiddleware(cacheGenNumbersLoader purger.To
})
})
}

// HTTPHeaderMiddleware adds specified HTTPHeaders to the request context
type HTTPHeaderMiddleware struct {
TargetHeaders []string
}

// InjectTargetHeadersIntoHTTPRequest injects specified HTTPHeaders into the request context
func (h HTTPHeaderMiddleware) InjectTargetHeadersIntoHTTPRequest(r *http.Request) context.Context {
headerMap := make(map[string]string)

// Check to make sure that Headers have not already been injected
checkMapInContext := util_log.HeaderMapFromContext(r.Context())
if checkMapInContext != nil {
return r.Context()
}

for _, target := range h.TargetHeaders {
contents := r.Header.Get(target)
if contents != "" {
headerMap[target] = contents
}
}
return util_log.ContextWithHeaderMap(r.Context(), headerMap)
}

// Wrap implements Middleware
func (h HTTPHeaderMiddleware) Wrap(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := h.InjectTargetHeadersIntoHTTPRequest(r)
next.ServeHTTP(w, r.WithContext(ctx))
})
}
74 changes: 74 additions & 0 deletions pkg/api/middlewares_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package api

import (
"context"
"net/http"
"testing"

"github.com/stretchr/testify/require"

util_log "github.com/cortexproject/cortex/pkg/util/log"
)

var HTTPTestMiddleware = HTTPHeaderMiddleware{TargetHeaders: []string{"TestHeader1", "TestHeader2", "Test3"}}

func TestHeaderInjection(t *testing.T) {
ctx := context.Background()
h := http.Header{}
contentsMap := make(map[string]string)
contentsMap["TestHeader1"] = "RequestID"
contentsMap["TestHeader2"] = "ContentsOfTestHeader2"
contentsMap["Test3"] = "SomeInformation"

h.Add("TestHeader1", contentsMap["TestHeader1"])
h.Add("TestHeader2", contentsMap["TestHeader2"])
h.Add("Test3", contentsMap["Test3"])

req := &http.Request{
Method: "GET",
RequestURI: "/HTTPHeaderTest",
Body: http.NoBody,
Header: h,
}

req = req.WithContext(ctx)
ctx = HTTPTestMiddleware.InjectTargetHeadersIntoHTTPRequest(req)

headerMap := util_log.HeaderMapFromContext(ctx)
require.NotNil(t, headerMap)

for _, header := range HTTPTestMiddleware.TargetHeaders {
require.Equal(t, contentsMap[header], headerMap[header])
}
for header, contents := range contentsMap {
require.Equal(t, contents, headerMap[header])
}
}

func TestExistingHeaderInContextIsNotOverridden(t *testing.T) {
ctx := context.Background()

h := http.Header{}
contentsMap := make(map[string]string)
contentsMap["TestHeader1"] = "RequestID"
contentsMap["TestHeader2"] = "ContentsOfTestHeader2"
contentsMap["Test3"] = "SomeInformation"

h.Add("TestHeader1", "Fail1")
h.Add("TestHeader2", "Fail2")
h.Add("Test3", "Fail3")

ctx = util_log.ContextWithHeaderMap(ctx, contentsMap)
req := &http.Request{
Method: "GET",
RequestURI: "/HTTPHeaderTest",
Body: http.NoBody,
Header: h,
}

req = req.WithContext(ctx)
ctx = HTTPTestMiddleware.InjectTargetHeadersIntoHTTPRequest(req)

require.Equal(t, contentsMap, util_log.HeaderMapFromContext(ctx))

}
10 changes: 10 additions & 0 deletions pkg/cortex/cortex.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ func New(cfg Config) (*Cortex, error) {
}

cortex.setupThanosTracing()
cortex.setupGRPCHeaderForwarding()

if err := cortex.setupModuleManager(); err != nil {
return nil, err
Expand All @@ -368,6 +369,15 @@ func (t *Cortex) setupThanosTracing() {
t.Cfg.Server.GRPCStreamMiddleware = append(t.Cfg.Server.GRPCStreamMiddleware, ThanosTracerStreamInterceptor)
}

// setupGRPCHeaderForwarding appends a gRPC middleware used to enable the propagation of
// HTTP Headers through child gRPC calls
func (t *Cortex) setupGRPCHeaderForwarding() {
if len(t.Cfg.API.HTTPRequestHeadersToLog) > 0 {
t.Cfg.Server.GRPCMiddleware = append(t.Cfg.Server.GRPCMiddleware, grpcutil.HTTPHeaderPropagationServerInterceptor)
t.Cfg.Server.GRPCStreamMiddleware = append(t.Cfg.Server.GRPCStreamMiddleware, grpcutil.HTTPHeaderPropagationStreamServerInterceptor)
}
}

// Run starts Cortex running, and blocks until a Cortex stops.
func (t *Cortex) Run() error {
// Register custom process metrics.
Expand Down
4 changes: 4 additions & 0 deletions pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,10 @@ func (t *Cortex) initQuerier() (serv services.Service, err error) {
// HTTP router with middleware to parse the tenant ID from the HTTP header and inject it into the
// request context.
internalQuerierRouter = t.API.AuthMiddleware.Wrap(internalQuerierRouter)

if len(t.Cfg.API.HTTPRequestHeadersToLog) > 0 {
internalQuerierRouter = t.API.HTTPHeaderMiddleware.Wrap(internalQuerierRouter)
}
}

// If neither frontend address or scheduler address is configured, no worker is needed.
Expand Down
4 changes: 4 additions & 0 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -786,6 +786,10 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
if sp := opentracing.SpanFromContext(ctx); sp != nil {
localCtx = opentracing.ContextWithSpan(localCtx, sp)
}
// Get any HTTP headers that are supposed to be added to logs and add to localCtx for later use
if headerMap := util_log.HeaderMapFromContext(ctx); headerMap != nil {
localCtx = util_log.ContextWithHeaderMap(localCtx, headerMap)
}

// Get clientIP(s) from Context and add it to localCtx
localCtx = util.AddSourceIPsToOutgoingContext(localCtx, source)
Expand Down
Loading