
Conversation


@adarshsanjeev adarshsanjeev commented Apr 10, 2025

Description

This PR is a follow-up to #17863, which allows bringing up copies of existing historicals. However, we may or may not want to query these copies: we would not want to query a clone historical that is still busy catching up to its source historical, while once it has caught up, we might want to query only the clone to see whether it is performing as expected. This PR adds the logic for brokers to decide whether they should query clones.

The coordinator dynamic configuration, which contains the cloneServers configuration, is synced to all brokers by the Coordinator. This lets brokers know which servers are clones and which servers are their sources. This PR adds new internal Broker APIs to get and set the coordinator dynamic configuration.

There is a new query context parameter, cloneQueryMode. Setting this parameter to excludeClones (the default) causes brokers to not query any clone historicals. Setting it to preferClones causes brokers to query only the clones and not their source historicals. Setting it to includeClones causes brokers to query all historicals without considering clone status.

This parameter only affects native queries; MSQ does not query Historicals directly, and Dart will not respect this context parameter.
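
For illustration only (not code from this PR), a minimal sketch of attaching the new parameter to a native query context map; the class name and everything besides the `cloneQueryMode` key and its values are assumptions:

```java
import java.util.HashMap;
import java.util.Map;

public class CloneQueryModeContextExample
{
  public static void main(String[] args)
  {
    // Only the "cloneQueryMode" key and its values come from this PR; the
    // surrounding code is a plain illustration of building a context map.
    Map<String, Object> context = new HashMap<>();
    context.put("cloneQueryMode", "preferClones"); // or "excludeClones" (default) / "includeClones"
    System.out.println(context);
  }
}
```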

The PR also adds some new experimental APIs which return the current status of the servers undergoing cloning.

Fixed a few misc. issues I ran into while testing the PR:

  • Fixed equals in CoordinatorDynamicConfig
  • Fixed CachingClusteredClientBenchmark, which was running into NPEs on master.

APIs

Add the following new experimental Broker APIs:

| Method | Path | Description | Required Permission |
|--------|------|-------------|---------------------|
| GET | `/druid-internal/v1/config/coordinator` | Get the coordinator dynamic config | Read configs |
| POST | `/druid-internal/v1/config/coordinator` | Update the coordinator dynamic config | Write configs |

Add the following new experimental Coordinator APIs:

| Method | Path | Description | Required Permission |
|--------|------|-------------|---------------------|
| GET | `/druid/coordinator/v1/config/syncedBrokers` | Get the broker sync status for coordinator dynamic configs | Read state |
| GET | `/druid/coordinator/v1/config/cloneStatus` | Returns the status of cloning operations | Read state |

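As a rough usage sketch (not part of this PR), the clone status endpoint could be polled with the JDK HTTP client; the Coordinator host, port, and absence of authentication here are placeholders for whatever your cluster uses:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class CloneStatusPoller
{
  public static void main(String[] args) throws Exception
  {
    // Path taken from the table above; the caller needs "Read state" permission.
    HttpClient client = HttpClient.newHttpClient();
    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create("http://coordinator.example.com:8081/druid/coordinator/v1/config/cloneStatus"))
        .GET()
        .build();
    HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
    System.out.println(response.statusCode());
    System.out.println(response.body()); // JSON describing ongoing cloning operations
  }
}
```
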
Release note

  • Add a new Query Context parameter, cloneQueryMode. Setting this parameter to excludeClones (default) causes brokers to not query any clone historicals. Setting this parameter to preferClones causes brokers to only query the clones and not their source historicals. Setting this parameter to includeClones causes brokers to query all historicals without considering clone status. This parameter only affects native queries; MSQ does not query Historicals directly, and Dart will not respect this context parameter.
  • Add new Coordinator API /druid/coordinator/v1/cloneStatus to get information about ongoing cloning operations.
  • Add new Coordinator API /druid/coordinator/v1/brokerConfigurationStatus which returns the broker sync status for coordinator dynamic configs.

Benchmarks

Ran CachingClusteredClientBenchmark with and without the changes.

Without

Benchmark                                        (numServers)  (parallelism)  (queryGranularity)  (rowsPerSegment)  Mode  Cnt        Score        Error  Units
CachingClusteredClientBenchmark.groupByQuery                8              0                 all             75000  avgt    5   187233.351 ±   7838.503  us/op
CachingClusteredClientBenchmark.groupByQuery                8              0              minute             75000  avgt    5   407622.155 ± 103559.490  us/op
CachingClusteredClientBenchmark.groupByQuery                8              1                 all             75000  avgt    5   187017.294 ±   7038.658  us/op
CachingClusteredClientBenchmark.groupByQuery                8              1              minute             75000  avgt    5   349598.324 ±   8100.937  us/op
CachingClusteredClientBenchmark.groupByQuery                8              4                 all             75000  avgt    5    97731.983 ±  16805.117  us/op
CachingClusteredClientBenchmark.groupByQuery                8              4              minute             75000  avgt    5   191754.447 ±  43432.738  us/op
CachingClusteredClientBenchmark.groupByQuery               24              0                 all             75000  avgt    5   632823.321 ±  20617.937  us/op
CachingClusteredClientBenchmark.groupByQuery               24              0              minute             75000  avgt    5  1140958.279 ±  80029.871  us/op
CachingClusteredClientBenchmark.groupByQuery               24              1                 all             75000  avgt    5   618475.284 ±   9633.444  us/op
CachingClusteredClientBenchmark.groupByQuery               24              1              minute             75000  avgt    5  1393390.822 ± 153339.013  us/op
CachingClusteredClientBenchmark.groupByQuery               24              4                 all             75000  avgt    5   298711.218 ±  23541.337  us/op
CachingClusteredClientBenchmark.groupByQuery               24              4              minute             75000  avgt    5   507683.242 ±  77316.315  us/op
CachingClusteredClientBenchmark.timeseriesQuery             8              0                 all             75000  avgt    5     1826.244 ±     45.285  us/op
CachingClusteredClientBenchmark.timeseriesQuery             8              0              minute             75000  avgt    5    12003.012 ±    137.467  us/op
CachingClusteredClientBenchmark.timeseriesQuery             8              1                 all             75000  avgt    5     1897.041 ±     26.805  us/op
CachingClusteredClientBenchmark.timeseriesQuery             8              1              minute             75000  avgt    5     9521.420 ±    646.388  us/op
CachingClusteredClientBenchmark.timeseriesQuery             8              4                 all             75000  avgt    5     1099.501 ±     51.809  us/op
CachingClusteredClientBenchmark.timeseriesQuery             8              4              minute             75000  avgt    5     5251.361 ±    120.823  us/op
CachingClusteredClientBenchmark.timeseriesQuery            24              0                 all             75000  avgt    5     5475.769 ±     84.363  us/op
CachingClusteredClientBenchmark.timeseriesQuery            24              0              minute             75000  avgt    5    34702.377 ±    490.230  us/op
CachingClusteredClientBenchmark.timeseriesQuery            24              1                 all             75000  avgt    5     5711.180 ±    163.640  us/op
CachingClusteredClientBenchmark.timeseriesQuery            24              1              minute             75000  avgt    5    31741.588 ±    152.680  us/op
CachingClusteredClientBenchmark.timeseriesQuery            24              4                 all             75000  avgt    5     2142.773 ±     21.874  us/op
CachingClusteredClientBenchmark.timeseriesQuery            24              4              minute             75000  avgt    5    10736.056 ±    359.058  us/op
CachingClusteredClientBenchmark.topNQuery                   8              0                 all             75000  avgt    5    12423.740 ±     90.025  us/op
CachingClusteredClientBenchmark.topNQuery                   8              0              minute             75000  avgt    5   323617.067 ±   4802.340  us/op
CachingClusteredClientBenchmark.topNQuery                   8              1                 all             75000  avgt    5    12568.755 ±    720.204  us/op
CachingClusteredClientBenchmark.topNQuery                   8              1              minute             75000  avgt    5   307477.185 ±   4021.462  us/op
CachingClusteredClientBenchmark.topNQuery                   8              4                 all             75000  avgt    5    12507.516 ±    164.989  us/op
CachingClusteredClientBenchmark.topNQuery                   8              4              minute             75000  avgt    5   315878.214 ±   6988.225  us/op
CachingClusteredClientBenchmark.topNQuery                  24              0                 all             75000  avgt    5    37568.186 ±    557.597  us/op
CachingClusteredClientBenchmark.topNQuery                  24              0              minute             75000  avgt    5  1043045.982 ±  41938.790  us/op
CachingClusteredClientBenchmark.topNQuery                  24              1                 all             75000  avgt    5    37145.213 ±    311.788  us/op
CachingClusteredClientBenchmark.topNQuery                  24              1              minute             75000  avgt    5  1062463.098 ±  39684.630  us/op
CachingClusteredClientBenchmark.topNQuery                  24              4                 all             75000  avgt    5    37023.011 ±    224.322  us/op
CachingClusteredClientBenchmark.topNQuery                  24              4              minute             75000  avgt    5  1026412.293 ±  36995.235  us/op

New

Benchmark                                        (numServers)  (parallelism)  (queryGranularity)  (rowsPerSegment)  Mode  Cnt        Score        Error  Units
CachingClusteredClientBenchmark.groupByQuery                8              0                 all             75000  avgt    5   190210.512 ±  16174.555  us/op
CachingClusteredClientBenchmark.groupByQuery                8              0              minute             75000  avgt    5   373212.855 ±  15008.167  us/op
CachingClusteredClientBenchmark.groupByQuery                8              1                 all             75000  avgt    5   187908.512 ±   6028.602  us/op
CachingClusteredClientBenchmark.groupByQuery                8              1              minute             75000  avgt    5   367360.525 ±  19929.920  us/op
CachingClusteredClientBenchmark.groupByQuery                8              4                 all             75000  avgt    5    95069.466 ±   2590.051  us/op
CachingClusteredClientBenchmark.groupByQuery                8              4              minute             75000  avgt    5   172162.045 ±   6195.149  us/op
CachingClusteredClientBenchmark.groupByQuery               24              0                 all             75000  avgt    5   576612.400 ±  21533.053  us/op
CachingClusteredClientBenchmark.groupByQuery               24              0              minute             75000  avgt    5  1135360.679 ±  50404.299  us/op
CachingClusteredClientBenchmark.groupByQuery               24              1                 all             75000  avgt    5   616102.833 ±  15667.025  us/op
CachingClusteredClientBenchmark.groupByQuery               24              1              minute             75000  avgt    5  1167650.076 ±  46926.246  us/op
CachingClusteredClientBenchmark.groupByQuery               24              4                 all             75000  avgt    5   250064.671 ±   9261.147  us/op
CachingClusteredClientBenchmark.groupByQuery               24              4              minute             75000  avgt    5   534894.038 ± 190468.714  us/op
CachingClusteredClientBenchmark.timeseriesQuery             8              0                 all             75000  avgt    5     1808.930 ±     11.269  us/op
CachingClusteredClientBenchmark.timeseriesQuery             8              0              minute             75000  avgt    5    10066.325 ±     56.242  us/op
CachingClusteredClientBenchmark.timeseriesQuery             8              1                 all             75000  avgt    5     1910.453 ±     68.603  us/op
CachingClusteredClientBenchmark.timeseriesQuery             8              1              minute             75000  avgt    5     9894.090 ±    187.481  us/op
CachingClusteredClientBenchmark.timeseriesQuery             8              4                 all             75000  avgt    5     1098.325 ±      8.424  us/op
CachingClusteredClientBenchmark.timeseriesQuery             8              4              minute             75000  avgt    5     5143.380 ±    182.825  us/op
CachingClusteredClientBenchmark.timeseriesQuery            24              0                 all             75000  avgt    5     5492.820 ±    175.701  us/op
CachingClusteredClientBenchmark.timeseriesQuery            24              0              minute             75000  avgt    5    33331.318 ±   1322.841  us/op
CachingClusteredClientBenchmark.timeseriesQuery            24              1                 all             75000  avgt    5     5796.028 ±    520.619  us/op
CachingClusteredClientBenchmark.timeseriesQuery            24              1              minute             75000  avgt    5    33760.302 ±    840.375  us/op
CachingClusteredClientBenchmark.timeseriesQuery            24              4                 all             75000  avgt    5     2243.756 ±    279.051  us/op
CachingClusteredClientBenchmark.timeseriesQuery            24              4              minute             75000  avgt    5    11869.440 ±    782.153  us/op
CachingClusteredClientBenchmark.topNQuery                   8              0                 all             75000  avgt    5    12618.775 ±    210.252  us/op
CachingClusteredClientBenchmark.topNQuery                   8              0              minute             75000  avgt    5   299882.581 ±  12215.972  us/op
CachingClusteredClientBenchmark.topNQuery                   8              1                 all             75000  avgt    5    12954.234 ±    231.822  us/op
CachingClusteredClientBenchmark.topNQuery                   8              1              minute             75000  avgt    5   305808.360 ±  13212.527  us/op
CachingClusteredClientBenchmark.topNQuery                   8              4                 all             75000  avgt    5    12964.849 ±   1378.768  us/op
CachingClusteredClientBenchmark.topNQuery                   8              4              minute             75000  avgt    5   311156.484 ±   6341.480  us/op
CachingClusteredClientBenchmark.topNQuery                  24              0                 all             75000  avgt    5    42335.406 ±   2293.778  us/op
CachingClusteredClientBenchmark.topNQuery                  24              0              minute             75000  avgt    5  1110729.188 ± 105436.580  us/op
CachingClusteredClientBenchmark.topNQuery                  24              1                 all             75000  avgt    5    37290.951 ±    117.235  us/op
CachingClusteredClientBenchmark.topNQuery                  24              1              minute             75000  avgt    5  1041009.929 ±  10065.510  us/op
CachingClusteredClientBenchmark.topNQuery                  24              4                 all             75000  avgt    5    40098.780 ±    933.232  us/op
CachingClusteredClientBenchmark.topNQuery                  24              4              minute             75000  avgt    5  1042889.098 ±  27988.509  us/op

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.


@kfaraz kfaraz left a comment


Left some initial thoughts.

@kfaraz kfaraz marked this pull request as ready for review April 14, 2025 08:09
@adarshsanjeev
Contributor Author

Will update documentation regarding the context parameter and APIs soon.

@github-actions github-actions bot added the Area - Batch Ingestion and Area - MSQ (For multi stage queries - https://github.com/apache/druid/issues/12262) labels Apr 15, 2025

@cryptoe cryptoe left a comment


Changes LGTM!!


@kfaraz kfaraz left a comment


There are several improvements that are needed in this PR.
There are comments from @clintropolis too that need to be addressed.

But approving it for the time being to unblock work.

{
private final String host;
private final int port;
private final long syncTimeInMs;
Contributor


Since this represents a timestamp since epoch, it is expected to be in millis (since that is the convention followed in the rest of Druid).
We can just call this lastSyncTime or lastSyncTimestamp.

Suggested change
private final long syncTimeInMs;


@cryptoe cryptoe Apr 24, 2025


> Since this represents a timestamp since epoch, it is expected to be in millis (since that is the convention followed in the rest of Druid).

Is there some dev documentation which mentions the above?

IMHO, units are better represented in the variable name.

Contributor


No, unfortunately, it is not documented anywhere.

I agree that calling out the units removes the ambiguity, esp. when it comes to specifying durations.

(Side note: when writing configs for durations, we should either just call out the unit in the config name or accept a Duration or Period in the config value. We should probably document this somewhere as a Druid coding guideline.)

But I actually had other concerns with this field name.
syncTimeInMs sounds more like the duration it took to finish a sync, rather than the last sync timestamp.

So we should definitely call it lastSyncTimestamp.

For clarity, we can add the unit suffix and call it lastSyncTimestampMillis.
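
A minimal sketch of the rename being discussed; the class name, constructor, and everything except the field names are hypothetical and not taken from the PR:

```java
public class BrokerSyncStatus
{
  private final String host;
  private final int port;
  // Renamed from syncTimeInMs: the value is an epoch timestamp, not a duration,
  // and the suffix makes the unit explicit.
  private final long lastSyncTimestampMillis;

  public BrokerSyncStatus(String host, int port, long lastSyncTimestampMillis)
  {
    this.host = host;
    this.port = port;
    this.lastSyncTimestampMillis = lastSyncTimestampMillis;
  }
}
```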

Contributor


cc: @adarshsanjeev , note for follow up

@adarshsanjeev adarshsanjeev added the Needs web console change (Backend API changes that would benefit from frontend support in the web console) label Apr 28, 2025

@kfaraz kfaraz left a comment


Thanks for the improvements, @adarshsanjeev .
I have left some minor suggestions that can be addressed in a follow up PR.

|`debug`| `false` | Flag indicating whether to enable debugging outputs for the query. When set to false, no additional logs will be produced (logs produced will be entirely dependent on your logging level). When set to true, the following additional logs will be produced:<br />- Log the stack trace of the exception (if any) produced by the query |
|`setProcessingThreadNames`|`true`| Whether processing thread names will be set to `queryType_dataSource_intervals` while processing a query. This aids in interpreting thread dumps, and is on by default. Query overhead can be reduced slightly by setting this to `false`. This has a tiny effect in most scenarios, but can be meaningful in high-QPS, low-per-segment-processing-time scenarios. |
|`sqlPlannerBloat`|`1000`|Calcite parameter which controls whether to merge two Project operators when inlining expressions causes complexity to increase. Implemented as a workaround to exception `There are not enough rules to produce a node with desired properties: convention=DRUID, sort=[]` thrown after rejecting the merge of two projects.|
|`cloneQueryMode`|`excludeClones`| Indicates whether clone Historicals should be queried by brokers. Clone servers are created by the `cloneServers` Coordinator dynamic configuration. Possible values are `excludeClones`, `includeClones` and `clonesPreferred`. `excludeClones` means that clone Historicals are not queried by the broker. `clonesPreferred` indicates that when given a choice between the clone Historical and the original Historical which is being cloned, the broker chooses the clones; Historicals which are not involved in the cloning process will still be queried. `includeClones` means that broker queries any Historical without regarding clone status. This parameter only affects native queries; MSQ does not query Historicals directly, and Dart will not respect this context parameter.|
Contributor


For follow up: Use preferClones instead of clonesPreferred.

public static final boolean DEFAULT_ENABLE_JOIN_FILTER_PUSH_DOWN = true;
public static final boolean DEFAULT_ENABLE_JOIN_FILTER_REWRITE = true;
public static final boolean DEFAULT_ENABLE_JOIN_FILTER_REWRITE_VALUE_COLUMN_FILTERS = false;
public static final CloneQueryMode DEFAULT_CLONE_QUERY_MODE = CloneQueryMode.EXCLUDECLONES;
Contributor


Note for follow-up: Should the default be exclude or include?

Contributor Author


The default should be exclude, as we do not want clone servers to be queried by default. They might be running different versions of the code, etc., which means that by default we do not want normal queries to be aware of these servers. In fact, should INCLUDE be removed? I don't know of a use case for it, other than ensuring that the query hits any possible server if available.

Contributor


Yeah, I see your point. The use case doesn't seem all that important, except maybe to ensure that the segment remains available even when the source server flickers (but that is already handled by having 2 replicas).
In fact, the includeClones mode might actually cause changes in query performance in a non-deterministic manner.

Let's get rid of it for now. We can add it back if there seems to be a real need for it.

// Don't remove either.
return Set.of();
default:
throw DruidException.defensive("Unexpected value: [%s]", cloneQueryMode);
Contributor


follow up:

Suggested change
throw DruidException.defensive("Unexpected value: [%s]", cloneQueryMode);
throw DruidException.defensive("Unexpected value of cloneQueryMode[%s]", cloneQueryMode);


this.configManager = configManager;
this.jsonMapper = jsonMapper;
this.druidNodeDiscovery = druidNodeDiscoveryProvider;
this.exec = Execs.scheduledSingleThreaded("CoordinatorDynamicConfigSyncer-%d");
Contributor


Note for follow up: There should preferably be a lifecycle stop method which shuts down this executor, as it allows for nicer cleanup of the resources used by this class.
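
A possible shape for that follow-up, assuming Druid's usual @LifecycleStop convention and the class and field names visible in the snippet above; this is a sketch, not the merged code:

```java
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;

// Sketch only, inside CoordinatorDynamicConfigSyncer:
@LifecycleStop
public void stop()
{
  // Shut down the single-threaded sync executor created in the constructor so
  // the class releases its resources cleanly.
  exec.shutdownNow();
}
```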

@kfaraz

kfaraz commented Apr 28, 2025

Proceeding with the merge as all the requested changes have been made.
@clintropolis , @cryptoe , please feel free to add comments if you feel there is room for further improvement;
those can be taken up in follow-up PRs.

@kfaraz kfaraz merged commit f1c6f26 into apache:master Apr 28, 2025
74 checks passed
adarshsanjeev added a commit that referenced this pull request May 2, 2025
Add metrics for monitoring cloning and broker config syncing. These metrics are:
- config/brokerSync/time
- config/brokerSync/total/time
- config/brokerSync/error

Also addresses followup comments from #17899 including:
- Fixing documentation of context parameter
- Add a more graceful shutdown
- Rename some fields for consistency
@capistrant capistrant added this to the 34.0.0 milestone Jul 22, 2025

Labels

Area - Batch Ingestion, Area - Documentation, Area - MSQ, Area - Querying, Design Review, Needs web console change
