Skip to content

Make max outstanding queries per tenant config in limits #4991

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Dec 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## master / unreleased
* [ENHANCEMENT] Update Go version to 1.19.3. #4988
* [ENHANCEMENT] Querier: limit series query to only ingesters if `start` param is not specified. #4976
* [ENHANCEMENT] Query-frontend/scheduler: add a new limit `frontend.max-outstanding-requests-per-tenant` for configuring queue size per tenant. Started deprecating two flags `-query-scheduler.max-outstanding-requests-per-tenant` and `-querier.max-outstanding-requests-per-tenant`, and changed their default value to 0. Now if both the old flag and new flag are specified, the old flag's queue size will be picked. #5005
* [FEATURE] Querier/Query Frontend: support Prometheus /api/v1/status/buildinfo API. #4978
* [FEATURE] Ingester: Add active series to all_user_stats page. #4972
* [FEATURE] Query Frontend: Log query params in query frontend even if error happens. #5005
Expand Down
21 changes: 14 additions & 7 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -172,11 +172,12 @@ runtime_config:
[memberlist: <memberlist_config>]

query_scheduler:
# Maximum number of outstanding requests per tenant per query-scheduler.
# In-flight requests above this limit will fail with HTTP response status code
# 429.
# Deprecated (use frontend.max-outstanding-requests-per-tenant instead) and
# will be removed in v1.17.0: Maximum number of outstanding requests per
# tenant per query-scheduler. In-flight requests above this limit will fail
# with HTTP response status code 429.
# CLI flag: -query-scheduler.max-outstanding-requests-per-tenant
[max_outstanding_requests_per_tenant: <int> | default = 100]
[max_outstanding_requests_per_tenant: <int> | default = 0]

# If a querier disconnects without sending notification about graceful
# shutdown, the query-scheduler will keep the querier in the tenant's shard
Expand Down Expand Up @@ -916,10 +917,11 @@ The `query_frontend_config` configures the Cortex query-frontend.
# CLI flag: -frontend.query-stats-enabled
[query_stats_enabled: <boolean> | default = false]

# Maximum number of outstanding requests per tenant per frontend; requests
# beyond this error with HTTP 429.
# Deprecated (use frontend.max-outstanding-requests-per-tenant instead) and will
# be removed in v1.17.0: Maximum number of outstanding requests per tenant per
# frontend; requests beyond this error with HTTP 429.
# CLI flag: -querier.max-outstanding-requests-per-tenant
[max_outstanding_per_tenant: <int> | default = 100]
[max_outstanding_per_tenant: <int> | default = 0]

# If a querier disconnects without sending notification about graceful shutdown,
# the query-frontend will keep the querier in the tenant's shard until the
Expand Down Expand Up @@ -2724,6 +2726,11 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
# CLI flag: -frontend.max-queriers-per-tenant
[max_queriers_per_tenant: <int> | default = 0]

# Maximum number of outstanding requests per tenant per request queue (either
# query frontend or query scheduler); requests beyond this error with HTTP 429.
# CLI flag: -frontend.max-outstanding-requests-per-tenant
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think it makes sense for this limit to be tied to the frontend whether or not we use the scheduler.

[max_outstanding_requests_per_tenant: <int> | default = 100]

# Duration to delay the evaluation of rules to ensure the underlying metrics
# have been pushed to Cortex.
# CLI flag: -ruler.evaluation-delay-duration
Expand Down
11 changes: 2 additions & 9 deletions pkg/frontend/frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"google.golang.org/grpc"

