Skip to content

Commit 9d1b43d

Browse files
committed
Fix utf8 decoding across combined buffers
1 parent 7c2a0a3 commit 9d1b43d

File tree

4 files changed

+205
-107
lines changed

4 files changed

+205
-107
lines changed

src/v1/internal/chunking.js

+55-55
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,11 @@
1616
* See the License for the specific language governing permissions and
1717
* limitations under the License.
1818
*/
19-
19+
2020
import buf from './buf';
2121

2222
let
23-
_CHUNK_HEADER_SIZE= 2,
23+
_CHUNK_HEADER_SIZE = 2,
2424
_MESSAGE_BOUNDARY = 0x00,
2525
_DEFAULT_BUFFER_SIZE = 1400; // http://stackoverflow.com/questions/2613734/maximum-packet-size-for-a-tcp-connection
2626

@@ -29,98 +29,98 @@ let
2929
* @access private
3030
*/
3131
class Chunker extends buf.BaseBuffer {
32-
constructor( channel, bufferSize ) {
33-
super( 0 );
32+
constructor(channel, bufferSize) {
33+
super(0);
3434
this._bufferSize = bufferSize || _DEFAULT_BUFFER_SIZE;
3535
this._ch = channel;
36-
this._buffer = buf.alloc( this._bufferSize );
36+
this._buffer = buf.alloc(this._bufferSize);
3737
this._currentChunkStart = 0;
3838
this._chunkOpen = false;
3939
}
4040

41-
putUInt8 ( position, val ) {
41+
putUInt8(position, val) {
4242
this._ensure(1);
43-
this._buffer.writeUInt8( val );
43+
this._buffer.writeUInt8(val);
4444
}
4545

46-
putInt8 ( position, val ) {
46+
putInt8(position, val) {
4747
this._ensure(1);
48-
this._buffer.writeInt8( val );
48+
this._buffer.writeInt8(val);
4949
}
5050

51-
putFloat64 ( position, val ) {
51+
putFloat64(position, val) {
5252
this._ensure(8);
53-
this._buffer.writeFloat64( val );
53+
this._buffer.writeFloat64(val);
5454
}
5555

56-
putBytes ( position, data ) {
57-
// TODO: If data is larger than our chunk size or so, we're very likely better off just passing this buffer on rather than doing the copy here
58-
// TODO: *however* note that we need some way to find out when the data has been written (and thus the buffer can be re-used) if we take that approach
59-
while ( data.remaining() > 0 )
60-
{
56+
putBytes(position, data) {
57+
// TODO: If data is larger than our chunk size or so, we're very likely better off just passing this buffer on
58+
// rather than doing the copy here TODO: *however* note that we need some way to find out when the data has been
59+
// written (and thus the buffer can be re-used) if we take that approach
60+
while (data.remaining() > 0) {
6161
// Ensure there is an open chunk, and that it has at least one byte of space left
62-
this._ensure( 1 );
63-
if( this._buffer.remaining() > data.remaining() ) {
64-
this._buffer.writeBytes( data );
62+
this._ensure(1);
63+
if (this._buffer.remaining() > data.remaining()) {
64+
this._buffer.writeBytes(data);
6565
} else {
66-
this._buffer.writeBytes( data.readSlice( this._buffer.remaining() ) );
66+
this._buffer.writeBytes(data.readSlice(this._buffer.remaining()));
6767
}
6868
}
6969
return this;
7070
}
7171

72-
flush () {
73-
if ( this._buffer.position > 0 ) {
72+
flush() {
73+
if (this._buffer.position > 0) {
7474
this._closeChunkIfOpen();
7575

7676
// Local copy and clear the buffer field. This ensures that the buffer is not re-released if the flush call fails
7777
let out = this._buffer;
7878
this._buffer = null;
7979

80-
this._ch.write( out.getSlice( 0, out.position ) );
80+
this._ch.write(out.getSlice(0, out.position));
8181

8282
// Alloc a new output buffer. We assume we're using NodeJS's buffer pooling under the hood here!
83-
this._buffer = buf.alloc( this._bufferSize );
83+
this._buffer = buf.alloc(this._bufferSize);
8484
this._chunkOpen = false;
8585
}
8686
return this;
8787
}
8888

89-
/**
89+
/**
9090
* Bolt messages are encoded in one or more chunks, and the boundary between two messages
9191
* is encoded as a 0-length chunk, `00 00`. This inserts such a message boundary, closing
92-
* any currently open chunk as needed
92+
* any currently open chunk as needed
9393
*/
94-
messageBoundary () {
95-
94+
messageBoundary() {
95+
9696
this._closeChunkIfOpen();
9797

98-
if ( this._buffer.remaining() < _CHUNK_HEADER_SIZE ) {
98+
if (this._buffer.remaining() < _CHUNK_HEADER_SIZE) {
9999
this.flush();
100100
}
101101

102102
// Write message boundary
103-
this._buffer.writeInt16( _MESSAGE_BOUNDARY );
103+
this._buffer.writeInt16(_MESSAGE_BOUNDARY);
104104
}
105105

106106
/** Ensure at least the given size is available for writing */
107-
_ensure ( size ) {
107+
_ensure(size) {
108108
let toWriteSize = this._chunkOpen ? size : size + _CHUNK_HEADER_SIZE;
109-
if ( this._buffer.remaining() < toWriteSize ) {
109+
if (this._buffer.remaining() < toWriteSize) {
110110
this.flush();
111111
}
112112

113-
if ( !this._chunkOpen ) {
113+
if (!this._chunkOpen) {
114114
this._currentChunkStart = this._buffer.position;
115115
this._buffer.position = this._buffer.position + _CHUNK_HEADER_SIZE;
116116
this._chunkOpen = true;
117117
}
118118
}
119119

120-
_closeChunkIfOpen () {
121-
if ( this._chunkOpen ) {
120+
_closeChunkIfOpen() {
121+
if (this._chunkOpen) {
122122
let chunkSize = this._buffer.position - (this._currentChunkStart + _CHUNK_HEADER_SIZE);
123-
this._buffer.putUInt16( this._currentChunkStart, chunkSize );
123+
this._buffer.putUInt16(this._currentChunkStart, chunkSize);
124124
this._chunkOpen = false;
125125
}
126126
}
@@ -139,67 +139,67 @@ class Dechunker {
139139
this._state = this.AWAITING_CHUNK;
140140
}
141141

142-
AWAITING_CHUNK ( buf ) {
143-
if ( buf.remaining() >= 2 ) {
142+
AWAITING_CHUNK(buf) {
143+
if (buf.remaining() >= 2) {
144144
// Whole header available, read that
145-
return this._onHeader( buf.readUInt16() );
145+
return this._onHeader(buf.readUInt16());
146146
} else {
147147
// Only one byte available, read that and wait for the second byte
148148
this._partialChunkHeader = buf.readUInt8() << 8;
149149
return this.IN_HEADER;
150150
}
151151
}
152152

153-
IN_HEADER ( buf ) {
153+
IN_HEADER(buf) {
154154
// First header byte read, now we read the next one
155-
return this._onHeader( (this._partialChunkHeader | buf.readUInt8()) & 0xFFFF );
155+
return this._onHeader((this._partialChunkHeader | buf.readUInt8()) & 0xFFFF);
156156
}
157157

158-
IN_CHUNK ( buf ) {
159-
if ( this._chunkSize <= buf.remaining() ) {
158+
IN_CHUNK(buf) {
159+
if (this._chunkSize <= buf.remaining()) {
160160
// Current packet is larger than current chunk, or same size:
161-
this._currentMessage.push( buf.readSlice( this._chunkSize ) );
161+
this._currentMessage.push(buf.readSlice(this._chunkSize));
162162
return this.AWAITING_CHUNK;
163163
} else {
164164
// Current packet is smaller than the chunk we're reading, split the current chunk itself up
165165
this._chunkSize -= buf.remaining();
166-
this._currentMessage.push( buf.readSlice( buf.remaining() ) );
166+
this._currentMessage.push(buf.readSlice(buf.remaining()));
167167
return this.IN_CHUNK;
168168
}
169169
}
170170

171-
CLOSED ( buf ) {
171+
CLOSED(buf) {
172172
// no-op
173173
}
174174

175175
/** Called when a complete chunk header has been recieved */
176-
_onHeader ( header ) {
177-
if(header == 0) {
176+
_onHeader(header) {
177+
if (header == 0) {
178178
// Message boundary
179179
let message;
180-
if( this._currentMessage.length == 1 ) {
180+
if (this._currentMessage.length == 1) {
181181
message = this._currentMessage[0];
182182
} else {
183183
message = new buf.CombinedBuffer( this._currentMessage );
184184
}
185185
this._currentMessage = [];
186-
this.onmessage( message );
186+
this.onmessage(message);
187187
return this.AWAITING_CHUNK;
188188
} else {
189189
this._chunkSize = header;
190190
return this.IN_CHUNK;
191191
}
192192
}
193193

194-
write ( buf ) {
195-
while( buf.hasRemaining() ) {
196-
this._state = this._state( buf );
194+
write(buf) {
195+
while (buf.hasRemaining()) {
196+
this._state = this._state(buf);
197197
}
198198
}
199199
}
200200

201201

202202
export default {
203-
Chunker: Chunker,
204-
Dechunker: Dechunker
203+
Chunker: Chunker,
204+
Dechunker: Dechunker
205205
}

src/v1/internal/utf8.js

+41-35
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* See the License for the specific language governing permissions and
1717
* limitations under the License.
1818
*/
19-
19+
2020
// This module defines a cross-platform UTF-8 encoder and decoder that works
2121
// with the Buffer API defined in buf.js
2222

@@ -33,78 +33,84 @@ try {
3333
let node = require("buffer");
3434

3535
platformObj = {
36-
"encode" : function( str ) {
37-
return new buf.NodeBuffer( new node.Buffer(str, "UTF-8") );
36+
"encode": function (str) {
37+
return new buf.NodeBuffer(new node.Buffer(str, "UTF-8"));
3838
},
39-
"decode" : function( buffer, length ) {
40-
let start = buffer.position,
39+
"decode": function (buffer, length) {
40+
if (buffer instanceof buf.NodeBuffer) {
41+
let start = buffer.position,
4142
end = start + length;
42-
buffer.position = end;
43-
44-
if( buffer instanceof buf.NodeBuffer ) {
45-
return buffer._buffer.toString( 'utf8', start, end );
46-
}
47-
else if( buffer instanceof buf.CombinedBuffer ) {
48-
let out = streamDecodeCombinedBuffer(buffer._buffers, end,
43+
buffer.position = Math.min(end, buffer.length);
44+
return buffer._buffer.toString('utf8', start, end);
45+
}
46+
else if (buffer instanceof buf.CombinedBuffer) {
47+
let out = streamDecodeCombinedBuffer(buffer, length,
4948
(partBuffer) => {
5049
return decoder.write(partBuffer._buffer);
5150
},
52-
() => { return decoder.end().slice(start, end); }
51+
() => {
52+
return decoder.end();
53+
}
5354
);
5455
return out;
55-
}
56+
}
5657
else {
57-
throw newError( "Don't know how to decode strings from `" + buffer + "`.");
58+
throw newError("Don't know how to decode strings from `" + buffer + "`.");
5859
}
5960
}
6061
}
6162

62-
} catch( e ) {
63+
} catch (e) {
6364

6465
// Not on NodeJS, add shim for WebAPI TextEncoder/TextDecoder
6566
var textEncoding = require('../../external/text-encoding/index');
6667
let encoder = new textEncoding.TextEncoder("utf-8");
6768
let decoder = new textEncoding.TextDecoder("utf-8");
6869

6970
platformObj = {
70-
"encode" : function( str ) {
71-
return new buf.HeapBuffer( encoder.encode(str).buffer );
71+
"encode": function (str) {
72+
return new buf.HeapBuffer(encoder.encode(str).buffer);
7273
},
73-
"decode" : function( buffer, length ) {
74-
if( buffer instanceof buf.HeapBuffer ) {
75-
return decoder.decode( buffer.readView( length ) );
74+
"decode": function (buffer, length) {
75+
if (buffer instanceof buf.HeapBuffer) {
76+
return decoder.decode(buffer.readView(Math.min(length, buffer.length - buffer.position)));
7677
}
7778
else {
7879
// Decoding combined buffer is complicated. For simplicity, for now,
7980
// we simply copy the combined buffer into a regular buffer and decode that.
80-
var tmpBuf = buf.alloc( length );
81+
var tmpBuf = buf.alloc(length);
8182
for (var i = 0; i < length; i++) {
82-
tmpBuf.writeUInt8( buffer.readUInt8() );
83-
};
83+
tmpBuf.writeUInt8(buffer.readUInt8());
84+
}
8485
tmpBuf.reset();
85-
return decoder.decode( tmpBuf.readView( length ) );
86+
return decoder.decode(tmpBuf.readView(length));
8687
}
8788
}
8889
}
8990
}
9091

9192
let streamDecodeCombinedBuffer = (combinedBuffers, length, decodeFn, endFn) => {
9293
let remainingBytesToRead = length;
94+
let position = combinedBuffers.position;
95+
combinedBuffers._updatePos(Math.min(length, combinedBuffers.length - position));
9396
// Reduce CombinedBuffers to a decoded string
94-
let out = combinedBuffers.reduce(function(last, partBuffer){
95-
if(remainingBytesToRead <= 0) {
97+
let out = combinedBuffers._buffers.reduce(function (last, partBuffer) {
98+
if (remainingBytesToRead <= 0) {
9699
return last;
97-
}
98-
if(partBuffer.length > remainingBytesToRead) { // When we don't want the whole buffer
99-
let lastSlice = partBuffer.readSlice(remainingBytesToRead);
100-
partBuffer._updatePos(remainingBytesToRead);
101-
remainingBytesToRead = 0;
100+
} else if (position >= partBuffer.length) {
101+
position -= partBuffer.length;
102+
return '';
103+
} else {
104+
partBuffer._updatePos(position - partBuffer.position);
105+
let bytesToRead = Math.min(partBuffer.length - position, remainingBytesToRead);
106+
let lastSlice = partBuffer.readSlice(bytesToRead);
107+
partBuffer._updatePos(bytesToRead);
108+
remainingBytesToRead = Math.max(remainingBytesToRead - lastSlice.length, 0);
109+
position = 0;
102110
return last + decodeFn(lastSlice);
103111
}
104-
remainingBytesToRead -= partBuffer.length;
105-
return last + decodeFn(partBuffer);
106112
}, '');
107113
return out + endFn();
108-
}
114+
};
109115

110116
export default platformObj;

0 commit comments

Comments
 (0)