Skip to content

Commit babb392

Browse files
committed
add backpressure support
Fixes #4 No backpressure in node. The `ws` module has a buggy `bufferedAmount` property. See: websockets/ws#492
1 parent e04f776 commit babb392

File tree

1 file changed

+56
-25
lines changed

1 file changed

+56
-25
lines changed

index.js

Lines changed: 56 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@ function Socket (url, opts) {
3131
self.connected = false
3232
self.destroyed = false
3333

34-
self._buffer = []
34+
self._chunk = null
35+
self._cb = null
36+
self._interval = null
3537

3638
self._ws = new WebSocket(self.url)
3739
self._ws.binaryType = 'arraybuffer'
@@ -44,7 +46,11 @@ function Socket (url, opts) {
4446
if (self.connected) {
4547
// When stream is finished writing, close socket connection. Half open connections
4648
// are currently not supported.
47-
self._destroy()
49+
// Wait a bit before destroying so the socket flushes.
50+
// TODO: is there a more reliable way to accomplish this?
51+
setTimeout(function () {
52+
self._destroy()
53+
}, 100)
4854
} else {
4955
// If socket is not connected when stream is finished writing, wait until data is
5056
// flushed to network at "connect" event.
@@ -56,12 +62,14 @@ function Socket (url, opts) {
5662
})
5763
}
5864
})
65+
5966
}
6067

61-
Socket.prototype.send = function (chunk, cb) {
68+
Socket.prototype.send = function (chunk) {
6269
var self = this
63-
if (!cb) cb = noop
64-
self._write(chunk, undefined, cb)
70+
var len = chunk.length || chunk.byteLength || chunk.size
71+
self._ws.send(chunk)
72+
debug('write: %d bytes', len)
6573
}
6674

6775
Socket.prototype.destroy = function (onclose) {
@@ -79,6 +87,11 @@ Socket.prototype._destroy = function (err, onclose) {
7987
self.connected = false
8088
self.destroyed = true
8189

90+
clearInterval(self._interval)
91+
self._interval = null
92+
self._chunk = null
93+
self._cb = null
94+
8295
if (self._ws) {
8396
try {
8497
self._ws.close()
@@ -112,24 +125,25 @@ Socket.prototype._write = function (chunk, encoding, cb) {
112125
var self = this
113126
if (self.destroyed) return cb(new Error('cannot write after socket is destroyed'))
114127

115-
var len = chunk.length || chunk.byteLength || chunk.size
116-
if (!self.connected) {
117-
debug('_write before ready: length %d', len)
118-
self._buffer.push(chunk)
119-
cb(null)
120-
return
128+
if (!isTypedArray.strict(chunk) && !(chunk instanceof ArrayBuffer) &&
129+
!Buffer.isBuffer(chunk) && typeof chunk !== 'string' &&
130+
(typeof Blob === 'undefined' || !(chunk instanceof Blob))) {
131+
chunk = JSON.stringify(chunk)
121132
}
122-
debug('_write: length %d', len)
123133

124-
if (isTypedArray.strict(chunk) || chunk instanceof ArrayBuffer ||
125-
Buffer.isBuffer(chunk) || typeof chunk === 'string' ||
126-
(typeof Blob !== 'undefined' && chunk instanceof Blob)) {
127-
self._ws.send(chunk)
134+
if (self.connected) {
135+
self.send(chunk)
136+
if (typeof ws !== 'function' && self._ws.bufferedAmount) {
137+
debug('start backpressure: bufferedAmount %d', self._ws.bufferedAmount)
138+
self._cb = cb
139+
} else {
140+
cb(null)
141+
}
128142
} else {
129-
self._ws.send(JSON.stringify(chunk))
143+
debug('write before connect')
144+
self._chunk = chunk
145+
self._cb = cb
130146
}
131-
132-
cb(null)
133147
}
134148

135149
Socket.prototype._onMessage = function (event) {
@@ -156,10 +170,29 @@ Socket.prototype._onOpen = function () {
156170
if (self.connected || self.destroyed) return
157171
self.connected = true
158172

159-
self._buffer.forEach(function (chunk) {
160-
self.send(chunk)
161-
})
162-
self._buffer = []
173+
if (self._chunk) {
174+
self.send(self._chunk)
175+
self._chunk = null
176+
debug('sent chunk from "write before connect"')
177+
178+
var cb = self._cb
179+
self._cb = null
180+
cb(null)
181+
}
182+
183+
// No backpressure in node. The `ws` module has a buggy `bufferedAmount` property.
184+
// See: https://github.com/websockets/ws/issues/492
185+
if (typeof ws !== 'function') {
186+
self._interval = setInterval(function () {
187+
console.log(self._ws.bufferedAmount)
188+
if (!self._cb || !self._ws || self._ws.bufferedAmount) return
189+
debug('ending backpressure: bufferedAmount %d', self._ws.bufferedAmount)
190+
var cb = self._cb
191+
self._cb = null
192+
cb(null)
193+
}, 150)
194+
if (self._interval.unref) self._interval.unref()
195+
}
163196

164197
debug('connect')
165198
self.emit('connect')
@@ -179,5 +212,3 @@ Socket.prototype._onError = function () {
179212
debug('error: %s', err.message || err)
180213
self._destroy(err)
181214
}
182-
183-
function noop () {}

0 commit comments

Comments
 (0)