Skip to content

Commit 6577faa

Browse files
committed
StompEncoder performance improvement plus related polishing
Issue: SPR-14747 (cherry picked from commit 6c764f6)
1 parent c5722b3 commit 6577faa

File tree

4 files changed

+80
-78
lines changed

4 files changed

+80
-78
lines changed
Lines changed: 44 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2014 the original author or authors.
2+
* Copyright 2002-2016 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,7 +16,6 @@
1616

1717
package org.springframework.messaging.simp.stomp;
1818

19-
2019
import java.nio.ByteBuffer;
2120
import java.util.Collections;
2221
import java.util.List;
@@ -28,7 +27,6 @@
2827
import org.springframework.util.LinkedMultiValueMap;
2928
import org.springframework.util.MultiValueMap;
3029

31-
3230
/**
3331
* An extension of {@link org.springframework.messaging.simp.stomp.StompDecoder}
3432
* that buffers content remaining in the input ByteBuffer after the parent
@@ -45,6 +43,7 @@
4543
*
4644
* @author Rossen Stoyanchev
4745
* @since 4.0.3
46+
* @see StompDecoder
4847
*/
4948
public class BufferingStompDecoder {
5049

@@ -57,78 +56,57 @@ public class BufferingStompDecoder {
5756
private volatile Integer expectedContentLength;
5857

5958

59+
/**
60+
* Create a new {@code BufferingStompDecoder} wrapping the given {@code StompDecoder}.
61+
* @param stompDecoder the target decoder to wrap
62+
* @param bufferSizeLimit the buffer size limit
63+
*/
6064
public BufferingStompDecoder(StompDecoder stompDecoder, int bufferSizeLimit) {
61-
Assert.notNull(stompDecoder, "'stompDecoder' is required");
62-
Assert.isTrue(bufferSizeLimit > 0, "Buffer size must be greater than 0");
65+
Assert.notNull(stompDecoder, "StompDecoder is required");
66+
Assert.isTrue(bufferSizeLimit > 0, "Buffer size limit must be greater than 0");
6367
this.stompDecoder = stompDecoder;
6468
this.bufferSizeLimit = bufferSizeLimit;
6569
}
6670

6771

6872
/**
69-
* Return the wrapped
70-
* {@link org.springframework.messaging.simp.stomp.StompDecoder}.
73+
* Return the wrapped {@link StompDecoder}.
7174
*/
72-
public StompDecoder getStompDecoder() {
75+
public final StompDecoder getStompDecoder() {
7376
return this.stompDecoder;
7477
}
7578

7679
/**
7780
* Return the configured buffer size limit.
7881
*/
79-
public int getBufferSizeLimit() {
82+
public final int getBufferSizeLimit() {
8083
return this.bufferSizeLimit;
8184
}
8285

83-
/**
84-
* Calculate the current buffer size.
85-
*/
86-
public int getBufferSize() {
87-
int size = 0;
88-
for (ByteBuffer buffer : this.chunks) {
89-
size = size + buffer.remaining();
90-
}
91-
return size;
92-
}
93-
94-
/**
95-
* Get the expected content length of the currently buffered, incomplete STOMP frame.
96-
*/
97-
public Integer getExpectedContentLength() {
98-
return this.expectedContentLength;
99-
}
100-
10186

10287
/**
10388
* Decodes one or more STOMP frames from the given {@code ByteBuffer} into a
10489
* list of {@link Message}s.
105-
*
10690
* <p>If there was enough data to parse a "content-length" header, then the
10791
* value is used to determine how much more data is needed before a new
10892
* attempt to decode is made.
109-
*
11093
* <p>If there was not enough data to parse the "content-length", or if there
11194
* is "content-length" header, every subsequent call to decode attempts to
11295
* parse again with all available data. Therefore the presence of a "content-length"
11396
* header helps to optimize the decoding of large messages.
114-
*
11597
* @param newBuffer a buffer containing new data to decode
116-
*
11798
* @return decoded messages or an empty list
11899
* @throws StompConversionException raised in case of decoding issues
119100
*/
120101
public List<Message<byte[]>> decode(ByteBuffer newBuffer) {
121-
122102
this.chunks.add(newBuffer);
123-
124103
checkBufferLimits();
125104

126105
if (getExpectedContentLength() != null && getBufferSize() < this.expectedContentLength) {
127106
return Collections.<Message<byte[]>>emptyList();
128107
}
129108

130109
ByteBuffer bufferToDecode = assembleChunksAndReset();
131-
132110
MultiValueMap<String, String> headers = new LinkedMultiValueMap<String, String>();
133111
List<Message<byte[]>> messages = this.stompDecoder.decode(bufferToDecode, headers);
134112

@@ -140,21 +118,6 @@ public List<Message<byte[]>> decode(ByteBuffer newBuffer) {
140118
return messages;
141119
}
142120

143-
private void checkBufferLimits() {
144-
if (getExpectedContentLength() != null) {
145-
if (getExpectedContentLength() > getBufferSizeLimit()) {
146-
throw new StompConversionException(
147-
"The 'content-length' header " + getExpectedContentLength() +
148-
" exceeds the configured message buffer size limit " + getBufferSizeLimit());
149-
}
150-
}
151-
if (getBufferSize() > getBufferSizeLimit()) {
152-
throw new StompConversionException("The configured stomp frame buffer size limit of " +
153-
getBufferSizeLimit() + " bytes has been exceeded");
154-
155-
}
156-
}
157-
158121
private ByteBuffer assembleChunksAndReset() {
159122
ByteBuffer result;
160123
if (this.chunks.size() == 1) {
@@ -172,4 +135,36 @@ private ByteBuffer assembleChunksAndReset() {
172135
return result;
173136
}
174137

138+
private void checkBufferLimits() {
139+
if (this.expectedContentLength != null) {
140+
if (this.expectedContentLength > this.bufferSizeLimit) {
141+
throw new StompConversionException(
142+
"'content-length' header value " + this.expectedContentLength +
143+
" exceeds configured message buffer size limit " + this.bufferSizeLimit);
144+
}
145+
}
146+
if (getBufferSize() > this.bufferSizeLimit) {
147+
throw new StompConversionException("The configured stomp frame buffer size limit of " +
148+
this.bufferSizeLimit + " bytes has been exceeded");
149+
}
150+
}
151+
152+
/**
153+
* Calculate the current buffer size.
154+
*/
155+
public int getBufferSize() {
156+
int size = 0;
157+
for (ByteBuffer buffer : this.chunks) {
158+
size = size + buffer.remaining();
159+
}
160+
return size;
161+
}
162+
163+
/**
164+
* Get the expected content length of the currently buffered, incomplete STOMP frame.
165+
*/
166+
public Integer getExpectedContentLength() {
167+
return this.expectedContentLength;
168+
}
169+
175170
}

spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompClientSupport.java

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2015 the original author or authors.
2+
* Copyright 2002-2016 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -26,15 +26,14 @@
2626
/**
2727
* Base class for STOMP client implementations.
2828
*
29-
* <p>Subclasses can connect over WebSocket or TCP using any library.
30-
* When creating a new connection a sub-class can create an instance of
31-
* {@link DefaultStompSession} which extends
32-
* {@link org.springframework.messaging.tcp.TcpConnectionHandler
33-
* TcpConnectionHandler} whose lifecycle methods the sub-class must then invoke.
29+
* <p>Subclasses can connect over WebSocket or TCP using any library. When creating
30+
* a new connection, a subclass can create an instance of @link DefaultStompSession}
31+
* which extends {@link org.springframework.messaging.tcp.TcpConnectionHandler}
32+
* whose lifecycle methods the subclass must then invoke.
3433
*
35-
* <p>In effect {@code TcpConnectionHandler} and {@code TcpConnection} are the
36-
* contracts any sub-class must adapt to while using {@link StompEncoder} and
37-
* {@link StompDecoder} to encode and decode STOMP messages.
34+
* <p>In effect, {@code TcpConnectionHandler} and {@code TcpConnection} are the
35+
* contracts that any subclass must adapt to while using {@link StompEncoder}
36+
* and {@link StompDecoder} to encode and decode STOMP messages.
3837
*
3938
* @author Rossen Stoyanchev
4039
* @since 4.2
@@ -58,7 +57,7 @@ public abstract class StompClientSupport {
5857
* @param messageConverter the message converter to use
5958
*/
6059
public void setMessageConverter(MessageConverter messageConverter) {
61-
Assert.notNull(messageConverter, "'messageConverter' must not be null");
60+
Assert.notNull(messageConverter, "MessageConverter must not be null");
6261
this.messageConverter = messageConverter;
6362
}
6463

@@ -92,30 +91,31 @@ public TaskScheduler getTaskScheduler() {
9291
* CONNECT frame. The first number represents how often the client will write
9392
* or send a heart-beat. The second is how often the server should write.
9493
* A value of 0 means no heart-beats.
95-
* <p>By default this is set to "10000,10000" but sub-classes may override
94+
* <p>By default this is set to "10000,10000" but subclasses may override
9695
* that default and for example set it to "0,0" if they require a
9796
* TaskScheduler to be configured first.
9897
* @param heartbeat the value for the CONNECT "heart-beat" header
9998
* @see <a href="http://stomp.github.io/stomp-specification-1.2.html#Heart-beating">
10099
* http://stomp.github.io/stomp-specification-1.2.html#Heart-beating</a>
101100
*/
102101
public void setDefaultHeartbeat(long[] heartbeat) {
103-
Assert.notNull(heartbeat);
104-
Assert.isTrue(heartbeat[0] >= 0 && heartbeat[1] >=0 , "Invalid heart-beat: " + Arrays.toString(heartbeat));
102+
if (heartbeat == null || heartbeat.length != 2 || heartbeat[0] < 0 || heartbeat[1] < 0) {
103+
throw new IllegalArgumentException("Invalid heart-beat: " + Arrays.toString(heartbeat));
104+
}
105105
this.defaultHeartbeat = heartbeat;
106106
}
107107

108108
/**
109-
* Return the configured default heart-beat value, never {@code null}.
109+
* Return the configured default heart-beat value (never {@code null}).
110110
*/
111111
public long[] getDefaultHeartbeat() {
112112
return this.defaultHeartbeat;
113113
}
114114

115115
/**
116-
* Whether heartbeats are enabled. Returns {@code false} if
117-
* {@link #setDefaultHeartbeat defaultHeartbeat} is set to "0,0", and
118-
* {@code true} otherwise.
116+
* Determine whether heartbeats are enabled.
117+
* <p>Returns {@code false} if {@link #setDefaultHeartbeat defaultHeartbeat}
118+
* is set to "0,0", and {@code true} otherwise.
119119
*/
120120
public boolean isDefaultHeartbeatEnabled() {
121121
return (getDefaultHeartbeat() != null && getDefaultHeartbeat()[0] != 0 && getDefaultHeartbeat()[1] != 0);

spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompDecoder.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2015 the original author or authors.
2+
* Copyright 2002-2016 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -53,7 +53,6 @@ public class StompDecoder {
5353

5454
private static final Log logger = LogFactory.getLog(StompDecoder.class);
5555

56-
5756
private MessageHeaderInitializer headerInitializer;
5857

5958

spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompEncoder.java

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2014 the original author or authors.
2+
* Copyright 2002-2016 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,7 +19,7 @@
1919
import java.io.ByteArrayOutputStream;
2020
import java.io.DataOutputStream;
2121
import java.io.IOException;
22-
import java.util.Arrays;
22+
import java.util.Collections;
2323
import java.util.List;
2424
import java.util.Map;
2525
import java.util.Map.Entry;
@@ -39,14 +39,15 @@
3939
* @author Andy Wilkinson
4040
* @author Rossen Stoyanchev
4141
* @since 4.0
42+
* @see StompDecoder
4243
*/
43-
public final class StompEncoder {
44+
public class StompEncoder {
4445

4546
private static final byte LF = '\n';
4647

4748
private static final byte COLON = ':';
4849

49-
private final Log logger = LogFactory.getLog(StompEncoder.class);
50+
private static final Log logger = LogFactory.getLog(StompEncoder.class);
5051

5152

5253
/**
@@ -78,9 +79,13 @@ public byte[] encode(Map<String, Object> headers, byte[] payload) {
7879
}
7980
output.write(StompDecoder.HEARTBEAT_PAYLOAD);
8081
}
82+
8183
else {
8284
StompCommand command = StompHeaderAccessor.getCommand(headers);
83-
Assert.notNull(command, "Missing STOMP command: " + headers);
85+
if (command == null) {
86+
throw new IllegalStateException("Missing STOMP command: " + headers);
87+
}
88+
8489
output.write(command.toString().getBytes(StompDecoder.UTF8_CHARSET));
8590
output.write(LF);
8691
writeHeaders(command, headers, payload, output);
@@ -96,8 +101,8 @@ public byte[] encode(Map<String, Object> headers, byte[] payload) {
96101
}
97102
}
98103

99-
private void writeHeaders(StompCommand command, Map<String, Object> headers, byte[] payload, DataOutputStream output)
100-
throws IOException {
104+
private void writeHeaders(StompCommand command, Map<String, Object> headers, byte[] payload,
105+
DataOutputStream output) throws IOException {
101106

102107
@SuppressWarnings("unchecked")
103108
Map<String,List<String>> nativeHeaders =
@@ -114,22 +119,25 @@ private void writeHeaders(StompCommand command, Map<String, Object> headers, byt
114119
boolean shouldEscape = (command != StompCommand.CONNECT && command != StompCommand.CONNECTED);
115120

116121
for (Entry<String, List<String>> entry : nativeHeaders.entrySet()) {
117-
byte[] key = encodeHeaderString(entry.getKey(), shouldEscape);
118122
if (command.requiresContentLength() && "content-length".equals(entry.getKey())) {
119123
continue;
120124
}
125+
121126
List<String> values = entry.getValue();
122127
if (StompCommand.CONNECT.equals(command) &&
123128
StompHeaderAccessor.STOMP_PASSCODE_HEADER.equals(entry.getKey())) {
124-
values = Arrays.asList(StompHeaderAccessor.getPasscode(headers));
129+
values = Collections.singletonList(StompHeaderAccessor.getPasscode(headers));
125130
}
131+
132+
byte[] encodedKey = encodeHeaderString(entry.getKey(), shouldEscape);
126133
for (String value : values) {
127-
output.write(key);
134+
output.write(encodedKey);
128135
output.write(COLON);
129136
output.write(encodeHeaderString(value, shouldEscape));
130137
output.write(LF);
131138
}
132139
}
140+
133141
if (command.requiresContentLength()) {
134142
int contentLength = payload.length;
135143
output.write("content-length:".getBytes(StompDecoder.UTF8_CHARSET));

0 commit comments

Comments
 (0)