From a893de338937e2209a637647db6aa52078102f1f Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Wed, 15 May 2019 13:39:42 +0200 Subject: [PATCH 1/2] feat(identify): update identify service to include latest push protocol --- package.json | 2 +- src/connection/index.js | 27 +++++++++--------- src/connection/manager.js | 59 ++++++++++++++++++++------------------- src/dialer/index.js | 21 ++++++++++++++ src/index.js | 6 ++++ src/protocol-muxer.js | 2 +- src/utils.js | 23 ++++++++------- test/dial-fsm.node.js | 4 +-- test/identify.node.js | 8 +++--- 9 files changed, 93 insertions(+), 59 deletions(-) diff --git a/package.json b/package.json index bc330b0..f9082e8 100644 --- a/package.json +++ b/package.json @@ -69,7 +69,7 @@ "hashlru": "^2.3.0", "interface-connection": "~0.3.3", "libp2p-circuit": "~0.3.6", - "libp2p-identify": "~0.7.6", + "libp2p-identify": "libp2p/js-libp2p-identify#feat/identify-push", "moving-average": "^1.0.0", "multiaddr": "^6.0.6", "multistream-select": "~0.14.4", diff --git a/src/connection/index.js b/src/connection/index.js index a2df251..cf81349 100644 --- a/src/connection/index.js +++ b/src/connection/index.js @@ -7,9 +7,9 @@ const withIs = require('class-is') const BaseConnection = require('./base') const parallel = require('async/parallel') const nextTick = require('async/nextTick') -const identify = require('libp2p-identify') const errCode = require('err-code') -const { msHandle, msSelect, identifyDialer } = require('../utils') +const IdentifyService = require('libp2p-identify') +const { newStream } = require('../utils') const observeConnection = require('../observe-connection') const { @@ -391,8 +391,10 @@ class ConnectionFSM extends BaseConnection { if (this.switch.identify) { this._identify((err, results) => { if (err) { + this.log('identify errored, closing the connection', err) return this.close(err) } + this.log('identify successful') this.theirPeerInfo = this.switch._peerBook.put(results.peerInfo) }) } @@ -413,18 +415,17 @@ class ConnectionFSM extends BaseConnection { if (!this.muxer) { return nextTick(callback, errCode('The connection was already closed', 'ERR_CONNECTION_CLOSED')) } - this.muxer.newStream(async (err, conn) => { + + const stream = this.muxer.newStream() + newStream(stream, IdentifyService.multicodecs.identify, (err, conn) => { if (err) return callback(err) - const ms = new multistream.Dialer() - let results - try { - await msHandle(ms, conn) - const msConn = await msSelect(ms, identify.multicodec) - results = await identifyDialer(msConn, this.theirPeerInfo) - } catch (err) { - return callback(err) - } - callback(null, results) + this.switch.identifyService.identify(conn, this.theirPeerInfo, (err, peerInfo, observedAddr) => { + if (err) return callback(err) + callback(null, { + peerInfo, + observedAddr + }) + }) }) } diff --git a/src/connection/manager.js b/src/connection/manager.js index 38bb4e5..b751639 100644 --- a/src/connection/manager.js +++ b/src/connection/manager.js @@ -1,12 +1,12 @@ 'use strict' -const identify = require('libp2p-identify') +const IdentifyService = require('libp2p-identify') const multistream = require('multistream-select') const debug = require('debug') const log = debug('libp2p:switch:conn-manager') const once = require('once') const ConnectionFSM = require('../connection') -const { msHandle, msSelect, identifyDialer } = require('../utils') +const { newStream } = require('../utils') const Circuit = require('libp2p-circuit') @@ -155,7 +155,7 @@ class ConnectionManager { // 1. overload getPeerInfo // 2. call getPeerInfo // 3. add this conn to the pool - if (this.switch.identify) { + if (this.switch.identifyService) { // Get the peer info from the crypto exchange conn.getPeerInfo((err, cryptoPI) => { if (err || !cryptoPI) { @@ -163,35 +163,31 @@ class ConnectionManager { } // overload peerInfo to use Identify instead - conn.getPeerInfo = async (callback) => { - const conn = muxedConn.newStream() - const ms = new multistream.Dialer() - callback = once(callback) - - let results - try { - await msHandle(ms, conn) - const msConn = await msSelect(ms, identify.multicodec) - results = await identifyDialer(msConn, cryptoPI) - } catch (err) { - return muxedConn.end(() => { - callback(err, null) - }) - } + conn.getPeerInfo = (callback) => { + log('running identify') + newStream(muxedConn.newStream(), IdentifyService.multicodecs.identify, (err, stream) => { + if (err) { + return muxedConn.end(() => callback(err)) + } - const { peerInfo } = results + this.switch.identifyService.identify(stream, cryptoPI, (err, peerInfo, _observedAddr) => { + if (err) { + return muxedConn.end(() => callback(err)) + } - if (peerInfo) { - conn.setPeerInfo(peerInfo) - } - callback(null, peerInfo) + if (peerInfo) { + conn.setPeerInfo(peerInfo) + } + callback(null, peerInfo) + }) + }) } conn.getPeerInfo((err, peerInfo) => { - /* eslint no-warning-comments: off */ if (err) { return log('identify not successful') } + const b58Str = peerInfo.id.toB58String() peerInfo = this.switch._peerBook.put(peerInfo) @@ -273,10 +269,17 @@ class ConnectionManager { * @returns {void} */ reuse () { - this.switch.identify = true - this.switch.handle(identify.multicodec, (protocol, conn) => { - identify.listener(conn, this.switch._peerInfo) - }) + if (!this.switch.identify) { + this.switch.identify = true + this.switch.identifyService = new IdentifyService({ switch: this.switch }) + + // Setup all handlers for identify + Object.values(IdentifyService.multicodecs).forEach(protocol => { + this.switch.handle(protocol, (protocol, connection) => { + this.switch.identifyService.handleMessage(protocol, connection) + }) + }) + } } } diff --git a/src/dialer/index.js b/src/dialer/index.js index 8ee1ace..6f11102 100644 --- a/src/dialer/index.js +++ b/src/dialer/index.js @@ -106,11 +106,32 @@ module.exports = function (_switch) { _dial({ peerInfo, protocol, options: { useFSM: true, priority: PRIORITY_HIGH }, callback }) } + /** + * Creates a new stream to the given `peerInfo`. If a muxed connection + * does not exist to the peer, an error will be passed to the `callback`. + * @param {PeerInfo} peerInfo + * @param {String} protocol The protocol to handshake for the new stream + * @param {function(Error, Connection)} callback + */ + function newStream (peerInfo, protocol, callback) { + if (!protocol) { + return callback(new Error('a protocol must be provided to create a new stream')) + } + + const connection = _switch.connection.getOne(peerInfo.id.toB58String()) + if (!connection) { + return callback(new Error('no muxed connection to create stream from')) + } + + connection.shake(protocol, callback) + } + return { connect, dial, dialFSM, clearBlacklist, + newStream, BLACK_LIST_ATTEMPTS: isNaN(_switch._options.blackListAttempts) ? BLACK_LIST_ATTEMPTS : _switch._options.blackListAttempts, BLACK_LIST_TTL: isNaN(_switch._options.blacklistTTL) ? BLACK_LIST_TTL : _switch._options.blacklistTTL, MAX_COLD_CALLS: isNaN(_switch._options.maxColdCalls) ? MAX_COLD_CALLS : _switch._options.maxColdCalls, diff --git a/src/index.js b/src/index.js index f463d4d..93c485d 100644 --- a/src/index.js +++ b/src/index.js @@ -54,6 +54,8 @@ class Switch extends EventEmitter { // is the Identify protocol enabled? this.identify = false + this.identifyService = null + // Crypto details this.crypto = plaintext @@ -115,6 +117,10 @@ class Switch extends EventEmitter { this.dialFSM = this.dialer.dialFSM } + get peerInfo () { + return this._peerInfo + } + /** * Returns a list of the transports peerInfo has addresses for * diff --git a/src/protocol-muxer.js b/src/protocol-muxer.js index 90c9769..a383bcc 100644 --- a/src/protocol-muxer.js +++ b/src/protocol-muxer.js @@ -31,7 +31,7 @@ module.exports = function protocolMuxer (protocols, observer) { const handlerFunc = protocol && protocol.handlerFunc if (handlerFunc) { const conn = observeConn(null, protocolName, _conn, observer) - handlerFunc(protocol, conn) + handlerFunc(protocolName, conn) } } } diff --git a/src/utils.js b/src/utils.js index 55e0494..8765185 100644 --- a/src/utils.js +++ b/src/utils.js @@ -1,6 +1,7 @@ 'use strict' const Identify = require('libp2p-identify') +const Multistream = require('multistream-select') /** * For a given multistream, registers to handle the given connection @@ -33,17 +34,19 @@ module.exports.msSelect = (multistream, protocol) => { } /** - * Runs identify for the given connection and verifies it against the - * PeerInfo provided - * @param {Connection} connection - * @param {PeerInfo} cryptoPeerInfo The PeerInfo determined during crypto exchange - * @returns {Promise} Resolves {peerInfo, observedAddrs} + * Takes a stream and handshakes on the given `protocol`. + * The stream for that protocol will be returned in the `callback`. + * @param {Connection} connection A connection to create a sub stream on + * @param {string} protocol The protocol to communicate on + * @param {function(Error, Stream)} callback */ -module.exports.identifyDialer = (connection, cryptoPeerInfo) => { - return new Promise((resolve, reject) => { - Identify.dialer(connection, cryptoPeerInfo, (err, peerInfo, observedAddrs) => { - if (err) return reject(err) - resolve({ peerInfo, observedAddrs }) +module.exports.newStream = (connection, protocol, callback) => { + const ms = new Multistream.Dialer() + ms.handle(connection, (err) => { + if (err) return callback(err) + ms.select(protocol, (err, stream) => { + if (err) return callback(err) + callback(null, stream) }) }) } diff --git a/test/dial-fsm.node.js b/test/dial-fsm.node.js index e7ddd65..4e16330 100644 --- a/test/dial-fsm.node.js +++ b/test/dial-fsm.node.js @@ -186,13 +186,13 @@ describe('dialFSM', () => { // Verify the dialer knows the receiver's protocols expect(Array.from(peerB.protocols)).to.eql([ multiplex.multicodec, - identify.multicodec, + ...Object.values(identify.multicodecs), protocol ]).mark() // Verify the receiver knows the dialer's protocols expect(Array.from(peerA.protocols)).to.eql([ multiplex.multicodec, - identify.multicodec + ...Object.values(identify.multicodecs) ]).mark() switchA.hangUp(switchB._peerInfo) diff --git a/test/identify.node.js b/test/identify.node.js index dc85d3e..2ac9a9a 100644 --- a/test/identify.node.js +++ b/test/identify.node.js @@ -122,11 +122,11 @@ describe('Identify', () => { const peerA = switchB._peerBook.get(switchA._peerInfo.id.toB58String()) expect(Array.from(peerB.protocols)).to.eql([ multiplex.multicodec, - identify.multicodec + ...Object.values(identify.multicodecs) ]) expect(Array.from(peerA.protocols)).to.eql([ multiplex.multicodec, - identify.multicodec, + ...Object.values(identify.multicodecs), '/id-test/1.0.0' ]) @@ -136,7 +136,7 @@ describe('Identify', () => { }) it('should close connection when identify fails', (done) => { - const stub = sinon.stub(identify, 'listener').callsFake((conn) => { + const stub = sinon.stub(switchA.identifyService, 'handleMessage').callsFake((_protocol, conn) => { conn.getObservedAddrs((err, observedAddrs) => { if (err) { return } observedAddrs = observedAddrs[0] @@ -144,7 +144,7 @@ describe('Identify', () => { // pretend to be another peer let publicKey = switchC._peerInfo.id.pubKey.bytes - const msgSend = identify.message.encode({ + const msgSend = identify.Message.encode({ protocolVersion: 'ipfs/0.1.0', agentVersion: 'na', publicKey: publicKey, From b080aedaeaae7ed130df4509adfc8f2c1c751c1c Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Wed, 15 May 2019 13:42:41 +0200 Subject: [PATCH 2/2] chore: fix linting --- src/connection/manager.js | 3 +-- src/dialer/index.js | 1 + src/utils.js | 1 - 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/connection/manager.js b/src/connection/manager.js index b751639..d3e8bed 100644 --- a/src/connection/manager.js +++ b/src/connection/manager.js @@ -1,10 +1,9 @@ +/* eslint max-nested-callbacks: ["error", 5] */ 'use strict' const IdentifyService = require('libp2p-identify') -const multistream = require('multistream-select') const debug = require('debug') const log = debug('libp2p:switch:conn-manager') -const once = require('once') const ConnectionFSM = require('../connection') const { newStream } = require('../utils') diff --git a/src/dialer/index.js b/src/dialer/index.js index 6f11102..0edddef 100644 --- a/src/dialer/index.js +++ b/src/dialer/index.js @@ -112,6 +112,7 @@ module.exports = function (_switch) { * @param {PeerInfo} peerInfo * @param {String} protocol The protocol to handshake for the new stream * @param {function(Error, Connection)} callback + * @returns {void} */ function newStream (peerInfo, protocol, callback) { if (!protocol) { diff --git a/src/utils.js b/src/utils.js index 8765185..6fd8eb9 100644 --- a/src/utils.js +++ b/src/utils.js @@ -1,6 +1,5 @@ 'use strict' -const Identify = require('libp2p-identify') const Multistream = require('multistream-select') /**