diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java index 1520929325eb6..39433793620ad 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java @@ -31,6 +31,8 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerLaunchContextRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerLaunchContextResponse; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerRequest; @@ -481,6 +483,13 @@ public ResourceLocalizationResponse localize( ResourceLocalizationRequest request) throws YarnException, IOException { return null; } + + @Override + public GetContainerLaunchContextResponse getContainerLaunchContext( + GetContainerLaunchContextRequest request) throws YarnException, IOException { + throw new UnsupportedOperationException("getting the container launch context is not " + + "supported for this implementation of ContainerManagementProtocol"); + } @Override public ReInitializeContainerResponse reInitializeContainer( diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java index 7eb74d5e4e854..81059928250a6 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java @@ -47,10 +47,12 @@ import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse; -import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; -import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerLaunchContextRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerLaunchContextResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerResponse; import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationRequest; @@ -70,7 +72,8 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Token; -import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData; +import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy + .ContainerManagementProtocolProxyData; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -486,6 +489,13 @@ public ResourceLocalizationResponse localize( ResourceLocalizationRequest request) throws YarnException, IOException { return null; } + + @Override + public GetContainerLaunchContextResponse getContainerLaunchContext( + GetContainerLaunchContextRequest request) throws YarnException, IOException { + throw new UnsupportedOperationException("getting the container launch context is not " + + "supported for this implementation of ContainerManagementProtocol"); + } @Override public ReInitializeContainerResponse reInitializeContainer( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManagementProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManagementProtocol.java index 9077d3b6af349..fc7db5ac987c5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManagementProtocol.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManagementProtocol.java @@ -24,6 +24,8 @@ import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerLaunchContextRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerLaunchContextResponse; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; @@ -51,7 +53,7 @@ /** *

The protocol between an ApplicationMaster and a * NodeManager to start/stop and increase resource of containers - * and to get status of running containers.

+ * and to get status and launch context of running containers.

* *

If security is enabled the NodeManager verifies that the * ApplicationMaster has truly been allocated the container @@ -220,6 +222,22 @@ SignalContainerResponse signalToContainer(SignalContainerRequest request) @Unstable ResourceLocalizationResponse localize(ResourceLocalizationRequest request) throws YarnException, IOException; + + /** + * Gets container launch context for a container specified in + * {@link GetContainerLaunchContextRequest}. + * This protocol is only used by the container relocation logic, between node managers, to + * transfer the launch context of the container to be relocated to the target node. + * + * @param request specifies the id of the container for which the launch context is requested + * @return Response that contains the requested container launch context + * @throws YarnException + * @throws IOException + */ + @Public + @Stable + GetContainerLaunchContextResponse getContainerLaunchContext( + GetContainerLaunchContextRequest request) throws YarnException, IOException; /** * ReInitialize the Container with a new Launch Context. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java index f7ce127168cb7..087f227ce4c14 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerMoveRequest; import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; @@ -52,6 +53,11 @@ * the ResourceManager about the change in * requirements of running containers. * + *

  • + * A list of {@link ContainerMoveRequest} to inform + * the ResourceManager about the application's + * container relocation needs. + *
  • * * * @see ApplicationMasterProtocol#allocate(AllocateRequest) @@ -77,6 +83,19 @@ public static AllocateRequest newInstance(int responseID, float appProgress, List containersToBeReleased, ResourceBlacklistRequest resourceBlacklistRequest, List updateRequests) { + return newInstance(responseID, appProgress, resourceAsk, + containersToBeReleased, resourceBlacklistRequest, updateRequests, + null); + } + + @Public + @Stable + public static AllocateRequest newInstance(int responseID, float appProgress, + List resourceAsk, + List containersToBeReleased, + ResourceBlacklistRequest resourceBlacklistRequest, + List updateRequests, + List moveAsk) { AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class); allocateRequest.setResponseId(responseID); allocateRequest.setProgress(appProgress); @@ -84,6 +103,7 @@ public static AllocateRequest newInstance(int responseID, float appProgress, allocateRequest.setReleaseList(containersToBeReleased); allocateRequest.setResourceBlacklistRequest(resourceBlacklistRequest); allocateRequest.setUpdateRequests(updateRequests); + allocateRequest.setMoveAskList(moveAsk); return allocateRequest; } @@ -211,4 +231,27 @@ public abstract void setResourceBlacklistRequest( @Unstable public abstract void setUpdateRequests( List updateRequests); + + /** + * Get the list of container move requests being sent by the + * ApplicationMaster. + * + * @return list of ContainerMoveRequest + * being sent by the ApplicationMaster. + */ + @Public + @Unstable + public abstract List getMoveAskList(); + + /** + * Set the list of container move requests to inform the + * ResourceManager about the containers that need to be + * relocated. + * + * @param containerMoveRequests list of ContainerMoveRequest + * for containers that need to be relocated. + */ + @Public + @Unstable + public abstract void setMoveAskList(List containerMoveRequests); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetContainerLaunchContextRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetContainerLaunchContextRequest.java new file mode 100644 index 0000000000000..2ae6b74c45f19 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetContainerLaunchContextRequest.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.util.Records; + +/** + *

    The request sent by one NodeManager to another + * NodeManager in order to get the launch context of the container with id + * containerId.

    + * + *

    This request is used only for container relocation, where the launch context of + * the origin container is necessary for launching the relocated container on the target node.

    + * + * @see ContainerManagementProtocol#getContainerLaunchContext(GetContainerLaunchContextRequest) + */ +public abstract class GetContainerLaunchContextRequest { + + @Public + @Stable + public static GetContainerLaunchContextRequest newInstance(ContainerId containerId) { + GetContainerLaunchContextRequest request = + Records.newRecord(GetContainerLaunchContextRequest.class); + request.setContainerId(containerId); + return request; + } + + /** + * Gets the container id for which the launch context is requested. + * @return the container id for which the launch context is requested + */ + @Public + @Stable + public abstract ContainerId getContainerId(); + + /** + * Sets the container id for which the launch context is requested. + * @param containerId the container id for which the launch context is requested + */ + @Public + @Stable + public abstract void setContainerId(ContainerId containerId); +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetContainerLaunchContextResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetContainerLaunchContextResponse.java new file mode 100644 index 0000000000000..dbed4f0ec04f0 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetContainerLaunchContextResponse.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.util.Records; + +/** + *

    The response for a container launch context request sent between + * NodeManagers. It contains the requested container launch context.

    + * + *

    This request is used only for container relocation, where a launch context + * of the origin container is necessary for launching the relocated container on + * the target node.

    + */ +public abstract class GetContainerLaunchContextResponse { + + @Public + @Stable + public static GetContainerLaunchContextResponse newInstance(ContainerLaunchContext + containerLaunchContext) { + GetContainerLaunchContextResponse request = + Records.newRecord(GetContainerLaunchContextResponse.class); + request.setContainerLaunchContext(containerLaunchContext); + return request; + } + + /** + * Gets the launch context of the requested container. + * @return the launch context of the requested container + */ + @Public + @Stable + public abstract ContainerLaunchContext getContainerLaunchContext(); + + /** + * Sets the launch context of the requested container. + * @param containerLaunchContext the launch context of the requested container + */ + @Public + @Stable + public abstract void setContainerLaunchContext(ContainerLaunchContext containerLaunchContext); +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/StartContainerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/StartContainerRequest.java index 50179a962bad2..6af80e18c0d22 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/StartContainerRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/StartContainerRequest.java @@ -20,9 +20,12 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.NMToken; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.util.Records; @@ -36,6 +39,15 @@ * necessary binaries/jar/shared-objects etc. via the * {@link ContainerLaunchContext}.

    * + *

    The isMove flag tells whether this request corresponds to a container + * relocation. If true, then no container launch context is needed as it will be + * transferred directly from the origin node manager to the target node manager. + * The container to be relocated is identified by the originNodeId and + * originContainerId. + * The originNMToken is sent along so that the origin container can be shut down + * by the target node manager. + *

    + * * @see ContainerManagementProtocol#startContainers(StartContainersRequest) */ @Public @@ -49,6 +61,21 @@ public static StartContainerRequest newInstance( Records.newRecord(StartContainerRequest.class); request.setContainerLaunchContext(context); request.setContainerToken(container); + request.setIsMove(false); + return request; + } + + @Public + @Stable + public static StartContainerRequest newInstance( + Token container, ContainerId originContainerId, NodeId originNodeId, Token originNMToken) { + StartContainerRequest request = + Records.newRecord(StartContainerRequest.class); + request.setContainerToken(container); + request.setIsMove(true); + request.setOriginContainerId(originContainerId); + request.setOriginNodeId(originNodeId); + request.setOriginNMToken(originNMToken); return request; } @@ -91,4 +118,91 @@ public static StartContainerRequest newInstance( @Public @Stable public abstract void setContainerToken(Token container); + + /** + * Gets whether this start container request corresponds to a container relocation. + * @return whether this start container request corresponds to a container relocation + */ + @Public + @Unstable + public abstract boolean getIsMove(); + + /** + * Sets whether this start container request corresponds to a container relocation. + * @param isMove whether this start container request corresponds to a container + * relocation + */ + @Public + @Unstable + public abstract void setIsMove(boolean isMove); + + /** + * Gets the origin container id for this start container request. + * The origin container id is set if and only if this start container request + * corresponds to a container relocation. It identifies the container that should + * be relocated. + * + * @return the origin container id for this start container request + */ + @Public + @Unstable + public abstract ContainerId getOriginContainerId(); + + /** + * Sets the origin container id for this start container request. + * The origin container id should be set if and only if this start container request + * corresponds to a container relocation. It identifies the container that should + * be relocated. + * + * @param originContainerId the origin container id for this start container request + */ + @Public + @Unstable + public abstract void setOriginContainerId(ContainerId originContainerId); + + /** + * Gets the origin node id for this start container request. + * The origin node id is set if and only if this resource start container corresponds + * to a container relocation. It identifies the node of the container that should + * be relocated. + * + * @return the origin node id for this start container request + */ + @Public + @Unstable + public abstract NodeId getOriginNodeId(); + + /** + * Sets the origin node id for this start container request. + * The origin node id should be set if and only if this start container request + * corresponds to a container relocation. It identifies the node of the container + * that should be relocated. + * + * @param originNodeId the origin node id for this start container request + */ + @Public + @Unstable + public abstract void setOriginNodeId(NodeId originNodeId); + + /** + * Gets the security token for the origin node. + * The origin NM token is set if and only if this start container request corresponds + * to a container relocation. It is used for shutting down the origin container. + * + * @return the security token for the origin node + */ + @Public + @Unstable + public abstract Token getOriginNMToken(); + + /** + * Sets the security token for the origin node. + * The origin NM token should be set if and only if this start container request corresponds + * to a container relocation. It is used for shutting down the origin container. + * + * @param originNMToken the security token for the origin node + */ + @Public + @Unstable + public abstract void setOriginNMToken(Token originNMToken); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java index 4fdc8034c5707..9cc3828edce0c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java @@ -48,6 +48,13 @@ * Container {@link Token} of the container, used to securely verify * authenticity of the allocation. * + *
  • + * A boolean isMove flag, defaulting to {@code false}. + * This flag indicates whether this container is associated with a + * container relocation. If true, then this container is meant to replace + * an existing container identified by originNodeId + * and originContainerId. + *
  • * * * Typically, an {@code ApplicationMaster} receives the {@code Container} @@ -256,4 +263,66 @@ public int getVersion() { public void setVersion(int version) { throw new UnsupportedOperationException(); } + + /** + * Gets whether this container is associated with a container relocation. + * @return whether this container is associated with a container relocation + */ + @Public + @Unstable + public abstract boolean getIsMove(); + + /** + * Sets whether this container is associated with a container relocation. + * @param isMove whether this container is associated with a container relocation + */ + @Public + @Unstable + public abstract void setIsMove(boolean isMove); + + /** + * Gets the origin container id for this container. + * The origin container id is set if and only if this container is associated + * with a container relocation. It identifies the container that should be relocated. + * + * @return the origin container id for this container + */ + @Public + @Unstable + public abstract ContainerId getOriginContainerId(); + + /** + * Sets the origin container id for this container. + * The origin container id should be set if and only if this container is associated + * with a container relocation. It identifies the container that should be relocated. + * + * @param originContainerId the origin container id for this container + */ + @Public + @Unstable + public abstract void setOriginContainerId(ContainerId originContainerId); + + /** + * Gets the origin node id for this container. + * The origin node id is set if and only if this container is associated + * with a container relocation. It identifies the node from which a container should + * be relocated. + * + * @return the origin node id for this container + */ + @Public + @Unstable + public abstract NodeId getOriginNodeId(); + + /** + * Sets the origin node id for this container. + * The origin node id should be set if and only if this container is associated + * with a container relocation. It identifies the node from which a container should + * be relocated. + * + * @param originNodeId the origin node id for this container + */ + @Public + @Unstable + public abstract void setOriginNodeId(NodeId originNodeId); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java index 6d4bccd80c882..317b6dde621ac 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java @@ -228,4 +228,36 @@ public static ContainerLaunchContext newInstance( @Unstable public abstract void setContainerRetryContext( ContainerRetryContext containerRetryContext); + + /** + * Creates a deeper copy of this container launch context. + * + * @return the copy of this container launch context + * @throws InstantiationException + * @throws IllegalAccessException + */ + public ContainerLaunchContext copy() throws InstantiationException, + IllegalAccessException { + return newInstance( + copyMap(getLocalResources()), + copyMap(getEnvironment()), + copyList(getCommands()), + copyMap(getServiceData()), + getTokens(), + copyMap(getApplicationACLs())); + } + + private static Map copyMap(final Map source) throws InstantiationException, + IllegalAccessException { + final Map newMap = source.getClass().newInstance(); + newMap.putAll(source); + return newMap; + } + + private static List copyList(final List source) throws InstantiationException, + IllegalAccessException { + final List newList = source.getClass().newInstance(); + newList.addAll(source); + return newList; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerMoveRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerMoveRequest.java new file mode 100644 index 0000000000000..f354434fe3990 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerMoveRequest.java @@ -0,0 +1,206 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

    + * http://www.apache.org/licenses/LICENSE-2.0 + *

    + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records; + +import java.io.Serializable; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.util.Records; + +/** + * {@code ContainerMoveRequest} represents the request made + * by an application to the {@code ResourceManager} + * to relocate a single container to another node of the yarn cluster. + *

    + * It includes: + *

      + *
    • {@link Priority} of the request.
    • + *
    • + * originContainerId of the request, which is a + * {@link ContainerId} that identifies the container to be relocated. + *
    • + *
    • + * The targetHost of the request, which identifies the node where + * the origin container should be relocated to. + *
    • + *
    + * + * @see ApplicationMasterProtocol#allocate(org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest) + */ +@Public +@Unstable +public abstract class ContainerMoveRequest implements Comparable { + + @Public + @Unstable + public static ContainerMoveRequest newInstance(Priority priority, + ContainerId originContainerId, String targetHost) { + ContainerMoveRequest request = Records.newRecord(ContainerMoveRequest.class); + request.setPriority(priority); + request.setOriginContainerId(originContainerId); + request.setTargetHost(targetHost); + return request; + } + + @Public + @Unstable + public static class ContainerMoveRequestComparator implements + java.util.Comparator, Serializable { + + private static final long serialVersionUID = 1L; + + @Override + public int compare(ContainerMoveRequest r1, ContainerMoveRequest r2) { + + // Compare by priority, originContainerId and targetHost + int ret = r1.getPriority().compareTo(r2.getPriority()); + if (ret == 0) { + ret = r1.getOriginContainerId().compareTo(r2.getOriginContainerId()); + } + if (ret == 0) { + ret = r1.getTargetHost().compareTo(r2.getTargetHost()); + } + return ret; + } + } + + /** + * Get the Priority of the request. + * @return Priority of the request + */ + @Public + @Unstable + public abstract Priority getPriority(); + + /** + * Set the Priority of the request. + * @param priority Priority of the request + */ + @Public + @Unstable + public abstract void setPriority(Priority priority); + + /** + * Gets the origin container id of this container move request. + * It identifies the container that should be relocated. + * + * @return the origin container id of this container move request + */ + @Public + @Unstable + public abstract ContainerId getOriginContainerId(); + + /** + * Sets the origin container id of this container move request. + * It identifies the container that should be relocated. + * + * @param originContainerId the origin container id of this container move + * request + */ + @Public + @Unstable + public abstract void setOriginContainerId(ContainerId originContainerId); + + /** + * Gets the target host of this container move request, which identifies + * the node where the container should be relocated to. + * + * @return the target host of this container move request + */ + @Public + @Unstable + public abstract String getTargetHost(); + + /** + * Sets the target host of this container move request, which identifies + * the node where the container should be relocated to. + * + * @param targetHost the target host of this container move request + */ + @Public + @Unstable + public abstract void setTargetHost(String targetHost); + + @Override + public int hashCode() { + final int prime = 2153; + int result = 2459; + Priority priority = getPriority(); + ContainerId originContainerId = getOriginContainerId(); + String targetHost = getTargetHost(); + + result = prime * result + ((priority == null) ? 0 : priority.hashCode()); + result = prime * result + + ((originContainerId == null) ? 0 : originContainerId.hashCode()); + result = prime * result + ((targetHost == null) ? 0 : targetHost.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + ContainerMoveRequest other = (ContainerMoveRequest) obj; + ContainerId originContainerId = getOriginContainerId(); + ContainerId otherOriginContainerId = other.getOriginContainerId(); + if (originContainerId == null) { + if (otherOriginContainerId != null) + return false; + } else if (!originContainerId.equals(otherOriginContainerId)) + return false; + String targetHost = getTargetHost(); + String otherTargetHost = other.getTargetHost(); + if (targetHost == null) { + if (otherTargetHost != null) + return false; + } else if (!targetHost.equals(otherTargetHost)) + return false; + Priority priority = getPriority(); + Priority otherPriority = other.getPriority(); + if (priority == null) { + if (otherPriority != null) + return false; + } else if (!priority.equals(otherPriority)) + return false; + return true; + } + + @Override + public int compareTo(ContainerMoveRequest other) { + int priorityComparison = this.getPriority().compareTo(other.getPriority()); + if (priorityComparison == 0) { + int originContainerIdComparison = + this.getOriginContainerId().compareTo(other.getOriginContainerId()); + if (originContainerIdComparison == 0) { + return this.getTargetHost().compareTo(other.getTargetHost()); + } else { + return originContainerIdComparison; + } + } else { + return priorityComparison; + } + } +} + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java index 2d6f0f460d42c..35164b66ab618 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java @@ -23,6 +23,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.util.Records; @@ -50,6 +51,14 @@ * locality to be loose (i.e. allows fall-through to rack or any) * or strict (i.e. specify hard constraint on resource allocation). * + *
  • + * A boolean isMove flag, defaulting to {@code false}. + * This flag is used only around the resource manager to indicate whether + * this resource request comes from a {@code ContainerMoveRequest}. + * If isMove is true, then originContainerId tells which + * container should be relocated according to that container move + * request. + *
  • * * * @see Resource @@ -373,6 +382,44 @@ public void setAllocationRequestId(long allocationRequestID) { throw new UnsupportedOperationException(); } + /** + * Gets whether this resource request is associated with a container relocation. + * @return whether this resource request is associated with a container relocation + */ + @Public + @Unstable + public abstract boolean getIsMove(); + + /** + * Sets whether this resource request is associated with a container relocation. + * @param isMove whether this resource request is associated with a container relocation + */ + @Public + @Unstable + public abstract void setIsMove(boolean isMove); + + /** + * Gets the origin container id for this resource request. + * The origin container id is set if and only if this resource request is associated + * with a container relocation. It identifies the container that should be relocated. + * + * @return the origin container id for this resource request + */ + @Public + @Unstable + public abstract ContainerId getOriginContainerId(); + + /** + * Sets the origin container id for this resource request. + * The origin container id should be set if and only if this resource request is associated + * with a container relocation. It identifies the container that should be relocated. + * + * @param originContainerId the origin container id for this resource request + */ + @Public + @Unstable + public abstract void setOriginContainerId(ContainerId originContainerId); + @Override public int hashCode() { final int prime = 2153; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 1a30c3232e7c0..360d51f5a11a9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1696,6 +1696,9 @@ public static boolean isAclEnabled(Configuration conf) { public static final String YARN_SECURITY_SERVICE_AUTHORIZATION_RESOURCE_LOCALIZER = "security.resourcelocalizer.protocol.acl"; + public static final String + YARN_SECURITY_SERVICE_AUTHORIZATION_CONTAINER_RELOCATION_PROTOCOL = + "security.containerrelocation.protocol.acl"; public static final String YARN_SECURITY_SERVICE_AUTHORIZATION_APPLICATIONHISTORY_PROTOCOL = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/containermanagement_protocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/containermanagement_protocol.proto index 7c53d2eff15bd..cac0c7d164723 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/containermanagement_protocol.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/containermanagement_protocol.proto @@ -38,6 +38,7 @@ service ContainerManagementProtocolService { rpc increaseContainersResource(IncreaseContainersResourceRequestProto) returns (IncreaseContainersResourceResponseProto); rpc signalToContainer(SignalContainerRequestProto) returns (SignalContainerResponseProto); rpc localize(ResourceLocalizationRequestProto) returns (ResourceLocalizationResponseProto); + rpc getContainerLaunchContext(GetContainerLaunchContextRequestProto) returns (GetContainerLaunchContextResponseProto); rpc reInitializeContainer(ReInitializeContainerRequestProto) returns (ReInitializeContainerResponseProto); rpc restartContainer(ContainerIdProto) returns (RestartContainerResponseProto); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 9c746fde30338..2b9920e6a64ff 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -95,6 +95,9 @@ message ContainerProto { optional ExecutionTypeProto execution_type = 7 [default = GUARANTEED]; optional int64 allocation_request_id = 8 [default = -1]; optional int32 version = 9 [default = 0]; + optional bool isMove = 20 [default = false]; + optional ContainerIdProto origin_container_id = 21; + optional NodeIdProto origin_node_id = 22; } message ContainerReportProto { @@ -307,6 +310,12 @@ message ResourceRequestProto { optional int64 allocation_request_id = 8 [default = 0]; } +message ContainerMoveRequestProto { + optional PriorityProto priority = 1; + optional ContainerIdProto origin_container_id = 2; + optional string target_host = 3; +} + message ExecutionTypeRequestProto { optional ExecutionTypeProto execution_type = 1 [default = GUARANTEED]; optional bool enforce_execution_type = 2 [default = false]; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index 6526bf97a3aa0..498ec8a6a7fb0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -86,6 +86,7 @@ message AllocateRequestProto { optional int32 response_id = 4; optional float progress = 5; repeated UpdateContainerRequestProto update_requests = 6; + repeated ContainerMoveRequestProto move_ask = 20; } message NMTokenProto { @@ -274,6 +275,10 @@ message SignalContainerResponseProto { message StartContainerRequestProto { optional ContainerLaunchContextProto container_launch_context = 1; optional hadoop.common.TokenProto container_token = 2; + optional bool is_move = 20; + optional ContainerIdProto origin_container_id = 21; + optional NodeIdProto origin_node_id = 22; + optional hadoop.common.TokenProto origin_nm_token = 23; } message StartContainerResponseProto { @@ -356,6 +361,14 @@ message IncreaseContainersResourceResponseProto { repeated ContainerExceptionMapProto failed_requests = 2; } +message GetContainerLaunchContextRequestProto { + optional ContainerIdProto container_id = 1; +} + +message GetContainerLaunchContextResponseProto { + optional ContainerLaunchContextProto container_launch_context = 1; +} + ////////////////////////////////////////////////////// /////// Application_History_Protocol ///////////////// ////////////////////////////////////////////////////// diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java index 2990c05130fa9..49e1e70e69163 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java @@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerMoveRequest; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; @@ -429,6 +430,12 @@ public abstract void unregisterApplicationMaster(FinalApplicationStatus appStatu * @param req Resource request */ public abstract void addContainerRequest(T req); + + /** + * Request a container to be relocated before calling allocate + * @param req container move request to be sent to the resource manager + */ + public abstract void addContainerMoveRequest(ContainerMoveRequest req); /** * Remove previous container request. The previous container request may have diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java index 32216610612ae..56ba7687e9ee0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java @@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerMoveRequest; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerUpdateType; import org.apache.hadoop.yarn.api.records.ExecutionType; @@ -162,6 +163,8 @@ static boolean canFit(Resource arg0, Resource arg1) { protected final Set ask = new TreeSet( new org.apache.hadoop.yarn.api.records.ResourceRequest.ResourceRequestComparator()); + protected final Map moveAsk = new HashMap<>(); + protected final Map pendingMoveAsk = new HashMap<>(); protected final Set release = new TreeSet(); // pendingRelease holds history of release requests. // request is removed only if RM sends completedContainer. @@ -255,6 +258,8 @@ public AllocateResponse allocate(float progressIndicator) "Progress indicator should not be negative"); AllocateResponse allocateResponse = null; List askList = null; + Map oldMoveAsk = new HashMap<>(); + List moveAskList = null; List releaseList = null; AllocateRequest allocateRequest = null; List blacklistToAdd = new ArrayList(); @@ -264,12 +269,15 @@ public AllocateResponse allocate(float progressIndicator) try { synchronized (this) { askList = cloneAsks(); + oldMoveAsk.putAll(moveAsk); + moveAskList = cloneMoveAsks(); // Save the current change for recovery oldChange.putAll(change); List updateList = createUpdateList(); releaseList = new ArrayList(release); // optimistically clear this collection assuming no RPC failure ask.clear(); + moveAsk.clear(); release.clear(); change.clear(); @@ -282,7 +290,7 @@ public AllocateResponse allocate(float progressIndicator) allocateRequest = AllocateRequest.newInstance(lastResponseId, progressIndicator, - askList, releaseList, blacklistRequest, updateList); + askList, releaseList, blacklistRequest, updateList, moveAskList); // clear blacklistAdditions and blacklistRemovals before // unsynchronized part blacklistAdditions.clear(); @@ -306,6 +314,7 @@ public AllocateResponse allocate(float progressIndicator) addResourceRequestToAsk(reqIter.next().remoteRequest); } } + moveAsk.putAll(pendingMoveAsk); change.putAll(this.pendingChange); } // re register with RM @@ -325,6 +334,24 @@ public AllocateResponse allocate(float progressIndicator) if (allocateResponse.getAMRMToken() != null) { updateAMRMToken(allocateResponse.getAMRMToken()); } + if (!pendingMoveAsk.isEmpty()) { + List completed = + allocateResponse.getCompletedContainersStatuses(); + List moved = new ArrayList<>(); + for(Container container : allocateResponse.getAllocatedContainers()) { + if(container.getIsMove()) { + moved.add(container.getOriginContainerId()); + } + } + for (ContainerStatus status : completed) { + ContainerId containerId = status.getContainerId(); + pendingMoveAsk.remove(containerId); + } + // remove all container move requests that have been satisfied + if (!moved.isEmpty()) { + removePendingMoveRequests(moved); + } + } if (!pendingRelease.isEmpty() && !allocateResponse.getCompletedContainersStatuses().isEmpty()) { removePendingReleaseRequests(allocateResponse @@ -367,6 +394,22 @@ public AllocateResponse allocate(float progressIndicator) ask.add(oldAsk); } } + // Container move requests could have been added or deleted during call to + // allocate. If container move requests were added/removed then there is + // nothing to do since the ContainerMoveRequest object in moveAsk would have + // the actual new value. If moveAsk does not have this ContainerMoveRequest + // then it was unchanged and so we can add the value back safely. + // This assumes that there will no concurrent calls to allocate() and + // so we dont have to worry about moveAsk being changed in the + // synchronized block at the beginning of this method. + for (Map.Entry entry : + oldMoveAsk.entrySet()) { + ContainerId oldContainerId = entry.getKey(); + ContainerMoveRequest request = entry.getValue(); + if (!moveAsk.containsKey(oldContainerId)) { + moveAsk.put(oldContainerId, request); + } + } // change requests could have been added during the allocate call. // Those are the newest requests which take precedence // over requests cached in the oldChange map. @@ -424,6 +467,33 @@ private List cloneAsks() { } return askList; } + + private List cloneMoveAsks() { + List moveAskList = new ArrayList<>(moveAsk.size()); + for(ContainerMoveRequest mr : moveAsk.values()) { + // create a copy of ContainerMoveRequest as we might change it while the + // RPC layer is using it to send info across + moveAskList.add(ContainerMoveRequest.newInstance(mr.getPriority(), + mr.getOriginContainerId(), mr.getTargetHost())); + } + return moveAskList; + } + + protected void removePendingMoveRequests( + List movedContainers) { + for (ContainerId containerId : movedContainers) { + if (pendingMoveAsk.get(containerId) == null) { + continue; + } + if (LOG.isDebugEnabled()) { + LOG.debug("RM has confirmed move request for " + + "container " + containerId + "." + + "Remove pending move request:" + + pendingMoveAsk.get(containerId)); + } + pendingMoveAsk.remove(containerId); + } + } protected void removePendingReleaseRequests( List completedContainersStatuses) { @@ -559,6 +629,13 @@ public synchronized void addContainerRequest(T req) { req.getExecutionTypeRequest(), req.getCapability(), req, req.getRelaxLocality(), req.getNodeLabelExpression()); } + + @Override + public void addContainerMoveRequest(ContainerMoveRequest req) { + ContainerId containerId = req.getOriginContainerId(); + moveAsk.put(containerId, req); + pendingMoveAsk.put(containerId, req); + } @Override public synchronized void removeContainerRequest(T req) { @@ -612,6 +689,7 @@ public synchronized void releaseAssignedContainer(ContainerId containerId) { "ContainerId can not be null."); pendingRelease.add(containerId); release.add(containerId); + pendingMoveAsk.remove(containerId); pendingChange.remove(containerId); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java index dc92cda3d5a77..3a9709bdb473a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java @@ -172,14 +172,13 @@ private void addStartingContainer(StartedContainer startedContainer) + startedContainer.containerId.toString() + " is already started"); } } - - @Override - public Map startContainer( - Container container, ContainerLaunchContext containerLaunchContext) - throws YarnException, IOException { + + public Map startContainer(Container container, ContainerLaunchContext containerLaunchContext) + throws YarnException, IOException { // Do synchronization on StartedContainer to prevent race condition // between startContainer and stopContainer only when startContainer is // in progress for a given container. + StartedContainer startingContainer = new StartedContainer(container.getId(), container.getNodeId()); synchronized (startingContainer) { @@ -188,12 +187,24 @@ public Map startContainer( Map allServiceResponse; ContainerManagementProtocolProxyData proxy = null; try { + StartContainerRequest scRequest = null; + + if (container.getIsMove()) { + // If the container corresponds to a relocation request, get the token + // for the origin node and put it into the StartContainerRequest. We + // do not need any container launch context here. + NodeId originNodeId = container.getOriginNodeId(); + String containerManagerBindAddr = originNodeId.toString(); + Token token = getNMTokenCache().getToken(containerManagerBindAddr); + scRequest = StartContainerRequest.newInstance(container.getContainerToken(), + container.getOriginContainerId(), originNodeId, token); + } else { + scRequest = StartContainerRequest.newInstance(containerLaunchContext, + container.getContainerToken()); + } proxy = cmProxy.getProxy(container.getNodeId().toString(), container.getId()); - StartContainerRequest scRequest = - StartContainerRequest.newInstance(containerLaunchContext, - container.getContainerToken()); List list = new ArrayList(); list.add(scRequest); StartContainersRequest allRequests = @@ -209,6 +220,11 @@ public Map startContainer( } allServiceResponse = response.getAllServicesMetaData(); startingContainer.state = ContainerState.RUNNING; + if (container.getIsMove()) { + // After the relocation, the origin container will be shut down, + // so remove it from startedContainers + startedContainers.remove(container.getOriginContainerId()); + } } catch (YarnException | IOException e) { startingContainer.state = ContainerState.COMPLETE; // Remove the started container if it failed to start diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMContainerMove.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMContainerMove.java new file mode 100644 index 0000000000000..074d9b1cff8df --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMContainerMove.java @@ -0,0 +1,370 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.client.api.impl; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.service.Service.STATE; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerMoveRequest; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NMToken; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; +import org.apache.hadoop.yarn.client.api.NMClient; +import org.apache.hadoop.yarn.client.api.NMTokenCache; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.MiniYARNCluster; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.util.Records; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.internal.runners.statements.Fail; + +/** + * Tests end-to-end workflow of container relocation + */ +public class TestNMContainerMove { + Configuration conf = null; + MiniYARNCluster yarnCluster = null; + YarnClientImpl yarnClient = null; + AMRMClientImpl rmClient = null; + NMClientImpl nmClient = null; + + int nodeCount = 3; + List nodeReports = null; + static NodeId node1; + static NodeId node2; + static NodeId node3; + static String rack1; + static String rack2; + static String rack3; + ApplicationAttemptId attemptId = null; + NMTokenCache nmTokenCache = null; + + @Before + public void setup() throws YarnException, IOException { + // start minicluster + conf = new YarnConfiguration(); + conf.setLong( + YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS, + 13); + conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 4000); + conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 100); + // set the minimum allocation so that resource decrease can go under 1024 + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512); + conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1); + conf.set(YarnConfiguration.RM_SCHEDULER, "org.apache.hadoop.yarn.server.resourcemanager" + + ".scheduler.fifo.FifoScheduler"); + yarnCluster = + new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1); + yarnCluster.init(conf); + yarnCluster.start(); + assertNotNull(yarnCluster); + assertEquals(STATE.STARTED, yarnCluster.getServiceState()); + + // start rm client + yarnClient = (YarnClientImpl) YarnClient.createYarnClient(); + yarnClient.init(conf); + yarnClient.start(); + assertNotNull(yarnClient); + assertEquals(STATE.STARTED, yarnClient.getServiceState()); + + // get node info + nodeReports = yarnClient.getNodeReports(NodeState.RUNNING); + node1 = nodeReports.get(0).getNodeId(); + node2 = nodeReports.get(1).getNodeId(); + node3 = nodeReports.get(2).getNodeId(); + rack1 = nodeReports.get(0).getRackName(); + rack2 = nodeReports.get(1).getRackName(); + rack3 = nodeReports.get(2).getRackName(); + + // submit new app + ApplicationSubmissionContext appContext = + yarnClient.createApplication().getApplicationSubmissionContext(); + ApplicationId appId = appContext.getApplicationId(); + // set the application name + appContext.setApplicationName("Test"); + // Set the priority for the application master + Priority pri = Priority.newInstance(0); + appContext.setPriority(pri); + // Set the queue to which this application is to be submitted in the RM + appContext.setQueue("default"); + // Set up the container launch context for the application master + ContainerLaunchContext amContainer = Records + .newRecord(ContainerLaunchContext.class); + appContext.setAMContainerSpec(amContainer); + // unmanaged AM + appContext.setUnmanagedAM(true); + // Create the request to send to the applications manager + SubmitApplicationRequest appRequest = Records + .newRecord(SubmitApplicationRequest.class); + appRequest.setApplicationSubmissionContext(appContext); + // Submit the application to the applications manager + yarnClient.submitApplication(appContext); + + // wait for app to start + int iterationsLeft = 30; + RMAppAttempt appAttempt = null; + while (iterationsLeft > 0) { + ApplicationReport appReport = yarnClient.getApplicationReport(appId); + if (appReport.getYarnApplicationState() == + YarnApplicationState.ACCEPTED) { + attemptId = appReport.getCurrentApplicationAttemptId(); + appAttempt = + yarnCluster.getResourceManager().getRMContext().getRMApps() + .get(attemptId.getApplicationId()).getCurrentAppAttempt(); + while (true) { + if (appAttempt.getAppAttemptState() == RMAppAttemptState.LAUNCHED) { + break; + } + } + break; + } + sleep(1000); + --iterationsLeft; + } + if (iterationsLeft == 0) { + fail("Application hasn't bee started"); + } + + // Just dig into the ResourceManager and get the AMRMToken just for the sake + // of testing. + UserGroupInformation.setLoginUser(UserGroupInformation + .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName())); + UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken()); + + //creating an instance NMTokenCase + nmTokenCache = new NMTokenCache(); + + // start am rm client + rmClient = + (AMRMClientImpl) AMRMClient + .createAMRMClient(); + + //setting an instance NMTokenCase + rmClient.setNMTokenCache(nmTokenCache); + rmClient.init(conf); + rmClient.start(); + assertNotNull(rmClient); + assertEquals(STATE.STARTED, rmClient.getServiceState()); + + // start am nm client + nmClient = (NMClientImpl) NMClient.createNMClient(); + + //propagating the AMRMClient NMTokenCache instance + nmClient.setNMTokenCache(rmClient.getNMTokenCache()); + nmClient.init(conf); + nmClient.start(); + assertNotNull(nmClient); + assertEquals(STATE.STARTED, nmClient.getServiceState()); + } + + @After + public void tearDown() { + rmClient.stop(); + yarnClient.stop(); + yarnCluster.stop(); + } + + @Test(timeout = 200000) + public void testNMContainerMove() + throws YarnException, IOException { + // registering application master + rmClient.registerApplicationMaster("Host", 10000, ""); + + // reserving two regular containers with RM + Resource capability1 = Resource.newInstance(2048, 1); + Resource capability2 = Resource.newInstance(1024, 1); + String[] nodes = new String[]{node1.getHost()}; + String[] racks = new String[]{rack1}; + Priority priority1 = Priority.newInstance(1); + Priority priority2 = Priority.newInstance(2); + + ContainerRequest request1 = new ContainerRequest(capability1, nodes, racks, priority1); + ContainerRequest request2 = new ContainerRequest(capability2, nodes, racks, priority2); + + rmClient.addContainerRequest(request1); + rmClient.addContainerRequest(request2); + + List containers = allocate(2); + + // select the container with capability1 to be the origin container + Container originContainer = null; + for (Container container : containers) { + if (container.getResource().equals(capability1)) { + originContainer = container; + } + } + + // request the origin container to be replaced to another node of another host + // as the test normally runs in local mode, we do not automatically get a + // different target node, so loop until it happens + NodeId targetNodeId = originContainer.getNodeId(); + Container targetContainer = null; + while (originContainer.getNodeId().equals(targetNodeId)) { + ContainerMoveRequest moveRequest = ContainerMoveRequest.newInstance(priority1, + originContainer.getId(), targetNodeId.getHost()); + rmClient.addContainerMoveRequest(moveRequest); + targetContainer = allocate(1).get(0); + targetNodeId = targetContainer.getNodeId(); + } + + // start the two containers allocated before + for (Container container : containers) { + Credentials cred = new Credentials(); + DataOutputBuffer dob = new DataOutputBuffer(); + cred.writeTokenStorageToStream(dob); + ByteBuffer securityTokens = + ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + ContainerLaunchContext clc = Records.newRecord(ContainerLaunchContext.class); + clc.setCommands(Collections.singletonList("sleep 1m")); + clc.setTokens(securityTokens); + try { + nmClient.startContainer(container, clc); + } catch (YarnException e) { + throw (AssertionError) + (new AssertionError("Exception is not expected: " + e).initCause(e)); + } + } + + int exitStatus1 = 0; + try { + exitStatus1 = nmClient.getContainerStatus( + originContainer.getId(), originContainer.getNodeId()).getExitStatus(); + // status of the origin container befor relocation + assertTrue(exitStatus1 == ContainerExitStatus.INVALID); + } catch (Exception e) { + fail("Unexpected status of the origin container before relocation: " + + exitStatus1); + } + + // do the actual container relocation + Credentials cred = new Credentials(); + DataOutputBuffer dob = new DataOutputBuffer(); + cred.writeTokenStorageToStream(dob); + ByteBuffer securityTokens = + ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + ContainerLaunchContext clc = Records.newRecord(ContainerLaunchContext.class); + clc.setTokens(securityTokens); + try { + nmClient.startContainer(targetContainer, null); + } catch (YarnException e) { + throw (AssertionError) + (new AssertionError("Exception is not expected: " + e).initCause(e)); + } + + try { + // sleep until the container statuses catch up + Thread.sleep(1000); + exitStatus1 = nmClient.getContainerStatus( + originContainer.getId(), originContainer.getNodeId()).getExitStatus(); + // status of the origin container after relocation + // the container may stop succesfully before it can be killed + assertTrue(exitStatus1 == ContainerExitStatus.KILLED_BY_APPMASTER); + } catch (Exception e) { + fail("Unexpected status of the origin container after relocation: " + exitStatus1); + } + int exitStatus2 = 0; + try { + exitStatus2 = nmClient.getContainerStatus( + targetContainer.getId(), targetContainer.getNodeId()).getExitStatus(); + // status of the target container after relocation + assertTrue(exitStatus2 == ContainerExitStatus.INVALID || exitStatus2 == + ContainerExitStatus.SUCCESS); + } catch (Exception e) { + fail("Unexpected status of the target container after relocation: " + + exitStatus2); + } + + rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, + null, null); + // stop the running containers on close + nmClient.cleanupRunningContainersOnStop(true); + assertTrue(nmClient.getCleanupRunningContainers().get()); + nmClient.stop(); + } + + private List allocate(int expectedNumOfContainers) throws YarnException, IOException { + // RM should allocate container within 2 calls to allocate() + int allocatedContainerCount = 0; + int iterationsLeft = 2; + List containers = new ArrayList(); + + while (allocatedContainerCount < expectedNumOfContainers + && iterationsLeft > 0) { + AllocateResponse allocResponse = rmClient.allocate(0.1f); + allocatedContainerCount += allocResponse.getAllocatedContainers().size(); + containers.addAll(allocResponse.getAllocatedContainers()); + if (!allocResponse.getNMTokens().isEmpty()) { + for (NMToken token : allocResponse.getNMTokens()) { + rmClient.getNMTokenCache().setToken(token.getNodeId().toString(), + token.getToken()); + } + } + if (allocatedContainerCount < expectedNumOfContainers) { + // sleep to let NM's heartbeat to RM and trigger allocations + sleep(1000); + } + --iterationsLeft; + } + assertEquals(expectedNumOfContainers, allocatedContainerCount); + return containers; + } + + private void sleep(int sleepTime) { + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestRMContainerMove.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestRMContainerMove.java new file mode 100644 index 0000000000000..b7e757c377444 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestRMContainerMove.java @@ -0,0 +1,358 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.client.api.impl; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.service.Service; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerMoveRequest; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.client.ClientRMProxy; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.MiniYARNCluster; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.util.Records; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Tests container relocation logic between ApplicationMaster + * and ResourceManager + */ +public class TestRMContainerMove { + private static final Log LOG = LogFactory.getLog(TestRMContainerMove.class); + + static Configuration conf = null; + static MiniYARNCluster yarnCluster = null; + static YarnClient yarnClient = null; + static List nodeReports = null; + static ApplicationAttemptId attemptId = null; + static int nodeCount = 3; + + static final int rolling_interval_sec = 13; + static final long am_expire_ms = 4000; + + static Priority priority1; + static Priority priority2; + static Priority priority3; + static Resource capability1; + static Resource capability2; + static Resource capability3; + static String node1; + static String node2; + static String node3; + static String[] nodes1; + static String[] nodes2; + static String[] nodes3; + static String[] nodes; + static String rack1; + static String rack2; + static String rack3; + static String[] racks1; + static String[] racks2; + static String[] racks3; + static String[] racks; + + private final static int DEFAULT_ITERATION = 2; + + @BeforeClass + public static void setup() throws Exception { + // start minicluster + conf = new YarnConfiguration(); + conf.setLong( + YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS, + rolling_interval_sec); + conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, am_expire_ms); + conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 100); + // set the minimum allocation so that resource decrease can go under 1024 + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512); + conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1); + // setting FifoScheduler as the resource scheduler as currently, container relocation is only + // implemented for FifoScheduler + conf.set(YarnConfiguration.RM_SCHEDULER, "org.apache.hadoop.yarn.server.resourcemanager" + + ".scheduler.fifo.FifoScheduler"); + yarnCluster = new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1); + yarnCluster.init(conf); + yarnCluster.start(); + + // start rm client + yarnClient = YarnClient.createYarnClient(); + yarnClient.init(conf); + yarnClient.start(); + + priority1 = Priority.newInstance(1); + priority2 = Priority.newInstance(2); + priority3 = Priority.newInstance(3); + + capability1 = Resource.newInstance(512, 1); + capability2 = Resource.newInstance(1024, 1); + capability3 = Resource.newInstance(1536, 1); + + // get node info + nodeReports = yarnClient.getNodeReports(NodeState.RUNNING); + + node1 = nodeReports.get(0).getNodeId().getHost(); + node2 = nodeReports.get(1).getNodeId().getHost(); + node3 = nodeReports.get(2).getNodeId().getHost(); + nodes1 = new String[]{node1}; + nodes2 = new String[]{node2}; + nodes3 = new String[]{node3}; + nodes = new String[]{node1, node2, node3}; + + rack1 = nodeReports.get(0).getRackName(); + rack2 = nodeReports.get(1).getRackName(); + rack3 = nodeReports.get(2).getRackName(); + racks1 = new String[]{rack1}; + racks2 = new String[]{rack2}; + racks3 = new String[]{rack3}; + racks = new String[]{rack1, rack2, rack3}; + } + + @Before + public void startApp() throws Exception { + // submit new app + ApplicationSubmissionContext appContext = + yarnClient.createApplication().getApplicationSubmissionContext(); + ApplicationId appId = appContext.getApplicationId(); + // set the application name + appContext.setApplicationName("Test"); + // Set the priority for the application master + Priority pri = Records.newRecord(Priority.class); + pri.setPriority(0); + appContext.setPriority(pri); + // Set the queue to which this application is to be submitted in the RM + appContext.setQueue("default"); + // Set up the container launch context for the application master + ContainerLaunchContext amContainer = + BuilderUtils.newContainerLaunchContext( + Collections.emptyMap(), + new HashMap(), Arrays.asList("sleep", "100"), + new HashMap(), null, + new HashMap()); + appContext.setAMContainerSpec(amContainer); + appContext.setResource(Resource.newInstance(1024, 1)); + // Create the request to send to the applications manager + SubmitApplicationRequest appRequest = Records + .newRecord(SubmitApplicationRequest.class); + appRequest.setApplicationSubmissionContext(appContext); + // Submit the application to the applications manager + yarnClient.submitApplication(appContext); + + // wait for app to start + RMAppAttempt appAttempt; + while (true) { + ApplicationReport appReport = yarnClient.getApplicationReport(appId); + if (appReport.getYarnApplicationState() == YarnApplicationState.ACCEPTED) { + attemptId = appReport.getCurrentApplicationAttemptId(); + appAttempt = + yarnCluster.getResourceManager().getRMContext().getRMApps() + .get(attemptId.getApplicationId()).getCurrentAppAttempt(); + while (true) { + if (appAttempt.getAppAttemptState() == RMAppAttemptState.LAUNCHED) { + break; + } + } + break; + } + } + + // Just dig into the ResourceManager and get the AMRMToken just for the sake + // of testing. + UserGroupInformation.setLoginUser(UserGroupInformation + .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName())); + + // emulate RM setup of AMRM token in credentials by adding the token + // *before* setting the token service + UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken()); + appAttempt.getAMRMToken().setService(ClientRMProxy.getAMRMTokenService(conf)); + } + + @After + public void cancelApp() throws YarnException, IOException { + yarnClient.killApplication(attemptId.getApplicationId()); + attemptId = null; + } + + @AfterClass + public static void tearDown() { + if (yarnClient != null && yarnClient.getServiceState() == Service.STATE.STARTED) { + yarnClient.stop(); + } + if (yarnCluster != null && yarnCluster.getServiceState() == Service.STATE.STARTED) { + yarnCluster.stop(); + } + } + + @Test(timeout = 60000) + public void testAMRMClientContainerMove() throws YarnException, IOException { + AMRMClientImpl amClient = null; + try { + // start am rm client and register the application master + amClient = + (AMRMClientImpl) AMRMClient + .createAMRMClient(); + amClient.init(conf); + amClient.start(); + amClient.registerApplicationMaster("Host", 10000, ""); + + assertEquals(0, amClient.ask.size()); + assertEquals(0, amClient.moveAsk.size()); + assertEquals(0, amClient.pendingMoveAsk.size()); + + // allocating two regular containers + AMRMClient.ContainerRequest storedContainer1 = + new AMRMClient.ContainerRequest(capability1, nodes1, null, priority1); + AMRMClient.ContainerRequest storedContainer2 = + new AMRMClient.ContainerRequest(capability2, nodes2, null, priority2); + + amClient.addContainerRequest(storedContainer1); + amClient.addContainerRequest(storedContainer2); + assertEquals(6, amClient.ask.size()); + assertEquals(0, amClient.moveAsk.size()); + assertEquals(0, amClient.pendingMoveAsk.size()); + + List allocatedContainers = getAllocatedContainers(amClient, DEFAULT_ITERATION); + assertEquals(0, amClient.ask.size()); + assertEquals(0, amClient.moveAsk.size()); + assertEquals(0, amClient.pendingMoveAsk.size()); + + // relocating a container to another host + // In local mode, this host is the same as before, + // but it is not important here + ContainerId originContainerId = getContainerByCapability(capability1, allocatedContainers) + .getId(); + NodeId targetNodeId = nodeReports.get(0).getNodeId(); + ContainerMoveRequest moveRequest = ContainerMoveRequest.newInstance(priority1, + originContainerId, targetNodeId.getHost()); + + amClient.addContainerMoveRequest(moveRequest); + assertEquals(0, amClient.ask.size()); + assertEquals(1, amClient.moveAsk.size()); + assertEquals(1, amClient.pendingMoveAsk.size()); + + amClient.allocate(0.1f); + assertEquals(0, amClient.ask.size()); + assertEquals(0, amClient.moveAsk.size()); + assertEquals(1, amClient.pendingMoveAsk.size()); + + allocatedContainers = getAllocatedContainers(amClient, DEFAULT_ITERATION); + assertEquals(0, amClient.ask.size()); + assertEquals(0, amClient.moveAsk.size()); + assertEquals(0, amClient.pendingMoveAsk.size()); + + // select move responses + List moveResponses = new ArrayList(); + for (Container c : allocatedContainers) { + if (c.getIsMove()) { + moveResponses.add(c); + } + } + + // verify that relocation informations are set correctly in the move response + assertEquals(1, moveResponses.size()); + Container moveResponse = moveResponses.get(0); + assertEquals(targetNodeId.getHost(), moveResponse.getNodeId().getHost()); + assertEquals(originContainerId, moveResponse.getOriginContainerId()); + assertEquals(priority1, moveResponse.getPriority()); + assertEquals(capability1, moveResponse.getResource()); + assertNotEquals(originContainerId, moveResponse.getId()); + + amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, + null, null); + } finally { + if (amClient != null && amClient.getServiceState() == Service.STATE.STARTED) { + amClient.stop(); + } + } + } + + private List getAllocatedContainers( + AMRMClientImpl amClient, int iterationsLeft) + throws YarnException, IOException { + List allocatedContainers = new ArrayList(); + while (iterationsLeft-- > 0) { + AllocateResponse allocResponse = amClient.allocate(0.1f); + assertEquals(0, amClient.moveAsk.size()); + + assertEquals(nodeCount, amClient.getClusterNodeCount()); + allocatedContainers.addAll(allocResponse.getAllocatedContainers()); + + if (allocatedContainers.isEmpty()) { + // sleep to let NM's heartbeat to RM and trigger allocations + sleep(100); + } + } + return allocatedContainers; + } + + private void sleep(int sleepTime) { + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + private Container getContainerByCapability(Resource capability, List containers) { + for (Container c : containers) { + if (c.getResource().equals(capability)) { + return c; + } + } + throw new RuntimeException("Container with capability " + capability + " not found"); + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagementProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagementProtocolPBClientImpl.java index 873dcb780bc0e..c415de7da7031 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagementProtocolPBClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagementProtocolPBClientImpl.java @@ -28,6 +28,8 @@ import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB; import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerLaunchContextRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerLaunchContextResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; @@ -45,6 +47,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CommitResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerLaunchContextRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerLaunchContextResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.IncreaseContainersResourceRequestPBImpl; @@ -71,6 +75,7 @@ import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.proto.YarnProtos; import org.apache.hadoop.yarn.proto.YarnServiceProtos; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerLaunchContextRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusesRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.IncreaseContainersResourceRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.ResourceLocalizationRequestProto; @@ -202,6 +207,20 @@ public ResourceLocalizationResponse localize( return null; } } + + @Override + public GetContainerLaunchContextResponse getContainerLaunchContext( + GetContainerLaunchContextRequest request) throws YarnException, IOException { + GetContainerLaunchContextRequestProto requestProto = + ((GetContainerLaunchContextRequestPBImpl)request).getProto(); + try { + return new GetContainerLaunchContextResponsePBImpl( + proxy.getContainerLaunchContext(null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } @Override public ReInitializeContainerResponse reInitializeContainer( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ContainerManagementProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ContainerManagementProtocolPBServiceImpl.java index fb8eead247b42..ffb2e89d9a1a3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ContainerManagementProtocolPBServiceImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ContainerManagementProtocolPBServiceImpl.java @@ -23,6 +23,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerLaunchContextResponse; import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; @@ -33,6 +34,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerLaunchContextRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerLaunchContextResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CommitResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.IncreaseContainersResourceRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.IncreaseContainersResourceResponsePBImpl; @@ -57,10 +60,13 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; -import org.apache.hadoop.yarn.proto.YarnServiceProtos.IncreaseContainersResourceRequestProto; -import org.apache.hadoop.yarn.proto.YarnServiceProtos.IncreaseContainersResourceResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.CommitResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerLaunchContextRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerLaunchContextResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusesRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusesResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.IncreaseContainersResourceRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.IncreaseContainersResourceResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReInitializeContainerRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReInitializeContainerResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.ResourceLocalizationRequestProto; @@ -73,7 +79,6 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainersResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.StopContainersRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.StopContainersResponseProto; -import org.apache.hadoop.yarn.proto.YarnServiceProtos.CommitResponseProto; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; @@ -176,6 +181,23 @@ public ResourceLocalizationResponseProto localize(RpcController controller, throw new ServiceException(e); } } + + @Override + public GetContainerLaunchContextResponseProto getContainerLaunchContext( + RpcController controller, GetContainerLaunchContextRequestProto proto) throws + ServiceException { + GetContainerLaunchContextRequestPBImpl request = + new GetContainerLaunchContextRequestPBImpl(proto); + try { + GetContainerLaunchContextResponse response = + real.getContainerLaunchContext(request); + return ((GetContainerLaunchContextResponsePBImpl)response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } @Override public ReInitializeContainerResponseProto reInitializeContainer( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java index 0f0f57103320d..b014d8fb0f362 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java @@ -18,7 +18,6 @@ package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; - import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -27,14 +26,17 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerMoveRequest; import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerMoveRequestPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ResourceBlacklistRequestPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ResourceRequestPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.UpdateContainerRequestPBImpl; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerMoveRequestProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceBlacklistRequestProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateContainerRequestProto; @@ -54,6 +56,7 @@ public class AllocateRequestPBImpl extends AllocateRequest { private List release = null; private List updateRequests = null; private ResourceBlacklistRequest blacklistRequest = null; + private List moveAsk = null; public AllocateRequestPBImpl() { builder = AllocateRequestProto.newBuilder(); @@ -104,6 +107,9 @@ private void mergeLocalToBuilder() { if (this.blacklistRequest != null) { builder.setBlacklistRequest(convertToProtoFormat(this.blacklistRequest)); } + if (this.moveAsk != null) { + addMoveAsksToProto(); + } } private void mergeLocalToProto() { @@ -358,6 +364,69 @@ public void remove() { }; builder.addAllRelease(iterable); } + + @Override + public List getMoveAskList() { + initMoveAsks(); + return this.moveAsk; + } + + @Override + public void setMoveAskList(final List containerMoveRequests) { + if(containerMoveRequests == null) { + return; + } + initMoveAsks(); + this.moveAsk.clear(); + this.moveAsk.addAll(containerMoveRequests); + } + + private void initMoveAsks() { + if (this.moveAsk != null) { + return; + } + AllocateRequestProtoOrBuilder p = viaProto ? proto : builder; + List list = p.getMoveAskList(); + this.moveAsk = new ArrayList(); + + for (ContainerMoveRequestProto c : list) { + this.moveAsk.add(convertFromProtoFormat(c)); + } + } + + private void addMoveAsksToProto() { + maybeInitBuilder(); + builder.clearMoveAsk(); + if (moveAsk == null) + return; + Iterable iterable = new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + + Iterator iter = moveAsk.iterator(); + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public ContainerMoveRequestProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + + } + }; + + } + }; + builder.addAllMoveAsk(iterable); + } private ResourceRequestPBImpl convertFromProtoFormat(ResourceRequestProto p) { return new ResourceRequestPBImpl(p); @@ -392,4 +461,12 @@ private ResourceBlacklistRequestPBImpl convertFromProtoFormat(ResourceBlacklistR private ResourceBlacklistRequestProto convertToProtoFormat(ResourceBlacklistRequest t) { return ((ResourceBlacklistRequestPBImpl)t).getProto(); } + + private ContainerMoveRequestPBImpl convertFromProtoFormat(ContainerMoveRequestProto p) { + return new ContainerMoveRequestPBImpl(p); + } + + private ContainerMoveRequestProto convertToProtoFormat(ContainerMoveRequest t) { + return ((ContainerMoveRequestPBImpl)t).getProto(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetContainerLaunchContextRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetContainerLaunchContextRequestPBImpl.java new file mode 100644 index 0000000000000..d8d3b97bed362 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetContainerLaunchContextRequestPBImpl.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + +import static org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; +import static org.apache.hadoop.yarn.proto.YarnServiceProtos + .GetContainerLaunchContextRequestProtoOrBuilder; + +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerLaunchContextRequest; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerLaunchContextRequestProto; + +import com.google.protobuf.TextFormat; + +public class GetContainerLaunchContextRequestPBImpl extends GetContainerLaunchContextRequest { + GetContainerLaunchContextRequestProto + proto = GetContainerLaunchContextRequestProto + .getDefaultInstance(); + GetContainerLaunchContextRequestProto.Builder builder = null; + boolean viaProto = false; + + private ContainerId containerId = null; + + public GetContainerLaunchContextRequestPBImpl() { + builder = GetContainerLaunchContextRequestProto.newBuilder(); + } + + public GetContainerLaunchContextRequestPBImpl(GetContainerLaunchContextRequestProto proto) { + this.proto = proto; + viaProto = true; + } + + public GetContainerLaunchContextRequestProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + private void mergeLocalToBuilder() { + if (containerId != null) { + builder.setContainerId(convertToProtoFormat(this.containerId)); + } + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = GetContainerLaunchContextRequestProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public ContainerId getContainerId() { + if (this.containerId != null) { + return this.containerId; + } + GetContainerLaunchContextRequestProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasContainerId()) { + return null; + } + this.containerId = convertFromProtoFormat(p.getContainerId()); + return this.containerId; + } + + @Override + public void setContainerId(ContainerId containerId) { + maybeInitBuilder(); + if (containerId == null) { + builder.clearContainerId(); + } + this.containerId = containerId; + } + + private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) { + return new ContainerIdPBImpl(p); + } + + private ContainerIdProto convertToProtoFormat(ContainerId t) { + return ((ContainerIdPBImpl) t).getProto(); + } + +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetContainerLaunchContextResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetContainerLaunchContextResponsePBImpl.java new file mode 100644 index 0000000000000..53d8d0d1ee594 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetContainerLaunchContextResponsePBImpl.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + +import static org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProto; +import static org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerLaunchContextResponseProto; + +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerLaunchContextResponse; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerLaunchContextPBImpl; +import org.apache.hadoop.yarn.proto.YarnServiceProtos + .GetContainerLaunchContextResponseProtoOrBuilder; + +import com.google.protobuf.TextFormat; + +public class GetContainerLaunchContextResponsePBImpl extends GetContainerLaunchContextResponse { + GetContainerLaunchContextResponseProto proto = + GetContainerLaunchContextResponseProto + .getDefaultInstance(); + GetContainerLaunchContextResponseProto.Builder builder = null; + boolean viaProto = false; + + private ContainerLaunchContext containerLaunchContext = null; + + public GetContainerLaunchContextResponsePBImpl() { + builder = GetContainerLaunchContextResponseProto.newBuilder(); + } + + public GetContainerLaunchContextResponsePBImpl(GetContainerLaunchContextResponseProto proto) { + this.proto = proto; + viaProto = true; + } + + public GetContainerLaunchContextResponseProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + private void mergeLocalToBuilder() { + if (this.containerLaunchContext != null) { + builder.setContainerLaunchContext(convertToProtoFormat(this.containerLaunchContext)); + } + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = GetContainerLaunchContextResponseProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public ContainerLaunchContext getContainerLaunchContext() { + GetContainerLaunchContextResponseProtoOrBuilder p = viaProto ? proto : builder; + if (this.containerLaunchContext != null) { + return this.containerLaunchContext; + } + if (!p.hasContainerLaunchContext()) { + return null; + } + this.containerLaunchContext = convertFromProtoFormat(p.getContainerLaunchContext()); + return this.containerLaunchContext; + } + + @Override + public void setContainerLaunchContext(ContainerLaunchContext containerLaunchContext) { + maybeInitBuilder(); + if (containerLaunchContext == null) + builder.clearContainerLaunchContext(); + this.containerLaunchContext = containerLaunchContext; + } + + private ContainerLaunchContextPBImpl convertFromProtoFormat(ContainerLaunchContextProto p) { + return new ContainerLaunchContextPBImpl(p); + } + + private ContainerLaunchContextProto convertToProtoFormat(ContainerLaunchContext t) { + return ((ContainerLaunchContextPBImpl) t).getProto(); + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/StartContainerRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/StartContainerRequestPBImpl.java index c1cd0ebbfc2d4..68a914d4af12a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/StartContainerRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/StartContainerRequestPBImpl.java @@ -18,16 +18,21 @@ package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; - import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.security.proto.SecurityProtos.TokenProto; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerLaunchContextPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProto; +import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainerRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainerRequestProtoOrBuilder; @@ -43,6 +48,9 @@ public class StartContainerRequestPBImpl extends StartContainerRequest { private ContainerLaunchContext containerLaunchContext = null; private Token containerToken = null; + private ContainerId originContainerId = null; + private NodeId originNodeId = null; + private Token originNMToken = null; public StartContainerRequestPBImpl() { builder = StartContainerRequestProto.newBuilder(); @@ -87,6 +95,15 @@ private void mergeLocalToBuilder() { if(this.containerToken != null) { builder.setContainerToken(convertToProtoFormat(this.containerToken)); } + if(this.originContainerId != null) { + builder.setOriginContainerId(convertToProtoFormat(this.originContainerId)); + } + if(this.originNodeId != null) { + builder.setOriginNodeId(convertToProtoFormat(this.originNodeId)); + } + if(this.originNMToken != null) { + builder.setOriginNmToken(convertToProtoFormat(this.originNMToken)); + } } private void mergeLocalToProto() { @@ -147,7 +164,85 @@ public void setContainerToken(Token containerToken) { } this.containerToken = containerToken; } - + + @Override + public boolean getIsMove() { + StartContainerRequestProtoOrBuilder p = viaProto ? proto : builder; + return p.getIsMove(); + } + + @Override + public void setIsMove(boolean isMove) { + maybeInitBuilder(); + builder.setIsMove(isMove); + } + + @Override + public ContainerId getOriginContainerId() { + StartContainerRequestProtoOrBuilder p = viaProto ? proto : builder; + if (this.originContainerId != null) { + return this.originContainerId; + } + if (!p.hasOriginContainerId()) { + return null; + } + this.originContainerId = convertFromProtoFormat(p.getOriginContainerId()); + return this.originContainerId; + } + + @Override + public void setOriginContainerId(ContainerId originContainerId) { + maybeInitBuilder(); + if(originContainerId == null) { + builder.clearOriginContainerId(); + } + this.originContainerId = originContainerId; + } + + @Override + public NodeId getOriginNodeId() { + StartContainerRequestProtoOrBuilder p = viaProto ? proto : builder; + if (this.originNodeId != null) { + return this.originNodeId; + } + if (!p.hasOriginNodeId()) { + return null; + } + this.originNodeId = convertFromProtoFormat(p.getOriginNodeId()); + return this.originNodeId; + } + + @Override + public void setOriginNodeId(NodeId originNodeId) { + maybeInitBuilder(); + if(originNodeId == null) { + builder.clearOriginNodeId(); + } + this.originNodeId = originNodeId; + } + + @Override + public Token getOriginNMToken() { + StartContainerRequestProtoOrBuilder p = viaProto ? proto : builder; + if (this.originNMToken != null) { + return this.originNMToken; + } + if (!p.hasOriginNmToken()) { + return null; + } + this.originNMToken = convertFromProtoFormat(p.getOriginNmToken()); + return this.originNMToken; + } + + @Override + public void setOriginNMToken(Token originNMToken) { + maybeInitBuilder(); + if(originNMToken == null) { + builder.clearOriginNmToken(); + } + this.originNMToken = originNMToken; + } + private ContainerLaunchContextPBImpl convertFromProtoFormat(ContainerLaunchContextProto p) { return new ContainerLaunchContextPBImpl(p); } @@ -155,9 +250,7 @@ private ContainerLaunchContextPBImpl convertFromProtoFormat(ContainerLaunchConte private ContainerLaunchContextProto convertToProtoFormat(ContainerLaunchContext t) { return ((ContainerLaunchContextPBImpl)t).getProto(); } - - - + private TokenPBImpl convertFromProtoFormat(TokenProto containerProto) { return new TokenPBImpl(containerProto); } @@ -165,4 +258,19 @@ private TokenPBImpl convertFromProtoFormat(TokenProto containerProto) { private TokenProto convertToProtoFormat(Token container) { return ((TokenPBImpl)container).getProto(); } + private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) { + return new ContainerIdPBImpl(p); + } + + private ContainerIdProto convertToProtoFormat(ContainerId t) { + return ((ContainerIdPBImpl)t).getProto(); + } + + private NodeIdPBImpl convertFromProtoFormat(NodeIdProto p) { + return new NodeIdPBImpl(p); + } + + private NodeIdProto convertToProtoFormat(NodeId t) { + return ((NodeIdPBImpl)t).getProto(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerMoveRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerMoveRequestPBImpl.java new file mode 100644 index 0000000000000..ab66019932612 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerMoveRequestPBImpl.java @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

    + * http://www.apache.org/licenses/LICENSE-2.0 + *

    + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records.impl.pb; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerMoveRequest; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerMoveRequestProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerMoveRequestProtoOrBuilder; + +@Private +@Unstable +public class ContainerMoveRequestPBImpl extends ContainerMoveRequest { + + ContainerMoveRequestProto proto = ContainerMoveRequestProto.getDefaultInstance(); + ContainerMoveRequestProto.Builder builder = null; + boolean viaProto = false; + + private Priority priority = null; + private ContainerId originContainerId = null; + + public ContainerMoveRequestPBImpl() { + builder = ContainerMoveRequestProto.newBuilder(); + } + + public ContainerMoveRequestPBImpl(ContainerMoveRequestProto proto) { + this.proto = proto; + viaProto = true; + } + + public ContainerMoveRequestProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private void mergeLocalToBuilder() { + if (this.priority != null) { + builder.setPriority(convertToProtoFormat(this.priority)); + } + if (this.originContainerId != null) { + builder.setOriginContainerId(convertToProtoFormat(this.originContainerId)); + } + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = ContainerMoveRequestProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public Priority getPriority() { + ContainerMoveRequestProtoOrBuilder p = viaProto ? proto : builder; + if (this.priority != null) { + return this.priority; + } + if (!p.hasPriority()) { + return null; + } + this.priority = convertFromProtoFormat(p.getPriority()); + return this.priority; + } + + @Override + public void setPriority(Priority priority) { + maybeInitBuilder(); + if (priority == null) + builder.clearPriority(); + this.priority = priority; + } + + @Override + public ContainerId getOriginContainerId() { + ContainerMoveRequestProtoOrBuilder p = viaProto ? proto : builder; + if (this.originContainerId != null) { + return this.originContainerId; + } + if (!p.hasOriginContainerId()) { + return null; + } + this.originContainerId = convertFromProtoFormat(p.getOriginContainerId()); + return this.originContainerId; + } + + @Override + public void setOriginContainerId(ContainerId originContainerId) { + maybeInitBuilder(); + if (priority == null) + builder.clearOriginContainerId(); + this.originContainerId = originContainerId; + } + + @Override + public String getTargetHost() { + ContainerMoveRequestProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasTargetHost()) { + return null; + } + return (p.getTargetHost()); + } + + @Override + public void setTargetHost(String targetHost) { + maybeInitBuilder(); + if (targetHost == null) + builder.clearTargetHost(); + builder.setTargetHost(targetHost); + } + + private PriorityPBImpl convertFromProtoFormat(PriorityProto p) { + return new PriorityPBImpl(p); + } + + private PriorityProto convertToProtoFormat(Priority t) { + return ((PriorityPBImpl) t).getProto(); + } + + private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) { + return new ContainerIdPBImpl(p); + } + + private ContainerIdProto convertToProtoFormat(ContainerId t) { + return ((ContainerIdPBImpl) t).getProto(); + } + + @Override + public String toString() { + return "{Priority: " + getPriority() + + ", OriginContainerId: " + getOriginContainerId() + + ", TargetHost: " + getTargetHost() + "}"; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java index 6bf653dd00ab8..206088aa68bd5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java @@ -49,7 +49,11 @@ public class ContainerPBImpl extends Container { private Resource resource = null; private Priority priority = null; private Token containerToken = null; - + + private ContainerId originContainerId = null; + private NodeId originNodeId = null; + + public ContainerPBImpl() { builder = ContainerProto.newBuilder(); } @@ -108,6 +112,16 @@ private void mergeLocalToBuilder() { builder.getContainerToken())) { builder.setContainerToken(convertToProtoFormat(this.containerToken)); } + if (this.originContainerId != null + && !((ContainerIdPBImpl) originContainerId).getProto().equals( + builder.getOriginContainerId())) { + builder.setOriginContainerId(convertToProtoFormat(this.originContainerId)); + } + if (this.originNodeId != null + && !((NodeIdPBImpl) originNodeId).getProto().equals( + builder.getOriginNodeId())) { + builder.setOriginNodeId(convertToProtoFormat(this.originNodeId)); + } } private void mergeLocalToProto() { @@ -285,7 +299,61 @@ public void setVersion(int version) { maybeInitBuilder(); builder.setVersion(version); } - + + @Override + public boolean getIsMove() { + ContainerProtoOrBuilder p = viaProto ? proto : builder; + return p.getIsMove(); + } + + @Override + public void setIsMove(boolean isMove) { + maybeInitBuilder(); + builder.setIsMove(isMove); + } + + @Override + public ContainerId getOriginContainerId() { + ContainerProtoOrBuilder p = viaProto ? proto : builder; + if (this.originContainerId != null) { + return this.originContainerId; + } + if (!p.hasOriginContainerId()) { + return null; + } + this.originContainerId = convertFromProtoFormat(p.getOriginContainerId()); + return this.originContainerId; + } + + @Override + public void setOriginContainerId(ContainerId originContainerId) { + maybeInitBuilder(); + if (originContainerId == null) + builder.clearOriginContainerId(); + this.originContainerId = originContainerId; + } + + @Override + public NodeId getOriginNodeId() { + ContainerProtoOrBuilder p = viaProto ? proto : builder; + if (this.originNodeId != null) { + return this.originNodeId; + } + if (!p.hasOriginNodeId()) { + return null; + } + this.originNodeId = convertFromProtoFormat(p.getOriginNodeId()); + return this.originNodeId; + } + + @Override + public void setOriginNodeId(NodeId originNodeId) { + maybeInitBuilder(); + if (originNodeId == null) + builder.clearOriginNodeId(); + this.originNodeId = originNodeId; + } + private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) { return new ContainerIdPBImpl(p); } @@ -348,6 +416,11 @@ public String toString() { sb.append("Priority: ").append(getPriority()).append(", "); sb.append("Token: ").append(getContainerToken()).append(", "); sb.append("ExecutionType: ").append(getExecutionType()).append(", "); + sb.append("IsMove: ").append(getIsMove()).append(", "); + if (getIsMove()) { + sb.append("OriginContainerId: ").append(getOriginContainerId()).append(", "); + sb.append("OriginNodeId: ").append(getOriginNodeId()).append(", "); + } sb.append("]"); return sb.toString(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java index 9890296acddd2..3acfd51ca7183 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java @@ -21,6 +21,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; @@ -41,6 +42,8 @@ public class ResourceRequestPBImpl extends ResourceRequest { private Resource capability = null; private ExecutionTypeRequest executionTypeRequest = null; + private boolean isMove = false; + private ContainerId originContainerId = null; public ResourceRequestPBImpl() { builder = ResourceRequestProto.newBuilder(); @@ -203,6 +206,26 @@ public void setAllocationRequestId(long allocationRequestID) { maybeInitBuilder(); builder.setAllocationRequestId(allocationRequestID); } + + @Override + public boolean getIsMove() { + return isMove; + } + + @Override + public void setIsMove(boolean isMove) { + this.isMove = isMove; + } + + @Override + public ContainerId getOriginContainerId() { + return originContainerId; + } + + @Override + public void setOriginContainerId(ContainerId originContainerId) { + this.originContainerId = originContainerId; + } private PriorityPBImpl convertFromProtoFormat(PriorityProto p) { return new PriorityPBImpl(p); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java index 90c7573a0f343..67d0d932e558d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java @@ -33,10 +33,12 @@ import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse; -import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; -import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerLaunchContextRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerLaunchContextResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerResponse; import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationRequest; @@ -211,6 +213,13 @@ public ResourceLocalizationResponse localize( ResourceLocalizationRequest request) throws YarnException, IOException { return null; } + + @Override + public GetContainerLaunchContextResponse getContainerLaunchContext( + GetContainerLaunchContextRequest request) throws YarnException, IOException { + throw new UnsupportedOperationException("getting the container launch context is not " + + "supported for this implementation of ContainerManagementProtocol"); + } @Override public ReInitializeContainerResponse reInitializeContainer( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerResourceIncreaseRPC.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerResourceIncreaseRPC.java index f97f7c74df76e..81f0f0eed3540 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerResourceIncreaseRPC.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerResourceIncreaseRPC.java @@ -27,6 +27,8 @@ import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerLaunchContextRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerLaunchContextResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; @@ -192,6 +194,13 @@ public SignalContainerResponse signalToContainer( ResourceLocalizationRequest request) throws YarnException, IOException { return null; } + + @Override + public GetContainerLaunchContextResponse getContainerLaunchContext( + GetContainerLaunchContextRequest request) throws YarnException, IOException { + throw new UnsupportedOperationException("getting the container launch context is not " + + "supported for this implementation of ContainerManagementProtocol"); + } @Override public ReInitializeContainerResponse reInitializeContainer( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java index e5d159b2ad6a1..f28e7f223d5b6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java @@ -34,10 +34,12 @@ import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB; import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse; -import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; -import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerLaunchContextRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerLaunchContextResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerResponse; @@ -360,6 +362,13 @@ public ResourceLocalizationResponse localize( ResourceLocalizationRequest request) throws YarnException, IOException { return null; } + + @Override + public GetContainerLaunchContextResponse getContainerLaunchContext( + GetContainerLaunchContextRequest request) throws YarnException, IOException { + throw new UnsupportedOperationException("getting the container launch context is not " + + "supported for this implementation of ContainerManagementProtocol"); + } @Override public ReInitializeContainerResponse reInitializeContainer( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index e8c2b7552ca33..8c53d0e482a30 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -39,6 +39,8 @@ import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerLaunchContextRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerLaunchContextResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; @@ -69,9 +71,11 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.SerializedException; +import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.LogAggregationContextPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; +import org.apache.hadoop.yarn.client.NMProxy; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.EventHandler; @@ -144,6 +148,7 @@ import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils; +import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; @@ -155,6 +160,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -861,8 +867,16 @@ public StartContainersResponse startContainers( .equals(ContainerType.APPLICATION_MASTER)) { this.getAMRMProxyService().processApplicationStartRequest(request); } - performContainerPreStartChecks(nmTokenIdentifier, request, - containerTokenIdentifier); + if(!request.getIsMove()) { + performContainerPreStartChecks(nmTokenIdentifier, request, + containerTokenIdentifier); + } else { + // if the request does not have a container launch context, + // just authorize the container start and update the NMToken + authorizeStartAndResourceIncreaseRequest( + nmTokenIdentifier, containerTokenIdentifier, true); + updateNMTokenIdentifier(nmTokenIdentifier); + } startContainerInternal(containerTokenIdentifier, request); succeededContainers.add(containerId); } catch (YarnException e) { @@ -966,10 +980,25 @@ protected void startContainerInternal( ContainerId containerId = containerTokenIdentifier.getContainerID(); String containerIdStr = containerId.toString(); String user = containerTokenIdentifier.getApplicationSubmitter(); + ContainerManagementProtocol proxy = null; LOG.info("Start request for " + containerIdStr + " by user " + user); - - ContainerLaunchContext launchContext = request.getContainerLaunchContext(); + + ContainerLaunchContext launchContext = null; + Configuration conf = null; + YarnRPC rpc = null; + if(request.getIsMove()) { + // In case of a relocation request, the launch context is borrowed from the origin container + conf = getConfig(); + rpc = YarnRPC.create(conf); + // get the proxy to the origin NM + proxy = getCMClient(containerId, request.getOriginNodeId(), conf, rpc, request.getOriginNMToken()); + // request the launch context from the origin NM + launchContext = proxy.getContainerLaunchContext(GetContainerLaunchContextRequest.newInstance( + request.getOriginContainerId())).getContainerLaunchContext(); + } else { + launchContext = request.getContainerLaunchContext(); + } Credentials credentials = YarnServerSecurityUtils.parseCredentials(launchContext); @@ -1044,12 +1073,23 @@ protected void startContainerInternal( // launch. A finished Application will not launch containers. metrics.launchedContainer(); metrics.allocateContainer(containerTokenIdentifier.getResource()); + LOG.debug("Started container " + containerId + " on node " + context.getNodeId() + + " with commands " + launchContext.getCommands()); + // In case of a relocation request, shut down the origin container + if(request.getIsMove()) { + proxy.stopContainers(StopContainersRequest.newInstance(Collections.singletonList(request + .getOriginContainerId()))); + } } else { throw new YarnException( "Container start failed as the NodeManager is " + "in the process of shutting down"); } } finally { + // stop the proxy to the origin NM + if(proxy != null) { + rpc.stopProxy(proxy, conf); + } this.readLock.unlock(); } } @@ -1129,6 +1169,24 @@ public IncreaseContainersResourceResponse increaseContainersResource( return IncreaseContainersResourceResponse.newInstance( successfullyIncreasedContainers, failedContainers); } + + @Override + public GetContainerLaunchContextResponse getContainerLaunchContext( + GetContainerLaunchContextRequest request) throws YarnException, IOException { + ContainerId containerId = request.getContainerId(); + // Getting the launch context of the origin container locally + ContainerLaunchContext launchContext = context.getContainers().get(containerId) + .getLaunchContext(); + LOG.debug("Got launch context of container " + containerId + " on node " + context.getNodeId()); + // We copy the launch context in order to avoid ConcurrentModificationException + ContainerLaunchContext launchContextCopy; + try { + launchContextCopy = launchContext.copy(); + } catch (Exception e) { + throw new YarnException("Unable to copy container launch context"); + } + return GetContainerLaunchContextResponse.newInstance(launchContextCopy); + } @SuppressWarnings("unchecked") private void changeContainerResourceInternal(ContainerId containerId, @@ -1242,6 +1300,7 @@ public StopContainersResponse stopContainers(StopContainersRequest requests) authorizeGetAndStopContainerRequest(id, container, true, identifier); stopContainerInternal(id); succeededRequests.add(id); + LOG.debug("Stopped container " + id + " on node " + context.getNodeId()); } catch (YarnException e) { failedRequests.put(id, SerializedException.newInstance(e)); } @@ -1683,4 +1742,27 @@ private void internalSignalToContainer(SignalContainerRequest request, LOG.info("Container " + containerId + " no longer exists"); } } + + /** + * Sets up an NM Client to the specified NM; uses the given token for authentication + */ + private ContainerManagementProtocol getCMClient(ContainerId containerId, NodeId + originNodeId, Configuration conf, YarnRPC rpc, Token token) { + conf.setInt( + CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, 0); + UserGroupInformation ugi = null; + try { + ugi = + UserGroupInformation.createRemoteUser(containerId + .getApplicationAttemptId().toString()); + } catch (Exception e) { + throw new RuntimeException(e); + } + InetSocketAddress serverAddress = NetUtils.createSocketAddrForHost(originNodeId.getHost(), + originNodeId.getPort()); + org.apache.hadoop.security.token.Token nmToken = + ConverterUtils.convertFromYarn(token, serverAddress); + ugi.addToken(nmToken); + return NMProxy.createNMProxy(conf, ContainerManagementProtocol.class, ugi, rpc, serverAddress); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 4d73ba23eebfb..5811a36f731e9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -96,6 +96,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security .AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider; @@ -528,10 +529,22 @@ public AllocateResponse allocate(AllocateRequest request) " state, ignore container allocate request."); allocation = EMPTY_ALLOCATION; } else { - allocation = - this.rScheduler.allocate(appAttemptId, ask, release, - blacklistAdditions, blacklistRemovals, - increaseResourceReqs, decreaseResourceReqs); + // Currently, container relocation logic is implemented only for FifoScheduler, + // hence we check the type of the scheduler in use + if (rScheduler instanceof FifoScheduler) { + // TODO normalize and validate moveAsk + // Sending the container move requests with the allocate heartbeat + allocation = + ((FifoScheduler) this.rScheduler).allocate(appAttemptId, ask, release, + blacklistAdditions, blacklistRemovals, + increaseResourceReqs, decreaseResourceReqs, + request.getMoveAskList()); + } else { + allocation = + this.rScheduler.allocate(appAttemptId, ask, release, + blacklistAdditions, blacklistRemovals, + increaseResourceReqs, decreaseResourceReqs); + } } if (!blacklistAdditions.isEmpty() || !blacklistRemovals.isEmpty()) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index aa7ad500a49d1..6c7c4a461c00e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerMoveRequest; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeId; @@ -69,6 +70,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.AbstractContainerAllocator; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.ContainerAllocator; + +import org.apache.hadoop.yarn.util.RackResolver; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -251,6 +254,53 @@ public RMContainer allocate(NodeType type, FiCaSchedulerNode node, writeLock.unlock(); } } + + /** + * Converts all container move requests into resource requests and add them to the ask list. + * The created resource requests are flagged as move requests and carry information about + * the original move requests. + * + * @param moveAsk the list of container move requests to convert and append to the ask list + * @param ask the ask list of the current allocation + */ + public void convertAndAppendMoveAsk(List moveAsk, List ask) { + for (ContainerMoveRequest moveRequest : moveAsk) { + String host = moveRequest.getTargetHost(); + ask.add(convertMoveRequest(moveRequest, host, true)); + ask.add(convertMoveRequest(moveRequest, resolveRack(host), false)); + ask.add(convertMoveRequest(moveRequest, ResourceRequest.ANY, false)); + } + } + + /** + * Converts a container move request to a resource request with the given locality constraint + */ + private ResourceRequest convertMoveRequest(ContainerMoveRequest moveRequest, String location, + boolean relaxLocality) { + ContainerId originContainerId = moveRequest.getOriginContainerId(); + RMContainer rmContainer = getRMContainer(originContainerId); + ResourceRequest request = ResourceRequest.newInstance(moveRequest.getPriority(), location, + rmContainer.getContainer().getResource(), 1, relaxLocality); + request.setIsMove(true); + request.setOriginContainerId(originContainerId); + return request; + } + + /** + * Resolves the rack for a given host. + */ + private String resolveRack(String host) { + String rack; + if (host != null) { + rack = RackResolver.resolve(host).getNetworkLocation(); + } else { + rack = null; + } + if (rack == null) { + LOG.warn("Failed to resolve rack for node " + host + "."); + } + return rack; + } public boolean unreserve(SchedulerRequestKey schedulerKey, FiCaSchedulerNode node, RMContainer rmContainer) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index e9ffd09eca276..ccca9fc4ee26c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerMoveRequest; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; @@ -384,6 +385,74 @@ public Allocation allocate(ApplicationAttemptId applicationAttemptId, headroom, null, null, null, application.pullUpdatedNMTokens()); } } + + /** + * This is the version of allocate that handles container relocation requests + */ + public Allocation allocate(ApplicationAttemptId applicationAttemptId, + List ask, List release, + List blacklistAdditions, List blacklistRemovals, + List increaseRequests, + List decreaseRequests, + List moveAsk) { + FiCaSchedulerApp application = getApplicationAttempt(applicationAttemptId); + if (application == null) { + LOG.error("Calling allocate on removed " + + "or non existant application " + applicationAttemptId); + return EMPTY_ALLOCATION; + } + + // Sanity check + SchedulerUtils.normalizeRequests(ask, resourceCalculator, + getClusterResource(), minimumAllocation, + getMaximumResourceCapability()); + + // Release containers + releaseContainers(release, application); + + synchronized (application) { + + // make sure we aren't stopping/removing the application + // when the allocate comes in + if (application.isStopped()) { + LOG.info("Calling allocate on a stopped " + + "application " + applicationAttemptId); + return EMPTY_ALLOCATION; + } + + // In order to handle ask and moveAsk uniformly, convert all container move requests + // into resource requests + application.convertAndAppendMoveAsk(moveAsk, ask); + + if (!ask.isEmpty()) { + LOG.debug("allocate: pre-update" + + " applicationId=" + applicationAttemptId + + " application=" + application); + application.showRequests(); + + // Update application requests + application.updateResourceRequests(ask); + + LOG.debug("allocate: post-update" + + " applicationId=" + applicationAttemptId + + " application=" + application); + application.showRequests(); + + LOG.debug("allocate:" + + " applicationId=" + applicationAttemptId + + " #ask=" + ask.size()); + } + + application.updateBlacklist(blacklistAdditions, blacklistRemovals); + + Resource headroom = application.getHeadroom(); + application.setApplicationHeadroomForMetrics(headroom); + List newlyAllocatedContainers = application.pullNewlyAllocatedContainers(); + System.out.println(); + return new Allocation(newlyAllocatedContainers, + headroom, null, null, null, application.pullUpdatedNMTokens()); + } + } private FiCaSchedulerNode getNode(NodeId nodeId) { return nodeTracker.getNode(nodeId); @@ -715,6 +784,15 @@ private int assignContainer(FiCaSchedulerNode node, FiCaSchedulerApp application schedulerKey.getPriority(), null, schedulerKey.getAllocationRequestId()); + // handle resource requests that were generated from container move requests, so that the + // allocated container is aware of the container relocation + if(request.getIsMove()) { + container.setIsMove(request.getIsMove()); + container.setOriginContainerId(request.getOriginContainerId()); + container.setOriginNodeId( + application.getRMContainer(request.getOriginContainerId()).getAllocatedNode()); + } + // Allocate! // Inform the application diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java index 2fc4b3f4696fb..f5a5a9f9f0ebd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java @@ -25,6 +25,8 @@ import java.util.List; import java.util.Map; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerLaunchContextRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerLaunchContextResponse; import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; @@ -337,6 +339,13 @@ public ResourceLocalizationResponse localize( ResourceLocalizationRequest request) throws YarnException, IOException { return null; } + + @Override + public GetContainerLaunchContextResponse getContainerLaunchContext( + GetContainerLaunchContextRequest request) throws YarnException, IOException { + throw new UnsupportedOperationException("getting the container launch context is not " + + "supported for this implementation of ContainerManagementProtocol"); + } @Override public ReInitializeContainerResponse reInitializeContainer( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java index b3d4d344294cd..b1376731abcc5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java @@ -42,10 +42,12 @@ import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse; -import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; -import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerLaunchContextRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerLaunchContextResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; @@ -182,6 +184,13 @@ public ResourceLocalizationResponse localize( ResourceLocalizationRequest request) throws YarnException, IOException { return null; } + + @Override + public GetContainerLaunchContextResponse getContainerLaunchContext( + GetContainerLaunchContextRequest request) throws YarnException, IOException { + throw new UnsupportedOperationException("getting the container launch context is not " + + "supported for this implementation of ContainerManagementProtocol"); + } @Override public ReInitializeContainerResponse reInitializeContainer( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java index 9a46e011dda22..5e3016fddbc22 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java @@ -34,10 +34,12 @@ import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse; -import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; -import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerLaunchContextRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerLaunchContextResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerResponse; import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationRequest; @@ -169,6 +171,13 @@ public ResourceLocalizationResponse localize( ResourceLocalizationRequest request) throws YarnException, IOException { return null; } + + @Override + public GetContainerLaunchContextResponse getContainerLaunchContext( + GetContainerLaunchContextRequest request) throws YarnException, IOException { + throw new UnsupportedOperationException("getting the container launch context is not " + + "supported for this implementation of ContainerManagementProtocol"); + } @Override public ReInitializeContainerResponse reInitializeContainer(