Skip to content
This repository was archived by the owner on Oct 19, 2022. It is now read-only.

Commit 3f32f51

Browse files
authored
Merge pull request #19 from multiformats/pull
[WIP] pull-streams
2 parents e5a48d0 + bdc239f commit 3f32f51

File tree

8 files changed

+433
-211
lines changed

8 files changed

+433
-211
lines changed

README.md

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,33 @@ ms.ls(<callback>)
146146

147147
`callback` is a function of type `function (err, protocols)` where `err` is an error object that gets passed if something wrong happend and `protocols` is an array of the supported protocols in the other end.
148148

149+
### This module uses `pull-streams`
150+
151+
We expose a streaming interface based on `pull-streams`, rather then on the Node.js core streams implementation (aka Node.js streams). `pull-streams` offers us a better mechanism for error handling and flow control guarantees. If you would like to know more about why we did this, see the discussion at this [issue](https://github.com/ipfs/js-ipfs/issues/362).
152+
153+
You can learn more about pull-streams at:
154+
155+
- [The history of Node.js streams, nodebp April 2014](https://www.youtube.com/watch?v=g5ewQEuXjsQ)
156+
- [The history of streams, 2016](http://dominictarr.com/post/145135293917/history-of-streams)
157+
- [pull-streams, the simple streaming primitive](http://dominictarr.com/post/149248845122/pull-streams-pull-streams-are-a-very-simple)
158+
- [pull-streams documentation](https://pull-stream.github.io/)
159+
160+
#### Converting `pull-streams` to Node.js Streams
161+
162+
If you are a Node.js streams user, you can convert a pull-stream to a Node.js stream using the module [`pull-stream-to-stream`](https://github.com/dominictarr/pull-stream-to-stream), giving you an instance of a Node.js stream that is linked to the pull-stream. For example:
163+
164+
```js
165+
const pullToStream = require('pull-stream-to-stream')
166+
167+
const nodeStreamInstance = pullToStream(pullStreamInstance)
168+
// nodeStreamInstance is an instance of a Node.js Stream
169+
```
170+
171+
To learn more about this utility, visit https://pull-stream.github.io/#pull-stream-to-stream.
172+
173+
174+
175+
149176
## Maintainers
150177

151178
Captain: [@diasdavid](https://github.com/diasdavid).
@@ -161,4 +188,3 @@ Small note: If editing the Readme, please conform to the [standard-readme](https
161188
## License
162189

163190
[MIT](LICENSE) © David Dias
164-

package.json

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -41,24 +41,28 @@
4141
"author": "David Dias <[email protected]>",
4242
"license": "MIT",
4343
"dependencies": {
44-
"babel-runtime": "^6.6.1",
45-
"length-prefixed-stream": "^1.5.0",
46-
"lodash.range": "^3.1.5",
47-
"run-series": "^1.1.4",
48-
"varint": "^4.0.0"
44+
"babel-runtime": "^6.11.6",
45+
"debug": "^2.2.0",
46+
"interface-connection": "^0.2.1",
47+
"lodash.isfunction": "^3.0.8",
48+
"lodash.range": "^3.1.7",
49+
"pull-handshake": "^1.1.3",
50+
"pull-length-prefixed": "^1.1.0",
51+
"pull-stream": "^3.4.3",
52+
"varint": "^4.0.1"
4953
},
5054
"devDependencies": {
51-
"aegir": "^3.1.0",
52-
"run-parallel": "^1.1.6",
53-
"bl": "^1.1.2",
55+
"aegir": "^8.0.0",
5456
"chai": "^3.5.0",
55-
"pre-commit": "^1.1.2",
56-
"stream-pair": "^1.0.3"
57+
"pre-commit": "^1.1.3",
58+
"pull-pair": "^1.1.0",
59+
"run-parallel": "^1.1.6",
60+
"run-series": "^1.1.4"
5761
},
5862
"contributors": [
5963
"David Dias <[email protected]>",
6064
"David Dias <[email protected]>",
6165
"Richard Littauer <[email protected]>",
6266
"dignifiedquire <[email protected]>"
6367
]
64-
}
68+
}

src/agreement.js

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
'use strict'
2+
3+
const handshake = require('pull-handshake')
4+
const lp = require('pull-length-prefixed')
5+
const pull = require('pull-stream')
6+
const Connection = require('interface-connection').Connection
7+
const debug = require('debug')
8+
const log = debug('multistream:agreement')
9+
log.error = debug('multistream:agreement:error')
10+
11+
exports.select = (multicodec, callback) => {
12+
const stream = handshake({
13+
timeout: 60 * 1000
14+
}, callback)
15+
16+
const shake = stream.handshake
17+
18+
log('writing multicodec %s', multicodec)
19+
writeEncoded(shake, new Buffer(multicodec + '\n'), callback)
20+
21+
lp.decodeFromReader(shake, (err, data) => {
22+
if (err) {
23+
return callback(err)
24+
}
25+
const protocol = data.toString().slice(0, -1)
26+
27+
if (protocol !== multicodec) {
28+
return callback(new Error(`"${multicodec}" not supported`), shake.rest())
29+
}
30+
31+
log('multicodec ack')
32+
callback(null, shake.rest())
33+
})
34+
35+
return stream
36+
}
37+
38+
exports.handlerSelector = (rawConn, handlersMap) => {
39+
const cb = (err) => {
40+
// incoming errors are irrelevant for the app
41+
log.error(err)
42+
}
43+
44+
const stream = handshake({
45+
timeout: 60 * 1000
46+
}, cb)
47+
48+
const shake = stream.handshake
49+
50+
next()
51+
52+
function next () {
53+
lp.decodeFromReader(shake, (err, data) => {
54+
if (err) {
55+
return cb(err)
56+
}
57+
log('received: %s', data.toString())
58+
const protocol = data.toString().slice(0, -1)
59+
const result = Object.keys(handlersMap).filter((id) => id === protocol)
60+
const key = result && result[0]
61+
62+
if (key) {
63+
log('ack: %s', protocol)
64+
writeEncoded(shake, data, cb)
65+
handlersMap[key](new Connection(shake.rest(), rawConn))
66+
} else {
67+
log('received multicodec of not supported protocol: %s', protocol)
68+
writeEncoded(shake, new Buffer('na\n'))
69+
next()
70+
}
71+
})
72+
}
73+
74+
return stream
75+
}
76+
77+
// prefixes a message with a varint
78+
function encode (msg, cb) {
79+
const values = Buffer.isBuffer(msg) ? [msg] : [new Buffer(msg)]
80+
81+
pull(
82+
pull.values(values),
83+
lp.encode(),
84+
pull.collect((err, encoded) => {
85+
if (err) {
86+
return cb(err)
87+
}
88+
cb(null, encoded[0])
89+
})
90+
)
91+
}
92+
93+
function writeEncoded (writer, msg, cb) {
94+
encode(msg, (err, msg) => {
95+
if (err) {
96+
return cb(err)
97+
}
98+
writer.write(msg)
99+
})
100+
}

src/constants.js

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
'use strict'
2+
3+
module.exports = {
4+
PROTOCOL_ID: '/multistream/1.0.0'
5+
}

src/dialer.js

Lines changed: 75 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1,78 +1,100 @@
11
'use strict'
22

3-
const lps = require('length-prefixed-stream')
4-
const PROTOCOL_ID = require('./protocol-id')
3+
const lp = require('pull-length-prefixed')
54
const varint = require('varint')
6-
const range = require('lodash.range')
7-
const series = require('run-series')
5+
const pull = require('pull-stream')
6+
const Connection = require('interface-connection').Connection
7+
const debug = require('debug')
8+
const log = debug('multistream:dialer')
89

9-
exports = module.exports = Dialer
10+
const PROTOCOL_ID = require('./constants').PROTOCOL_ID
11+
const agrmt = require('./agreement')
1012

11-
function Dialer () {
12-
if (!(this instanceof Dialer)) {
13-
return new Dialer()
13+
module.exports = class Dialer {
14+
constructor () {
15+
this.conn = null
1416
}
1517

16-
const encode = lps.encode()
17-
const decode = lps.decode()
18-
let conn
19-
2018
// perform the multistream handshake
21-
this.handle = (_conn, callback) => {
22-
encode.pipe(_conn)
23-
_conn.pipe(decode)
24-
25-
decode.once('data', (buffer) => {
26-
const msg = buffer.toString().slice(0, -1)
27-
if (msg === PROTOCOL_ID) {
28-
encode.write(new Buffer(PROTOCOL_ID + '\n'))
29-
conn = _conn
30-
callback()
31-
} else {
32-
callback(new Error('Incompatible multistream'))
19+
handle (rawConn, callback) {
20+
log('handling connection')
21+
const ms = agrmt.select(PROTOCOL_ID, (err, conn) => {
22+
if (err) {
23+
return callback(err)
3324
}
25+
log('handshake success')
26+
27+
this.conn = new Connection(conn, rawConn)
28+
29+
callback()
3430
})
31+
pull(rawConn, ms, rawConn)
3532
}
3633

37-
this.select = (protocol, callback) => {
38-
if (!conn) {
34+
select (protocol, callback) {
35+
log('dialer select %s', protocol)
36+
if (!this.conn) {
3937
return callback(new Error('multistream handshake has not finalized yet'))
4038
}
4139

42-
encode.write(new Buffer(protocol + '\n'))
43-
decode.once('data', function (msgBuffer) {
44-
const msg = msgBuffer.toString().slice(0, -1)
45-
if (msg === protocol) {
46-
return callback(null, conn)
47-
}
48-
if (msg === 'na') {
49-
return callback(new Error(protocol + ' not supported'))
40+
const selectStream = agrmt.select(protocol, (err, conn) => {
41+
if (err) {
42+
this.conn = new Connection(conn, this.conn)
43+
return callback(err)
5044
}
45+
callback(null, new Connection(conn, this.conn))
5146
})
47+
48+
pull(
49+
this.conn,
50+
selectStream,
51+
this.conn
52+
)
5253
}
5354

54-
this.ls = (callback) => {
55-
encode.write(new Buffer('ls' + '\n'))
56-
let protos = []
57-
decode.once('data', function (msgBuffer) {
58-
const size = varint.decode(msgBuffer) // eslint-disable-line
59-
const nProtos = varint.decode(msgBuffer, varint.decode.bytes)
60-
61-
timesSeries(nProtos, (n, next) => {
62-
decode.once('data', function (msgBuffer) {
63-
protos.push(msgBuffer.toString().slice(0, -1))
64-
next()
55+
ls (callback) {
56+
const lsStream = agrmt.select('ls', (err, conn) => {
57+
if (err) {
58+
return callback(err)
59+
}
60+
61+
pull(
62+
conn,
63+
lp.decode(),
64+
collectLs(conn),
65+
pull.map(stringify),
66+
pull.collect((err, list) => {
67+
if (err) {
68+
return callback(err)
69+
}
70+
callback(null, list.slice(1))
6571
})
66-
}, (err) => {
67-
if (err) {
68-
return callback(err)
69-
}
70-
callback(null, protos)
71-
})
72+
)
7273
})
74+
75+
pull(
76+
this.conn,
77+
lsStream,
78+
this.conn
79+
)
7380
}
7481
}
7582

76-
function timesSeries (i, work, callback) {
77-
series(range(i).map((i) => (callback) => work(i, callback)), callback)
83+
function stringify (buf) {
84+
return buf.toString().slice(0, -1)
85+
}
86+
87+
function collectLs (conn) {
88+
let first = true
89+
let counter = 0
90+
91+
return pull.take((msg) => {
92+
if (first) {
93+
varint.decode(msg)
94+
counter = varint.decode(msg, varint.decode.bytes)
95+
return true
96+
}
97+
98+
return counter-- > 0
99+
})
78100
}

0 commit comments

Comments
 (0)