
Executing child queries on queriers #1730


Closed
owen-d wants to merge 97 commits

Conversation


@owen-d owen-d commented Oct 11, 2019

Disclaimer

This is not finished, but I'd like feedback on the design.

What

These changes aim to introduce a path towards further distributing queries. Currently Cortex dispatches queries to backend queriers, but as metric throughput increases, running an entire query on a single querier can become a bottleneck.

Prior Art

Sharding

The v10 schema introduced a shard factor for data which spreads series across n shards. This is a prerequisite for allowing us to query data from these shards in parallel.

Problem

Although the v10 schema change introduces a shard factor, all shards must still be processed on a single querier. This is compounded by the fact that query ASTs are not broken down before execution. Therefore, while sharding lets us split up data at its source location, we still re-aggregate and process it all in one place. The two goals of this PR are to fix these by

  1. splitting up child ASTs for individual execution
  2. splitting up selectors into n separate selectors, where n = shard factor

Details

Mapping ASTs

Firstly, we introduce a package called astmapper with the interface:

// ASTMapper is the exported interface for mapping between multiple AST representations
type ASTMapper interface {
	Map(node promql.Node) (promql.Node, error)
}
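
A hedged sketch of how such a mapper might be driven (mapQuery is a hypothetical helper, not part of this PR; it assumes the promql.ParseExpr / Node.String API available in the Prometheus promql package at the time):

package astmapper

import "github.com/prometheus/prometheus/promql"

// mapQuery parses a query, applies the given mapper, and returns the
// re-serialized (e.g. sharded) query string. Illustrative only.
func mapQuery(m ASTMapper, query string) (string, error) {
	expr, err := promql.ParseExpr(query)
	if err != nil {
		return "", err
	}
	mapped, err := m.Map(expr)
	if err != nil {
		return "", err
	}
	return mapped.String(), nil
}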

This interface is used to map one AST into another. For example, we use it to turn

sum by (foo) (rate(bar{baz="blip"}[1m])) ->

sum by (foo) (
  sum by (foo) (rate(bar{baz="blip",__cortex_shard__="0of16"}[1m])) or
  sum by (foo) (rate(bar{baz="blip",__cortex_shard__="1of16"}[1m])) or
  ...
  sum by (foo) (rate(bar{baz="blip",__cortex_shard__="15of16"}[1m]))
)

This works because summing the per-shard sub-sums is equivalent to computing the original sum directly. The principle should hold for any associative merge operation, but sums are the initial focus.
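
As a rough, string-level illustration of this rewrite (shardSum is a hypothetical helper, not the PR's actual mapper, which operates on promql AST nodes rather than strings):

package main

import (
	"fmt"
	"strings"
)

// shardSum turns `sum by (<groups>) (<inner>)` into a union (or) of per-shard
// sums by injecting a __cortex_shard__ matcher into the inner selector.
func shardSum(groups, inner string, shards int) string {
	legs := make([]string, 0, shards)
	for i := 0; i < shards; i++ {
		// Assumes the inner expression contains exactly one matcher block ending in "}".
		sharded := strings.Replace(inner, "}", fmt.Sprintf(`,__cortex_shard__="%dof%d"}`, i, shards), 1)
		legs = append(legs, fmt.Sprintf("sum by (%s) (%s)", groups, sharded))
	}
	return fmt.Sprintf("sum by (%s) (\n  %s\n)", groups, strings.Join(legs, " or\n  "))
}

func main() {
	fmt.Println(shardSum("foo", `rate(bar{baz="blip"}[1m])`, 16))
}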

Hijacking Queryable + embedding queries

Queries are executed in unison by a promql.Engine and a storage.Queryable. Since the Engine is a concrete type, its implementation is locked. This means the remaining option is to hijack the queryable to dispatch child queries. However, the queryable is only called to retrieve vector and matrix selector nodes -- all aggregations, functions, etc. are handled by the Engine itself. Therefore, in order to regain control of these entire subtrees, we must encode them in vector/matrix selectors. This is done by stringifying an entire subtree and replacing the node with a vector or matrix selector. Currently queries are hex-encoded, but this could be something more human-friendly.
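
A minimal sketch of the embedding step (embedQuery is a hypothetical helper; the label names match the example below, the rest is illustrative):

package main

import (
	"encoding/hex"
	"fmt"
)

// embedQuery hex-encodes the string form of an entire subtree and stashes it
// in a label, yielding a vector selector the hijacked Queryable can recognize.
func embedQuery(subtree string) string {
	return fmt.Sprintf(`__embedded_query__{__cortex_query__="%s"}`,
		hex.EncodeToString([]byte(subtree)))
}

func main() {
	fmt.Println(embedQuery(`sum by(foo) (rate(bar1{__cortex_shard__="0_of_3",baz="blip"}[1m]))`))
}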

Using our previous example with a shard-factor of 3, sum by(foo) (rate(bar1{baz="blip"}[1m])) is turned into

sum by(foo) (
  __embedded_query__{__cortex_query__="73756d20627928666f6f2920287261746528626172317b5f5f636f727465785f73686172645f5f3d22305f6f665f33222c62617a3d22626c6970227d5b316d5d2929"} or
  __embedded_query__{__cortex_query__="73756d20627928666f6f2920287261746528626172317b5f5f636f727465785f73686172645f5f3d22315f6f665f33222c62617a3d22626c6970227d5b316d5d2929"} or
  __embedded_query__{__cortex_query__="73756d20627928666f6f2920287261746528626172317b5f5f636f727465785f73686172645f5f3d22325f6f665f33222c62617a3d22626c6970227d5b316d5d2929"}
)

The queryable implementation will look for the __embedded_query__ and __cortex_query__ labels and, upon finding them, shell out to a downstream querier with the encoded query. The Engine will then reassemble the resulting SeriesSet, applying parent operations and merging multiple child queries as necessary.
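
A minimal sketch of the detection/decoding half (extractEmbeddedQuery is a hypothetical helper using Prometheus' labels.Matcher; the downstream dispatch and SeriesSet reassembly are elided):

package main

import (
	"encoding/hex"
	"fmt"

	"github.com/prometheus/prometheus/pkg/labels"
)

const (
	embeddedQueryMetric = "__embedded_query__"
	embeddedQueryLabel  = "__cortex_query__"
)

// extractEmbeddedQuery scans the matchers for __cortex_query__ and, if found,
// hex-decodes the original subquery. The caller would then forward that query
// to a downstream querier instead of hitting storage directly.
func extractEmbeddedQuery(matchers []*labels.Matcher) (string, bool, error) {
	for _, m := range matchers {
		if m.Name == embeddedQueryLabel {
			decoded, err := hex.DecodeString(m.Value)
			if err != nil {
				return "", false, err
			}
			return string(decoded), true, nil
		}
	}
	return "", false, nil
}

func main() {
	matchers := []*labels.Matcher{
		{Type: labels.MatchEqual, Name: "__name__", Value: embeddedQueryMetric},
		{Type: labels.MatchEqual, Name: embeddedQueryLabel, Value: "73756d20627928666f6f29"},
	}
	q, ok, err := extractEmbeddedQuery(matchers)
	fmt.Println(q, ok, err)
}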

Remaining Work

Label grouping should include shard labels

  • implemented

sum by (foo), when sharded, will return a vector whose labels only include foo=<value>. Due to the merge behavior of the union operator (or), this would result in discarding data in later vectors that have the same label value for foo. Therefore, we need to turn these into:

sum by (foo) (
  sum by (foo, __cortex_shard__) (rate(bar{baz="blip", __cortex_shard__="0_of_3"})) or
  ...
)

Improve AST mapping

Splitting non-sum queries

  • implemented

We need a way to handle shard splitting for non-parallelized queries. In the sum example, we introduce __cortex_shard__ labels in the AST and parallelize them. We need to ensure we're querying the right shards for non-sum queries as well. This may be handled either in the AST (ideal, as it isolates the logic) or in the backend (e.g. a backend could detect which queries do not have __cortex_shard__ labels and fan out/collect them at that level).
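
For instance, one possible AST-level treatment (a sketch only, not necessarily what this PR lands) would expand an unsharded subtree into a union over every shard, with the injected shard label dropped from the results afterwards:

rate(bar{baz="blip"}[1m]) ->

rate(bar{baz="blip",__cortex_shard__="0_of_3"}[1m]) or
rate(bar{baz="blip",__cortex_shard__="1_of_3"}[1m]) or
rate(bar{baz="blip",__cortex_shard__="2_of_3"}[1m])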

Better logic for determining which subtrees to execute

  • this optimization will be left for a later PR

Currently, parallelizable sums will be executed on a downstream querier, but non-sum subtrees will not. In the example rate(<selector>[5m]), the selector matrix would be dispatched to a querier, but the rate would be computed over the entire series by the frontend. This is certainly suboptimal and I'll be adding more logic to correct this.

Custom impls for specific functions

  • this optimization will be left for a later PR

As an example, the average function is not associative and is thus difficult to combine across shards without knowing the number of data points the average was calculated over. However, an average may be rewritten as a sum divided by a count, each of which is associative and parallelizes nicely.
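
For example, the kind of rewrite meant here (each leg can then be sharded as above):

avg by (foo) (rate(bar{baz="blip"}[1m])) ->

sum by (foo) (rate(bar{baz="blip"}[1m]))
  /
count by (foo) (rate(bar{baz="blip"}[1m]))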

Injecting shard configurations into the frontend

  • implemented

This could be done similarly to how PeriodConfigs are used to create composite stores (i.e. as a configuration in a YAML file) or via some other method.
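
For reference, the shard factor is already declared per schema period today, so one option would be for the frontend to read the same sort of configuration. A hedged sketch of such a period entry (illustrative values, assuming the existing row_shards field):

configs:
  - from: 2019-10-15
    store: bigtable
    object_store: gcs
    schema: v10
    index:
      prefix: index_
      period: 168h
    row_shards: 16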

Nice to haves/Ideal Solution/etc

  • It would be nice to have an interface for promql.Engine so that evaluation wouldn't need to be handled in the storage.Queryable interface. It would remove the need to splice in selectors with encoded queries and allow for a cleaner, less obfuscated implementation. This may be a significant lift, though. As an example, the interface may look something like
type Engine interface {
  // signatures for every AST node type
  EvalBinOp(left promql.Expr, right promql.Expr) (promql.Expr, error)
  // others
}

@owen-d changed the title from "Executing subqueries on queriers" to "Executing child queries on queriers" on Oct 11, 2019
@@ -265,6 +265,7 @@ func (c *seriesStore) lookupSeriesByMetricNameMatchers(ctx context.Context, from

// Just get series for metric if there are no matchers
if len(matchers) == 0 {
	level.Debug(log).Log("msg", "lookupSeriesByMetricNameMatchers: empty matcher")

owen-d (author) commented:

probably unnecessary to redeclare lookupSeriesByMetricNameMatchers here, as a spanlogger with this name is created above

if shard != nil {
	matchers = append(matchers[:shardLabelIndex], matchers[shardLabelIndex+1:]...)
	// Just get series for metric if there are no matchers

owen-d (author) commented:

we'd still need to filter the resulting seriesIDs here. I think it can be more succinctly expressed by calculating the shard and splicing the shard labels, then running the len(matchers) == 0 logic once.

@owen-d owen-d mentioned this pull request Dec 4, 2019

owen-d commented Dec 4, 2019

deprecated in favor of #1878

@owen-d owen-d closed this Dec 4, 2019