Skip to content
This repository was archived by the owner on Mar 10, 2020. It is now read-only.

Commit ee6d283

Browse files
committed
refactor(pubsub): Refactored and completed pubsub API
1 parent 3fbddff commit ee6d283

8 files changed

+220
-13
lines changed

package.json

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
"ipld-dag-pb": "^0.9.2",
3535
"is-ipfs": "^0.2.1",
3636
"isstream": "^0.1.2",
37+
"js-base64": "^2.1.9",
3738
"lru-cache": "^4.0.2",
3839
"multiaddr": "^2.1.1",
3940
"multipart-stream": "^2.0.1",
@@ -63,7 +64,7 @@
6364
"gulp": "^3.9.1",
6465
"hapi": "^16.0.1",
6566
"interface-ipfs-core": "^0.22.0",
66-
"ipfsd-ctl": "^0.17.0",
67+
"@haad/ipfsd-ctl": "^0.18.0-beta.5",
6768
"pre-commit": "^1.1.3",
6869
"socket.io": "^1.7.1",
6970
"socket.io-client": "^1.7.1",
@@ -115,4 +116,4 @@
115116
"url": "https://github.com/ipfs/js-ipfs-api/issues"
116117
},
117118
"homepage": "https://github.com/ipfs/js-ipfs-api"
118-
}
119+
}

src/api/pubsub.js

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
'use strict'
2+
3+
const promisify = require('promisify-es6')
4+
const PubsubMessageStream = require('../pubsub-message-stream')
5+
const stringlistToArray = require('../stringlist-to-array')
6+
7+
/* Internal subscriptions state and functions */
8+
let subscriptions = {}
9+
10+
const addSubscription = (topic, request) => {
11+
subscriptions[topic] = { request: request }
12+
}
13+
14+
const removeSubscription = promisify((topic, callback) => {
15+
if (!subscriptions[topic]) {
16+
return callback(new Error(`Not subscribed to ${topic}`))
17+
}
18+
19+
subscriptions[topic].request.abort()
20+
delete subscriptions[topic]
21+
22+
if (callback) {
23+
callback(null)
24+
}
25+
})
26+
27+
/* Public API */
28+
module.exports = (send) => {
29+
return {
30+
subscribe: promisify((topic, options, callback) => {
31+
const defaultOptions = {
32+
discover: false
33+
}
34+
35+
if (typeof options === 'function') {
36+
callback = options
37+
options = defaultOptions
38+
}
39+
40+
if (!options) {
41+
options = defaultOptions
42+
}
43+
44+
// If we're already subscribed, return an error
45+
if (subscriptions[topic]) {
46+
return callback(new Error(`Already subscribed to '${topic}'`))
47+
}
48+
49+
// Request params
50+
const request = {
51+
path: 'pubsub/sub',
52+
args: [topic],
53+
qs: { discover: options.discover }
54+
}
55+
56+
// Start the request and transform the response stream to Pubsub messages stream
57+
const req = send.andTransform(request, PubsubMessageStream.from, (err, stream) => {
58+
if (err) {
59+
return callback(err)
60+
}
61+
// Add a cancel method to the stream so that the subscription can be cleanly cancelled
62+
stream.cancel = promisify((cb) => removeSubscription(topic, cb))
63+
// Add the request to the active subscriptions and return the stream
64+
addSubscription(topic, req)
65+
callback(null, stream)
66+
})
67+
}),
68+
publish: promisify((topic, data, callback) => {
69+
const buf = Buffer.isBuffer(data) ? data : new Buffer(data)
70+
71+
const request = {
72+
path: 'pubsub/pub',
73+
args: [topic, buf]
74+
}
75+
76+
send(request, callback)
77+
}),
78+
ls: promisify((callback) => {
79+
const request = {
80+
path: 'pubsub/ls'
81+
}
82+
83+
send.andTransform(request, stringlistToArray, callback)
84+
}),
85+
peers: promisify((topic, callback) => {
86+
if (!subscriptions[topic]) {
87+
return callback(new Error(`Not subscribed to '${topic}'`))
88+
}
89+
90+
const request = {
91+
path: 'pubsub/peers',
92+
args: [topic]
93+
}
94+
95+
send.andTransform(request, stringlistToArray, callback)
96+
})
97+
}
98+
}

src/load-commands.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ function requireCommands () {
2525
refs: require('./api/refs'),
2626
repo: require('./api/repo'),
2727
swarm: require('./api/swarm'),
28+
pubsub: require('./api/pubsub'),
2829
update: require('./api/update'),
2930
version: require('./api/version')
3031
}

src/pubsub-message-stream.js

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
'use strict'
2+
3+
const TransformStream = require('readable-stream').Transform
4+
const PubsubMessage = require('./pubsub-message-utils')
5+
6+
class PubsubMessageStream extends TransformStream {
7+
constructor (options) {
8+
const opts = Object.assign(options || {}, { objectMode: true })
9+
super(opts)
10+
}
11+
12+
static from (inputStream, callback) {
13+
let outputStream = inputStream.pipe(new PubsubMessageStream())
14+
inputStream.on('end', () => outputStream.emit('end'))
15+
callback(null, outputStream)
16+
}
17+
18+
_transform (obj, enc, callback) {
19+
try {
20+
const message = PubsubMessage.deserialize(obj, 'base64')
21+
this.push(message)
22+
} catch (e) {
23+
// Not a valid pubsub message
24+
// go-ipfs returns '{}' as the very first object atm, we skip that
25+
}
26+
callback()
27+
}
28+
}
29+
30+
module.exports = PubsubMessageStream

