Skip to content

Commit 2dd678f

Browse files
committed
HDFS-16488. [SPS]: Expose metrics to JMX for external SPS
1 parent ab8c360 commit 2dd678f

File tree

8 files changed

+247
-5
lines changed

8 files changed

+247
-5
lines changed

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,16 @@ public int getAttemptedItemsCount() {
301301
}
302302
}
303303

304+
@VisibleForTesting
305+
public List<AttemptedItemInfo> getStorageMovementAttemptedItems() {
306+
return storageMovementAttemptedItems;
307+
}
308+
309+
@VisibleForTesting
310+
public BlockingQueue<Block> getMovementFinishedBlocks() {
311+
return movementFinishedBlocks;
312+
}
313+
304314
public void clearQueues() {
305315
movementFinishedBlocks.clear();
306316
synchronized (storageMovementAttemptedItems) {

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1077,7 +1077,7 @@ public void clearQueues() {
10771077
* attempted or reported time stamp. This is used by
10781078
* {@link BlockStorageMovementAttemptedItems#storageMovementAttemptedItems}.
10791079
*/
1080-
final static class AttemptedItemInfo extends ItemInfo {
1080+
public final static class AttemptedItemInfo extends ItemInfo {
10811081
private long lastAttemptedOrReportedTime;
10821082
private final Set<Block> blocks;
10831083

@@ -1095,7 +1095,7 @@ final static class AttemptedItemInfo extends ItemInfo {
10951095
* @param retryCount
10961096
* file retry count
10971097
*/
1098-
AttemptedItemInfo(long rootId, long trackId,
1098+
public AttemptedItemInfo(long rootId, long trackId,
10991099
long lastAttemptedOrReportedTime,
11001100
Set<Block> blocks, int retryCount) {
11011101
super(rootId, trackId, retryCount);

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.List;
2525

2626
import org.apache.hadoop.classification.InterfaceAudience;
27+
import org.apache.hadoop.classification.VisibleForTesting;
2728
import org.apache.hadoop.fs.Path;
2829
import org.apache.hadoop.hdfs.DFSUtilClient;
2930
import org.apache.hadoop.hdfs.protocol.Block;
@@ -39,10 +40,12 @@
3940
import org.apache.hadoop.hdfs.server.namenode.sps.Context;
4041
import org.apache.hadoop.hdfs.server.namenode.sps.FileCollector;
4142
import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
43+
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
4244
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.DatanodeMap;
4345
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.DatanodeWithStorage;
4446
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
4547
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
48+
import org.apache.hadoop.hdfs.server.sps.metrics.ExternalSPSBeanMetrics;
4649
import org.apache.hadoop.net.NetworkTopology;
4750
import org.slf4j.Logger;
4851
import org.slf4j.LoggerFactory;
@@ -62,6 +65,7 @@ public class ExternalSPSContext implements Context {
6265
private final FileCollector fileCollector;
6366
private final BlockMoveTaskHandler externalHandler;
6467
private final BlockMovementListener blkMovementListener;
68+
private ExternalSPSBeanMetrics spsBeanMetrics;
6569

6670
public ExternalSPSContext(SPSService service, NameNodeConnector nnc) {
6771
this.service = service;
@@ -208,4 +212,17 @@ public void notifyMovementTriedBlocks(Block[] moveAttemptFinishedBlks) {
208212
LOG.info("Movement attempted blocks", actualBlockMovements);
209213
}
210214
}
215+
216+
public void initMetrics(StoragePolicySatisfier sps) {
217+
spsBeanMetrics = new ExternalSPSBeanMetrics(sps);
218+
}
219+
220+
public void closeMetrics() {
221+
spsBeanMetrics.close();
222+
}
223+
224+
@VisibleForTesting
225+
public ExternalSPSBeanMetrics getSpsBeanMetrics() {
226+
return spsBeanMetrics;
227+
}
211228
}

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,7 @@
4848
*/
4949
@InterfaceAudience.Private
5050
public final class ExternalStoragePolicySatisfier {
51-
public static final Logger LOG = LoggerFactory
52-
.getLogger(ExternalStoragePolicySatisfier.class);
51+
public static final Logger LOG = LoggerFactory.getLogger(ExternalStoragePolicySatisfier.class);
5352

5453
private ExternalStoragePolicySatisfier() {
5554
// This is just a class to start and run external sps.
@@ -60,6 +59,7 @@ private ExternalStoragePolicySatisfier() {
6059
*/
6160
public static void main(String[] args) throws Exception {
6261
NameNodeConnector nnc = null;
62+
ExternalSPSContext context = null;
6363
try {
6464
StringUtils.startupShutdownMessage(StoragePolicySatisfier.class, args,
6565
LOG);
@@ -69,9 +69,10 @@ public static void main(String[] args) throws Exception {
6969
StoragePolicySatisfier sps = new StoragePolicySatisfier(spsConf);
7070
nnc = getNameNodeConnector(spsConf);
7171

72-
ExternalSPSContext context = new ExternalSPSContext(sps, nnc);
72+
context = new ExternalSPSContext(sps, nnc);
7373
sps.init(context);
7474
sps.start(StoragePolicySatisfierMode.EXTERNAL);
75+
context.initMetrics(sps);
7576
if (sps != null) {
7677
sps.join();
7778
}
@@ -82,6 +83,11 @@ public static void main(String[] args) throws Exception {
8283
if (nnc != null) {
8384
nnc.close();
8485
}
86+
if (context!= null) {
87+
if (context.getSpsBeanMetrics() != null) {
88+
context.closeMetrics();
89+
}
90+
}
8591
}
8692
}
8793

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hdfs.server.sps.metrics;
19+
20+
import org.apache.hadoop.classification.VisibleForTesting;
21+
import org.apache.hadoop.hdfs.protocol.Block;
22+
import org.apache.hadoop.hdfs.server.namenode.sps.ItemInfo;
23+
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
24+
import org.apache.hadoop.metrics2.util.MBeans;
25+
import org.slf4j.Logger;
26+
import org.slf4j.LoggerFactory;
27+
28+
import javax.management.NotCompliantMBeanException;
29+
import javax.management.ObjectName;
30+
import javax.management.StandardMBean;
31+
import java.util.HashSet;
32+
33+
/**
34+
* Expose the ExternalSPS metrics.
35+
*/
36+
public class ExternalSPSBeanMetrics implements ExternalSPSMXBean {
37+
38+
private static final Logger LOG =
39+
LoggerFactory.getLogger(ExternalSPSBeanMetrics.class);
40+
41+
/**
42+
* ExternalSPS bean.
43+
*/
44+
private ObjectName externalSPSBeanName;
45+
private StoragePolicySatisfier storagePolicySatisfier;
46+
47+
public ExternalSPSBeanMetrics(StoragePolicySatisfier sps) {
48+
try {
49+
this.storagePolicySatisfier = sps;
50+
StandardMBean bean = new StandardMBean(this, ExternalSPSMXBean.class);
51+
this.externalSPSBeanName = MBeans.register("ExternalSPS", "ExternalSPS", bean);
52+
LOG.info("Registered ExternalSPS MBean: {}", this.externalSPSBeanName);
53+
} catch (NotCompliantMBeanException e) {
54+
throw new RuntimeException("Bad externalSPS MBean setup", e);
55+
}
56+
}
57+
58+
/**
59+
* Unregister the JMX interfaces.
60+
*/
61+
public void close() {
62+
if (externalSPSBeanName != null) {
63+
MBeans.unregister(externalSPSBeanName);
64+
externalSPSBeanName = null;
65+
}
66+
}
67+
68+
@Override
69+
public int getProcessingQueueSize() {
70+
return storagePolicySatisfier.processingQueueSize();
71+
}
72+
73+
@VisibleForTesting
74+
public void updateProcessingQueueSize() {
75+
storagePolicySatisfier.getStorageMovementQueue()
76+
.add(new ItemInfo(0, 1, 1));
77+
}
78+
79+
@Override
80+
public int getMovementFinishedBlocksCount() {
81+
return storagePolicySatisfier.getAttemptedItemsMonitor().getMovementFinishedBlocksCount();
82+
}
83+
84+
@VisibleForTesting
85+
public void updateMovementFinishedBlocksCount() {
86+
storagePolicySatisfier.getAttemptedItemsMonitor().getMovementFinishedBlocks()
87+
.add(new Block(1));
88+
}
89+
90+
@Override
91+
public int getAttemptedItemsCount() {
92+
return storagePolicySatisfier.getAttemptedItemsMonitor().getAttemptedItemsCount();
93+
}
94+
95+
@VisibleForTesting
96+
public void updateAttemptedItemsCount() {
97+
storagePolicySatisfier.getAttemptedItemsMonitor().getStorageMovementAttemptedItems()
98+
.add(new StoragePolicySatisfier.AttemptedItemInfo(0, 1, 1, new HashSet<>(), 1));
99+
}
100+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hdfs.server.sps.metrics;
19+
20+
import org.apache.hadoop.classification.InterfaceAudience;
21+
import org.apache.hadoop.classification.InterfaceStability;
22+
23+
/**
24+
* This is the JMX management interface for ExternalSPS information.
25+
* End users shouldn't be implementing these interfaces, and instead
26+
* access this information through the JMX APIs.
27+
*/
28+
@InterfaceAudience.Private
29+
@InterfaceStability.Stable
30+
public interface ExternalSPSMXBean {
31+
32+
/**
33+
* Gets the queue size of StorageMovementNeeded.
34+
*
35+
* @return the queue size of StorageMovementNeeded.
36+
*/
37+
int getProcessingQueueSize();
38+
39+
/**
40+
* Gets the count of movement finished blocks.
41+
*
42+
* @return the count of movement finished blocks.
43+
*/
44+
int getMovementFinishedBlocksCount();
45+
46+
/**
47+
* Gets the count of attempted items.
48+
*
49+
* @return the count of attempted items.
50+
*/
51+
int getAttemptedItemsCount();
52+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
/**
20+
* This package provides the ability to expose external SPS metrics to JMX.
21+
*/
22+
package org.apache.hadoop.hdfs.server.sps.metrics;

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,14 @@
3636
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY;
3737
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
3838
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
39+
import static org.junit.Assert.assertEquals;
3940
import static org.junit.Assert.assertFalse;
4041
import static org.junit.Assert.fail;
4142

4243
import java.io.File;
4344
import java.io.FileNotFoundException;
4445
import java.io.IOException;
46+
import java.lang.management.ManagementFactory;
4547
import java.net.InetSocketAddress;
4648
import java.security.PrivilegedExceptionAction;
4749
import java.util.ArrayList;
@@ -82,6 +84,7 @@
8284
import org.apache.hadoop.hdfs.server.namenode.sps.BlockMovementListener;
8385
import org.apache.hadoop.hdfs.server.namenode.sps.BlockStorageMovementAttemptedItems;
8486
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
87+
import org.apache.hadoop.hdfs.server.sps.metrics.ExternalSPSBeanMetrics;
8588
import org.apache.hadoop.http.HttpConfig;
8689
import org.apache.hadoop.minikdc.MiniKdc;
8790
import org.apache.hadoop.security.SecurityUtil;
@@ -100,6 +103,8 @@
100103
import org.slf4j.Logger;
101104
import org.slf4j.LoggerFactory;
102105

106+
import javax.management.MBeanServer;
107+
import javax.management.ObjectName;
103108
import java.util.function.Supplier;
104109

105110
/**
@@ -1716,4 +1721,34 @@ public void clear() {
17161721
actualBlockMovements.clear();
17171722
}
17181723
}
1724+
1725+
@Test(timeout = 300000)
1726+
public void testExternalSPSMetrics() throws Exception {
1727+
try {
1728+
createCluster();
1729+
// Start JMX but stop SPS thread to prevent mock data from being consumed.
1730+
externalSps.stop(true);
1731+
externalCtxt.initMetrics(externalSps);
1732+
1733+
ExternalSPSBeanMetrics spsBeanMetrics = externalCtxt.getSpsBeanMetrics();
1734+
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
1735+
ObjectName mxBeanName = new ObjectName("Hadoop:service=ExternalSPS,name=ExternalSPS");
1736+
// Assert metrics before update.
1737+
assertEquals(0, mbs.getAttribute(mxBeanName, "AttemptedItemsCount"));
1738+
assertEquals(0, mbs.getAttribute(mxBeanName, "ProcessingQueueSize"));
1739+
assertEquals(0, mbs.getAttribute(mxBeanName, "MovementFinishedBlocksCount"));
1740+
1741+
// Update metrics.
1742+
spsBeanMetrics.updateAttemptedItemsCount();
1743+
spsBeanMetrics.updateProcessingQueueSize();
1744+
spsBeanMetrics.updateMovementFinishedBlocksCount();
1745+
1746+
// Assert metrics after update.
1747+
assertEquals(1, mbs.getAttribute(mxBeanName, "AttemptedItemsCount"));
1748+
assertEquals(1, mbs.getAttribute(mxBeanName, "ProcessingQueueSize"));
1749+
assertEquals(1, mbs.getAttribute(mxBeanName, "MovementFinishedBlocksCount"));
1750+
} finally {
1751+
shutdownCluster();
1752+
}
1753+
}
17191754
}

0 commit comments

Comments
 (0)