Skip to content

Commit 274b045

Browse files
committed
[ISSUE 3825] Use default SO_SNDBUF/SO_RCVBUF/WRITE_BUFFER_WATER_MARK value.
This commit may greatly increase thoughtput when network lantency is big or message size is big, especially for consumer. In one broker (M/S sync master async flush), one producer send test. 250 byte body: 0.93x old version 4000 byte body: 3.85x of old version 16000 byte body: 6.07x of old version 100000 byte body: 7.93x of old version The old version is 4.9.3-SNAPSHOT in this test. In this test the network latency from producer to broker is 1.61ms.
1 parent d5c8800 commit 274b045

File tree

6 files changed

+46
-24
lines changed

6 files changed

+46
-24
lines changed

broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -90,14 +90,6 @@ public static void shutdown(final BrokerController controller) {
9090
public static BrokerController createBrokerController(String[] args) {
9191
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
9292

93-
if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) {
94-
NettySystemConfig.socketSndbufSize = 131072;
95-
}
96-
97-
if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) {
98-
NettySystemConfig.socketRcvbufSize = 131072;
99-
}
100-
10193
try {
10294
//PackageConflictDetect.detectFastjson();
10395
Options options = ServerUtil.buildCommandlineOptions(new Options());

remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.util.Enumeration;
3535
import org.apache.rocketmq.logging.InternalLogger;
3636
import org.apache.rocketmq.logging.InternalLoggerFactory;
37+
import org.apache.rocketmq.remoting.netty.NettySystemConfig;
3738

3839
public class RemotingUtil {
3940
public static final String OS_NAME = System.getProperty("os.name");
@@ -193,8 +194,12 @@ public static SocketChannel connect(SocketAddress remote, final int timeoutMilli
193194
sc.configureBlocking(true);
194195
sc.socket().setSoLinger(false, -1);
195196
sc.socket().setTcpNoDelay(true);
196-
sc.socket().setReceiveBufferSize(1024 * 64);
197-
sc.socket().setSendBufferSize(1024 * 64);
197+
if (NettySystemConfig.socketSndbufSize > 0) {
198+
sc.socket().setReceiveBufferSize(NettySystemConfig.socketSndbufSize);
199+
}
200+
if (NettySystemConfig.socketRcvbufSize > 0) {
201+
sc.socket().setSendBufferSize(NettySystemConfig.socketRcvbufSize);
202+
}
198203
sc.socket().connect(remote, timeoutMillis);
199204
sc.configureBlocking(false);
200205
return sc;

remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -166,10 +166,6 @@ public Thread newThread(Runnable r) {
166166
.option(ChannelOption.TCP_NODELAY, true)
167167
.option(ChannelOption.SO_KEEPALIVE, false)
168168
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
169-
.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
170-
.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
171-
.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(nettyClientConfig.getWriteBufferLowWaterMark(),
172-
nettyClientConfig.getWriteBufferHighWaterMark()))
173169
.handler(new ChannelInitializer<SocketChannel>() {
174170
@Override
175171
public void initChannel(SocketChannel ch) throws Exception {
@@ -191,6 +187,20 @@ public void initChannel(SocketChannel ch) throws Exception {
191187
new NettyClientHandler());
192188
}
193189
});
190+
if (nettyClientConfig.getClientSocketSndBufSize() > 0) {
191+
log.info("client set SO_SNDBUF to {}", nettyClientConfig.getClientSocketSndBufSize());
192+
handler.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize());
193+
}
194+
if (nettyClientConfig.getClientSocketRcvBufSize() > 0) {
195+
log.info("client set SO_RCVBUF to {}", nettyClientConfig.getClientSocketRcvBufSize());
196+
handler.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize());
197+
}
198+
if (nettyClientConfig.getWriteBufferLowWaterMark() > 0 && nettyClientConfig.getWriteBufferHighWaterMark() > 0) {
199+
log.info("client set netty WRITE_BUFFER_WATER_MARK to {},{}",
200+
nettyClientConfig.getWriteBufferLowWaterMark(), nettyClientConfig.getWriteBufferHighWaterMark());
201+
handler.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
202+
nettyClientConfig.getWriteBufferLowWaterMark(), nettyClientConfig.getWriteBufferHighWaterMark()));
203+
}
194204

