Skip to content

Commit 70e21c8

Browse files
author
Yoseph Maguire
committed
feat: websocket streams removed
1 parent 963e554 commit 70e21c8

File tree

5 files changed

+89
-40
lines changed

5 files changed

+89
-40
lines changed

examples/ws/aedes_server.js

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
const aedes = require('aedes')()
2+
const httpServer = require('http').createServer()
3+
const WebSocket = require('ws')
4+
const wsPort = 8080
5+
6+
// Here we are creating the Websocket Server that is using the HTTP Server...
7+
const wss = new WebSocket.Server({ server: httpServer })
8+
wss.on('connection', function connection (ws) {
9+
const duplex = WebSocket.createWebSocketStream(ws)
10+
aedes.handle(duplex)
11+
})
12+
13+
httpServer.listen(wsPort, function () {
14+
console.log('websocket server listening on port', wsPort)
15+
})
16+
17+
aedes.on('clientError', function (client, err) {
18+
console.log('client error', client.id, err.message, err.stack)
19+
})
20+
21+
aedes.on('connectionError', function (client, err) {
22+
console.log('client error', client, err.message, err.stack)
23+
})
24+
25+
aedes.on('publish', function (packet, client) {
26+
if (packet && packet.payload) {
27+
console.log('publish packet:', packet.payload.toString())
28+
}
29+
if (client) {
30+
console.log('message from client', client.id)
31+
}
32+
})
33+
34+
aedes.on('subscribe', function (subscriptions, client) {
35+
if (client) {
36+
console.log('subscribe from client', subscriptions, client.id)
37+
}
38+
})
39+
40+
aedes.on('client', function (client) {
41+
console.log('new client', client.id)
42+
})

examples/wss/client.js renamed to examples/ws/client.js

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,20 @@
11
'use strict'
22

3-
var mqtt = require('mqtt')
3+
var mqtt = require('../../types')
44

55
var clientId = 'mqttjs_' + Math.random().toString(16).substr(2, 8)
66

7-
var host = 'wss://localhost:3001/Mosca'
7+
// This sample should be run in tandem with the aedes_server.js file.
8+
// Simply run it:
9+
// $ node aedes_server.js
10+
//
11+
// Then run this file in a separate console:
12+
// $ node websocket_sample.js
13+
//
14+
var host = 'ws://localhost:8080'
815

916
var options = {
10-
keepalive: 10,
17+
keepalive: 30,
1118
clientId: clientId,
1219
protocolId: 'MQTT',
1320
protocolVersion: 4,
@@ -20,11 +27,10 @@ var options = {
2027
qos: 0,
2128
retain: false
2229
},
23-
username: 'demo',
24-
password: 'demo',
2530
rejectUnauthorized: false
2631
}
2732

33+
console.log('connecting mqtt client')
2834
var client = mqtt.connect(host, options)
2935

