Skip to content
This repository was archived by the owner on Jul 21, 2023. It is now read-only.

Commit 3f58dca

Browse files
dignifiedquiredaviddias
authored andcommitted
feat(pull): migrate to pull streams
1 parent 3c3a707 commit 3f58dca

File tree

7 files changed

+188
-199
lines changed

7 files changed

+188
-199
lines changed

gulpfile.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
const gulp = require('gulp')
44
const multiaddr = require('multiaddr')
5+
const pull = require('pull-stream')
6+
57
const WS = require('./src')
68

79
let listener
@@ -10,7 +12,7 @@ gulp.task('test:browser:before', (done) => {
1012
const ws = new WS()
1113
const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws')
1214
listener = ws.createListener((conn) => {
13-
conn.pipe(conn)
15+
pull(conn, conn)
1416
})
1517
listener.listen(ma, done)
1618
})

package.json

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,20 +35,20 @@
3535
"homepage": "https://github.com/libp2p/js-libp2p-websockets#readme",
3636
"dependencies": {
3737
"detect-node": "^2.0.3",
38-
"interface-connection": "^0.1.8",
38+
"interface-connection": "^0.2.1",
3939
"lodash.contains": "^2.4.3",
4040
"mafmt": "^2.1.1",
41-
"run-parallel": "^1.1.6",
42-
"simple-websocket": "^4.1.0",
43-
"simple-websocket-server": "^0.1.4"
41+
"pull-ws": "^3.2.3"
4442
},
4543
"devDependencies": {
4644
"aegir": "^6.0.1",
47-
"multiaddr": "^2.0.2",
4845
"chai": "^3.5.0",
4946
"gulp": "^3.9.1",
50-
"interface-transport": "^0.2.0",
51-
"pre-commit": "^1.1.3"
47+
"interface-transport": "^0.3.1",
48+
"multiaddr": "^2.0.2",
49+
"pre-commit": "^1.1.3",
50+
"pull-goodbye": "0.0.1",
51+
"pull-stream": "^3.4.3"
5252
},
5353
"contributors": [
5454
"David Dias <[email protected]>",

src/index.js

Lines changed: 19 additions & 111 deletions
Original file line numberDiff line numberDiff line change
@@ -1,147 +1,55 @@
11
'use strict'
22

3-
const debug = require('debug')
4-
const log = debug('libp2p:websockets')
5-
const SW = require('simple-websocket')
6-
const isNode = require('detect-node')
7-
let SWS
8-
if (isNode) {
9-
SWS = require('simple-websocket-server')
10-
} else {
11-
SWS = {}
12-
}
3+
const connect = require('pull-ws/client')
134
const mafmt = require('mafmt')
145
const contains = require('lodash.contains')
156
const Connection = require('interface-connection').Connection
7+
const debug = require('debug')
8+
const log = debug('libp2p:websockets:dialer')
169

17-
const CLOSE_TIMEOUT = 2000
18-
// const IPFS_CODE = 421
19-
20-
exports = module.exports = WebSockets
21-
22-
function WebSockets () {
23-
if (!(this instanceof WebSockets)) {
24-
return new WebSockets()
25-
}
10+
const createListener = require('./listener')
2611

27-
this.dial = function (ma, options, callback) {
12+
module.exports = class WebSockets {
13+
dial (ma, options, callback) {
2814
if (typeof options === 'function') {
2915
callback = options
3016
options = {}
3117
}
3218

3319
if (!callback) {
34-
callback = function noop () {}
20+
callback = () => {}
3521
}
3622

3723
const maOpts = ma.toOptions()
3824

39-
const socket = new SW('ws://' + maOpts.host + ':' + maOpts.port)
40-
41-
const conn = new Connection(socket)
42-
43-
socket.on('timeout', () => {
44-
conn.emit('timeout')
45-
})
46-
47-
socket.on('error', (err) => {
48-
callback(err)
49-
conn.emit('error', err)
50-
})
51-
52-
socket.on('connect', () => {
53-
callback(null, conn)
54-
conn.emit('connect')
25+
const url = `ws://${maOpts.host}:${maOpts.port}`
26+
log('dialing %s', url)
27+
const socket = connect(url, {
28+
binary: true,
29+
onConnect: callback
5530
})
5631

57-
conn.getObservedAddrs = (cb) => {
58-
return cb(null, [ma])
59-
}
32+
const conn = new Connection(socket)
33+
conn.getObservedAddrs = (cb) => cb(null, [ma])
34+
conn.close = (cb) => socket.close(cb)
6035

6136
return conn
6237
}
6338

64-
this.createListener = (options, handler) => {
39+
createListener (options, handler) {
6540
if (typeof options === 'function') {
6641
handler = options
6742
options = {}
6843
}
6944

70-
const listener = SWS.createServer((socket) => {
71-
const conn = new Connection(socket)
72-
73-
conn.getObservedAddrs = (cb) => {
74-
// TODO research if we can reuse the address in anyway
75-
return cb(null, [])
76-
}
77-
handler(conn)
78-
})
79-
80-
let listeningMultiaddr
81-
82-
listener._listen = listener.listen
83-
listener.listen = (ma, callback) => {
84-
if (!callback) {
85-
callback = function noop () {}
86-
}
87-
88-
listeningMultiaddr = ma
89-
90-
if (contains(ma.protoNames(), 'ipfs')) {
91-
ma = ma.decapsulate('ipfs')
92-
}
93-
94-
listener._listen(ma.toOptions(), callback)
95-
}
96-
97-
listener._close = listener.close
98-
listener.close = (options, callback) => {
99-
if (typeof options === 'function') {
100-
callback = options
101-
options = { timeout: CLOSE_TIMEOUT }
102-
}
103-
if (!callback) { callback = function noop () {} }
104-
if (!options) { options = { timeout: CLOSE_TIMEOUT } }
105-
106-
let closed = false
107-
listener.once('close', () => {
108-
closed = true
109-
})
110-
listener._close(callback)
111-
setTimeout(() => {
112-
if (closed) {
113-
return
114-
}
115-
log('unable to close graciously, destroying conns')
116-
Object.keys(listener.__connections).forEach((key) => {
117-
log('destroying %s', key)
118-
listener.__connections[key].destroy()
119-
})
120-
}, options.timeout || CLOSE_TIMEOUT)
121-
}
122-
123-
// Keep track of open connections to destroy in case of timeout
124-
listener.__connections = {}
125-
listener.on('connection', (socket) => {
126-
const key = (~~(Math.random() * 1e9)).toString(36) + Date.now()
127-
listener.__connections[key] = socket
128-
129-
socket.on('close', () => {
130-
delete listener.__connections[key]
131-
})
132-
})
133-
134-
listener.getAddrs = (callback) => {
135-
callback(null, [listeningMultiaddr])
136-
}
137-
138-
return listener
45+
return createListener(options, handler)
13946
}
14047

141-
this.filter = (multiaddrs) => {
48+
filter (multiaddrs) {
14249
if (!Array.isArray(multiaddrs)) {
14350
multiaddrs = [multiaddrs]
14451
}
52+
14553
return multiaddrs.filter((ma) => {
14654
if (contains(ma.protoNames(), 'ipfs')) {
14755
ma = ma.decapsulate('ipfs')

src/listener.js

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
'use strict'
2+
3+
const isNode = require('detect-node')
4+
const Connection = require('interface-connection').Connection
5+
const contains = require('lodash.contains')
6+
7+
// const IPFS_CODE = 421
8+
9+
let createServer
10+
11+
if (isNode) {
12+
createServer = require('pull-ws/server')
13+
} else {
14+
createServer = () => {}
15+
}
16+
17+
module.exports = (options, handler) => {
18+
const listener = createServer((socket) => {
19+
socket.getObservedAddrs = (cb) => {
20+
// TODO research if we can reuse the address in anyway
21+
return cb(null, [])
22+
}
23+
24+
handler(new Connection(socket))
25+
})
26+
27+
let listeningMultiaddr
28+
29+
listener._listen = listener.listen
30+
listener.listen = (ma, cb) => {
31+
cb = cb || (() => {})
32+
listeningMultiaddr = ma
33+
34+
if (contains(ma.protoNames(), 'ipfs')) {
35+
ma = ma.decapsulate('ipfs')
36+
}
37+
38+
listener._listen(ma.toOptions(), cb)
39+
}
40+
41+
listener.getAddrs = (cb) => {
42+
cb(null, [listeningMultiaddr])
43+
}
44+
45+
return listener
46+
}

test/browser.js

Lines changed: 40 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -3,74 +3,67 @@
33

44
const expect = require('chai').expect
55
const multiaddr = require('multiaddr')
6+
const pull = require('pull-stream')
7+
const goodbye = require('pull-goodbye')
8+
69
const WS = require('../src')
710

811
describe('libp2p-websockets', () => {
12+
const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws')
913
let ws
14+
let conn
1015

11-
it('create', (done) => {
16+
beforeEach((done) => {
1217
ws = new WS()
1318
expect(ws).to.exist
14-
done()
19+
conn = ws.dial(ma, done)
1520
})
1621

1722
it('echo', (done) => {
18-
const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws')
19-
const conn = ws.dial(ma)
2023
const message = 'Hello World!'
21-
conn.write(message)
22-
conn.on('data', (data) => {
23-
expect(data.toString()).to.equal(message)
24-
conn.end()
25-
done()
26-
})
27-
})
2824

29-
describe('stress', () => {
30-
it('one big write', (done) => {
31-
const mh = multiaddr('/ip4/127.0.0.1/tcp/9090/ws')
32-
const conn = ws.dial(mh)
33-
const message = new Buffer(1000000).fill('a').toString('hex')
34-
conn.write(message)
35-
conn.on('data', (data) => {
36-
expect(data.toString()).to.equal(message)
37-
conn.end()
25+
const s = goodbye({
26+
source: pull.values([message]),
27+
sink: pull.collect((err, results) => {
28+
expect(err).to.not.exist
29+
expect(results).to.be.eql([message])
3830
done()
3931
})
4032
})
4133

42-
it('many writes in 2 batches', (done) => {
43-
const mh = multiaddr('/ip4/127.0.0.1/tcp/9090/ws')
44-
const conn = ws.dial(mh)
45-
let expected = ''
46-
let counter = 0
47-
while (++counter < 10000) {
48-
conn.write(`${counter} `)
49-
expected += `${counter} `
50-
}
51-
52-
setTimeout(() => {
53-
while (++counter < 20000) {
54-
conn.write(`${counter} `)
55-
expected += `${counter} `
56-
}
34+
pull(s, conn, s)
35+
})
5736

58-
conn.write('STOP')
59-
}, 1000)
37+
describe('stress', () => {
38+
it('one big write', (done) => {
39+
const rawMessage = new Buffer(1000000).fill('a')
6040

61-
let result = ''
62-
conn.on('data', (data) => {
63-
if (data.toString() === 'STOP') {
64-
conn.end()
65-
return
66-
}
67-
result += data.toString()
41+
const s = goodbye({
42+
source: pull.values([rawMessage]),
43+
sink: pull.collect((err, results) => {
44+
expect(err).to.not.exist
45+
expect(results).to.be.eql([rawMessage])
46+
done()
47+
})
6848
})
49+
pull(s, conn, s)
50+
})
6951

70-
conn.on('end', () => {
71-
expect(result).to.equal(expected)
72-
done()
52+
it('many writes', (done) => {
53+
const s = goodbye({
54+
source: pull(
55+
pull.infinite(),
56+
pull.take(1000),
57+
pull.map((val) => Buffer(val.toString()))
58+
),
59+
sink: pull.collect((err, result) => {
60+
expect(err).to.not.exist
61+
expect(result).to.have.length(1000)
62+
done()
63+
})
7364
})
65+
66+
pull(s, conn, s)
7467
})
7568
})
7669
})

test/compliance.node.js

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/* eslint-env mocha */
2+
'use strict'
3+
4+
const tests = require('interface-transport')
5+
const multiaddr = require('multiaddr')
6+
const Ws = require('../src')
7+
8+
describe('compliance', () => {
9+
tests({
10+
setup (cb) {
11+
let ws = new Ws()
12+
const addrs = [
13+
multiaddr('/ip4/127.0.0.1/tcp/9091/ws'),
14+
multiaddr('/ip4/127.0.0.1/tcp/9092/ws'),
15+
multiaddr('/ip4/127.0.0.1/tcp/9093/ws')
16+
]
17+
cb(null, ws, addrs)
18+
},
19+
teardown (cb) {
20+
cb()
21+
}
22+
})
23+
})

0 commit comments

Comments
 (0)