Skip to content

Commit eda1e5b

Browse files
slfan1989HarshitGupta11
authored andcommitted
YARN-11224. [Federation] Add getAppQueue, updateAppQueue REST APIs for Router. (apache#4747)
1 parent 7dbc5f2 commit eda1e5b

File tree

3 files changed

+130
-3
lines changed

3 files changed

+130
-3
lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1312,14 +1312,52 @@ public Response updateApplicationPriority(AppPriority targetPriority,
13121312
@Override
13131313
public AppQueue getAppQueue(HttpServletRequest hsr, String appId)
13141314
throws AuthorizationException {
1315-
throw new NotImplementedException("Code is not implemented");
1315+
1316+
if (appId == null || appId.isEmpty()) {
1317+
throw new IllegalArgumentException("Parameter error, the appId is empty or null.");
1318+
}
1319+
1320+
try {
1321+
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId);
1322+
DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
1323+
subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
1324+
return interceptor.getAppQueue(hsr, appId);
1325+
} catch (IllegalArgumentException e) {
1326+
RouterServerUtil.logAndThrowRunTimeException(e,
1327+
"Unable to get queue by appId: %s.", appId);
1328+
} catch (YarnException e) {
1329+
RouterServerUtil.logAndThrowRunTimeException("getAppQueue Failed.", e);
1330+
}
1331+
1332+
return null;
13161333
}
13171334

13181335
@Override
13191336
public Response updateAppQueue(AppQueue targetQueue, HttpServletRequest hsr,
13201337
String appId) throws AuthorizationException, YarnException,
13211338
InterruptedException, IOException {
1322-
throw new NotImplementedException("Code is not implemented");
1339+
1340+
if (appId == null || appId.isEmpty()) {
1341+
throw new IllegalArgumentException("Parameter error, the appId is empty or null.");
1342+
}
1343+
1344+
if (targetQueue == null) {
1345+
throw new IllegalArgumentException("Parameter error, the targetQueue is null.");
1346+
}
1347+
1348+
try {
1349+
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId);
1350+
DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
1351+
subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
1352+
return interceptor.updateAppQueue(targetQueue, hsr, appId);
1353+
} catch (IllegalArgumentException e) {
1354+
RouterServerUtil.logAndThrowRunTimeException(e,
1355+
"Unable to update app queue by appId: %s.", appId);
1356+
} catch (YarnException e) {
1357+
RouterServerUtil.logAndThrowRunTimeException("updateAppQueue Failed.", e);
1358+
}
1359+
1360+
return null;
13231361
}
13241362

13251363
@Override

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,9 @@
3535
import javax.ws.rs.core.Response.Status;
3636

3737
import org.apache.commons.lang3.EnumUtils;
38+
import org.apache.commons.lang3.StringUtils;
3839
import org.apache.hadoop.security.authorize.AuthorizationException;
3940
import org.apache.hadoop.util.Sets;
40-
import org.apache.hadoop.util.StringUtils;
4141
import org.apache.hadoop.yarn.api.records.ApplicationId;
4242
import org.apache.hadoop.yarn.api.records.Resource;
4343
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -77,6 +77,7 @@
7777
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutInfo;
7878
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutsInfo;
7979
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppPriority;
80+
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppQueue;
8081
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
8182
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
8283
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
@@ -621,4 +622,43 @@ public AppPriority getAppPriority(HttpServletRequest hsr, String appId)
621622

622623
return new AppPriority(priority.getPriority());
623624
}
625+
626+
@Override
627+
public AppQueue getAppQueue(HttpServletRequest hsr, String appId)
628+
throws AuthorizationException {
629+
if (!isRunning) {
630+
throw new RuntimeException("RM is stopped");
631+
}
632+
ApplicationId applicationId = ApplicationId.fromString(appId);
633+
if (!applicationMap.containsKey(applicationId)) {
634+
throw new NotFoundException("app with id: " + appId + " not found");
635+
}
636+
String queue = applicationMap.get(applicationId).getQueue();
637+
return new AppQueue(queue);
638+
}
639+
640+
@Override
641+
public Response updateAppQueue(AppQueue targetQueue, HttpServletRequest hsr, String appId)
642+
throws AuthorizationException, YarnException, InterruptedException, IOException {
643+
if (!isRunning) {
644+
throw new RuntimeException("RM is stopped");
645+
}
646+
ApplicationId applicationId = ApplicationId.fromString(appId);
647+
if (!applicationMap.containsKey(applicationId)) {
648+
throw new NotFoundException("app with id: " + appId + " not found");
649+
}
650+
if (targetQueue == null || StringUtils.isBlank(targetQueue.getQueue())) {
651+
return Response.status(Status.BAD_REQUEST).build();
652+
}
653+
654+
ApplicationReport appReport = applicationMap.get(applicationId);
655+
String originalQueue = appReport.getQueue();
656+
appReport.setQueue(targetQueue.getQueue());
657+
applicationMap.put(applicationId, appReport);
658+
LOG.info("Update applicationId = {} from originalQueue = {} to targetQueue = {}.",
659+
appId, originalQueue, targetQueue);
660+
661+
AppQueue targetAppQueue = new AppQueue(targetQueue.getQueue());
662+
return Response.status(Status.OK).entity(targetAppQueue).build();
663+
}
624664
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutInfo;
6363
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutsInfo;
6464
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppPriority;
65+
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppQueue;
6566
import org.apache.hadoop.yarn.server.resourcemanager.webapp.NodeIDsInfo;
6667
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
6768
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
@@ -900,4 +901,52 @@ public void testGetAppPriority() throws IOException, InterruptedException,
900901
Assert.assertNotNull(appPriority);
901902
Assert.assertEquals(priority, appPriority.getPriority());
902903
}
904+
905+
@Test
906+
public void testUpdateAppQueue() throws IOException, InterruptedException,
907+
YarnException {
908+
909+
String oldQueue = "oldQueue";
910+
String newQueue = "newQueue";
911+
912+
// Submit application to multiSubCluster
913+
ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
914+
ApplicationSubmissionContextInfo context = new ApplicationSubmissionContextInfo();
915+
context.setApplicationId(appId.toString());
916+
context.setQueue(oldQueue);
917+
918+
// Submit the application
919+
Assert.assertNotNull(interceptor.submitApplication(context, null));
920+
921+
// Set New Queue for application
922+
Response response = interceptor.updateAppQueue(new AppQueue(newQueue),
923+
null, appId.toString());
924+
925+
Assert.assertNotNull(response);
926+
AppQueue appQueue = (AppQueue) response.getEntity();
927+
Assert.assertEquals(newQueue, appQueue.getQueue());
928+
929+
// Get AppQueue by application
930+
AppQueue queue = interceptor.getAppQueue(null, appId.toString());
931+
Assert.assertNotNull(queue);
932+
Assert.assertEquals(newQueue, queue.getQueue());
933+
}
934+
935+
@Test
936+
public void testGetAppQueue() throws IOException, InterruptedException, YarnException {
937+
String queueName = "queueName";
938+
939+
// Submit application to multiSubCluster
940+
ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
941+
ApplicationSubmissionContextInfo context = new ApplicationSubmissionContextInfo();
942+
context.setApplicationId(appId.toString());
943+
context.setQueue(queueName);
944+
945+
Assert.assertNotNull(interceptor.submitApplication(context, null));
946+
947+
// Get Queue by application
948+
AppQueue queue = interceptor.getAppQueue(null, appId.toString());
949+
Assert.assertNotNull(queue);
950+
Assert.assertEquals(queueName, queue.getQueue());
951+
}
903952
}

0 commit comments

Comments
 (0)