Skip to content
This repository was archived by the owner on Jun 27, 2023. It is now read-only.

Commit f84e744

Browse files
committed
chore: address review
1 parent c670d50 commit f84e744

File tree

7 files changed

+137
-111
lines changed

7 files changed

+137
-111
lines changed

LICENSE

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
MIT License
1+
The MIT License (MIT)
22

3-
Copyright (c) 2016 libp2p
3+
Copyright (c) 2019 Protocol Labs, Inc.
44

55
Permission is hereby granted, free of charge, to any person obtaining a copy
66
of this software and associated documentation files (the "Software"), to deal
@@ -9,13 +9,13 @@ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
99
copies of the Software, and to permit persons to whom the Software is
1010
furnished to do so, subject to the following conditions:
1111

12-
The above copyright notice and this permission notice shall be included in all
13-
copies or substantial portions of the Software.
12+
The above copyright notice and this permission notice shall be included in
13+
all copies or substantial portions of the Software.
1414

1515
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
1616
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
1717
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
1818
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
1919
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20-
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21-
SOFTWARE.
20+
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21+
THE SOFTWARE.

README.md

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,14 @@ js-libp2p-floodsub
3838
const FloodSub = require('libp2p-floodsub')
3939

4040
const registrar = {
41-
register: (multicodec, handlers) => {
41+
handle: (multicodecs, handle) => {
4242
// register multicodec to libp2p
43+
// handle function is called everytime a remote peer opens a stream to the peer.
44+
},
45+
register: (multicodecs, handlers) => {
4346
// handlers will be used to notify pubsub of peer connection establishment or closing
4447
},
45-
unregister: (multicodec) => {
48+
unregister: (id) => {
4649

4750
}
4851
}
@@ -99,4 +102,4 @@ This repository falls under the IPFS [Code of Conduct](https://github.com/ipfs/c
99102

100103
## License
101104

102-
MIT © David Dias
105+
Copyright (c) Protocol Labs, Inc. under the **MIT License**. See [LICENSE file](./LICENSE) for details.

package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,14 @@
5050
"dirty-chai": "^2.0.1",
5151
"it-pair": "^1.0.0",
5252
"lodash": "^4.17.15",
53-
"multiaddr": "^6.1.0",
53+
"multiaddr": "^7.1.0",
5454
"p-defer": "^3.0.0",
5555
"peer-id": "~0.13.3",
5656
"peer-info": "~0.17.0",
5757
"sinon": "^7.5.0"
5858
},
5959
"dependencies": {
60+
"async.nexttick": "^0.5.2",
6061
"debug": "^4.1.1",
6162
"it-length-prefixed": "^2.0.0",
6263
"it-pipe": "^1.0.1",

src/index.js

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ const pipe = require('it-pipe')
1010
const lp = require('it-length-prefixed')
1111
const pMap = require('p-map')
1212
const TimeCache = require('time-cache')
13+
const nextTick = require('async.nexttick')
1314

1415
const PeerInfo = require('peer-info')
1516
const BaseProtocol = require('libp2p-pubsub')
@@ -27,6 +28,7 @@ class FloodSub extends BaseProtocol {
2728
/**
2829
* @param {PeerInfo} peerInfo instance of the peer's PeerInfo
2930
* @param {Object} registrar
31+
* @param {function} registrar.handle
3032
* @param {function} registrar.register
3133
* @param {function} registrar.unregister
3234
* @param {Object} [options]
@@ -37,6 +39,7 @@ class FloodSub extends BaseProtocol {
3739
assert(PeerInfo.isPeerInfo(peerInfo), 'peer info must be an instance of `peer-info`')
3840

3941
// registrar handling
42+
assert(registrar && typeof registrar.handle === 'function', 'a handle function must be provided in registrar')
4043
assert(registrar && typeof registrar.register === 'function', 'a register function must be provided in registrar')
4144
assert(registrar && typeof registrar.unregister === 'function', 'a unregister function must be provided in registrar')
4245

@@ -77,9 +80,10 @@ class FloodSub extends BaseProtocol {
7780
* @override
7881
* @param {PeerInfo} peerInfo peer info
7982
* @param {Connection} conn connection to the peer
83+
* @returns {Promise<void>}
8084
*/
81-
_onPeerConnected (peerInfo, conn) {
82-
super._onPeerConnected(peerInfo, conn)
85+
async _onPeerConnected (peerInfo, conn) {
86+
await super._onPeerConnected(peerInfo, conn)
8387
const idB58Str = peerInfo.id.toB58String()
8488
const peer = this.peers.get(idB58Str)
8589

@@ -105,7 +109,7 @@ class FloodSub extends BaseProtocol {
105109
await pipe(
106110
conn,
107111
lp.decode(),
108-
async function collect (source) {
112+
async function (source) {
109113
for await (const data of source) {
110114
const rpc = Buffer.isBuffer(data) ? data : data.slice()
111115

@@ -209,7 +213,7 @@ class FloodSub extends BaseProtocol {
209213
/**
210214
* Unmounts the floodsub protocol and shuts down every connection
211215
* @override
212-
* @returns {Promise}
216+
* @returns {Promise<void>}
213217
*/
214218
async stop () {
215219
await super.stop()
@@ -222,7 +226,7 @@ class FloodSub extends BaseProtocol {
222226
* @override
223227
* @param {Array<string>|string} topics
224228
* @param {Array<any>|any} messages
225-
* @returns {Promise}
229+
* @returns {Promise<void>}
226230
*/
227231
async publish (topics, messages) {
228232
assert(this.started, 'FloodSub is not started')
@@ -267,10 +271,10 @@ class FloodSub extends BaseProtocol {
267271
assert(this.started, 'FloodSub is not started')
268272

269273
topics = ensureArray(topics)
270-
271274
topics.forEach((topic) => this.subscriptions.add(topic))
272275

273276
this.peers.forEach((peer) => sendSubscriptionsOnceReady(peer))
277+
274278
// make sure that FloodSub is already mounted
275279
function sendSubscriptionsOnceReady (peer) {
276280
if (peer && peer.isWritable) {
@@ -303,6 +307,8 @@ class FloodSub extends BaseProtocol {
303307
function checkIfReady (peer) {
304308
if (peer && peer.isWritable) {
305309
peer.sendUnsubscriptions(topics)
310+
} else {
311+
nextTick(checkIfReady.bind(peer))
306312
}
307313
}
308314
}

test/2-nodes.spec.js

Lines changed: 33 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,17 @@ const expect = chai.expect
99

1010
const pDefer = require('p-defer')
1111
const times = require('lodash/times')
12-
const DuplexPair = require('it-pair/duplex')
1312

1413
const FloodSub = require('../src')
1514
const { multicodec } = require('../src')
16-
const { first, createPeerInfo, expectSet } = require('./utils')
17-
18-
const defOptions = {
19-
emitSelf: true
20-
}
15+
const {
16+
defOptions,
17+
first,
18+
createPeerInfo,
19+
createMockRegistrar,
20+
expectSet,
21+
ConnectionPair
22+
} = require('./utils')
2123

2224
function shouldNotHappen (_) {
2325
expect.fail()
@@ -31,24 +33,15 @@ describe('basics between 2 nodes', () => {
3133
const registrarRecordA = {}
3234
const registrarRecordB = {}
3335

34-
const registrar = (registrarRecord) => ({
35-
register: (multicodecs, handlers) => {
36-
registrarRecord[multicodecs[0]] = handlers
37-
},
38-
unregister: (multicodecs) => {
39-
delete registrarRecord[multicodecs[0]]
40-
}
41-
})
42-
4336
// Mount pubsub protocol
4437
before(async () => {
4538
[peerInfoA, peerInfoB] = await Promise.all([
4639
createPeerInfo(),
4740
createPeerInfo()
4841
])
4942

50-
fsA = new FloodSub(peerInfoA, registrar(registrarRecordA), defOptions)
51-
fsB = new FloodSub(peerInfoB, registrar(registrarRecordB), defOptions)
43+
fsA = new FloodSub(peerInfoA, createMockRegistrar(registrarRecordA), defOptions)
44+
fsB = new FloodSub(peerInfoB, createMockRegistrar(registrarRecordB), defOptions)
5245

5346
expect(fsA.peers.size).to.be.eql(0)
5447
expect(fsA.subscriptions.size).to.eql(0)
@@ -63,14 +56,19 @@ describe('basics between 2 nodes', () => {
6356
]))
6457

6558
// Connect floodsub nodes
66-
before(() => {
59+
before(async () => {
6760
const onConnectA = registrarRecordA[multicodec].onConnect
68-
const onConnectB = registrarRecordB[multicodec].onConnect
61+
const handleB = registrarRecordB[multicodec].handler
6962

7063
// Notice peers of connection
71-
const [d0, d1] = DuplexPair()
72-
onConnectA(peerInfoB, d0)
73-
onConnectB(peerInfoA, d1)
64+
const [c0, c1] = ConnectionPair()
65+
await onConnectA(peerInfoB, c0)
66+
67+
await handleB({
68+
protocol: multicodec,
69+
stream: c1.stream,
70+
remotePeer: peerInfoA.id
71+
})
7472

7573
expect(fsA.peers.size).to.be.eql(1)
7674
expect(fsB.peers.size).to.be.eql(1)
@@ -236,24 +234,15 @@ describe('basics between 2 nodes', () => {
236234
const registrarRecordA = {}
237235
const registrarRecordB = {}
238236

239-
const registrar = (registrarRecord) => ({
240-
register: (multicodec, handlers) => {
241-
registrarRecord[multicodec] = handlers
242-
},
243-
unregister: (multicodec) => {
244-
delete registrarRecord[multicodec]
245-
}
246-
})
247-
248237
// Mount pubsub protocol
249238
before(async () => {
250239
[peerInfoA, peerInfoB] = await Promise.all([
251240
createPeerInfo(),
252241
createPeerInfo()
253242
])
254243

255-
fsA = new FloodSub(peerInfoA, registrar(registrarRecordA), defOptions)
256-
fsB = new FloodSub(peerInfoB, registrar(registrarRecordB), defOptions)
244+
fsA = new FloodSub(peerInfoA, createMockRegistrar(registrarRecordA), defOptions)
245+
fsB = new FloodSub(peerInfoB, createMockRegistrar(registrarRecordB), defOptions)
257246
})
258247

259248
// Start pubsub
@@ -281,19 +270,18 @@ describe('basics between 2 nodes', () => {
281270
})
282271

283272
it('existing subscriptions are sent upon peer connection', async () => {
273+
const dial = async () => {
274+
const onConnectA = registrarRecordA[multicodec].onConnect
275+
const onConnectB = registrarRecordB[multicodec].onConnect
276+
277+
// Notice peers of connection
278+
const [c0, c1] = ConnectionPair()
279+
await onConnectA(peerInfoB, c0)
280+
await onConnectB(peerInfoA, c1)
281+
}
282+
284283
await Promise.all([
285-
// nodeA.dial(nodeB.peerInfo),
286-
new Promise((resolve) => {
287-
const onConnectA = registrarRecordA[multicodec].onConnect
288-
const onConnectB = registrarRecordB[multicodec].onConnect
289-
290-
// Notice peers of connection
291-
const [d0, d1] = DuplexPair()
292-
onConnectA(peerInfoB, d0)
293-
onConnectB(peerInfoA, d1)
294-
295-
resolve()
296-
}),
284+
dial(),
297285
new Promise((resolve) => fsA.once('floodsub:subscription-change', resolve)),
298286
new Promise((resolve) => fsB.once('floodsub:subscription-change', resolve))
299287
])

0 commit comments

Comments
 (0)