Skip to content

Commit b2d17f0

Browse files
luogeluoge
authored and committed
bug fix: CGroupElasticMemoryController does not work
1 parent a0d8cde commit b2d17f0

File tree

3 files changed

+316
-1
lines changed

3 files changed

+316
-1
lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -742,7 +742,8 @@ private void checkLimit(ContainerId containerId, String pId,
742742
ProcessTreeInfo ptInfo,
743743
long currentVmemUsage,
744744
long currentPmemUsage) {
745-
if (strictMemoryEnforcement && !elasticMemoryEnforcement) {
745+
if ((strictMemoryEnforcement && !elasticMemoryEnforcement) ||
746+
(!strictMemoryEnforcement && elasticMemoryEnforcement)) {
746747
// When cgroup-based strict memory enforcement is used alone without
747748
// elastic memory control, the oom-kill would take care of it.
748749
// However, when elastic memory control is also enabled, the oom killer
Lines changed: 297 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,297 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
20+
21+
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
22+
import static org.junit.Assert.assertEquals;
23+
import static org.junit.Assert.assertFalse;
24+
import static org.junit.Assert.assertNotNull;
25+
26+
import java.io.File;
27+
import java.io.IOException;
28+
import java.net.InetAddress;
29+
import java.net.URI;
30+
import java.net.URISyntaxException;
31+
import java.net.UnknownHostException;
32+
import java.nio.file.Files;
33+
import java.nio.file.Paths;
34+
import java.util.HashSet;
35+
import java.util.Set;
36+
import java.util.concurrent.ConcurrentMap;
37+
import java.util.concurrent.ConcurrentSkipListMap;
38+
import org.apache.hadoop.conf.Configuration;
39+
import org.apache.hadoop.fs.FileUtil;
40+
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
41+
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
42+
import org.apache.hadoop.yarn.api.records.ApplicationId;
43+
import org.apache.hadoop.yarn.api.records.ContainerId;
44+
import org.apache.hadoop.yarn.conf.YarnConfiguration;
45+
import org.apache.hadoop.yarn.event.AsyncDispatcher;
46+
import org.apache.hadoop.yarn.event.EventHandler;
47+
import org.apache.hadoop.yarn.exceptions.ConfigurationException;
48+
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
49+
import org.apache.hadoop.yarn.server.nodemanager.Context;
50+
import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor;
51+
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
52+
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
53+
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
54+
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
55+
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl.ProcessTreeInfo;
56+
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerExecContext;
57+
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerLivenessContext;
58+
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReapContext;
59+
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
60+
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
61+
import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
62+
import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext;
63+
import org.apache.log4j.Logger;
64+
import org.junit.After;
65+
import org.junit.Before;
66+
import org.junit.Test;
67+
import org.mockito.Mockito;
68+
69+
public class TestContainersMonitorWithElasticMemory {
70+
71+
static final Logger LOG = Logger
72+
.getLogger(TestContainersMonitorWithElasticMemory.class);
73+
74+
private ContainersMonitorImpl containersMonitor;
75+
private MockLinuxContainerExecutor executor;
76+
private Configuration conf;
77+
private AsyncDispatcher dispatcher;
78+
private Context context;
79+
private MockContainerEventHandler containerEventHandler;
80+
private ConcurrentMap<ContainerId, Container> containerMap;
81+
private static final String MOCK_EXECUTOR = "mock-container-executor";
82+
private static final String MOCK_OOM_LISTENER = "mock-oom-listener";
83+
private String tmpMockExecutor = System.getProperty("test.build.data") +
84+
"/tmp-mock-container-executor";
85+
private String tmpMockOOMListener = System.getProperty("test.build.data") +
86+
"/tmp-mock-oom-listener";
87+
private final File mockParamFile = new File("./params.txt");
88+
89+
static class MockLinuxContainerExecutor extends LinuxContainerExecutor {
90+
91+
@Override
92+
public String[] getIpAndHost(Container container) {
93+
String[] ipAndHost = new String[2];
94+
try {
95+
InetAddress address = InetAddress.getLocalHost();
96+
ipAndHost[0] = address.getHostAddress();
97+
ipAndHost[1] = address.getHostName();
98+
} catch (UnknownHostException e) {
99+
LOG.error("Unable to get Local hostname and ip for " + container
100+
.getContainerId(), e);
101+
}
102+
return ipAndHost;
103+
}
104+
105+
@Override
106+
public void startLocalizer(LocalizerStartContext ctx) {
107+
}
108+
109+
@Override
110+
public int launchContainer(ContainerStartContext ctx) throws
111+
IOException, ConfigurationException {
112+
return 0;
113+
}
114+
115+
@Override
116+
public int relaunchContainer(ContainerStartContext ctx) throws
117+
IOException, ConfigurationException {
118+
return 0;
119+
}
120+
121+
@Override
122+
public boolean signalContainer(ContainerSignalContext ctx)
123+
throws IOException {
124+
return true;
125+
}
126+
127+
@Override
128+
public boolean reapContainer(ContainerReapContext ctx) {
129+
return true;
130+
}
131+
132+
@Override
133+
public IOStreamPair execContainer(ContainerExecContext ctx) {
134+
return new IOStreamPair(null, null);
135+
}
136+
137+
@Override
138+
public void deleteAsUser(DeletionAsUserContext ctx) {
139+
}
140+
141+
@Override
142+
public void symLink(String target, String symlink) {
143+
}
144+
145+
@Override
146+
public String getProcessId(ContainerId containerId) {
147+
return String.valueOf(containerId.getContainerId());
148+
}
149+
150+
@Override
151+
public boolean isContainerAlive(ContainerLivenessContext ctx) {
152+
return true;
153+
}
154+
155+
@Override
156+
public void updateYarnSysFS(Context ctx, String user, String appId, String spec) {
157+
}
158+
159+
@Override
160+
public String getExposedPorts(Container container) {
161+
return null;
162+
}
163+
}
164+
165+
private static class MockContainerEventHandler implements
166+
EventHandler<ContainerEvent> {
167+
168+
final private Set<ContainerId> killedContainer
169+
= new HashSet<>();
170+
171+
@Override
172+
public void handle(ContainerEvent event) {
173+
if (event.getType() == ContainerEventType.KILL_CONTAINER) {
174+
synchronized (killedContainer) {
175+
killedContainer.add(event.getContainerID());
176+
}
177+
}
178+
}
179+
180+
public boolean isContainerKilled(ContainerId containerId) {
181+
synchronized (killedContainer) {
182+
return killedContainer.contains(containerId);
183+
}
184+
}
185+
}
186+
187+
public String prepareMockFile(String mockFile, String tmpMockFile)
188+
throws IOException, URISyntaxException {
189+
URI executorPath = getClass().getClassLoader().getResource(mockFile)
190+
.toURI();
191+
Files.copy(Paths.get(executorPath), Paths.get(tmpMockFile),
192+
REPLACE_EXISTING);
193+
194+
File executorMockFile = new File(tmpMockFile);
195+
196+
if (!FileUtil.canExecute(executorMockFile)) {
197+
FileUtil.setExecutable(executorMockFile, true);
198+
}
199+
return executorMockFile.getAbsolutePath();
200+
}
201+
202+
@Before
203+
public void setup() throws Exception {
204+
dispatcher = new AsyncDispatcher();
205+
context = Mockito.mock(Context.class);
206+
containerMap = new ConcurrentSkipListMap<>();
207+
Container container = Mockito.mock(ContainerImpl.class);
208+
containerMap.put(getContainerId(1), container);
209+
Mockito.doReturn(containerMap).when(context).getContainers();
210+
conf = new Configuration();
211+
conf.set(
212+
YarnConfiguration.NM_CONTAINER_MON_RESOURCE_CALCULATOR,
213+
MockResourceCalculatorPlugin.class.getCanonicalName());
214+
conf.set(
215+
YarnConfiguration.NM_CONTAINER_MON_PROCESS_TREE,
216+
MockResourceCalculatorProcessTree.class.getCanonicalName());
217+
218+
String executorPath = prepareMockFile(MOCK_EXECUTOR, tmpMockExecutor);
219+
conf.set(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH,
220+
executorPath);
221+
String oomListenerPath = prepareMockFile(MOCK_OOM_LISTENER, tmpMockOOMListener);
222+
conf.set(YarnConfiguration.NM_ELASTIC_MEMORY_CONTROL_OOM_LISTENER_PATH,
223+
oomListenerPath);
224+
dispatcher.init(conf);
225+
dispatcher.start();
226+
containerEventHandler = new MockContainerEventHandler();
227+
dispatcher.register(ContainerEventType.class, containerEventHandler);
228+
}
229+
230+
private void deleteMockParamFile() {
231+
if(mockParamFile.exists()) {
232+
mockParamFile.delete();
233+
}
234+
}
235+
236+
@After
237+
public void tearDown() throws Exception {
238+
if (containersMonitor != null) {
239+
containersMonitor.stop();
240+
}
241+
if (dispatcher != null) {
242+
dispatcher.stop();
243+
}
244+
deleteMockParamFile();
245+
}
246+
247+
@Test
248+
public void testContainersResourceChangeForElasticMemoryController() throws Exception {
249+
conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, true);
250+
conf.setBoolean(YarnConfiguration.NM_ELASTIC_MEMORY_CONTROL_ENABLED, true);
251+
conf.setBoolean(YarnConfiguration.NM_MEMORY_RESOURCE_ENABLED, true);
252+
conf.setBoolean(YarnConfiguration.NM_MEMORY_RESOURCE_ENFORCED, false);
253+
executor = new MockLinuxContainerExecutor();
254+
executor.setConf(conf);
255+
256+
try {
257+
executor.init(context);
258+
} catch (IOException e) {
259+
}
260+
261+
containersMonitor = createContainersMonitor(executor);
262+
containersMonitor.init(conf);
263+
containersMonitor.start();
264+
// create container 1
265+
containersMonitor.handle(new ContainerStartMonitoringEvent(
266+
getContainerId(1), 2100L, 1000L, 1, 0, 0));
267+
assertNotNull(getProcessTreeInfo(getContainerId(1)));
268+
assertEquals(1000L, getProcessTreeInfo(getContainerId(1))
269+
.getPmemLimit());
270+
assertEquals(2100L, getProcessTreeInfo(getContainerId(1))
271+
.getVmemLimit());
272+
// sleep longer than the monitor interval to make sure resource
273+
// enforcement has started
274+
Thread.sleep(20000);
275+
MockResourceCalculatorProcessTree mockTree =
276+
(MockResourceCalculatorProcessTree) getProcessTreeInfo(
277+
getContainerId(1)).getProcessTree();
278+
mockTree.setRssMemorySize(2500L);
279+
Thread.sleep(200);
280+
assertFalse(containerEventHandler
281+
.isContainerKilled(getContainerId(1)));
282+
containersMonitor.stop();
283+
}
284+
285+
private ContainersMonitorImpl createContainersMonitor(ContainerExecutor containerExecutor) {
286+
return new ContainersMonitorImpl(containerExecutor, dispatcher, context);
287+
}
288+
289+
private ContainerId getContainerId(int id) {
290+
return ContainerId.newContainerId(ApplicationAttemptId.newInstance(
291+
ApplicationId.newInstance(123456L, 1), 1), id);
292+
}
293+
294+
private ProcessTreeInfo getProcessTreeInfo(ContainerId id) {
295+
return containersMonitor.trackingContainers.get(id);
296+
}
297+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
#!/bin/sh
2+
# Licensed under the Apache License, Version 2.0 (the "License");
3+
# you may not use this file except in compliance with the License.
4+
# You may obtain a copy of the License at
5+
#
6+
# http://www.apache.org/licenses/LICENSE-2.0
7+
#
8+
# Unless required by applicable law or agreed to in writing, software
9+
# distributed under the License is distributed on an "AS IS" BASIS,
10+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
# See the License for the specific language governing permissions and
12+
# limitations under the License.
13+
14+
# Loop forever, sleeping one second per iteration; the script never exits on
# its own.
while :
do
  sleep 1
done

0 commit comments

Comments
 (0)