@@ -11,8 +11,7 @@ const config = require('./config')
11
11
const multicodec = config . multicodec
12
12
const ensureArray = utils . ensureArray
13
13
const setImmediate = require ( 'async/setImmediate' )
14
- const asyncMap = require ( 'async/map' )
15
- const noop = ( ) => { }
14
+ const pMap = require ( 'p-map' )
16
15
17
16
/**
18
17
* FloodSub (aka dumbsub is an implementation of pubsub focused on
@@ -49,19 +48,16 @@ class FloodSub extends BaseProtocol {
49
48
* @override
50
49
* @param {PeerInfo } peerInfo peer info
51
50
* @param {Connection } conn connection to the peer
52
- * @param {function } callback
53
51
*/
54
- _onDial ( peerInfo , conn , callback ) {
55
- super . _onDial ( peerInfo , conn , ( err ) => {
56
- if ( err ) return callback ( err )
57
- const idB58Str = peerInfo . id . toB58String ( )
58
- const peer = this . peers . get ( idB58Str )
59
- if ( peer && peer . isWritable ) {
60
- // Immediately send my own subscriptions to the newly established conn
61
- peer . sendSubscriptions ( this . subscriptions )
62
- }
63
- setImmediate ( ( ) => callback ( ) )
64
- } )
52
+ _onDial ( peerInfo , conn ) {
53
+ super . _onDial ( peerInfo , conn )
54
+ const idB58Str = peerInfo . id . toB58String ( )
55
+ const peer = this . peers . get ( idB58Str )
56
+
57
+ if ( peer && peer . isWritable ) {
58
+ // Immediately send my own subscriptions to the newly established conn
59
+ peer . sendSubscriptions ( this . subscriptions )
60
+ }
65
61
}
66
62
67
63
/**
@@ -71,7 +67,7 @@ class FloodSub extends BaseProtocol {
71
67
* @param {string } idB58Str peer id string in base58
72
68
* @param {Connection } conn connection
73
69
* @param {PeerInfo } peer peer info
74
- * @returns {undefined }
70
+ * @returns {void }
75
71
*
76
72
*/
77
73
_processConnection ( idB58Str , conn , peer ) {
@@ -119,7 +115,7 @@ class FloodSub extends BaseProtocol {
119
115
* @param {rpc.RPC.Message } message The message to process
120
116
* @returns {void }
121
117
*/
122
- _processRpcMessage ( message ) {
118
+ async _processRpcMessage ( message ) {
123
119
const msg = utils . normalizeInRpcMessage ( message )
124
120
const seqno = utils . msgId ( msg . from , msg . seqno )
125
121
// 1. check if I've seen the message, if yes, ignore
@@ -128,19 +124,27 @@ class FloodSub extends BaseProtocol {
128
124
}
129
125
130
126
this . seenCache . put ( seqno )
127
+
131
128
// 2. validate the message (signature verification)
132
- this . validate ( message , ( err , isValid ) => {
133
- if ( err || ! isValid ) {
134
- this . log ( 'Message could not be validated, dropping it. isValid=%s' , isValid , err )
135
- return
136
- }
129
+ let isValid
130
+ let error
137
131
138
- // 3. if message is valid, emit to self
139
- this . _emitMessages ( msg . topicIDs , [ msg ] )
132
+ try {
133
+ isValid = await this . validate ( message )
134
+ } catch ( err ) {
135
+ error = err
136
+ }
140
137
141
- // 4. if message is valid, propagate msg to others
142
- this . _forwardMessages ( msg . topicIDs , [ msg ] )
143
- } )
138
+ if ( error || ! isValid ) {
139
+ this . log ( 'Message could not be validated, dropping it. isValid=%s' , isValid , error )
140
+ return
141
+ }
142
+
143
+ // 3. if message is valid, emit to self
144
+ this . _emitMessages ( msg . topicIDs , [ msg ] )
145
+
146
+ // 4. if message is valid, propagate msg to others
147
+ this . _forwardMessages ( msg . topicIDs , [ msg ] )
144
148
}
145
149
146
150
_emitMessages ( topics , messages ) {
@@ -170,30 +174,25 @@ class FloodSub extends BaseProtocol {
170
174
/**
171
175
* Unmounts the floodsub protocol and shuts down every connection
172
176
* @override
173
- * @param {Function } callback
174
- * @returns {undefined }
177
+ * @returns {void }
175
178
*
176
179
*/
177
- stop ( callback ) {
178
- super . stop ( ( err ) => {
179
- if ( err ) return callback ( err )
180
- this . subscriptions = new Set ( )
181
- callback ( )
182
- } )
180
+ stop ( ) {
181
+ super . stop ( )
182
+
183
+ this . subscriptions = new Set ( )
183
184
}
184
185
185
186
/**
186
187
* Publish messages to the given topics.
187
188
* @override
188
189
* @param {Array<string>|string } topics
189
190
* @param {Array<any>|any } messages
190
- * @param {function(Error) } callback
191
- * @returns {undefined }
191
+ * @returns {Promise }
192
192
*
193
193
*/
194
- publish ( topics , messages , callback ) {
194
+ async publish ( topics , messages ) {
195
195
assert ( this . started , 'FloodSub is not started' )
196
- callback = callback || noop
197
196
198
197
this . log ( 'publish' , topics , messages )
199
198
@@ -202,7 +201,7 @@ class FloodSub extends BaseProtocol {
202
201
203
202
const from = this . libp2p . peerInfo . id . toB58String ( )
204
203
205
- const buildMessage = ( msg , cb ) => {
204
+ const buildMessage = ( msg ) => {
206
205
const seqno = utils . randomSeqno ( )
207
206
this . seenCache . put ( utils . msgId ( from , seqno ) )
208
207
@@ -216,24 +215,20 @@ class FloodSub extends BaseProtocol {
216
215
// Emit to self if I'm interested and it is enabled
217
216
this . _options . emitSelf && this . _emitMessages ( topics , [ message ] )
218
217
219
- this . _buildMessage ( message , cb )
218
+ return this . _buildMessage ( message )
220
219
}
221
220
222
- asyncMap ( messages , buildMessage , ( err , msgObjects ) => {
223
- if ( err ) return callback ( err )
224
-
225
- // send to all the other peers
226
- this . _forwardMessages ( topics , msgObjects )
221
+ const msgObjects = await pMap ( messages , buildMessage )
227
222
228
- callback ( null )
229
- } )
223
+ // send to all the other peers
224
+ this . _forwardMessages ( topics , msgObjects )
230
225
}
231
226
232
227
/**
233
228
* Subscribe to the given topic(s).
234
229
* @override
235
230
* @param {Array<string>|string } topics
236
- * @returns {undefined }
231
+ * @returns {void }
237
232
*/
238
233
subscribe ( topics ) {
239
234
assert ( this . started , 'FloodSub is not started' )
@@ -261,7 +256,7 @@ class FloodSub extends BaseProtocol {
261
256
* Unsubscribe from the given topic(s).
262
257
* @override
263
258
* @param {Array<string>|string } topics
264
- * @returns {undefined }
259
+ * @returns {void }
265
260
*/
266
261
unsubscribe ( topics ) {
267
262
// Avoid race conditions, by quietly ignoring unsub when shutdown.
0 commit comments