src/pubsub-message-utils.js

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
'use strict'
2+
3+
const Base58 = require('bs58')
4+
const Base64 = require('js-base64').Base64
5+
const PubsubMessage = require('./pubsub-message')
6+
7+
class PubsubMessageUtils {
8+
static create (senderId, data, seqNo, topics) {
9+
return new PubsubMessage(senderId, data, seqNo, topics)
10+
}
11+
12+
static deserialize (data, enc = 'json') {
13+
enc = enc ? enc.toLowerCase() : null
14+
15+
if (enc === 'json') {
16+
return PubsubMessageUtils._deserializeFromJson(data)
17+
} else if (enc === 'base64') {
18+
return PubsubMessageUtils._deserializeFromBase64(data)
19+
}
20+
21+
throw new Error(`Unsupported encoding: '${enc}'`)
22+
}
23+
24+
static _deserializeFromJson (data) {
25+
const json = JSON.parse(data)
26+
return PubsubMessageUtils._deserializeFromBase64(json)
27+
}
28+
29+
static _deserializeFromBase64 (obj) {
30+
if (!PubsubMessageUtils._isPubsubMessage(obj)) {
31+
throw new Error(`Not a pubsub message`)
32+
}
33+
34+
const senderId = Base58.encode(obj.from)
35+
const payload = Base64.decode(obj.data)
36+
const seqno = Base64.decode(obj.seqno)
37+
const topics = obj.topicIDs
38+
39+
return PubsubMessageUtils.create(senderId, payload, seqno, topics)
40+
}
41+
42+
static _isPubsubMessage (obj) {
43+
return obj && obj.from && obj.seqno && obj.data && obj.topicIDs
44+
}
45+
}
46+
47+
module.exports = PubsubMessageUtils

src/pubsub-message.js

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
'use strict'
2+
3+
class PubsubMessage {
4+
constructor (senderId, data, seqNo, topics) {
5+
this._senderId = senderId
6+
this._data = data
7+
this._seqNo = seqNo
8+
this._topics = topics
9+
}
10+
11+
get from () {
12+
return this._senderId
13+
}
14+
15+
get data () {
16+
return this._data
17+
}
18+
19+
get seqno () {
20+
return this._seqNo
21+
}
22+
23+
get topicIDs () {
24+
return this._topics
25+
}
26+
}
27+
28+
module.exports = PubsubMessage

test/factory/daemon-spawner.js

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
'use strict'
22

33
// const defaultConfig = require('./default-config.json')
4-
const ipfsd = require('ipfsd-ctl')
4+
const ipfsd = require('@haad/ipfsd-ctl')
55
const series = require('async/series')
66
const eachSeries = require('async/eachSeries')
77
const once = require('once')
@@ -73,17 +73,19 @@ function spawnEphemeralNode (callback) {
7373
(cb) => {
7474
const configValues = {
7575
Bootstrap: [],
76-
Discovery: {},
77-
'HTTPHeaders.Access-Control-Allow-Origin': ['*'],
78-
'HTTPHeaders.Access-Control-Allow-Credentials': 'true',
79-
'HTTPHeaders.Access-Control-Allow-Methods': ['PUT', 'POST', 'GET']
76+
// Discovery: {},
77+
API: {
78+
'HTTPHeaders.Access-Control-Allow-Origin': ['*'],
79+
'HTTPHeaders.Access-Control-Allow-Credentials': 'true',
80+
'HTTPHeaders.Access-Control-Allow-Methods': ['PUT', 'POST', 'GET']
81+
}
8082
}
8183

8284
eachSeries(Object.keys(configValues), (configKey, cb) => {
83-
node.setConfig(`API.${configKey}`, JSON.stringify(configValues[configKey]), cb)
85+
node.setConfig(`${configKey}`, JSON.stringify(configValues[configKey]), cb)
8486
}, cb)
8587
},
86-
(cb) => node.startDaemon(cb)
88+
(cb) => node.startDaemon(['--enable-pubsub-experiment'], cb)
8789
], (err) => {
8890
if (err) {
8991
return callback(err)

test/setup/spawn-daemons.js

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
const gulp = require('gulp')
66
const fs = require('fs')
77
const path = require('path')
8-
const ipfsd = require('ipfsd-ctl')
8+
const ipfsd = require('@haad/ipfsd-ctl')
99
const eachSeries = require('async/eachSeries')
1010
const parallel = require('async/parallel')
1111

@@ -27,7 +27,7 @@ function startDisposableDaemons (callback) {
2727

2828
const configValues = {
2929
Bootstrap: [],
30-
Discovery: {},
30+
// Discovery: {},
3131
API: {
3232
'HTTPHeaders.Access-Control-Allow-Origin': ['*'],
3333
'HTTPHeaders.Access-Control-Allow-Credentials': 'true',
@@ -36,13 +36,13 @@ function startDisposableDaemons (callback) {
3636
}
3737

3838
eachSeries(Object.keys(configValues), (configKey, cb) => {
39-
nodes[key].setConfig(`API.${configKey}`, JSON.stringify(configValues[configKey]), cb)
39+
nodes[key].setConfig(`${configKey}`, JSON.stringify(configValues[configKey]), cb)
4040
}, (err) => {
4141
if (err) {
4242
return cb(err)
4343
}
4444

45-
nodes[key].startDaemon(cb)
45+
nodes[key].startDaemon(['--enable-pubsub-experiment'], cb)
4646
})
4747
})
4848
}

0 commit comments

Comments
 (0)