diff --git a/packages/compliance-tests/src/pubsub/connection-handlers.js b/packages/compliance-tests/src/pubsub/connection-handlers.js index c04489e76..644f537ad 100644 --- a/packages/compliance-tests/src/pubsub/connection-handlers.js +++ b/packages/compliance-tests/src/pubsub/connection-handlers.js @@ -242,7 +242,7 @@ module.exports = (common) => { await psB._libp2p.start() psB.start() - psA._libp2p.peerStore.addressBook.set(psB.peerId, psB._libp2p.multiaddrs) + await psA._libp2p.peerStore.addressBook.set(psB.peerId, psB._libp2p.multiaddrs) await psA._libp2p.dial(psB.peerId) // wait for remoteLibp2p to know about libp2p subscription diff --git a/packages/compliance-tests/src/topology/multicodec-topology.js b/packages/compliance-tests/src/topology/multicodec-topology.js index 1a87ac63b..ce48445ee 100644 --- a/packages/compliance-tests/src/topology/multicodec-topology.js +++ b/packages/compliance-tests/src/topology/multicodec-topology.js @@ -5,9 +5,8 @@ const { expect } = require('aegir/utils/chai') const sinon = require('sinon') - +const delay = require('delay') const PeerId = require('peer-id') - const peers = require('../utils/peers') module.exports = (test) => { @@ -48,7 +47,7 @@ module.exports = (test) => { const peerStore = topology._registrar.peerStore const id2 = await PeerId.createFromJSON(peers[1]) - peerStore.peers.set(id2.toB58String(), { + await peerStore.peers.set(id2.toB58String(), { id: id2, protocols: Array.from(topology.multicodecs) }) @@ -58,6 +57,9 @@ module.exports = (test) => { protocols: Array.from(topology.multicodecs) }) + // 'change:protocols' event handler is async + await delay(10) + expect(topology._updatePeers.callCount).to.equal(1) expect(topology.peers.size).to.eql(1) }) @@ -69,7 +71,7 @@ module.exports = (test) => { const peerStore = topology._registrar.peerStore const id2 = await PeerId.createFromJSON(peers[1]) - peerStore.peers.set(id2.toB58String(), { + await peerStore.peers.set(id2.toB58String(), { id: id2, protocols: Array.from(topology.multicodecs) }) @@ -79,9 +81,12 @@ module.exports = (test) => { protocols: Array.from(topology.multicodecs) }) + // 'change:protocols' event handler is async + await delay(10) + expect(topology.peers.size).to.eql(1) - peerStore.peers.set(id2.toB58String(), { + await peerStore.peers.set(id2.toB58String(), { id: id2, protocols: [] }) @@ -91,41 +96,53 @@ module.exports = (test) => { protocols: [] }) + // 'change:protocols' event handler is async + await delay(10) + expect(topology.peers.size).to.eql(1) expect(topology._onDisconnect.callCount).to.equal(1) expect(topology._onDisconnect.calledWith(id2)).to.equal(true) }) - it('should trigger "onConnect" when a peer connects and has one of the topology multicodecs in its known protocols', () => { + it('should trigger "onConnect" when a peer connects and has one of the topology multicodecs in its known protocols', async () => { sinon.spy(topology, '_onConnect') - sinon.stub(topology._registrar.peerStore.protoBook, 'get').returns(topology.multicodecs) + sinon.stub(topology._registrar.peerStore.protoBook, 'get').resolves(topology.multicodecs) topology._registrar.connectionManager.emit('peer:connect', { remotePeer: id }) + // 'peer:connect' event handler is async + await delay(10) + expect(topology._onConnect.callCount).to.equal(1) }) - it('should not trigger "onConnect" when a peer connects and has none of the topology multicodecs in its known protocols', () => { + it('should not trigger "onConnect" when a peer connects and has none of the topology multicodecs in its known protocols', async () => { sinon.spy(topology, '_onConnect') - sinon.stub(topology._registrar.peerStore.protoBook, 'get').returns([]) + sinon.stub(topology._registrar.peerStore.protoBook, 'get').resolves([]) topology._registrar.connectionManager.emit('peer:connect', { remotePeer: id }) + // 'peer:connect' event handler is async + await delay(10) + expect(topology._onConnect.callCount).to.equal(0) }) - it('should not trigger "onConnect" when a peer connects and its protocols are not known', () => { + it('should not trigger "onConnect" when a peer connects and its protocols are not known', async () => { sinon.spy(topology, '_onConnect') - sinon.stub(topology._registrar.peerStore.protoBook, 'get').returns(undefined) + sinon.stub(topology._registrar.peerStore.protoBook, 'get').resolves(undefined) topology._registrar.connectionManager.emit('peer:connect', { remotePeer: id }) + // 'peer:connect' event handler is async + await delay(10) + expect(topology._onConnect.callCount).to.equal(0) }) }) diff --git a/packages/compliance-tests/test/topology/mock-peer-store.js b/packages/compliance-tests/test/topology/mock-peer-store.js index 637d71fa8..d4b298b46 100644 --- a/packages/compliance-tests/test/topology/mock-peer-store.js +++ b/packages/compliance-tests/test/topology/mock-peer-store.js @@ -11,9 +11,17 @@ class MockPeerStore extends EventEmitter { } } - get (peerId) { + async * getPeers () { + yield * this.peers.values() + } + + async get (peerId) { return this.peers.get(peerId.toB58String()) } + + async set (peerId, peer) { + return this.peers.set(peerId.toB58String(), peer) + } } module.exports = MockPeerStore diff --git a/packages/compliance-tests/test/topology/multicodec-topology.spec.js b/packages/compliance-tests/test/topology/multicodec-topology.spec.js index 699782bec..e9f9dd818 100644 --- a/packages/compliance-tests/test/topology/multicodec-topology.spec.js +++ b/packages/compliance-tests/test/topology/multicodec-topology.spec.js @@ -9,7 +9,7 @@ const MockPeerStore = require('./mock-peer-store') describe('multicodec topology compliance tests', () => { tests({ - setup (properties, registrar) { + async setup (properties, registrar) { const multicodecs = ['/echo/1.0.0'] const handlers = { onConnect: () => { }, @@ -34,7 +34,7 @@ describe('multicodec topology compliance tests', () => { } } - topology.registrar = registrar + await topology.setRegistrar(registrar) return topology }, diff --git a/packages/interfaces/src/topology/index.js b/packages/interfaces/src/topology/index.js index 85e72db7d..7e1b33d2e 100644 --- a/packages/interfaces/src/topology/index.js +++ b/packages/interfaces/src/topology/index.js @@ -65,7 +65,7 @@ class Topology { /** * @param {any} registrar */ - set registrar (registrar) { // eslint-disable-line + async setRegistrar (registrar) { this._registrar = registrar } diff --git a/packages/interfaces/src/topology/multicodec-topology.js b/packages/interfaces/src/topology/multicodec-topology.js index 02e739ea7..07c910fe7 100644 --- a/packages/interfaces/src/topology/multicodec-topology.js +++ b/packages/interfaces/src/topology/multicodec-topology.js @@ -2,6 +2,11 @@ const Topology = require('./index') const multicodecTopologySymbol = Symbol.for('@libp2p/js-interfaces/topology/multicodec-topology') +const debug = require('debug') + +const log = Object.assign(debug('libp2p:topology:multicodec-topology'), { + error: debug('libp2p:topology:multicodec-topology:error') +}) class MulticodecTopology extends Topology { /** @@ -59,23 +64,22 @@ class MulticodecTopology extends Topology { /** * @param {any} registrar */ - set registrar (registrar) { // eslint-disable-line + async setRegistrar (registrar) { // eslint-disable-line this._registrar = registrar this._registrar.peerStore.on('change:protocols', this._onProtocolChange) this._registrar.connectionManager.on('peer:connect', this._onPeerConnect) // Update topology peers - this._updatePeers(this._registrar.peerStore.peers.values()) + await this._updatePeers(this._registrar.peerStore.getPeers()) } /** * Update topology. * - * @param {Array<{id: PeerId, multiaddrs: Array, protocols: Array}>} peerDataIterable - * @returns {void} + * @param {AsyncIterable | Iterable} peerDataIterable */ - _updatePeers (peerDataIterable) { - for (const { id, protocols } of peerDataIterable) { + async _updatePeers (peerDataIterable) { + for await (const { id, protocols } of peerDataIterable) { if (this.multicodecs.filter(multicodec => protocols.includes(multicodec)).length) { // Add the peer regardless of whether or not there is currently a connection this.peers.add(id.toB58String()) @@ -96,22 +100,26 @@ class MulticodecTopology extends Topology { * @param {PeerId} props.peerId * @param {Array} props.protocols */ - _onProtocolChange ({ peerId, protocols }) { - const hadPeer = this.peers.has(peerId.toB58String()) - const hasProtocol = protocols.filter(protocol => this.multicodecs.includes(protocol)) - - // Not supporting the protocol anymore? - if (hadPeer && hasProtocol.length === 0) { - this._onDisconnect(peerId) - } + async _onProtocolChange ({ peerId, protocols }) { + try { + const hadPeer = this.peers.has(peerId.toB58String()) + const hasProtocol = protocols.filter(protocol => this.multicodecs.includes(protocol)) + + // Not supporting the protocol anymore? + if (hadPeer && hasProtocol.length === 0) { + this._onDisconnect(peerId) + } - // New to protocol support - for (const protocol of protocols) { - if (this.multicodecs.includes(protocol)) { - const peerData = this._registrar.peerStore.get(peerId) - this._updatePeers([peerData]) - return + // New to protocol support + for (const protocol of protocols) { + if (this.multicodecs.includes(protocol)) { + const peerData = await this._registrar.peerStore.get(peerId) + await this._updatePeers([peerData]) + return + } } + } catch (err) { + log.error(err) } } @@ -119,20 +127,23 @@ class MulticodecTopology extends Topology { * Verify if a new connected peer has a topology multicodec and call _onConnect. * * @param {Connection} connection - * @returns {void} */ - _onPeerConnect (connection) { - // @ts-ignore - remotePeer does not existist on Connection - const peerId = connection.remotePeer - const protocols = this._registrar.peerStore.protoBook.get(peerId) + async _onPeerConnect (connection) { + try { + // @ts-ignore - remotePeer does not existist on Connection + const peerId = connection.remotePeer + const protocols = await this._registrar.peerStore.protoBook.get(peerId) - if (!protocols) { - return - } + if (!protocols) { + return + } - if (this.multicodecs.find(multicodec => protocols.includes(multicodec))) { - this.peers.add(peerId.toB58String()) - this._onConnect(peerId, connection) + if (this.multicodecs.find(multicodec => protocols.includes(multicodec))) { + this.peers.add(peerId.toB58String()) + this._onConnect(peerId, connection) + } + } catch (err) { + log.error(err) } } }