Skip to content

Commit ba2c9d9

Browse files
author
Sukriti Sinha
committed
Staggered merges metrics
1 parent a033cd1 commit ba2c9d9

File tree

3 files changed

+196
-5
lines changed

3 files changed

+196
-5
lines changed

server/src/main/java/org/opensearch/index/autoforcemerge/AutoForceMergeManager.java

Lines changed: 66 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ public class AutoForceMergeManager extends AbstractLifecycleComponent {
6767
private NodeValidator nodeValidator;
6868
private ShardValidator shardValidator;
6969
private Integer allocatedProcessors;
70+
private String nodeId;
71+
private final AutoForceMergeMetrics autoForceMergeMetrics;
7072
private ResourceTrackerProvider.ResourceTrackers resourceTrackers;
7173
private final ForceMergeManagerSettings forceMergeManagerSettings;
7274
private final CommonStatsFlags flags = new CommonStatsFlags(CommonStatsFlags.Flag.Segments, CommonStatsFlags.Flag.Translog);
@@ -78,14 +80,16 @@ public AutoForceMergeManager(
7880
ThreadPool threadPool,
7981
MonitorService monitorService,
8082
IndicesService indicesService,
81-
ClusterService clusterService
83+
ClusterService clusterService,
84+
AutoForceMergeMetrics autoForceMergeMetrics
8285
) {
8386
this.threadPool = threadPool;
8487
this.osService = monitorService.osService();
8588
this.fsService = monitorService.fsService();
8689
this.jvmService = monitorService.jvmService();
8790
this.clusterService = clusterService;
8891
this.indicesService = indicesService;
92+
this.autoForceMergeMetrics = autoForceMergeMetrics;
8993
this.forceMergeManagerSettings = new ForceMergeManagerSettings(clusterService, this::modifySchedulerInterval);
9094
this.task = new AsyncForceMergeTask();
9195
this.mergingShards = new HashSet<>();
@@ -98,6 +102,7 @@ protected void doStart() {
98102
this.shardValidator = new ShardValidator();
99103
this.allocatedProcessors = OpenSearchExecutors.allocatedProcessors(clusterService.getSettings());
100104
this.resourceTrackers = ResourceTrackerProvider.create(threadPool);
105+
this.nodeId = clusterService.localNode().getId();
101106
}
102107

103108
@Override
@@ -119,20 +124,44 @@ private void modifySchedulerInterval(TimeValue schedulerInterval) {
119124
}
120125

121126
private void triggerForceMerge() {
122-
if (isValidForForceMerge() == false) {
123-
return;
127+
long startTime = System.currentTimeMillis();
128+
try {
129+
if (isValidForForceMerge() == false) {
130+
return;
131+
}
132+
executeForceMergeOnShards();
133+
} finally {
134+
autoForceMergeMetrics.recordInHistogram(
135+
autoForceMergeMetrics.totalSchedulerExecutionTime,
136+
(double) System.currentTimeMillis() - startTime,
137+
autoForceMergeMetrics.getTags(nodeId, null)
138+
);
124139
}
125-
executeForceMergeOnShards();
126140
}
127141

128142
private boolean isValidForForceMerge() {
129143
if (configurationValidator.hasWarmNodes() == false) {
130144
resourceTrackers.stop();
131145
logger.debug("No warm nodes found. Skipping Auto Force merge.");
146+
autoForceMergeMetrics.incrementCounter(
147+
autoForceMergeMetrics.totalMergesSkipped,
148+
1.0,
149+
autoForceMergeMetrics.getTags(nodeId, null)
150+
);
132151
return false;
133152
}
134153
if (nodeValidator.validate().isAllowed() == false) {
135154
logger.debug("Node capacity constraints are not allowing to trigger auto ForceMerge");
155+
autoForceMergeMetrics.incrementCounter(
156+
autoForceMergeMetrics.skipsFromNodeValidator,
157+
1.0,
158+
autoForceMergeMetrics.getTags(nodeId, null)
159+
);
160+
autoForceMergeMetrics.incrementCounter(
161+
autoForceMergeMetrics.totalMergesSkipped,
162+
1.0,
163+
autoForceMergeMetrics.getTags(nodeId, null)
164+
);
136165
return false;
137166
}
138167
return true;
@@ -157,14 +186,47 @@ private void executeForceMergeOnShards() {
157186

158187
private void executeForceMergeForShard(IndexShard shard) {
159188
CompletableFuture.runAsync(() -> {
189+
long startTime = System.currentTimeMillis();
190+
String shardId = String.valueOf(shard.shardId().getId());
160191
try {
161192
mergingShards.add(shard.shardId().getId());
193+
autoForceMergeMetrics.incrementCounter(
194+
autoForceMergeMetrics.totalMergesTriggered,
195+
1.0,
196+
autoForceMergeMetrics.getTags(nodeId, null)
197+
);
198+
199+
CommonStats preStats = new CommonStats(indicesService.getIndicesQueryCache(), shard, flags);
200+
if (preStats.getSegments() != null) {
201+
autoForceMergeMetrics.incrementCounter(
202+
autoForceMergeMetrics.segmentCount,
203+
(double) preStats.getSegments().getCount(),
204+
autoForceMergeMetrics.getTags(nodeId, shardId)
205+
);
206+
autoForceMergeMetrics.incrementCounter(
207+
autoForceMergeMetrics.shardSize,
208+
(double) preStats.getStore().getSizeInBytes(),
209+
autoForceMergeMetrics.getTags(nodeId, shardId)
210+
);
211+
}
212+
162213
shard.forceMerge(new ForceMergeRequest().maxNumSegments(forceMergeManagerSettings.getSegmentCount()));
163214
logger.debug("Merging is completed successfully for the shard {}", shard.shardId());
215+
164216
} catch (Exception e) {
165217
logger.error("Error during force merge for shard {}\nException: {}", shard.shardId(), e);
218+
autoForceMergeMetrics.incrementCounter(
219+
autoForceMergeMetrics.totalMergesFailed,
220+
1.0,
221+
autoForceMergeMetrics.getTags(nodeId, null)
222+
);
166223
} finally {
167224
mergingShards.remove(shard.shardId().getId());
225+
autoForceMergeMetrics.recordInHistogram(
226+
autoForceMergeMetrics.shardForceMergeLatency,
227+
(double) System.currentTimeMillis() - startTime,
228+
autoForceMergeMetrics.getTags(nodeId, shardId)
229+
);
168230
}
169231
}, threadPool.executor(ThreadPool.Names.FORCE_MERGE));
170232
logger.info("Successfully triggered force merge for shard {}", shard.shardId());
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.autoforcemerge;
10+
11+
import org.opensearch.telemetry.metrics.Counter;
12+
import org.opensearch.telemetry.metrics.Histogram;
13+
import org.opensearch.telemetry.metrics.MetricsRegistry;
14+
import org.opensearch.telemetry.metrics.tags.Tags;
15+
16+
import java.util.Objects;
17+
import java.util.Optional;
18+
19+
/**
20+
* Class containing metrics (counters/latency) specific to Auto Force merges.
21+
*
22+
* @opensearch.internal
23+
*/
24+
public class AutoForceMergeMetrics {
25+
26+
private static final String LATENCY_METRIC_UNIT_MS = "ms";
27+
private static final String COUNTER_METRICS_UNIT = "1";
28+
private static final String SIZE_METRIC_UNIT = "bytes";
29+
30+
public static final String NODE_ID = "NodeId";
31+
public static final String SHARD_ID = "ShardId";
32+
public static final String INDEX_NAME = "IndexName";
33+
34+
public final Histogram totalSchedulerExecutionTime;
35+
public final Counter totalMergesTriggered;
36+
public final Counter totalMergesSkipped;
37+
public final Counter skipsFromNodeValidator;
38+
public final Counter totalMergesFailed;
39+
40+
// Shard specific metrics
41+
public final Histogram shardForceMergeLatency;
42+
public final Counter shardSize;
43+
public final Counter segmentCount;
44+
45+
public AutoForceMergeMetrics(MetricsRegistry metricsRegistry) {
46+
totalSchedulerExecutionTime = metricsRegistry.createHistogram(
47+
"auto_force_merge.scheduler.execution_time",
48+
"Histogram for tracking total scheduler execution time.",
49+
LATENCY_METRIC_UNIT_MS
50+
);
51+
52+
totalMergesTriggered = metricsRegistry.createCounter(
53+
"auto_force_merge.merges.triggered",
54+
"Counter for number of force merges triggered.",
55+
COUNTER_METRICS_UNIT
56+
);
57+
58+
totalMergesSkipped = metricsRegistry.createCounter(
59+
"auto_force_merge.merges.skipped.total",
60+
"Counter for number of force merges skipped.",
61+
COUNTER_METRICS_UNIT
62+
);
63+
64+
skipsFromNodeValidator = metricsRegistry.createCounter(
65+
"auto_force_merge.merges.skipped.node_validator",
66+
"Counter for number of force merges skipped due to Node Validator.",
67+
COUNTER_METRICS_UNIT
68+
);
69+
70+
totalMergesFailed = metricsRegistry.createCounter(
71+
"auto_force_merge.merges.failed",
72+
"Counter for number of force merges failed.",
73+
COUNTER_METRICS_UNIT
74+
);
75+
76+
shardForceMergeLatency = metricsRegistry.createHistogram(
77+
"auto_force_merge.shard.merge_latency",
78+
"Histogram for tracking time taken by force merge on individual shards.",
79+
LATENCY_METRIC_UNIT_MS
80+
);
81+
82+
shardSize = metricsRegistry.createCounter(
83+
"auto_force_merge.shard.size",
84+
"Counter for tracking shard size during force merge operations.",
85+
SIZE_METRIC_UNIT
86+
);
87+
88+
segmentCount = metricsRegistry.createCounter(
89+
"auto_force_merge.shard.segment_count",
90+
"Counter for tracking segment count during force merge operations.",
91+
COUNTER_METRICS_UNIT
92+
);
93+
}
94+
95+
public void recordInHistogram(Histogram histogram, Double value, Optional<Tags> tags) {
96+
if (Objects.isNull(tags) || tags.isEmpty()) {
97+
histogram.record(value);
98+
return;
99+
}
100+
histogram.record(value, tags.get());
101+
}
102+
103+
public void incrementCounter(Counter counter, Double value, Optional<Tags> tags) {
104+
if (Objects.isNull(tags) || tags.isEmpty()) {
105+
counter.add(value);
106+
return;
107+
}
108+
counter.add(value, tags.get());
109+
}
110+
111+
public Optional<Tags> getTags(String nodeId, String shardId) {
112+
Optional<Tags> tags = Optional.of(Tags.create());
113+
if (nodeId != null) {
114+
tags.get().addTag(NODE_ID, nodeId);
115+
}
116+
if (shardId != null) {
117+
tags.get().addTag(SHARD_ID, shardId);
118+
}
119+
return tags;
120+
}
121+
}

server/src/main/java/org/opensearch/node/Node.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@
161161
import org.opensearch.index.SegmentReplicationStatsTracker;
162162
import org.opensearch.index.analysis.AnalysisRegistry;
163163
import org.opensearch.index.autoforcemerge.AutoForceMergeManager;
164+
import org.opensearch.index.autoforcemerge.AutoForceMergeMetrics;
164165
import org.opensearch.index.compositeindex.CompositeIndexSettings;
165166
import org.opensearch.index.engine.EngineFactory;
166167
import org.opensearch.index.engine.MergedSegmentWarmerFactory;
@@ -1208,7 +1209,14 @@ protected Node(final Environment initialEnvironment, Collection<PluginInfo> clas
12081209
workloadGroupService
12091210
);
12101211

1211-
this.autoForceMergeManager = new AutoForceMergeManager(threadPool, monitorService, indicesService, clusterService);
1212+
final AutoForceMergeMetrics autoForceMergeMetrics = new AutoForceMergeMetrics(metricsRegistry);
1213+
this.autoForceMergeManager = new AutoForceMergeManager(
1214+
threadPool,
1215+
monitorService,
1216+
indicesService,
1217+
clusterService,
1218+
autoForceMergeMetrics
1219+
);
12121220

12131221
final Collection<SecureSettingsFactory> secureSettingsFactories = pluginsService.filterPlugins(Plugin.class)
12141222
.stream()

0 commit comments

Comments
 (0)