Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions src/identify/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,9 @@ class IdentifyService {
*
* @async
* @param {Connection} connection
* @param {PeerID} expectedPeer The PeerId the identify response should match
* @returns {Promise<void>}
*/
async identify (connection, expectedPeer) {
async identify (connection) {
const { stream } = await connection.newStream(MULTICODEC_IDENTIFY)
const [data] = await pipe(
stream,
Expand Down Expand Up @@ -181,7 +180,7 @@ class IdentifyService {

const id = await PeerId.createFromPubKey(publicKey)
const peerInfo = new PeerInfo(id)
if (expectedPeer && expectedPeer.toB58String() !== id.toB58String()) {
if (connection.remotePeer.toString() !== id.toString()) {
throw errCode(new Error('identified peer does not match the expected peer'), codes.ERR_INVALID_PEER)
}

Expand All @@ -192,7 +191,7 @@ class IdentifyService {
IdentifyService.updatePeerAddresses(peerInfo, listenAddrs)
IdentifyService.updatePeerProtocols(peerInfo, protocols)

this.registrar.peerStore.update(peerInfo)
this.registrar.peerStore.replace(peerInfo)
// TODO: Track our observed address so that we can score it
log('received observed address of %s', observedAddr)
}
Expand Down Expand Up @@ -283,7 +282,7 @@ class IdentifyService {
IdentifyService.updatePeerProtocols(peerInfo, message.protocols)

// Update the peer in the PeerStore
this.registrar.peerStore.update(peerInfo)
this.registrar.peerStore.replace(peerInfo)
}
}

Expand Down
32 changes: 14 additions & 18 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,7 @@ class Libp2p extends EventEmitter {
this.upgrader = new Upgrader({
localPeer: this.peerInfo.id,
onConnection: (connection) => {
const peerInfo = getPeerInfo(connection.remotePeer)

this.peerStore.put(peerInfo)
const peerInfo = this.peerStore.put(new PeerInfo(connection.remotePeer))
this.registrar.onConnect(peerInfo, connection)
this.emit('peer:connect', peerInfo)
},
Expand Down Expand Up @@ -144,7 +142,7 @@ class Libp2p extends EventEmitter {
this.peerRouting = peerRouting(this)
this.contentRouting = contentRouting(this)

this._peerDiscovered = this._peerDiscovered.bind(this)
this._onDiscoveryPeer = this._onDiscoveryPeer.bind(this)
}

/**
Expand Down Expand Up @@ -340,7 +338,7 @@ class Libp2p extends EventEmitter {

// TODO: this should be modified once random-walk is used as
// the other discovery modules
this._dht.on('peer', this._peerDiscovered)
this._dht.on('peer', this._onDiscoveryPeer)
}
}

Expand All @@ -351,6 +349,11 @@ class Libp2p extends EventEmitter {
_onDidStart () {
this._isStarted = true

this.peerStore.on('peer', peerInfo => {
this.emit('peer:discovery', peerInfo)
this._maybeConnect(peerInfo)
})

// Peer discovery
this._setupPeerDiscovery()

Expand All @@ -362,24 +365,17 @@ class Libp2p extends EventEmitter {
}

/**
* Handles discovered peers. Each discovered peer will be emitted via
* the `peer:discovery` event. If auto dial is enabled for libp2p
* and the current connection count is under the low watermark, the
* peer will be dialed.
* Called whenever peer discovery services emit `peer` events.
* Known peers may be emitted.
* @private
* @param {PeerInfo} peerInfo
*/
_peerDiscovered (peerInfo) {
if (peerInfo.id.toB58String() === this.peerInfo.id.toB58String()) {
_onDiscoveryPeer (peerInfo) {
if (peerInfo.id.toString() === this.peerInfo.id.toString()) {
log.error(new Error(codes.ERR_DISCOVERED_SELF))
return
}
peerInfo = this.peerStore.put(peerInfo)

if (!this.isStarted()) return

this.emit('peer:discovery', peerInfo)
this._maybeConnect(peerInfo)
this.peerStore.put(peerInfo)
}

/**
Expand Down Expand Up @@ -432,7 +428,7 @@ class Libp2p extends EventEmitter {
discoveryService = DiscoveryService
}

discoveryService.on('peer', this._peerDiscovered)
discoveryService.on('peer', this._onDiscoveryPeer)
this._discovery.push(discoveryService)
}
}
Expand Down
29 changes: 14 additions & 15 deletions src/peer-store/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class PeerStore extends EventEmitter {

let peer
// Already know the peer?
if (this.peers.has(peerInfo.id.toB58String())) {
if (this.has(peerInfo.id)) {
peer = this.update(peerInfo)
} else {
peer = this.add(peerInfo)
Expand Down Expand Up @@ -118,15 +118,12 @@ class PeerStore extends EventEmitter {

if (multiaddrsIntersection.length !== peerInfo.multiaddrs.size ||
multiaddrsIntersection.length !== recorded.multiaddrs.size) {
// recorded.multiaddrs = peerInfo.multiaddrs
recorded.multiaddrs.clear()

for (const ma of peerInfo.multiaddrs.toArray()) {
recorded.multiaddrs.add(ma)
}

this.emit('change:multiaddrs', {
peerInfo: peerInfo,
peerInfo: recorded,
multiaddrs: recorded.multiaddrs.toArray()
})
}
Expand All @@ -139,14 +136,12 @@ class PeerStore extends EventEmitter {

if (protocolsIntersection.size !== peerInfo.protocols.size ||
protocolsIntersection.size !== recorded.protocols.size) {
recorded.protocols.clear()

for (const protocol of peerInfo.protocols) {
recorded.protocols.add(protocol)
}

this.emit('change:protocols', {
peerInfo: peerInfo,
peerInfo: recorded,
protocols: Array.from(recorded.protocols)
})
}
Expand All @@ -170,13 +165,7 @@ class PeerStore extends EventEmitter {
peerId = peerId.toB58String()
}

const peerInfo = this.peers.get(peerId)

if (peerInfo) {
return peerInfo
}

return undefined
return this.peers.get(peerId)
}

/**
Expand Down Expand Up @@ -217,6 +206,16 @@ class PeerStore extends EventEmitter {

this.remove(peerInfo.id.toB58String())
this.add(peerInfo)

// This should be cleaned up in PeerStore v2
this.emit('change:multiaddrs', {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the multiaddrs may not change, right?

peerInfo,
multiaddrs: peerInfo.multiaddrs.toArray()
})
this.emit('change:protocols', {
peerInfo,
protocols: Array.from(peerInfo.protocols)
})
}
}

Expand Down
5 changes: 3 additions & 2 deletions test/dialing/direct.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,8 @@ describe('Dialing (direct, WebSockets)', () => {
})

sinon.spy(libp2p.dialer.identifyService, 'identify')
sinon.spy(libp2p.peerStore, 'update')
sinon.spy(libp2p.peerStore, 'replace')
sinon.spy(libp2p.upgrader, 'onConnection')

const connection = await libp2p.dialer.connectToMultiaddr(remoteAddr)
expect(connection).to.exist()
Expand All @@ -225,7 +226,7 @@ describe('Dialing (direct, WebSockets)', () => {
expect(libp2p.dialer.identifyService.identify.callCount).to.equal(1)
await libp2p.dialer.identifyService.identify.firstCall.returnValue

expect(libp2p.peerStore.update.callCount).to.equal(1)
expect(libp2p.peerStore.replace.callCount).to.equal(1)
})

it('should be able to use hangup to close connections', async () => {
Expand Down
26 changes: 13 additions & 13 deletions test/identify/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ describe('Identify', () => {
protocols,
registrar: {
peerStore: {
update: () => {}
replace: () => {}
}
}
})
Expand All @@ -57,27 +57,27 @@ describe('Identify', () => {
})

const observedAddr = multiaddr('/ip4/127.0.0.1/tcp/1234')
const localConnectionMock = { newStream: () => {} }
const localConnectionMock = { newStream: () => {}, remotePeer: remotePeer.id }
const remoteConnectionMock = { remoteAddr: observedAddr }

const [local, remote] = duplexPair()
sinon.stub(localConnectionMock, 'newStream').returns({ stream: local, protocol: multicodecs.IDENTIFY })

sinon.spy(localIdentify.registrar.peerStore, 'update')
sinon.spy(localIdentify.registrar.peerStore, 'replace')

// Run identify
await Promise.all([
localIdentify.identify(localConnectionMock, remotePeer.id),
localIdentify.identify(localConnectionMock),
remoteIdentify.handleMessage({
connection: remoteConnectionMock,
stream: remote,
protocol: multicodecs.IDENTIFY
})
])

expect(localIdentify.registrar.peerStore.update.callCount).to.equal(1)
expect(localIdentify.registrar.peerStore.replace.callCount).to.equal(1)
// Validate the remote peer gets updated in the peer store
const call = localIdentify.registrar.peerStore.update.firstCall
const call = localIdentify.registrar.peerStore.replace.firstCall
expect(call.args[0].id.bytes).to.equal(remotePeer.id.bytes)
})

Expand All @@ -92,7 +92,7 @@ describe('Identify', () => {
})

const observedAddr = multiaddr('/ip4/127.0.0.1/tcp/1234')
const localConnectionMock = { newStream: () => {} }
const localConnectionMock = { newStream: () => {}, remotePeer }
const remoteConnectionMock = { remoteAddr: observedAddr }

const [local, remote] = duplexPair()
Expand Down Expand Up @@ -128,7 +128,7 @@ describe('Identify', () => {
peerInfo: remotePeer,
registrar: {
peerStore: {
update: () => {}
replace: () => {}
}
}
})
Expand All @@ -148,7 +148,7 @@ describe('Identify', () => {

sinon.spy(IdentifyService, 'updatePeerAddresses')
sinon.spy(IdentifyService, 'updatePeerProtocols')
sinon.spy(remoteIdentify.registrar.peerStore, 'update')
sinon.spy(remoteIdentify.registrar.peerStore, 'replace')

// Run identify
await Promise.all([
Expand All @@ -163,8 +163,8 @@ describe('Identify', () => {
expect(IdentifyService.updatePeerAddresses.callCount).to.equal(1)
expect(IdentifyService.updatePeerProtocols.callCount).to.equal(1)

expect(remoteIdentify.registrar.peerStore.update.callCount).to.equal(1)
const [peerInfo] = remoteIdentify.registrar.peerStore.update.firstCall.args
expect(remoteIdentify.registrar.peerStore.replace.callCount).to.equal(1)
const [peerInfo] = remoteIdentify.registrar.peerStore.replace.firstCall.args
expect(peerInfo.id.bytes).to.eql(localPeer.id.bytes)
expect(peerInfo.multiaddrs.toArray()).to.eql([listeningAddr])
expect(peerInfo.protocols).to.eql(localProtocols)
Expand Down Expand Up @@ -198,7 +198,7 @@ describe('Identify', () => {
})

sinon.spy(libp2p.dialer.identifyService, 'identify')
sinon.spy(libp2p.peerStore, 'update')
sinon.spy(libp2p.peerStore, 'replace')

const connection = await libp2p.dialer.connectToMultiaddr(remoteAddr)
expect(connection).to.exist()
Expand All @@ -207,7 +207,7 @@ describe('Identify', () => {
expect(libp2p.dialer.identifyService.identify.callCount).to.equal(1)
await libp2p.dialer.identifyService.identify.firstCall.returnValue

expect(libp2p.peerStore.update.callCount).to.equal(1)
expect(libp2p.peerStore.replace.callCount).to.equal(1)
await connection.close()
})

Expand Down
32 changes: 22 additions & 10 deletions test/peer-discovery/index.node.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ describe('peer discovery scenarios', () => {
},
config: {
peerDiscovery: {
autoDial: false,
bootstrap: {
enabled: true,
list: bootstrappers
Expand Down Expand Up @@ -84,6 +85,7 @@ describe('peer discovery scenarios', () => {
},
config: {
peerDiscovery: {
autoDial: false,
mdns: {
enabled: true,
interval: 200, // discover quickly
Expand Down Expand Up @@ -111,9 +113,11 @@ describe('peer discovery scenarios', () => {
}
})

await remoteLibp2p1.start()
await remoteLibp2p2.start()
await libp2p.start()
await Promise.all([
remoteLibp2p1.start(),
remoteLibp2p2.start(),
libp2p.start()
])

await deferred.promise

Expand All @@ -130,11 +134,14 @@ describe('peer discovery scenarios', () => {
dht: KadDht
},
config: {
peerDiscovery: {
autoDial: false
},
dht: {
randomWalk: {
enabled: true,
delay: 100,
interval: 200, // start the query sooner
delay: 100, // start the first query quickly
interval: 1000,
timeout: 3000
},
enabled: true
Expand All @@ -153,9 +160,10 @@ describe('peer discovery scenarios', () => {
}
})

await remoteLibp2p1.start()
await remoteLibp2p2.start()
await libp2p.start()
await Promise.all([
remoteLibp2p1.start(),
remoteLibp2p2.start()
])

// Topology:
// A -> B
Expand All @@ -165,8 +173,12 @@ describe('peer discovery scenarios', () => {
remoteLibp2p2.dial(remotePeerInfo1)
])

libp2p.start()

await deferred.promise
await remoteLibp2p1.stop()
await remoteLibp2p2.stop()
return Promise.all([
remoteLibp2p1.stop(),
remoteLibp2p2.stop()
])
})
})
Loading