Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3075,6 +3075,10 @@ public static boolean isAclEnabled(Configuration conf) {
+ "amrmproxy.enabled";
public static final boolean DEFAULT_AMRM_PROXY_ENABLED = false;

public static final String AMRM_PROXY_WAIT_UAM_REGISTER_DONE =
NM_PREFIX + "amrmproxy.wait.uam-register.done";
public static final boolean DEFAULT_AMRM_PROXY_WAIT_UAM_REGISTER_DONE = false;

public static final String AMRM_PROXY_ADDRESS = NM_PREFIX
+ "amrmproxy.address";
public static final int DEFAULT_AMRM_PROXY_PORT = 8049;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5354,6 +5354,16 @@
<value></value>
</property>

<property>
<description>
Whether we wait for uam registration to complete.
The default value is false. If we set it to true,
the UAM needs to be registered before attempting to allocate a container.
</description>
<name>yarn.nodemanager.amrmproxy.wait.uam-register.done</name>
<value>false</value>
</property>

<property>
<description>
YARN Federation supports Non-HA mode.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hadoop.yarn.server;

import java.io.Closeable;
import java.io.IOException;
import java.net.ConnectException;
import java.util.ArrayList;
Expand Down Expand Up @@ -183,7 +184,7 @@
* change the implementation with care.
*/
public class MockResourceManagerFacade implements ApplicationClientProtocol,
ApplicationMasterProtocol, ResourceManagerAdministrationProtocol {
ApplicationMasterProtocol, ResourceManagerAdministrationProtocol, Closeable {

private static final Logger LOG =
LoggerFactory.getLogger(MockResourceManagerFacade.class);
Expand Down Expand Up @@ -967,4 +968,9 @@ public DeregisterSubClusterResponse deregisterSubCluster(DeregisterSubClusterReq
public HashMap<ApplicationId, List<ContainerId>> getApplicationContainerIdMap() {
return applicationContainerIdMap;
}

@Override
public void close() throws IOException {
LOG.info("MockResourceManagerFacade Close.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
// the maximum wait time for the first async heart beat response
private long heartbeatMaxWaitTimeMs;

private boolean waitUamRegisterDone;

private MonotonicClock clock = new MonotonicClock();

/**
Expand Down Expand Up @@ -353,6 +355,8 @@ public void init(AMRMProxyApplicationContext appContext) {
this.subClusterTimeOut =
YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT;
}
this.waitUamRegisterDone = conf.getBoolean(YarnConfiguration.AMRM_PROXY_WAIT_UAM_REGISTER_DONE,
YarnConfiguration.DEFAULT_AMRM_PROXY_WAIT_UAM_REGISTER_DONE);
}

@Override
Expand Down Expand Up @@ -1332,6 +1336,18 @@ public void run() {
});
this.uamRegisterFutures.put(scId, future);
}

if (this.waitUamRegisterDone) {
for (Map.Entry<SubClusterId, Future<?>> entry : this.uamRegisterFutures.entrySet()) {
SubClusterId subClusterId = entry.getKey();
Future<?> future = entry.getValue();
while (!future.isDone()) {
LOG.info("subClusterId {} Wait Uam Register done.", subClusterId);
}
}
}


return newSubClusters;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.hadoop.registry.client.api.RegistryOperations;
import org.apache.hadoop.registry.client.impl.FSRegistryOperationsService;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
Expand Down Expand Up @@ -178,6 +179,9 @@ protected YarnConfiguration createConfiguration() {
conf.setLong(YarnConfiguration.FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT,
500);

// Wait UAM Register Down
conf.setBoolean(YarnConfiguration.AMRM_PROXY_WAIT_UAM_REGISTER_DONE, true);

return conf;
}

Expand Down Expand Up @@ -593,6 +597,10 @@ public Object run() throws Exception {
interceptor.recover(recoveredDataMap);

Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());

// Waiting for SC-1 to time out.
GenericTestUtils.waitFor(() -> interceptor.getTimedOutSCs(true).size() == 1, 100, 1000);

// SC1 should be initialized to be timed out
Assert.assertEquals(1, interceptor.getTimedOutSCs(true).size());

Expand Down Expand Up @@ -851,7 +859,7 @@ public Object run() throws Exception {
List<Container> containers =
getContainersAndAssert(numberOfContainers, numberOfContainers * 2);
for (Container c : containers) {
LOG.info("Allocated container " + c.getId());
LOG.info("Allocated container {}", c.getId());
}
Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());

Expand Down Expand Up @@ -885,6 +893,10 @@ public Object run() throws Exception {
int numberOfContainers = 3;
// Should re-attach secondaries and get the three running containers
Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());

// Waiting for SC-1 to time out.
GenericTestUtils.waitFor(() -> interceptor.getTimedOutSCs(true).size() == 1, 100, 1000);

// SC1 should be initialized to be timed out
Assert.assertEquals(1, interceptor.getTimedOutSCs(true).size());
Assert.assertEquals(numberOfContainers,
Expand Down