Skip to content
This repository was archived by the owner on Feb 12, 2024. It is now read-only.

Commit 7177375

Browse files
committed
feat: reprovider
1 parent 103e359 commit 7177375

File tree

9 files changed

+360
-15
lines changed

9 files changed

+360
-15
lines changed

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@
8989
"hapi-pino": "^6.0.0",
9090
"human-to-milliseconds": "^1.0.0",
9191
"interface-datastore": "~0.6.0",
92-
"ipfs-bitswap": "~0.24.1",
92+
"ipfs-bitswap": "ipfs/js-ipfs-bitswap#feat/use-ipfs-provider",
9393
"ipfs-block": "~0.8.1",
9494
"ipfs-block-service": "~0.15.1",
9595
"ipfs-http-client": "^32.0.0",
Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
'use strict'
22

3-
const CID = require('cids')
4-
const base32 = require('base32.js')
53
const pull = require('pull-stream')
64
const pullDefer = require('pull-defer')
5+
const { blockKeyToCid } = require('../../utils')
76

87
module.exports = function (self) {
98
return () => {
@@ -14,21 +13,12 @@ module.exports = function (self) {
1413
return deferred.resolve(pull.error(err))
1514
}
1615

17-
const refs = blocks.map(b => dsKeyToRef(b.key))
16+
const refs = blocks.map(b => ({
17+
ref: blockKeyToCid(b.key).toString()
18+
}))
1819
deferred.resolve(pull.values(refs))
1920
})
2021

2122
return deferred
2223
}
2324
}
24-
25-
function dsKeyToRef (key) {
26-
try {
27-
// Block key is of the form /<base32 encoded string>
28-
const decoder = new base32.Decoder()
29-
const buff = Buffer.from(decoder.write(key.toString().slice(1)).finalize())
30-
return { ref: new CID(buff).toString() }
31-
} catch (err) {
32-
return { err: `Could not convert block with key '${key}' to CID: ${err.message}` }
33-
}
34-
}

src/core/components/start.js

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
'use strict'
22

3+
const get = require('dlv')
34
const series = require('async/series')
45
const Bitswap = require('ipfs-bitswap')
56
const setImmediate = require('async/setImmediate')
67
const promisify = require('promisify-es6')
78

89
const IPNS = require('../ipns')
10+
const Provider = require('../provider')
911
const routingConfig = require('../ipns/routing/config')
1012
const createLibp2pBundle = require('./libp2p')
1113

