Skip to content
This repository was archived by the owner on Mar 10, 2020. It is now read-only.

Commit a72624a

Browse files
authored
Merge pull request #470 from haadcode/refactor/pubsub
(WIP) feature: floodsub
2 parents 2b498b9 + ee6d283 commit a72624a

25 files changed

+489
-162
lines changed

package.json

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
"ipld-dag-pb": "^0.9.2",
3535
"is-ipfs": "^0.2.1",
3636
"isstream": "^0.1.2",
37+
"js-base64": "^2.1.9",
3738
"lru-cache": "^4.0.2",
3839
"multiaddr": "^2.1.1",
3940
"multipart-stream": "^2.0.1",
@@ -63,7 +64,7 @@
6364
"gulp": "^3.9.1",
6465
"hapi": "^16.0.1",
6566
"interface-ipfs-core": "^0.22.0",
66-
"ipfsd-ctl": "^0.17.0",
67+
"@haad/ipfsd-ctl": "^0.18.0-beta.5",
6768
"pre-commit": "^1.1.3",
6869
"socket.io": "^1.7.1",
6970
"socket.io-client": "^1.7.1",
@@ -115,4 +116,4 @@
115116
"url": "https://github.com/ipfs/js-ipfs-api/issues"
116117
},
117118
"homepage": "https://github.com/ipfs/js-ipfs-api"
118-
}
119+
}

src/api/add.js

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,26 @@
11
'use strict'
22

33
const isStream = require('isstream')
4-
const addToDagNodesTransform = require('../add-to-dagnode-transform')
54
const promisify = require('promisify-es6')
5+
const DAGNodeStream = require('../dagnode-stream')
66