3036
client.on('error', function (err) {
@@ -34,12 +40,10 @@ client.on('error', function (err) {
3440

3541
client.on('connect', function () {
3642
console.log('client connected:' + clientId)
43+
client.subscribe('topic', { qos: 0 })
44+
client.publish('topic', 'wss secure connection demo...!', { qos: 0, retain: false })
3745
})
3846

39-
client.subscribe('topic', { qos: 0 })
40-
41-
client.publish('topic', 'wss secure connection demo...!', { qos: 0, retain: false })
42-
4347
client.on('message', function (topic, message, packet) {
4448
console.log('Received Message:= ' + message.toString() + '\nOn topic:= ' + topic)
4549
})

lib/connect/ws.js

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
'use strict'
22

3+
var WebSocket = require('ws')
34
var debug = require('debug')('mqttjs:ws')
4-
var websocket = require('websocket-stream')
55
var urlModule = require('url')
66
var WSS_OPTIONS = [
77
'rejectUnauthorized',
@@ -51,15 +51,18 @@ function setDefaultOpts (opts) {
5151

5252
function createWebSocket (client, opts) {
5353
debug('createWebSocket')
54+
debug('protocol: ' + opts.protocolId + ' ' + opts.protocolVersion)
5455
var websocketSubProtocol =
5556
(opts.protocolId === 'MQIsdp') && (opts.protocolVersion === 3)
5657
? 'mqttv3.1'
5758
: 'mqtt'
5859

5960
setDefaultOpts(opts)
6061
var url = buildUrl(opts, client)
61-
debug('url %s protocol %s', url, websocketSubProtocol)
62-
return websocket(url, [websocketSubProtocol], opts.wsOptions)
62+
debug('creating new Websocket for url: ' + url + ' and protocol: ' + websocketSubProtocol)
63+
var ws = new WebSocket(url, [websocketSubProtocol], opts.wsOptions)
64+
var duplex = WebSocket.createWebSocketStream(ws, opts.wsOptions)
65+
return duplex
6366
}
6467

6568
function streamBuilder (client, opts) {

package.json

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -64,19 +64,20 @@
6464
"dependencies": {
6565
"base64-js": "^1.3.0",
6666
"commist": "^1.0.0",
67-
"concat-stream": "^1.6.2",
67+
"concat-stream": "^2.0.0",
6868
"debug": "^4.1.1",
6969
"end-of-stream": "^1.4.1",
7070
"es6-map": "^0.1.5",
7171
"help-me": "^1.0.1",
7272
"inherits": "^2.0.3",
73-
"minimist": "^1.2.0",
74-
"mqtt-packet": "^6.0.0",
73+
"minimist": "^1.2.5",
74+
"mqtt-packet": "^6.3.2",
7575
"pump": "^3.0.0",
7676
"readable-stream": "^2.3.6",
7777
"reinterval": "^1.1.0",
7878
"split2": "^3.1.0",
79-
"websocket-stream": "^5.1.2",
79+
"utf-8-validate": "^5.0.2",
80+
"ws": "^7.3.1",
8081
"xtend": "^4.0.1"
8182
},
8283
"devDependencies": {
@@ -86,10 +87,11 @@
8687
"chai": "^4.2.0",
8788
"codecov": "^3.0.4",
8889
"global": "^4.3.2",
89-
"mkdirp": "^0.5.1",
90-
"mocha": "^4.1.0",
90+
"aedes": "^0.42.5",
91+
"mkdirp": "^1.0.4",
92+
"mocha": "^7.2.0",
9193
"mqtt-connection": "^4.0.0",
92-
"nyc": "^15.0.0",
94+
"nyc": "^15.0.1",
9395
"pre-commit": "^1.2.2",
9496
"rimraf": "^3.0.2",
9597
"safe-buffer": "^5.1.2",
@@ -101,8 +103,7 @@
101103
"tslint": "^5.11.0",
102104
"tslint-config-standard": "^8.0.1",
103105
"typescript": "^3.2.2",
104-
"uglify-es": "^3.3.9",
105-
"ws": "^3.3.3"
106+
"uglify-es": "^3.3.9"
106107
},
107108
"standard": {
108109
"env": [

test/websocket_client.js

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,36 @@
11
'use strict'
22

33
var http = require('http')
4-
var websocket = require('websocket-stream')
5-
var WebSocketServer = require('ws').Server
6-
var Connection = require('mqtt-connection')
4+
var WebSocket = require('ws')
5+
var MQTTConnection = require('mqtt-connection')
76
var abstractClientTests = require('./abstract_client')
87
var mqtt = require('../')
98
var xtend = require('xtend')
109
var assert = require('assert')
1110
var port = 9999
12-
var server = http.createServer()
11+
var httpServer = http.createServer()
1312

14-
function attachWebsocketServer (wsServer) {
15-
var wss = new WebSocketServer({server: wsServer, perMessageDeflate: false})
13+
function attachWebsocketServer (httpServer) {
14+
var webSocketServer = new WebSocket.Server({server: httpServer, perMessageDeflate: false})
1615

17-
wss.on('connection', function (ws) {
18-
var stream = websocket(ws)
19-
var connection = new Connection(stream)
20-
21-
wsServer.emit('client', connection)
16+
webSocketServer.on('connection', function (ws) {
17+
var stream = WebSocket.createWebSocketStream(ws)
18+
var connection = new MQTTConnection(stream)
19+
connection.protocol = ws.protocol
20+
httpServer.emit('client', connection)
2221
stream.on('error', function () {})
2322
connection.on('error', function () {})
2423
})
2524

26-
return wsServer
25+
return httpServer
2726
}
2827

2928
function attachClientEventHandlers (client) {
3029
client.on('connect', function (packet) {
3130
if (packet.clientId === 'invalid') {
3231
client.connack({ returnCode: 2 })
3332
} else {
34-
server.emit('connect', client)
33+
httpServer.emit('connect', client)
3534
client.connack({returnCode: 0})
3635
}
3736
})
@@ -81,9 +80,9 @@ function attachClientEventHandlers (client) {
8180
})
8281
}
8382

84-
attachWebsocketServer(server)
83+
attachWebsocketServer(httpServer)
8584

86-
server.on('client', attachClientEventHandlers).listen(port)
85+
httpServer.on('client', attachClientEventHandlers).listen(port)
8786

8887
describe('Websocket Client', function () {
8988
var baseConfig = { protocol: 'ws', port: port }
@@ -94,8 +93,8 @@ describe('Websocket Client', function () {
9493
}
9594

9695
it('should use mqtt as the protocol by default', function (done) {
97-
server.once('client', function (client) {
98-
assert.strictEqual(client.stream.socket.protocol, 'mqtt')
96+
httpServer.once('client', function (client) {
97+
assert.strictEqual(client.protocol, 'mqtt')
9998
})
10099
mqtt.connect(makeOptions()).on('connect', function () {
101100
this.end(true, done)
@@ -128,7 +127,7 @@ describe('Websocket Client', function () {
128127
})
129128

130129
it('should use mqttv3.1 as the protocol if using v3.1', function (done) {
131-
server.once('client', function (client) {
130+
httpServer.once('client', function (client) {
132131
assert.strictEqual(client.stream.socket.protocol, 'mqttv3.1')
133132
})
134133

@@ -142,5 +141,5 @@ describe('Websocket Client', function () {
142141
})
143142
})
144143

145-
abstractClientTests(server, makeOptions())
144+
abstractClientTests(httpServer, makeOptions())
146145
})

0 commit comments

Comments
 (0)