diff --git a/package.json b/package.json index ce09553dc9..e74b8bdefb 100644 --- a/package.json +++ b/package.json @@ -91,7 +91,7 @@ "hashlru": "^2.3.0", "human-to-milliseconds": "^2.0.0", "interface-datastore": "~0.6.0", - "ipfs-bitswap": "~0.24.1", + "ipfs-bitswap": "ipfs/js-ipfs-bitswap#feat/use-ipfs-provider", "ipfs-block": "~0.8.1", "ipfs-block-service": "~0.15.1", "ipfs-http-client": "^32.0.1", @@ -146,6 +146,7 @@ "multihashes": "~0.4.14", "multihashing-async": "~0.6.0", "node-fetch": "^2.3.0", + "p-queue": "^6.0.2", "peer-book": "~0.9.0", "peer-id": "~0.12.0", "peer-info": "~0.15.0", diff --git a/src/core/components/files-regular/refs-local-pull-stream.js b/src/core/components/files-regular/refs-local-pull-stream.js index 5691df2cc6..e0ce6ae479 100644 --- a/src/core/components/files-regular/refs-local-pull-stream.js +++ b/src/core/components/files-regular/refs-local-pull-stream.js @@ -1,9 +1,8 @@ 'use strict' -const CID = require('cids') -const base32 = require('base32.js') const pull = require('pull-stream') const pullDefer = require('pull-defer') +const { blockKeyToCid } = require('../../utils') module.exports = function (self) { return () => { @@ -14,21 +13,12 @@ module.exports = function (self) { return deferred.resolve(pull.error(err)) } - const refs = blocks.map(b => dsKeyToRef(b.key)) + const refs = blocks.map(b => ({ + ref: blockKeyToCid(b.key).toString() + })) deferred.resolve(pull.values(refs)) }) return deferred } } - -function dsKeyToRef (key) { - try { - // Block key is of the form / - const decoder = new base32.Decoder() - const buff = Buffer.from(decoder.write(key.toString().slice(1)).finalize()) - return { ref: new CID(buff).toString() } - } catch (err) { - return { err: `Could not convert block with key '${key}' to CID: ${err.message}` } - } -} diff --git a/src/core/components/start.js b/src/core/components/start.js index 8fc220cdbc..c7f9c10208 100644 --- a/src/core/components/start.js +++ b/src/core/components/start.js @@ -1,11 +1,14 @@ 'use strict' +const get = require('dlv') +const callbackify = require('callbackify') const series = require('async/series') const Bitswap = require('ipfs-bitswap') const setImmediate = require('async/setImmediate') const promisify = require('promisify-es6') const IPNS = require('../ipns') +const Provider = require('../provider') const routingConfig = require('../ipns/routing/config') const createLibp2pBundle = require('./libp2p') @@ -45,10 +48,24 @@ module.exports = (self) => { libp2p.start(err => { if (err) return cb(err) self.libp2p = libp2p + + // create provider + self._provider = new Provider(libp2p, self._repo.blocks, get(config, 'Reprovider')) cb() }) }) }, + (cb) => { + // start provider if libp2p routing enabled + if (!get(self._options, 'offline') && + (get(self._options, 'libp2p.config.dht.enabled', false) || get(self._options, 'libp2p.modules.contentRouting', false))) { + const providerStart = callbackify(() => self._provider.start()) + + providerStart(cb) + } else { + cb() + } + }, (cb) => { const ipnsRouting = routingConfig(self) self._ipns = new IPNS(ipnsRouting, self._repo.datastore, self._peerInfo, self._keychain, self._options) @@ -56,6 +73,7 @@ module.exports = (self) => { self._bitswap = new Bitswap( self.libp2p, self._repo.blocks, + self._provider, { statsEnabled: true } ) diff --git a/src/core/config.js b/src/core/config.js index 2fb66bb558..0c240e1050 100644 --- a/src/core/config.js +++ b/src/core/config.js @@ -66,6 +66,11 @@ const configSchema = s({ Enabled: 'boolean?' })) })), + Reprovider: optional(s({ + Delay: 'string?', + Interval: 'string?', + Strategy: 'string?' + })), Bootstrap: optional(s(['multiaddr-ipfs'])) })), ipld: 'object?', diff --git a/src/core/provider/index.js b/src/core/provider/index.js new file mode 100644 index 0000000000..e54a0c88c8 --- /dev/null +++ b/src/core/provider/index.js @@ -0,0 +1,123 @@ +'use strict' + +const errCode = require('err-code') +const human = require('human-to-milliseconds') +const promisify = require('promisify-es6') +const assert = require('assert') + +const CID = require('cids') + +const Reprovider = require('./reprovider') + +class Provider { + /** + * Provider goal is to announce blocks to the network. + * It keeps track of which blocks are provided, and allow them to be reprovided + * @param {Libp2p} libp2p libp2p instance + * @param {Blockstore} blockstore blockstore instance + * @param {object} options reprovider options + * @param {string} options.delay reprovider initial delay in human friendly time + * @param {string} options.interval reprovider interval in human friendly time + * @param {string} options.strategy reprovider strategy + */ + constructor (libp2p, blockstore, options = {}) { + // Assert options + this._validateOptions(options) + + this._running = false + + this._contentRouting = libp2p.contentRouting + this._blockstore = blockstore + + // handle options (config uses uppercase) + const humanDelay = options.Delay || options.delay || '15s' + const delay = human(humanDelay) + const humanInterval = options.Interval || options.interval || '12h' + const interval = human(humanInterval) + const strategy = options.Strategy || options.strategy || 'all' + + this._options = { + delay, + interval, + strategy + } + + this.reprovider = new Reprovider(this._contentRouting, this._blockstore, this._options) + + } + + /** + * Begin processing the provider work + * @returns {void} + */ + async start () { + // do not run twice + if (this._running) { + return + } + + this._running = true + + // Start reprovider + this.reprovider.start() + } + + /** + * Stop the provider + * @returns {void} + */ + stop () { + this._running = false + + // stop the reprovider + this.reprovider.stop() + } + + /** + * Announce block to the network + * Takes a cid and makes an attempt to announce it to the network + * @param {CID} cid + */ + async provide (cid) { + if (!CID.isCID(cid)) { + throw errCode('invalid CID to provide', 'ERR_INVALID_CID') + } + + await promisify((callback) => { + this._contentRouting.provide(cid, callback) + })() + } + + /** + * Find providers of a block in the network + * @param {CID} cid cid of the block + * @param {object} options + * @param {number} options.timeout - how long the query should maximally run, in ms (default: 60000) + * @param {number} options.maxNumProviders - maximum number of providers to find + * @returns {Promise} + */ + async findProviders (cid, options) { // eslint-disable-line require-await + if (!CID.isCID(cid)) { + throw errCode('invalid CID to find', 'ERR_INVALID_CID') + } + + return promisify((callback) => { + this._contentRouting.findProviders(cid, options, callback) + })() + } + + // Validate Provider options + _validateOptions (options) { + const delay = (options.Delay || options.delay) + assert(delay && parseInt(delay) !== 0, '0 delay is not a valid value for reprovider') + + const interval = (options.Interval || options.interval) + assert(interval && parseInt(interval) !== 0, '0 interval is not a valid value for reprovider') + + const strategy = (options.Strategy || options.strategy) + assert(strategy && (strategy === 'all' || strategy === 'pinned' || strategy === 'roots'), + 'Reprovider must have one of the following strategies: `all`, `pinned` or `roots`') + } +} + +exports = module.exports = Provider diff --git a/src/core/provider/queue.js b/src/core/provider/queue.js new file mode 100644 index 0000000000..52c8492ddc --- /dev/null +++ b/src/core/provider/queue.js @@ -0,0 +1,74 @@ +'use strict' + +const { default: PQueue } = require('p-queue') + +const debug = require('debug') +const log = debug('ipfs:provider') +log.error = debug('ipfs:provider:error') + +class WorkerQueue { + /** + * Creates an instance of a WorkerQueue. + * @param {function} executeWork + * @param {number} [concurrency=3] + */ + constructor (executeWork, concurrency = 3) { + this.executeWork = executeWork + this._concurrency = concurrency + + this.running = false + this.queue = new PQueue({ concurrency }) + } + + /** + * Use the queue from async to keep `concurrency` amount items running + * @param {Block[]} blocks + */ + async execute (blocks) { + this.running = true + + // Fill queue with the processing blocks function + this.queue.addAll(blocks.map((block) => async () => this._processNext(block))) // eslint-disable-line require-await + + // Wait for finishing + await this.queue.onIdle() + + this.stop() + } + + /** + * Stop the worker + */ + stop () { + if (!this.running) { + return + } + + this.running = false + this.queue.clear() + } + + /** + * Process the next block in the queue. + * @param {Block} block + */ + async _processNext (block) { + if (!this.running) { + return + } + + // Execute work + log('queue:work') + + let execErr + try { + await this.executeWork(block) + } catch (err) { + execErr = err + } + + log('queue:work:done', execErr) + } +} + +exports = module.exports = WorkerQueue diff --git a/src/core/provider/reprovider.js b/src/core/provider/reprovider.js new file mode 100644 index 0000000000..dbe2ba5bbc --- /dev/null +++ b/src/core/provider/reprovider.js @@ -0,0 +1,95 @@ +'use strict' + +const promisify = require('promisify-es6') +const WorkerQueue = require('./queue') + +const { blockKeyToCid } = require('../utils') + +class Reprovider { + /** + * Reprovider goal is to reannounce blocks to the network. + * @param {object} contentRouting + * @param {Blockstore} blockstore + * @param {object} options + * @param {string} options.delay reprovider initial delay in milliseconds + * @param {string} options.interval reprovider interval in milliseconds + * @param {string} options.strategy reprovider strategy + */ + constructor (contentRouting, blockstore, options) { + this._contentRouting = contentRouting + this._blockstore = blockstore + this._options = options + + this._timeoutId = undefined + this._worker = new WorkerQueue(this._provideBlock) + } + + /** + * Begin processing the reprovider work and waiting for reprovide triggers + * @returns {void} + */ + start () { + // Start doing reprovides after the initial delay + this._timeoutId = setTimeout(() => { + this._runPeriodically() + }, this._options.delay) + } + + /** + * Stops the reprovider. Any active reprovide actions should be aborted + * @returns {void} + */ + stop () { + if (this._timeoutId) { + clearTimeout(this._timeoutId) + this._timeoutId = undefined + } + this._worker.stop() + } + + /** + * Run reprovide on every `options.interval` ms recursively + * @returns {void} + */ + async _runPeriodically () { + // Verify if stopped + if (!this._timeoutId) return + + // TODO strategy logic here + const blocks = await promisify((callback) => this._blockstore.query({}, callback))() + + if (this._options.strategy === 'pinned') { + + } else if (this._options.strategy === 'roots') { + + } + + // Verify if stopped + if (!this._timeoutId) return + + await this._worker.execute(blocks) + + // Verify if stopped + if (!this._timeoutId) return + + // Each subsequent walk should run on a `this._options.interval` interval + this._timeoutId = setTimeout(() => { + this._runPeriodically() + }, this._options.interval) + } + + /** + * Do the reprovide work to libp2p content routing + * @param {Block} block + * @returns {void} + */ + async _provideBlock (block) { + const cid = blockKeyToCid(block.key.toBuffer()) + + await promisify((callback) => { + this._contentRouting.provide(cid, callback) + })() + } +} + +exports = module.exports = Reprovider diff --git a/src/core/runtime/config-browser.js b/src/core/runtime/config-browser.js index 537316d431..bb673907d0 100644 --- a/src/core/runtime/config-browser.js +++ b/src/core/runtime/config-browser.js @@ -27,6 +27,11 @@ module.exports = () => ({ '/dns4/node0.preload.ipfs.io/tcp/443/wss/ipfs/QmZMxNdpMkewiVZLMRxaNxUeZpDUb34pWjZ1kZvsd16Zic', '/dns4/node1.preload.ipfs.io/tcp/443/wss/ipfs/Qmbut9Ywz9YEDrz8ySBSgWyJk41Uvm2QJPhwDJzJyGFsD6' ], + Reprovider: { + Delay: '15s', + Interval: '12h', + Strategy: 'all' + }, Swarm: { ConnMgr: { LowWater: 200, diff --git a/src/core/runtime/config-nodejs.js b/src/core/runtime/config-nodejs.js index 60169ef562..45d5a56294 100644 --- a/src/core/runtime/config-nodejs.js +++ b/src/core/runtime/config-nodejs.js @@ -40,6 +40,11 @@ module.exports = () => ({ '/dns4/node0.preload.ipfs.io/tcp/443/wss/ipfs/QmZMxNdpMkewiVZLMRxaNxUeZpDUb34pWjZ1kZvsd16Zic', '/dns4/node1.preload.ipfs.io/tcp/443/wss/ipfs/Qmbut9Ywz9YEDrz8ySBSgWyJk41Uvm2QJPhwDJzJyGFsD6' ], + Reprovider: { + Delay: '15s', + Interval: '12h', + Strategy: 'all' + }, Swarm: { ConnMgr: { LowWater: 200, diff --git a/src/core/utils.js b/src/core/utils.js index 98760b0338..d64764e308 100644 --- a/src/core/utils.js +++ b/src/core/utils.js @@ -4,6 +4,11 @@ const promisify = require('promisify-es6') const map = require('async/map') const isIpfs = require('is-ipfs') const CID = require('cids') +const base32 = require('base32.js') + +// const { Key } = require('interface-datastore') + +// const PIN_DS_KEY = new Key('/local/pins') const ERR_BAD_PATH = 'ERR_BAD_PATH' exports.OFFLINE_ERROR = 'This command must be run in online mode. Try running \'ipfs daemon\' first.' @@ -134,7 +139,43 @@ const resolvePath = promisify(function (objectAPI, ipfsPaths, callback) { } }, callback) }) +/** + * Convert a block key to cid + * @param {Key} key form / + * @returns {CID} + */ +function blockKeyToCid (key) { + try { + const decoder = new base32.Decoder() + const buff = Buffer.from(decoder.write(key.toString().slice(1)).finalize()) + return new CID(buff) + } catch (err) { + return { err: `Could not convert block with key '${key}' to CID: ${err.message}` } + } +} + +/* +async function getInternalCidBlocks (ipfs) { + let mh + try { + mh = await promisify((cb) => ipfs.repo.datastore.get(PIN_DS_KEY, cb))() + } catch (err) { + if (err.code === 'ERR_NOT_FOUND') { + return [] + } + throw err + } + + const cid = new CID(mh) + const obj = await promisify((cb) => ipfs.dag.get(cid, '', { preload: false }, cb))() + + // The pinner stores an object that has two links to pin sets: + // 1. The directly pinned CIDs + // 2. The recursively pinned CIDs + // If large enough, these pin sets may have links to buckets to hold the pins +} */ exports.normalizePath = normalizePath exports.parseIpfsPath = parseIpfsPath exports.resolvePath = resolvePath +exports.blockKeyToCid = blockKeyToCid diff --git a/test/core/provider.spec.js b/test/core/provider.spec.js new file mode 100644 index 0000000000..cce293ee74 --- /dev/null +++ b/test/core/provider.spec.js @@ -0,0 +1,209 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +const dirtyChai = require('dirty-chai') +const expect = chai.expect +chai.use(dirtyChai) +const sinon = require('sinon') + +const CID = require('cids') +const human = require('human-to-milliseconds') + +const IPFS = require('../../src') +const DaemonFactory = require('ipfsd-ctl') +const df = DaemonFactory.create({ type: 'proc' }) + +const DELAY = '3s' +const INTERVAL = '10s' +const STRATEGY = 'all' + +const config = { + Bootstrap: [], + Reprovider: { + Delay: DELAY, + Interval: INTERVAL, + Strategy: STRATEGY + } +} + +describe('record provider', () => { + // if no dht nor delegated routing enabled + describe('disabled', () => { + let node + let ipfsd + + before(function (done) { + this.timeout(50 * 1000) + + df.spawn({ + exec: IPFS + }, (err, _ipfsd) => { + expect(err).to.not.exist() + ipfsd = _ipfsd + node = _ipfsd.api + + done() + }) + }) + + after((done) => { + ipfsd.stop(done) + }) + + it('should not be running', () => { + expect(node._provider._running).to.equal(false) + expect(node._provider.reprovider._timeoutId).to.not.exist() + }) + }) + + describe('enabled with default configuration', () => { + let node + let ipfsd + + before(function (done) { + this.timeout(50 * 1000) + + df.spawn({ + exec: IPFS, + libp2p: { + config: { + dht: { + enabled: true + } + } + } + }, (err, _ipfsd) => { + expect(err).to.not.exist() + ipfsd = _ipfsd + node = _ipfsd.api + + done() + }) + }) + + after((done) => { + ipfsd.stop(done) + }) + + it('should be running', () => { + expect(node._provider._running).to.equal(true) + expect(node._provider.reprovider).to.exist() + expect(node._provider.reprovider._timeoutId).to.exist() + }) + + it('should use the defaults', () => { + expect(node._provider._options.interval).to.equal(human('12h')) + expect(node._provider._options.strategy).to.equal('all') + }) + + it('should be able to provide a valid CIDs', async () => { + const cid = new CID('Qmd7qZS4T7xXtsNFdRoK1trfMs5zU94EpokQ9WFtxdPxsZ') + + try { + await node._provider.provide(cid) + } catch (err) { + expect(err).to.not.exist() + throw err + } + }) + + it('should thrown providing an invalid CIDs', async () => { + const cid = 'Qmd7qZS4T7xXtsNFdRoK1trfMs5zU94EpokQ9WFtxdPxsZ' + + try { + await node._provider.provide(cid) + } catch (err) { + expect(err).to.exist() + expect(err.code).to.equal('ERR_INVALID_CID') + } + }) + + it('should be able to find providers of a valid CID', async () => { + const cid = new CID('Qmd7qZS4T7xXtsNFdRoK1trfMs5zU94EpokQ9WFtxdPxsZ') + + let providers + try { + providers = await node._provider.findProviders(cid) + } catch (err) { + expect(err).to.not.exist() + throw err + } + + expect(providers).to.exist() + }) + + it('should thrown finding providers of an invalid CID', async () => { + const cid = 'Qmd7qZS4T7xXtsNFdRoK1trfMs5zU94EpokQ9WFtxdPxsZ' + + try { + await node._provider.findProviders(cid) + } catch (err) { + expect(err).to.exist() + expect(err.code).to.equal('ERR_INVALID_CID') + } + }) + }) + + describe('enabled with custom config', () => { + let node + let ipfsd + + before(function (done) { + this.timeout(50 * 1000) + + df.spawn({ + exec: IPFS, + config, + libp2p: { + config: { + dht: { + enabled: true + } + } + } + }, (err, _ipfsd) => { + expect(err).to.not.exist() + ipfsd = _ipfsd + node = _ipfsd.api + + done() + }) + }) + + after((done) => { + ipfsd.stop(done) + }) + + it('should be running', () => { + expect(node._provider._running).to.equal(true) + expect(node._provider.reprovider).to.exist() + expect(node._provider.reprovider._timeoutId).to.exist() + }) + + it('should use the provided configuration', () => { + expect(node._provider._options.interval).to.equal(human(INTERVAL)) + expect(node._provider._options.strategy).to.equal(STRATEGY) + }) + + it('should reprovide after tens seconds', function (done) { + this.timeout(20 * 1000) + + const reprovider = node._provider.reprovider + sinon.spy(reprovider, '_runPeriodically') + sinon.spy(reprovider._worker, '_processNext') + + setTimeout(() => { + expect(reprovider._runPeriodically.called).to.equal(true) + expect(reprovider._worker._processNext.called).to.equal(true) + + sinon.restore() + done() + }, 10000) + }) + }) + + describe.skip('reprovide strategies', () => { + + }) +})