77
module.exports = (send) => {
88
return promisify((files, callback) => {
9-
const good = Buffer.isBuffer(files) ||
9+
const ok = Buffer.isBuffer(files) ||
1010
isStream.isReadable(files) ||
1111
Array.isArray(files)
1212

13-
if (!good) {
14-
callback(new Error('"files" must be a buffer, readable stream, or array of objects'))
13+
if (!ok) {
14+
return callback(new Error('"files" must be a buffer, readable stream, or array of objects'))
1515
}
1616

17-
const sendWithTransform = send.withTransform(addToDagNodesTransform)
18-
19-
return sendWithTransform({
17+
const request = {
2018
path: 'add',
2119
files: files
22-
}, callback)
20+
}
21+
22+
// Transform the response stream to DAGNode values
23+
const transform = (res, callback) => DAGNodeStream.streamToValue(send, res, callback)
24+
send.andTransform(request, transform, callback)
2325
})
2426
}

src/api/dht.js

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
'use strict'
22

33
const promisify = require('promisify-es6')
4+
const streamToValue = require('../stream-to-value')
45

56
module.exports = (send) => {
67
return {
@@ -19,11 +20,13 @@ module.exports = (send) => {
1920
opts = {}
2021
}
2122

22-
send({
23+
const request = {
2324
path: 'dht/findprovs',
2425
args: args,
2526
qs: opts
26-
}, callback)
27+
}
28+
29+
send.andTransform(request, streamToValue, callback)
2730
}),
2831
get: promisify((key, opts, callback) => {
2932
if (typeof opts === 'function' &&

src/api/get.js

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
'use strict'
22

3-
const tarStreamToObjects = require('../tar-stream-to-objects')
4-
const cleanMultihash = require('../clean-multihash')
53
const promisify = require('promisify-es6')
4+
const cleanMultihash = require('../clean-multihash')
5+
const TarStreamToObjects = require('../tar-stream-to-objects')
66

77
module.exports = (send) => {
8-
return promisify(function get (path, opts, callback) {
8+
return promisify((path, opts, callback) => {
99
if (typeof opts === 'function' &&
1010
!callback) {
1111
callback = opts
@@ -26,12 +26,13 @@ module.exports = (send) => {
2626
return callback(err)
2727
}
2828

29-
var sendWithTransform = send.withTransform(tarStreamToObjects)
30-
31-
sendWithTransform({
29+
const request = {
3230
path: 'get',
3331
args: path,
3432
qs: opts
35-
}, callback)
33+
}
34+
35+
// Convert the response stream to TarStream objects
36+
send.andTransform(request, TarStreamToObjects.from, callback)
3637
})
3738
}

src/api/ping.js

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,36 @@
11
'use strict'
22

33
const promisify = require('promisify-es6')
4+
const streamToValue = require('../stream-to-value')
45

56
module.exports = (send) => {
67
return promisify((id, callback) => {
7-
send({
8+
const request = {
89
path: 'ping',
910
args: id,
1011
qs: { n: 1 }
11-
}, function (err, res) {
12-
if (err) {
13-
return callback(err, null)
14-
}
15-
callback(null, res[1])
16-
})
12+
}
13+
14+
// Transform the response stream to a value:
15+
// { Success: <boolean>, Time: <number>, Text: <string> }
16+
const transform = (res, callback) => {
17+
streamToValue(res, (err, res) => {
18+
if (err) {
19+
return callback(err)
20+
}
21+
22+
// go-ipfs http api currently returns 3 lines for a ping.
23+
// they're a little messed, so take the correct values from each lines.
24+
const pingResult = {
25+
Success: res[1].Success,
26+
Time: res[1].Time,
27+
Text: res[2].Text
28+
}
29+
30+
callback(null, pingResult)
31+
})
32+
}
33+
34+
send.andTransform(request, transform, callback)
1735
})
1836
}

src/api/pubsub.js

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
'use strict'
2+
3+
const promisify = require('promisify-es6')
4+
const PubsubMessageStream = require('../pubsub-message-stream')
5+
const stringlistToArray = require('../stringlist-to-array')
6+
7+
/* Internal subscriptions state and functions */
8+
let subscriptions = {}
9+
10+
const addSubscription = (topic, request) => {
11+
subscriptions[topic] = { request: request }
12+
}
13+
14+
const removeSubscription = promisify((topic, callback) => {
15+
if (!subscriptions[topic]) {
16+
return callback(new Error(`Not subscribed to ${topic}`))
17+
}
18+
19+
subscriptions[topic].request.abort()
20+
delete subscriptions[topic]
21+
22+
if (callback) {
23+
callback(null)
24+
}
25+
})
26+
27+
/* Public API */
28+
module.exports = (send) => {
29+
return {
30+
subscribe: promisify((topic, options, callback) => {
31+
const defaultOptions = {
32+
discover: false
33+
}
34+
35+
if (typeof options === 'function') {
36+
callback = options
37+
options = defaultOptions
38+
}
39+
40+
if (!options) {
41+
options = defaultOptions
42+
}
43+
44+
// If we're already subscribed, return an error
45+
if (subscriptions[topic]) {
46+
return callback(new Error(`Already subscribed to '${topic}'`))
47+
}
48+
49+
// Request params
50+
const request = {
51+
path: 'pubsub/sub',
52+
args: [topic],
53+
qs: { discover: options.discover }
54+
}
55+
56+
// Start the request and transform the response stream to Pubsub messages stream
57+
const req = send.andTransform(request, PubsubMessageStream.from, (err, stream) => {
58+
if (err) {
59+
return callback(err)
60+
}
61+
// Add a cancel method to the stream so that the subscription can be cleanly cancelled
62+
stream.cancel = promisify((cb) => removeSubscription(topic, cb))
63+
// Add the request to the active subscriptions and return the stream
64+
addSubscription(topic, req)
65+
callback(null, stream)
66+
})
67+
}),
68+
publish: promisify((topic, data, callback) => {
69+
const buf = Buffer.isBuffer(data) ? data : new Buffer(data)
70+
71+
const request = {
72+
path: 'pubsub/pub',
73+
args: [topic, buf]
74+
}
75+
76+
send(request, callback)
77+
}),
78+
ls: promisify((callback) => {
79+
const request = {
80+
path: 'pubsub/ls'
81+
}
82+
83+
send.andTransform(request, stringlistToArray, callback)
84+
}),
85+
peers: promisify((topic, callback) => {
86+
if (!subscriptions[topic]) {
87+
return callback(new Error(`Not subscribed to '${topic}'`))
88+
}
89+
90+
const request = {
91+
path: 'pubsub/peers',
92+
args: [topic]
93+
}
94+
95+
send.andTransform(request, stringlistToArray, callback)
96+
})
97+
}
98+
}

src/api/refs.js

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,36 @@
11
'use strict'
22

33
const promisify = require('promisify-es6')
4+
const streamToValue = require('../stream-to-value')
45

56
module.exports = (send) => {
67
const refs = promisify((args, opts, callback) => {
78
if (typeof (opts) === 'function') {
89
callback = opts
910
opts = {}
1011
}
11-
return send({
12+
13+
const request = {
1214
path: 'refs',
1315
args: args,
1416
qs: opts
15-
}, callback)
17+
}
18+
19+
send.andTransform(request, streamToValue, callback)
1620
})
21+
1722
refs.local = promisify((opts, callback) => {
1823
if (typeof (opts) === 'function') {
1924
callback = opts
2025
opts = {}
2126
}
22-
return send({
27+
28+
const request = {
2329
path: 'refs',
2430
qs: opts
25-
}, callback)
31+
}
32+
33+
send.andTransform(request, streamToValue, callback)
2634
})
2735

2836
return refs

src/api/util/fs-add.js

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
'use strict'
22

33
const isNode = require('detect-node')
4-
const addToDagNodesTransform = require('./../../add-to-dagnode-transform')
54
const promisify = require('promisify-es6')
5+
const DAGNodeStream = require('../../dagnode-stream')
66

77
module.exports = (send) => {
88
return promisify((path, opts, callback) => {
@@ -28,12 +28,14 @@ module.exports = (send) => {
2828
return callback(new Error('"path" must be a string'))
2929
}
3030

31-
const sendWithTransform = send.withTransform(addToDagNodesTransform)
32-
33-
sendWithTransform({
31+
const request = {
3432
path: 'add',
3533
qs: opts,
3634
files: path
37-
}, callback)
35+
}
36+
37+
// Transform the response stream to DAGNode values
38+
const transform = (res, callback) => DAGNodeStream.streamToValue(send, res, callback)
39+
send.andTransform(request, transform, callback)
3840
})
3941
}

src/api/util/url-add.js

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,8 @@
33
const promisify = require('promisify-es6')
44
const once = require('once')
55
const parseUrl = require('url').parse
6-
76
const request = require('../../request')
8-
const addToDagNodesTransform = require('./../../add-to-dagnode-transform')
7+
const DAGNodeStream = require('../../dagnode-stream')
98

109
module.exports = (send) => {
1110
return promisify((url, opts, callback) => {
@@ -28,7 +27,6 @@ module.exports = (send) => {
2827
return callback(new Error('"url" param must be an http(s) url'))
2928
}
3029

31-
const sendWithTransform = send.withTransform(addToDagNodesTransform)
3230
callback = once(callback)
3331

3432
request(parseUrl(url).protocol)(url, (res) => {
@@ -37,11 +35,15 @@ module.exports = (send) => {
3735
return callback(new Error(`Failed to download with ${res.statusCode}`))
3836
}
3937

40-
sendWithTransform({
38+
const params = {
4139
path: 'add',
4240
qs: opts,
4341
files: res
44-
}, callback)
42+
}
43+
44+
// Transform the response stream to DAGNode values
45+
const transform = (res, callback) => DAGNodeStream.streamToValue(send, res, callback)
46+
send.andTransform(params, transform, callback)
4547
}).end()
4648
})
4749
}

0 commit comments

Comments
 (0)