From b28f143ad86098fb2e698060d3485e9ce83f00cc Mon Sep 17 00:00:00 2001 From: slfan1989 Date: Tue, 16 Aug 2022 18:07:13 +0800 Subject: [PATCH] YARN-11224. [Federation] Add getAppQueue, updateAppQueue REST APIs for Router. --- .../webapp/FederationInterceptorREST.java | 42 +++++++++++++++- .../MockDefaultRequestInterceptorREST.java | 42 +++++++++++++++- .../webapp/TestFederationInterceptorREST.java | 49 +++++++++++++++++++ 3 files changed, 130 insertions(+), 3 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java index 8a07def8d6b7e..31a841cfb97dd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java @@ -1312,14 +1312,52 @@ public Response updateApplicationPriority(AppPriority targetPriority, @Override public AppQueue getAppQueue(HttpServletRequest hsr, String appId) throws AuthorizationException { - throw new NotImplementedException("Code is not implemented"); + + if (appId == null || appId.isEmpty()) { + throw new IllegalArgumentException("Parameter error, the appId is empty or null."); + } + + try { + SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId); + DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster( + subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress()); + return interceptor.getAppQueue(hsr, appId); + } catch (IllegalArgumentException e) { + RouterServerUtil.logAndThrowRunTimeException(e, + "Unable to get queue by appId: %s.", appId); + } catch (YarnException e) { + RouterServerUtil.logAndThrowRunTimeException("getAppQueue Failed.", e); + } + + return null; } @Override public Response updateAppQueue(AppQueue targetQueue, HttpServletRequest hsr, String appId) throws AuthorizationException, YarnException, InterruptedException, IOException { - throw new NotImplementedException("Code is not implemented"); + + if (appId == null || appId.isEmpty()) { + throw new IllegalArgumentException("Parameter error, the appId is empty or null."); + } + + if (targetQueue == null) { + throw new IllegalArgumentException("Parameter error, the targetQueue is null."); + } + + try { + SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId); + DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster( + subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress()); + return interceptor.updateAppQueue(targetQueue, hsr, appId); + } catch (IllegalArgumentException e) { + RouterServerUtil.logAndThrowRunTimeException(e, + "Unable to update app queue by appId: %s.", appId); + } catch (YarnException e) { + RouterServerUtil.logAndThrowRunTimeException("updateAppQueue Failed.", e); + } + + return null; } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java index 4b3e49e875f5a..f42ffd5961b75 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java @@ -35,9 +35,9 @@ import javax.ws.rs.core.Response.Status; import org.apache.commons.lang3.EnumUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.security.authorize.AuthorizationException; import org.apache.hadoop.util.Sets; -import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -77,6 +77,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutsInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppPriority; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppQueue; import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo; import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo; import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo; @@ -621,4 +622,43 @@ public AppPriority getAppPriority(HttpServletRequest hsr, String appId) return new AppPriority(priority.getPriority()); } + + @Override + public AppQueue getAppQueue(HttpServletRequest hsr, String appId) + throws AuthorizationException { + if (!isRunning) { + throw new RuntimeException("RM is stopped"); + } + ApplicationId applicationId = ApplicationId.fromString(appId); + if (!applicationMap.containsKey(applicationId)) { + throw new NotFoundException("app with id: " + appId + " not found"); + } + String queue = applicationMap.get(applicationId).getQueue(); + return new AppQueue(queue); + } + + @Override + public Response updateAppQueue(AppQueue targetQueue, HttpServletRequest hsr, String appId) + throws AuthorizationException, YarnException, InterruptedException, IOException { + if (!isRunning) { + throw new RuntimeException("RM is stopped"); + } + ApplicationId applicationId = ApplicationId.fromString(appId); + if (!applicationMap.containsKey(applicationId)) { + throw new NotFoundException("app with id: " + appId + " not found"); + } + if (targetQueue == null || StringUtils.isBlank(targetQueue.getQueue())) { + return Response.status(Status.BAD_REQUEST).build(); + } + + ApplicationReport appReport = applicationMap.get(applicationId); + String originalQueue = appReport.getQueue(); + appReport.setQueue(targetQueue.getQueue()); + applicationMap.put(applicationId, appReport); + LOG.info("Update applicationId = {} from originalQueue = {} to targetQueue = {}.", + appId, originalQueue, targetQueue); + + AppQueue targetAppQueue = new AppQueue(targetQueue.getQueue()); + return Response.status(Status.OK).entity(targetAppQueue).build(); + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java index 0c90412260240..a8cafa01403bb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java @@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutsInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppPriority; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppQueue; import org.apache.hadoop.yarn.server.resourcemanager.webapp.NodeIDsInfo; import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo; import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo; @@ -900,4 +901,52 @@ public void testGetAppPriority() throws IOException, InterruptedException, Assert.assertNotNull(appPriority); Assert.assertEquals(priority, appPriority.getPriority()); } + + @Test + public void testUpdateAppQueue() throws IOException, InterruptedException, + YarnException { + + String oldQueue = "oldQueue"; + String newQueue = "newQueue"; + + // Submit application to multiSubCluster + ApplicationId appId = ApplicationId.newInstance(Time.now(), 1); + ApplicationSubmissionContextInfo context = new ApplicationSubmissionContextInfo(); + context.setApplicationId(appId.toString()); + context.setQueue(oldQueue); + + // Submit the application + Assert.assertNotNull(interceptor.submitApplication(context, null)); + + // Set New Queue for application + Response response = interceptor.updateAppQueue(new AppQueue(newQueue), + null, appId.toString()); + + Assert.assertNotNull(response); + AppQueue appQueue = (AppQueue) response.getEntity(); + Assert.assertEquals(newQueue, appQueue.getQueue()); + + // Get AppQueue by application + AppQueue queue = interceptor.getAppQueue(null, appId.toString()); + Assert.assertNotNull(queue); + Assert.assertEquals(newQueue, queue.getQueue()); + } + + @Test + public void testGetAppQueue() throws IOException, InterruptedException, YarnException { + String queueName = "queueName"; + + // Submit application to multiSubCluster + ApplicationId appId = ApplicationId.newInstance(Time.now(), 1); + ApplicationSubmissionContextInfo context = new ApplicationSubmissionContextInfo(); + context.setApplicationId(appId.toString()); + context.setQueue(queueName); + + Assert.assertNotNull(interceptor.submitApplication(context, null)); + + // Get Queue by application + AppQueue queue = interceptor.getAppQueue(null, appId.toString()); + Assert.assertNotNull(queue); + Assert.assertEquals(queueName, queue.getQueue()); + } }