diff --git a/.circleci/config.yml b/.circleci/config.yml index 11e10d7fce4..635695238e0 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -56,7 +56,7 @@ jobs: - checkout - run: name: Integration Test - command: MIGRATIONS_DIR=$(pwd)/pkg/configs/db/migrations make BUILD_IN_CONTAINER=false configs-integration-test + command: MIGRATIONS_DIR=$(pwd)/cmd/cortex/migrations make BUILD_IN_CONTAINER=false configs-integration-test build: <<: *defaults diff --git a/.gitignore b/.gitignore index 35af20e5d3d..9ec3a8ec562 100644 --- a/.gitignore +++ b/.gitignore @@ -1,14 +1,5 @@ -cmd/alertmanager/alertmanager -cmd/configs/configs -cmd/distributor/distributor -cmd/ingester/ingester -cmd/querier/querier -cmd/query-frontend/query-frontend -cmd/ruler/ruler -cmd/table-manager/table-manager -cmd/lite/lite cmd/test-exporter/test-exporter -cmd/*/.migrations +cmd/cortex/cortex .uptodate .pkg .cache diff --git a/Makefile b/Makefile index b0ea5d21aea..6ea4a758668 100644 --- a/Makefile +++ b/Makefile @@ -34,9 +34,6 @@ images: PROTO_DEFS := $(shell find . $(DONT_FIND) -type f -name '*.proto' -print) PROTO_GOS := $(patsubst %.proto,%.pb.go,$(PROTO_DEFS)) -# Ensure a target for each image that needs a migration directory -MIGRATION_DIRS := cmd/alertmanager/.migrations cmd/configs/.migrations cmd/ruler/.migrations cmd/lite/.migrations - # Building binaries is now automated. The convention is to build a binary # for every directory with main.go in it, in the ./cmd directory. MAIN_GO := $(shell find . 
$(DONT_FIND) -type f -name 'main.go' -print) @@ -86,7 +83,7 @@ NETGO_CHECK = @strings $@ | grep cgo_stub\\\.go >/dev/null || { \ ifeq ($(BUILD_IN_CONTAINER),true) -exes $(EXES) generated $(MIGRATION_DIRS) $(PROTO_GOS) lint test shell mod-check: build-image/$(UPTODATE) +exes $(EXES) generated $(PROTO_GOS) lint test shell mod-check: build-image/$(UPTODATE) @mkdir -p $(shell pwd)/.pkg @mkdir -p $(shell pwd)/.cache $(SUDO) time docker run $(RM) $(TTY) -i \ @@ -103,10 +100,10 @@ configs-integration-test: build-image/$(UPTODATE) -v $(shell pwd)/.cache:/go/cache \ -v $(shell pwd)/.pkg:/go/pkg \ -v $(shell pwd):/go/src/github.com/cortexproject/cortex \ - -v $(shell pwd)/pkg/configs/db/migrations:/migrations \ - -e MIGRATIONS_DIR=/migrations \ + -v $(shell pwd)/cmd/cortex/migrations:/migrations \ --workdir /go/src/github.com/cortexproject/cortex \ --link "$$DB_CONTAINER":configs-db.cortex.local \ + -e DB_ADDR=configs-db.cortex.local \ $(IMAGE_PREFIX)build-image $@; \ status=$$?; \ test -n "$(CIRCLECI)" || docker rm -f "$$DB_CONTAINER"; \ @@ -139,7 +136,7 @@ shell: bash configs-integration-test: - /bin/bash -c "go test -tags 'netgo integration' -timeout 30s ./pkg/configs/... ./pkg/ruler/..." + /bin/bash -c "go test -v -tags 'netgo integration' -timeout 30s ./pkg/configs/... ./pkg/ruler/..." mod-check: GO111MODULE=on go mod download @@ -148,15 +145,10 @@ mod-check: GO111MODULE=on go mod vendor @git diff --exit-code -- go.sum go.mod vendor/ -%/.migrations: - # Ensure a each image that requires a migration dir has one in the build context - cp -r pkg/configs/db/migrations $@ - endif clean: $(SUDO) docker rmi $(IMAGE_NAMES) >/dev/null 2>&1 || true - rm -rf $(MIGRATION_DIRS) rm -rf $(UPTODATE_FILES) $(EXES) $(PROTO_GOS) .cache go clean ./... 
@@ -175,7 +167,7 @@ load-images: fi \ done -# Loads the built Docker images into the minikube environmen, and tags them with +# Loads the built Docker images into the minikube environment, and tags them with # "latest" so the k8s manifests shipped with this code work. prime-minikube: save-images eval $$(minikube docker-env) ; \ diff --git a/cmd/alertmanager/Dockerfile b/cmd/alertmanager/Dockerfile deleted file mode 100644 index d1e2bbcdcf9..00000000000 --- a/cmd/alertmanager/Dockerfile +++ /dev/null @@ -1,11 +0,0 @@ -FROM alpine:3.8 -RUN apk add --no-cache ca-certificates -COPY alertmanager /bin/alertmanager -COPY .migrations /migrations/ -EXPOSE 80 -ENTRYPOINT [ "/bin/alertmanager" ] - -ARG revision -LABEL org.opencontainers.image.title="alertmanager" \ - org.opencontainers.image.source="https://github.com/cortexproject/cortex/tree/master/cmd/alertmanager" \ - org.opencontainers.image.revision="${revision}" diff --git a/cmd/alertmanager/main.go b/cmd/alertmanager/main.go deleted file mode 100644 index c61364b8192..00000000000 --- a/cmd/alertmanager/main.go +++ /dev/null @@ -1,61 +0,0 @@ -// Copyright 2015 Prometheus Team -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package main - -import ( - "flag" - - "google.golang.org/grpc" - - "github.com/cortexproject/cortex/pkg/alertmanager" - "github.com/cortexproject/cortex/pkg/util" - "github.com/cortexproject/cortex/pkg/util/flagext" - "github.com/weaveworks/common/middleware" - "github.com/weaveworks/common/server" - "github.com/weaveworks/common/tracing" -) - -func main() { - var ( - serverConfig = server.Config{ - MetricsNamespace: "cortex", - GRPCMiddleware: []grpc.UnaryServerInterceptor{ - middleware.ServerUserHeaderInterceptor, - }, - } - alertmanagerConfig alertmanager.MultitenantAlertmanagerConfig - ) - - // Setting the environment variable JAEGER_AGENT_HOST enables tracing - trace := tracing.NewFromEnv("ingester") - defer trace.Close() - - flagext.RegisterFlags(&serverConfig, &alertmanagerConfig) - flag.Parse() - - util.InitLogger(&serverConfig) - - multiAM, err := alertmanager.NewMultitenantAlertmanager(&alertmanagerConfig) - util.CheckFatal("initializing MultitenantAlertmanager", err) - go multiAM.Run() - defer multiAM.Stop() - - server, err := server.New(serverConfig) - util.CheckFatal("initializing server", err) - defer server.Shutdown() - - server.HTTP.PathPrefix("/status").Handler(multiAM.GetStatusHandler()) - server.HTTP.PathPrefix("/api/prom").Handler(middleware.AuthenticateUser.Wrap(multiAM)) - server.Run() -} diff --git a/cmd/configs/Dockerfile b/cmd/configs/Dockerfile deleted file mode 100644 index 0e65820df99..00000000000 --- a/cmd/configs/Dockerfile +++ /dev/null @@ -1,11 +0,0 @@ -FROM alpine:3.8 -WORKDIR / -RUN apk add --no-cache ca-certificates -COPY configs /bin/configs -COPY .migrations /migrations/ -ENTRYPOINT [ "/bin/configs" ] - -ARG revision -LABEL org.opencontainers.image.title="configs" \ - org.opencontainers.image.source="https://github.com/cortexproject/cortex/tree/master/cmd/configs" \ - org.opencontainers.image.revision="${revision}" diff --git a/cmd/configs/main.go b/cmd/configs/main.go deleted file mode 100644 index 8faaf3cf4a5..00000000000 
--- a/cmd/configs/main.go +++ /dev/null @@ -1,51 +0,0 @@ -package main - -import ( - "flag" - - "google.golang.org/grpc" - - "github.com/cortexproject/cortex/pkg/configs/api" - "github.com/cortexproject/cortex/pkg/configs/db" - "github.com/cortexproject/cortex/pkg/util" - "github.com/cortexproject/cortex/pkg/util/flagext" - "github.com/weaveworks/common/middleware" - "github.com/weaveworks/common/server" - "github.com/weaveworks/common/tracing" -) - -func main() { - var ( - serverConfig = server.Config{ - MetricsNamespace: "cortex", - // XXX: Cargo-culted from distributor. Probably don't need this - // for configs just yet? - GRPCMiddleware: []grpc.UnaryServerInterceptor{ - middleware.ServerUserHeaderInterceptor, - }, - } - dbConfig db.Config - ) - - // Setting the environment variable JAEGER_AGENT_HOST enables tracing - trace := tracing.NewFromEnv("ingester") - defer trace.Close() - - flagext.RegisterFlags(&serverConfig, &dbConfig) - flag.Parse() - - util.InitLogger(&serverConfig) - - db, err := db.New(dbConfig) - util.CheckFatal("initializing database", err) - defer db.Close() - - a := api.New(db) - - server, err := server.New(serverConfig) - util.CheckFatal("initializing server", err) - defer server.Shutdown() - - a.RegisterRoutes(server.HTTP) - server.Run() -} diff --git a/cmd/lite/Dockerfile b/cmd/cortex/Dockerfile similarity index 55% rename from cmd/lite/Dockerfile rename to cmd/cortex/Dockerfile index b235919cac1..6f70a6f1b18 100644 --- a/cmd/lite/Dockerfile +++ b/cmd/cortex/Dockerfile @@ -1,11 +1,12 @@ FROM alpine:3.8 RUN apk add --no-cache ca-certificates -COPY lite /bin/cortex -COPY .migrations /migrations/ +COPY migrations /migrations/ +COPY cortex /bin/cortex +COPY single-process-config.yaml /etc EXPOSE 80 ENTRYPOINT [ "/bin/cortex" ] ARG revision -LABEL org.opencontainers.image.title="lite" \ - org.opencontainers.image.source="https://github.com/cortexproject/cortex/tree/master/cmd/lite" \ +LABEL org.opencontainers.image.title="cortex" \ + 
org.opencontainers.image.source="https://github.com/cortexproject/cortex/tree/master/cmd/cortex" \ org.opencontainers.image.revision="${revision}"
diff --git a/cmd/cortex/main.go b/cmd/cortex/main.go
new file mode 100644
index 00000000000..689d2f6c5ba
--- /dev/null
+++ b/cmd/cortex/main.go
@@ -0,0 +1,87 @@
+package main
+
+import (
+	"flag"
+	"fmt"
+	"io/ioutil"
+	"os"
+
+	"github.com/go-kit/kit/log/level"
+	"github.com/pkg/errors"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/common/version"
+	"github.com/weaveworks/common/tracing"
+	"gopkg.in/yaml.v2"
+
+	"github.com/cortexproject/cortex/pkg/cortex"
+	"github.com/cortexproject/cortex/pkg/util"
+	"github.com/cortexproject/cortex/pkg/util/flagext"
+)
+
+// init registers the build-version collector with the default Prometheus
+// registry before main runs.
+func init() {
+	prometheus.MustRegister(version.NewCollector("cortex"))
+}
+
+// main wires up the single Cortex binary: it registers and parses flags,
+// optionally overlays a YAML config file, initialises logging, events and
+// tracing, then builds the Cortex instance, runs it and stops it.
+func main() {
+	var (
+		cfg             cortex.Config
+		configFile      = ""
+		eventSampleRate int
+	)
+	flag.StringVar(&configFile, "config.file", "", "Configuration file to load.")
+	flag.IntVar(&eventSampleRate, "event.sample-rate", 0, "How often to sample observability events (0 = never).")
+
+	flagext.RegisterFlags(&cfg)
+	// First parse: needed to learn the value of -config.file.
+	flag.Parse()
+
+	if configFile != "" {
+		if err := LoadConfig(configFile, &cfg); err != nil {
+			// The logger is not initialised yet, so report directly on stderr.
+			fmt.Fprintf(os.Stderr, "error loading config from %s: %v\n", configFile, err)
+			os.Exit(1)
+		}
+	}
+
+	// Parse a second time, as command line flags should take precedence over the config file.
+	flag.Parse()
+
+	util.InitLogger(&cfg.Server)
+	util.InitEvents(eventSampleRate)
+
+	// Setting the environment variable JAEGER_AGENT_HOST enables tracing
+	trace := tracing.NewFromEnv("cortex-" + cfg.Target.String())
+	defer trace.Close()
+
+	t, err := cortex.New(cfg)
+	util.CheckFatal("initializing cortex", err)
+
+	level.Info(util.Logger).Log("msg", "Starting Cortex", "version", version.Info())
+
+	// A Run error is only logged; Stop below is still called.
+	if err := t.Run(); err != nil {
+		level.Error(util.Logger).Log("msg", "error running Cortex", "err", err)
+	}
+
+	err = t.Stop()
+	util.CheckFatal("stopping cortex", err)
+}
+
+// LoadConfig reads the YAML-formatted config in filename into cfg.
+// cfg must be a pointer to the value to populate. Decoding is strict, so
+// fields in the file that cfg does not declare are reported as errors.
+// It returns a wrapped error when the file cannot be read, or the decoder's
+// error when the contents cannot be unmarshalled.
+func LoadConfig(filename string, cfg interface{}) error {
+	buf, err := ioutil.ReadFile(filename)
+	if err != nil {
+		return errors.Wrap(err, "Error reading config file")
+	}
+
+	return yaml.UnmarshalStrict(buf, cfg)
+}
diff --git a/pkg/configs/db/migrations/001_initial_schema.up.sql b/cmd/cortex/migrations/001_initial_schema.up.sql similarity index 100% rename from pkg/configs/db/migrations/001_initial_schema.up.sql rename to cmd/cortex/migrations/001_initial_schema.up.sql diff --git a/pkg/configs/db/migrations/002_immutable_configs.up.sql b/cmd/cortex/migrations/002_immutable_configs.up.sql similarity index 100% rename from pkg/configs/db/migrations/002_immutable_configs.up.sql rename to cmd/cortex/migrations/002_immutable_configs.up.sql diff --git a/cmd/cortex/single-process-config.yaml b/cmd/cortex/single-process-config.yaml new file mode 100644 index 00000000000..1065dbb812e --- /dev/null +++ b/cmd/cortex/single-process-config.yaml @@ -0,0 +1,65 @@ + +# Configuration for running Cortex in single-process mode. +# This should not be used in production. It is only for getting started +# and development. + +# Disable the requirement that every request to Cortex has a +# X-Scope-OrgID header. `fake` will be substituted in instead.
+auth_enabled: false + +server: + http_listen_port: 9009 + + # Configure the server to allow messages up to 100MB. + grpc_server_max_recv_msg_size: 104857600 + grpc_server_max_send_msg_size: 104857600 + grpc_server_max_concurrent_streams: 1000 + +distributor: + shard_by_all_labels: true + pool: + health_check_ingesters: true + +ingester_client: + grpc_client_config: + # Configure the client to allow messages up to 100MB. + max_recv_msg_size: 104857600 + max_send_msg_size: 104857600 + use_gzip_compression: true + +ingester: + #chunk_idle_period: 15m + + lifecycler: + address: 127.0.0.1 + + # We want to start immediately and flush on shutdown. + join_after: 0 + claim_on_rollout: false + final_sleep: 0s + num_tokens: 512 + + # Use an in memory ring store, so we don't need to launch a Consul. + ring: + store: inmemory + replication_factor: 1 + +# Use local storage - BoltDB for the index, and the filesystem +# for the chunks. +schema: + configs: + - from: 2019-03-25 + store: boltdb + object_store: filesystem + schema: v10 + index: + prefix: index_ + period: 168h + +storage: + boltdb: + directory: /tmp/cortex/index + + filesystem: + directory: /tmp/cortex/chunks + diff --git a/cmd/distributor/Dockerfile b/cmd/distributor/Dockerfile deleted file mode 100644 index 867365c9d4d..00000000000 --- a/cmd/distributor/Dockerfile +++ /dev/null @@ -1,10 +0,0 @@ -FROM alpine:3.8 -RUN apk add --no-cache ca-certificates -COPY distributor /bin/distributor -EXPOSE 80 -ENTRYPOINT [ "/bin/distributor" ] - -ARG revision -LABEL org.opencontainers.image.title="distributor" \ - org.opencontainers.image.source="https://github.com/cortexproject/cortex/tree/master/cmd/distributor" \ - org.opencontainers.image.revision="${revision}" diff --git a/cmd/distributor/main.go b/cmd/distributor/main.go deleted file mode 100644 index 160d3fdf871..00000000000 --- a/cmd/distributor/main.go +++ /dev/null @@ -1,96 +0,0 @@ -package main - -import ( - "flag" - "net/http" - - 
"github.com/opentracing-contrib/go-stdlib/nethttp" - "github.com/opentracing/opentracing-go" - "github.com/prometheus/client_golang/prometheus" - "google.golang.org/grpc" - - "github.com/cortexproject/cortex/pkg/distributor" - "github.com/cortexproject/cortex/pkg/ingester/client" - "github.com/cortexproject/cortex/pkg/ring" - "github.com/cortexproject/cortex/pkg/util" - "github.com/cortexproject/cortex/pkg/util/flagext" - "github.com/cortexproject/cortex/pkg/util/validation" - "github.com/weaveworks/common/middleware" - "github.com/weaveworks/common/server" - "github.com/weaveworks/common/tracing" -) - -func main() { - // The pattern for main functions is a series of config objects, which are - // registered for command line flags, and then a series of components that - // are instantiated and composed. Some rules of thumb: - // - Config types should only contain 'simple' types (ints, strings, urls etc). - // - Flag validation should be done by the flag; use a flag.Value where - // appropriate. - // - Config types should map 1:1 with a component type. - // - Config types should define flags with a common prefix. - // - It's fine to nest configs within configs, but this should match the - // nesting of components within components. - // - Limit as much is possible sharing of configuration between config types. - // Where necessary, use a pointer for this - avoid repetition. - // - Where a nesting of components its not obvious, it's fine to pass - // references to other components constructors to compose them. - // - First argument for a components constructor should be its matching config - // object. 
- - var ( - serverConfig = server.Config{ - MetricsNamespace: "cortex", - GRPCMiddleware: []grpc.UnaryServerInterceptor{ - middleware.ServerUserHeaderInterceptor, - }, - ExcludeRequestInLog: true, - } - ringConfig ring.Config - distributorConfig distributor.Config - clientConfig client.Config - limits validation.Limits - preallocConfig client.PreallocConfig - ) - flagext.RegisterFlags(&serverConfig, &ringConfig, &distributorConfig, &clientConfig, &limits, - &preallocConfig) - flag.Parse() - - util.InitLogger(&serverConfig) - - // Setting the environment variable JAEGER_AGENT_HOST enables tracing - trace := tracing.NewFromEnv("distributor") - defer trace.Close() - - r, err := ring.New(ringConfig) - util.CheckFatal("initializing ring", err) - prometheus.MustRegister(r) - defer r.Stop() - - overrides, err := validation.NewOverrides(limits) - util.CheckFatal("initializing overrides", err) - - dist, err := distributor.New(distributorConfig, clientConfig, overrides, r) - util.CheckFatal("initializing distributor", err) - defer dist.Stop() - - server, err := server.New(serverConfig) - util.CheckFatal("initializing server", err) - defer server.Shutdown() - - // Administrator functions - server.HTTP.Handle("/ring", r) - server.HTTP.HandleFunc("/all_user_stats", dist.AllUserStatsHandler) - - operationNameFunc := nethttp.OperationNameFunc(func(r *http.Request) string { - return r.URL.RequestURI() - }) - server.HTTP.Handle("/api/prom/push", middleware.Merge( - middleware.Func(func(handler http.Handler) http.Handler { - return nethttp.Middleware(opentracing.GlobalTracer(), handler, operationNameFunc) - }), - middleware.AuthenticateUser, - ).Wrap(http.HandlerFunc(dist.PushHandler))) - - server.Run() -} diff --git a/cmd/ingester/Dockerfile b/cmd/ingester/Dockerfile deleted file mode 100644 index fe3007a0b19..00000000000 --- a/cmd/ingester/Dockerfile +++ /dev/null @@ -1,10 +0,0 @@ -FROM alpine:3.8 -RUN apk add --no-cache ca-certificates -COPY ingester /bin/ingester -EXPOSE 80 
-ENTRYPOINT [ "/bin/ingester" ] - -ARG revision -LABEL org.opencontainers.image.title="ingester" \ - org.opencontainers.image.source="https://github.com/cortexproject/cortex/tree/master/cmd/ingester" \ - org.opencontainers.image.revision="${revision}" diff --git a/cmd/ingester/main.go b/cmd/ingester/main.go deleted file mode 100644 index 4ba2757d4b3..00000000000 --- a/cmd/ingester/main.go +++ /dev/null @@ -1,95 +0,0 @@ -package main - -import ( - "flag" - "net/http" - - "google.golang.org/grpc" - _ "google.golang.org/grpc/encoding/gzip" // get gzip compressor registered - "google.golang.org/grpc/health/grpc_health_v1" - - "github.com/cortexproject/cortex/pkg/chunk" - "github.com/cortexproject/cortex/pkg/chunk/encoding" - "github.com/cortexproject/cortex/pkg/chunk/storage" - "github.com/cortexproject/cortex/pkg/ingester" - "github.com/cortexproject/cortex/pkg/ingester/client" - "github.com/cortexproject/cortex/pkg/util" - "github.com/cortexproject/cortex/pkg/util/flagext" - "github.com/cortexproject/cortex/pkg/util/validation" - "github.com/weaveworks/common/middleware" - "github.com/weaveworks/common/server" - "github.com/weaveworks/common/tracing" -) - -func main() { - var ( - serverConfig = server.Config{ - MetricsNamespace: "cortex", - GRPCMiddleware: []grpc.UnaryServerInterceptor{ - middleware.ServerUserHeaderInterceptor, - }, - GRPCStreamMiddleware: []grpc.StreamServerInterceptor{ - func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { - // Don't check auth header on TransferChunks, as we weren't originally - // sending it and this could cause transfers to fail on update. 
- if info.FullMethod == "/cortex.Ingester/TransferChunks" { - return handler(srv, ss) - } - - return middleware.StreamServerUserHeaderInterceptor(srv, ss, info, handler) - }, - }, - ExcludeRequestInLog: true, - } - chunkStoreConfig chunk.StoreConfig - schemaConfig chunk.SchemaConfig - storageConfig storage.Config - ingesterConfig ingester.Config - preallocConfig client.PreallocConfig - clientConfig client.Config - encodingConfig encoding.Config - limits validation.Limits - eventSampleRate int - maxStreams uint - ) - - // Setting the environment variable JAEGER_AGENT_HOST enables tracing - trace := tracing.NewFromEnv("ingester") - defer trace.Close() - - // Ingester needs to know our gRPC listen port. - ingesterConfig.LifecyclerConfig.ListenPort = &serverConfig.GRPCListenPort - - flagext.RegisterFlags(&serverConfig, &chunkStoreConfig, &storageConfig, - &schemaConfig, &ingesterConfig, &clientConfig, &limits, &preallocConfig, &encodingConfig) - flag.UintVar(&maxStreams, "ingester.max-concurrent-streams", 1000, "Limit on the number of concurrent streams for gRPC calls (0 = unlimited)") - flag.IntVar(&eventSampleRate, "event.sample-rate", 0, "How often to sample observability events (0 = never).") - flag.Parse() - - util.InitLogger(&serverConfig) - util.InitEvents(eventSampleRate) - - if maxStreams > 0 { - serverConfig.GRPCOptions = append(serverConfig.GRPCOptions, grpc.MaxConcurrentStreams(uint32(maxStreams))) - } - - server, err := server.New(serverConfig) - util.CheckFatal("initializing server", err) - defer server.Shutdown() - - overrides, err := validation.NewOverrides(limits) - util.CheckFatal("initializing overrides", err) - chunkStore, err := storage.NewStore(storageConfig, chunkStoreConfig, schemaConfig, overrides) - util.CheckFatal("", err) - defer chunkStore.Stop() - - ingester, err := ingester.New(ingesterConfig, clientConfig, overrides, chunkStore) - util.CheckFatal("", err) - defer ingester.Shutdown() - - client.RegisterIngesterServer(server.GRPC, 
ingester) - grpc_health_v1.RegisterHealthServer(server.GRPC, ingester) - server.HTTP.Path("/ready").Handler(http.HandlerFunc(ingester.ReadinessHandler)) - server.HTTP.Path("/flush").Handler(http.HandlerFunc(ingester.FlushHandler)) - server.Run() -} diff --git a/cmd/lite/main.go b/cmd/lite/main.go deleted file mode 100644 index 5824af473b2..00000000000 --- a/cmd/lite/main.go +++ /dev/null @@ -1,181 +0,0 @@ -package main - -import ( - "flag" - "net/http" - "regexp" - - "github.com/opentracing-contrib/go-stdlib/nethttp" - "github.com/opentracing/opentracing-go" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/common/route" - "github.com/prometheus/prometheus/config" - v1 "github.com/prometheus/prometheus/web/api/v1" - "google.golang.org/grpc" - - "github.com/cortexproject/cortex/pkg/chunk" - "github.com/cortexproject/cortex/pkg/chunk/storage" - "github.com/cortexproject/cortex/pkg/distributor" - "github.com/cortexproject/cortex/pkg/ingester" - "github.com/cortexproject/cortex/pkg/ingester/client" - "github.com/cortexproject/cortex/pkg/querier" - "github.com/cortexproject/cortex/pkg/ring" - "github.com/cortexproject/cortex/pkg/ruler" - "github.com/cortexproject/cortex/pkg/util" - "github.com/cortexproject/cortex/pkg/util/flagext" - "github.com/cortexproject/cortex/pkg/util/validation" - "github.com/weaveworks/common/middleware" - "github.com/weaveworks/common/server" - "github.com/weaveworks/common/tracing" - "github.com/weaveworks/common/user" -) - -var ( - serverConfig server.Config - chunkStoreConfig chunk.StoreConfig - distributorConfig distributor.Config - querierConfig querier.Config - ingesterConfig ingester.Config - configStoreConfig ruler.ConfigStoreConfig - rulerConfig ruler.Config - schemaConfig chunk.SchemaConfig - storageConfig storage.Config - tbmConfig chunk.TableManagerConfig - - ingesterClientConfig client.Config - limitsConfig validation.Limits - - unauthenticated bool -) - -func main() { - getConfigsFromCommandLine() - - // 
Setting the environment variable JAEGER_AGENT_HOST enables tracing - trace := tracing.NewFromEnv("ingester") - defer trace.Close() - - util.InitLogger(&serverConfig) - - server, err := server.New(serverConfig) - util.CheckFatal("initializing server", err) - defer server.Shutdown() - - overrides, err := validation.NewOverrides(limitsConfig) - util.CheckFatal("initializing overrides", err) - schemaConfig.Load() - chunkStore, err := storage.NewStore(storageConfig, chunkStoreConfig, schemaConfig, overrides) - util.CheckFatal("", err) - defer chunkStore.Stop() - - r, err := ring.New(ingesterConfig.LifecyclerConfig.RingConfig) - util.CheckFatal("initializing ring", err) - prometheus.MustRegister(r) - defer r.Stop() - - dist, err := distributor.New(distributorConfig, ingesterClientConfig, overrides, r) - util.CheckFatal("initializing distributor", err) - defer dist.Stop() - - ingester, err := ingester.New(ingesterConfig, ingesterClientConfig, overrides, chunkStore) - util.CheckFatal("", err) - defer ingester.Shutdown() - - // Assume the newest config is the one to use - storeName := schemaConfig.Configs[len(schemaConfig.Configs)-1].IndexType - tableClient, err := storage.NewTableClient(storeName, storageConfig) - util.CheckFatal("initializing table client", err) - - tableManager, err := chunk.NewTableManager(tbmConfig, schemaConfig, ingesterConfig.MaxChunkAge, tableClient) - util.CheckFatal("initializing table manager", err) - tableManager.Start() - defer tableManager.Stop() - - queryable, engine := querier.New(querierConfig, dist, chunkStore) - - if configStoreConfig.ConfigsAPIURL.String() != "" || configStoreConfig.DBConfig.URI != "" { - rulesAPI, err := ruler.NewRulesAPI(configStoreConfig) - util.CheckFatal("initializing ruler config store", err) - rlr, err := ruler.NewRuler(rulerConfig, engine, queryable, dist) - util.CheckFatal("initializing ruler", err) - defer rlr.Stop() - - rulerServer, err := ruler.NewServer(rulerConfig, rlr, rulesAPI) - 
util.CheckFatal("initializing ruler server", err) - defer rulerServer.Stop() - } - - api := v1.NewAPI( - engine, - queryable, - querier.DummyTargetRetriever{}, - querier.DummyAlertmanagerRetriever{}, - func() config.Config { return config.Config{} }, - map[string]string{}, // TODO: include configuration flags - func(f http.HandlerFunc) http.HandlerFunc { return f }, - func() v1.TSDBAdmin { return nil }, // Only needed for admin APIs. - false, // Disable admin APIs. - util.Logger, - querier.DummyRulesRetriever{}, - 0, 0, // Remote read samples and concurrency limit. - regexp.MustCompile(".*"), - ) - promRouter := route.New().WithPrefix("/api/prom/api/v1") - api.Register(promRouter) - - activeMiddleware := middleware.AuthenticateUser - if unauthenticated { - activeMiddleware = middleware.Func(func(next http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - ctx := user.InjectOrgID(r.Context(), "0") - next.ServeHTTP(w, r.WithContext(ctx)) - }) - }) - } - - // Only serve the API for setting & getting rules configs if we're not - // serving configs from the configs API. Allows for smoother - // migration. 
See https://github.com/cortexproject/cortex/issues/619 - if configStoreConfig.ConfigsAPIURL.URL == nil { - a, err := ruler.NewAPIFromConfig(configStoreConfig.DBConfig) - util.CheckFatal("initializing public rules API", err) - a.RegisterRoutes(server.HTTP) - } - - subrouter := server.HTTP.PathPrefix("/api/prom").Subrouter() - subrouter.PathPrefix("/api/v1").Handler(activeMiddleware.Wrap(promRouter)) - subrouter.Path("/read").Handler(activeMiddleware.Wrap(querier.RemoteReadHandler(queryable))) - subrouter.Path("/validate_expr").Handler(activeMiddleware.Wrap(http.HandlerFunc(dist.ValidateExprHandler))) - subrouter.Path("/user_stats").Handler(activeMiddleware.Wrap(http.HandlerFunc(dist.UserStatsHandler))) - - client.RegisterIngesterServer(server.GRPC, ingester) - server.HTTP.Handle("/ready", http.HandlerFunc(ingester.ReadinessHandler)) - server.HTTP.Handle("/flush", http.HandlerFunc(ingester.FlushHandler)) - server.HTTP.Handle("/ring", r) - operationNameFunc := nethttp.OperationNameFunc(func(r *http.Request) string { - return r.URL.RequestURI() - }) - server.HTTP.Handle("/api/prom/push", middleware.Merge( - middleware.Func(func(handler http.Handler) http.Handler { - return nethttp.Middleware(opentracing.GlobalTracer(), handler, operationNameFunc) - }), - activeMiddleware, - ).Wrap(http.HandlerFunc(dist.PushHandler))) - server.Run() -} - -func getConfigsFromCommandLine() { - serverConfig = server.Config{ - MetricsNamespace: "cortex", - GRPCMiddleware: []grpc.UnaryServerInterceptor{ - middleware.ServerUserHeaderInterceptor, - }, - } - // Ingester needs to know our gRPC listen port. 
- ingesterConfig.LifecyclerConfig.ListenPort = &serverConfig.GRPCListenPort - flagext.RegisterFlags(&serverConfig, &chunkStoreConfig, &distributorConfig, &querierConfig, - &ingesterConfig, &configStoreConfig, &rulerConfig, &storageConfig, &schemaConfig, - &ingesterClientConfig, &limitsConfig, &tbmConfig) - flag.BoolVar(&unauthenticated, "unauthenticated", false, "Set to true to disable multitenancy.") - flag.Parse() -} diff --git a/cmd/lite/main_test.go b/cmd/lite/main_test.go deleted file mode 100644 index 947fa8d7fcb..00000000000 --- a/cmd/lite/main_test.go +++ /dev/null @@ -1,11 +0,0 @@ -package main - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestFlags(t *testing.T) { - assert.NotPanics(t, getConfigsFromCommandLine) -} diff --git a/cmd/querier/Dockerfile b/cmd/querier/Dockerfile deleted file mode 100644 index 14e0a1ecd1b..00000000000 --- a/cmd/querier/Dockerfile +++ /dev/null @@ -1,10 +0,0 @@ -FROM alpine:3.8 -RUN apk add --no-cache ca-certificates -COPY querier /bin/querier -EXPOSE 80 -ENTRYPOINT [ "/bin/querier" ] - -ARG revision -LABEL org.opencontainers.image.title="querier" \ - org.opencontainers.image.source="https://github.com/cortexproject/cortex/tree/master/cmd/querier" \ - org.opencontainers.image.revision="${revision}" diff --git a/cmd/querier/main.go b/cmd/querier/main.go deleted file mode 100644 index a1192791f8b..00000000000 --- a/cmd/querier/main.go +++ /dev/null @@ -1,115 +0,0 @@ -package main - -import ( - "flag" - "net/http" - "regexp" - - "google.golang.org/grpc" - - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/common/route" - "github.com/prometheus/prometheus/config" - v1 "github.com/prometheus/prometheus/web/api/v1" - - "github.com/cortexproject/cortex/pkg/chunk" - "github.com/cortexproject/cortex/pkg/chunk/storage" - chunk_util "github.com/cortexproject/cortex/pkg/chunk/util" - "github.com/cortexproject/cortex/pkg/distributor" - 
"github.com/cortexproject/cortex/pkg/ingester/client" - "github.com/cortexproject/cortex/pkg/querier" - "github.com/cortexproject/cortex/pkg/querier/frontend" - "github.com/cortexproject/cortex/pkg/ring" - "github.com/cortexproject/cortex/pkg/util" - "github.com/cortexproject/cortex/pkg/util/flagext" - "github.com/cortexproject/cortex/pkg/util/validation" - httpgrpc_server "github.com/weaveworks/common/httpgrpc/server" - "github.com/weaveworks/common/middleware" - "github.com/weaveworks/common/server" - "github.com/weaveworks/common/tracing" -) - -func main() { - var ( - serverConfig = server.Config{ - MetricsNamespace: "cortex", - GRPCMiddleware: []grpc.UnaryServerInterceptor{ - middleware.ServerUserHeaderInterceptor, - }, - } - ringConfig ring.Config - distributorConfig distributor.Config - clientConfig client.Config - limits validation.Limits - querierConfig querier.Config - chunkStoreConfig chunk.StoreConfig - schemaConfig chunk.SchemaConfig - storageConfig storage.Config - workerConfig frontend.WorkerConfig - queryParallelism int - ) - flagext.RegisterFlags(&serverConfig, &ringConfig, &distributorConfig, &clientConfig, &limits, - &querierConfig, &chunkStoreConfig, &schemaConfig, &storageConfig, &workerConfig) - flag.IntVar(&queryParallelism, "querier.query-parallelism", 100, "Max subqueries run in parallel per higher-level query.") - flag.Parse() - chunk_util.QueryParallelism = queryParallelism - - // Setting the environment variable JAEGER_AGENT_HOST enables tracing - trace := tracing.NewFromEnv("querier") - defer trace.Close() - - util.InitLogger(&serverConfig) - - r, err := ring.New(ringConfig) - util.CheckFatal("initializing ring", err) - prometheus.MustRegister(r) - defer r.Stop() - - overrides, err := validation.NewOverrides(limits) - util.CheckFatal("initializing overrides", err) - - dist, err := distributor.New(distributorConfig, clientConfig, overrides, r) - util.CheckFatal("initializing distributor", err) - defer dist.Stop() - - server, err := 
server.New(serverConfig) - util.CheckFatal("initializing server", err) - defer server.Shutdown() - server.HTTP.Handle("/ring", r) - - chunkStore, err := storage.NewStore(storageConfig, chunkStoreConfig, schemaConfig, overrides) - util.CheckFatal("initializing storage client", err) - defer chunkStore.Stop() - - worker, err := frontend.NewWorker(workerConfig, httpgrpc_server.NewServer(server.HTTPServer.Handler), util.Logger) - util.CheckFatal("", err) - defer worker.Stop() - - queryable, engine := querier.New(querierConfig, dist, chunkStore) - api := v1.NewAPI( - engine, - queryable, - querier.DummyTargetRetriever{}, - querier.DummyAlertmanagerRetriever{}, - func() config.Config { return config.Config{} }, - map[string]string{}, // TODO: include configuration flags - func(f http.HandlerFunc) http.HandlerFunc { return f }, - func() v1.TSDBAdmin { return nil }, // Only needed for admin APIs. - false, // Disable admin APIs. - util.Logger, - querier.DummyRulesRetriever{}, - 0, 0, // Remote read samples and concurrency limit. 
- regexp.MustCompile(".*"), - ) - promRouter := route.New().WithPrefix("/api/prom/api/v1") - api.Register(promRouter) - - subrouter := server.HTTP.PathPrefix("/api/prom").Subrouter() - subrouter.PathPrefix("/api/v1").Handler(middleware.AuthenticateUser.Wrap(promRouter)) - subrouter.Path("/read").Handler(middleware.AuthenticateUser.Wrap(querier.RemoteReadHandler(queryable))) - subrouter.Path("/validate_expr").Handler(middleware.AuthenticateUser.Wrap(http.HandlerFunc(dist.ValidateExprHandler))) - subrouter.Path("/user_stats").Handler(middleware.AuthenticateUser.Wrap(http.HandlerFunc(dist.UserStatsHandler))) - subrouter.Path("/chunks").Handler(middleware.AuthenticateUser.Wrap(querier.ChunksHandler(queryable))) - - server.Run() -} diff --git a/cmd/query-frontend/Dockerfile b/cmd/query-frontend/Dockerfile deleted file mode 100644 index 45dff705f6c..00000000000 --- a/cmd/query-frontend/Dockerfile +++ /dev/null @@ -1,10 +0,0 @@ -FROM alpine:3.8 -RUN apk add --no-cache ca-certificates -COPY query-frontend /bin/query-frontend -EXPOSE 80 -ENTRYPOINT [ "/bin/query-frontend" ] - -ARG revision -LABEL org.opencontainers.image.title="query-frontend" \ - org.opencontainers.image.source="https://github.com/cortexproject/cortex/tree/master/cmd/query-frontend" \ - org.opencontainers.image.revision="${revision}" diff --git a/cmd/query-frontend/main.go b/cmd/query-frontend/main.go deleted file mode 100644 index a56d7901674..00000000000 --- a/cmd/query-frontend/main.go +++ /dev/null @@ -1,60 +0,0 @@ -package main - -import ( - "flag" - "os" - - "github.com/cortexproject/cortex/pkg/util/validation" - "github.com/go-kit/kit/log/level" - - "google.golang.org/grpc" - - "github.com/cortexproject/cortex/pkg/querier/frontend" - "github.com/cortexproject/cortex/pkg/util" - "github.com/cortexproject/cortex/pkg/util/flagext" - "github.com/weaveworks/common/middleware" - "github.com/weaveworks/common/server" - "github.com/weaveworks/common/tracing" -) - -func main() { - var ( - serverConfig = 
server.Config{ - MetricsNamespace: "cortex", - GRPCMiddleware: []grpc.UnaryServerInterceptor{ - middleware.ServerUserHeaderInterceptor, - }, - } - frontendConfig frontend.Config - maxMessageSize int - defaultLimits validation.Limits - ) - flagext.RegisterFlags(&serverConfig, &frontendConfig, &defaultLimits) - flag.IntVar(&maxMessageSize, "query-frontend.max-recv-message-size-bytes", 1024*1024*64, "Limit on the size of a grpc message this server can receive.") - flag.Parse() - - // Setting the environment variable JAEGER_AGENT_HOST enables tracing - trace := tracing.NewFromEnv("query-frontend") - defer trace.Close() - - util.InitLogger(&serverConfig) - - serverConfig.GRPCOptions = append(serverConfig.GRPCOptions, grpc.MaxRecvMsgSize(maxMessageSize)) - server, err := server.New(serverConfig) - util.CheckFatal("initializing server", err) - defer server.Shutdown() - - limits, err := validation.NewOverrides(defaultLimits) - if err != nil { - level.Error(util.Logger).Log("msg", "failed to initialise limits", "err", err) - os.Exit(1) - } - - f, err := frontend.New(frontendConfig, util.Logger, limits) - util.CheckFatal("initializing frontend", err) - defer f.Close() - - frontend.RegisterFrontendServer(server.GRPC, f) - server.HTTP.PathPrefix("/api/prom").Handler(middleware.AuthenticateUser.Wrap(f.Handler())) - server.Run() -} diff --git a/cmd/ruler/Dockerfile b/cmd/ruler/Dockerfile deleted file mode 100644 index 17eb5497c08..00000000000 --- a/cmd/ruler/Dockerfile +++ /dev/null @@ -1,11 +0,0 @@ -FROM alpine:3.8 -RUN apk add --no-cache ca-certificates -COPY ruler /bin/ruler -COPY .migrations /migrations/ -EXPOSE 80 -ENTRYPOINT [ "/bin/ruler" ] - -ARG revision -LABEL org.opencontainers.image.title="ruler" \ - org.opencontainers.image.source="https://github.com/cortexproject/cortex/tree/master/cmd/ruler" \ - org.opencontainers.image.revision="${revision}" diff --git a/cmd/ruler/main.go b/cmd/ruler/main.go deleted file mode 100644 index cb24eded2b7..00000000000 --- 
a/cmd/ruler/main.go +++ /dev/null @@ -1,100 +0,0 @@ -package main - -import ( - "flag" - - "github.com/prometheus/client_golang/prometheus" - "google.golang.org/grpc" - - "github.com/cortexproject/cortex/pkg/chunk" - "github.com/cortexproject/cortex/pkg/chunk/storage" - "github.com/cortexproject/cortex/pkg/distributor" - "github.com/cortexproject/cortex/pkg/ingester/client" - "github.com/cortexproject/cortex/pkg/querier" - "github.com/cortexproject/cortex/pkg/ring" - "github.com/cortexproject/cortex/pkg/ruler" - "github.com/cortexproject/cortex/pkg/util" - "github.com/cortexproject/cortex/pkg/util/flagext" - "github.com/cortexproject/cortex/pkg/util/validation" - "github.com/weaveworks/common/middleware" - "github.com/weaveworks/common/server" - "github.com/weaveworks/common/tracing" -) - -func main() { - var ( - serverConfig = server.Config{ - MetricsNamespace: "cortex", - GRPCMiddleware: []grpc.UnaryServerInterceptor{ - middleware.ServerUserHeaderInterceptor, - }, - } - ringConfig ring.Config - distributorConfig distributor.Config - clientConfig client.Config - limits validation.Limits - - rulerConfig ruler.Config - chunkStoreConfig chunk.StoreConfig - schemaConfig chunk.SchemaConfig - storageConfig storage.Config - configStoreConfig ruler.ConfigStoreConfig - querierConfig querier.Config - ) - - // Setting the environment variable JAEGER_AGENT_HOST enables tracing - trace := tracing.NewFromEnv("ruler") - defer trace.Close() - - flagext.RegisterFlags(&serverConfig, &ringConfig, &distributorConfig, &clientConfig, &limits, - &rulerConfig, &chunkStoreConfig, &storageConfig, &schemaConfig, &configStoreConfig, - &querierConfig) - flag.Parse() - - util.InitLogger(&serverConfig) - - overrides, err := validation.NewOverrides(limits) - util.CheckFatal("initializing overrides", err) - chunkStore, err := storage.NewStore(storageConfig, chunkStoreConfig, schemaConfig, overrides) - util.CheckFatal("", err) - defer chunkStore.Stop() - - r, err := ring.New(ringConfig) - 
util.CheckFatal("initializing ring", err) - prometheus.MustRegister(r) - defer r.Stop() - - dist, err := distributor.New(distributorConfig, clientConfig, overrides, r) - util.CheckFatal("initializing distributor", err) - defer dist.Stop() - - querierConfig.MaxConcurrent = rulerConfig.NumWorkers - querierConfig.Timeout = rulerConfig.GroupTimeout - queryable, engine := querier.New(querierConfig, dist, chunkStore) - rlr, err := ruler.NewRuler(rulerConfig, engine, queryable, dist) - util.CheckFatal("initializing ruler", err) - defer rlr.Stop() - - rulesAPI, err := ruler.NewRulesAPI(configStoreConfig) - util.CheckFatal("initializing rules API", err) - - rulerServer, err := ruler.NewServer(rulerConfig, rlr, rulesAPI) - util.CheckFatal("initializing ruler server", err) - defer rulerServer.Stop() - - server, err := server.New(serverConfig) - util.CheckFatal("initializing server", err) - defer server.Shutdown() - - // Only serve the API for setting & getting rules configs if we're not - // serving configs from the configs API. Allows for smoother - // migration. 
See https://github.com/cortexproject/cortex/issues/619 - if configStoreConfig.ConfigsAPIURL.URL == nil { - a, err := ruler.NewAPIFromConfig(configStoreConfig.DBConfig) - util.CheckFatal("initializing public rules API", err) - a.RegisterRoutes(server.HTTP) - } - - server.HTTP.Handle("/ring", r) - server.Run() -} diff --git a/cmd/table-manager/Dockerfile b/cmd/table-manager/Dockerfile deleted file mode 100644 index 827adce7bac..00000000000 --- a/cmd/table-manager/Dockerfile +++ /dev/null @@ -1,10 +0,0 @@ -FROM alpine:3.8 -RUN apk add --no-cache ca-certificates -COPY table-manager /bin/table-manager -EXPOSE 9094 -ENTRYPOINT [ "/bin/table-manager" ] - -ARG revision -LABEL org.opencontainers.image.title="table-manager" \ - org.opencontainers.image.source="https://github.com/cortexproject/cortex/tree/master/cmd/table-manager" \ - org.opencontainers.image.revision="${revision}" diff --git a/cmd/table-manager/main.go b/cmd/table-manager/main.go deleted file mode 100644 index c974e9da5ac..00000000000 --- a/cmd/table-manager/main.go +++ /dev/null @@ -1,75 +0,0 @@ -package main - -import ( - "flag" - "os" - - "github.com/go-kit/kit/log/level" - "google.golang.org/grpc" - - "github.com/cortexproject/cortex/pkg/chunk" - "github.com/cortexproject/cortex/pkg/chunk/storage" - "github.com/cortexproject/cortex/pkg/ingester" - "github.com/cortexproject/cortex/pkg/util" - "github.com/cortexproject/cortex/pkg/util/flagext" - "github.com/weaveworks/common/middleware" - "github.com/weaveworks/common/server" - "github.com/weaveworks/common/tracing" -) - -func main() { - var ( - serverConfig = server.Config{ - MetricsNamespace: "cortex", - GRPCMiddleware: []grpc.UnaryServerInterceptor{ - middleware.ServerUserHeaderInterceptor, - }, - } - - ingesterConfig ingester.Config - storageConfig storage.Config - schemaConfig chunk.SchemaConfig - tbmConfig chunk.TableManagerConfig - ) - - // Setting the environment variable JAEGER_AGENT_HOST enables tracing - trace := tracing.NewFromEnv("ingester") - 
defer trace.Close() - - flagext.RegisterFlags(&ingesterConfig, &serverConfig, &storageConfig, &schemaConfig, &tbmConfig) - flag.Parse() - - util.InitLogger(&serverConfig) - - err := schemaConfig.Load() - util.CheckFatal("loading schema config", err) - // Assume the newest config is the one to use - lastConfig := &schemaConfig.Configs[len(schemaConfig.Configs)-1] - - if (tbmConfig.ChunkTables.WriteScale.Enabled || - tbmConfig.IndexTables.WriteScale.Enabled || - tbmConfig.ChunkTables.InactiveWriteScale.Enabled || - tbmConfig.IndexTables.InactiveWriteScale.Enabled || - tbmConfig.ChunkTables.ReadScale.Enabled || - tbmConfig.IndexTables.ReadScale.Enabled || - tbmConfig.ChunkTables.InactiveReadScale.Enabled || - tbmConfig.IndexTables.InactiveReadScale.Enabled) && - (storageConfig.AWSStorageConfig.ApplicationAutoScaling.URL == nil && storageConfig.AWSStorageConfig.Metrics.URL == "") { - level.Error(util.Logger).Log("msg", "WriteScale is enabled but no ApplicationAutoScaling or Metrics URL has been provided") - os.Exit(1) - } - - tableClient, err := storage.NewTableClient(lastConfig.IndexType, storageConfig) - util.CheckFatal("initializing table client", err) - - tableManager, err := chunk.NewTableManager(tbmConfig, schemaConfig, ingesterConfig.MaxChunkAge, tableClient) - util.CheckFatal("initializing table manager", err) - tableManager.Start() - defer tableManager.Stop() - - server, err := server.New(serverConfig) - util.CheckFatal("initializing server", err) - defer server.Shutdown() - - server.Run() -} diff --git a/docs/getting_started.md b/docs/getting_started.md new file mode 100644 index 00000000000..6b74ea630c4 --- /dev/null +++ b/docs/getting_started.md @@ -0,0 +1,93 @@ +# Getting Started + +Cortex can be run as a single binary or as multiple independent microservices. +The single-binary mode is easier to deploy and is aimed mainly at users wanting to try out Cortex or develop on it. 
+The microservices mode is intended for production usage, as it allows you to independently scale different services and isolate failures. +This document will focus on single-process Cortex. +See [the architecture doc](architecture.md) for more information about the microservices. + +Separately from the single process vs microservices decision, Cortex can be configured to use local storage or cloud storage (DynamoDB, Bigtable, Cassandra, S3, GCS etc). +This document will focus on using local storage. +Local storage is explicitly not production ready at this time. +Cortex can also make use of external memcacheds for caching and although these are not mandatory, they should be used in production. + +## Single instance, single process. + +For simplicity & to get started, we'll run it as a single process with no dependencies: + +```sh +$ make protos # Build all the protobuffer definitions. +$ go build ./cmd/cortex +$ ./cortex -config.file=./cmd/cortex/single-process-config.yaml +``` + +This starts a single Cortex node storing chunks and index to your local filesystem in `/tmp/cortex`. +It is not intended for production use. + +Add the following to your prometheus config: + +```yaml +remote_write: +- url: http://localhost:9009/api/prom/push +``` + +And start Prometheus with that config file: + +```sh +$ ./prometheus --config.file=./documentation/examples/prometheus.yml +``` + +Your Prometheus instance will now start pushing data to Cortex. To query that data, start a Grafana instance: + +```sh +$ docker run -p 3000:3000 grafana/grafana +``` + +In [the Grafana UI](http://localhost:3000) (username/password admin/admin), add a Prometheus datasource for Cortex (`http://host.docker.internal:9009/api/prom`). + +## Horizontally scale out + +Next we're going to show how you can run a scale out Cortex cluster using Docker. +We'll need: +- A built Cortex image. +- A Docker network to put these containers on so they can resolve each other by name. 
+- A single node Consul instance to coordinate the Cortex cluster. + + +```sh +$ make ./cmd/cortex/.uptodate +$ docker network create cortex +$ docker run -d --name=consul --network=cortex -e CONSUL_BIND_INTERFACE=eth0 consul +``` + +Next we'll run a couple of Cortex instances pointed at that Consul. You'll note that Cortex configuration can be specified in either a config file or overridden on the command line. See [the arguments documentation](arguments.md) for more information about Cortex configuration options. + +```sh +$ docker run -d --name=cortex1 --network=cortex -p 9001:9009 quay.io/cortexproject/cortex -config.file=/etc/single-process-config.yaml -ring.store=consul -consul.hostname=consul:8500 +$ docker run -d --name=cortex2 --network=cortex -p 9002:9009 quay.io/cortexproject/cortex -config.file=/etc/single-process-config.yaml -ring.store=consul -consul.hostname=consul:8500 +``` + +If you go to http://localhost:9001/ring (or http://localhost:9002/ring) you should see both Cortex nodes join the ring. + +To demonstrate the correct operation of Cortex clustering, we'll send samples to one of the instances +and queries to another. In production, you'd want to load balance both pushes and queries evenly among +all the nodes. + +Point Prometheus at the first: + +```yaml +remote_write: +- url: http://localhost:9001/api/prom/push +``` + +```sh +$ ./prometheus --config.file=./documentation/examples/prometheus.yml +``` + +And Grafana at the second: + +```sh +$ docker run -d --network=cortex -p 3000:3000 grafana/grafana +``` + +In [the Grafana UI](http://localhost:3000) (username/password admin/admin), add a Prometheus datasource for Cortex (`http://cortex2:9009/api/prom`). 
\ No newline at end of file diff --git a/pkg/alertmanager/multitenant.go b/pkg/alertmanager/multitenant.go index 24fef2c3354..7f71cf69b9b 100644 --- a/pkg/alertmanager/multitenant.go +++ b/pkg/alertmanager/multitenant.go @@ -19,14 +19,13 @@ import ( "github.com/go-kit/kit/log/level" amconfig "github.com/prometheus/alertmanager/config" "github.com/prometheus/client_golang/prometheus" + "github.com/weaveworks/common/user" + "github.com/weaveworks/mesh" "github.com/cortexproject/cortex/pkg/configs" configs_client "github.com/cortexproject/cortex/pkg/configs/client" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/flagext" - "github.com/weaveworks/common/instrument" - "github.com/weaveworks/common/user" - "github.com/weaveworks/mesh" ) var backoffConfig = util.BackoffConfig{ @@ -93,15 +92,9 @@ const ( var ( totalConfigs = prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: "cortex", - Name: "configs", + Name: "alertmanager_configs_total", Help: "How many configs the multitenant alertmanager knows about.", }) - configsRequestDuration = instrument.NewHistogramCollector(prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Namespace: "cortex", - Name: "configs_request_duration_seconds", - Help: "Time spent requesting configs.", - Buckets: prometheus.DefBuckets, - }, []string{"operation", "status_code"})) totalPeers = prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: "cortex", Name: "mesh_peers", @@ -112,7 +105,6 @@ var ( ) func init() { - configsRequestDuration.Register() prometheus.MustRegister(totalConfigs) prometheus.MustRegister(totalPeers) statusTemplate = template.Must(template.New("statusPage").Funcs(map[string]interface{}{ @@ -176,12 +168,10 @@ func counts(counts map[string]int, keys []string) string { // MultitenantAlertmanagerConfig is the configuration for a multitenant Alertmanager. 
type MultitenantAlertmanagerConfig struct { - DataDir string - Retention time.Duration - ExternalURL flagext.URLValue - ConfigsAPIURL flagext.URLValue - PollInterval time.Duration - ClientTimeout time.Duration + DataDir string + Retention time.Duration + ExternalURL flagext.URLValue + PollInterval time.Duration MeshListenAddr string MeshHWAddr string @@ -204,12 +194,10 @@ func (cfg *MultitenantAlertmanagerConfig) RegisterFlags(f *flag.FlagSet) { flag.Var(&cfg.ExternalURL, "alertmanager.web.external-url", "The URL under which Alertmanager is externally reachable (for example, if Alertmanager is served via a reverse proxy). Used for generating relative and absolute links back to Alertmanager itself. If the URL has a path portion, it will be used to prefix all HTTP endpoints served by Alertmanager. If omitted, relevant URL components will be derived automatically.") - flag.Var(&cfg.ConfigsAPIURL, "alertmanager.configs.url", "URL of configs API server.") flag.StringVar(&cfg.FallbackConfigFile, "alertmanager.configs.fallback", "", "Filename of fallback config to use if none specified for instance.") flag.StringVar(&cfg.AutoWebhookRoot, "alertmanager.configs.auto-webhook-root", "", "Root of URL to generate if config is "+autoWebhookURL) flag.StringVar(&cfg.AutoSlackRoot, "alertmanager.configs.auto-slack-root", "", "Root of URL to generate if config is "+autoSlackURL) flag.DurationVar(&cfg.PollInterval, "alertmanager.configs.poll-interval", 15*time.Second, "How frequently to poll Cortex configs") - flag.DurationVar(&cfg.ClientTimeout, "alertmanager.configs.client-timeout", 5*time.Second, "Timeout for requests to Weave Cloud configs service.") flag.StringVar(&cfg.MeshListenAddr, "alertmanager.mesh.listen-address", net.JoinHostPort("0.0.0.0", strconv.Itoa(mesh.Port)), "Mesh listen address") flag.StringVar(&cfg.MeshHWAddr, "alertmanager.mesh.hardware-address", mustHardwareAddr(), "MAC address, i.e. 
Mesh peer ID") @@ -226,7 +214,7 @@ func (cfg *MultitenantAlertmanagerConfig) RegisterFlags(f *flag.FlagSet) { type MultitenantAlertmanager struct { cfg *MultitenantAlertmanagerConfig - configsAPI configs_client.AlertManagerConfigsAPI + configsAPI configs_client.Client // The fallback config is stored as a string and parsed every time it's needed // because we mutate the parsed results and don't want those changes to take @@ -250,21 +238,20 @@ type MultitenantAlertmanager struct { } // NewMultitenantAlertmanager creates a new MultitenantAlertmanager. -func NewMultitenantAlertmanager(cfg *MultitenantAlertmanagerConfig) (*MultitenantAlertmanager, error) { +func NewMultitenantAlertmanager(cfg *MultitenantAlertmanagerConfig, cfgCfg configs_client.Config) (*MultitenantAlertmanager, error) { err := os.MkdirAll(cfg.DataDir, 0777) if err != nil { return nil, fmt.Errorf("unable to create Alertmanager data directory %q: %s", cfg.DataDir, err) } - mrouter := initMesh(cfg.MeshListenAddr, cfg.MeshHWAddr, cfg.MeshNickname, cfg.MeshPassword) + configsAPI, err := configs_client.New(cfgCfg) + if err != nil { + return nil, err + } + mrouter := initMesh(cfg.MeshListenAddr, cfg.MeshHWAddr, cfg.MeshNickname, cfg.MeshPassword) mrouter.Start() - configsAPI := configs_client.AlertManagerConfigsAPI{ - URL: cfg.ConfigsAPIURL.URL, - Timeout: cfg.ClientTimeout, - } - var fallbackConfig []byte if cfg.FallbackConfigFile != "" { fallbackConfig, err = ioutil.ReadFile(cfg.FallbackConfigFile) @@ -364,12 +351,7 @@ func (am *MultitenantAlertmanager) updateConfigs(now time.Time) error { // poll the configuration server. Not re-entrant. 
func (am *MultitenantAlertmanager) poll() (map[string]configs.View, error) { configID := am.latestConfig - var cfgs *configs_client.ConfigsResponse - err := instrument.CollectedRequest(context.Background(), "Configs.GetOrgConfigs", configsRequestDuration, instrument.ErrorCode, func(_ context.Context) error { - var err error - cfgs, err = am.configsAPI.GetConfigs(configID) - return err - }) + cfgs, err := am.configsAPI.GetAlerts(configID) if err != nil { level.Warn(util.Logger).Log("msg", "MultitenantAlertmanager: configs server poll failed", "err", err) return nil, err @@ -471,7 +453,7 @@ func (am *MultitenantAlertmanager) setConfig(userID string, config configs.Confi return fmt.Errorf("unable to load fallback configuration for %v: %v", userID, err) } } else { - amConfig, err = configs_client.AlertmanagerConfigFromConfig(config) + amConfig, err = alertmanagerConfigFromConfig(config) if err != nil && hasExisting { // XXX: This means that if a user has a working configuration and // they submit a broken one, we'll keep processing the last known @@ -506,6 +488,15 @@ func (am *MultitenantAlertmanager) setConfig(userID string, config configs.Confi return nil } +// alertmanagerConfigFromConfig returns the Alertmanager config from the Cortex configuration. 
+func alertmanagerConfigFromConfig(c configs.Config) (*amconfig.Config, error) { + cfg, err := amconfig.Load(c.AlertmanagerConfig) + if err != nil { + return nil, fmt.Errorf("error parsing Alertmanager config: %s", err) + } + return cfg, nil +} + func (am *MultitenantAlertmanager) newAlertmanager(userID string, amConfig *amconfig.Config) (*Alertmanager, error) { newAM, err := New(&Config{ UserID: userID, diff --git a/pkg/chunk/aws/dynamodb_table_client_test.go b/pkg/chunk/aws/dynamodb_table_client_test.go index fe1c69b4baa..68f5f8d407d 100644 --- a/pkg/chunk/aws/dynamodb_table_client_test.go +++ b/pkg/chunk/aws/dynamodb_table_client_test.go @@ -154,7 +154,7 @@ func TestTableManagerAutoScaling(t *testing.T) { }, { IndexType: "aws-dynamo", - From: model.TimeFromUnix(0), + From: chunk.DayTime{Time: model.TimeFromUnix(0)}, IndexTables: fixturePeriodicTableConfig(tablePrefix), ChunkTables: fixturePeriodicTableConfig(chunkTablePrefix), }}, diff --git a/pkg/chunk/fixtures.go b/pkg/chunk/fixtures.go index 9c6898d23fa..abc07327975 100644 --- a/pkg/chunk/fixtures.go +++ b/pkg/chunk/fixtures.go @@ -33,7 +33,7 @@ func DefaultSchemaConfig(store, schema string, from model.Time) SchemaConfig { Configs: []PeriodConfig{{ IndexType: store, Schema: schema, - From: from, + From: DayTime{from}, ChunkTables: PeriodicTableConfig{ Prefix: "cortex", Period: 7 * 24 * time.Hour, diff --git a/pkg/chunk/local/fixtures.go b/pkg/chunk/local/fixtures.go index baf338d70c4..323a51ae8c1 100644 --- a/pkg/chunk/local/fixtures.go +++ b/pkg/chunk/local/fixtures.go @@ -51,7 +51,7 @@ func (f *fixture) Clients() ( schemaConfig = chunk.SchemaConfig{ Configs: []chunk.PeriodConfig{{ IndexType: "boltdb", - From: model.Now(), + From: chunk.DayTime{Time: model.Now()}, ChunkTables: chunk.PeriodicTableConfig{ Prefix: "chunks", Period: 10 * time.Minute, diff --git a/pkg/chunk/schema_config.go b/pkg/chunk/schema_config.go index aa522284852..40b499a9fa4 100644 --- a/pkg/chunk/schema_config.go +++ 
b/pkg/chunk/schema_config.go @@ -25,16 +25,40 @@ const ( // PeriodConfig defines the schema and tables to use for a period of time type PeriodConfig struct { - From model.Time `yaml:"-"` // used when working with config - FromStr string `yaml:"from,omitempty"` // used when loading from yaml - IndexType string `yaml:"store"` // type of index client to use. - ObjectType string `yaml:"object_store"` // type of object client to use; if omitted, defaults to store. + From DayTime `yaml:"from"` // used when working with config + IndexType string `yaml:"store"` // type of index client to use. + ObjectType string `yaml:"object_store"` // type of object client to use; if omitted, defaults to store. Schema string `yaml:"schema"` IndexTables PeriodicTableConfig `yaml:"index"` ChunkTables PeriodicTableConfig `yaml:"chunks,omitempty"` RowShards uint32 `yaml:"row_shards"` } +// DayTime is a model.Time that holds day-aligned values, and marshals to/from +// YAML in YYYY-MM-DD format. +type DayTime struct { + model.Time +} + +// MarshalYAML implements yaml.Marshaler. +func (d DayTime) MarshalYAML() (interface{}, error) { + return d.Time.Time().Format("2006-01-02"), nil +} + +// UnmarshalYAML implements yaml.Unmarshaler. 
+func (d *DayTime) UnmarshalYAML(unmarshal func(interface{}) error) error { + var from string + if err := unmarshal(&from); err != nil { + return err + } + t, err := time.Parse("2006-01-02", from) + if err != nil { + return err + } + d.Time = model.TimeFromUnix(t.Unix()) + return nil +} + // SchemaConfig contains the config for our chunk index schemas type SchemaConfig struct { Configs []PeriodConfig `yaml:"configs"` @@ -98,8 +122,7 @@ func (cfg *SchemaConfig) translate() error { add := func(t string, f model.Time) { cfg.Configs = append(cfg.Configs, PeriodConfig{ - From: f, - FromStr: f.Time().Format("2006-01-02"), + From: DayTime{f}, Schema: t, IndexType: cfg.legacy.StorageClient, IndexTables: PeriodicTableConfig{ @@ -153,13 +176,13 @@ func (cfg *SchemaConfig) translate() error { // entries if necessary so there is an entry starting at t func (cfg *SchemaConfig) ForEachAfter(t model.Time, f func(config *PeriodConfig)) { for i := 0; i < len(cfg.Configs); i++ { - if t > cfg.Configs[i].From && - (i+1 == len(cfg.Configs) || t < cfg.Configs[i+1].From) { + if t > cfg.Configs[i].From.Time && + (i+1 == len(cfg.Configs) || t < cfg.Configs[i+1].From.Time) { // Split the i'th entry by duplicating then overwriting the From time cfg.Configs = append(cfg.Configs[:i+1], cfg.Configs[i:]...) 
- cfg.Configs[i+1].From = t + cfg.Configs[i+1].From = DayTime{t} } - if cfg.Configs[i].From >= t { + if cfg.Configs[i].From.Time >= t { f(&cfg.Configs[i]) } } @@ -211,25 +234,11 @@ func (cfg *SchemaConfig) Load() error { decoder := yaml.NewDecoder(f) decoder.SetStrict(true) - if err := decoder.Decode(&cfg); err != nil { - return err - } - for i := range cfg.Configs { - t, err := time.Parse("2006-01-02", cfg.Configs[i].FromStr) - if err != nil { - return err - } - cfg.Configs[i].From = model.TimeFromUnix(t.Unix()) - } - - return nil + return decoder.Decode(&cfg) } // PrintYaml dumps the yaml to stdout, to aid in migration func (cfg SchemaConfig) PrintYaml() { - for i := range cfg.Configs { - cfg.Configs[i].FromStr = cfg.Configs[i].From.Time().Format("2006-01-02") - } encoder := yaml.NewEncoder(os.Stdout) encoder.Encode(cfg) } @@ -425,7 +434,7 @@ func (cfg *PeriodicTableConfig) periodicTables(from, through model.Time, pCfg Pr // ChunkTableFor calculates the chunk table shard for a given point in time. 
func (cfg SchemaConfig) ChunkTableFor(t model.Time) (string, error) { for i := range cfg.Configs { - if t >= cfg.Configs[i].From && (i+1 == len(cfg.Configs) || t < cfg.Configs[i+1].From) { + if t >= cfg.Configs[i].From.Time && (i+1 == len(cfg.Configs) || t < cfg.Configs[i+1].From.Time) { return cfg.Configs[i].ChunkTables.TableFor(t), nil } } diff --git a/pkg/chunk/schema_config_test.go b/pkg/chunk/schema_config_test.go index 00f31013608..6e401ec3bd1 100644 --- a/pkg/chunk/schema_config_test.go +++ b/pkg/chunk/schema_config_test.go @@ -187,7 +187,7 @@ func TestChunkTableFor(t *testing.T) { periodConfigs := []PeriodConfig{ { - FromStr: "1970-01-01", + From: MustParseDayTime("1970-01-01"), IndexTables: PeriodicTableConfig{ Prefix: "index_1_", Period: tablePeriod, @@ -198,7 +198,7 @@ func TestChunkTableFor(t *testing.T) { }, }, { - FromStr: "2019-01-02", + From: MustParseDayTime("2019-01-02"), IndexTables: PeriodicTableConfig{ Prefix: "index_2_", Period: tablePeriod, @@ -209,7 +209,7 @@ func TestChunkTableFor(t *testing.T) { }, }, { - FromStr: "2019-03-06", + From: MustParseDayTime("2019-03-06"), IndexTables: PeriodicTableConfig{ Prefix: "index_3_", Period: tablePeriod, @@ -221,13 +221,6 @@ func TestChunkTableFor(t *testing.T) { }, } - for i, cfg := range periodConfigs { - ts, err := time.Parse("2006-01-02", cfg.FromStr) - require.NoError(t, err) - - periodConfigs[i].From = model.TimeFromUnix(ts.Unix()) - } - schemaCfg := SchemaConfig{ Configs: periodConfigs, } @@ -276,3 +269,11 @@ func TestChunkTableFor(t *testing.T) { require.Equal(t, tc.chunkTable, table) } } + +func MustParseDayTime(s string) DayTime { + t, err := time.Parse("2006-01-02", s) + if err != nil { + panic(err) + } + return DayTime{model.TimeFromUnix(t.Unix())} +} diff --git a/pkg/chunk/storage/factory_test.go b/pkg/chunk/storage/factory_test.go index 2ea0f4ad5f4..0b6737d94d0 100644 --- a/pkg/chunk/storage/factory_test.go +++ b/pkg/chunk/storage/factory_test.go @@ -21,11 +21,11 @@ func TestFactoryStop(t 
*testing.T) { flagext.DefaultValues(&cfg, &storeConfig, &schemaConfig, &defaults) schemaConfig.Configs = []chunk.PeriodConfig{ { - From: model.Time(0), + From: chunk.DayTime{Time: model.Time(0)}, IndexType: "inmemory", }, { - From: model.Time(1), + From: chunk.DayTime{Time: model.Time(1)}, IndexType: "inmemory", }, } diff --git a/pkg/chunk/table_manager.go b/pkg/chunk/table_manager.go index b32034001ff..9bbe325da24 100644 --- a/pkg/chunk/table_manager.go +++ b/pkg/chunk/table_manager.go @@ -195,7 +195,7 @@ func (m *TableManager) calculateExpectedTables() []TableDesc { result := []TableDesc{} for i, config := range m.schemaCfg.Configs { - if config.From.Time().After(mtime.Now()) { + if config.From.Time.Time().After(mtime.Now()) { continue } if config.IndexTables.Period == 0 { // non-periodic table @@ -240,18 +240,18 @@ func (m *TableManager) calculateExpectedTables() []TableDesc { } else { endTime := mtime.Now().Add(m.cfg.CreationGracePeriod) if i+1 < len(m.schemaCfg.Configs) { - nextFrom := m.schemaCfg.Configs[i+1].From.Time() + nextFrom := m.schemaCfg.Configs[i+1].From.Time.Time() if endTime.After(nextFrom) { endTime = nextFrom } } endModelTime := model.TimeFromUnix(endTime.Unix()) result = append(result, config.IndexTables.periodicTables( - config.From, endModelTime, m.cfg.IndexTables, m.cfg.CreationGracePeriod, m.maxChunkAge, m.cfg.RetentionPeriod, + config.From.Time, endModelTime, m.cfg.IndexTables, m.cfg.CreationGracePeriod, m.maxChunkAge, m.cfg.RetentionPeriod, )...) if config.ChunkTables.Prefix != "" { result = append(result, config.ChunkTables.periodicTables( - config.From, endModelTime, m.cfg.ChunkTables, m.cfg.CreationGracePeriod, m.maxChunkAge, m.cfg.RetentionPeriod, + config.From.Time, endModelTime, m.cfg.ChunkTables, m.cfg.CreationGracePeriod, m.maxChunkAge, m.cfg.RetentionPeriod, )...) 
} } diff --git a/pkg/chunk/table_manager_test.go b/pkg/chunk/table_manager_test.go index 6cbcc416642..4bd82d24808 100644 --- a/pkg/chunk/table_manager_test.go +++ b/pkg/chunk/table_manager_test.go @@ -125,13 +125,13 @@ func TestTableManager(t *testing.T) { cfg := SchemaConfig{ Configs: []PeriodConfig{ { - From: model.TimeFromUnix(baseTableStart.Unix()), + From: DayTime{model.TimeFromUnix(baseTableStart.Unix())}, IndexTables: PeriodicTableConfig{ Prefix: baseTableName, }, }, { - From: model.TimeFromUnix(weeklyTableStart.Unix()), + From: DayTime{model.TimeFromUnix(weeklyTableStart.Unix())}, IndexTables: PeriodicTableConfig{ Prefix: tablePrefix, Period: tablePeriod, @@ -143,7 +143,7 @@ func TestTableManager(t *testing.T) { }, }, { - From: model.TimeFromUnix(weeklyTable2Start.Unix()), + From: DayTime{model.TimeFromUnix(weeklyTable2Start.Unix())}, IndexTables: PeriodicTableConfig{ Prefix: table2Prefix, Period: tablePeriod, @@ -308,13 +308,13 @@ func TestTableManagerAutoscaleInactiveOnly(t *testing.T) { cfg := SchemaConfig{ Configs: []PeriodConfig{ { - From: model.TimeFromUnix(baseTableStart.Unix()), + From: DayTime{model.TimeFromUnix(baseTableStart.Unix())}, IndexTables: PeriodicTableConfig{ Prefix: baseTableName, }, }, { - From: model.TimeFromUnix(weeklyTableStart.Unix()), + From: DayTime{model.TimeFromUnix(weeklyTableStart.Unix())}, IndexTables: PeriodicTableConfig{ Prefix: tablePrefix, Period: tablePeriod, @@ -394,13 +394,13 @@ func TestTableManagerDynamicIOModeInactiveOnly(t *testing.T) { cfg := SchemaConfig{ Configs: []PeriodConfig{ { - From: model.TimeFromUnix(baseTableStart.Unix()), + From: DayTime{model.TimeFromUnix(baseTableStart.Unix())}, IndexTables: PeriodicTableConfig{ Prefix: baseTableName, }, }, { - From: model.TimeFromUnix(weeklyTableStart.Unix()), + From: DayTime{model.TimeFromUnix(weeklyTableStart.Unix())}, IndexTables: PeriodicTableConfig{ Prefix: tablePrefix, Period: tablePeriod, @@ -561,7 +561,7 @@ func TestTableManagerRetentionOnly(t *testing.T) { 
cfg := SchemaConfig{ Configs: []PeriodConfig{ { - From: model.TimeFromUnix(baseTableStart.Unix()), + From: DayTime{model.TimeFromUnix(baseTableStart.Unix())}, IndexTables: PeriodicTableConfig{ Prefix: tablePrefix, Period: tablePeriod, @@ -673,7 +673,7 @@ func TestTableManagerRetentionOnly(t *testing.T) { // Verify that with a retention period of zero no tables outside the configs 'From' range are removed tableManager.cfg.RetentionPeriod = 0 - tableManager.schemaCfg.Configs[0].From = model.TimeFromUnix(baseTableStart.Add(tablePeriod).Unix()) + tableManager.schemaCfg.Configs[0].From = DayTime{model.TimeFromUnix(baseTableStart.Add(tablePeriod).Unix())} // Retention > 0 will prevent older tables from being created so we need to create the old tables manually for the test client.CreateTable(nil, TableDesc{Name: tablePrefix + "0", ProvisionedRead: inactiveRead, ProvisionedWrite: inactiveWrite, WriteScale: inactiveScalingConfig}) client.CreateTable(nil, TableDesc{Name: chunkTablePrefix + "0", ProvisionedRead: inactiveRead, ProvisionedWrite: inactiveWrite}) diff --git a/pkg/configs/client/client.go b/pkg/configs/client/client.go new file mode 100644 index 00000000000..288ff9c04d1 --- /dev/null +++ b/pkg/configs/client/client.go @@ -0,0 +1,164 @@ +package client + +import ( + "encoding/json" + "fmt" + "net/http" + "net/url" + "time" + + "github.com/cortexproject/cortex/pkg/configs" + "github.com/cortexproject/cortex/pkg/configs/db" + "github.com/cortexproject/cortex/pkg/util" + "github.com/go-kit/kit/log/level" +) + +// Client is what the ruler and altermanger needs from a config store to process rules. +type Client interface { + // GetRules returns all Cortex configurations from a configs API server + // that have been updated after the given configs.ID was last updated. + GetRules(since configs.ID) (map[string]configs.VersionedRulesConfig, error) + + // GetAlerts fetches all the alerts that have changes since since. 
+ GetAlerts(since configs.ID) (*ConfigsResponse, error) +} + +// New creates a new ConfigClient. +func New(cfg Config) (Client, error) { + // All of this falderal is to allow for a smooth transition away from + // using the configs server and toward directly connecting to the database. + // See https://github.com/cortexproject/cortex/issues/619 + if cfg.ConfigsAPIURL.URL != nil { + return instrumented{ + next: configsClient{ + URL: cfg.ConfigsAPIURL.URL, + Timeout: cfg.ClientTimeout, + }, + }, nil + } + + db, err := db.New(cfg.DBConfig) + if err != nil { + return nil, err + } + return instrumented{ + next: dbStore{ + db: db, + }, + }, nil +} + +// configsClient allows retrieving recording and alerting rules from the configs server. +type configsClient struct { + URL *url.URL + Timeout time.Duration +} + +// GetRules implements ConfigClient. +func (c configsClient) GetRules(since configs.ID) (map[string]configs.VersionedRulesConfig, error) { + suffix := "" + if since != 0 { + suffix = fmt.Sprintf("?since=%d", since) + } + endpoint := fmt.Sprintf("%s/private/api/prom/configs/rules%s", c.URL.String(), suffix) + response, err := doRequest(endpoint, c.Timeout, since) + if err != nil { + return nil, err + } + configs := map[string]configs.VersionedRulesConfig{} + for id, view := range response.Configs { + cfg := view.GetVersionedRulesConfig() + if cfg != nil { + configs[id] = *cfg + } + } + return configs, nil +} + +// GetAlerts implements ConfigClient. 
+func (c configsClient) GetAlerts(since configs.ID) (*ConfigsResponse, error) { + suffix := "" + if since != 0 { + suffix = fmt.Sprintf("?since=%d", since) + } + endpoint := fmt.Sprintf("%s/private/api/prom/configs/alertmanager%s", c.URL.String(), suffix) + return doRequest(endpoint, c.Timeout, since) +} + +func doRequest(endpoint string, timeout time.Duration, since configs.ID) (*ConfigsResponse, error) { + req, err := http.NewRequest("GET", endpoint, nil) + if err != nil { + return nil, err + } + + client := &http.Client{Timeout: timeout} + resp, err := client.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("Invalid response from configs server: %v", resp.StatusCode) + } + + var config ConfigsResponse + if err := json.NewDecoder(resp.Body).Decode(&config); err != nil { + level.Error(util.Logger).Log("msg", "configs: couldn't decode JSON body", "err", err) + return nil, err + } + + config.since = since + return &config, nil +} + +type dbStore struct { + db db.DB +} + +// GetRules implements ConfigClient. +func (d dbStore) GetRules(since configs.ID) (map[string]configs.VersionedRulesConfig, error) { + if since == 0 { + return d.db.GetAllRulesConfigs() + } + return d.db.GetRulesConfigs(since) +} + +// GetAlerts implements ConfigClient: a zero since reads via GetAllConfigs, any other value via GetConfigs(since); exactly one query runs and its error is returned. +func (d dbStore) GetAlerts(since configs.ID) (*ConfigsResponse, error) { + var resp map[string]configs.View + var err error + if since == 0 { + resp, err = d.db.GetAllConfigs() + } else { + resp, err = d.db.GetConfigs(since) + } + if err != nil { + return nil, err + } + + return &ConfigsResponse{ + since: since, + Configs: resp, + }, nil +} + +// ConfigsResponse is a response from server for GetConfigs. +type ConfigsResponse struct { + // The version since which these configs were changed + since configs.ID + + // Configs maps user ID to their latest configs.View. 
+ Configs map[string]configs.View `json:"configs"` +} + +// GetLatestConfigID returns the last config ID from a set of configs. +func (c ConfigsResponse) GetLatestConfigID() configs.ID { + latest := c.since + for _, config := range c.Configs { + if config.ID > latest { + latest = config.ID + } + } + return latest +} diff --git a/pkg/configs/client/config.go b/pkg/configs/client/config.go new file mode 100644 index 00000000000..292de3ee8c6 --- /dev/null +++ b/pkg/configs/client/config.go @@ -0,0 +1,70 @@ +package client + +import ( + "context" + "flag" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/weaveworks/common/instrument" + + "github.com/cortexproject/cortex/pkg/configs" + "github.com/cortexproject/cortex/pkg/configs/db" + "github.com/cortexproject/cortex/pkg/util/flagext" +) + +// Config says where we can find the ruler configs. +type Config struct { + DBConfig db.Config + + // DEPRECATED + ConfigsAPIURL flagext.URLValue + + // DEPRECATED. HTTP timeout duration for requests made to the Weave Cloud + // configs service. + ClientTimeout time.Duration +} + +// RegisterFlags adds the flags required to config this to the given FlagSet +func (cfg *Config) RegisterFlags(f *flag.FlagSet) { + cfg.DBConfig.RegisterFlags(f) + f.Var(&cfg.ConfigsAPIURL, "ruler.configs.url", "DEPRECATED. URL of configs API server.") + f.DurationVar(&cfg.ClientTimeout, "ruler.client-timeout", 5*time.Second, "DEPRECATED. 
Timeout for requests to Weave Cloud configs service.") + f.Var(&cfg.ConfigsAPIURL, "alertmanager.configs.url", "URL of configs API server.") // registered on f, not the global flag.CommandLine + f.DurationVar(&cfg.ClientTimeout, "alertmanager.configs.client-timeout", 5*time.Second, "Timeout for requests to Weave Cloud configs service.") +} + +var configsRequestDuration = instrument.NewHistogramCollector(prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "cortex", + Name: "configs_request_duration_seconds", + Help: "Time spent requesting configs.", + Buckets: prometheus.DefBuckets, +}, []string{"operation", "status_code"})) + +func init() { + configsRequestDuration.Register() +} + +type instrumented struct { + next Client +} + +func (i instrumented) GetRules(since configs.ID) (map[string]configs.VersionedRulesConfig, error) { + var cfgs map[string]configs.VersionedRulesConfig + err := instrument.CollectedRequest(context.Background(), "Configs.GetConfigs", configsRequestDuration, instrument.ErrorCode, func(_ context.Context) error { + var err error + cfgs, err = i.next.GetRules(since) // Warning: this will produce an incorrect result if the configID ever overflows + return err + }) + return cfgs, err +} + +func (i instrumented) GetAlerts(since configs.ID) (*ConfigsResponse, error) { + var cfgs *ConfigsResponse + err := instrument.CollectedRequest(context.Background(), "Configs.GetConfigs", configsRequestDuration, instrument.ErrorCode, func(_ context.Context) error { + var err error + cfgs, err = i.next.GetAlerts(since) // Warning: this will produce an incorrect result if the configID ever overflows + return err + }) + return cfgs, err +} diff --git a/pkg/configs/client/configs.go b/pkg/configs/client/configs.go deleted file mode 100644 index 9a9f6994f24..00000000000 --- a/pkg/configs/client/configs.go +++ /dev/null @@ -1,93 +0,0 @@ -package client - -import ( - "encoding/json" - "fmt" - "io" - "net/http" - "net/url" - "time" - - "github.com/cortexproject/cortex/pkg/configs" - 
"github.com/cortexproject/cortex/pkg/util" - "github.com/go-kit/kit/log/level" - "github.com/prometheus/alertmanager/config" -) - -// TODO: Extract configs client logic into go client library (ala users) - -// ConfigsResponse is a response from server for GetConfigs. -type ConfigsResponse struct { - // The version since which these configs were changed - since configs.ID - // Configs maps user ID to their latest configs.View. - Configs map[string]configs.View `json:"configs"` -} - -func configsFromJSON(body io.Reader) (*ConfigsResponse, error) { - var configs ConfigsResponse - if err := json.NewDecoder(body).Decode(&configs); err != nil { - level.Error(util.Logger).Log("msg", "configs: couldn't decode JSON body", "err", err) - return nil, err - } - return &configs, nil -} - -// GetLatestConfigID returns the last config ID from a set of configs. -func (c ConfigsResponse) GetLatestConfigID() configs.ID { - latest := c.since - for _, config := range c.Configs { - if config.ID > latest { - latest = config.ID - } - } - return latest -} - -// AlertmanagerConfigFromConfig returns the Alertmanager config from the Cortex configuration. -func AlertmanagerConfigFromConfig(c configs.Config) (*config.Config, error) { - cfg, err := config.Load(c.AlertmanagerConfig) - if err != nil { - return nil, fmt.Errorf("error parsing Alertmanager config: %s", err) - } - return cfg, nil -} - -// GetConfigs gets configurations from the configs server. 
-func GetConfigs(endpoint string, timeout time.Duration, since configs.ID) (*ConfigsResponse, error) { - req, err := http.NewRequest("GET", endpoint, nil) - if err != nil { - return nil, err - } - client := &http.Client{Timeout: timeout} - res, err := client.Do(req) - if err != nil { - return nil, err - } - defer res.Body.Close() - if res.StatusCode != http.StatusOK { - return nil, fmt.Errorf("Invalid response from configs server: %v", res.StatusCode) - } - resp, err := configsFromJSON(res.Body) - if err == nil { - resp.since = since - } - return resp, err -} - -// AlertManagerConfigsAPI allows retrieving alert configs. -type AlertManagerConfigsAPI struct { - URL *url.URL - Timeout time.Duration -} - -// GetConfigs returns all Cortex configurations from a configs API server -// that have been updated after the given configs.ID was last updated. -func (c *AlertManagerConfigsAPI) GetConfigs(since configs.ID) (*ConfigsResponse, error) { - suffix := "" - if since != 0 { - suffix = fmt.Sprintf("?since=%d", since) - } - endpoint := fmt.Sprintf("%s/private/api/prom/configs/alertmanager%s", c.URL.String(), suffix) - return GetConfigs(endpoint, c.Timeout, since) -} diff --git a/pkg/configs/client/configs_test.go b/pkg/configs/client/configs_test.go index 59ff2395356..af7acf25b45 100644 --- a/pkg/configs/client/configs_test.go +++ b/pkg/configs/client/configs_test.go @@ -1,16 +1,18 @@ package client import ( - "strings" + "net/http" + "net/http/httptest" "testing" + "time" + + "github.com/stretchr/testify/require" "github.com/cortexproject/cortex/pkg/configs" "github.com/stretchr/testify/assert" ) -func TestJSONDecoding(t *testing.T) { - observed, err := configsFromJSON(strings.NewReader(` -{ +var response = `{ "configs": { "2": { "id": 1, @@ -23,8 +25,18 @@ func TestJSONDecoding(t *testing.T) { } } } -`)) +` + +func TestDoRequest(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, err := 
w.Write([]byte(response)) + require.NoError(t, err) + })) + defer server.Close() + + resp, err := doRequest(server.URL, 1*time.Second, 0) assert.Nil(t, err) + expected := ConfigsResponse{Configs: map[string]configs.View{ "2": { ID: 1, @@ -38,5 +50,5 @@ func TestJSONDecoding(t *testing.T) { }, }, }} - assert.Equal(t, &expected, observed) + assert.Equal(t, &expected, resp) } diff --git a/pkg/configs/db/db.go b/pkg/configs/db/db.go index 3158ae91e3d..bb31e30dfeb 100644 --- a/pkg/configs/db/db.go +++ b/pkg/configs/db/db.go @@ -16,6 +16,9 @@ type Config struct { URI string MigrationsDir string PasswordFile string + + // Allow injection of mock DBs for unit testing. + Mock DB } // RegisterFlags adds the flags required to configure this to the given FlagSet. @@ -25,10 +28,11 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { flag.StringVar(&cfg.PasswordFile, "database.password-file", "", "File containing password (username goes in URI)") } -// RulesDB has ruler-specific DB interfaces. -type RulesDB interface { +// DB is the interface for the database. +type DB interface { // GetRulesConfig gets the user's ruler config GetRulesConfig(userID string) (configs.VersionedRulesConfig, error) + // SetRulesConfig does a compare-and-swap (CAS) on the user's rules config. // `oldConfig` must precisely match the current config in order to change the config to `newConfig`. // Will return `true` if the config was updated, `false` otherwise. @@ -36,14 +40,10 @@ type RulesDB interface { // GetAllRulesConfigs gets all of the ruler configs GetAllRulesConfigs() (map[string]configs.VersionedRulesConfig, error) + // GetRulesConfigs gets all of the configs that have been added or have // changed since the provided config. GetRulesConfigs(since configs.ID) (map[string]configs.VersionedRulesConfig, error) -} - -// DB is the interface for the database. 
-type DB interface { - RulesDB GetConfig(userID string) (configs.View, error) SetConfig(userID string, cfg configs.Config) error @@ -59,6 +59,10 @@ type DB interface { // New creates a new database. func New(cfg Config) (DB, error) { + if cfg.Mock != nil { + return cfg.Mock, nil + } + u, err := url.Parse(cfg.URI) if err != nil { return nil, err @@ -89,12 +93,3 @@ func New(cfg Config) (DB, error) { } return traced{timed{d}}, nil } - -// NewRulesDB creates a new rules config database. -func NewRulesDB(cfg Config) (RulesDB, error) { - db, err := New(cfg) - if err != nil { - return nil, err - } - return db, err -} diff --git a/pkg/configs/db/dbtest/integration.go b/pkg/configs/db/dbtest/integration.go index 58631fc343d..71cefd0d315 100644 --- a/pkg/configs/db/dbtest/integration.go +++ b/pkg/configs/db/dbtest/integration.go @@ -16,16 +16,29 @@ import ( var ( done chan error + dbAddr string + migrationsDir string errRollback = fmt.Errorf("Rolling back test data") - migrationsDir = os.Getenv("MIGRATIONS_DIR") ) +func init() { + dbAddr = os.Getenv("DB_ADDR") + if dbAddr == "" { + dbAddr = "127.0.0.1" + } + + migrationsDir = os.Getenv("MIGRATIONS_DIR") + if migrationsDir == "" { + migrationsDir = "/migrations" + } +} + // Setup sets up stuff for testing, creating a new database func Setup(t *testing.T) db.DB { require.NoError(t, logging.Setup("debug")) // Don't use db.MustNew, here so we can do a transaction around the whole test, to rollback. 
pg, err := postgres.New( - "postgres://postgres@127.0.0.1/configs_test?sslmode=disable", + fmt.Sprintf("postgres://postgres@%s/configs_test?sslmode=disable", dbAddr), migrationsDir, ) require.NoError(t, err) diff --git a/pkg/cortex/cortex.go b/pkg/cortex/cortex.go new file mode 100644 index 00000000000..6024836b3d3 --- /dev/null +++ b/pkg/cortex/cortex.go @@ -0,0 +1,292 @@ +package cortex + +import ( + "flag" + "fmt" + "net/http" + "os" + + "github.com/go-kit/kit/log/level" + "github.com/opentracing-contrib/go-stdlib/nethttp" + "github.com/opentracing/opentracing-go" + "github.com/pkg/errors" + "github.com/weaveworks/common/middleware" + "github.com/weaveworks/common/server" + "google.golang.org/grpc" + "gopkg.in/yaml.v2" + + "github.com/cortexproject/cortex/pkg/alertmanager" + "github.com/cortexproject/cortex/pkg/chunk" + "github.com/cortexproject/cortex/pkg/chunk/encoding" + "github.com/cortexproject/cortex/pkg/chunk/storage" + chunk_util "github.com/cortexproject/cortex/pkg/chunk/util" + "github.com/cortexproject/cortex/pkg/configs/api" + config_client "github.com/cortexproject/cortex/pkg/configs/client" + "github.com/cortexproject/cortex/pkg/configs/db" + "github.com/cortexproject/cortex/pkg/distributor" + "github.com/cortexproject/cortex/pkg/ingester" + "github.com/cortexproject/cortex/pkg/ingester/client" + "github.com/cortexproject/cortex/pkg/querier" + "github.com/cortexproject/cortex/pkg/querier/frontend" + "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/ruler" + "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/validation" +) + +// The design pattern for Cortex is a series of config objects, which are +// registered for command line flags, and then a series of components that +// are instantiated and composed. Some rules of thumb: +// - Config types should only contain 'simple' types (ints, strings, urls etc). 
+// - Flag validation should be done by the flag; use a flag.Value where +// appropriate. +// - Config types should map 1:1 with a component type. +// - Config types should define flags with a common prefix. +// - It's fine to nest configs within configs, but this should match the +// nesting of components within components. +// - Limit as much is possible sharing of configuration between config types. +// Where necessary, use a pointer for this - avoid repetition. +// - Where a nesting of components its not obvious, it's fine to pass +// references to other components constructors to compose them. +// - First argument for a components constructor should be its matching config +// object. + +// Config is the root config for Cortex. +type Config struct { + Target moduleName `yaml:"target,omitempty"` + AuthEnabled bool `yaml:"auth_enabled,omitempty"` + PrintConfig bool `yaml:"-"` + + Server server.Config `yaml:"server,omitempty"` + Distributor distributor.Config `yaml:"distributor,omitempty"` + Querier querier.Config `yaml:"querier,omitempty"` + IngesterClient client.Config `yaml:"ingester_client,omitempty"` + Ingester ingester.Config `yaml:"ingester,omitempty"` + Storage storage.Config `yaml:"storage,omitempty"` + ChunkStore chunk.StoreConfig `yaml:"chunk_store,omitempty"` + Schema chunk.SchemaConfig `yaml:"schema,omitempty"` + LimitsConfig validation.Limits `yaml:"limits,omitempty"` + Prealloc client.PreallocConfig `yaml:"prealloc,omitempty"` + Worker frontend.WorkerConfig `yaml:"frontend_worker,omitempty"` + Frontend frontend.Config `yaml:"frontend,omitempty"` + TableManager chunk.TableManagerConfig `yaml:"table_manager,omitempty"` + Encoding encoding.Config `yaml:"-"` // No yaml for this, it only works with flags. + + Ruler ruler.Config `yaml:"ruler,omitempty"` + ConfigStore config_client.Config `yaml:"config_store,omitempty"` + Alertmanager alertmanager.MultitenantAlertmanagerConfig `yaml:"alertmanager,omitempty"` +} + +// RegisterFlags registers flag. 
+func (c *Config) RegisterFlags(f *flag.FlagSet) { + c.Server.MetricsNamespace = "cortex" + c.Target = All + c.Server.ExcludeRequestInLog = true + f.Var(&c.Target, "target", "target module (default All)") + f.BoolVar(&c.AuthEnabled, "auth.enabled", true, "Set to false to disable auth.") + f.BoolVar(&c.PrintConfig, "print.config", false, "Print the config and exit.") + + c.Server.RegisterFlags(f) + c.Distributor.RegisterFlags(f) + c.Querier.RegisterFlags(f) + c.IngesterClient.RegisterFlags(f) + c.Ingester.RegisterFlags(f) + c.Storage.RegisterFlags(f) + c.ChunkStore.RegisterFlags(f) + c.Schema.RegisterFlags(f) + c.LimitsConfig.RegisterFlags(f) + c.Prealloc.RegisterFlags(f) + c.Worker.RegisterFlags(f) + c.Frontend.RegisterFlags(f) + c.TableManager.RegisterFlags(f) + c.Encoding.RegisterFlags(f) + + c.Ruler.RegisterFlags(f) + c.ConfigStore.RegisterFlags(f) + c.Alertmanager.RegisterFlags(f) + + // These don't seem to have a home; they are still registered on f so they follow the caller's FlagSet. + f.IntVar(&chunk_util.QueryParallelism, "querier.query-parallelism", 100, "Max subqueries run in parallel per higher-level query.") +} + +// Cortex is the root datastructure for Cortex. +type Cortex struct { + target moduleName + httpAuthMiddleware middleware.Interface + + server *server.Server + ring *ring.Ring + overrides *validation.Overrides + distributor *distributor.Distributor + ingester *ingester.Ingester + store chunk.Store + worker frontend.Worker + frontend *frontend.Frontend + tableManager *chunk.TableManager + + ruler *ruler.Ruler + rulerServer *ruler.Server + configAPI *api.API + configDB db.DB + alertmanager *alertmanager.MultitenantAlertmanager +} + +// New makes a new Cortex. 
+func New(cfg Config) (*Cortex, error) { + if cfg.PrintConfig { + if err := yaml.NewEncoder(os.Stdout).Encode(&cfg); err != nil { + fmt.Println("Error encoding config:", err) + } + os.Exit(0) + } + + cortex := &Cortex{ + target: cfg.Target, + } + + operationNameFunc := nethttp.OperationNameFunc(func(r *http.Request) string { + return r.URL.RequestURI() + }) + cfg.Server.HTTPMiddleware = []middleware.Interface{ + middleware.Func(func(handler http.Handler) http.Handler { + return nethttp.Middleware(opentracing.GlobalTracer(), handler, operationNameFunc) + }), + } + + cortex.setupAuthMiddleware(&cfg) + + if err := cortex.init(&cfg, cfg.Target); err != nil { + return nil, err + } + + return cortex, nil +} + +func (t *Cortex) setupAuthMiddleware(cfg *Config) { + if cfg.AuthEnabled { + cfg.Server.GRPCMiddleware = []grpc.UnaryServerInterceptor{ + middleware.ServerUserHeaderInterceptor, + } + cfg.Server.GRPCStreamMiddleware = []grpc.StreamServerInterceptor{ + func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + switch info.FullMethod { + // Don't check auth header on TransferChunks, as we weren't originally + // sending it and this could cause transfers to fail on update. + // + // Also don't check auth /frontend.Frontend/Process, as this handles + // queries for multiple users. 
+ case "/cortex.Ingester/TransferChunks", "/frontend.Frontend/Process": + return handler(srv, ss) + default: + return middleware.StreamServerUserHeaderInterceptor(srv, ss, info, handler) + } + }, + } + t.httpAuthMiddleware = middleware.AuthenticateUser + } else { + cfg.Server.GRPCMiddleware = []grpc.UnaryServerInterceptor{ + fakeGRPCAuthUniaryMiddleware, + } + cfg.Server.GRPCStreamMiddleware = []grpc.StreamServerInterceptor{ + fakeGRPCAuthStreamMiddleware, + } + t.httpAuthMiddleware = fakeHTTPAuthMiddleware + } +} + +func (t *Cortex) init(cfg *Config, m moduleName) error { + // initialize all of our dependencies first + for _, dep := range orderedDeps(m) { + if err := t.initModule(cfg, dep); err != nil { + return err + } + } + // lastly, initialize the requested module + return t.initModule(cfg, m) +} + +func (t *Cortex) initModule(cfg *Config, m moduleName) error { + level.Info(util.Logger).Log("msg", "initialising", "module", m) + if modules[m].init != nil { + if err := modules[m].init(t, cfg); err != nil { + return errors.Wrap(err, fmt.Sprintf("error initialising module: %s", m)) + } + } + return nil +} + +// Run starts Cortex running, and blocks until a signal is received. +func (t *Cortex) Run() error { + return t.server.Run() +} + +// Stop gracefully stops a Cortex. 
+func (t *Cortex) Stop() error { + t.server.Shutdown() + t.stop(t.target) + return nil +} + +func (t *Cortex) stop(m moduleName) { + t.stopModule(m) + deps := orderedDeps(m) + // iterate over our deps in reverse order and call stopModule + for i := len(deps) - 1; i >= 0; i-- { + t.stopModule(deps[i]) + } +} + +func (t *Cortex) stopModule(m moduleName) { + level.Info(util.Logger).Log("msg", "stopping", "module", m) + if modules[m].stop != nil { + if err := modules[m].stop(t); err != nil { + level.Error(util.Logger).Log("msg", "error stopping", "module", m, "err", err) + } + } +} + +// listDeps recursively gets a list of dependencies for a passed moduleName +func listDeps(m moduleName) []moduleName { + deps := modules[m].deps + for _, d := range modules[m].deps { + deps = append(deps, listDeps(d)...) + } + return deps +} + +// orderedDeps gets a list of all dependencies ordered so that items are always after any of their dependencies. +func orderedDeps(m moduleName) []moduleName { + deps := listDeps(m) + + // get a unique list of moduleNames, with a flag for whether they have been added to our result + uniq := map[moduleName]bool{} + for _, dep := range deps { + uniq[dep] = false + } + + result := make([]moduleName, 0, len(uniq)) + + // keep looping through all modules until they have all been added to the result. + + for len(result) < len(uniq) { + OUTER: + for name, added := range uniq { + if added { + continue + } + for _, dep := range modules[name].deps { + // stop processing this module if one of its dependencies has + // not been added to the result yet. + if !uniq[dep] { + continue OUTER + } + } + + // if all of the module's dependencies have been added to the result slice, + // then we can safely add this module to the result slice as well. 
+ uniq[name] = true + result = append(result, name) + } + } + return result +} diff --git a/pkg/cortex/fake_auth.go b/pkg/cortex/fake_auth.go new file mode 100644 index 00000000000..5e3d4c15d35 --- /dev/null +++ b/pkg/cortex/fake_auth.go @@ -0,0 +1,42 @@ +package cortex + +import ( + "context" + "net/http" + + "github.com/weaveworks/common/middleware" + "github.com/weaveworks/common/user" + "google.golang.org/grpc" +) + +// Fake auth middlewares just inject a fake userID, so the rest of the code +// can continue to be multitenant. + +var fakeHTTPAuthMiddleware = middleware.Func(func(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ctx := user.InjectOrgID(r.Context(), "fake") + next.ServeHTTP(w, r.WithContext(ctx)) + }) +}) + +var fakeGRPCAuthUniaryMiddleware = func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + ctx = user.InjectOrgID(ctx, "fake") + return handler(ctx, req) +} + +var fakeGRPCAuthStreamMiddleware = func(srv interface{}, ss grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + ctx := user.InjectOrgID(ss.Context(), "fake") + return handler(srv, serverStream{ + ctx: ctx, + ServerStream: ss, + }) +} + +type serverStream struct { + ctx context.Context + grpc.ServerStream +} + +func (ss serverStream) Context() context.Context { + return ss.ctx +} diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go new file mode 100644 index 00000000000..bc3d1a373b7 --- /dev/null +++ b/pkg/cortex/modules.go @@ -0,0 +1,467 @@ +package cortex + +import ( + "fmt" + "net/http" + "os" + "regexp" + "strings" + + "github.com/go-kit/kit/log/level" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/route" + "github.com/prometheus/prometheus/config" + v1 "github.com/prometheus/prometheus/web/api/v1" + httpgrpc_server "github.com/weaveworks/common/httpgrpc/server" + 
"github.com/weaveworks/common/middleware" + "github.com/weaveworks/common/server" + "google.golang.org/grpc/health/grpc_health_v1" + + "github.com/cortexproject/cortex/pkg/alertmanager" + "github.com/cortexproject/cortex/pkg/chunk" + "github.com/cortexproject/cortex/pkg/chunk/storage" + "github.com/cortexproject/cortex/pkg/configs/api" + config_client "github.com/cortexproject/cortex/pkg/configs/client" + "github.com/cortexproject/cortex/pkg/configs/db" + "github.com/cortexproject/cortex/pkg/distributor" + "github.com/cortexproject/cortex/pkg/ingester" + "github.com/cortexproject/cortex/pkg/ingester/client" + "github.com/cortexproject/cortex/pkg/querier" + "github.com/cortexproject/cortex/pkg/querier/frontend" + "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/ruler" + "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/validation" +) + +type moduleName int + +// The various modules that make up Cortex. +const ( + Ring moduleName = iota + Overrides + Server + Distributor + Ingester + Querier + QueryFrontend + Store + TableManager + Ruler + Configs + AlertManager + All +) + +func (m moduleName) String() string { + switch m { + case Ring: + return "ring" + case Overrides: + return "overrides" + case Server: + return "server" + case Distributor: + return "distributor" + case Store: + return "store" + case Ingester: + return "ingester" + case Querier: + return "querier" + case QueryFrontend: + return "query-frontend" + case TableManager: + return "table-manager" + case Ruler: + return "ruler" + case Configs: + return "configs" + case AlertManager: + return "alertmanager" + case All: + return "all" + default: + panic(fmt.Sprintf("unknown module name: %d", m)) + } +} + +func (m *moduleName) Set(s string) error { + switch strings.ToLower(s) { + case "ring": + *m = Ring + return nil + case "overrides": + *m = Overrides + return nil + case "server": + *m = Server + return nil + case "distributor": + *m = 
Distributor + return nil + case "store": + *m = Store + return nil + case "ingester": + *m = Ingester + return nil + case "querier": + *m = Querier + return nil + case "query-frontend": + *m = QueryFrontend + return nil + case "table-manager": + *m = TableManager + return nil + case "ruler": + *m = Ruler + return nil + case "configs": + *m = Configs + return nil + case "alertmanager": + *m = AlertManager + return nil + case "all": + *m = All + return nil + default: + return fmt.Errorf("unrecognised module name: %s", s) + } +} + +func (t *Cortex) initServer(cfg *Config) (err error) { + t.server, err = server.New(cfg.Server) + return +} + +func (t *Cortex) stopServer() (err error) { + t.server.Shutdown() + return +} + +func (t *Cortex) initRing(cfg *Config) (err error) { + t.ring, err = ring.New(cfg.Ingester.LifecyclerConfig.RingConfig) + if err != nil { + return + } + prometheus.MustRegister(t.ring) + t.server.HTTP.Handle("/ring", t.ring) + return +} + +func (t *Cortex) initOverrides(cfg *Config) (err error) { + t.overrides, err = validation.NewOverrides(cfg.LimitsConfig) + return err +} + +func (t *Cortex) stopOverrides() error { + t.overrides.Stop() + return nil +} + +func (t *Cortex) initDistributor(cfg *Config) (err error) { + t.distributor, err = distributor.New(cfg.Distributor, cfg.IngesterClient, t.overrides, t.ring) + if err != nil { + return + } + + t.server.HTTP.HandleFunc("/user_stats", t.distributor.UserStatsHandler) + t.server.HTTP.HandleFunc("/all_user_stats", t.distributor.AllUserStatsHandler) + t.server.HTTP.Handle("/api/prom/push", t.httpAuthMiddleware.Wrap(http.HandlerFunc(t.distributor.PushHandler))) + return +} + +func (t *Cortex) stopDistributor() (err error) { + t.distributor.Stop() + return nil +} + +func (t *Cortex) initQuerier(cfg *Config) (err error) { + t.worker, err = frontend.NewWorker(cfg.Worker, httpgrpc_server.NewServer(t.server.HTTPServer.Handler), util.Logger) + if err != nil { + return + } + + queryable, engine := 
querier.New(cfg.Querier, t.distributor, t.store) + api := v1.NewAPI( + engine, + queryable, + querier.DummyTargetRetriever{}, + querier.DummyAlertmanagerRetriever{}, + func() config.Config { return config.Config{} }, + map[string]string{}, // TODO: include configuration flags + func(f http.HandlerFunc) http.HandlerFunc { return f }, + func() v1.TSDBAdmin { return nil }, // Only needed for admin APIs. + false, // Disable admin APIs. + util.Logger, + querier.DummyRulesRetriever{}, + 0, 0, // Remote read samples and concurrency limit. + regexp.MustCompile(".*"), + ) + promRouter := route.New().WithPrefix("/api/prom/api/v1") + api.Register(promRouter) + + subrouter := t.server.HTTP.PathPrefix("/api/prom").Subrouter() + subrouter.PathPrefix("/api/v1").Handler(t.httpAuthMiddleware.Wrap(promRouter)) + subrouter.Path("/read").Handler(t.httpAuthMiddleware.Wrap(querier.RemoteReadHandler(queryable))) + subrouter.Path("/validate_expr").Handler(t.httpAuthMiddleware.Wrap(http.HandlerFunc(t.distributor.ValidateExprHandler))) + subrouter.Path("/chunks").Handler(t.httpAuthMiddleware.Wrap(querier.ChunksHandler(queryable))) + return +} + +func (t *Cortex) stopQuerier() error { + t.worker.Stop() + return nil +} + +func (t *Cortex) initIngester(cfg *Config) (err error) { + cfg.Ingester.LifecyclerConfig.ListenPort = &cfg.Server.GRPCListenPort + t.ingester, err = ingester.New(cfg.Ingester, cfg.IngesterClient, t.overrides, t.store) + if err != nil { + return + } + + client.RegisterIngesterServer(t.server.GRPC, t.ingester) + grpc_health_v1.RegisterHealthServer(t.server.GRPC, t.ingester) + t.server.HTTP.Path("/ready").Handler(http.HandlerFunc(t.ingester.ReadinessHandler)) + t.server.HTTP.Path("/flush").Handler(http.HandlerFunc(t.ingester.FlushHandler)) + return +} + +func (t *Cortex) stopIngester() error { + t.ingester.Shutdown() + return nil +} + +func (t *Cortex) initStore(cfg *Config) (err error) { + err = cfg.Schema.Load() + if err != nil { + return + } + + t.store, err = 
storage.NewStore(cfg.Storage, cfg.ChunkStore, cfg.Schema, t.overrides) + return +} + +func (t *Cortex) stopStore() error { + t.store.Stop() + return nil +} + +func (t *Cortex) initQueryFrontend(cfg *Config) (err error) { + t.frontend, err = frontend.New(cfg.Frontend, util.Logger, t.overrides) + if err != nil { + return + } + + frontend.RegisterFrontendServer(t.server.GRPC, t.frontend) + t.server.HTTP.PathPrefix("/api/prom").Handler( + t.httpAuthMiddleware.Wrap( + t.frontend.Handler(), + ), + ) + return +} + +func (t *Cortex) stopQueryFrontend() (err error) { + t.frontend.Close() + return +} + +func (t *Cortex) initTableManager(cfg *Config) error { + err := cfg.Schema.Load() + if err != nil { + return err + } + + // Assume the newest config is the one to use + lastConfig := &cfg.Schema.Configs[len(cfg.Schema.Configs)-1] + + if (cfg.TableManager.ChunkTables.WriteScale.Enabled || + cfg.TableManager.IndexTables.WriteScale.Enabled || + cfg.TableManager.ChunkTables.InactiveWriteScale.Enabled || + cfg.TableManager.IndexTables.InactiveWriteScale.Enabled || + cfg.TableManager.ChunkTables.ReadScale.Enabled || + cfg.TableManager.IndexTables.ReadScale.Enabled || + cfg.TableManager.ChunkTables.InactiveReadScale.Enabled || + cfg.TableManager.IndexTables.InactiveReadScale.Enabled) && + (cfg.Storage.AWSStorageConfig.ApplicationAutoScaling.URL == nil && cfg.Storage.AWSStorageConfig.Metrics.URL == "") { + level.Error(util.Logger).Log("msg", "WriteScale is enabled but no ApplicationAutoScaling or Metrics URL has been provided") + os.Exit(1) + } + + tableClient, err := storage.NewTableClient(lastConfig.IndexType, cfg.Storage) + if err != nil { + return err + } + + t.tableManager, err = chunk.NewTableManager(cfg.TableManager, cfg.Schema, cfg.Ingester.MaxChunkAge, tableClient) + if err != nil { + return err + } + t.tableManager.Start() + return nil +} + +func (t *Cortex) stopTableManager() error { + t.tableManager.Stop() + return nil +} + +func (t *Cortex) initRuler(cfg *Config) (err 
error) { + cfg.Querier.MaxConcurrent = cfg.Ruler.NumWorkers + cfg.Querier.Timeout = cfg.Ruler.GroupTimeout + queryable, engine := querier.New(cfg.Querier, t.distributor, t.store) + t.ruler, err = ruler.NewRuler(cfg.Ruler, engine, queryable, t.distributor) + if err != nil { + return + } + + rulesAPI, err := config_client.New(cfg.ConfigStore) + if err != nil { + return err + } + + t.rulerServer, err = ruler.NewServer(cfg.Ruler, t.ruler, rulesAPI) + if err != nil { + return err + } + + // Only serve the API for setting & getting rules configs if we're not + // serving configs from the configs API. Allows for smoother + // migration. See https://github.com/cortexproject/cortex/issues/619 + if cfg.ConfigStore.ConfigsAPIURL.URL == nil { + a, err := ruler.NewAPIFromConfig(cfg.ConfigStore.DBConfig) + if err != nil { + return err + } + a.RegisterRoutes(t.server.HTTP) + } + return +} + +func (t *Cortex) stopRuler() error { + t.rulerServer.Stop() + t.ruler.Stop() + return nil +} + +func (t *Cortex) initConfigs(cfg *Config) (err error) { + t.configDB, err = db.New(cfg.ConfigStore.DBConfig) + if err != nil { + return + } + + t.configAPI = api.New(t.configDB) + t.configAPI.RegisterRoutes(t.server.HTTP) + return +} + +func (t *Cortex) stopConfigs() error { + t.configDB.Close() + return nil +} + +func (t *Cortex) initAlertmanager(cfg *Config) (err error) { + t.alertmanager, err = alertmanager.NewMultitenantAlertmanager(&cfg.Alertmanager, cfg.ConfigStore) + if err != nil { + return + } + go t.alertmanager.Run() + + t.server.HTTP.PathPrefix("/status").Handler(t.alertmanager.GetStatusHandler()) + + // TODO this clashes with the querier and the distributor, so we cannot + // run them in the same process. 
+ t.server.HTTP.PathPrefix("/api/prom").Handler(middleware.AuthenticateUser.Wrap(t.alertmanager)) + return +} + +func (t *Cortex) stopAlertmanager() error { + t.alertmanager.Stop() + return nil +} + +type module struct { + deps []moduleName + init func(t *Cortex, cfg *Config) error + stop func(t *Cortex) error +} + +var modules = map[moduleName]module{ + Server: { + init: (*Cortex).initServer, + stop: (*Cortex).stopServer, + }, + + Ring: { + deps: []moduleName{Server}, + init: (*Cortex).initRing, + }, + + Overrides: { + init: (*Cortex).initOverrides, + stop: (*Cortex).stopOverrides, + }, + + Distributor: { + deps: []moduleName{Ring, Server, Overrides}, + init: (*Cortex).initDistributor, + stop: (*Cortex).stopDistributor, + }, + + Store: { + deps: []moduleName{Overrides}, + init: (*Cortex).initStore, + stop: (*Cortex).stopStore, + }, + + Ingester: { + deps: []moduleName{Overrides, Store, Server}, + init: (*Cortex).initIngester, + stop: (*Cortex).stopIngester, + }, + + Querier: { + deps: []moduleName{Distributor, Store, Ring, Server}, + init: (*Cortex).initQuerier, + stop: (*Cortex).stopQuerier, + }, + + QueryFrontend: { + deps: []moduleName{Server, Overrides}, + init: (*Cortex).initQueryFrontend, + stop: (*Cortex).stopQueryFrontend, + }, + + TableManager: { + deps: []moduleName{Server}, + init: (*Cortex).initTableManager, + stop: (*Cortex).stopTableManager, + }, + + Ruler: { + deps: []moduleName{Distributor, Store}, + init: (*Cortex).initRuler, + stop: (*Cortex).stopRuler, + }, + + Configs: { + deps: []moduleName{Server}, + init: (*Cortex).initConfigs, + stop: (*Cortex).stopConfigs, + }, + + AlertManager: { + deps: []moduleName{Server}, + init: (*Cortex).initAlertmanager, + stop: (*Cortex).stopAlertmanager, + }, + + All: { + deps: []moduleName{Querier, Ingester, Distributor, TableManager}, + }, +} diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 753da6b9add..7e5faae2f93 100644 --- a/pkg/distributor/distributor.go +++ 
b/pkg/distributor/distributor.go @@ -95,15 +95,15 @@ type Distributor struct { // Config contains the configuration require to // create a Distributor type Config struct { - EnableBilling bool - BillingConfig billing.Config - PoolConfig ingester_client.PoolConfig + EnableBilling bool `yaml:"enable_billing,omitempty"` + BillingConfig billing.Config `yaml:"billing,omitempty"` + PoolConfig ingester_client.PoolConfig `yaml:"pool,omitempty"` - RemoteTimeout time.Duration - ExtraQueryDelay time.Duration - LimiterReloadPeriod time.Duration + RemoteTimeout time.Duration `yaml:"remote_timeout,omitempty"` + ExtraQueryDelay time.Duration `yaml:"extra_queue_delay,omitempty"` + LimiterReloadPeriod time.Duration `yaml:"limiter_reload_period,omitempty"` - ShardByAllLabels bool + ShardByAllLabels bool `yaml:"shard_by_all_labels,omitempty"` // for testing ingesterClientFactory client.Factory @@ -180,7 +180,6 @@ func (d *Distributor) loop() { // Stop stops the distributor's maintenance loop. func (d *Distributor) Stop() { close(d.quit) - d.limits.Stop() d.ingesterPool.Stop() } diff --git a/pkg/ingester/client/pool.go b/pkg/ingester/client/pool.go index 56e9b7d42e2..303c78f78c2 100644 --- a/pkg/ingester/client/pool.go +++ b/pkg/ingester/client/pool.go @@ -30,8 +30,8 @@ type Factory func(addr string) (grpc_health_v1.HealthClient, error) // PoolConfig is config for creating a Pool. type PoolConfig struct { - ClientCleanupPeriod time.Duration - HealthCheckIngesters bool + ClientCleanupPeriod time.Duration `yaml:"client_cleanup_period,omitempty"` + HealthCheckIngesters bool `yaml:"health_check_ingesters,omitempty"` RemoteTimeout time.Duration } diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index db4fb1a504a..f8c8a166929 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -78,10 +78,10 @@ var ( // Config for an Ingester. 
type Config struct { - LifecyclerConfig ring.LifecyclerConfig + LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler,omitempty"` // Config for transferring chunks. - MaxTransferRetries int + MaxTransferRetries int `yaml:"max_transfer_retries,omitempty"` // Config for chunk flushing. FlushCheckPeriod time.Duration @@ -206,8 +206,6 @@ func (i *Ingester) loop() { // Shutdown beings the process to stop this ingester. func (i *Ingester) Shutdown() { - i.limits.Stop() - // First wait for our flush loop to stop. close(i.quit) i.done.Wait() diff --git a/pkg/querier/frontend/frontend.go b/pkg/querier/frontend/frontend.go index 19d3a38886b..45d3dfc0510 100644 --- a/pkg/querier/frontend/frontend.go +++ b/pkg/querier/frontend/frontend.go @@ -50,13 +50,13 @@ var ( // Config for a Frontend. type Config struct { - MaxOutstandingPerTenant int - MaxRetries int - SplitQueriesByDay bool - AlignQueriesWithStep bool - CacheResults bool - CompressResponses bool - resultsCacheConfig + MaxOutstandingPerTenant int `yaml:"max_outstanding_per_tenant"` + MaxRetries int `yaml:"max_retries"` + SplitQueriesByDay bool `yaml:"split_queries_by_day"` + AlignQueriesWithStep bool `yaml:"align_queries_with_step"` + CacheResults bool `yaml:"cache_results"` + CompressResponses bool `yaml:"compress_responses"` + ResultsCacheConfig `yaml:"results_cache"` } // RegisterFlags adds the flags required to config this to the given FlagSet. 
@@ -67,7 +67,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.AlignQueriesWithStep, "querier.align-querier-with-step", false, "Mutate incoming queries to align their start and end with their step.") f.BoolVar(&cfg.CacheResults, "querier.cache-results", false, "Cache query results.") f.BoolVar(&cfg.CompressResponses, "querier.compress-http-responses", false, "Compress HTTP responses.") - cfg.resultsCacheConfig.RegisterFlags(f) + cfg.ResultsCacheConfig.RegisterFlags(f) } // Frontend queues HTTP requests, dispatches them to backends, and handles retries @@ -109,7 +109,7 @@ func New(cfg Config, log log.Logger, limits *validation.Overrides) (*Frontend, e queryRangeMiddleware = append(queryRangeMiddleware, splitByDayMiddleware(limits)) } if cfg.CacheResults { - queryCacheMiddleware, err := newResultsCacheMiddleware(cfg.resultsCacheConfig, limits) + queryCacheMiddleware, err := newResultsCacheMiddleware(cfg.ResultsCacheConfig, limits) if err != nil { return nil, err } diff --git a/pkg/querier/frontend/results_cache.go b/pkg/querier/frontend/results_cache.go index 1000ab04c3f..55a23769d9f 100644 --- a/pkg/querier/frontend/results_cache.go +++ b/pkg/querier/frontend/results_cache.go @@ -18,25 +18,27 @@ import ( "github.com/weaveworks/common/user" ) -type resultsCacheConfig struct { - cacheConfig cache.Config - MaxCacheFreshness time.Duration +// ResultsCacheConfig is the config for the results cache. +type ResultsCacheConfig struct { + CacheConfig cache.Config `yaml:"cache"` + MaxCacheFreshness time.Duration `yaml:"max_freshness"` } -func (cfg *resultsCacheConfig) RegisterFlags(f *flag.FlagSet) { - cfg.cacheConfig.RegisterFlagsWithPrefix("", "", f) +// RegisterFlags registers flags. 
+func (cfg *ResultsCacheConfig) RegisterFlags(f *flag.FlagSet) { + cfg.CacheConfig.RegisterFlagsWithPrefix("frontend.", "", f) f.DurationVar(&cfg.MaxCacheFreshness, "frontend.max-cache-freshness", 1*time.Minute, "Most recent allowed cacheable result, to prevent caching very recent results that might still be in flux.") } type resultsCache struct { - cfg resultsCacheConfig + cfg ResultsCacheConfig next queryRangeHandler cache cache.Cache limits *validation.Overrides } -func newResultsCacheMiddleware(cfg resultsCacheConfig, limits *validation.Overrides) (queryRangeMiddleware, error) { - c, err := cache.New(cfg.cacheConfig) +func newResultsCacheMiddleware(cfg ResultsCacheConfig, limits *validation.Overrides) (queryRangeMiddleware, error) { + c, err := cache.New(cfg.CacheConfig) if err != nil { return nil, err } diff --git a/pkg/querier/frontend/results_cache_test.go b/pkg/querier/frontend/results_cache_test.go index e5d3520bcd4..fb02875209d 100644 --- a/pkg/querier/frontend/results_cache_test.go +++ b/pkg/querier/frontend/results_cache_test.go @@ -167,8 +167,8 @@ func defaultOverrides(t *testing.T) *validation.Overrides { func TestResultsCache(t *testing.T) { calls := 0 rcm, err := newResultsCacheMiddleware( - resultsCacheConfig{ - cacheConfig: cache.Config{ + ResultsCacheConfig{ + CacheConfig: cache.Config{ Cache: cache.NewMockCache(), }, }, @@ -201,9 +201,9 @@ func TestResultsCache(t *testing.T) { } func TestResultsCacheRecent(t *testing.T) { - var cfg resultsCacheConfig + var cfg ResultsCacheConfig flagext.DefaultValues(&cfg) - cfg.cacheConfig.Cache = cache.NewMockCache() + cfg.CacheConfig.Cache = cache.NewMockCache() rcm, err := newResultsCacheMiddleware(cfg, defaultOverrides(t)) require.NoError(t, err) diff --git a/pkg/ruler/api.go b/pkg/ruler/api.go index 04260451256..75539b74f28 100644 --- a/pkg/ruler/api.go +++ b/pkg/ruler/api.go @@ -17,13 +17,13 @@ import ( // API implements the configs api. 
type API struct { - db db.RulesDB + db db.DB http.Handler } // NewAPIFromConfig makes a new API from our database config. func NewAPIFromConfig(cfg db.Config) (*API, error) { - db, err := db.NewRulesDB(cfg) + db, err := db.New(cfg) if err != nil { return nil, err } @@ -31,7 +31,7 @@ func NewAPIFromConfig(cfg db.Config) (*API, error) { } // NewAPI creates a new API. -func NewAPI(db db.RulesDB) *API { +func NewAPI(db db.DB) *API { a := &API{db: db} r := mux.NewRouter() a.RegisterRoutes(r) diff --git a/pkg/ruler/api_test.go b/pkg/ruler/api_test.go index f01587bb1b2..58bf26fd5b3 100644 --- a/pkg/ruler/api_test.go +++ b/pkg/ruler/api_test.go @@ -15,6 +15,7 @@ import ( "github.com/cortexproject/cortex/pkg/configs" "github.com/cortexproject/cortex/pkg/configs/api" + "github.com/cortexproject/cortex/pkg/configs/client" "github.com/cortexproject/cortex/pkg/configs/db" "github.com/cortexproject/cortex/pkg/configs/db/dbtest" "github.com/weaveworks/common/user" @@ -28,7 +29,7 @@ var ( app *API database db.DB counter int - privateAPI RulesAPI + privateAPI client.Client ) // setup sets up the environment for the tests. @@ -36,7 +37,14 @@ func setup(t *testing.T) { database = dbtest.Setup(t) app = NewAPI(database) counter = 0 - privateAPI = dbStore{db: database} + var err error + privateAPI, err = client.New(client.Config{ + DBConfig: db.Config{ + URI: "mock", // trigger client.NewConfigClient to use the mock DB. + Mock: database, + }, + }) + require.NoError(t, err) } // cleanup cleans up the environment after a test. 
@@ -339,7 +347,7 @@ func Test_GetAllConfigs_Empty(t *testing.T) { setup(t) defer cleanup(t) - configs, err := privateAPI.GetConfigs(0) + configs, err := privateAPI.GetRules(0) assert.NoError(t, err, "error getting configs") assert.Equal(t, 0, len(configs)) } @@ -353,7 +361,7 @@ func Test_GetAllConfigs(t *testing.T) { config := makeRulerConfig(configs.RuleFormatV2) view := post(t, userID, configs.RulesConfig{}, config) - found, err := privateAPI.GetConfigs(0) + found, err := privateAPI.GetRules(0) assert.NoError(t, err, "error getting configs") assert.Equal(t, map[string]configs.VersionedRulesConfig{ userID: view, @@ -371,7 +379,7 @@ func Test_GetAllConfigs_Newest(t *testing.T) { config2 := post(t, userID, config1.Config, makeRulerConfig(configs.RuleFormatV2)) lastCreated := post(t, userID, config2.Config, makeRulerConfig(configs.RuleFormatV2)) - found, err := privateAPI.GetConfigs(0) + found, err := privateAPI.GetRules(0) assert.NoError(t, err, "error getting configs") assert.Equal(t, map[string]configs.VersionedRulesConfig{ userID: lastCreated, @@ -387,7 +395,7 @@ func Test_GetConfigs_IncludesNewerConfigsAndExcludesOlder(t *testing.T) { userID3 := makeUserID() config3 := post(t, userID3, configs.RulesConfig{}, makeRulerConfig(configs.RuleFormatV2)) - found, err := privateAPI.GetConfigs(config2.ID) + found, err := privateAPI.GetRules(config2.ID) assert.NoError(t, err, "error getting configs") assert.Equal(t, map[string]configs.VersionedRulesConfig{ userID3: config3, @@ -432,7 +440,7 @@ func Test_AlertmanagerConfig_NotInAllConfigs(t *testing.T) { - name: noop`) postAlertmanagerConfig(t, makeUserID(), config) - found, err := privateAPI.GetConfigs(0) + found, err := privateAPI.GetRules(0) assert.NoError(t, err, "error getting configs") assert.Equal(t, map[string]configs.VersionedRulesConfig{}, found) } diff --git a/pkg/ruler/configs.go b/pkg/ruler/configs.go deleted file mode 100644 index d209369e629..00000000000 --- a/pkg/ruler/configs.go +++ /dev/null @@ -1,108 +0,0 
@@ -package ruler - -import ( - "flag" - "fmt" - "net/url" - "time" - - "github.com/cortexproject/cortex/pkg/configs" - configs_client "github.com/cortexproject/cortex/pkg/configs/client" - "github.com/cortexproject/cortex/pkg/configs/db" - "github.com/cortexproject/cortex/pkg/util/flagext" -) - -// ConfigStoreConfig says where we can find the ruler configs. -type ConfigStoreConfig struct { - DBConfig db.Config - - // DEPRECATED - ConfigsAPIURL flagext.URLValue - - // DEPRECATED. HTTP timeout duration for requests made to the Weave Cloud - // configs service. - ClientTimeout time.Duration -} - -// RegisterFlags adds the flags required to config this to the given FlagSet -func (cfg *ConfigStoreConfig) RegisterFlags(f *flag.FlagSet) { - cfg.DBConfig.RegisterFlags(f) - f.Var(&cfg.ConfigsAPIURL, "ruler.configs.url", "DEPRECATED. URL of configs API server.") - f.DurationVar(&cfg.ClientTimeout, "ruler.client-timeout", 5*time.Second, "DEPRECATED. Timeout for requests to Weave Cloud configs service.") -} - -// RulesAPI is what the ruler needs from a config store to process rules. -type RulesAPI interface { - // GetConfigs returns all Cortex configurations from a configs API server - // that have been updated after the given configs.ID was last updated. - GetConfigs(since configs.ID) (map[string]configs.VersionedRulesConfig, error) -} - -// NewRulesAPI creates a new RulesAPI. -func NewRulesAPI(cfg ConfigStoreConfig) (RulesAPI, error) { - // All of this falderal is to allow for a smooth transition away from - // using the configs server and toward directly connecting to the database. - // See https://github.com/cortexproject/cortex/issues/619 - if cfg.ConfigsAPIURL.URL != nil { - return configsClient{ - URL: cfg.ConfigsAPIURL.URL, - Timeout: cfg.ClientTimeout, - }, nil - } - db, err := db.NewRulesDB(cfg.DBConfig) - if err != nil { - return nil, err - } - return dbStore{db: db}, nil -} - -// configsClient allows retrieving recording and alerting rules from the configs server. 
-type configsClient struct { - URL *url.URL - Timeout time.Duration -} - -// GetConfigs implements RulesAPI. -func (c configsClient) GetConfigs(since configs.ID) (map[string]configs.VersionedRulesConfig, error) { - suffix := "" - if since != 0 { - suffix = fmt.Sprintf("?since=%d", since) - } - endpoint := fmt.Sprintf("%s/private/api/prom/configs/rules%s", c.URL.String(), suffix) - response, err := configs_client.GetConfigs(endpoint, c.Timeout, since) - if err != nil { - return nil, err - } - configs := map[string]configs.VersionedRulesConfig{} - for id, view := range response.Configs { - cfg := view.GetVersionedRulesConfig() - if cfg != nil { - configs[id] = *cfg - } - } - return configs, nil -} - -type dbStore struct { - db db.RulesDB -} - -// GetConfigs implements RulesAPI. -func (d dbStore) GetConfigs(since configs.ID) (map[string]configs.VersionedRulesConfig, error) { - if since == 0 { - return d.db.GetAllRulesConfigs() - } - return d.db.GetRulesConfigs(since) -} - -// getLatestConfigID gets the latest configs ID. -// max [latest, max (map getID cfgs)] -func getLatestConfigID(cfgs map[string]configs.VersionedRulesConfig, latest configs.ID) configs.ID { - ret := latest - for _, config := range cfgs { - if config.ID > ret { - ret = config.ID - } - } - return ret -} diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go index c23c4a537b7..07e4cc41dac 100644 --- a/pkg/ruler/ruler.go +++ b/pkg/ruler/ruler.go @@ -31,6 +31,7 @@ import ( "golang.org/x/net/context" "golang.org/x/net/context/ctxhttp" + "github.com/cortexproject/cortex/pkg/configs/client" "github.com/cortexproject/cortex/pkg/distributor" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/flagext" @@ -401,7 +402,7 @@ type Server struct { } // NewServer makes a new rule processing server. 
-func NewServer(cfg Config, ruler *Ruler, rulesAPI RulesAPI) (*Server, error) { +func NewServer(cfg Config, ruler *Ruler, rulesAPI client.Client) (*Server, error) { // TODO: Separate configuration for polling interval. s := newScheduler(rulesAPI, cfg.EvaluationInterval, cfg.EvaluationInterval, ruler.newGroup) if cfg.NumWorkers <= 0 { diff --git a/pkg/ruler/scheduler.go b/pkg/ruler/scheduler.go index d744b878ed5..3f89a647c69 100644 --- a/pkg/ruler/scheduler.go +++ b/pkg/ruler/scheduler.go @@ -16,8 +16,8 @@ import ( "github.com/prometheus/prometheus/rules" "github.com/cortexproject/cortex/pkg/configs" + "github.com/cortexproject/cortex/pkg/configs/client" "github.com/cortexproject/cortex/pkg/util" - "github.com/weaveworks/common/instrument" ) var backoffConfig = util.BackoffConfig{ @@ -33,7 +33,7 @@ const ( var ( totalConfigs = prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: "cortex", - Name: "configs", + Name: "scheduler_configs_total", Help: "How many configs the scheduler knows about.", }) configUpdates = prometheus.NewCounter(prometheus.CounterOpts{ @@ -41,16 +41,9 @@ var ( Name: "scheduler_config_updates_total", Help: "How many config updates the scheduler has made.", }) - configsRequestDuration = instrument.NewHistogramCollector(prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Namespace: "cortex", - Name: "configs_request_duration_seconds", - Help: "Time spent requesting configs.", - Buckets: prometheus.DefBuckets, - }, []string{"operation", "status_code"})) ) func init() { - configsRequestDuration.Register() prometheus.MustRegister(totalConfigs) prometheus.MustRegister(configUpdates) } @@ -90,7 +83,7 @@ type userConfig struct { type groupFactory func(userID string, groupName string, rls []rules.Rule) (*group, error) type scheduler struct { - rulesAPI RulesAPI + rulesAPI client.Client evaluationInterval time.Duration // how often we re-evaluate each rule set q *SchedulingQueue @@ -106,7 +99,7 @@ type scheduler struct { } // newScheduler makes a new 
scheduler. -func newScheduler(rulesAPI RulesAPI, evaluationInterval, pollInterval time.Duration, groupFn groupFactory) scheduler { +func newScheduler(rulesAPI client.Client, evaluationInterval, pollInterval time.Duration, groupFn groupFactory) scheduler { return scheduler{ rulesAPI: rulesAPI, evaluationInterval: evaluationInterval, @@ -177,12 +170,8 @@ func (s *scheduler) poll() (map[string]configs.VersionedRulesConfig, error) { s.Lock() configID := s.latestConfig s.Unlock() - var cfgs map[string]configs.VersionedRulesConfig - err := instrument.CollectedRequest(context.Background(), "Configs.GetConfigs", configsRequestDuration, instrument.ErrorCode, func(_ context.Context) error { - var err error - cfgs, err = s.rulesAPI.GetConfigs(configID) // Warning: this will produce an incorrect result if the configID ever overflows - return err - }) + + cfgs, err := s.rulesAPI.GetRules(configID) // Warning: this will produce an incorrect result if the configID ever overflows if err != nil { level.Warn(util.Logger).Log("msg", "scheduler: configs server poll failed", "err", err) return nil, err @@ -193,6 +182,18 @@ func (s *scheduler) poll() (map[string]configs.VersionedRulesConfig, error) { return cfgs, nil } +// getLatestConfigID gets the latest configs ID. 
+// max [latest, max (map getID cfgs)] +func getLatestConfigID(cfgs map[string]configs.VersionedRulesConfig, latest configs.ID) configs.ID { + ret := latest + for _, config := range cfgs { + if config.ID > ret { + ret = config.ID + } + } + return ret +} + // computeNextEvalTime Computes when a user's rules should be next evaluated, based on how far we are through an evaluation cycle func (s *scheduler) computeNextEvalTime(hasher hash.Hash64, now time.Time, userID string) time.Time { intervalNanos := float64(s.evaluationInterval.Nanoseconds()) diff --git a/vendor/modules.txt b/vendor/modules.txt index b9c0ab2f6aa..a51ee751711 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -139,9 +139,9 @@ github.com/golang/protobuf/ptypes github.com/golang/protobuf/ptypes/wrappers github.com/golang/protobuf/ptypes/any github.com/golang/protobuf/ptypes/timestamp +github.com/golang/protobuf/protoc-gen-go/descriptor github.com/golang/protobuf/jsonpb github.com/golang/protobuf/protoc-gen-go/generator -github.com/golang/protobuf/protoc-gen-go/descriptor github.com/golang/protobuf/ptypes/struct github.com/golang/protobuf/protoc-gen-go/generator/internal/remap github.com/golang/protobuf/protoc-gen-go/plugin @@ -291,11 +291,11 @@ github.com/prometheus/client_golang/prometheus/internal # github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f github.com/prometheus/client_model/go # github.com/prometheus/common v0.3.0 +github.com/prometheus/common/version github.com/prometheus/common/route github.com/prometheus/common/model github.com/prometheus/common/config github.com/prometheus/common/expfmt -github.com/prometheus/common/version github.com/prometheus/common/internal/bitbucket.org/ww/goautoneg # github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1 github.com/prometheus/procfs @@ -303,8 +303,6 @@ github.com/prometheus/procfs/nfs github.com/prometheus/procfs/xfs github.com/prometheus/procfs/internal/util # github.com/prometheus/prometheus 
v0.0.0-20190417125241-3cc5f9d88062 -github.com/prometheus/prometheus/config -github.com/prometheus/prometheus/web/api/v1 github.com/prometheus/prometheus/pkg/labels github.com/prometheus/prometheus/promql github.com/prometheus/prometheus/pkg/rulefmt @@ -316,19 +314,21 @@ github.com/prometheus/prometheus/storage github.com/prometheus/prometheus/util/stats github.com/prometheus/prometheus/util/strutil github.com/prometheus/prometheus/util/testutil +github.com/prometheus/prometheus/config +github.com/prometheus/prometheus/web/api/v1 github.com/prometheus/prometheus/scrape github.com/prometheus/prometheus/discovery github.com/prometheus/prometheus/discovery/config github.com/prometheus/prometheus/discovery/dns github.com/prometheus/prometheus/discovery/targetgroup github.com/prometheus/prometheus/notifier -github.com/prometheus/prometheus/pkg/relabel github.com/prometheus/prometheus/pkg/gate +github.com/prometheus/prometheus/template +github.com/prometheus/prometheus/storage/tsdb +github.com/prometheus/prometheus/pkg/relabel github.com/prometheus/prometheus/prompb github.com/prometheus/prometheus/storage/remote github.com/prometheus/prometheus/util/httputil -github.com/prometheus/prometheus/template -github.com/prometheus/prometheus/storage/tsdb github.com/prometheus/prometheus/pkg/pool github.com/prometheus/prometheus/discovery/azure github.com/prometheus/prometheus/discovery/consul @@ -370,8 +370,11 @@ github.com/stretchr/testify/assert # github.com/tinylib/msgp v0.0.0-20161221055906-38a6f61a768d github.com/tinylib/msgp/msgp # github.com/uber/jaeger-client-go v2.14.0+incompatible -github.com/uber/jaeger-client-go github.com/uber/jaeger-client-go/config +github.com/uber/jaeger-client-go +github.com/uber/jaeger-client-go/internal/baggage/remote +github.com/uber/jaeger-client-go/internal/throttler/remote +github.com/uber/jaeger-client-go/rpcmetrics github.com/uber/jaeger-client-go/internal/baggage github.com/uber/jaeger-client-go/internal/spanlog 
github.com/uber/jaeger-client-go/internal/throttler @@ -381,28 +384,25 @@ github.com/uber/jaeger-client-go/thrift-gen/jaeger github.com/uber/jaeger-client-go/thrift-gen/sampling github.com/uber/jaeger-client-go/thrift-gen/zipkincore github.com/uber/jaeger-client-go/utils -github.com/uber/jaeger-client-go/internal/baggage/remote -github.com/uber/jaeger-client-go/internal/throttler/remote -github.com/uber/jaeger-client-go/rpcmetrics -github.com/uber/jaeger-client-go/thrift-gen/agent github.com/uber/jaeger-client-go/thrift-gen/baggage +github.com/uber/jaeger-client-go/thrift-gen/agent # github.com/uber/jaeger-lib v1.5.0 github.com/uber/jaeger-lib/metrics/prometheus github.com/uber/jaeger-lib/metrics # github.com/weaveworks/billing-client v0.0.0-20171006123215-be0d55e547b1 github.com/weaveworks/billing-client # github.com/weaveworks/common v0.0.0-20190410110702-87611edc252e -github.com/weaveworks/common/middleware -github.com/weaveworks/common/server github.com/weaveworks/common/tracing +github.com/weaveworks/common/server github.com/weaveworks/common/user -github.com/weaveworks/common/httpgrpc/server -github.com/weaveworks/common/instrument github.com/weaveworks/common/errors github.com/weaveworks/common/httpgrpc +github.com/weaveworks/common/instrument github.com/weaveworks/common/mtime github.com/weaveworks/common/aws github.com/weaveworks/common/logging +github.com/weaveworks/common/httpgrpc/server +github.com/weaveworks/common/middleware github.com/weaveworks/common/mflag github.com/weaveworks/common/mflagext github.com/weaveworks/common/signals @@ -433,11 +433,11 @@ go.opencensus.io/plugin/ochttp/propagation/tracecontext # golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 golang.org/x/crypto/nacl/box golang.org/x/crypto/nacl/secretbox -golang.org/x/crypto/ssh/terminal golang.org/x/crypto/curve25519 golang.org/x/crypto/salsa20/salsa golang.org/x/crypto/internal/subtle golang.org/x/crypto/poly1305 +golang.org/x/crypto/ssh/terminal golang.org/x/crypto/ed25519 
golang.org/x/crypto/ed25519/internal/edwards25519 # golang.org/x/net v0.0.0-20190311183353-d8887717615a @@ -465,8 +465,8 @@ golang.org/x/sync/errgroup golang.org/x/sync/semaphore # golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223 golang.org/x/sys/unix -golang.org/x/sys/windows golang.org/x/sys/cpu +golang.org/x/sys/windows # golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2 golang.org/x/text/secure/bidirule golang.org/x/text/unicode/bidi @@ -517,10 +517,10 @@ google.golang.org/genproto/googleapis/rpc/code google.golang.org/genproto/googleapis/api/httpbody # google.golang.org/grpc v1.19.1 google.golang.org/grpc -google.golang.org/grpc/encoding/gzip -google.golang.org/grpc/health/grpc_health_v1 google.golang.org/grpc/codes google.golang.org/grpc/status +google.golang.org/grpc/health/grpc_health_v1 +google.golang.org/grpc/encoding/gzip google.golang.org/grpc/naming google.golang.org/grpc/metadata google.golang.org/grpc/balancer @@ -545,11 +545,11 @@ google.golang.org/grpc/resolver/dns google.golang.org/grpc/resolver/passthrough google.golang.org/grpc/stats google.golang.org/grpc/tap +google.golang.org/grpc/credentials/oauth google.golang.org/grpc/balancer/base google.golang.org/grpc/credentials/internal google.golang.org/grpc/binarylog/grpc_binarylog_v1 google.golang.org/grpc/internal/syscall -google.golang.org/grpc/credentials/oauth # gopkg.in/fsnotify/fsnotify.v1 v1.4.7 gopkg.in/fsnotify/fsnotify.v1 # gopkg.in/inf.v0 v0.9.1