Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1312,14 +1312,52 @@ public Response updateApplicationPriority(AppPriority targetPriority,
@Override
public AppQueue getAppQueue(HttpServletRequest hsr, String appId)
throws AuthorizationException {
throw new NotImplementedException("Code is not implemented");

if (appId == null || appId.isEmpty()) {
throw new IllegalArgumentException("Parameter error, the appId is empty or null.");
}

try {
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId);
DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
return interceptor.getAppQueue(hsr, appId);
} catch (IllegalArgumentException e) {
RouterServerUtil.logAndThrowRunTimeException(e,
"Unable to get queue by appId: %s.", appId);
} catch (YarnException e) {
RouterServerUtil.logAndThrowRunTimeException("getAppQueue Failed.", e);
}

return null;
}

@Override
public Response updateAppQueue(AppQueue targetQueue, HttpServletRequest hsr,
String appId) throws AuthorizationException, YarnException,
InterruptedException, IOException {
throw new NotImplementedException("Code is not implemented");

if (appId == null || appId.isEmpty()) {
throw new IllegalArgumentException("Parameter error, the appId is empty or null.");
}

if (targetQueue == null) {
throw new IllegalArgumentException("Parameter error, the targetQueue is null.");
}

try {
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId);
DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
return interceptor.updateAppQueue(targetQueue, hsr, appId);
} catch (IllegalArgumentException e) {
RouterServerUtil.logAndThrowRunTimeException(e,
"Unable to update app queue by appId: %s.", appId);
} catch (YarnException e) {
RouterServerUtil.logAndThrowRunTimeException("updateAppQueue Failed.", e);
}

return null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@
import javax.ws.rs.core.Response.Status;

import org.apache.commons.lang3.EnumUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.util.Sets;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ContainerId;
Expand Down Expand Up @@ -77,6 +77,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppPriority;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppQueue;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
Expand Down Expand Up @@ -621,4 +622,43 @@ public AppPriority getAppPriority(HttpServletRequest hsr, String appId)

return new AppPriority(priority.getPriority());
}

@Override
public AppQueue getAppQueue(HttpServletRequest hsr, String appId)
throws AuthorizationException {
if (!isRunning) {
throw new RuntimeException("RM is stopped");
}
ApplicationId applicationId = ApplicationId.fromString(appId);
if (!applicationMap.containsKey(applicationId)) {
throw new NotFoundException("app with id: " + appId + " not found");
}
String queue = applicationMap.get(applicationId).getQueue();
return new AppQueue(queue);
}

@Override
public Response updateAppQueue(AppQueue targetQueue, HttpServletRequest hsr, String appId)
throws AuthorizationException, YarnException, InterruptedException, IOException {
if (!isRunning) {
throw new RuntimeException("RM is stopped");
}
ApplicationId applicationId = ApplicationId.fromString(appId);
if (!applicationMap.containsKey(applicationId)) {
throw new NotFoundException("app with id: " + appId + " not found");
}
if (targetQueue == null || StringUtils.isBlank(targetQueue.getQueue())) {
return Response.status(Status.BAD_REQUEST).build();
}

ApplicationReport appReport = applicationMap.get(applicationId);
String originalQueue = appReport.getQueue();
appReport.setQueue(targetQueue.getQueue());
applicationMap.put(applicationId, appReport);
LOG.info("Update applicationId = {} from originalQueue = {} to targetQueue = {}.",
appId, originalQueue, targetQueue);

AppQueue targetAppQueue = new AppQueue(targetQueue.getQueue());
return Response.status(Status.OK).entity(targetAppQueue).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppPriority;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppQueue;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.NodeIDsInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
Expand Down Expand Up @@ -900,4 +901,52 @@ public void testGetAppPriority() throws IOException, InterruptedException,
Assert.assertNotNull(appPriority);
Assert.assertEquals(priority, appPriority.getPriority());
}

@Test
public void testUpdateAppQueue() throws IOException, InterruptedException,
YarnException {

String oldQueue = "oldQueue";
String newQueue = "newQueue";

// Submit application to multiSubCluster
ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
ApplicationSubmissionContextInfo context = new ApplicationSubmissionContextInfo();
context.setApplicationId(appId.toString());
context.setQueue(oldQueue);

// Submit the application
Assert.assertNotNull(interceptor.submitApplication(context, null));

// Set New Queue for application
Response response = interceptor.updateAppQueue(new AppQueue(newQueue),
null, appId.toString());

Assert.assertNotNull(response);
AppQueue appQueue = (AppQueue) response.getEntity();
Assert.assertEquals(newQueue, appQueue.getQueue());

// Get AppQueue by application
AppQueue queue = interceptor.getAppQueue(null, appId.toString());
Assert.assertNotNull(queue);
Assert.assertEquals(newQueue, queue.getQueue());
}

@Test
public void testGetAppQueue() throws IOException, InterruptedException, YarnException {
String queueName = "queueName";

// Submit application to multiSubCluster
ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
ApplicationSubmissionContextInfo context = new ApplicationSubmissionContextInfo();
context.setApplicationId(appId.toString());
context.setQueue(queueName);

Assert.assertNotNull(interceptor.submitApplication(context, null));

// Get Queue by application
AppQueue queue = interceptor.getAppQueue(null, appId.toString());
Assert.assertNotNull(queue);
Assert.assertEquals(queueName, queue.getQueue());
}
}