Skip to content
This repository was archived by the owner on Jun 26, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ module.exports = (common) => {
await psB._libp2p.start()
psB.start()

psA._libp2p.peerStore.addressBook.set(psB.peerId, psB._libp2p.multiaddrs)
await psA._libp2p.peerStore.addressBook.set(psB.peerId, psB._libp2p.multiaddrs)
await psA._libp2p.dial(psB.peerId)

// wait for remoteLibp2p to know about libp2p subscription
Expand Down
39 changes: 28 additions & 11 deletions packages/compliance-tests/src/topology/multicodec-topology.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@

const { expect } = require('aegir/utils/chai')
const sinon = require('sinon')

const delay = require('delay')
const PeerId = require('peer-id')

const peers = require('../utils/peers')

module.exports = (test) => {
Expand Down Expand Up @@ -48,7 +47,7 @@ module.exports = (test) => {
const peerStore = topology._registrar.peerStore

const id2 = await PeerId.createFromJSON(peers[1])
peerStore.peers.set(id2.toB58String(), {
await peerStore.peers.set(id2.toB58String(), {
id: id2,
protocols: Array.from(topology.multicodecs)
})
Expand All @@ -58,6 +57,9 @@ module.exports = (test) => {
protocols: Array.from(topology.multicodecs)
})

// 'change:protocols' event handler is async
await delay(10)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not listen for the event?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The hander that invokes topology._updatePeers is async, so we can't just listen for the event as we're interested in the side-effects of the handler being invoked.

From a practical perspective the mock peer store used in the tests doesn't really do any async work, it just defers execution using the async keyword so delay here puts the rest of the test into the microtask queue - the handler is ahead of it in the same queue so this is enough for the test to pass.

Longer term we shouldn't test for side-effects like this as it's really fragile and makes refactoring hard.


expect(topology._updatePeers.callCount).to.equal(1)
expect(topology.peers.size).to.eql(1)
})
Expand All @@ -69,7 +71,7 @@ module.exports = (test) => {
const peerStore = topology._registrar.peerStore

const id2 = await PeerId.createFromJSON(peers[1])
peerStore.peers.set(id2.toB58String(), {
await peerStore.peers.set(id2.toB58String(), {
id: id2,
protocols: Array.from(topology.multicodecs)
})
Expand All @@ -79,9 +81,12 @@ module.exports = (test) => {
protocols: Array.from(topology.multicodecs)
})

// 'change:protocols' event handler is async
await delay(10)

expect(topology.peers.size).to.eql(1)

peerStore.peers.set(id2.toB58String(), {
await peerStore.peers.set(id2.toB58String(), {
id: id2,
protocols: []
})
Expand All @@ -91,41 +96,53 @@ module.exports = (test) => {
protocols: []
})

// 'change:protocols' event handler is async
await delay(10)

expect(topology.peers.size).to.eql(1)
expect(topology._onDisconnect.callCount).to.equal(1)
expect(topology._onDisconnect.calledWith(id2)).to.equal(true)
})

it('should trigger "onConnect" when a peer connects and has one of the topology multicodecs in its known protocols', () => {
it('should trigger "onConnect" when a peer connects and has one of the topology multicodecs in its known protocols', async () => {
sinon.spy(topology, '_onConnect')
sinon.stub(topology._registrar.peerStore.protoBook, 'get').returns(topology.multicodecs)
sinon.stub(topology._registrar.peerStore.protoBook, 'get').resolves(topology.multicodecs)

topology._registrar.connectionManager.emit('peer:connect', {
remotePeer: id
})

// 'peer:connect' event handler is async
await delay(10)

expect(topology._onConnect.callCount).to.equal(1)
})

it('should not trigger "onConnect" when a peer connects and has none of the topology multicodecs in its known protocols', () => {
it('should not trigger "onConnect" when a peer connects and has none of the topology multicodecs in its known protocols', async () => {
sinon.spy(topology, '_onConnect')
sinon.stub(topology._registrar.peerStore.protoBook, 'get').returns([])
sinon.stub(topology._registrar.peerStore.protoBook, 'get').resolves([])

topology._registrar.connectionManager.emit('peer:connect', {
remotePeer: id
})

// 'peer:connect' event handler is async
await delay(10)

expect(topology._onConnect.callCount).to.equal(0)
})

it('should not trigger "onConnect" when a peer connects and its protocols are not known', () => {
it('should not trigger "onConnect" when a peer connects and its protocols are not known', async () => {
sinon.spy(topology, '_onConnect')
sinon.stub(topology._registrar.peerStore.protoBook, 'get').returns(undefined)
sinon.stub(topology._registrar.peerStore.protoBook, 'get').resolves(undefined)

topology._registrar.connectionManager.emit('peer:connect', {
remotePeer: id
})

// 'peer:connect' event handler is async
await delay(10)

expect(topology._onConnect.callCount).to.equal(0)
})
})
Expand Down
10 changes: 9 additions & 1 deletion packages/compliance-tests/test/topology/mock-peer-store.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,17 @@ class MockPeerStore extends EventEmitter {
}
}

get (peerId) {
async * getPeers () {
yield * this.peers.values()
}

async get (peerId) {
return this.peers.get(peerId.toB58String())
}

async set (peerId, peer) {
return this.peers.set(peerId.toB58String(), peer)
}
}

module.exports = MockPeerStore
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ const MockPeerStore = require('./mock-peer-store')

describe('multicodec topology compliance tests', () => {
tests({
setup (properties, registrar) {
async setup (properties, registrar) {
const multicodecs = ['/echo/1.0.0']
const handlers = {
onConnect: () => { },
Expand All @@ -34,7 +34,7 @@ describe('multicodec topology compliance tests', () => {
}
}

topology.registrar = registrar
await topology.setRegistrar(registrar)

return topology
},
Expand Down
2 changes: 1 addition & 1 deletion packages/interfaces/src/topology/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class Topology {
/**
* @param {any} registrar
*/
set registrar (registrar) { // eslint-disable-line
async setRegistrar (registrar) {
this._registrar = registrar
}

Expand Down
73 changes: 42 additions & 31 deletions packages/interfaces/src/topology/multicodec-topology.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

const Topology = require('./index')
const multicodecTopologySymbol = Symbol.for('@libp2p/js-interfaces/topology/multicodec-topology')
const debug = require('debug')

const log = Object.assign(debug('libp2p:topology:multicodec-topology'), {
error: debug('libp2p:topology:multicodec-topology:error')
})

class MulticodecTopology extends Topology {
/**
Expand Down Expand Up @@ -59,23 +64,22 @@ class MulticodecTopology extends Topology {
/**
* @param {any} registrar
*/
set registrar (registrar) { // eslint-disable-line
async setRegistrar (registrar) { // eslint-disable-line
this._registrar = registrar
this._registrar.peerStore.on('change:protocols', this._onProtocolChange)
this._registrar.connectionManager.on('peer:connect', this._onPeerConnect)

// Update topology peers
this._updatePeers(this._registrar.peerStore.peers.values())
await this._updatePeers(this._registrar.peerStore.getPeers())
}

/**
* Update topology.
*
* @param {Array<{id: PeerId, multiaddrs: Array<Multiaddr>, protocols: Array<string>}>} peerDataIterable
* @returns {void}
* @param {AsyncIterable<any> | Iterable<any>} peerDataIterable
*/
_updatePeers (peerDataIterable) {
for (const { id, protocols } of peerDataIterable) {
async _updatePeers (peerDataIterable) {
for await (const { id, protocols } of peerDataIterable) {
if (this.multicodecs.filter(multicodec => protocols.includes(multicodec)).length) {
// Add the peer regardless of whether or not there is currently a connection
this.peers.add(id.toB58String())
Expand All @@ -96,43 +100,50 @@ class MulticodecTopology extends Topology {
* @param {PeerId} props.peerId
* @param {Array<string>} props.protocols
*/
_onProtocolChange ({ peerId, protocols }) {
const hadPeer = this.peers.has(peerId.toB58String())
const hasProtocol = protocols.filter(protocol => this.multicodecs.includes(protocol))

// Not supporting the protocol anymore?
if (hadPeer && hasProtocol.length === 0) {
this._onDisconnect(peerId)
}
async _onProtocolChange ({ peerId, protocols }) {
try {
const hadPeer = this.peers.has(peerId.toB58String())
const hasProtocol = protocols.filter(protocol => this.multicodecs.includes(protocol))

// Not supporting the protocol anymore?
if (hadPeer && hasProtocol.length === 0) {
this._onDisconnect(peerId)
}

// New to protocol support
for (const protocol of protocols) {
if (this.multicodecs.includes(protocol)) {
const peerData = this._registrar.peerStore.get(peerId)
this._updatePeers([peerData])
return
// New to protocol support
for (const protocol of protocols) {
if (this.multicodecs.includes(protocol)) {
const peerData = await this._registrar.peerStore.get(peerId)
await this._updatePeers([peerData])
return
}
}
} catch (err) {
log.error(err)
}
}

/**
* Verify if a new connected peer has a topology multicodec and call _onConnect.
*
* @param {Connection} connection
* @returns {void}
*/
_onPeerConnect (connection) {
// @ts-ignore - remotePeer does not existist on Connection
const peerId = connection.remotePeer
const protocols = this._registrar.peerStore.protoBook.get(peerId)
async _onPeerConnect (connection) {
try {
// @ts-ignore - remotePeer does not existist on Connection
const peerId = connection.remotePeer
const protocols = await this._registrar.peerStore.protoBook.get(peerId)

if (!protocols) {
return
}
if (!protocols) {
return
}

if (this.multicodecs.find(multicodec => protocols.includes(multicodec))) {
this.peers.add(peerId.toB58String())
this._onConnect(peerId, connection)
if (this.multicodecs.find(multicodec => protocols.includes(multicodec))) {
this.peers.add(peerId.toB58String())
this._onConnect(peerId, connection)
}
} catch (err) {
log.error(err)
}
}
}
Expand Down