@@ -301,6 +301,16 @@ public int getAttemptedItemsCount() {
}
}

@VisibleForTesting
public List<AttemptedItemInfo> getStorageMovementAttemptedItems() {
return storageMovementAttemptedItems;
}

@VisibleForTesting
public BlockingQueue<Block> getMovementFinishedBlocks() {
return movementFinishedBlocks;
}

public void clearQueues() {
movementFinishedBlocks.clear();
synchronized (storageMovementAttemptedItems) {
@@ -1077,7 +1077,7 @@ public void clearQueues() {
* attempted or reported time stamp. This is used by
* {@link BlockStorageMovementAttemptedItems#storageMovementAttemptedItems}.
*/
-final static class AttemptedItemInfo extends ItemInfo {
+public final static class AttemptedItemInfo extends ItemInfo {
private long lastAttemptedOrReportedTime;
private final Set<Block> blocks;

@@ -1095,7 +1095,7 @@ final static class AttemptedItemInfo extends ItemInfo {
* @param retryCount
* file retry count
*/
-AttemptedItemInfo(long rootId, long trackId,
+public AttemptedItemInfo(long rootId, long trackId,
long lastAttemptedOrReportedTime,
Set<Block> blocks, int retryCount) {
super(rootId, trackId, retryCount);
@@ -24,6 +24,7 @@
import java.util.List;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.protocol.Block;
@@ -39,10 +40,12 @@
import org.apache.hadoop.hdfs.server.namenode.sps.Context;
import org.apache.hadoop.hdfs.server.namenode.sps.FileCollector;
import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.DatanodeMap;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.DatanodeWithStorage;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.hdfs.server.sps.metrics.ExternalSPSBeanMetrics;
import org.apache.hadoop.net.NetworkTopology;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,6 +65,7 @@ public class ExternalSPSContext implements Context {
private final FileCollector fileCollector;
private final BlockMoveTaskHandler externalHandler;
private final BlockMovementListener blkMovementListener;
private ExternalSPSBeanMetrics spsBeanMetrics;

public ExternalSPSContext(SPSService service, NameNodeConnector nnc) {
this.service = service;
@@ -208,4 +212,17 @@ public void notifyMovementTriedBlocks(Block[] moveAttemptFinishedBlks) {
LOG.info("Movement attempted blocks: {}", actualBlockMovements);
}
}

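/**
 * Creates and registers the ExternalSPS metrics bean for this context.
 */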
public void initMetrics(StoragePolicySatisfier sps) {
spsBeanMetrics = new ExternalSPSBeanMetrics(sps);
}

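/**
 * Unregisters the ExternalSPS metrics bean.
 */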
public void closeMetrics() {
spsBeanMetrics.close();
}

@VisibleForTesting
public ExternalSPSBeanMetrics getSpsBeanMetrics() {
return spsBeanMetrics;
}
}
@@ -48,8 +48,7 @@
*/
@InterfaceAudience.Private
public final class ExternalStoragePolicySatisfier {
-public static final Logger LOG = LoggerFactory
-.getLogger(ExternalStoragePolicySatisfier.class);
+public static final Logger LOG = LoggerFactory.getLogger(ExternalStoragePolicySatisfier.class);

private ExternalStoragePolicySatisfier() {
// This is just a class to start and run external sps.
@@ -60,6 +59,7 @@ private ExternalStoragePolicySatisfier() {
*/
public static void main(String[] args) throws Exception {
NameNodeConnector nnc = null;
ExternalSPSContext context = null;
try {
StringUtils.startupShutdownMessage(StoragePolicySatisfier.class, args,
LOG);
@@ -69,9 +69,10 @@ public static void main(String[] args) throws Exception {
StoragePolicySatisfier sps = new StoragePolicySatisfier(spsConf);
nnc = getNameNodeConnector(spsConf);

-ExternalSPSContext context = new ExternalSPSContext(sps, nnc);
+context = new ExternalSPSContext(sps, nnc);
sps.init(context);
sps.start(StoragePolicySatisfierMode.EXTERNAL);
context.initMetrics(sps);
if (sps != null) {
sps.join();
}
@@ -82,6 +83,11 @@
if (nnc != null) {
nnc.close();
}
if (context != null && context.getSpsBeanMetrics() != null) {
context.closeMetrics();
}
}
}

@@ -0,0 +1,100 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.sps.metrics;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.namenode.sps.ItemInfo;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
import org.apache.hadoop.metrics2.util.MBeans;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
import javax.management.StandardMBean;
import java.util.HashSet;

/**
* Expose the ExternalSPS metrics.
*/
public class ExternalSPSBeanMetrics implements ExternalSPSMXBean {

private static final Logger LOG =
LoggerFactory.getLogger(ExternalSPSBeanMetrics.class);

/**
* ExternalSPS bean.
*/
private ObjectName externalSPSBeanName;
private StoragePolicySatisfier storagePolicySatisfier;

public ExternalSPSBeanMetrics(StoragePolicySatisfier sps) {
try {
this.storagePolicySatisfier = sps;
StandardMBean bean = new StandardMBean(this, ExternalSPSMXBean.class);
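// MBeans.register("ExternalSPS", "ExternalSPS", ...) exposes the bean under
// the object name "Hadoop:service=ExternalSPS,name=ExternalSPS".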
this.externalSPSBeanName = MBeans.register("ExternalSPS", "ExternalSPS", bean);
LOG.info("Registered ExternalSPS MBean: {}", this.externalSPSBeanName);
} catch (NotCompliantMBeanException e) {
throw new RuntimeException("Bad externalSPS MBean setup", e);
}
}

/**
* Unregister the JMX interfaces.
*/
public void close() {
if (externalSPSBeanName != null) {
MBeans.unregister(externalSPSBeanName);
externalSPSBeanName = null;
}
}

@Override
public int getProcessingQueueSize() {
return storagePolicySatisfier.processingQueueSize();
}

@VisibleForTesting
public void updateProcessingQueueSize() {
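// Test-only hook: enqueue a synthetic item so the gauge reads non-zero.
// The other update* methods below follow the same pattern.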
storagePolicySatisfier.getStorageMovementQueue()
.add(new ItemInfo(0, 1, 1));
}

@Override
public int getMovementFinishedBlocksCount() {
return storagePolicySatisfier.getAttemptedItemsMonitor().getMovementFinishedBlocksCount();
}

@VisibleForTesting
public void updateMovementFinishedBlocksCount() {
storagePolicySatisfier.getAttemptedItemsMonitor().getMovementFinishedBlocks()
.add(new Block(1));
}

@Override
public int getAttemptedItemsCount() {
return storagePolicySatisfier.getAttemptedItemsMonitor().getAttemptedItemsCount();
}

@VisibleForTesting
public void updateAttemptedItemsCount() {
storagePolicySatisfier.getAttemptedItemsMonitor().getStorageMovementAttemptedItems()
.add(new StoragePolicySatisfier.AttemptedItemInfo(0, 1, 1, new HashSet<>(), 1));
}
}
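
Once registered, the three gauges can be read by any JMX client from outside the process. What follows is a minimal sketch of a standalone reader, assuming the external SPS JVM was started with the standard com.sun.management.jmxremote flags (authentication and SSL disabled for brevity); the class name and the localhost:9999 endpoint are hypothetical, while the object name matches the one registered by the constructor above.

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

/** Hypothetical standalone reader for the ExternalSPS JMX gauges. */
public class ExternalSpsMetricsReader {
  public static void main(String[] args) throws Exception {
    // Hypothetical endpoint; assumes the SPS JVM runs with
    // -Dcom.sun.management.jmxremote.port=9999 (auth/SSL disabled for the demo).
    JMXServiceURL url =
        new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
    try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
      MBeanServerConnection mbsc = connector.getMBeanServerConnection();
      // Same object name ExternalSPSBeanMetrics registers itself under.
      ObjectName sps = new ObjectName("Hadoop:service=ExternalSPS,name=ExternalSPS");
      System.out.println("ProcessingQueueSize = "
          + mbsc.getAttribute(sps, "ProcessingQueueSize"));
      System.out.println("MovementFinishedBlocksCount = "
          + mbsc.getAttribute(sps, "MovementFinishedBlocksCount"));
      System.out.println("AttemptedItemsCount = "
          + mbsc.getAttribute(sps, "AttemptedItemsCount"));
    }
  }
}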
@@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.sps.metrics;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/**
* This is the JMX management interface for ExternalSPS information.
 * End users shouldn't implement this interface; instead, they should
 * access this information through the JMX APIs.
*/
@InterfaceAudience.Private
@InterfaceStability.Stable
public interface ExternalSPSMXBean {

/**
* Gets the queue size of StorageMovementNeeded.
*
* @return the queue size of StorageMovementNeeded.
*/
int getProcessingQueueSize();

/**
* Gets the count of movement finished blocks.
*
* @return the count of movement finished blocks.
*/
int getMovementFinishedBlocksCount();

/**
* Gets the count of attempted items.
*
* @return the count of attempted items.
*/
int getAttemptedItemsCount();
}
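
Any standard JMX client (for example jconsole, or a programmatic reader like the sketch above) can read these attributes under the object name Hadoop:service=ExternalSPS,name=ExternalSPS.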
@@ -0,0 +1,22 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* This package provides the ability to expose external SPS metrics to JMX.
*/
package org.apache.hadoop.hdfs.server.sps.metrics;
@@ -36,12 +36,14 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
@@ -82,6 +84,7 @@
import org.apache.hadoop.hdfs.server.namenode.sps.BlockMovementListener;
import org.apache.hadoop.hdfs.server.namenode.sps.BlockStorageMovementAttemptedItems;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
import org.apache.hadoop.hdfs.server.sps.metrics.ExternalSPSBeanMetrics;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.hadoop.security.SecurityUtil;
@@ -100,6 +103,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.management.MBeanServer;
import javax.management.ObjectName;
import java.util.function.Supplier;

/**
@@ -1716,4 +1721,34 @@ public void clear() {
actualBlockMovements.clear();
}
}

@Test(timeout = 300000)
public void testExternalSPSMetrics() throws Exception {
try {
createCluster();
// Register the JMX bean, but stop the SPS thread so that the mock data added below is not consumed.
externalSps.stop(true);
externalCtxt.initMetrics(externalSps);

ExternalSPSBeanMetrics spsBeanMetrics = externalCtxt.getSpsBeanMetrics();
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
ObjectName mxBeanName = new ObjectName("Hadoop:service=ExternalSPS,name=ExternalSPS");
// Assert metrics before update.
assertEquals(0, mbs.getAttribute(mxBeanName, "AttemptedItemsCount"));
assertEquals(0, mbs.getAttribute(mxBeanName, "ProcessingQueueSize"));
assertEquals(0, mbs.getAttribute(mxBeanName, "MovementFinishedBlocksCount"));

// Update metrics.
spsBeanMetrics.updateAttemptedItemsCount();
spsBeanMetrics.updateProcessingQueueSize();
spsBeanMetrics.updateMovementFinishedBlocksCount();

// Assert metrics after update.
assertEquals(1, mbs.getAttribute(mxBeanName, "AttemptedItemsCount"));
assertEquals(1, mbs.getAttribute(mxBeanName, "ProcessingQueueSize"));
assertEquals(1, mbs.getAttribute(mxBeanName, "MovementFinishedBlocksCount"));
} finally {
shutdownCluster();
}
}
}