"github.com/cortexproject/cortex/pkg/frontend/transport"
frontendv1 "github.com/cortexproject/cortex/pkg/frontend/v1"
"github.com/cortexproject/cortex/pkg/frontend/v1/frontendv1pb"
querier_worker "github.com/cortexproject/cortex/pkg/querier/worker"
"github.com/cortexproject/cortex/pkg/util/concurrency"
Expand Down Expand Up @@ -253,7 +254,7 @@ func testFrontend(t *testing.T, config CombinedFrontendConfig, handler http.Hand
httpListen, err := net.Listen("tcp", "localhost:0")
require.NoError(t, err)

rt, v1, v2, err := InitFrontend(config, limits{}, 0, logger, nil)
rt, v1, v2, err := InitFrontend(config, frontendv1.MockLimits{}, 0, logger, nil)
require.NoError(t, err)
require.NotNil(t, rt)
// v1 will be nil if DownstreamURL is defined.
Expand Down Expand Up @@ -306,11 +307,3 @@ func defaultFrontendConfig() CombinedFrontendConfig {
flagext.DefaultValues(&config.FrontendV2)
return config
}

// limits is a test stub that reports one fixed querier count for every tenant.
type limits struct {
	// queriers is the value handed back by MaxQueriersPerUser.
	queriers int
}

// MaxQueriersPerUser ignores the tenant ID and returns the configured count.
func (lim limits) MaxQueriersPerUser(string) int {
	return lim.queriers
}
16 changes: 14 additions & 2 deletions pkg/frontend/v1/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,25 @@ type Config struct {

// RegisterFlags adds the flags required to config this to the given FlagSet.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.MaxOutstandingPerTenant, "querier.max-outstanding-requests-per-tenant", 100, "Maximum number of outstanding requests per tenant per frontend; requests beyond this error with HTTP 429.")
f.IntVar(&cfg.MaxOutstandingPerTenant, "querier.max-outstanding-requests-per-tenant", 0, "Deprecated (use frontend.max-outstanding-requests-per-tenant instead) and will be removed in v1.17.0: Maximum number of outstanding requests per tenant per frontend; requests beyond this error with HTTP 429.")
f.DurationVar(&cfg.QuerierForgetDelay, "query-frontend.querier-forget-delay", 0, "If a querier disconnects without sending notification about graceful shutdown, the query-frontend will keep the querier in the tenant's shard until the forget delay has passed. This feature is useful to reduce the blast radius when shuffle-sharding is enabled.")
}

// Limits is the set of per-tenant limits the v1 query frontend is constructed with.
type Limits interface {
// MaxQueriersPerUser returns max queriers to use per tenant, or 0 if shuffle sharding is disabled.
MaxQueriersPerUser(user string) int

// queue.Limits is embedded because the same limits value is also handed to
// the request queue created by the frontend.
queue.Limits
}

// MockLimits is a fixed-value implementation of the Limits interface. Used in tests only.
type MockLimits struct {
	// Queriers is the value reported by MaxQueriersPerUser for every tenant.
	Queriers int
	// The embedded queue.MockLimits supplies the queue.Limits half of the interface.
	queue.MockLimits
}

// MaxQueriersPerUser ignores the tenant ID and returns the configured Queriers value.
func (m MockLimits) MaxQueriersPerUser(string) int {
	return m.Queriers
}

// Frontend queues HTTP requests, dispatches them to backends, and handles retries
Expand Down Expand Up @@ -100,7 +112,7 @@ func New(cfg Config, limits Limits, log log.Logger, registerer prometheus.Regist
}),
}

f.requestQueue = queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, cfg.QuerierForgetDelay, f.queueLength, f.discardedRequests)
f.requestQueue = queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, cfg.QuerierForgetDelay, f.queueLength, f.discardedRequests, f.limits)
f.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(f.cleanupInactiveUserMetrics)

var err error
Expand Down
13 changes: 4 additions & 9 deletions pkg/frontend/v1/frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ func TestFrontendPropagateTrace(t *testing.T) {
}

