Skip to content

Commit 3f08a62

Browse files
author
chaosju
committed
YARN-11110. An implementation for using CGroups to control the number of the process in container
1 parent d5cba5c commit 3f08a62

File tree

6 files changed

+334
-2
lines changed

6 files changed

+334
-2
lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2018,6 +2018,18 @@ public static boolean isAclEnabled(Configuration conf) {
20182018
public static final long DEFAULT_NM_CONTAINER_LOG_TOTAL_SIZE_LIMIT_BYTES =
20192019
10000000000L;
20202020

2021+
/** Enable switch for container process number monitoring. */
2022+
public static final String NM_CONTAINER_PROCESS_MONITOR_ENABLED =
2023+
NM_PREFIX + "container-process-monitor.enable";
2024+
public static final boolean
2025+
DEFAULT_NM_CONTAINER_PROCESS_MONITOR_ENABLED= false;
2026+
2027+
/** The max process number limit for a single container. */
2028+
public static final String NM_CONTAINER_PROCESS_MAX_LIMIT_NUM =
2029+
NM_PREFIX + "container-process-monitor.max-limit-num";
2030+
public static final int DEFAULT_NM_CONTAINER_PROCESS_NUM_MAX_LIMIT =
2031+
10000;
2032+
20212033
/** Enable/disable container metrics. */
20222034
@Private
20232035
public static final String NM_CONTAINER_METRICS_ENABLE =

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,8 @@ enum CGroupController {
4646
CPUACCT("cpuacct"),
4747
CPUSET("cpuset"),
4848
FREEZER("freezer"),
49-
DEVICES("devices");
49+
DEVICES("devices"),
50+
PIDS("pids");
5051

5152
private final String name;
5253

@@ -84,7 +85,7 @@ public static Set<String> getValidCGroups() {
8485
String CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES = "memsw.usage_in_bytes";
8586
String CGROUP_NO_LIMIT = "-1";
8687
String UNDER_OOM = "under_oom 1";
87-
88+
String CGROUP_PIDS_MAX = "max";
8889

8990
String CGROUP_CPU_PERIOD_US = "cfs_period_us";
9091
String CGROUP_CPU_QUOTA_US = "cfs_quota_us";
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
20+
21+
import org.apache.commons.logging.Log;
22+
import org.apache.commons.logging.LogFactory;
23+
import org.apache.hadoop.classification.InterfaceAudience;
24+
import org.apache.hadoop.classification.InterfaceStability;
25+
import org.apache.hadoop.conf.Configuration;
26+
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
27+
import org.apache.hadoop.yarn.api.records.ContainerId;
28+
import org.apache.hadoop.yarn.conf.YarnConfiguration;
29+
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
30+
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
31+
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
32+
33+
import java.util.ArrayList;
34+
import java.util.List;
35+
36+
/**
37+
* An implementation for using CGroups to control the number of the process in container.
38+
*
39+
* The process number controller is used to allow a cgroup hierarchy to stop any
40+
* new tasks from being fork()'d or clone()'d after a certain limit is reached.
41+
* @see <a href="https://www.kernel.org/doc/Documentation/cgroup-v1/pids.txt">PIDS</a>
42+
*/
43+
44+
@InterfaceStability.Unstable
45+
@InterfaceAudience.Private
46+
public class CGroupsPidsResourceHandlerImpl implements PidsResourceHandler {
47+
48+
static final Log LOG = LogFactory.getLog(CGroupsPidsResourceHandlerImpl.class);
49+
50+
private CGroupsHandler cGroupsHandler;
51+
private static final CGroupsHandler.CGroupController PIDS = CGroupsHandler.CGroupController.PIDS;
52+
private int processMaxCount;
53+
54+
CGroupsPidsResourceHandlerImpl(CGroupsHandler cGroupsHandler) {
55+
this.cGroupsHandler = cGroupsHandler;
56+
}
57+
58+
@Override
59+
public List<PrivilegedOperation> bootstrap(Configuration conf)
60+
throws ResourceHandlerException {
61+
this.cGroupsHandler.initializeCGroupController(PIDS);
62+
processMaxCount =
63+
conf.getInt(YarnConfiguration.NM_CONTAINER_PROCESS_MAX_LIMIT_NUM,
64+
YarnConfiguration.DEFAULT_NM_CONTAINER_PROCESS_NUM_MAX_LIMIT);
65+
if (processMaxCount < 0){
66+
throw new ResourceHandlerException(
67+
"Illegal value '" + processMaxCount + "' "
68+
+ YarnConfiguration.
69+
NM_CONTAINER_PROCESS_MAX_LIMIT_NUM
70+
+ ". Value must be positive number.");
71+
}
72+
LOG.info("Maximum number of processes is " + processMaxCount);
73+
74+
return null;
75+
}
76+
77+
@Override
78+
public List<PrivilegedOperation> preStart(Container container)
79+
throws ResourceHandlerException {
80+
81+
String cgroupId = container.getContainerId().toString();
82+
cGroupsHandler.createCGroup(PIDS, cgroupId);
83+
try {
84+
cGroupsHandler.updateCGroupParam(PIDS, cgroupId,
85+
CGroupsHandler.CGROUP_PIDS_MAX, String.valueOf(processMaxCount));
86+
} catch (ResourceHandlerException re) {
87+
cGroupsHandler.deleteCGroup(PIDS, cgroupId);
88+
LOG.error("Could not update cgroup for container", re);
89+
throw re;
90+
}
91+
List<PrivilegedOperation> ret = new ArrayList<>();
92+
ret.add(new PrivilegedOperation(PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP,
93+
PrivilegedOperation.CGROUP_ARG_PREFIX + cGroupsHandler
94+
.getPathForCGroupTasks(PIDS, cgroupId)));
95+
return ret;
96+
}
97+
98+
@Override
99+
public List<PrivilegedOperation> reacquireContainer(ContainerId containerId)
100+
throws ResourceHandlerException {
101+
return null;
102+
}
103+
104+
@Override
105+
public List<PrivilegedOperation> updateContainer(Container container) throws ResourceHandlerException {
106+
return null;
107+
}
108+
109+
@Override
110+
public List<PrivilegedOperation> postComplete(ContainerId containerId)
111+
throws ResourceHandlerException {
112+
cGroupsHandler.deleteCGroup(PIDS, containerId.toString());
113+
return null;
114+
}
115+
116+
@Override
117+
public List<PrivilegedOperation> teardown()
118+
throws ResourceHandlerException {
119+
return null;
120+
}
121+
122+
123+
public int getProcessMaxCount() {
124+
return processMaxCount;
125+
}
126+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
20+
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
21+
22+
import org.apache.hadoop.classification.InterfaceAudience;
23+
import org.apache.hadoop.classification.InterfaceStability;
24+
25+
/**
26+
* Resource handler for pid resources.
27+
*/
28+
@InterfaceAudience.Private
29+
@InterfaceStability.Unstable
30+
public interface PidsResourceHandler extends ResourceHandler {
31+
32+
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerModule.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,15 @@ private static ResourceHandler getNumaResourceHandler(Configuration conf,
278278
return null;
279279
}
280280

281+
private static ResourceHandler
282+
getPidsResourceHandler(Configuration conf) throws ResourceHandlerException{
283+
if (conf.getBoolean(YarnConfiguration.NM_CONTAINER_PROCESS_MONITOR_ENABLED,
284+
YarnConfiguration.DEFAULT_NM_CONTAINER_PROCESS_MONITOR_ENABLED)){
285+
return new CGroupsPidsResourceHandlerImpl(getCGroupsHandler());
286+
}
287+
return null;
288+
}
289+
281290
private static void addHandlerIfNotNull(List<ResourceHandler> handlerList,
282291
ResourceHandler handler) {
283292
if (handler != null) {
@@ -299,6 +308,7 @@ private static void initializeConfiguredResourceHandlerChain(
299308
addHandlerIfNotNull(handlerList,
300309
initCGroupsCpuResourceHandler(conf));
301310
addHandlerIfNotNull(handlerList, getNumaResourceHandler(conf, nmContext));
311+
addHandlerIfNotNull(handlerList, getPidsResourceHandler(conf));
302312
addHandlersFromConfiguredResourcePlugins(handlerList, conf, nmContext);
303313
resourceHandlerChain = new ResourceHandlerChain(handlerList);
304314
}
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
19+
20+
import org.apache.hadoop.conf.Configuration;
21+
import org.apache.hadoop.yarn.api.records.ContainerId;
22+
import org.apache.hadoop.yarn.api.records.ExecutionType;
23+
import org.apache.hadoop.yarn.api.records.Resource;
24+
import org.apache.hadoop.yarn.conf.YarnConfiguration;
25+
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
26+
import org.apache.hadoop.yarn.server.nodemanager.Context;
27+
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
28+
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
29+
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings.AssignedResources;
30+
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
31+
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
32+
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.numa.NumaResourceAllocation;
33+
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.numa.NumaResourceHandlerImpl;
34+
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
35+
import org.junit.Assert;
36+
import org.junit.Before;
37+
import org.junit.Test;
38+
39+
import java.io.IOException;
40+
import java.util.Arrays;
41+
import java.util.List;
42+
import java.util.concurrent.ConcurrentHashMap;
43+
44+
import static org.junit.Assert.assertEquals;
45+
import static org.junit.Assert.assertNull;
46+
import static org.mockito.ArgumentMatchers.any;
47+
import static org.mockito.Mockito.*;
48+
import static org.mockito.Mockito.times;
49+
50+
/**
51+
* Test class for CGroupsPidsResourceHandlerImpl.
52+
*
53+
*/
54+
public class TestPidsResourceHandlerImpl {
55+
56+
private CGroupsPidsResourceHandlerImpl pidsResourceHandler;
57+
private Container mockContainer;
58+
private CGroupsHandler mockCGroupsHandler;
59+
60+
@Before
61+
public void setUp() throws IOException, ResourceHandlerException {
62+
mockCGroupsHandler = mock(CGroupsHandler.class);
63+
when(mockCGroupsHandler.getPathForCGroup(any(), any())).thenReturn(".");
64+
pidsResourceHandler =
65+
new CGroupsPidsResourceHandlerImpl(mockCGroupsHandler);
66+
}
67+
68+
@Test
69+
public void testBootstrap() throws Exception {
70+
Configuration conf = new YarnConfiguration();
71+
List<PrivilegedOperation> ret =
72+
pidsResourceHandler.bootstrap(conf);
73+
verify(mockCGroupsHandler, times(1))
74+
.initializeCGroupController(CGroupsHandler.CGroupController.PIDS);
75+
Assert.assertNull(ret);
76+
Assert.assertEquals("Default process number incorrect", 10000,
77+
pidsResourceHandler.getProcessMaxCount());
78+
}
79+
80+
@Test
81+
public void testProcessNumbers() throws Exception {
82+
Configuration conf = new YarnConfiguration();
83+
conf.setInt(YarnConfiguration.NM_CONTAINER_PROCESS_MAX_LIMIT_NUM, -1);
84+
try {
85+
pidsResourceHandler.bootstrap(conf);
86+
Assert.fail("Negative values for process number should not be allowed.");
87+
} catch (ResourceHandlerException re) {
88+
// do nothing
89+
}
90+
91+
conf.setInt(YarnConfiguration.NM_CONTAINER_PROCESS_MAX_LIMIT_NUM, 1000);
92+
pidsResourceHandler.bootstrap(conf);
93+
Assert.assertEquals("process number value incorrect", 1000,
94+
pidsResourceHandler.getProcessMaxCount());
95+
}
96+
97+
@Test
98+
public void testPreStart() throws Exception {
99+
Configuration conf = new Configuration();
100+
pidsResourceHandler.bootstrap(conf);
101+
String id = "container_01_01";
102+
String path = "test-path/" + id;
103+
ContainerId mockContainerId = mock(ContainerId.class);
104+
when(mockContainerId.toString()).thenReturn(id);
105+
Container mockContainer = mock(Container.class);
106+
when(mockContainer.getContainerId()).thenReturn(mockContainerId);
107+
when(mockCGroupsHandler
108+
.getPathForCGroupTasks(CGroupsHandler.CGroupController.PIDS, id))
109+
.thenReturn(path);
110+
int maxProcess = 1024;
111+
List<PrivilegedOperation> ret =
112+
pidsResourceHandler.preStart(mockContainer);
113+
verify(mockCGroupsHandler, times(1))
114+
.createCGroup(CGroupsHandler.CGroupController.PIDS, id);
115+
verify(mockCGroupsHandler, times(1))
116+
.updateCGroupParam(CGroupsHandler.CGroupController.PIDS, id,
117+
CGroupsHandler.CGROUP_PIDS_MAX, String.valueOf(maxProcess));
118+
Assert.assertNotNull(ret);
119+
Assert.assertEquals(1, ret.size());
120+
PrivilegedOperation op = ret.get(0);
121+
Assert.assertEquals(PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP,
122+
op.getOperationType());
123+
List<String> args = op.getArguments();
124+
Assert.assertEquals(1, args.size());
125+
Assert.assertEquals(PrivilegedOperation.CGROUP_ARG_PREFIX + path,
126+
args.get(0));
127+
}
128+
129+
@Test
130+
public void testReacquireContainer() throws Exception {
131+
ContainerId containerIdMock = mock(ContainerId.class);
132+
Assert.assertNull(
133+
pidsResourceHandler.reacquireContainer(containerIdMock));
134+
}
135+
136+
@Test
137+
public void testPostComplete() throws Exception {
138+
String id = "container_01_01";
139+
ContainerId mockContainerId = mock(ContainerId.class);
140+
when(mockContainerId.toString()).thenReturn(id);
141+
Assert
142+
.assertNull(pidsResourceHandler.postComplete(mockContainerId));
143+
verify(mockCGroupsHandler, times(1))
144+
.deleteCGroup(CGroupsHandler.CGroupController.PIDS, id);
145+
}
146+
147+
@Test
148+
public void testTeardown() throws Exception {
149+
Assert.assertNull(pidsResourceHandler.teardown());
150+
}
151+
}

0 commit comments

Comments
 (0)