Skip to content

Commit 52c2d99

Browse files
authored
YARN-11228. [Federation] Add getAppAttempts, getAppAttempt REST APIs for Router. (#4695)
1 parent 25ccdc7 commit 52c2d99

File tree

5 files changed

+219
-24
lines changed

5 files changed

+219
-24
lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,28 @@ private RouterServerUtil() {
4242
public static final Logger LOG =
4343
LoggerFactory.getLogger(RouterServerUtil.class);
4444

45+
/**
46+
* Throws an exception due to an error.
47+
*
48+
* @param t the throwable raised in the called class.
49+
* @param errMsgFormat the error message format string.
50+
* @param args referenced by the format specifiers in the format string.
51+
* @throws YarnException on failure
52+
*/
53+
@Public
54+
@Unstable
55+
public static void logAndThrowException(Throwable t, String errMsgFormat, Object... args)
56+
throws YarnException {
57+
String msg = String.format(errMsgFormat, args);
58+
if (t != null) {
59+
LOG.error(msg, t);
60+
throw new YarnException(msg, t);
61+
} else {
62+
LOG.error(msg);
63+
throw new YarnException(msg);
64+
}
65+
}
66+
4567
/**
4668
* Throws an exception due to an error.
4769
*
@@ -101,4 +123,26 @@ public static void logAndThrowRunTimeException(String errMsg, Throwable t)
101123
throw new RuntimeException(errMsg);
102124
}
103125
}
126+
127+
/**
128+
* Throws an RunTimeException due to an error.
129+
*
130+
* @param t the throwable raised in the called class.
131+
* @param errMsgFormat the error message format string.
132+
* @param args referenced by the format specifiers in the format string.
133+
* @throws RuntimeException on failure
134+
*/
135+
@Public
136+
@Unstable
137+
public static void logAndThrowRunTimeException(Throwable t, String errMsgFormat, Object... args)
138+
throws RuntimeException {
139+
String msg = String.format(errMsgFormat, args);
140+
if (t != null) {
141+
LOG.error(msg, t);
142+
throw new RuntimeException(msg, t);
143+
} else {
144+
LOG.error(msg);
145+
throw new RuntimeException(msg);
146+
}
147+
}
104148
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java

Lines changed: 53 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1360,7 +1360,24 @@ public Response updateApplicationTimeout(AppTimeoutInfo appTimeout,
13601360

13611361
@Override
13621362
public AppAttemptsInfo getAppAttempts(HttpServletRequest hsr, String appId) {
1363-
throw new NotImplementedException("Code is not implemented");
1363+
if (appId == null || appId.isEmpty()) {
1364+
throw new IllegalArgumentException("Parameter error, the appId is empty or null.");
1365+
}
1366+
1367+
try {
1368+
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId);
1369+
1370+
DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
1371+
subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
1372+
return interceptor.getAppAttempts(hsr, appId);
1373+
} catch (IllegalArgumentException e) {
1374+
RouterServerUtil.logAndThrowRunTimeException(e,
1375+
"Unable to get the AppAttempt appId: %s.", appId);
1376+
} catch (YarnException e) {
1377+
RouterServerUtil.logAndThrowRunTimeException("getContainer Failed.", e);
1378+
}
1379+
1380+
return null;
13641381
}
13651382

13661383
@Override
@@ -1372,7 +1389,28 @@ public RMQueueAclInfo checkUserAccessToQueue(String queue, String username,
13721389
@Override
13731390
public AppAttemptInfo getAppAttempt(HttpServletRequest req,
13741391
HttpServletResponse res, String appId, String appAttemptId) {
1375-
throw new NotImplementedException("Code is not implemented");
1392+
1393+
if (appId == null || appId.isEmpty()) {
1394+
throw new IllegalArgumentException("Parameter error, the appId is empty or null.");
1395+
}
1396+
if (appAttemptId == null || appAttemptId.isEmpty()) {
1397+
throw new IllegalArgumentException("Parameter error, the appAttemptId is empty or null.");
1398+
}
1399+
1400+
try {
1401+
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId);
1402+
1403+
DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
1404+
subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
1405+
return interceptor.getAppAttempt(req, res, appId, appAttemptId);
1406+
} catch (IllegalArgumentException e) {
1407+
RouterServerUtil.logAndThrowRunTimeException(e,
1408+
"Unable to get the AppAttempt appId: %s, appAttemptId: %s.", appId, appAttemptId);
1409+
} catch (YarnException e) {
1410+
RouterServerUtil.logAndThrowRunTimeException("getContainer Failed.", e);
1411+
}
1412+
1413+
return null;
13761414
}
13771415

13781416
@Override
@@ -1423,13 +1461,7 @@ public ContainerInfo getContainer(HttpServletRequest req,
14231461
}
14241462

14251463
try {
1426-
ApplicationId applicationId = ApplicationId.fromString(appId);
1427-
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(applicationId);
1428-
1429-
if (subClusterInfo == null) {
1430-
RouterServerUtil.logAndThrowRunTimeException("Unable to get subCluster by applicationId = " +
1431-
applicationId, null);
1432-
}
1464+
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId);
14331465

14341466
DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
14351467
subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
@@ -1483,12 +1515,7 @@ public Response signalToContainer(String containerId, String command,
14831515
ContainerId containerIdObj = ContainerId.fromString(containerId);
14841516
ApplicationId applicationId = containerIdObj.getApplicationAttemptId().getApplicationId();
14851517

1486-
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(applicationId);
1487-
1488-
if (subClusterInfo == null) {
1489-
RouterServerUtil.logAndThrowRunTimeException("Unable to get subCluster by applicationId = " +
1490-
applicationId, null);
1491-
}
1518+
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(applicationId.toString());
14921519

14931520
DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
14941521
subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
@@ -1556,24 +1583,26 @@ private <R> Map<SubClusterInfo, R> invokeConcurrent(Collection<SubClusterInfo> c
15561583
/**
15571584
* get the HomeSubCluster according to ApplicationId.
15581585
*
1559-
* @param applicationId applicationId
1586+
* @param appId applicationId
15601587
* @return HomeSubCluster
15611588
* @throws YarnException on failure
15621589
*/
1563-
private SubClusterInfo getHomeSubClusterInfoByAppId(ApplicationId applicationId) throws YarnException {
1590+
private SubClusterInfo getHomeSubClusterInfoByAppId(String appId)
1591+
throws YarnException {
15641592
SubClusterInfo subClusterInfo = null;
1565-
SubClusterId subClusterId = null;
15661593
try {
1567-
subClusterId = federationFacade.getApplicationHomeSubCluster(applicationId);
1594+
ApplicationId applicationId = ApplicationId.fromString(appId);
1595+
SubClusterId subClusterId = federationFacade.getApplicationHomeSubCluster(applicationId);
15681596
if (subClusterId == null) {
1569-
RouterServerUtil.logAndThrowException("Can't get HomeSubCluster by applicationId "
1570-
+ applicationId, null);
1597+
RouterServerUtil.logAndThrowException(null,
1598+
"Can't get HomeSubCluster by applicationId %s", applicationId);
15711599
}
15721600
subClusterInfo = federationFacade.getSubCluster(subClusterId);
1601+
return subClusterInfo;
15731602
} catch (YarnException e) {
1574-
RouterServerUtil.logAndThrowException("Get HomeSubClusterInfo by applicationId "
1575-
+ applicationId + " failed.", e);
1603+
RouterServerUtil.logAndThrowException(e,
1604+
"Get HomeSubClusterInfo by applicationId %s failed.", appId);
15761605
}
1577-
return subClusterInfo;
1606+
throw new YarnException("Unable to get subCluster by applicationId = " + appId);
15781607
}
15791608
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,11 @@
4949
import org.apache.hadoop.yarn.api.records.ContainerReport;
5050
import org.apache.hadoop.yarn.api.records.NodeLabel;
5151
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
52+
import org.apache.hadoop.yarn.api.records.ApplicationReport;
53+
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
54+
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
55+
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
56+
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
5257
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
5358
import org.apache.hadoop.yarn.exceptions.YarnException;
5459
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
@@ -67,6 +72,8 @@
6772
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceOptionInfo;
6873
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsInfo;
6974
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelInfo;
75+
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo;
76+
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
7077
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
7178
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
7279
import org.apache.hadoop.yarn.webapp.NotFoundException;
@@ -412,4 +419,48 @@ public Response signalToContainer(String containerId, String command,
412419

413420
return Response.status(Status.OK).build();
414421
}
422+
423+
@Override
424+
public AppAttemptInfo getAppAttempt(HttpServletRequest req, HttpServletResponse res,
425+
String appId, String appAttemptId) {
426+
if (!isRunning) {
427+
throw new RuntimeException("RM is stopped");
428+
}
429+
430+
ApplicationId applicationId = ApplicationId.fromString(appId);
431+
if (!applicationMap.contains(applicationId)) {
432+
throw new NotFoundException("app with id: " + appId + " not found");
433+
}
434+
435+
ApplicationReport newApplicationReport = ApplicationReport.newInstance(
436+
applicationId, ApplicationAttemptId.newInstance(applicationId, Integer.parseInt(appAttemptId)),
437+
"user", "queue", "appname", "host", 124, null,
438+
YarnApplicationState.RUNNING, "diagnostics", "url", 1, 2, 3, 4,
439+
FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN", null);
440+
441+
ApplicationAttemptReport attempt = ApplicationAttemptReport.newInstance(
442+
ApplicationAttemptId.newInstance(applicationId, Integer.parseInt(appAttemptId)),
443+
"host", 124, "url", "oUrl", "diagnostics",
444+
YarnApplicationAttemptState.FINISHED, ContainerId.newContainerId(
445+
newApplicationReport.getCurrentApplicationAttemptId(), 1));
446+
447+
return new AppAttemptInfo(attempt);
448+
}
449+
450+
@Override
451+
public AppAttemptsInfo getAppAttempts(HttpServletRequest hsr, String appId) {
452+
if (!isRunning) {
453+
throw new RuntimeException("RM is stopped");
454+
}
455+
456+
ApplicationId applicationId = ApplicationId.fromString(appId);
457+
if (!applicationMap.contains(applicationId)) {
458+
throw new NotFoundException("app with id: " + appId + " not found");
459+
}
460+
461+
AppAttemptsInfo infos = new AppAttemptsInfo();
462+
infos.add(TestRouterWebServiceUtil.generateAppAttemptInfo(0));
463+
infos.add(TestRouterWebServiceUtil.generateAppAttemptInfo(1));
464+
return infos;
465+
}
415466
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@
5656
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo;
5757
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelInfo;
5858
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LabelsToNodesInfo;
59+
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo;
60+
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptInfo;
5961
import org.apache.hadoop.yarn.server.resourcemanager.webapp.NodeIDsInfo;
6062
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
6163
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
@@ -717,4 +719,60 @@ public void testGetContainer()
717719
appId.toString(), appAttemptId.toString(), "0");
718720
Assert.assertNotNull(containerInfo);
719721
}
722+
723+
@Test
724+
public void testGetAppAttempts()
725+
throws IOException, InterruptedException, YarnException {
726+
// Submit application to multiSubCluster
727+
ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
728+
ApplicationSubmissionContextInfo context = new ApplicationSubmissionContextInfo();
729+
context.setApplicationId(appId.toString());
730+
731+
Assert.assertNotNull(interceptor.submitApplication(context, null));
732+
733+
AppAttemptsInfo appAttemptsInfo = interceptor.getAppAttempts(null, appId.toString());
734+
Assert.assertNotNull(appAttemptsInfo);
735+
736+
ArrayList<AppAttemptInfo> attemptLists = appAttemptsInfo.getAttempts();
737+
Assert.assertNotNull(appAttemptsInfo);
738+
Assert.assertEquals(2, attemptLists.size());
739+
740+
AppAttemptInfo attemptInfo1 = attemptLists.get(0);
741+
Assert.assertNotNull(attemptInfo1);
742+
Assert.assertEquals(0, attemptInfo1.getAttemptId());
743+
Assert.assertEquals("AppAttemptId_0", attemptInfo1.getAppAttemptId());
744+
Assert.assertEquals("LogLink_0", attemptInfo1.getLogsLink());
745+
Assert.assertEquals(1659621705L, attemptInfo1.getFinishedTime());
746+
747+
AppAttemptInfo attemptInfo2 = attemptLists.get(1);
748+
Assert.assertNotNull(attemptInfo2);
749+
Assert.assertEquals(0, attemptInfo2.getAttemptId());
750+
Assert.assertEquals("AppAttemptId_1", attemptInfo2.getAppAttemptId());
751+
Assert.assertEquals("LogLink_1", attemptInfo2.getLogsLink());
752+
Assert.assertEquals(1659621705L, attemptInfo2.getFinishedTime());
753+
}
754+
755+
@Test
756+
public void testGetAppAttempt()
757+
throws IOException, InterruptedException, YarnException {
758+
759+
// Generate ApplicationId information
760+
ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
761+
ApplicationSubmissionContextInfo context = new ApplicationSubmissionContextInfo();
762+
context.setApplicationId(appId.toString());
763+
764+
// Generate ApplicationAttemptId information
765+
Assert.assertNotNull(interceptor.submitApplication(context, null));
766+
ApplicationAttemptId expectAppAttemptId = ApplicationAttemptId.newInstance(appId, 1);
767+
768+
org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo
769+
appAttemptInfo = interceptor.getAppAttempt(null, null, appId.toString(), "1");
770+
771+
Assert.assertNotNull(appAttemptInfo);
772+
Assert.assertEquals(expectAppAttemptId.toString(), appAttemptInfo.getAppAttemptId());
773+
Assert.assertEquals("url", appAttemptInfo.getTrackingUrl());
774+
Assert.assertEquals("oUrl", appAttemptInfo.getOriginalTrackingUrl());
775+
Assert.assertEquals(124, appAttemptInfo.getRpcPort());
776+
Assert.assertEquals("host", appAttemptInfo.getHost());
777+
}
720778
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebServiceUtil.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,16 @@
3030
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
3131
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
3232
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceRequestInfo;
33+
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptInfo;
3334
import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager;
3435
import org.junit.Assert;
3536
import org.junit.Test;
3637
import org.slf4j.Logger;
3738
import org.slf4j.LoggerFactory;
3839

40+
import static org.mockito.Mockito.mock;
41+
import static org.mockito.Mockito.when;
42+
3943
/**
4044
* Test class to validate RouterWebServiceUtil methods.
4145
*/
@@ -579,4 +583,13 @@ private void setUpClusterMetrics(ClusterMetricsInfo metrics, long seed) {
579583
metrics.setActiveNodes(rand.nextInt(1000));
580584
metrics.setShutdownNodes(rand.nextInt(1000));
581585
}
586+
587+
public static AppAttemptInfo generateAppAttemptInfo(int attemptId) {
588+
AppAttemptInfo appAttemptInfo = mock(AppAttemptInfo.class);
589+
when(appAttemptInfo.getAppAttemptId()).thenReturn("AppAttemptId_" + attemptId);
590+
when(appAttemptInfo.getAttemptId()).thenReturn(0);
591+
when(appAttemptInfo.getFinishedTime()).thenReturn(1659621705L);
592+
when(appAttemptInfo.getLogsLink()).thenReturn("LogLink_" + attemptId);
593+
return appAttemptInfo;
594+
}
582595
}

0 commit comments

Comments
 (0)