diff --git a/CHANGELOG.md b/CHANGELOG.md index 61a8db7190..d30e8bfea8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ * [FEATURE] Query Frontend/Scheduler: Add query priority support. #5605 * [FEATURE] Tracing: Add `kuberesolver` to resolve endpoints address with `kubernetes://` prefix as Kubernetes service. #5731 * [FEATURE] Tracing: Add `tracing.otel.round-robin` flag to use `round_robin` gRPC client side LB policy for sending OTLP traces. #5731 +* [FEATURE] Ruler: Add `ruler.concurrent-evals-enabled` flag to enable concurrent evaluation within a single rule group for independent rules. Maximum concurrency can be configured via `ruler.max-concurrent-evals`. #5766 * [ENHANCEMENT] Store Gateway: Added `-store-gateway.enabled-tenants` and `-store-gateway.disabled-tenants` to explicitly enable or disable store-gateway for specific tenants. #5638 * [ENHANCEMENT] Compactor: Add new compactor metric `cortex_compactor_start_duration_seconds`. #5683 * [ENHANCEMENT] Upgraded Docker base images to `alpine:3.18`. #5684 diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index efea8a6e1b..fc6d6d1b81 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -3968,6 +3968,16 @@ alertmanager_client: # CLI flag: -ruler.resend-delay [resend_delay: | default = 1m] +# If enabled, rules from a single rule group can be evaluated concurrently if +# there is no dependency between each other. Max concurrency for each rule group +# is controlled via ruler.max-concurrent-evals flag. +# CLI flag: -ruler.concurrent-evals-enabled +[concurrent_evals_enabled: | default = false] + +# Max concurrency for a single rule group to evaluate independent rules. +# CLI flag: -ruler.max-concurrent-evals +[max_concurrent_evals: | default = 1] + # Distribute rule evaluation using ring backend # CLI flag: -ruler.enable-sharding [enable_sharding: | default = false] diff --git a/go.mod b/go.mod index c88130adee..76529524f6 100644 --- a/go.mod +++ b/go.mod @@ -46,7 +46,7 @@ require ( github.com/prometheus/client_model v0.5.0 github.com/prometheus/common v0.46.0 // Prometheus maps version 2.x.y to tags v0.x.y. - github.com/prometheus/prometheus v0.49.2-0.20240202164002-aa845f7c12ce + github.com/prometheus/prometheus v0.49.2-0.20240205174859-6005ac6f9dc6 github.com/segmentio/fasthash v1.0.3 github.com/sony/gobreaker v0.5.0 github.com/spf13/afero v1.9.5 diff --git a/go.sum b/go.sum index db86cddb73..767725baff 100644 --- a/go.sum +++ b/go.sum @@ -1320,8 +1320,8 @@ github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0ua github.com/prometheus/procfs v0.9.0/go.mod h1:+pB4zwohETzFnmlpe6yd2lSc+0/46IYZRB/chUwxUZY= github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo= github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= -github.com/prometheus/prometheus v0.49.2-0.20240202164002-aa845f7c12ce h1:gHCPxX6dJZJOZh/nYy0DmnTu2PbgWjs8hY0eLgofPfA= -github.com/prometheus/prometheus v0.49.2-0.20240202164002-aa845f7c12ce/go.mod h1:FvE8dtQ1Ww63IlyKBn1V4s+zMwF9kHkVNkQBR1pM4CU= +github.com/prometheus/prometheus v0.49.2-0.20240205174859-6005ac6f9dc6 h1:E1dnG12fSlUeHST75LpGqPpd/YCOSNqKD2CUmm3Em90= +github.com/prometheus/prometheus v0.49.2-0.20240205174859-6005ac6f9dc6/go.mod h1:FvE8dtQ1Ww63IlyKBn1V4s+zMwF9kHkVNkQBR1pM4CU= github.com/redis/rueidis v1.0.14-go1.18 h1:dGir5z8w8X1ex7JWO/Zx2FMBrZgQ8Yjm+lw9fPLSNGw= github.com/redis/rueidis v1.0.14-go1.18/go.mod h1:HGekzV3HbmzFmRK6j0xic8Z9119+ECoGMjeN1TV1NYU= github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go index 989bcf9c43..891c66c596 100644 --- a/pkg/compactor/compactor_test.go +++ b/pkg/compactor/compactor_test.go @@ -7,6 +7,7 @@ import ( "encoding/json" "flag" "fmt" + "io" "os" "path" "path/filepath" @@ -30,7 +31,6 @@ import ( "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/compact" - thanos_testutil "github.com/thanos-io/thanos/pkg/testutil/e2eutil" "gopkg.in/yaml.v2" "github.com/cortexproject/cortex/pkg/ring" @@ -1065,8 +1065,19 @@ func TestCompactor_ShouldSkipOutOrOrderBlocks(t *testing.T) { b1 := createTSDBBlock(t, bucketClient, "user-1", 10, 20, map[string]string{"__name__": "Teste"}) b2 := createTSDBBlock(t, bucketClient, "user-1", 20, 30, map[string]string{"__name__": "Teste"}) - err := thanos_testutil.PutOutOfOrderIndex(path.Join(tmpDir, "user-1", b1.String()), 10, 20) + // Read bad index file. + indexFile, err := os.Open("testdata/out_of_order_chunks/index") require.NoError(t, err) + indexFileStat, err := indexFile.Stat() + require.NoError(t, err) + + dir := path.Join(tmpDir, "user-1", b1.String()) + outputFile, err := os.OpenFile(path.Join(dir, "index"), os.O_RDWR|os.O_TRUNC, 0755) + require.NoError(t, err) + + n, err := io.Copy(outputFile, indexFile) + require.NoError(t, err) + require.Equal(t, indexFileStat.Size(), n) cfg := prepareConfig() cfg.SkipBlocksWithOutOfOrderChunksEnabled = true @@ -1097,7 +1108,7 @@ func TestCompactor_ShouldSkipOutOrOrderBlocks(t *testing.T) { // Wait until a run has completed. cortex_testutil.Poll(t, 5*time.Second, true, func() interface{} { - if _, err := os.Stat(path.Join(tmpDir, "user-1", b1.String(), "no-compact-mark.json")); err == nil { + if _, err := os.Stat(path.Join(dir, "no-compact-mark.json")); err == nil { return true } return false diff --git a/pkg/compactor/testdata/out_of_order_chunks/index b/pkg/compactor/testdata/out_of_order_chunks/index new file mode 100644 index 0000000000..18ea1d1529 Binary files /dev/null and b/pkg/compactor/testdata/out_of_order_chunks/index differ diff --git a/pkg/ruler/compat.go b/pkg/ruler/compat.go index c5c289aae9..8443c43115 100644 --- a/pkg/ruler/compat.go +++ b/pkg/ruler/compat.go @@ -325,17 +325,19 @@ func DefaultTenantManagerFactory(cfg Config, p Pusher, q storage.Queryable, engi metricsQueryFunc := MetricsQueryFunc(engineQueryFunc, totalQueries, failedQueries) return rules.NewManager(&rules.ManagerOptions{ - Appendable: NewPusherAppendable(p, userID, overrides, totalWrites, failedWrites), - Queryable: q, - QueryFunc: RecordAndReportRuleQueryMetrics(metricsQueryFunc, queryTime, logger), - Context: user.InjectOrgID(ctx, userID), - ExternalURL: cfg.ExternalURL.URL, - NotifyFunc: SendAlerts(notifier, cfg.ExternalURL.URL.String()), - Logger: log.With(logger, "user", userID), - Registerer: reg, - OutageTolerance: cfg.OutageTolerance, - ForGracePeriod: cfg.ForGracePeriod, - ResendDelay: cfg.ResendDelay, + Appendable: NewPusherAppendable(p, userID, overrides, totalWrites, failedWrites), + Queryable: q, + QueryFunc: RecordAndReportRuleQueryMetrics(metricsQueryFunc, queryTime, logger), + Context: user.InjectOrgID(ctx, userID), + ExternalURL: cfg.ExternalURL.URL, + NotifyFunc: SendAlerts(notifier, cfg.ExternalURL.URL.String()), + Logger: log.With(logger, "user", userID), + Registerer: reg, + OutageTolerance: cfg.OutageTolerance, + ForGracePeriod: cfg.ForGracePeriod, + ResendDelay: cfg.ResendDelay, + ConcurrentEvalsEnabled: cfg.ConcurrentEvalsEnabled, + MaxConcurrentEvals: cfg.MaxConcurrentEvals, }) } } diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go index 7476eaadbe..42941bf59c 100644 --- a/pkg/ruler/ruler.go +++ b/pkg/ruler/ruler.go @@ -44,8 +44,9 @@ var ( supportedShardingStrategies = []string{util.ShardingStrategyDefault, util.ShardingStrategyShuffle} // Validation errors. - errInvalidShardingStrategy = errors.New("invalid sharding strategy") - errInvalidTenantShardSize = errors.New("invalid tenant shard size, the value must be greater than 0") + errInvalidShardingStrategy = errors.New("invalid sharding strategy") + errInvalidTenantShardSize = errors.New("invalid tenant shard size, the value must be greater than 0") + errInvalidMaxConcurrentEvals = errors.New("invalid max concurrent evals, the value must be greater than 0") ) const ( @@ -95,7 +96,7 @@ type Config struct { RulePath string `yaml:"rule_path"` // URL of the Alertmanager to send notifications to. - // If your are configuring the ruler to send to a Cortex Alertmanager, + // If you are configuring the ruler to send to a Cortex Alertmanager, // ensure this includes any path set in the Alertmanager external URL. AlertmanagerURL string `yaml:"alertmanager_url"` // Whether to use DNS SRV records to discover Alertmanager. @@ -118,6 +119,9 @@ type Config struct { // Minimum amount of time to wait before resending an alert to Alertmanager. ResendDelay time.Duration `yaml:"resend_delay"` + ConcurrentEvalsEnabled bool `yaml:"concurrent_evals_enabled"` + MaxConcurrentEvals int64 `yaml:"max_concurrent_evals"` + // Enable sharding rule groups. EnableSharding bool `yaml:"enable_sharding"` ShardingStrategy string `yaml:"sharding_strategy"` @@ -149,6 +153,10 @@ func (cfg *Config) Validate(limits validation.Limits, log log.Logger) error { if err := cfg.ClientTLSConfig.Validate(log); err != nil { return errors.Wrap(err, "invalid ruler gRPC client config") } + + if cfg.ConcurrentEvalsEnabled && cfg.MaxConcurrentEvals <= 0 { + return errInvalidMaxConcurrentEvals + } return nil } @@ -188,6 +196,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.OutageTolerance, "ruler.for-outage-tolerance", time.Hour, `Max time to tolerate outage for restoring "for" state of alert.`) f.DurationVar(&cfg.ForGracePeriod, "ruler.for-grace-period", 10*time.Minute, `Minimum duration between alert and restored "for" state. This is maintained only for alerts with configured "for" time greater than grace period.`) f.DurationVar(&cfg.ResendDelay, "ruler.resend-delay", time.Minute, `Minimum amount of time to wait before resending an alert to Alertmanager.`) + f.BoolVar(&cfg.ConcurrentEvalsEnabled, "ruler.concurrent-evals-enabled", false, `If enabled, rules from a single rule group can be evaluated concurrently if there is no dependency between each other. Max concurrency for each rule group is controlled via ruler.max-concurrent-evals flag.`) + f.Int64Var(&cfg.MaxConcurrentEvals, "ruler.max-concurrent-evals", 1, `Max concurrency for a single rule group to evaluate independent rules.`) f.Var(&cfg.EnabledTenants, "ruler.enabled-tenants", "Comma separated list of tenants whose rules this ruler can evaluate. If specified, only these tenants will be handled by ruler, otherwise this ruler can process rules from all tenants. Subject to sharding.") f.Var(&cfg.DisabledTenants, "ruler.disabled-tenants", "Comma separated list of tenants whose rules this ruler cannot evaluate. If specified, a ruler that would normally pick the specified tenant(s) for processing will ignore them instead. Subject to sharding.") diff --git a/vendor/github.com/prometheus/prometheus/storage/remote/client.go b/vendor/github.com/prometheus/prometheus/storage/remote/client.go index fbb6804983..e765b47c3e 100644 --- a/vendor/github.com/prometheus/prometheus/storage/remote/client.go +++ b/vendor/github.com/prometheus/prometheus/storage/remote/client.go @@ -141,24 +141,24 @@ func NewWriteClient(name string, conf *ClientConfig) (WriteClient, error) { } t := httpClient.Transport + if len(conf.Headers) > 0 { + t = newInjectHeadersRoundTripper(conf.Headers, t) + } + if conf.SigV4Config != nil { - t, err = sigv4.NewSigV4RoundTripper(conf.SigV4Config, httpClient.Transport) + t, err = sigv4.NewSigV4RoundTripper(conf.SigV4Config, t) if err != nil { return nil, err } } if conf.AzureADConfig != nil { - t, err = azuread.NewAzureADRoundTripper(conf.AzureADConfig, httpClient.Transport) + t, err = azuread.NewAzureADRoundTripper(conf.AzureADConfig, t) if err != nil { return nil, err } } - if len(conf.Headers) > 0 { - t = newInjectHeadersRoundTripper(conf.Headers, t) - } - httpClient.Transport = otelhttp.NewTransport(t) return &Client{ diff --git a/vendor/github.com/prometheus/prometheus/tsdb/index/index.go b/vendor/github.com/prometheus/prometheus/tsdb/index/index.go index 2856fc78f0..84c7716849 100644 --- a/vendor/github.com/prometheus/prometheus/tsdb/index/index.go +++ b/vendor/github.com/prometheus/prometheus/tsdb/index/index.go @@ -146,8 +146,11 @@ type Writer struct { labelNames map[string]uint64 // Label names, and their usage. // Hold last series to validate that clients insert new series in order. - lastSeries labels.Labels - lastRef storage.SeriesRef + lastSeries labels.Labels + lastSeriesRef storage.SeriesRef + + // Hold last added chunk reference to make sure that chunks are ordered properly. + lastChunkRef chunks.ChunkRef crc32 hash.Hash @@ -433,9 +436,27 @@ func (w *Writer) AddSeries(ref storage.SeriesRef, lset labels.Labels, chunks ... return fmt.Errorf("out-of-order series added with label set %q", lset) } - if ref < w.lastRef && !w.lastSeries.IsEmpty() { + if ref < w.lastSeriesRef && !w.lastSeries.IsEmpty() { return fmt.Errorf("series with reference greater than %d already added", ref) } + + lastChunkRef := w.lastChunkRef + lastMaxT := int64(0) + for ix, c := range chunks { + if c.Ref < lastChunkRef { + return fmt.Errorf("unsorted chunk reference: %d, previous: %d", c.Ref, lastChunkRef) + } + lastChunkRef = c.Ref + + if ix > 0 && c.MinTime <= lastMaxT { + return fmt.Errorf("chunk minT %d is not higher than previous chunk maxT %d", c.MinTime, lastMaxT) + } + if c.MaxTime < c.MinTime { + return fmt.Errorf("chunk maxT %d is less than minT %d", c.MaxTime, c.MinTime) + } + lastMaxT = c.MaxTime + } + // We add padding to 16 bytes to increase the addressable space we get through 4 byte // series references. if err := w.addPadding(seriesByteAlign); err != nil { @@ -510,7 +531,8 @@ func (w *Writer) AddSeries(ref storage.SeriesRef, lset labels.Labels, chunks ... } w.lastSeries.CopyFrom(lset) - w.lastRef = ref + w.lastSeriesRef = ref + w.lastChunkRef = lastChunkRef return nil } diff --git a/vendor/github.com/thanos-io/thanos/pkg/testutil/e2eutil/copy.go b/vendor/github.com/thanos-io/thanos/pkg/testutil/e2eutil/copy.go deleted file mode 100644 index 6464cd02ba..0000000000 --- a/vendor/github.com/thanos-io/thanos/pkg/testutil/e2eutil/copy.go +++ /dev/null @@ -1,55 +0,0 @@ -// Copyright (c) The Thanos Authors. -// Licensed under the Apache License 2.0. - -package e2eutil - -import ( - "io" - "os" - "path/filepath" - "testing" - - "github.com/efficientgo/core/testutil" - "github.com/pkg/errors" - "github.com/thanos-io/thanos/pkg/runutil" -) - -func Copy(t testing.TB, src, dst string) { - testutil.Ok(t, copyRecursive(src, dst)) -} - -func copyRecursive(src, dst string) error { - return filepath.Walk(src, func(path string, info os.FileInfo, err error) error { - if err != nil { - return err - } - - relPath, err := filepath.Rel(src, path) - if err != nil { - return err - } - - if info.IsDir() { - return os.MkdirAll(filepath.Join(dst, relPath), os.ModePerm) - } - - if !info.Mode().IsRegular() { - return errors.Errorf("%s is not a regular file", path) - } - - source, err := os.Open(filepath.Clean(path)) - if err != nil { - return err - } - defer runutil.CloseWithErrCapture(&err, source, "close file") - - destination, err := os.Create(filepath.Join(dst, relPath)) - if err != nil { - return err - } - defer runutil.CloseWithErrCapture(&err, destination, "close file") - - _, err = io.Copy(destination, source) - return err - }) -} diff --git a/vendor/github.com/thanos-io/thanos/pkg/testutil/e2eutil/port.go b/vendor/github.com/thanos-io/thanos/pkg/testutil/e2eutil/port.go deleted file mode 100644 index 986f1c7d7f..0000000000 --- a/vendor/github.com/thanos-io/thanos/pkg/testutil/e2eutil/port.go +++ /dev/null @@ -1,20 +0,0 @@ -// Copyright (c) The Thanos Authors. -// Licensed under the Apache License 2.0. - -package e2eutil - -import "net" - -// FreePort returns port that is free now. -func FreePort() (int, error) { - addr, err := net.ResolveTCPAddr("tcp", ":0") - if err != nil { - return 0, err - } - - l, err := net.ListenTCP("tcp", addr) - if err != nil { - return 0, err - } - return l.Addr().(*net.TCPAddr).Port, l.Close() -} diff --git a/vendor/github.com/thanos-io/thanos/pkg/testutil/e2eutil/prometheus.go b/vendor/github.com/thanos-io/thanos/pkg/testutil/e2eutil/prometheus.go deleted file mode 100644 index 453048a173..0000000000 --- a/vendor/github.com/thanos-io/thanos/pkg/testutil/e2eutil/prometheus.go +++ /dev/null @@ -1,813 +0,0 @@ -// Copyright (c) The Thanos Authors. -// Licensed under the Apache License 2.0. - -package e2eutil - -import ( - "bytes" - "context" - "encoding/json" - "fmt" - "io" - "math" - "math/rand" - "net/http" - "os" - "os/exec" - "path" - "path/filepath" - "runtime" - "sort" - "strings" - "sync" - "syscall" - "testing" - "time" - - "github.com/efficientgo/core/testutil" - "github.com/go-kit/log" - "github.com/oklog/ulid" - "github.com/pkg/errors" - "github.com/prometheus/prometheus/model/histogram" - "github.com/prometheus/prometheus/model/labels" - "github.com/prometheus/prometheus/model/timestamp" - "github.com/prometheus/prometheus/storage" - "github.com/prometheus/prometheus/tsdb" - "github.com/prometheus/prometheus/tsdb/chunkenc" - "github.com/prometheus/prometheus/tsdb/chunks" - "github.com/prometheus/prometheus/tsdb/index" - "go.uber.org/atomic" - "golang.org/x/sync/errgroup" - - "github.com/thanos-io/thanos/pkg/block/metadata" - "github.com/thanos-io/thanos/pkg/runutil" -) - -const ( - defaultPrometheusVersion = "v0.37.0" - defaultAlertmanagerVersion = "v0.20.0" - defaultMinioVersion = "RELEASE.2022-07-30T05-21-40Z" - - // Space delimited list of versions. - promPathsEnvVar = "THANOS_TEST_PROMETHEUS_PATHS" - alertmanagerBinEnvVar = "THANOS_TEST_ALERTMANAGER_PATH" - minioBinEnvVar = "THANOS_TEST_MINIO_PATH" - - // A placeholder for actual Prometheus instance address in the scrape config. - PromAddrPlaceHolder = "PROMETHEUS_ADDRESS" -) - -var ( - histogramSample = histogram.Histogram{ - Schema: 0, - Count: 20, - Sum: -3.1415, - ZeroCount: 12, - ZeroThreshold: 0.001, - NegativeSpans: []histogram.Span{ - {Offset: 0, Length: 4}, - {Offset: 1, Length: 1}, - }, - NegativeBuckets: []int64{1, 2, -2, 1, -1}, - } - - floatHistogramSample = histogram.FloatHistogram{ - ZeroThreshold: 0.01, - ZeroCount: 5.5, - Count: 15, - Sum: 11.5, - PositiveSpans: []histogram.Span{ - {Offset: -2, Length: 2}, - {Offset: 1, Length: 3}, - }, - PositiveBuckets: []float64{0.5, 0, 1.5, 2, 3.5}, - NegativeSpans: []histogram.Span{ - {Offset: 3, Length: 2}, - {Offset: 3, Length: 2}, - }, - NegativeBuckets: []float64{1.5, 0.5, 2.5, 3}, - } -) - -func PrometheusBinary() string { - return "prometheus-" + defaultPrometheusVersion -} - -func AlertmanagerBinary() string { - b := os.Getenv(alertmanagerBinEnvVar) - if b == "" { - return fmt.Sprintf("alertmanager-%s", defaultAlertmanagerVersion) - } - return b -} - -func MinioBinary() string { - b := os.Getenv(minioBinEnvVar) - if b == "" { - return fmt.Sprintf("minio-%s", defaultMinioVersion) - } - return b -} - -// Prometheus represents a test instance for integration testing. -// It can be populated with data before being started. -type Prometheus struct { - dir string - db *tsdb.DB - prefix string - binPath string - - running bool - cmd *exec.Cmd - disabledCompaction bool - addr string - - config string - - stdout, stderr bytes.Buffer -} - -func NewTSDB() (*tsdb.DB, error) { - dir, err := os.MkdirTemp("", "prometheus-test") - if err != nil { - return nil, err - } - opts := tsdb.DefaultOptions() - opts.RetentionDuration = math.MaxInt64 - return tsdb.Open(dir, nil, nil, opts, nil) -} - -func ForeachPrometheus(t *testing.T, testFn func(t testing.TB, p *Prometheus)) { - paths := os.Getenv(promPathsEnvVar) - if paths == "" { - paths = PrometheusBinary() - } - - for _, path := range strings.Split(paths, " ") { - if ok := t.Run(path, func(t *testing.T) { - p, err := newPrometheus(path, "") - testutil.Ok(t, err) - - testFn(t, p) - testutil.Ok(t, p.Stop()) - }); !ok { - return - } - } -} - -// NewPrometheus creates a new test Prometheus instance that will listen on local address. -// Use ForeachPrometheus if you want to test against set of Prometheus versions. -// TODO(bwplotka): Improve it with https://github.com/thanos-io/thanos/issues/758. -func NewPrometheus() (*Prometheus, error) { - return newPrometheus("", "") -} - -// NewPrometheusOnPath creates a new test Prometheus instance that will listen on local address and given prefix path. -func NewPrometheusOnPath(prefix string) (*Prometheus, error) { - return newPrometheus("", prefix) -} - -func newPrometheus(binPath, prefix string) (*Prometheus, error) { - if binPath == "" { - binPath = PrometheusBinary() - } - - db, err := NewTSDB() - if err != nil { - return nil, err - } - - f, err := os.Create(filepath.Join(db.Dir(), "prometheus.yml")) - if err != nil { - return nil, err - } - defer f.Close() - - // Some well-known external labels so that we can test label resorting - if _, err = io.WriteString(f, "global:\n external_labels:\n region: eu-west"); err != nil { - return nil, err - } - - return &Prometheus{ - dir: db.Dir(), - db: db, - prefix: prefix, - binPath: binPath, - addr: "", - }, nil -} - -// Start running the Prometheus instance and return. -func (p *Prometheus) Start(ctx context.Context, l log.Logger) error { - if p.running { - return errors.New("Already started") - } - - if err := p.db.Close(); err != nil { - return err - } - if err := p.start(); err != nil { - return err - } - if err := p.waitPrometheusUp(ctx, l, p.prefix); err != nil { - return err - } - return nil -} - -func (p *Prometheus) start() error { - port, err := FreePort() - if err != nil { - return err - } - - var extra []string - if p.disabledCompaction { - extra = append(extra, - "--storage.tsdb.min-block-duration=2h", - "--storage.tsdb.max-block-duration=2h", - ) - } - p.addr = fmt.Sprintf("localhost:%d", port) - // Write the final config to the config file. - // The address placeholder will be replaced with the actual address. - if err := p.writeConfig(strings.ReplaceAll(p.config, PromAddrPlaceHolder, p.addr)); err != nil { - return err - } - args := append([]string{ - "--storage.tsdb.retention=2d", // Pass retention cause prometheus since 2.8.0 don't show default value for that flags in web/api: https://github.com/prometheus/prometheus/pull/5433. - "--storage.tsdb.path=" + p.db.Dir(), - "--web.listen-address=" + p.addr, - "--web.route-prefix=" + p.prefix, - "--web.enable-admin-api", - "--config.file=" + filepath.Join(p.db.Dir(), "prometheus.yml"), - }, extra...) - - p.cmd = exec.Command(p.binPath, args...) - p.cmd.SysProcAttr = SysProcAttr() - - p.stderr.Reset() - p.stdout.Reset() - - p.cmd.Stdout = &p.stdout - p.cmd.Stderr = &p.stderr - - if err := p.cmd.Start(); err != nil { - return fmt.Errorf("starting Prometheus failed: %w", err) - } - - p.running = true - return nil -} - -func (p *Prometheus) waitPrometheusUp(ctx context.Context, logger log.Logger, prefix string) error { - if !p.running { - return errors.New("method Start was not invoked.") - } - return runutil.RetryWithLog(logger, time.Second, ctx.Done(), func() error { - r, err := http.Get(fmt.Sprintf("http://%s%s/-/ready", p.addr, prefix)) - if err != nil { - return err - } - defer runutil.ExhaustCloseWithLogOnErr(logger, r.Body, "failed to exhaust and close body") - - if r.StatusCode != 200 { - return errors.Errorf("Got non 200 response: %v", r.StatusCode) - } - return nil - }) -} - -func (p *Prometheus) Restart(ctx context.Context, l log.Logger) error { - if err := p.cmd.Process.Signal(syscall.SIGTERM); err != nil { - return errors.Wrap(err, "failed to kill Prometheus. Kill it manually") - } - _ = p.cmd.Wait() - if err := p.start(); err != nil { - return err - } - return p.waitPrometheusUp(ctx, l, p.prefix) -} - -// Dir returns TSDB dir. -func (p *Prometheus) Dir() string { - return p.dir -} - -// Addr returns correct address after Start method. -func (p *Prometheus) Addr() string { - return p.addr + p.prefix -} - -func (p *Prometheus) DisableCompaction() { - p.disabledCompaction = true -} - -// SetConfig updates the contents of the config. -func (p *Prometheus) SetConfig(s string) { - p.config = s -} - -// writeConfig writes the Prometheus config to the config file. -func (p *Prometheus) writeConfig(config string) (err error) { - f, err := os.Create(filepath.Join(p.dir, "prometheus.yml")) - if err != nil { - return err - } - defer runutil.CloseWithErrCapture(&err, f, "prometheus config") - _, err = f.Write([]byte(config)) - return err -} - -// Stop terminates Prometheus and clean up its data directory. -func (p *Prometheus) Stop() (rerr error) { - if !p.running { - return nil - } - - if p.cmd.Process != nil { - if err := p.cmd.Process.Signal(syscall.SIGTERM); err != nil { - return errors.Wrapf(err, "failed to Prometheus. Kill it manually and clean %s dir", p.db.Dir()) - } - - err := p.cmd.Wait() - if err != nil { - var exitErr *exec.ExitError - if errors.As(err, &exitErr) { - if exitErr.ExitCode() != -1 { - fmt.Fprintln(os.Stderr, "Prometheus exited with", exitErr.ExitCode()) - fmt.Fprintln(os.Stderr, "stdout:\n", p.stdout.String(), "\nstderr:\n", p.stderr.String()) - } else { - err = nil - } - } - } - - if err != nil { - return fmt.Errorf("waiting for Prometheus to exit: %w", err) - } - } - - return p.cleanup() -} - -func (p *Prometheus) cleanup() error { - p.running = false - return os.RemoveAll(p.db.Dir()) -} - -// Appender returns a new appender to populate the Prometheus instance with data. -// All appenders must be closed before Start is called and no new ones must be opened -// afterwards. -func (p *Prometheus) Appender() storage.Appender { - if p.running { - panic("Appender must not be called after start") - } - return p.db.Appender(context.Background()) -} - -// CreateEmptyBlock produces empty block like it was the case before fix: https://github.com/prometheus/tsdb/pull/374. -// (Prometheus pre v2.7.0). -func CreateEmptyBlock(dir string, mint, maxt int64, extLset labels.Labels, resolution int64) (ulid.ULID, error) { - entropy := rand.New(rand.NewSource(time.Now().UnixNano())) - uid := ulid.MustNew(ulid.Now(), entropy) - - if err := os.Mkdir(path.Join(dir, uid.String()), os.ModePerm); err != nil { - return ulid.ULID{}, errors.Wrap(err, "close index") - } - - if err := os.Mkdir(path.Join(dir, uid.String(), "chunks"), os.ModePerm); err != nil { - return ulid.ULID{}, errors.Wrap(err, "close index") - } - - w, err := index.NewWriter(context.Background(), path.Join(dir, uid.String(), "index")) - if err != nil { - return ulid.ULID{}, errors.Wrap(err, "new index") - } - - if err := w.Close(); err != nil { - return ulid.ULID{}, errors.Wrap(err, "close index") - } - - m := tsdb.BlockMeta{ - Version: 1, - ULID: uid, - MinTime: mint, - MaxTime: maxt, - Compaction: tsdb.BlockMetaCompaction{ - Level: 1, - Sources: []ulid.ULID{uid}, - }, - } - b, err := json.Marshal(&m) - if err != nil { - return ulid.ULID{}, err - } - - if err := os.WriteFile(path.Join(dir, uid.String(), "meta.json"), b, os.ModePerm); err != nil { - return ulid.ULID{}, errors.Wrap(err, "saving meta.json") - } - - if _, err = metadata.InjectThanos(log.NewNopLogger(), filepath.Join(dir, uid.String()), metadata.Thanos{ - Labels: extLset.Map(), - Downsample: metadata.ThanosDownsample{Resolution: resolution}, - Source: metadata.TestSource, - }, nil); err != nil { - return ulid.ULID{}, errors.Wrap(err, "finalize block") - } - - return uid, nil -} - -// CreateBlock writes a block with the given series and numSamples samples each. -// Samples will be in the time range [mint, maxt). -func CreateBlock( - ctx context.Context, - dir string, - series []labels.Labels, - numSamples int, - mint, maxt int64, - extLset labels.Labels, - resolution int64, - hashFunc metadata.HashFunc, -) (id ulid.ULID, err error) { - return createBlock(ctx, dir, series, numSamples, mint, maxt, extLset, resolution, false, hashFunc, chunkenc.ValFloat) -} - -// CreateBlockWithTombstone is same as CreateBlock but leaves tombstones which mimics the Prometheus local block. -func CreateBlockWithTombstone( - ctx context.Context, - dir string, - series []labels.Labels, - numSamples int, - mint, maxt int64, - extLset labels.Labels, - resolution int64, - hashFunc metadata.HashFunc, -) (id ulid.ULID, err error) { - return createBlock(ctx, dir, series, numSamples, mint, maxt, extLset, resolution, true, hashFunc, chunkenc.ValFloat) -} - -// CreateBlockWithBlockDelay writes a block with the given series and numSamples samples each. -// Samples will be in the time range [mint, maxt) -// Block ID will be created with a delay of time duration blockDelay. -func CreateBlockWithBlockDelay( - ctx context.Context, - dir string, - series []labels.Labels, - numSamples int, - mint, maxt int64, - blockDelay time.Duration, - extLset labels.Labels, - resolution int64, - hashFunc metadata.HashFunc, -) (ulid.ULID, error) { - return createBlockWithDelay(ctx, dir, series, numSamples, mint, maxt, blockDelay, extLset, resolution, hashFunc, chunkenc.ValFloat) -} - -// CreateHistogramBlockWithDelay writes a block with the given native histogram series and numSamples samples each. -// Samples will be in the time range [mint, maxt). -func CreateHistogramBlockWithDelay( - ctx context.Context, - dir string, - series []labels.Labels, - numSamples int, - mint, maxt int64, - blockDelay time.Duration, - extLset labels.Labels, - resolution int64, - hashFunc metadata.HashFunc, -) (id ulid.ULID, err error) { - return createBlockWithDelay(ctx, dir, series, numSamples, mint, maxt, blockDelay, extLset, resolution, hashFunc, chunkenc.ValHistogram) -} - -// CreateFloatHistogramBlockWithDelay writes a block with the given float native histogram series and numSamples samples each. -// Samples will be in the time range [mint, maxt). -func CreateFloatHistogramBlockWithDelay( - ctx context.Context, - dir string, - series []labels.Labels, - numSamples int, - mint, maxt int64, - blockDelay time.Duration, - extLset labels.Labels, - resolution int64, - hashFunc metadata.HashFunc, -) (id ulid.ULID, err error) { - return createBlockWithDelay(ctx, dir, series, numSamples, mint, maxt, blockDelay, extLset, resolution, hashFunc, chunkenc.ValFloatHistogram) -} - -func createBlockWithDelay(ctx context.Context, dir string, series []labels.Labels, numSamples int, mint int64, maxt int64, blockDelay time.Duration, extLset labels.Labels, resolution int64, hashFunc metadata.HashFunc, samplesType chunkenc.ValueType) (ulid.ULID, error) { - blockID, err := createBlock(ctx, dir, series, numSamples, mint, maxt, extLset, resolution, false, hashFunc, samplesType) - if err != nil { - return ulid.ULID{}, errors.Wrap(err, "block creation") - } - - id, err := ulid.New(uint64(timestamp.FromTime(timestamp.Time(int64(blockID.Time())).Add(-blockDelay))), bytes.NewReader(blockID.Entropy())) - if err != nil { - return ulid.ULID{}, errors.Wrap(err, "create block id") - } - - bdir := path.Join(dir, blockID.String()) - m, err := metadata.ReadFromDir(bdir) - if err != nil { - return ulid.ULID{}, errors.Wrap(err, "open meta file") - } - - logger := log.NewNopLogger() - m.ULID = id - m.Compaction.Sources = []ulid.ULID{id} - if err := m.WriteToDir(logger, path.Join(dir, blockID.String())); err != nil { - return ulid.ULID{}, errors.Wrap(err, "write meta.json file") - } - - return id, os.Rename(path.Join(dir, blockID.String()), path.Join(dir, id.String())) -} - -func createBlock( - ctx context.Context, - dir string, - series []labels.Labels, - numSamples int, - mint, maxt int64, - extLset labels.Labels, - resolution int64, - tombstones bool, - hashFunc metadata.HashFunc, - sampleType chunkenc.ValueType, -) (id ulid.ULID, err error) { - headOpts := tsdb.DefaultHeadOptions() - headOpts.ChunkDirRoot = filepath.Join(dir, "chunks") - headOpts.ChunkRange = 10000000000 - headOpts.EnableNativeHistograms = *atomic.NewBool(true) - h, err := tsdb.NewHead(nil, nil, nil, nil, headOpts, nil) - if err != nil { - return id, errors.Wrap(err, "create head block") - } - defer func() { - runutil.CloseWithErrCapture(&err, h, "TSDB Head") - if e := os.RemoveAll(headOpts.ChunkDirRoot); e != nil { - err = errors.Wrap(e, "delete chunks dir") - } - }() - - var g errgroup.Group - var timeStepSize = (maxt - mint) / int64(numSamples+1) - var batchSize = len(series) / runtime.GOMAXPROCS(0) - r := rand.New(rand.NewSource(int64(numSamples))) - var randMutex sync.Mutex - - for len(series) > 0 { - l := batchSize - if len(series) < 1000 { - l = len(series) - } - batch := series[:l] - series = series[l:] - - g.Go(func() error { - t := mint - - for i := 0; i < numSamples; i++ { - app := h.Appender(ctx) - - for _, lset := range batch { - var err error - if sampleType == chunkenc.ValFloat { - randMutex.Lock() - _, err = app.Append(0, lset, t, r.Float64()) - randMutex.Unlock() - } else if sampleType == chunkenc.ValHistogram { - _, err = app.AppendHistogram(0, lset, t, &histogramSample, nil) - } else if sampleType == chunkenc.ValFloatHistogram { - _, err = app.AppendHistogram(0, lset, t, nil, &floatHistogramSample) - } - if err != nil { - if rerr := app.Rollback(); rerr != nil { - err = errors.Wrapf(err, "rollback failed: %v", rerr) - } - - return errors.Wrap(err, "add sample") - } - } - if err := app.Commit(); err != nil { - return errors.Wrap(err, "commit") - } - t += timeStepSize - } - return nil - }) - } - if err := g.Wait(); err != nil { - return id, err - } - c, err := tsdb.NewLeveledCompactor(ctx, nil, log.NewNopLogger(), []int64{maxt - mint}, nil, nil) - if err != nil { - return id, errors.Wrap(err, "create compactor") - } - - id, err = c.Write(dir, h, mint, maxt, nil) - if err != nil { - return id, errors.Wrap(err, "write block") - } - - if id.Compare(ulid.ULID{}) == 0 { - return id, errors.Errorf("nothing to write, asked for %d samples", numSamples) - } - - blockDir := filepath.Join(dir, id.String()) - logger := log.NewNopLogger() - seriesSize, err := gatherMaxSeriesSize(ctx, filepath.Join(blockDir, "index")) - if err != nil { - return id, errors.Wrap(err, "gather max series size") - } - - files := []metadata.File{} - if hashFunc != metadata.NoneFunc { - paths := []string{} - if err := filepath.Walk(blockDir, func(path string, info os.FileInfo, err error) error { - if info.IsDir() { - return nil - } - paths = append(paths, path) - return nil - }); err != nil { - return id, errors.Wrapf(err, "walking %s", dir) - } - - for _, p := range paths { - pHash, err := metadata.CalculateHash(p, metadata.SHA256Func, log.NewNopLogger()) - if err != nil { - return id, errors.Wrapf(err, "calculating hash of %s", blockDir+p) - } - files = append(files, metadata.File{ - RelPath: strings.TrimPrefix(p, blockDir+"/"), - Hash: &pHash, - }) - } - } - - if _, err = metadata.InjectThanos(logger, blockDir, metadata.Thanos{ - Labels: extLset.Map(), - Downsample: metadata.ThanosDownsample{Resolution: resolution}, - Source: metadata.TestSource, - Files: files, - IndexStats: metadata.IndexStats{SeriesMaxSize: seriesSize}, - }, nil); err != nil { - return id, errors.Wrap(err, "finalize block") - } - - if !tombstones { - if err = os.Remove(filepath.Join(dir, id.String(), "tombstones")); err != nil { - return id, errors.Wrap(err, "remove tombstones") - } - } - - return id, nil -} - -func gatherMaxSeriesSize(ctx context.Context, fn string) (int64, error) { - r, err := index.NewFileReader(fn) - if err != nil { - return 0, errors.Wrap(err, "open index file") - } - defer runutil.CloseWithErrCapture(&err, r, "gather index issue file reader") - - key, value := index.AllPostingsKey() - p, err := r.Postings(ctx, key, value) - if err != nil { - return 0, errors.Wrap(err, "get all postings") - } - - // As of version two all series entries are 16 byte padded. All references - // we get have to account for that to get the correct offset. - offsetMultiplier := 1 - version := r.Version() - if version >= 2 { - offsetMultiplier = 16 - } - - // Per series. - var ( - prevId storage.SeriesRef - maxSeriesSize int64 - ) - for p.Next() { - id := p.At() - if prevId != 0 { - // Approximate size. - seriesSize := int64(id-prevId) * int64(offsetMultiplier) - if seriesSize > maxSeriesSize { - maxSeriesSize = seriesSize - } - } - prevId = id - } - if p.Err() != nil { - return 0, errors.Wrap(err, "walk postings") - } - - return maxSeriesSize, nil -} - -var indexFilename = "index" - -type indexWriterSeries struct { - labels labels.Labels - chunks []chunks.Meta // series file offset of chunks -} - -type indexWriterSeriesSlice []*indexWriterSeries - -// PutOutOfOrderIndex updates the index in blockDir with an index containing an out-of-order chunk -// copied from https://github.com/prometheus/prometheus/blob/b1ed4a0a663d0c62526312311c7529471abbc565/tsdb/index/index_test.go#L346 -func PutOutOfOrderIndex(blockDir string, minTime int64, maxTime int64) error { - - if minTime >= maxTime || minTime+4 >= maxTime { - return fmt.Errorf("minTime must be at least 4 less than maxTime to not create overlapping chunks") - } - - lbls := []labels.Labels{ - labels.FromStrings("lbl1", "1"), - } - - // Sort labels as the index writer expects series in sorted order. - sort.Sort(labels.Slice(lbls)) - - symbols := map[string]struct{}{} - for _, lset := range lbls { - lset.Range(func(l labels.Label) { - symbols[l.Name] = struct{}{} - symbols[l.Value] = struct{}{} - }) - } - - var input indexWriterSeriesSlice - - // Generate ChunkMetas for every label set. - // Ignoring gosec as it is only used for tests. - for _, lset := range lbls { - var metas []chunks.Meta - // only need two chunks that are out-of-order - chk1 := chunks.Meta{ - MinTime: maxTime - 2, - MaxTime: maxTime - 1, - Ref: chunks.ChunkRef(rand.Uint64()), // nolint:gosec - Chunk: chunkenc.NewXORChunk(), - } - metas = append(metas, chk1) - chk2 := chunks.Meta{ - MinTime: minTime + 1, - MaxTime: minTime + 2, - Ref: chunks.ChunkRef(rand.Uint64()), // nolint:gosec - Chunk: chunkenc.NewXORChunk(), - } - metas = append(metas, chk2) - - input = append(input, &indexWriterSeries{ - labels: lset, - chunks: metas, - }) - } - - iw, err := index.NewWriter(context.Background(), filepath.Join(blockDir, indexFilename)) - if err != nil { - return err - } - - syms := []string{} - for s := range symbols { - syms = append(syms, s) - } - sort.Strings(syms) - for _, s := range syms { - if err := iw.AddSymbol(s); err != nil { - return err - } - } - - // Population procedure as done by compaction. - var ( - postings = index.NewMemPostings() - values = map[string]map[string]struct{}{} - ) - - for i, s := range input { - if err := iw.AddSeries(storage.SeriesRef(i), s.labels, s.chunks...); err != nil { - return err - } - - s.labels.Range(func(l labels.Label) { - valset, ok := values[l.Name] - if !ok { - valset = map[string]struct{}{} - values[l.Name] = valset - } - valset[l.Value] = struct{}{} - }) - postings.Add(storage.SeriesRef(i), s.labels) - } - - return iw.Close() -} diff --git a/vendor/github.com/thanos-io/thanos/pkg/testutil/e2eutil/sysprocattr.go b/vendor/github.com/thanos-io/thanos/pkg/testutil/e2eutil/sysprocattr.go deleted file mode 100644 index 53aaa7039f..0000000000 --- a/vendor/github.com/thanos-io/thanos/pkg/testutil/e2eutil/sysprocattr.go +++ /dev/null @@ -1,13 +0,0 @@ -// Copyright (c) The Thanos Authors. -// Licensed under the Apache License 2.0. - -//go:build !linux -// +build !linux - -package e2eutil - -import "syscall" - -func SysProcAttr() *syscall.SysProcAttr { - return &syscall.SysProcAttr{} -} diff --git a/vendor/github.com/thanos-io/thanos/pkg/testutil/e2eutil/sysprocattr_linux.go b/vendor/github.com/thanos-io/thanos/pkg/testutil/e2eutil/sysprocattr_linux.go deleted file mode 100644 index dd77ed32a1..0000000000 --- a/vendor/github.com/thanos-io/thanos/pkg/testutil/e2eutil/sysprocattr_linux.go +++ /dev/null @@ -1,13 +0,0 @@ -// Copyright (c) The Thanos Authors. -// Licensed under the Apache License 2.0. - -package e2eutil - -import "syscall" - -func SysProcAttr() *syscall.SysProcAttr { - return &syscall.SysProcAttr{ - // For linux only, kill this if the go test process dies before the cleanup. - Pdeathsig: syscall.SIGKILL, - } -} diff --git a/vendor/modules.txt b/vendor/modules.txt index 67e1ec0608..21677d13d6 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -798,7 +798,7 @@ github.com/prometheus/exporter-toolkit/web github.com/prometheus/procfs github.com/prometheus/procfs/internal/fs github.com/prometheus/procfs/internal/util -# github.com/prometheus/prometheus v0.49.2-0.20240202164002-aa845f7c12ce +# github.com/prometheus/prometheus v0.49.2-0.20240205174859-6005ac6f9dc6 ## explicit; go 1.20 github.com/prometheus/prometheus/config github.com/prometheus/prometheus/discovery @@ -978,7 +978,6 @@ github.com/thanos-io/thanos/pkg/store/storepb/prompb github.com/thanos-io/thanos/pkg/strutil github.com/thanos-io/thanos/pkg/targets/targetspb github.com/thanos-io/thanos/pkg/tenancy -github.com/thanos-io/thanos/pkg/testutil/e2eutil github.com/thanos-io/thanos/pkg/tls github.com/thanos-io/thanos/pkg/tracing github.com/thanos-io/thanos/pkg/tracing/migration