195205
this.timer.scheduleAtFixedRate(new TimerTask() {
196206
@Override

remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -203,10 +203,6 @@ public Thread newThread(Runnable r) {
203203
.option(ChannelOption.SO_REUSEADDR, true)
204204
.option(ChannelOption.SO_KEEPALIVE, false)
205205
.childOption(ChannelOption.TCP_NODELAY, true)
206-
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
207-
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
208-
.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
209-
nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark()))
210206
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
211207
.childHandler(new ChannelInitializer<SocketChannel>() {
212208
@Override
@@ -222,6 +218,20 @@ public void initChannel(SocketChannel ch) throws Exception {
222218
);
223219
}
224220
});
221+
if (nettyServerConfig.getServerSocketSndBufSize() > 0) {
222+
log.info("server set SO_SNDBUF to {}", nettyServerConfig.getServerSocketSndBufSize());
223+
childHandler.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize());
224+
}
225+
if (nettyServerConfig.getServerSocketRcvBufSize() > 0) {
226+
log.info("server set SO_RCVBUF to {}", nettyServerConfig.getServerSocketRcvBufSize());
227+
childHandler.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize());
228+
}
229+
if (nettyServerConfig.getWriteBufferLowWaterMark() > 0 && nettyServerConfig.getWriteBufferHighWaterMark() > 0) {
230+
log.info("server set netty WRITE_BUFFER_WATER_MARK to {},{}",
231+
nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark());
232+
childHandler.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
233+
nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark()));
234+
}
225235

226236
if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
227237
childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,9 @@ public class NettySystemConfig {
5050
public static final int CLIENT_ONEWAY_SEMAPHORE_VALUE =
5151
Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_ONEWAY_SEMAPHORE_VALUE, "65535"));
5252
public static int socketSndbufSize =
53-
Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE, "65535"));
53+
Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE, "0"));
5454
public static int socketRcvbufSize =
55-
Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE, "65535"));
55+
Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE, "0"));
5656
public static int socketBacklog =
5757
Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_SOCKET_BACKLOG, "1024"));
5858
public static int clientWorkerSize =
@@ -64,8 +64,8 @@ public class NettySystemConfig {
6464
public static boolean clientCloseSocketIfTimeout =
6565
Boolean.parseBoolean(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_CLOSE_SOCKET_IF_TIMEOUT, "true"));
6666
public static int writeBufferHighWaterMark =
67-
Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_WRITE_BUFFER_HIGH_WATER_MARK_VALUE, "4194304"));//4M
67+
Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_WRITE_BUFFER_HIGH_WATER_MARK_VALUE, "0"));
6868
public static int writeBufferLowWaterMark =
69-
Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_WRITE_BUFFER_LOW_WATER_MARK, "1048576")); //1MB
69+
Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_WRITE_BUFFER_LOW_WATER_MARK, "0"));
7070

7171
}

store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.rocketmq.logging.InternalLogger;
2727
import org.apache.rocketmq.logging.InternalLoggerFactory;
2828
import org.apache.rocketmq.remoting.common.RemotingUtil;
29+
import org.apache.rocketmq.remoting.netty.NettySystemConfig;
2930
import org.apache.rocketmq.store.SelectMappedBufferResult;
3031

3132
public class HAConnection {
@@ -46,8 +47,12 @@ public HAConnection(final HAService haService, final SocketChannel socketChannel
4647
this.socketChannel.configureBlocking(false);
4748
this.socketChannel.socket().setSoLinger(false, -1);
4849
this.socketChannel.socket().setTcpNoDelay(true);
49-
this.socketChannel.socket().setReceiveBufferSize(1024 * 64);
50-
this.socketChannel.socket().setSendBufferSize(1024 * 64);
50+
if (NettySystemConfig.socketSndbufSize > 0) {
51+
this.socketChannel.socket().setReceiveBufferSize(NettySystemConfig.socketSndbufSize);
52+
}
53+
if (NettySystemConfig.socketRcvbufSize > 0) {
54+
this.socketChannel.socket().setSendBufferSize(NettySystemConfig.socketRcvbufSize);
55+
}
5156
this.writeSocketService = new WriteSocketService(this.socketChannel);
5257
this.readSocketService = new ReadSocketService(this.socketChannel);
5358
this.haService.getConnectionCount().incrementAndGet();

0 commit comments

Comments
 (0)