@@ -45,6 +47,8 @@ module.exports = (self) => {
4547
libp2p.start(err => {
4648
if (err) return cb(err)
4749
self.libp2p = libp2p
50+
51+
self._provider = new Provider(libp2p, self._repo.blocks, get(config, 'Reprovider'))
4852
cb()
4953
})
5054
})
@@ -56,9 +60,15 @@ module.exports = (self) => {
5660
self._bitswap = new Bitswap(
5761
self.libp2p,
5862
self._repo.blocks,
63+
self._provider,
5964
{ statsEnabled: true }
6065
)
6166

67+
if (!get(self._options, 'offline') &&
68+
(get(self._options, 'libp2p.config.dht.enabled', false) || get(self._options, 'libp2p.modules.contentRouting', false))) {
69+
self._provider.start()
70+
}
71+
6272
self._bitswap.start()
6373
self._blockService.setExchange(self._bitswap)
6474

src/core/provider/index.js

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
'use strict'
2+
3+
const errCode = require('err-code')
4+
const human = require('human-to-milliseconds')
5+
const promisify = require('promisify-es6')
6+
7+
const CID = require('cids')
8+
9+
const Reprovider = require('./reprovider')
10+
11+
class Provider {
12+
/**
13+
* Provider goal is to announce blocks to the network.
14+
* It keeps track of which blocks are provided, and allow them to be reprovided
15+
* @param {Libp2p} libp2p
16+
* @param {Blockstore} blockstore
17+
* @param {object} options
18+
* @memberof Provider
19+
*/
20+
constructor (libp2p, blockstore, options) {
21+
// the CIDs for which provide announcements should be made
22+
// this.tracker = [] // TODO use a queue
23+
this._running = false
24+
25+
this._contentRouting = libp2p.contentRouting
26+
this._blockstore = blockstore
27+
this._options = options
28+
this.reprovider = undefined
29+
}
30+
31+
/**
32+
* Begin processing the provider work
33+
* @returns {void}
34+
*/
35+
async start () {
36+
// do not run twice
37+
if (this._running) {
38+
return
39+
}
40+
41+
this._running = true
42+
43+
// handle options
44+
const strategy = this._options.strategy || 'all'
45+
const humanInterval = this._options.Interval || '12h'
46+
const interval = await promisify((callback) => human(humanInterval, callback))()
47+
const options = {
48+
interval,
49+
strategy
50+
}
51+
52+
this.reprovider = new Reprovider(this._contentRouting, this._blockstore, options)
53+
54+
// Start reprovider
55+
this.reprovider.start()
56+
}
57+
58+
/**
59+
* Stop the provider
60+
* @returns {void}
61+
*/
62+
stop () {
63+
this._running = true
64+
65+
// stop the reprovider
66+
this.reprovider.stop()
67+
}
68+
69+
/**
70+
* Announce block to the network and add and entry to the tracker
71+
* Takes a cid and makes an attempt to announce it to the network
72+
* @param {CID} cid
73+
*/
74+
async provide (cid) {
75+
if (!CID.isCID(cid)) {
76+
throw errCode('invalid CID to provide', 'ERR_INVALID_CID')
77+
}
78+
79+
await promisify((callback) => {
80+
this._contentRouting.provide(cid, callback)
81+
})()
82+
}
83+
84+
async findProviders (cid, options) { // eslint-disable-line require-await
85+
if (!CID.isCID(cid)) {
86+
throw errCode('invalid CID to find', 'ERR_INVALID_CID')
87+
}
88+
89+
return promisify((callback) => {
90+
this._contentRouting.findProviders(cid, options, callback)
91+
})()
92+
}
93+
}
94+
95+
exports = module.exports = Provider

src/core/provider/queue.js

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
'use strict'
2+
3+
const queue = require('async/queue')
4+
5+
const debug = require('debug')
6+
const log = debug('ipfs:provider')
7+
log.error = debug('ipfs:provider:error')
8+
9+
class WorkerQueue {
10+
/**
11+
* Creates an instance of a WorkerQueue.
12+
* @param {function} executeWork
13+
* @param {number} [concurrency=3]
14+
*/
15+
constructor (executeWork, concurrency = 3) {
16+
this.executeWork = executeWork
17+
this._concurrency = concurrency
18+
19+
this.running = false
20+
this.queue = this._setupQueue()
21+
}
22+
23+
/**
24+
* Create the underlying async queue.
25+
* @returns {queue}
26+
*/
27+
_setupQueue () {
28+
const q = queue(async (block) => {
29+
await this._processNext(block)
30+
}, this._concurrency)
31+
32+
// If there is an error, stop the worker
33+
q.error = (err) => {
34+
log.error(err)
35+
this.stop(err)
36+
}
37+
38+
q.buffer = 0
39+
40+
return q
41+
}
42+
43+
/**
44+
* Use the queue from async to keep `concurrency` amount items running
45+
* @param {Block[]} blocks
46+
* @returns {Promise}
47+
*/
48+
async execute (blocks) {
49+
this.running = true
50+
51+
// store the promise resolution functions to be resolved at end of queue
52+
this.execution = {}
53+
const execPromise = new Promise((resolve, reject) => Object.assign(this.execution, { resolve, reject }))
54+
55+
// When all blocks have been processed, stop the worker
56+
this.queue.drain = () => {
57+
log('queue:drain')
58+
this.stop()
59+
}
60+
61+
// Fill queue with blocks
62+
this.queue.push(blocks)
63+
64+
await execPromise
65+
}
66+
67+
/**
68+
* Stop the worker, optionally an error is thrown if received
69+
*
70+
* @param {object} error
71+
*/
72+
stop (error) {
73+
if (!this.running) {
74+
return
75+
}
76+
77+
this.running = false
78+
this.queue.kill()
79+
80+
if (error) {
81+
this.execution.reject(error)
82+
} else {
83+
this.execution.resolve()
84+
}
85+
}
86+
87+
/**
88+
* Process the next block in the queue.
89+
* @param {Block} block
90+
*/
91+
async _processNext (block) {
92+
if (!this.running) {
93+
return
94+
}
95+
96+
// Execute work
97+
log('queue:work')
98+
99+
let execErr
100+
try {
101+
await this.executeWork(block)
102+
} catch (err) {
103+
execErr = err
104+
}
105+
106+
log('queue:work:done', execErr)
107+
}
108+
}
109+
110+
exports = module.exports = WorkerQueue

src/core/provider/reprovider.js

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
'use strict'
2+
3+
const promisify = require('promisify-es6')
4+
const WorkerQueue = require('./queue')
5+
6+
const { blockKeyToCid } = require('../utils')
7+
8+
// const initialDelay = 15000
9+
const initialDelay = 3000
10+
11+
class Reprovider {
12+
/**
13+
* Reprovider goal is to reannounce blocks to the network.
14+
* @param {object} contentRouting
15+
* @param {Blockstore} blockstore
16+
* @param {object} options
17+
* @memberof Reprovider
18+
*/
19+
constructor (contentRouting, blockstore, options) {
20+
this._contentRouting = contentRouting
21+
this._blockstore = blockstore
22+
this._options = options
23+
24+
this._timeoutId = undefined
25+
this._worker = new WorkerQueue(this._provideBlock)
26+
}
27+
28+
/**
29+
* Begin processing the reprovider work and waiting for reprovide triggers
30+
* @returns {void}
31+
*/
32+
start () {
33+
// Start doing reprovides after the initial delay
34+
this._timeoutId = setTimeout(() => {
35+
this._runPeriodically()
36+
}, initialDelay)
37+
}
38+
39+
/**
40+
* Stops the reprovider. Any active reprovide actions should be aborted
41+
* @returns {void}
42+
*/
43+
stop () {
44+
if (this._timeoutId) {
45+
clearTimeout(this._timeoutId)
46+
this._timeoutId = undefined
47+
}
48+
this._worker.stop()
49+
}
50+
51+
/**
52+
* Run reprovide on every `options.interval` ms
53+
* @returns {void}
54+
*/
55+
async _runPeriodically () {
56+
while (this._timeoutId) {
57+
const blocks = await promisify((callback) => this._blockstore.query({}, callback))()
58+
59+
// TODO strategy logic here
60+
if (this._options.strategy === 'pinned') {
61+
62+
} else if (this._options.strategy === 'pinned') {
63+
64+
}
65+
66+
await this._worker.execute(blocks)
67+
68+
// Each subsequent walk should run on a `this._options.interval` interval
69+
await new Promise(resolve => {
70+
// this._timeoutId = setTimeout(resolve, this._options.interval)
71+
this._timeoutId = setTimeout(resolve, 5000)
72+
})
73+
}
74+
}
75+
76+
/**
77+
* Do the reprovide work to libp2p content routing
78+
* @param {Block} block
79+
* @returns {void}
80+
*/
81+
async _provideBlock (block) {
82+
const cid = blockKeyToCid(block.key.toBuffer())
83+
// console.log('cid', cid.multihash)
84+
85+
await promisify((callback) => {
86+
this._contentRouting.provide(cid, callback)
87+
})()
88+
}
89+
}
90+
91+
exports = module.exports = Reprovider

src/core/runtime/config-browser.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@ module.exports = () => ({
2626
'/dns4/node0.preload.ipfs.io/tcp/443/wss/ipfs/QmZMxNdpMkewiVZLMRxaNxUeZpDUb34pWjZ1kZvsd16Zic',
2727
'/dns4/node1.preload.ipfs.io/tcp/443/wss/ipfs/Qmbut9Ywz9YEDrz8ySBSgWyJk41Uvm2QJPhwDJzJyGFsD6'
2828
],
29+
Reprovider: {
30+
Interval: '12h',
31+
Strategy: 'all'
32+
},
2933
Swarm: {
3034
ConnMgr: {
3135
LowWater: 200,

0 commit comments

Comments
 (0)