func TestFrontendCheckReady(t *testing.T) {
limits := MockLimits{MockLimits: queue.MockLimits{MaxOutstanding: 100}}
for _, tt := range []struct {
name string
connectedClients int
Expand All @@ -131,6 +132,7 @@ func TestFrontendCheckReady(t *testing.T) {
requestQueue: queue.NewRequestQueue(5, 0,
prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
limits,
),
}
for i := 0; i < tt.connectedClients; i++ {
Expand Down Expand Up @@ -243,7 +245,8 @@ func testFrontend(t *testing.T, config Config, handler http.Handler, test func(a
httpListen, err := net.Listen("tcp", "localhost:0")
require.NoError(t, err)

v1, err := New(config, limits{}, logger, reg)
limits := MockLimits{MockLimits: queue.MockLimits{MaxOutstanding: 100}}
v1, err := New(config, limits, logger, reg)
require.NoError(t, err)
require.NotNil(t, v1)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), v1))
Expand Down Expand Up @@ -292,11 +295,3 @@ func defaultFrontendConfig() Config {
flagext.DefaultValues(&config)
return config
}

// limits is a test stub that reports one fixed querier count for every tenant.
type limits struct {
	// queriers is the value handed back by MaxQueriersPerUser.
	queriers int
}

// MaxQueriersPerUser ignores the tenant ID and returns the configured count.
func (lim limits) MaxQueriersPerUser(string) int {
	return lim.queriers
}
4 changes: 3 additions & 1 deletion pkg/frontend/v1/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@ import (
"google.golang.org/grpc/metadata"

"github.com/cortexproject/cortex/pkg/frontend/v1/frontendv1pb"
"github.com/cortexproject/cortex/pkg/scheduler/queue"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/services"
)

func setupFrontend(t *testing.T, config Config) (*Frontend, error) {
logger := log.NewNopLogger()

frontend, err := New(config, limits{queriers: 3}, logger, nil)
limits := MockLimits{Queriers: 3, MockLimits: queue.MockLimits{MaxOutstanding: 100}}
frontend, err := New(config, limits, logger, nil)
require.NoError(t, err)

t.Cleanup(func() {
Expand Down
4 changes: 2 additions & 2 deletions pkg/scheduler/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ type RequestQueue struct {
discardedRequests *prometheus.CounterVec // Per user.
}

func NewRequestQueue(maxOutstandingPerTenant int, forgetDelay time.Duration, queueLength *prometheus.GaugeVec, discardedRequests *prometheus.CounterVec) *RequestQueue {
func NewRequestQueue(maxOutstandingPerTenant int, forgetDelay time.Duration, queueLength *prometheus.GaugeVec, discardedRequests *prometheus.CounterVec, limits Limits) *RequestQueue {
q := &RequestQueue{
queues: newUserQueues(maxOutstandingPerTenant, forgetDelay),
queues: newUserQueues(maxOutstandingPerTenant, forgetDelay, limits),
connectedQuerierWorkers: atomic.NewInt32(0),
queueLength: queueLength,
discardedRequests: discardedRequests,
Expand Down
6 changes: 5 additions & 1 deletion pkg/scheduler/queue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func BenchmarkGetNextRequest(b *testing.B) {
queue := NewRequestQueue(maxOutstandingPerTenant, 0,
prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
MockLimits{MaxOutstanding: 100},
)
queues = append(queues, queue)

Expand Down Expand Up @@ -83,6 +84,7 @@ func BenchmarkQueueRequest(b *testing.B) {
q := NewRequestQueue(maxOutstandingPerTenant, 0,
prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
MockLimits{MaxOutstanding: 100},
)

for ix := 0; ix < queriers; ix++ {
Expand Down Expand Up @@ -115,7 +117,9 @@ func TestRequestQueue_GetNextRequestForQuerier_ShouldGetRequestAfterReshardingBe

queue := NewRequestQueue(1, forgetDelay,
prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}))
prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
MockLimits{MaxOutstanding: 100},
)

// Start the queue service.
ctx := context.Background()
Expand Down
29 changes: 27 additions & 2 deletions pkg/scheduler/queue/user_queues.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@ import (
"github.com/cortexproject/cortex/pkg/util"
)

// Limits is the subset of per-tenant limits needed by the request queue, which
// is shared by the query frontend and the query scheduler. It is declared as an
// interface here for decoupling.
type Limits interface {
// MaxOutstandingPerTenant returns the limit to the maximum number
// of outstanding requests per tenant per request queue.
// The value sizes a tenant's queue when that queue is first created, unless
// the deprecated per-component queue size is set to a non-zero value.
MaxOutstandingPerTenant(user string) int
}

// querier holds information about a querier registered in the queue.
type querier struct {
// Number of active connections.
Expand Down Expand Up @@ -41,6 +48,8 @@ type queues struct {

// Sorted list of querier names, used when creating per-user shard.
sortedQueriers []string

limits Limits
}

type userQueue struct {
Expand All @@ -59,14 +68,15 @@ type userQueue struct {
index int
}

func newUserQueues(maxUserQueueSize int, forgetDelay time.Duration) *queues {
func newUserQueues(maxUserQueueSize int, forgetDelay time.Duration, limits Limits) *queues {
return &queues{
userQueues: map[string]*userQueue{},
users: nil,
maxUserQueueSize: maxUserQueueSize,
forgetDelay: forgetDelay,
queriers: map[string]*querier{},
sortedQueriers: nil,
limits: limits,
}
}

Expand Down Expand Up @@ -106,8 +116,14 @@ func (q *queues) getOrAddQueue(userID string, maxQueriers int) chan Request {
uq := q.userQueues[userID]

if uq == nil {
queueSize := q.limits.MaxOutstandingPerTenant(userID)
// 0 is the default value of the flag. If the old flag is set
// then we use its value for compatibility reason.
if q.maxUserQueueSize != 0 {
queueSize = q.maxUserQueueSize
}
uq = &userQueue{
ch: make(chan Request, q.maxUserQueueSize),
ch: make(chan Request, queueSize),
seed: util.ShuffleShardSeed(userID, ""),
index: -1,
}
Expand Down Expand Up @@ -303,3 +319,12 @@ func shuffleQueriersForUser(userSeed int64, queriersToSelect int, allSortedQueri

return result
}

// MockLimits is a fixed-value implementation of the Limits interface. Used in tests only.
type MockLimits struct {
	// MaxOutstanding is the queue size reported for every tenant.
	MaxOutstanding int
}

// MaxOutstandingPerTenant ignores the tenant ID and returns the configured MaxOutstanding.
func (m MockLimits) MaxOutstandingPerTenant(string) int {
	return m.MaxOutstanding
}
10 changes: 5 additions & 5 deletions pkg/scheduler/queue/user_queues_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

func TestQueues(t *testing.T) {
uq := newUserQueues(0, 0)
uq := newUserQueues(0, 0, MockLimits{})
assert.NotNil(t, uq)
assert.NoError(t, isConsistent(uq))

Expand Down Expand Up @@ -68,7 +68,7 @@ func TestQueues(t *testing.T) {
}

func TestQueuesWithQueriers(t *testing.T) {
uq := newUserQueues(0, 0)
uq := newUserQueues(0, 0, MockLimits{})
assert.NotNil(t, uq)
assert.NoError(t, isConsistent(uq))

Expand Down Expand Up @@ -145,7 +145,7 @@ func TestQueuesConsistency(t *testing.T) {

for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
uq := newUserQueues(0, testData.forgetDelay)
uq := newUserQueues(0, testData.forgetDelay, MockLimits{})
assert.NotNil(t, uq)
assert.NoError(t, isConsistent(uq))

Expand Down Expand Up @@ -194,7 +194,7 @@ func TestQueues_ForgetDelay(t *testing.T) {
)

now := time.Now()
uq := newUserQueues(0, forgetDelay)
uq := newUserQueues(0, forgetDelay, MockLimits{})
assert.NotNil(t, uq)
assert.NoError(t, isConsistent(uq))

Expand Down Expand Up @@ -286,7 +286,7 @@ func TestQueues_ForgetDelay_ShouldCorrectlyHandleQuerierReconnectingBeforeForget
)

now := time.Now()
uq := newUserQueues(0, forgetDelay)
uq := newUserQueues(0, forgetDelay, MockLimits{})
assert.NotNil(t, uq)
assert.NoError(t, isConsistent(uq))

Expand Down
6 changes: 4 additions & 2 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ type Config struct {
}

func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.MaxOutstandingPerTenant, "query-scheduler.max-outstanding-requests-per-tenant", 100, "Maximum number of outstanding requests per tenant per query-scheduler. In-flight requests above this limit will fail with HTTP response status code 429.")
f.IntVar(&cfg.MaxOutstandingPerTenant, "query-scheduler.max-outstanding-requests-per-tenant", 0, "Deprecated (use frontend.max-outstanding-requests-per-tenant instead) and will be removed in v1.17.0: Maximum number of outstanding requests per tenant per query-scheduler. In-flight requests above this limit will fail with HTTP response status code 429.")
f.DurationVar(&cfg.QuerierForgetDelay, "query-scheduler.querier-forget-delay", 0, "If a querier disconnects without sending notification about graceful shutdown, the query-scheduler will keep the querier in the tenant's shard until the forget delay has passed. This feature is useful to reduce the blast radius when shuffle-sharding is enabled.")
cfg.GRPCClientConfig.RegisterFlagsWithPrefix("query-scheduler.grpc-client-config", f)
}
Expand All @@ -111,7 +111,7 @@ func NewScheduler(cfg Config, limits Limits, log log.Logger, registerer promethe
Name: "cortex_query_scheduler_discarded_requests_total",
Help: "Total number of query requests discarded.",
}, []string{"user"})
s.requestQueue = queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, cfg.QuerierForgetDelay, s.queueLength, s.discardedRequests)
s.requestQueue = queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, cfg.QuerierForgetDelay, s.queueLength, s.discardedRequests, s.limits)

s.queueDuration = promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{
Name: "cortex_query_scheduler_queue_duration_seconds",
Expand Down Expand Up @@ -143,6 +143,8 @@ func NewScheduler(cfg Config, limits Limits, log log.Logger, registerer promethe
// Limits is the set of per-tenant limits the scheduler is constructed with (see NewScheduler).
type Limits interface {
// MaxQueriersPerUser returns max queriers to use per tenant, or 0 if shuffle sharding is disabled.
MaxQueriersPerUser(user string) int

// queue.Limits is embedded because the same limits value is also handed to
// the scheduler's request queue.
queue.Limits
}

type schedulerRequest struct {
Expand Down
12 changes: 3 additions & 9 deletions pkg/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

frontendv1 "github.com/cortexproject/cortex/pkg/frontend/v1"
"github.com/cortexproject/cortex/pkg/frontend/v2/frontendv2pb"
"github.com/cortexproject/cortex/pkg/scheduler/queue"
"github.com/cortexproject/cortex/pkg/scheduler/schedulerpb"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/httpgrpcutil"
Expand All @@ -35,7 +37,7 @@ func setupScheduler(t *testing.T, reg prometheus.Registerer) (*Scheduler, schedu
flagext.DefaultValues(&cfg)
cfg.MaxOutstandingPerTenant = testMaxOutstandingPerTenant

s, err := NewScheduler(cfg, &limits{queriers: 2}, log.NewNopLogger(), reg)
s, err := NewScheduler(cfg, frontendv1.MockLimits{Queriers: 2, MockLimits: queue.MockLimits{MaxOutstanding: 100}}, log.NewNopLogger(), reg)
require.NoError(t, err)

server := grpc.NewServer()
Expand Down Expand Up @@ -494,14 +496,6 @@ func verifyNoPendingRequestsLeft(t *testing.T, scheduler *Scheduler) {
})
}

// limits is a test stub that reports one fixed querier count for every tenant.
type limits struct {
	// queriers is the value handed back by MaxQueriersPerUser.
	queriers int
}

// MaxQueriersPerUser ignores the tenant ID and returns the configured count.
func (lim limits) MaxQueriersPerUser(string) int {
	return lim.queriers
}

type frontendMock struct {
mu sync.Mutex
resp map[uint64]*httpgrpc.HTTPResponse
Expand Down
Loading