From b52179f99e5c8b8ecbe69a4c2e016a8176ff68da Mon Sep 17 00:00:00 2001 From: achingbrain Date: Thu, 18 Jul 2019 17:40:05 +0100 Subject: [PATCH 1/5] feat: promisify all api methods that accept callbacks This is a stop-gap until the full async/await migration can be completed. It means we can refactor tests of other modules that depend on this module without having to mix async flow control strategies. N.b. some methods that were previously callable without callbacks (e.g. `node.start()`, `node.stop()`, etc) now require callbacks otherwise a promise is returned which, if rejected, can cause `unhandledPromiseRejection` events and lead to memory leaks. --- package.json | 1 + src/content-routing.js | 9 +++++---- src/dht.js | 13 +++++++------ src/get-peer-info.js | 5 +++-- src/index.js | 12 ++++++++++-- src/peer-routing.js | 5 +++-- src/pubsub.js | 29 ++++++++++++++++++----------- test/fsm.spec.js | 18 +++++++++--------- 8 files changed, 56 insertions(+), 36 deletions(-) diff --git a/package.json b/package.json index 71e18a3e92..2a797d1c93 100644 --- a/package.json +++ b/package.json @@ -58,6 +58,7 @@ "peer-book": "^0.9.1", "peer-id": "^0.12.2", "peer-info": "^0.15.1", + "promisify-es6": "^1.0.3", "superstruct": "^0.6.0" }, "devDependencies": { diff --git a/src/content-routing.js b/src/content-routing.js index 3ebacb59fa..099aa6e44e 100644 --- a/src/content-routing.js +++ b/src/content-routing.js @@ -3,6 +3,7 @@ const tryEach = require('async/tryEach') const parallel = require('async/parallel') const errCode = require('err-code') +const promisify = require('promisify-es6') module.exports = (node) => { const routers = node._modules.contentRouting || [] @@ -24,7 +25,7 @@ module.exports = (node) => { * @param {function(Error, Result)} callback * @returns {void} */ - findProviders: (key, options, callback) => { + findProviders: promisify((key, options, callback) => { if (typeof options === 'function') { callback = options options = {} @@ -60,7 +61,7 @@ module.exports = (node) => { results = results || [] callback(null, results) }) - }, + }), /** * Iterates over all content routers in parallel to notify it is @@ -70,7 +71,7 @@ module.exports = (node) => { * @param {function(Error)} callback * @returns {void} */ - provide: (key, callback) => { + provide: promisify((key, callback) => { if (!routers.length) { return callback(errCode(new Error('No content routers available'), 'NO_ROUTERS_AVAILABLE')) } @@ -78,6 +79,6 @@ module.exports = (node) => { parallel(routers.map((router) => { return (cb) => router.provide(key, cb) }), callback) - } + }) } } diff --git a/src/dht.js b/src/dht.js index f727c27108..f53c09b9cd 100644 --- a/src/dht.js +++ b/src/dht.js @@ -2,19 +2,20 @@ const nextTick = require('async/nextTick') const errCode = require('err-code') +const promisify = require('promisify-es6') const { messages, codes } = require('./errors') module.exports = (node) => { return { - put: (key, value, callback) => { + put: promisify((key, value, callback) => { if (!node._dht) { return nextTick(callback, errCode(new Error(messages.DHT_DISABLED), codes.DHT_DISABLED)) } node._dht.put(key, value, callback) - }, - get: (key, options, callback) => { + }), + get: promisify((key, options, callback) => { if (typeof options === 'function') { callback = options options = {} @@ -25,8 +26,8 @@ module.exports = (node) => { } node._dht.get(key, options, callback) - }, - getMany: (key, nVals, options, callback) => { + }), + getMany: promisify((key, nVals, options, callback) => { if (typeof options === 'function') { callback = options options = {} @@ -37,6 +38,6 @@ module.exports = (node) => { } node._dht.getMany(key, nVals, options, callback) - } + }) } } diff --git a/src/get-peer-info.js b/src/get-peer-info.js index 9de1a7b652..4b6ec7d139 100644 --- a/src/get-peer-info.js +++ b/src/get-peer-info.js @@ -4,12 +4,13 @@ const PeerId = require('peer-id') const PeerInfo = require('peer-info') const multiaddr = require('multiaddr') const errCode = require('err-code') +const promisify = require('promisify-es6') module.exports = (node) => { /* * Helper method to check the data type of peer and convert it to PeerInfo */ - return function (peer, callback) { + return promisify(function (peer, callback) { let p // PeerInfo if (PeerInfo.isPeerInfo(peer)) { @@ -62,5 +63,5 @@ module.exports = (node) => { } callback(null, p) - } + }) } diff --git a/src/index.js b/src/index.js index 826ed51ccd..58f8fdfe10 100644 --- a/src/index.js +++ b/src/index.js @@ -6,6 +6,7 @@ const debug = require('debug') const log = debug('libp2p') log.error = debug('libp2p:error') const errCode = require('err-code') +const promisify = require('promisify-es6') const each = require('async/each') const series = require('async/series') @@ -186,6 +187,13 @@ class Libp2p extends EventEmitter { }) this._peerDiscovered = this._peerDiscovered.bind(this) + + // promisify all instance methods + ;['start', 'stop', 'dial', 'dialProtocol', 'dialFSM', 'hangUp', 'ping'].forEach(method => { + this[method] = promisify(this[method], { + context: this + }) + }) } /** @@ -557,7 +565,7 @@ module.exports = Libp2p * @param {function(Error, Libp2p)} callback * @returns {void} */ -module.exports.createLibp2p = (options, callback) => { +module.exports.createLibp2p = promisify((options, callback) => { if (options.peerInfo) { return nextTick(callback, null, new Libp2p(options)) } @@ -566,4 +574,4 @@ module.exports.createLibp2p = (options, callback) => { options.peerInfo = peerInfo callback(null, new Libp2p(options)) }) -} +}) diff --git a/src/peer-routing.js b/src/peer-routing.js index 998c802328..d1b768d2f4 100644 --- a/src/peer-routing.js +++ b/src/peer-routing.js @@ -2,6 +2,7 @@ const tryEach = require('async/tryEach') const errCode = require('err-code') +const promisify = require('promisify-es6') module.exports = (node) => { const routers = node._modules.peerRouting || [] @@ -21,7 +22,7 @@ module.exports = (node) => { * @param {function(Error, Result)} callback * @returns {void} */ - findPeer: (id, options, callback) => { + findPeer: promisify((id, options, callback) => { if (typeof options === 'function') { callback = options options = {} @@ -53,6 +54,6 @@ module.exports = (node) => { results = results || [] callback(null, results) }) - } + }) } } diff --git a/src/pubsub.js b/src/pubsub.js index 0706e5d471..174db8ed8d 100644 --- a/src/pubsub.js +++ b/src/pubsub.js @@ -3,6 +3,7 @@ const nextTick = require('async/nextTick') const { messages, codes } = require('./errors') const FloodSub = require('libp2p-floodsub') +const promisify = require('promisify-es6') const errCode = require('err-code') @@ -12,7 +13,7 @@ module.exports = (node) => { node._floodSub = floodSub return { - subscribe: (topic, options, handler, callback) => { + subscribe: promisify((topic, options, handler, callback) => { if (typeof options === 'function') { callback = handler handler = options @@ -33,13 +34,19 @@ module.exports = (node) => { } subscribe(callback) - }, + }), - unsubscribe: (topic, handler, callback) => { + unsubscribe: promisify((topic, handler, callback) => { if (!node.isStarted() && !floodSub.started) { return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED)) } - if (!handler && !callback) { + if (!callback) { + // we will always be passed a callback because of promisify. If we weren't passed one + // really it means no handler was passed.. + callback = handler + handler = null + } + if (!handler) { floodSub.removeAllListeners(topic) } else { floodSub.removeListener(topic, handler) @@ -52,9 +59,9 @@ module.exports = (node) => { if (typeof callback === 'function') { nextTick(() => callback()) } - }, + }), - publish: (topic, data, callback) => { + publish: promisify((topic, data, callback) => { if (!node.isStarted() && !floodSub.started) { return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED)) } @@ -64,9 +71,9 @@ module.exports = (node) => { } floodSub.publish(topic, data, callback) - }, + }), - ls: (callback) => { + ls: promisify((callback) => { if (!node.isStarted() && !floodSub.started) { return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED)) } @@ -74,9 +81,9 @@ module.exports = (node) => { const subscriptions = Array.from(floodSub.subscriptions) nextTick(() => callback(null, subscriptions)) - }, + }), - peers: (topic, callback) => { + peers: promisify((topic, callback) => { if (!node.isStarted() && !floodSub.started) { return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED)) } @@ -91,7 +98,7 @@ module.exports = (node) => { .map((peer) => peer.info.id.toB58String()) nextTick(() => callback(null, peers)) - }, + }), setMaxListeners (n) { return floodSub.setMaxListeners(n) diff --git a/test/fsm.spec.js b/test/fsm.spec.js index dceef0574e..d8d76f0019 100644 --- a/test/fsm.spec.js +++ b/test/fsm.spec.js @@ -60,9 +60,9 @@ describe('libp2p state machine (fsm)', () => { node.once('stop', done) // stop the stopped node - node.stop() + node.stop(() => {}) }) - node.start() + node.start(() => {}) }) it('should callback with an error when it occurs on stop', (done) => { @@ -79,7 +79,7 @@ describe('libp2p state machine (fsm)', () => { expect(2).checks(done) sinon.stub(node._switch, 'stop').callsArgWith(0, error) - node.start() + node.start(() => {}) }) it('should noop when starting a started node', (done) => { @@ -89,13 +89,13 @@ describe('libp2p state machine (fsm)', () => { }) node.once('start', () => { node.once('stop', done) - node.stop() + node.stop(() => {}) }) // start the started node - node.start() + node.start(() => {}) }) - node.start() + node.start(() => {}) }) it('should error on start with no transports', (done) => { @@ -115,7 +115,7 @@ describe('libp2p state machine (fsm)', () => { expect(2).checks(done) - node.start() + node.start(() => {}) }) it('should not start if the switch fails to start', (done) => { @@ -150,7 +150,7 @@ describe('libp2p state machine (fsm)', () => { }) }) - node.stop() + node.stop(() => {}) }) it('should not dial (fsm) when the node is stopped', (done) => { @@ -162,7 +162,7 @@ describe('libp2p state machine (fsm)', () => { }) }) - node.stop() + node.stop(() => {}) }) }) }) From f1b708769cda67bc271e1a435a748243746dc8b1 Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Tue, 23 Jul 2019 21:22:00 +0200 Subject: [PATCH 2/5] docs: add a global note to the api about promisify --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index 2b155ed7d7..3d3a36f07c 100644 --- a/README.md +++ b/README.md @@ -204,6 +204,8 @@ class Node extends Libp2p { ### API +**IMPORTANT NOTE**: All the methods listed in the API section that take a callback are also now Promisified. Libp2p is migrating away from callbacks to async/await, and in a future release (that will be announced in advance), callback support will be removed entirely. You can follow progress of the async/await endeavor at https://github.com/ipfs/js-ipfs/issues/1670. + #### Create a Node - `Libp2p.createLibp2p(options, callback)` > Behaves exactly like `new Libp2p(options)`, but doesn't require a PeerInfo. One will be generated instead From ad9d49e6760207d3901abf95ee0231475dfca5aa Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Fri, 26 Jul 2019 16:40:09 +0200 Subject: [PATCH 3/5] fix: update the logic for unsubscribe --- src/pubsub.js | 33 ++++++++++++++++++++++++++------- 1 file changed, 26 insertions(+), 7 deletions(-) diff --git a/src/pubsub.js b/src/pubsub.js index 174db8ed8d..2ed88ea997 100644 --- a/src/pubsub.js +++ b/src/pubsub.js @@ -36,16 +36,33 @@ module.exports = (node) => { subscribe(callback) }), + /** + * Unsubscribes from a pubsub topic + * + * @param {string} topic + * @param {function|null} handler The handler to unsubscribe from + * @param {function} [callback] An optional callback + * + * @returns {Promise|void} A promise is returned if no callback is provided + * + * @example Unsubscribe a topic for all handlers + * + * // `null` must be passed until unsubscribe is no longer using promisify + * await libp2p.unsubscribe(topic, null) + * + * @example Unsubscribe a topic for 1 handler + * + * await libp2p.unsubscribe(topic, handler) + * + * @example Use a callback instead of the Promise api + * + * libp2p.unsubscribe(topic, handler, callback) + */ unsubscribe: promisify((topic, handler, callback) => { if (!node.isStarted() && !floodSub.started) { return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED)) } - if (!callback) { - // we will always be passed a callback because of promisify. If we weren't passed one - // really it means no handler was passed.. - callback = handler - handler = null - } + if (!handler) { floodSub.removeAllListeners(topic) } else { @@ -57,8 +74,10 @@ module.exports = (node) => { } if (typeof callback === 'function') { - nextTick(() => callback()) + return nextTick(() => callback()) } + + return Promise.resolve() }), publish: promisify((topic, data, callback) => { From e0ded98ee260df9ac63a0c83ce7ad7374ddcb307 Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Mon, 29 Jul 2019 12:51:10 +0200 Subject: [PATCH 4/5] test(fix): correct pubsub unsubscribe usage for api change --- test/pubsub.node.js | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/test/pubsub.node.js b/test/pubsub.node.js index f3795c3590..ca1f1071b4 100644 --- a/test/pubsub.node.js +++ b/test/pubsub.node.js @@ -117,12 +117,8 @@ describe('.pubsub', () => { (cb) => nodes[1].pubsub.publish('pubsub', data, cb), // Wait a moment before unsubscribing (cb) => setTimeout(cb, 500), - // unsubscribe on the first - (cb) => { - nodes[0].pubsub.unsubscribe('pubsub') - // Wait a moment to make sure the ubsubscribe-from-all worked - setTimeout(cb, 500) - }, + // unsubscribe from all + (cb) => nodes[0].pubsub.unsubscribe('pubsub', null, cb), // Verify unsubscribed (cb) => { nodes[0].pubsub.ls((err, topics) => { From ca8551e8b7a6c0da2f3eb17b08d1729ea4447531 Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Mon, 29 Jul 2019 13:16:44 +0200 Subject: [PATCH 5/5] test(fix): update content routing tests for latest delegate version --- test/content-routing.node.js | 18 +++++------------- 1 file changed, 5 insertions(+), 13 deletions(-) diff --git a/test/content-routing.node.js b/test/content-routing.node.js index 9c7eecde50..8554b3b7a1 100644 --- a/test/content-routing.node.js +++ b/test/content-routing.node.js @@ -185,19 +185,10 @@ describe('.contentRouting', () => { it('should be able to register as a provider', (done) => { const cid = new CID('QmU621oD8AhHw6t25vVyfYKmL9VV3PTgc52FngEhTGACFB') const mockApi = nock('http://0.0.0.0:60197') - // mock the swarm connect - .post('/api/v0/swarm/connect') - .query({ - arg: `/ip4/0.0.0.0/tcp/60194/p2p-circuit/ipfs/${nodeA.peerInfo.id.toB58String()}`, - 'stream-channels': true - }) - .reply(200, { - Strings: [`connect ${nodeA.peerInfo.id.toB58String()} success`] - }, ['Content-Type', 'application/json']) // mock the refs call .post('/api/v0/refs') .query({ - recursive: true, + recursive: false, arg: cid.toBaseEncodedString(), 'stream-channels': true }) @@ -216,10 +207,11 @@ describe('.contentRouting', () => { it('should handle errors when registering as a provider', (done) => { const cid = new CID('QmU621oD8AhHw6t25vVyfYKmL9VV3PTgc52FngEhTGACFB') const mockApi = nock('http://0.0.0.0:60197') - // mock the swarm connect - .post('/api/v0/swarm/connect') + // mock the refs call + .post('/api/v0/refs') .query({ - arg: `/ip4/0.0.0.0/tcp/60194/p2p-circuit/ipfs/${nodeA.peerInfo.id.toB58String()}`, + recursive: false, + arg: cid.toBaseEncodedString(), 'stream-channels': true }) .reply(502, 'Bad Gateway', ['Content-Type', 'application/json'])