diff --git a/.gitignore b/.gitignore index 03aca806..9c3c4a1b 100644 --- a/.gitignore +++ b/.gitignore @@ -29,3 +29,6 @@ node_modules dist test/test-repo-for* +docs + +test/test-repo/datastore \ No newline at end of file diff --git a/README.md b/README.md index f0288d21..08c4517e 100644 --- a/README.md +++ b/README.md @@ -23,6 +23,7 @@ - [Use in a browser with browserify, webpack or any other bundler](#use-in-a-browser-with-browserify-webpack-or-any-other-bundler) - [Use in a browser using a script tag](#use-in-a-browser-using-a-script-tag) - [Usage](#usage) +- [API](#api) - [Contribute](#contribute) - [License](#license) @@ -30,8 +31,8 @@ ### npm -```sh -> npm install ipfs-bitswap --save +```bash +> npm install ipfs-bitswap ``` ### Use in Node.js @@ -42,8 +43,6 @@ const Bitswap = require('ipfs-bitswap') ### Use in a browser with browserify, webpack or any other bundler -The code published to npm that gets loaded on require is in fact a ES5 transpiled version with the right shims added. This means that you can require it and use with your favourite bundler without having to adjust asset management process. - ```js const Bitswap = require('ipfs-bitswap') ``` @@ -60,84 +59,11 @@ Loading this module through a script tag will make the `IpfsBitswap` object avai ## Usage -For the documentation see [API.md](API.md). - -### API - -#### `new Bitswap(libp2p, blockstore)` - -- `libp2p: Libp2p`, instance of the local network stack. -- `blockstore: Blockstore`, instance of the local database (`IpfsRepo.blockstore`) - -Create a new instance. - -#### `getStream(cid)` - -- `cid: CID|Array` - -Returns a source `pull-stream`. Values emitted are the received blocks. - -Example: - -```js -// Single block -pull( - bitswap.getStream(cid), - pull.collect((err, blocks) => { - // blocks === [block] - }) -) - -// Many blocks -pull( - bitswap.getStream([cid1, cid2, cid3]), - pull.collect((err, blocks) => { - // blocks === [block1, block2, block3] - }) -) -``` - -> Note: This is safe guarded so that the network is not asked -> for blocks that are in the local `datastore`. - -#### `unwant(cids)` - -- `cids: CID|[]CID` - -Cancel previously requested cids, forcefully. That means they are removed from the -wantlist independent of how many other resources requested these cids. Callbacks -attached to `getBlock` are errored with `Error('manual unwant: cid)`. - -#### `cancelWants(cids)` - -- `cid: CID|[]CID` - -Cancel previously requested cids. - -#### `putStream()` - -Returns a duplex `pull-stream` that emits an object `{cid: CID}` for every written block when it was stored. -Objects passed into here should be of the form `{data: Buffer, cid: CID}` - -#### `put(blockAndCid, callback)` - -- `blockAndCid: {data: Buffer, cid: CID}` -- `callback: Function` - -Announce that the current node now has the block containing `data`. This will store it -in the local database and attempt to serve it to all peers that are known - to have requested it. The callback is called when we are sure that the block - is stored. - -#### `wantlistForPeer(peerId)` - -- `peerId: PeerId` - -Get the wantlist for a given peer. +See https://ipfs.github.io/js-ipfs-bitswap -#### `stat()` +## API -Get stats about about the current state of the bitswap instance. +See https://ipfs.github.io/js-ipfs-bitswap ## Development diff --git a/benchmarks/index.js b/benchmarks/index.js index 2652449d..83000b96 100644 --- a/benchmarks/index.js +++ b/benchmarks/index.js @@ -8,10 +8,10 @@ const mapSeries = require('async/mapSeries') const each = require('async/each') const _ = require('lodash') const Block = require('ipfs-block') -const pull = require('pull-stream') const assert = require('assert') const crypto = require('crypto') const CID = require('cids') +const multihashing = require('multihashing-async') const utils = require('../test/utils') @@ -50,12 +50,11 @@ function shutdown (nodeArr, cb) { } function round (nodeArr, blockFactor, n, cb) { - const blocks = createBlocks(n, blockFactor) - map(blocks, (b, cb) => b.key(cb), (err, keys) => { + createBlocks(n, blockFactor, (err, blocks) => { if (err) { return cb(err) } - const cids = keys.map((k) => new CID(k)) + const cids = blocks.map((b) => b.cid) let d series([ // put blockFactor amount of blocks per node @@ -64,10 +63,7 @@ function round (nodeArr, blockFactor, n, cb) { const data = _.map(_.range(blockFactor), (j) => { const index = i * blockFactor + j - return { - block: blocks[index], - cid: cids[index] - } + return blocks[index] }) each( data, @@ -81,17 +77,14 @@ function round (nodeArr, blockFactor, n, cb) { }, // fetch all blocks on every node (cb) => parallel(_.map(nodeArr, (node, i) => (callback) => { - pull( - node.bitswap.getStream(cids), - pull.collect((err, res) => { - if (err) { - return callback(err) - } + map(cids, (cid, cb) => node.bitswap.get(cid, cb), (err, res) => { + if (err) { + return callback(err) + } - assert(res.length === blocks.length) - callback() - }) - ) + assert(res.length === blocks.length) + callback() + }) }), cb) ], (err) => { if (err) { @@ -103,8 +96,14 @@ function round (nodeArr, blockFactor, n, cb) { }) } -function createBlocks (n, blockFactor) { - return _.map(_.range(n * blockFactor), () => { - return new Block(crypto.randomBytes(n * blockFactor)) - }) +function createBlocks (n, blockFactor, callback) { + map(_.range(n * blockFactor), (i, cb) => { + const data = crypto.randomBytes(n * blockFactor) + multihashing(data, 'sha2-256', (err, hash) => { + if (err) { + return cb(err) + } + cb(null, new Block(data, new CID(hash))) + }) + }, callback) } diff --git a/benchmarks/put-get.js b/benchmarks/put-get.js index 33f9d8a8..17feba8c 100644 --- a/benchmarks/put-get.js +++ b/benchmarks/put-get.js @@ -1,13 +1,15 @@ +/* eslint max-nested-callbacks: ["error", 8] */ 'use strict' const Benchmark = require('benchmark') const _ = require('lodash') const Block = require('ipfs-block') const assert = require('assert') -const pull = require('pull-stream') const series = require('async/series') +const map = require('async/map') const crypto = require('crypto') const CID = require('cids') +const multihashing = require('multihashing-async') const utils = require('../test/utils') @@ -25,15 +27,19 @@ utils.genBitswapNetwork(1, (err, nodes) => { blockCounts.forEach((n) => blockSizes.forEach((k) => { suite.add(`put-get ${n} blocks of size ${k}`, (defer) => { - const blocks = createBlocks(n, k) - series([ - (cb) => put(blocks, bitswap, cb), - (cb) => get(blocks, bitswap, cb) - ], (err) => { + createBlocks(n, k, (err, blocks) => { if (err) { throw err } - defer.resolve() + series([ + (cb) => bitswap.putMany(blocks, cb), + (cb) => get(blocks, bitswap, cb) + ], (err) => { + if (err) { + throw err + } + defer.resolve() + }) }) }, { defer: true @@ -52,40 +58,27 @@ utils.genBitswapNetwork(1, (err, nodes) => { }) }) -function createBlocks (n, k) { - return _.map(_.range(n), () => { - return new Block(crypto.randomBytes(k)) - }) -} - -function put (blocks, bs, callback) { - pull( - pull.values(blocks), - pull.asyncMap((b, cb) => { - b.key((err, key) => { - if (err) { - return cb(err) - } - cb(null, {cid: new CID(key), block: b}) - }) - }), - bs.putStream(), - pull.onEnd(callback) - ) -} - -function get (blocks, bs, callback) { - pull( - pull.values(blocks), - pull.asyncMap((b, cb) => b.key(cb)), - pull.map((k) => bs.getStream(new CID(k))), - pull.flatten(), - pull.collect((err, res) => { +function createBlocks (n, k, callback) { + map(_.range(n), (i, cb) => { + const data = crypto.randomBytes(k) + multihashing(data, 'sha2-256', (err, hash) => { if (err) { - return callback(err) + return cb(err) } - assert(res.length === blocks.length) - callback() + cb(null, new Block(data, new CID(hash))) }) - ) + }, callback) +} + +function get (blocks, bs, callback) { + map(blocks, (b, cb) => { + bs.get(b.cid, cb) + }, (err, res) => { + if (err) { + return callback(err) + } + + assert(res.length === blocks.length) + callback() + }) } diff --git a/package.json b/package.json index 4092b7af..a8a2e1bc 100644 --- a/package.json +++ b/package.json @@ -11,13 +11,14 @@ "test:browser": "aegir-test browser", "test:node": "aegir-test node", "lint": "aegir-lint", - "release": "aegir-release", - "release-minor": "aegir-release --type minor", - "release-major": "aegir-release --type major", + "release": "aegir-release --docs", + "release-minor": "aegir-release --type minor --docs", + "release-major": "aegir-release --type major --docs", "bench": "node benchmarks/index", "build": "aegir-build", "coverage": "aegir-coverage", - "coverage-publish": "aegir-coverage publish" + "coverage-publish": "aegir-coverage publish", + "docs": "aegir-docs" }, "repository": { "type": "git", @@ -36,42 +37,39 @@ }, "homepage": "https://github.com/ipfs/js-ipfs-bitswap#readme", "devDependencies": { - "aegir": "^10.0.0", + "aegir": "^11.0.0", "benchmark": "^2.1.3", - "buffer-loader": "0.0.1", "chai": "^3.5.0", - "fs-pull-blob-store": "~0.4.1", - "idb-pull-blob-store": "~0.5.1", - "interface-pull-blob-store": "~0.6.0", - "ipfs-repo": "~0.11.3", + "dirty-chai": "^1.2.2", + "ipfs-repo": "~0.12.0", "libp2p-ipfs-nodejs": "~0.19.0", "lodash": "^4.17.4", - "multiaddr": "^2.2.1", + "multiaddr": "^2.2.2", "ncp": "^2.0.0", "peer-book": "~0.3.1", - "peer-id": "~0.8.2", - "peer-info": "~0.8.3", + "peer-id": "~0.8.4", + "peer-info": "~0.8.4", "rimraf": "^2.6.1", "safe-buffer": "^5.0.1" }, "dependencies": { "async": "^2.1.5", - "cids": "~0.4.1", - "debug": "^2.6.2", - "heap": "^0.2.6", - "ipfs-block": "~0.5.5", + "cids": "~0.4.2", + "debug": "^2.6.3", + "ipfs-block": "~0.6.0", "lodash.debounce": "^4.0.8", "lodash.find": "^4.6.0", "lodash.groupby": "^4.6.0", "lodash.isequalwith": "^4.4.0", "lodash.isundefined": "^3.0.1", "lodash.pullallwith": "^4.7.0", + "lodash.sortby": "^4.7.0", "lodash.uniqwith": "^4.5.0", "lodash.values": "^4.3.0", + "multihashing-async": "^0.4.4", "protocol-buffers": "^3.2.1", "pull-defer": "^0.2.2", "pull-length-prefixed": "^1.2.0", - "pull-paramap": "^1.2.1", "pull-pushable": "^2.0.1", "pull-stream": "^3.5.0", "varint-decoder": "^0.1.1" @@ -87,4 +85,4 @@ "greenkeeperio-bot ", "npmcdn-to-unpkg-bot " ] -} \ No newline at end of file +} diff --git a/src/components/decision-engine/index.js b/src/components/decision-engine/index.js index f4b6d507..de3370d1 100644 --- a/src/components/decision-engine/index.js +++ b/src/components/decision-engine/index.js @@ -1,7 +1,6 @@ 'use strict' const debug = require('debug') -const pull = require('pull-stream') const each = require('async/each') const eachSeries = require('async/eachSeries') const waterfall = require('async/waterfall') @@ -41,7 +40,7 @@ class DecisionEngine { // split into messges of max 512 * 1024 bytes const blocks = env.blocks const total = blocks.reduce((acc, b) => { - return acc + b.block.data.byteLength + return acc + b.data.byteLength }, 0) if (total < MAX_MESSAGE_SIZE) { @@ -53,7 +52,7 @@ class DecisionEngine { eachSeries(blocks, (b, cb) => { batch.push(b) - size += b.block.data.byteLength + size += b.data.byteLength if (size >= MAX_MESSAGE_SIZE) { const nextBatch = batch.slice() @@ -69,7 +68,7 @@ class DecisionEngine { const msg = new Message(false) blocks.forEach((b) => { - msg.addBlock(b.cid, b.block) + msg.addBlock(b) }) // console.log('sending %s blocks', msg.blocks.size) @@ -95,18 +94,7 @@ class DecisionEngine { waterfall([ (cb) => map(uniqCids, (cid, cb) => { - pull( - this.blockstore.getStream(cid.multihash), - pull.collect((err, blocks) => { - if (err) { - return cb(err) - } - cb(null, { - cid: cid, - block: blocks[0] - }) - }) - ) + this.blockstore.get(cid, cb) }, cb), (blocks, cb) => each(values(groupedTasks), (tasks, cb) => { // all tasks have the same target @@ -123,7 +111,7 @@ class DecisionEngine { log.error('failed to send', err) } blockList.forEach((block) => { - this.messageSent(peer, block.block, block.cid) + this.messageSent(peer, block) }) cb() }) @@ -216,7 +204,7 @@ class DecisionEngine { _addWants (ledger, peerId, entries, cb) { each(entries, (entry, cb) => { // If we already have the block, serve it - this.blockstore.has(entry.cid.multihash, (err, exists) => { + this.blockstore.has(entry.cid, (err, exists) => { if (err) { log.error('failed existence check') } else if (exists) { @@ -236,8 +224,8 @@ class DecisionEngine { _processBlocks (blocks, ledger, callback) { const cids = [] blocks.forEach((b, cidStr) => { - log('got block (%s bytes)', b.block.data.length) - ledger.receivedBytes(b.block.data.length) + log('got block (%s bytes)', b.data.length) + ledger.receivedBytes(b.data.length) cids.push(b.cid) }) @@ -245,11 +233,11 @@ class DecisionEngine { } // Clear up all accounting things after message was sent - messageSent (peerId, block, cid) { + messageSent (peerId, block) { const ledger = this._findOrCreate(peerId) ledger.sentBytes(block ? block.data.length : 0) - if (cid) { - ledger.wantlist.remove(cid) + if (block && block.cid) { + ledger.wantlist.remove(block.cid) } } diff --git a/src/components/network/index.js b/src/components/network/index.js index 79b54e69..fdb385d4 100644 --- a/src/components/network/index.js +++ b/src/components/network/index.js @@ -75,8 +75,7 @@ class Network { return cb(err) } // log('data from', peerInfo.id.toB58String()) - this.bitswap._receiveMessage(peerInfo.id, msg) - cb() + this.bitswap._receiveMessage(peerInfo.id, msg, cb) }) }), pull.onEnd((err) => { diff --git a/src/components/want-manager/msg-queue.js b/src/components/want-manager/msg-queue.js index 3ea18ac0..f9f25652 100644 --- a/src/components/want-manager/msg-queue.js +++ b/src/components/want-manager/msg-queue.js @@ -57,7 +57,6 @@ module.exports = class MsgQueue { this.network.sendMessage(this.peerId, msg, (err) => { if (err) { log.error('send error: %s', err.message) - return } }) }) diff --git a/src/index.js b/src/index.js index 87c9b6f8..de5a5684 100644 --- a/src/index.js +++ b/src/index.js @@ -1,11 +1,9 @@ 'use strict' const waterfall = require('async/waterfall') +const reject = require('async/reject') const each = require('async/each') const EventEmitter = require('events').EventEmitter -const pull = require('pull-stream') -const paramap = require('pull-paramap') -const defer = require('pull-defer/source') const debug = require('debug') const CONSTANTS = require('./constants') @@ -16,7 +14,18 @@ const DecisionEngine = require('./components/decision-engine') const log = debug('bitswap') log.error = debug('bitswap:error') +/** + * + */ class Bitswap { + /** + * Create a new bitswap instance. + * + * @param {Libp2p} libp2p + * @param {Blockstore} blockstore + * @param {PeerBook} peerBook + * @returns {Bitswap} + */ constructor (libp2p, blockstore, peerBook) { this.libp2p = libp2p // the network delivers messages @@ -39,49 +48,45 @@ class Bitswap { } // handle messages received through the network - _receiveMessage (peerId, incoming, cb) { - cb = cb || (() => {}) + _receiveMessage (peerId, incoming, callback) { this.engine.messageReceived(peerId, incoming, (err) => { if (err) { log('failed to receive message', incoming) } if (incoming.blocks.size === 0) { - return cb() + return callback() } - const cidsAndBlocks = Array.from(incoming.blocks.values()) + const blocks = Array.from(incoming.blocks.values()) // quickly send out cancels, reduces chances of duplicate block receives - const toCancel = cidsAndBlocks + const toCancel = blocks .filter((b) => this.wm.wantlist.contains(b.cid)) .map((b) => b.cid) this.wm.cancelWants(toCancel) each( - cidsAndBlocks, - this._handleReceivedBlock.bind(this, peerId), - cb + blocks, + (b, cb) => this._handleReceivedBlock(peerId, b, cb), + callback ) }) } - _handleReceivedBlock (peerId, cidAndBlock, callback) { - const cid = cidAndBlock.cid - const block = cidAndBlock.block + _handleReceivedBlock (peerId, block, callback) { + log('received block') waterfall([ - (cb) => this.blockstore.has(cid.multihash, cb), - (exists, cb) => { - this._updateReceiveCounters(block, exists) - log('got block') - - if (exists) { + (cb) => this.blockstore.has(block.cid, cb), + (has, cb) => { + this._updateReceiveCounters(block, has) + if (has) { return cb() } - this._putBlockStore(cidAndBlock, cb) + this._putBlock(block, cb) } ], callback) } @@ -111,37 +116,47 @@ class Bitswap { this.engine.peerDisconnected(peerId) } - // return the current wantlist for a given `peerId` - wantlistForPeer (peerId) { - return this.engine.wantlistForPeer(peerId) - } + _putBlock (block, callback) { + this.blockstore.put(block, (err) => { + if (err) { + return callback(err) + } - getStream (cids) { - if (!Array.isArray(cids)) { - return this._getStreamSingle(cids) - } + this.notifications.emit( + `block:${block.cid.buffer.toString()}`, + block + ) + this.engine.receivedBlocks([block.cid]) + callback() + }) + } - return pull( - pull.values(cids), - paramap((cid, cb) => { - pull( - this._getStreamSingle(cid), - pull.collect(cb) - ) - }), - pull.flatten() - ) + /** + * Return the current wantlist for a given `peerId` + * + * @param {PeerId} peerId + * @returns {Wantlist} + */ + wantlistForPeer (peerId) { + return this.engine.wantlistForPeer(peerId) } - _getStreamSingle (cid) { + /** + * Fetch a given block by cid. If the block is in the local + * blockstore it is returned, otherwise the block is added to the wantlist and returned once another node sends it to us. + * + * @param {CID} cid + * @param {function(Error, Block)} callback + * @returns {void} + */ + get (cid, callback) { const unwantListeners = {} const blockListeners = {} const cidStr = cid.buffer.toString() const unwantEvent = `unwant:${cidStr}` const blockEvent = `block:${cidStr}` - const d = defer() - + log('get: %s', cidStr) const cleanupListener = () => { if (unwantListeners[cidStr]) { this.notifications.removeListener(unwantEvent, unwantListeners[cidStr]) @@ -159,33 +174,32 @@ class Bitswap { log(`manual unwant: ${cidStr}`) cleanupListener() this.wm.cancelWants([cid]) - d.resolve(pull.empty()) + callback() } blockListeners[cidStr] = (block) => { this.wm.cancelWants([cid]) cleanupListener(cid) - d.resolve(pull.values([block])) + callback(null, block) } this.notifications.once(unwantEvent, unwantListeners[cidStr]) this.notifications.once(blockEvent, blockListeners[cidStr]) } - this.blockstore.has(cid.multihash, (err, exists) => { + this.blockstore.has(cid, (err, has) => { if (err) { - return d.resolve(pull.error(err)) + return callback(err) } - if (exists) { + + if (has) { log('already have block: %s', cidStr) - return d.resolve(this.blockstore.getStream(cid.multihash)) + return this.blockstore.get(cid, callback) } addListener() this.wm.wantBlocks([cid]) }) - - return d } // removes the given cids from the wantlist independent of any ref counts @@ -208,63 +222,73 @@ class Bitswap { this.wm.cancelWants(cids) } - putStream () { - return pull( - pull.asyncMap((blockAndCid, cb) => { - this.blockstore.has(blockAndCid.cid.multihash, (err, exists) => { - if (err) { - return cb(err) - } - - cb(null, [blockAndCid, exists]) - }) - }), - pull.filter((val) => !val[1]), - pull.asyncMap((val, cb) => { - this._putBlockStore(val[0], cb) - }) - ) - } + /** + * Put the given block to the underlying blockstore and + * send it to nodes that have it in their wantlist. + * + * @param {Block} block + * @param {function(Error)} callback + * @returns {void} + */ + put (block, callback) { + log('putting block') - _putBlockStore (blockAndCid, callback) { - const block = blockAndCid.block - const cid = blockAndCid.cid - const cidStr = cid.buffer.toString() + waterfall([ + (cb) => this.blockstore.has(block.cid, cb), + (has, cb) => { + if (has) { + return cb() + } - log('putting block') + this._putBlock(block, cb) + } + ], callback) + } - pull( - pull.values([{ - data: block.data, - key: cid.multihash - }]), - this.blockstore.putStream(), - pull.collect((err, meta) => { + /** + * Put the given blocks to the underlying blockstore and + * send it to nodes that have it them their wantlist. + * + * @param {Array} blocks + * @param {function(Error)} callback + * @returns {void} + */ + putMany (blocks, callback) { + waterfall([ + (cb) => reject(blocks, (b, cb) => { + this.blockstore.has(b.cid, cb) + }, cb), + (newBlocks, cb) => this.blockstore.putMany(newBlocks, (err) => { if (err) { - return callback(err) + return cb(err) } - log('put block') - this.notifications.emit(`block:${cidStr}`, block) - this.engine.receivedBlocks([cid]) - callback(null, meta) + newBlocks.forEach((block) => { + this.notifications.emit( + `block:${block.cid.buffer.toString()}`, + block + ) + this.engine.receivedBlocks([block.cid]) + }) + cb() }) - ) - } - - // announces the existance of a block to this service - put (blockAndCid, callback) { - pull( - pull.values([blockAndCid]), - this.putStream(), - pull.onEnd(callback) - ) + ], callback) } + /** + * Get the current list of wants. + * + * @returns {Array} + */ getWantlist () { return this.wm.wantlist.entries() } + /** + * Get stats about the bitswap node. + * + * @returns {Object} + */ stat () { return { wantlist: this.getWantlist(), @@ -275,13 +299,22 @@ class Bitswap { } } + /** + * Start the bitswap node. + * + * @returns {void} + */ start () { this.wm.run() this.network.start() this.engine.start() } - // Halt everything + /** + * Stooop the bitswap node. + * + * @returns {void} + */ stop () { this.wm.stop(this.libp2p.peerInfo.id) this.network.stop() diff --git a/src/types/message/index.js b/src/types/message/index.js index fcb94883..d7b6bd5c 100644 --- a/src/types/message/index.js +++ b/src/types/message/index.js @@ -4,10 +4,11 @@ const protobuf = require('protocol-buffers') const Block = require('ipfs-block') const isEqualWith = require('lodash.isequalwith') const assert = require('assert') -const map = require('async/map') +const each = require('async/each') const CID = require('cids') const codecName = require('multicodec/src/name-table') const vd = require('varint-decoder') +const multihashing = require('multihashing-async') const pbm = protobuf(require('./message.proto')) const Entry = require('./entry') @@ -38,10 +39,10 @@ class BitswapMessage { } } - addBlock (cid, block) { - assert(CID.isCID(cid), 'must be a valid cid') - const cidStr = cid.buffer.toString() - this.blocks.set(cidStr, {block: block, cid: cid}) + addBlock (block) { + assert(Block.isBlock(block), 'must be a valid cid') + const cidStr = block.cid.buffer.toString() + this.blocks.set(cidStr, block) } cancel (cid) { @@ -67,7 +68,7 @@ class BitswapMessage { }) }, blocks: Array.from(this.blocks.values()) - .map((block) => block.block.data) + .map((block) => block.data) } if (this.full) { @@ -102,7 +103,7 @@ class BitswapMessage { this.blocks.forEach((block) => { msg.payload.push({ prefix: block.cid.prefix, - data: block.block.data + data: block.data }) }) @@ -155,14 +156,13 @@ BitswapMessage.deserialize = (raw, callback) => { // Bitswap 1.0.0 // decoded.blocks are just the byte arrays if (decoded.blocks.length > 0) { - map(decoded.blocks, (b, cb) => { - const block = new Block(b) - block.key((err, key) => { + return each(decoded.blocks, (b, cb) => { + multihashing(b, 'sha2-256', (err, hash) => { if (err) { return cb(err) } - const cid = new CID(key) - msg.addBlock(cid, block) + const cid = new CID(hash) + msg.addBlock(new Block(b, cid)) cb() }) }, (err) => { @@ -171,12 +171,11 @@ BitswapMessage.deserialize = (raw, callback) => { } callback(null, msg) }) - return } // Bitswap 1.1.0 if (decoded.payload.length > 0) { - map(decoded.payload, (p, cb) => { + return each(decoded.payload, (p, cb) => { if (!p.prefix || !p.data) { cb() } @@ -185,13 +184,14 @@ BitswapMessage.deserialize = (raw, callback) => { const multicodec = values[1] const hashAlg = values[2] // const hashLen = values[3] // We haven't need to use this so far - const block = new Block(p.data) - block.key(hashAlg, (err, multihash) => { + multihashing(p.data, hashAlg, (err, hash) => { if (err) { return cb(err) } - const cid = new CID(cidVersion, codecName[multicodec.toString('16')], multihash) - msg.addBlock(cid, block) + + const cid = new CID(cidVersion, codecName[multicodec.toString('16')], hash) + + msg.addBlock(new Block(p.data, cid)) cb() }) }, (err) => { @@ -200,8 +200,8 @@ BitswapMessage.deserialize = (raw, callback) => { } callback(null, msg) }) - return } + callback(null, msg) } diff --git a/src/types/wantlist/index.js b/src/types/wantlist/index.js index c12f8ebe..edb6d436 100644 --- a/src/types/wantlist/index.js +++ b/src/types/wantlist/index.js @@ -1,5 +1,6 @@ 'use strict' +const sort = require('lodash.sortby') const Entry = require('./entry') class Wantlist { @@ -52,7 +53,11 @@ class Wantlist { } sortedEntries () { - return new Map(Array.from(this.set.entries()).sort()) + return new Map( + sort(Array.from(this.set.entries()), (o) => { + return o[1].key + }) + ) } contains (cid) { diff --git a/test/browser.js b/test/browser.js index c23f79bc..f06d62f1 100644 --- a/test/browser.js +++ b/test/browser.js @@ -1,12 +1,7 @@ 'use strict' -const eachSeries = require('async/eachSeries') -const Store = require('idb-pull-blob-store') -const _ = require('lodash') const IPFSRepo = require('ipfs-repo') -const pull = require('pull-stream') - -const repoContext = require.context('buffer!./test-repo', true) +const series = require('async/series') const idb = self.indexedDB || self.mozIndexedDB || @@ -17,35 +12,16 @@ const idb = self.indexedDB || let dbs = [] function createRepo (id, done) { - const repoData = [] - repoContext.keys().forEach(function (key) { - repoData.push({ - key: key.replace('./', ''), - value: repoContext(key) - }) - }) - - const mainBlob = new Store(id) - const blocksBlob = new Store(`${id}/blocks`) - dbs.push(id) - eachSeries(repoData, (file, cb) => { - if (_.startsWith(file.key, 'datastore/')) { - return cb() + const repo = new IPFSRepo(id) + series([ + (cb) => repo.init({}, cb), + (cb) => repo.open(cb) + ], (err) => { + if (err) { + return done(err) } - - const blocks = _.startsWith(file.key, 'blocks/') - const blob = blocks ? blocksBlob : mainBlob - - const key = blocks ? file.key.replace(/^blocks\//, '') : file.key - - pull( - pull.values([file.value]), - blob.write(key, cb) - ) - }, () => { - const repo = new IPFSRepo(id, {stores: Store}) done(null, repo) }) } diff --git a/test/components/decision-engine/index-test.js b/test/components/decision-engine/index-test.js index 0b5408cd..b4bff343 100644 --- a/test/components/decision-engine/index-test.js +++ b/test/components/decision-engine/index-test.js @@ -2,17 +2,20 @@ /* eslint-env mocha */ 'use strict' -const expect = require('chai').expect +const chai = require('chai') +chai.use(require('dirty-chai')) +const expect = chai.expect const PeerId = require('peer-id') const _ = require('lodash') const Block = require('ipfs-block') const parallel = require('async/parallel') const series = require('async/series') const map = require('async/map') +const each = require('async/each') +const waterfall = require('async/waterfall') const eachSeries = require('async/eachSeries') -const pull = require('pull-stream') -const paramap = require('pull-paramap') const CID = require('cids') +const multihashing = require('multihashing-async') const Message = require('../../../src/types/message') const DecisionEngine = require('../../../src/components/decision-engine') @@ -21,7 +24,7 @@ const mockNetwork = require('../../utils').mockNetwork function messageToString (m) { return Array.from(m[1].blocks.values()) - .map((b) => b.block.data.toString()) + .map((b) => b.data.toString()) } function stringifyMessages (messages) { @@ -55,47 +58,38 @@ module.exports = (repo) => { (cb) => newEngine('Ernie', cb), (cb) => newEngine('Bert', cb) ], (err, res) => { - expect(err).to.not.exist + expect(err).to.not.exist() const sender = res[0] const receiver = res[1] - pull( - pull.values(_.range(1000)), - pull.map((i) => { - const content = `this is message ${i}` - return new Block(content) - }), - paramap((block, cb) => { + map(_.range(1000), (i, cb) => { + const data = new Buffer(`this is message ${i}`) + multihashing(data, 'sha2-256', (err, hash) => { + expect(err).to.not.exist() + const m = new Message(false) - block.key((err, key) => { - if (err) { - return cb(err) - } - const cid = new CID(key) - m.addBlock(cid, block) - sender.engine.messageSent(receiver.peer, block, cid) - receiver.engine.messageReceived(sender.peer, m, cb) - }) - }, 100), - pull.onEnd((err) => { - expect(err).to.not.exist - - expect(sender.engine.numBytesSentTo(receiver.peer)) - .to.be.above(0) - - expect(sender.engine.numBytesSentTo(receiver.peer)) - .to.eql(receiver.engine.numBytesReceivedFrom(sender.peer)) - - expect(receiver.engine.numBytesSentTo(sender.peer)) - .to.eql(0) - - expect(sender.engine.numBytesReceivedFrom(receiver.peer)) - .to.eql(0) - - done() + const block = new Block(data, new CID(hash)) + m.addBlock(block) + sender.engine.messageSent(receiver.peer, block) + receiver.engine.messageReceived(sender.peer, m, cb) }) - ) + }, (err) => { + expect(err).to.not.exist() + expect(sender.engine.numBytesSentTo(receiver.peer)) + .to.be.above(0) + + expect(sender.engine.numBytesSentTo(receiver.peer)) + .to.eql(receiver.engine.numBytesReceivedFrom(sender.peer)) + + expect(receiver.engine.numBytesSentTo(sender.peer)) + .to.eql(0) + + expect(sender.engine.numBytesReceivedFrom(receiver.peer)) + .to.eql(0) + + done() + }) }) }) @@ -104,7 +98,7 @@ module.exports = (repo) => { (cb) => newEngine('sf', cb), (cb) => newEngine('sea', cb) ], (err, res) => { - expect(err).to.not.exist + expect(err).to.not.exist() const sanfrancisco = res[0] const seattle = res[1] @@ -112,7 +106,7 @@ module.exports = (repo) => { const m = new Message(true) sanfrancisco.engine.messageSent(seattle.peer) seattle.engine.messageReceived(sanfrancisco.peer, m, (err) => { - expect(err).to.not.exist + expect(err).to.not.exist() expect(seattle.peer.toHexString()) .to.not.eql(sanfrancisco.peer.toHexString()) @@ -137,13 +131,11 @@ module.exports = (repo) => { function partnerWants (dEngine, values, partner, cb) { const message = new Message(false) - const blocks = values.map((k) => new Block(k)) - map(blocks, (b, cb) => b.key(cb), (err, keys) => { - expect(err).to.not.exist - keys.forEach((key, i) => { - const cid = new CID(key) - message.addEntry(cid, Math.pow(2, 32) - 1 - i) + map(values, (v, cb) => multihashing(new Buffer(v), 'sha2-256', cb), (err, hashes) => { + expect(err).to.not.exist() + hashes.forEach((hash, i) => { + message.addEntry(new CID(hash), Math.pow(2, 32) - 1 - i) }) dEngine.messageReceived(partner, message, cb) @@ -152,73 +144,68 @@ module.exports = (repo) => { function partnerCancels (dEngine, values, partner, cb) { const message = new Message(false) - const blocks = values.map((k) => new Block(k)) - map(blocks, (b, cb) => b.key(cb), (err, keys) => { - expect(err).to.not.exist - keys.forEach((key) => { - const cid = new CID(key) - message.cancel(cid) + map(values, (v, cb) => multihashing(new Buffer(v), 'sha2-256', cb), (err, hashes) => { + expect(err).to.not.exist() + hashes.forEach((hash) => { + message.cancel(new CID(hash)) }) dEngine.messageReceived(partner, message, cb) }) } repo.create('p', (err, repo) => { - expect(err).to.not.exist - - pull( - pull.values(alphabet), - pull.asyncMap((l, cb) => { - const block = new Block(l) - block.key((err, key) => { - if (err) { - return cb(err) - } - cb(null, { data: block.data, key: key }) - }) - }), - repo.blockstore.putStream(), - pull.onEnd((err) => { - expect(err).to.not.exist - - eachSeries(_.range(numRounds), (i, cb) => { - // 2 test cases - // a) want alphabet - cancel vowels - // b) want alphabet - cancels everything except vowels - - eachSeries(testCases, (testcase, innerCb) => { - const set = testcase[0] - const cancels = testcase[1] - const keeps = _.difference(set, cancels) - - const network = mockNetwork(1, (res) => { - const msgs = stringifyMessages(res.messages) - expect(msgs.sort()).to.eql(keeps.sort()) - innerCb() - }) - - const dEngine = new DecisionEngine(repo.blockstore, network) - dEngine.start() - - let partner - series([ - (cb) => PeerId.create((err, id) => { - if (err) { - return cb(err) - } - partner = id - cb() - }), - (cb) => partnerWants(dEngine, set, partner, cb), - (cb) => partnerCancels(dEngine, cancels, partner, cb) - ], (err) => { - expect(err).to.not.exist - }) - }, cb) - }, done) - }) - ) + expect(err).to.not.exist() + + waterfall([ + (cb) => map( + alphabet, + (v, cb) => multihashing(new Buffer(v), 'sha2-256', cb), + cb + ), + (hashes, cb) => each( + hashes.map((h, i) => { + return new Block(new Buffer(alphabet[i]), new CID(h)) + }), + (b, cb) => repo.blockstore.put(b, cb), + cb + ), + (cb) => eachSeries(_.range(numRounds), (i, cb) => { + // 2 test cases + // a) want alphabet - cancel vowels + // b) want alphabet - cancels everything except vowels + + eachSeries(testCases, (testcase, innerCb) => { + const set = testcase[0] + const cancels = testcase[1] + const keeps = _.difference(set, cancels) + + const network = mockNetwork(1, (res) => { + const msgs = stringifyMessages(res.messages) + expect(msgs.sort()).to.eql(keeps.sort()) + innerCb() + }) + + const dEngine = new DecisionEngine(repo.blockstore, network) + dEngine.start() + + let partner + series([ + (cb) => PeerId.create((err, id) => { + if (err) { + return cb(err) + } + partner = id + cb() + }), + (cb) => partnerWants(dEngine, set, partner, cb), + (cb) => partnerCancels(dEngine, cancels, partner, cb) + ], (err) => { + expect(err).to.not.exist() + }) + }, cb) + }, cb) + ], done) }) }) @@ -228,9 +215,6 @@ module.exports = (repo) => { b.fill(i) return b }) - const blocks = _.range(10).map((i) => { - return new Block(data[i]) - }) const net = mockNetwork(5, (res) => { expect(res.messages).to.have.length(5) @@ -239,30 +223,29 @@ module.exports = (repo) => { parallel([ (cb) => newEngine('sf', cb, net), - (cb) => map(blocks, (b, cb) => b.key(cb), cb) + (cb) => map(data, (d, cb) => multihashing(d, 'sha2-256', (err, hash) => { + expect(err).to.not.exist() + cb(null, new Block(d, new CID(hash))) + }), cb) ], (err, res) => { - expect(err).to.not.exist + expect(err).to.not.exist() const sf = res[0].engine - const cids = res[1].map((c) => new CID(c)) const id = res[0].peer - pull( - pull.values(blocks.map((b, i) => ({ - data: b.data, key: cids[i].multihash - }))), - sf.blockstore.putStream(), - pull.onEnd((err) => { - expect(err).to.not.exist - const msg = new Message(false) - cids.forEach((c, i) => { - msg.addEntry(c, Math.pow(2, 32) - 1 - i) - }) - - sf.messageReceived(id, msg, (err) => { - expect(err).to.not.exist - }) + const blocks = res[1] + const cids = blocks.map((b) => b.cid) + + each(blocks, (b, cb) => sf.blockstore.put(b, cb), (err) => { + expect(err).to.not.exist() + const msg = new Message(false) + cids.forEach((c, i) => { + msg.addEntry(c, Math.pow(2, 32) - 1 - i) }) - ) + + sf.messageReceived(id, msg, (err) => { + expect(err).to.not.exist() + }) + }) }) }) }) diff --git a/test/components/decision-engine/ledger.spec.js b/test/components/decision-engine/ledger.spec.js index 11c2a89d..8bba6e93 100644 --- a/test/components/decision-engine/ledger.spec.js +++ b/test/components/decision-engine/ledger.spec.js @@ -1,7 +1,9 @@ /* eslint-env mocha */ 'use strict' -const expect = require('chai').expect +const chai = require('chai') +chai.use(require('dirty-chai')) +const expect = chai.expect const PeerId = require('peer-id') const Ledger = require('../../../src/components/decision-engine/ledger') diff --git a/test/components/network/gen-bitswap-network.node.js b/test/components/network/gen-bitswap-network.node.js index ad868337..67f49602 100644 --- a/test/components/network/gen-bitswap-network.node.js +++ b/test/components/network/gen-bitswap-network.node.js @@ -2,7 +2,9 @@ /* eslint-env mocha */ 'use strict' -const expect = require('chai').expect +const chai = require('chai') +chai.use(require('dirty-chai')) +const expect = chai.expect const series = require('async/series') const parallel = require('async/parallel') const map = require('async/map') @@ -10,10 +12,11 @@ const each = require('async/each') const _ = require('lodash') const Block = require('ipfs-block') const Buffer = require('safe-buffer').Buffer -const pull = require('pull-stream') const crypto = require('crypto') -const utils = require('../../utils') const CID = require('cids') +const multihashing = require('multihashing-async') + +const utils = require('../../utils') describe('gen Bitswap network', function () { // CI is very slow @@ -21,7 +24,7 @@ describe('gen Bitswap network', function () { it('retrieves local blocks', (done) => { utils.genBitswapNetwork(1, (err, nodes) => { - expect(err).to.not.exist + expect(err).to.not.exist() const node = nodes[0] let blocks @@ -30,51 +33,30 @@ describe('gen Bitswap network', function () { (cb) => map(_.range(100), (k, cb) => { const b = Buffer.alloc(1024) b.fill(k) - cb(null, new Block(b)) + multihashing(b, 'sha2-256', (err, hash) => { + expect(err).to.not.exist() + const cid = new CID(hash) + cb(null, new Block(b, cid)) + }) }, (err, _blocks) => { - if (err) { - return cb(err) - } + expect(err).to.not.exist() blocks = _blocks cb() }), - (cb) => { - pull( - pull.values(blocks), - pull.asyncMap((block, cb) => { - block.key((err, key) => { - if (err) { - return cb(err) - } - - cb(null, { - block: block, - cid: new CID(key) - }) - }) - }), - node.bitswap.putStream(), - pull.onEnd(cb) - ) - }, - (cb) => { - each(_.range(100), (i, cb) => { - map(blocks, (block, cb) => block.key(cb), (err, keys) => { - const cids = keys.map((key) => new CID(key)) - expect(err).to.not.exist - pull( - node.bitswap.getStream(cids), - pull.collect((err, res) => { - expect(err).to.not.exist - expect(res).to.have.length(blocks.length) - cb() - }) - ) - }) - }, cb) - } + (cb) => each( + blocks, + (b, cb) => node.bitswap.put(b, cb), + cb + ), + (cb) => map(_.range(100), (i, cb) => { + node.bitswap.get(blocks[i].cid, cb) + }, (err, res) => { + expect(err).to.not.exist() + expect(res).to.have.length(blocks.length) + cb() + }) ], (err) => { - expect(err).to.not.exist + expect(err).to.not.exist() node.bitswap.stop() node.libp2p.stop(done) }) @@ -85,11 +67,11 @@ describe('gen Bitswap network', function () { it('with 2 nodes', (done) => { const n = 2 utils.genBitswapNetwork(n, (err, nodeArr) => { - expect(err).to.not.exist + expect(err).to.not.exist() nodeArr.forEach((node) => { expect( Object.keys(node.libp2p.swarm.conns) - ).to.be.empty + ).to.be.empty() expect( Object.keys(node.libp2p.swarm.muxedConns) @@ -114,49 +96,40 @@ describe('gen Bitswap network', function () { function round (nodeArr, n, cb) { const blockFactor = 10 - const blocks = createBlocks(n, blockFactor) - map(blocks, (b, cb) => b.key(cb), (err, keys) => { + createBlocks(n, blockFactor, (err, blocks) => { if (err) { return cb(err) } - const cids = keys.map((k) => new CID(k)) + const cids = blocks.map((b) => b.cid) let d series([ // put blockFactor amount of blocks per node - (cb) => parallel(_.map(nodeArr, (node, i) => (callback) => { + (cb) => parallel(_.map(nodeArr, (node, i) => (cb) => { node.bitswap.start() const data = _.map(_.range(blockFactor), (j) => { const index = i * blockFactor + j - return { - block: blocks[index], - cid: cids[index] - } + return blocks[index] }) - each( - data, - (d, cb) => node.bitswap.put(d, cb), - callback - ) + + each(data, (d, cb) => node.bitswap.put(d, cb), cb) }), cb), (cb) => { d = (new Date()).getTime() - cb() - }, - // fetch all blocks on every node - (cb) => parallel(_.map(nodeArr, (node, i) => (callback) => { - pull( - node.bitswap.getStream(cids), - pull.collect((err, res) => { + // fetch all blocks on every node + parallel(_.map(nodeArr, (node, i) => (cb) => { + map(cids, (cid, cb) => { + node.bitswap.get(cid, cb) + }, (err, res) => { if (err) { - return callback(err) + return cb(err) } expect(res).to.have.length(blocks.length) - callback() + cb() }) - ) - }), cb) + }), cb) + } ], (err) => { if (err) { return cb(err) @@ -167,8 +140,15 @@ function round (nodeArr, n, cb) { }) } -function createBlocks (n, blockFactor) { - return _.map(_.range(n * blockFactor), (k) => { - return new Block(crypto.randomBytes(n * blockFactor)) - }) +function createBlocks (n, blockFactor, callback) { + map(_.map(_.range(n * blockFactor), (k) => { + return crypto.randomBytes(n * blockFactor) + }), (d, cb) => { + multihashing(d, 'sha2-256', (err, hash) => { + if (err) { + return cb(err) + } + cb(null, new Block(d, new CID(hash))) + }) + }, callback) } diff --git a/test/components/network/network.node.js b/test/components/network/network.node.js index b40fca81..2963b3d3 100644 --- a/test/components/network/network.node.js +++ b/test/components/network/network.node.js @@ -4,13 +4,16 @@ const Node = require('libp2p-ipfs-nodejs') const PeerInfo = require('peer-info') const multiaddr = require('multiaddr') -const expect = require('chai').expect +const chai = require('chai') +chai.use(require('dirty-chai')) +const expect = chai.expect const PeerBook = require('peer-book') -const Block = require('ipfs-block') const lp = require('pull-length-prefixed') const pull = require('pull-stream') const parallel = require('async/parallel') -const CID = require('cids') +const map = require('async/map') +const _ = require('lodash') +const utils = require('../../utils') const Network = require('../../../src/components/network') const Message = require('../../../src/types/message') @@ -38,7 +41,8 @@ describe('network', () => { parallel([ (cb) => PeerInfo.create(cb), (cb) => PeerInfo.create(cb), - (cb) => PeerInfo.create(cb) + (cb) => PeerInfo.create(cb), + (cb) => map(_.range(2), (i, cb) => utils.makeBlock(cb), cb) ], (err, results) => { if (err) { return done(err) @@ -48,7 +52,7 @@ describe('network', () => { peerInfoB = results[1] peerInfoC = results[2] - blocks = ['hello', 'world'].map((b) => new Block(b)) + blocks = results[3] const maA = multiaddr('/ip4/127.0.0.1/tcp/10100/ipfs/' + peerInfoA.id.toB58String()) const maB = multiaddr('/ip4/127.0.0.1/tcp/10300/ipfs/' + peerInfoB.id.toB58String()) @@ -126,9 +130,9 @@ describe('network', () => { // only bitswap100 networkC = new Network(libp2pNodeC, peerBookC, bitswapMockC, true) - expect(networkA).to.exist - expect(networkB).to.exist - expect(networkC).to.exist + expect(networkA).to.exist() + expect(networkB).to.exist() + expect(networkC).to.exist() networkA.start() networkB.start() @@ -139,7 +143,7 @@ describe('network', () => { it('connectTo fail', (done) => { networkA.connectTo(peerInfoB.id, (err) => { - expect(err).to.exist + expect(err).to.exist() done() }) }) @@ -162,7 +166,7 @@ describe('network', () => { } libp2pNodeA.dialByPeerInfo(peerInfoB, (err) => { - expect(err).to.not.exist + expect(err).to.not.exist() }) function finish () { @@ -174,7 +178,7 @@ describe('network', () => { it('connectTo success', (done) => { networkA.connectTo(peerInfoB.id, (err) => { - expect(err).to.not.exist + expect(err).to.not.exist() done() }) }) @@ -184,41 +188,31 @@ describe('network', () => { const b1 = blocks[0] const b2 = blocks[1] - b1.key((err, key1) => { - expect(err).to.not.exist - const cid1 = new CID(key1) - msg.addEntry(cid1, 0, false) - msg.addBlock(cid1, b1) - - b2.key((err, key2) => { - expect(err).to.not.exist - const cid2 = new CID(key2) + msg.addEntry(b1.cid, 0, false) + msg.addBlock(b1) + msg.addBlock(b2) - msg.addBlock(cid2, b2) + bitswapMockB._receiveMessage = (peerId, msgReceived) => { + expect(msg).to.eql(msgReceived) + bitswapMockB._receiveMessage = () => {} + bitswapMockB._receiveError = () => {} + done() + } - bitswapMockB._receiveMessage = (peerId, msgReceived) => { - expect(msg).to.eql(msgReceived) - bitswapMockB._receiveMessage = () => {} - bitswapMockB._receiveError = () => {} - done() - } + bitswapMockB._receiveError = (err) => { + expect(err).to.not.exist() + } - bitswapMockB._receiveError = (err) => { - expect(err).to.not.exist - } + libp2pNodeA.dialByPeerInfo(peerInfoB, '/ipfs/bitswap/1.0.0', (err, conn) => { + expect(err).to.not.exist() - libp2pNodeA.dialByPeerInfo(peerInfoB, '/ipfs/bitswap/1.0.0', (err, conn) => { - expect(err).to.not.exist - - pull( - pull.values([ - msg.serializeToBitswap100() - ]), - lp.encode(), - conn - ) - }) - }) + pull( + pull.values([ + msg.serializeToBitswap100() + ]), + lp.encode(), + conn + ) }) }) @@ -227,41 +221,31 @@ describe('network', () => { const b1 = blocks[0] const b2 = blocks[1] - b1.key((err, key1) => { - expect(err).to.not.exist - const cid1 = new CID(key1) - msg.addEntry(cid1, 0, false) - msg.addBlock(cid1, b1) + msg.addEntry(b1.cid, 0, false) + msg.addBlock(b1) + msg.addBlock(b2) - b2.key((err, key2) => { - expect(err).to.not.exist - const cid2 = new CID(key2) - - msg.addBlock(cid2, b2) + bitswapMockB._receiveMessage = (peerId, msgReceived) => { + expect(msg).to.eql(msgReceived) + bitswapMockB._receiveMessage = () => {} + bitswapMockB._receiveError = () => {} + done() + } - bitswapMockB._receiveMessage = (peerId, msgReceived) => { - expect(msg).to.eql(msgReceived) - bitswapMockB._receiveMessage = () => {} - bitswapMockB._receiveError = () => {} - done() - } + bitswapMockB._receiveError = (err) => { + expect(err).to.not.exist() + } - bitswapMockB._receiveError = (err) => { - expect(err).to.not.exist - } + libp2pNodeA.dialByPeerInfo(peerInfoB, '/ipfs/bitswap/1.1.0', (err, conn) => { + expect(err).to.not.exist() - libp2pNodeA.dialByPeerInfo(peerInfoB, '/ipfs/bitswap/1.1.0', (err, conn) => { - expect(err).to.not.exist - - pull( - pull.values([ - msg.serializeToBitswap110() - ]), - lp.encode(), - conn - ) - }) - }) + pull( + pull.values([ + msg.serializeToBitswap110() + ]), + lp.encode(), + conn + ) }) }) @@ -270,33 +254,23 @@ describe('network', () => { const b1 = blocks[0] const b2 = blocks[1] - b1.key((err, key1) => { - expect(err).to.not.exist - const cid1 = new CID(key1) - msg.addEntry(cid1, 0, false) - msg.addBlock(cid1, b1) - - b2.key((err, key2) => { - expect(err).to.not.exist - const cid2 = new CID(key2) + msg.addEntry(b1.cid, 0, false) + msg.addBlock(b1) + msg.addBlock(b2) - msg.addBlock(cid2, b2) - - bitswapMockB._receiveMessage = (peerId, msgReceived) => { - expect(msg).to.eql(msgReceived) - bitswapMockB._receiveMessage = () => {} - bitswapMockB._receiveError = () => {} - done() - } + bitswapMockB._receiveMessage = (peerId, msgReceived) => { + expect(msg).to.eql(msgReceived) + bitswapMockB._receiveMessage = () => {} + bitswapMockB._receiveError = () => {} + done() + } - bitswapMockB._receiveError = (err) => { - expect(err).to.not.exist - } + bitswapMockB._receiveError = (err) => { + expect(err).to.not.exist() + } - networkA.sendMessage(peerInfoB.id, msg, (err) => { - expect(err).to.not.exist - }) - }) + networkA.sendMessage(peerInfoB.id, msg, (err) => { + expect(err).to.not.exist() }) }) @@ -318,7 +292,7 @@ describe('network', () => { } libp2pNodeA.dialByPeerInfo(peerInfoC, (err) => { - expect(err).to.not.exist + expect(err).to.not.exist() }) function finish () { @@ -333,33 +307,23 @@ describe('network', () => { const b1 = blocks[0] const b2 = blocks[1] - b1.key((err, key1) => { - expect(err).to.not.exist - const cid1 = new CID(key1) - msg.addEntry(cid1, 0, false) - msg.addBlock(cid1, b1) - - b2.key((err, key2) => { - expect(err).to.not.exist - const cid2 = new CID(key2) + msg.addEntry(b1.cid, 0, false) + msg.addBlock(b1) + msg.addBlock(b2) - msg.addBlock(cid2, b2) - - bitswapMockC._receiveMessage = (peerId, msgReceived) => { - expect(msg).to.eql(msgReceived) - bitswapMockC._receiveMessage = () => {} - bitswapMockC._receiveError = () => {} - done() - } + bitswapMockC._receiveMessage = (peerId, msgReceived) => { + expect(msg).to.eql(msgReceived) + bitswapMockC._receiveMessage = () => {} + bitswapMockC._receiveError = () => {} + done() + } - bitswapMockC._receiveError = (err) => { - expect(err).to.not.exist - } + bitswapMockC._receiveError = (err) => { + expect(err).to.not.exist() + } - networkA.sendMessage(peerInfoC.id, msg, (err) => { - expect(err).to.not.exist - }) - }) + networkA.sendMessage(peerInfoC.id, msg, (err) => { + expect(err).to.not.exist() }) }) }) diff --git a/test/components/wantmanager/index.spec.js b/test/components/wantmanager/index.spec.js index cd0b09e0..0cf00c20 100644 --- a/test/components/wantmanager/index.spec.js +++ b/test/components/wantmanager/index.spec.js @@ -1,19 +1,21 @@ /* eslint-env mocha */ 'use strict' -const expect = require('chai').expect +const chai = require('chai') +chai.use(require('dirty-chai')) +const expect = chai.expect const PeerId = require('peer-id') const parallel = require('async/parallel') const series = require('async/series') const map = require('async/map') -const Block = require('ipfs-block') -const CID = require('cids') +const _ = require('lodash') const cs = require('../../../src/constants') const Message = require('../../../src/types/message') const WantManager = require('../../../src/components/want-manager') -const mockNetwork = require('../../utils').mockNetwork +const utils = require('../../utils') +const mockNetwork = utils.mockNetwork describe('WantManager', () => { it('sends wantlist to all connected peers', (done) => { @@ -24,13 +26,10 @@ describe('WantManager', () => { (cb) => PeerId.create(cb), (cb) => PeerId.create(cb), (cb) => { - const data = ['1', '2', '3'] - blocks = data.map((d) => new Block(d)) - map(blocks, (b, cb) => b.key(cb), (err, keys) => { - if (err) { - return done(err) - } - cids = keys.map((key) => new CID(key)) + map(_.range(3), (i, cb) => utils.makeBlock(cb), (err, res) => { + expect(err).to.not.exist() + blocks = res + cids = blocks.map((b) => b.cid) cb() }) } @@ -89,7 +88,7 @@ describe('WantManager', () => { }, (cb) => setTimeout(cb, 200) ], (err) => { - expect(err).to.not.exist + expect(err).to.not.exist() wantManager.wantBlocks([cid3]) }) }) diff --git a/test/components/wantmanager/msg-queue.spec.js b/test/components/wantmanager/msg-queue.spec.js index 5c4215cd..05ff6499 100644 --- a/test/components/wantmanager/msg-queue.spec.js +++ b/test/components/wantmanager/msg-queue.spec.js @@ -1,38 +1,36 @@ /* eslint-env mocha */ 'use strict' -const expect = require('chai').expect +const chai = require('chai') +chai.use(require('dirty-chai')) +const expect = chai.expect const PeerId = require('peer-id') const map = require('async/map') const parallel = require('async/parallel') -const Block = require('ipfs-block') const CID = require('cids') +const multihashing = require('multihashing-async') const Message = require('../../../src/types/message') const MsgQueue = require('../../../src/components/want-manager/msg-queue') describe('MessageQueue', () => { let peerId - let blocks let cids before((done) => { parallel([ (cb) => { PeerId.create((err, _peerId) => { - expect(err).to.not.exist + expect(err).to.not.exist() peerId = _peerId cb() }) }, (cb) => { - const data = ['1', '2', '3', '4', '5', '6'] - blocks = data.map((d) => new Block(d)) - map(blocks, (b, cb) => b.key(cb), (err, keys) => { - if (err) { - return done(err) - } - cids = keys.map((key) => new CID(key)) + const data = ['1', '2', '3', '4', '5', '6'].map((d) => new Buffer(d)) + map(data, (d, cb) => multihashing(d, 'sha2-256', cb), (err, hashes) => { + expect(err).to.not.exist() + cids = hashes.map((h) => new CID(h)) cb() }) } diff --git a/test/index-test.js b/test/index-test.js index fe1de8be..fb1cf71e 100644 --- a/test/index-test.js +++ b/test/index-test.js @@ -9,19 +9,31 @@ const map = require('async/map') const parallel = require('async/parallel') const setImmediate = require('async/setImmediate') const _ = require('lodash') -const expect = require('chai').expect +const chai = require('chai') +chai.use(require('dirty-chai')) +const expect = chai.expect const PeerId = require('peer-id') -const Block = require('ipfs-block') const PeerBook = require('peer-book') -const pull = require('pull-stream') -const CID = require('cids') const Message = require('../src/types/message') const Bitswap = require('../src') const utils = require('./utils') +const makeBlock = utils.makeBlock -const makeBlock = (cb) => cb(null, new Block(`hello world ${Math.random()}`)) +const hasBlocks = (msg, store, cb) => { + each(msg.blocks.values(), (b, cb) => { + store.has(b.cid, (err, has) => { + if (err) { + return cb(err) + } + if (!has) { + return cb(new Error('missing block')) + } + cb() + }) + }, cb) +} module.exports = (repo) => { const libp2pMock = { @@ -36,7 +48,6 @@ module.exports = (repo) => { describe('bitswap', () => { let store let blocks - let cids let ids before((done) => { @@ -53,13 +64,7 @@ module.exports = (repo) => { blocks = results[1] ids = results[2] - map(blocks, (b, cb) => b.key(cb), (err, keys) => { - if (err) { - return done(err) - } - cids = keys.map((key) => new CID(key)) - done() - }) + done() }) }) @@ -77,35 +82,23 @@ module.exports = (repo) => { const b1 = blocks[0] const b2 = blocks[1] - const cid1 = cids[0] - const cid2 = cids[1] const msg = new Message(false) - msg.addBlock(cid1, b1) - msg.addBlock(cid2, b2) + msg.addBlock(b1) + msg.addBlock(b2) bs._receiveMessage(other, msg, (err) => { - if (err) { - throw err - } - + expect(err).to.not.exist() expect(bs.blocksRecvd).to.equal(2) expect(bs.dupBlocksRecvd).to.equal(0) - pull( - pull.values([cid1, cid2]), - pull.map((cid) => store.getStream(cid.multihash)), - pull.flatten(), - pull.collect((err, blocks) => { - if (err) { - return done(err) - } - - expect(blocks[0].data).to.eql(b1.data) - expect(blocks[1].data).to.eql(b2.data) - done() - }) - ) + map([b1.cid, b2.cid], (cid, cb) => store.get(cid, cb), (err, blocks) => { + expect(err).to.not.exist() + + expect(blocks[0].data).to.eql(b1.data) + expect(blocks[1].data).to.eql(b2.data) + done() + }) }) }) @@ -115,24 +108,24 @@ module.exports = (repo) => { bs.start() const other = ids[1] - const cid1 = cids[0] - const cid2 = cids[1] + const b1 = blocks[0] + const b2 = blocks[1] const msg = new Message(false) - msg.addEntry(cid1, 1, false) - msg.addEntry(cid2, 1, false) + msg.addEntry(b1.cid, 1, false) + msg.addEntry(b2.cid, 1, false) bs._receiveMessage(other, msg, (err) => { - expect(err).to.not.exist + expect(err).to.not.exist() expect(bs.blocksRecvd).to.be.eql(0) expect(bs.dupBlocksRecvd).to.be.eql(0) const wl = bs.wantlistForPeer(other) - expect(wl.has(cid1.buffer.toString())).to.eql(true) - expect(wl.has(cid2.buffer.toString())).to.eql(true) + expect(wl.has(b1.cid.buffer.toString())).to.eql(true) + expect(wl.has(b2.cid.buffer.toString())).to.eql(true) done() }) @@ -144,13 +137,12 @@ module.exports = (repo) => { let others let blocks - let cids bs.start() parallel([ (cb) => map(_.range(5), (i, cb) => PeerId.create(cb), cb), - (cb) => cb(null, _.range(10).map((i) => new Block(`hello ${i}`))) + (cb) => map(_.range(10), (i, cb) => makeBlock(cb), cb) ], (err, results) => { if (err) { return done(err) @@ -158,38 +150,23 @@ module.exports = (repo) => { others = results[0] blocks = results[1] - - map(blocks, (b, cb) => b.key(cb), (err, keys) => { - if (err) { - return done(err) - } - cids = keys.map((key) => new CID(key)) - test() - }) + test() }) function test () { map(_.range(5), (i, cb) => { const msg = new Message(false) - - each([ - { block: blocks[i], cid: cids[i] }, - { block: blocks[5 + i], cid: cids[5 + i] } - ], (blockAndCid, cb) => { - msg.addBlock(blockAndCid.cid, blockAndCid.block) - cb() - }, (err) => { - expect(err).to.not.exist - cb(null, msg) - }) + msg.addBlock(blocks[i]) + msg.addBlock(blocks[5 + 1]) + cb(null, msg) }, (err, messages) => { - expect(err).to.not.exist + expect(err).to.not.exist() let i = 0 eachSeries(others, (other, cb) => { const msg = messages[i] i++ bs._receiveMessage(other, msg, (err) => { - expect(err).to.not.exist + expect(err).to.not.exist() hasBlocks(msg, store, cb) }) }, done) @@ -198,73 +175,40 @@ module.exports = (repo) => { }) }) - describe('getStream', () => { + describe('get', () => { it('block exists locally', (done) => { const block = blocks[4] - const cid = cids[4] - - pull( - pull.values([ - { data: block.data, key: cid.multihash } - ]), - store.putStream(), - pull.onEnd((err) => { - if (err) { - return done(err) - } - - const book = new PeerBook() - const bs = new Bitswap(libp2pMock, store, book) - pull( - bs.getStream(cid), - pull.collect((err, res) => { - if (err) { - return done(err) - } + store.put(block, (err) => { + expect(err).to.not.exist() + const book = new PeerBook() + const bs = new Bitswap(libp2pMock, store, book) - expect(res[0].data).to.eql(block.data) - done() - }) - ) + bs.get(block.cid, (err, res) => { + expect(err).to.not.exist() + expect(res).to.eql(block) + done() }) - ) + }) }) it('blocks exist locally', (done) => { const b1 = blocks[5] const b2 = blocks[6] const b3 = blocks[7] - const cid1 = cids[5] - const cid2 = cids[6] - const cid3 = cids[7] - - pull( - pull.values([ - { data: b1.data, key: cid1.multihash }, - { data: b2.data, key: cid2.multihash }, - { data: b3.data, key: cid3.multihash } - ]), - store.putStream(), - pull.onEnd((err) => { - expect(err).to.not.exist - - const book = new PeerBook() - const bs = new Bitswap(libp2pMock, store, book) - - pull( - bs.getStream([cid1, cid2, cid3]), - pull.collect((err, res) => { - expect(err).to.not.exist - - expect(res[0].data).to.eql(b1.data) - expect(res[1].data).to.eql(b2.data) - expect(res[2].data).to.eql(b3.data) - done() - }) - ) + + store.putMany([b1, b2, b3], (err) => { + expect(err).to.not.exist() + + const book = new PeerBook() + const bs = new Bitswap(libp2pMock, store, book) + + map([b1.cid, b2.cid, b3.cid], (cid, cb) => bs.get(cid, cb), (err, res) => { + expect(err).to.not.exist() + expect(res).to.be.eql([b1, b2, b3]) + done() }) - ) + }) }) it('block is added locally afterwards', (done) => { @@ -278,25 +222,15 @@ module.exports = (repo) => { bs.engine.network = net bs.start() - block.key((err, key) => { - expect(err).to.not.exist - const cid = new CID(key) - pull( - bs.getStream(cid), - pull.collect((err, res) => { - expect(err).to.not.exist - expect(res[0].data).to.be.eql(block.data) - done() - }) - ) - - setTimeout(() => { - bs.put({ - block: block, - cid: cid - }, () => {}) - }, 200) + bs.get(block.cid, (err, res) => { + expect(err).to.not.exist() + expect(res).to.be.eql(block) + done() }) + + setTimeout(() => { + bs.put(block, () => {}) + }, 200) }) it('block is sent after local add', (done) => { @@ -358,28 +292,16 @@ module.exports = (repo) => { bs1._onPeerConnected(other) bs2._onPeerConnected(me) - block.key((err, key) => { - expect(err).to.not.exist - const cid = new CID(key) - pull( - bs1.getStream(cid), - pull.collect((err, res) => { - expect(err).to.not.exist - cb(null, res[0]) - }) - ) - - setTimeout(() => { - bs2.put({ - block: block, - cid: cid - }) - }, 1000) + bs1.get(block.cid, (err, res) => { + expect(err).to.not.exist() + cb(null, res) }) + setTimeout(() => { + bs2.put(block, () => {}) + }, 1000) }, (res, cb) => { - // TODO: Ask Fridel if this is what he really meant - expect(res).to.eql(res) + expect(res).to.eql(block) cb() } ], done) @@ -412,40 +334,20 @@ module.exports = (repo) => { done() } } - b.key((err, key) => { - expect(err).to.not.exist - pull( - bs.getStream(new CID(key)), - pull.collect((err, res) => { - expect(err).to.not.exist - expect(res).to.be.empty - finish() - }) - ) - pull( - bs.getStream(new CID(key)), - pull.collect((err, res) => { - expect(err).to.not.exist - expect(res).to.be.empty - finish() - }) - ) - setTimeout(() => bs.unwant(new CID(key)), 10) + bs.get(b.cid, (err, res) => { + expect(err).to.not.exist() + expect(res).to.be.empty() + finish() }) + bs.get(b.cid, (err, res) => { + expect(err).to.not.exist() + expect(res).to.be.empty() + finish() + }) + + setTimeout(() => bs.unwant(b.cid), 10) }) }) }) } - -function hasBlocks (msg, store, cb) { - each(msg.blocks.values(), (b, next) => { - b.block.key((err, key) => { - if (err) { - return next(err) - } - - store.has(key, next) - }) - }, cb) -} diff --git a/test/node.js b/test/node.js index 91ec1c08..eac5f25b 100644 --- a/test/node.js +++ b/test/node.js @@ -4,7 +4,6 @@ const IPFSRepo = require('ipfs-repo') const path = require('path') const ncp = require('ncp') const rimraf = require('rimraf') -const Store = require('fs-pull-blob-store') const testRepoPath = path.join(__dirname, 'test-repo') const each = require('async/each') @@ -17,9 +16,14 @@ function createRepo (id, done) { ncp(testRepoPath, repoPath, (err) => { if (err) return done(err) - const repo = new IPFSRepo(repoPath, {stores: Store}) - repos.push(repoPath) - done(null, repo) + const repo = new IPFSRepo(repoPath) + repo.open((err) => { + if (err) { + return done(err) + } + repos.push(repoPath) + done(null, repo) + }) }) } diff --git a/test/test-repo/version b/test/test-repo/version index 00750edc..7ed6ff82 100644 --- a/test/test-repo/version +++ b/test/test-repo/version @@ -1 +1 @@ -3 +5 diff --git a/test/types/message.spec.js b/test/types/message.spec.js index c3650e02..fc5add5f 100644 --- a/test/types/message.spec.js +++ b/test/types/message.spec.js @@ -2,33 +2,34 @@ /* eslint max-nested-callbacks: ["error", 8] */ 'use strict' -const expect = require('chai').expect -const Block = require('ipfs-block') +const chai = require('chai') +chai.use(require('dirty-chai')) +const expect = chai.expect const protobuf = require('protocol-buffers') const map = require('async/map') -const pbm = protobuf(require('../../src/types/message/message.proto')) const CID = require('cids') const isNode = require('detect-node') +const _ = require('lodash') const loadFixture = require('aegir/fixtures') const testDataPath = (isNode ? '../' : '') + 'test-data/serialized-from-go' const rawMessageFullWantlist = loadFixture(__dirname, testDataPath + '/bitswap110-message-full-wantlist') const rawMessageOneBlock = loadFixture(__dirname, testDataPath + '/bitswap110-message-one-block') +const pbm = protobuf(require('../../src/types/message/message.proto')) + const BitswapMessage = require('../../src/types/message') +const utils = require('../utils') describe('BitswapMessage', () => { let blocks let cids before((done) => { - const data = ['foo', 'hello', 'world'] - blocks = data.map((d) => new Block(d)) - map(blocks, (b, cb) => b.key(cb), (err, keys) => { - if (err) { - return done(err) - } - cids = keys.map((key) => new CID(key)) + map(_.range(3), (i, cb) => utils.makeBlock(cb), (err, res) => { + expect(err).to.not.exist() + blocks = res + cids = blocks.map((b) => b.cid) done() }) }) @@ -48,18 +49,16 @@ describe('BitswapMessage', () => { it('.serializeToBitswap100', () => { const block = blocks[1] - const cid = cids[1] const msg = new BitswapMessage(true) - msg.addBlock(cid, block) + msg.addBlock(block) const serialized = msg.serializeToBitswap100() expect(pbm.Message.decode(serialized).blocks).to.eql([block.data]) }) it('.serializeToBitswap110', () => { const block = blocks[1] - const cid = cids[1] const msg = new BitswapMessage(true) - msg.addBlock(cid, block) + msg.addBlock(block) const serialized = msg.serializeToBitswap110() const decoded = pbm.Message.decode(serialized) @@ -90,7 +89,7 @@ describe('BitswapMessage', () => { }) BitswapMessage.deserialize(raw, (err, msg) => { - expect(err).to.not.exist + expect(err).to.not.exist() expect(msg.full).to.equal(true) expect(Array.from(msg.wantlist)) .to.eql([[ @@ -99,7 +98,7 @@ describe('BitswapMessage', () => { ]]) expect( - Array.from(msg.blocks).map((b) => [b[0], b[1].block.data]) + Array.from(msg.blocks).map((b) => [b[0], b[1].data]) ).to.eql([ [cid1.buffer.toString(), b1.data], [cid2.buffer.toString(), b2.data] @@ -135,7 +134,7 @@ describe('BitswapMessage', () => { }) BitswapMessage.deserialize(raw, (err, msg) => { - expect(err).to.not.exist + expect(err).to.not.exist() expect(msg.full).to.equal(true) expect(Array.from(msg.wantlist)) .to.eql([[ @@ -144,7 +143,7 @@ describe('BitswapMessage', () => { ]]) expect( - Array.from(msg.blocks).map((b) => [b[0], b[1].block.data]) + Array.from(msg.blocks).map((b) => [b[0], b[1].data]) ).to.eql([ [cid1.buffer.toString(), b1.data], [cid2.buffer.toString(), b2.data] @@ -163,8 +162,8 @@ describe('BitswapMessage', () => { m.addEntry(cid, 1) expect(m.wantlist.size).to.be.eql(1) - m.addBlock(cid, b) - m.addBlock(cid, b) + m.addBlock(b) + m.addBlock(b) expect(m.blocks.size).to.be.eql(1) done() }) @@ -191,8 +190,8 @@ describe('BitswapMessage', () => { m1.addEntry(cid, 1) m2.addEntry(cid, 1) - m1.addBlock(cid, b) - m2.addBlock(cid, b) + m1.addBlock(b) + m2.addBlock(b) expect(m1.equals(m2)).to.equal(true) done() }) @@ -206,8 +205,8 @@ describe('BitswapMessage', () => { m1.addEntry(cid, 100) m2.addEntry(cid, 3750) - m1.addBlock(cid, b) - m2.addBlock(cid, b) + m1.addBlock(b) + m2.addBlock(b) expect(m1.equals(m2)).to.equal(false) done() }) @@ -251,7 +250,7 @@ describe('BitswapMessage', () => { msg.addEntry(cid, 10) BitswapMessage.deserialize(goEncoded, (err, res) => { - expect(err).to.not.exist + expect(err).to.not.exist() expect(res).to.eql(msg) expect(msg.serializeToBitswap100()).to.eql(goEncoded) done() @@ -264,7 +263,7 @@ describe('BitswapMessage', () => { // payload but empty it('full wantlist message', (done) => { BitswapMessage.deserialize(rawMessageFullWantlist, (err, message) => { - expect(err).to.not.exist + expect(err).to.not.exist() // TODO // check the deserialised message done() @@ -273,7 +272,7 @@ describe('BitswapMessage', () => { it('one block message', (done) => { BitswapMessage.deserialize(rawMessageOneBlock, (err, message) => { - expect(err).to.not.exist + expect(err).to.not.exist() // TODO // check the deserialised message done() diff --git a/test/types/wantlist.spec.js b/test/types/wantlist.spec.js index 3534f23a..4050adb3 100644 --- a/test/types/wantlist.spec.js +++ b/test/types/wantlist.spec.js @@ -1,179 +1,126 @@ /* eslint-env mocha */ 'use strict' -const expect = require('chai').expect -const Block = require('ipfs-block') +const chai = require('chai') +chai.use(require('dirty-chai')) +const expect = chai.expect const map = require('async/map') const CID = require('cids') +const _ = require('lodash') +const multihashing = require('multihashing-async') const Wantlist = require('../../src/types/wantlist') +const utils = require('../utils') describe('Wantlist', () => { let wm let blocks before((done) => { - const data = ['hello', 'world'] - blocks = data.map((d) => new Block(d)) - done() + map(_.range(2), (i, cb) => utils.makeBlock(cb), (err, res) => { + expect(err).to.not.exist() + blocks = res + done() + }) }) beforeEach(() => { wm = new Wantlist() }) - it('length', (done) => { + it('length', () => { const b1 = blocks[0] const b2 = blocks[1] - map([ - b1, - b2 - ], - (b, cb) => b.key(cb), - (err, keys) => { - expect(err).to.not.exist - wm.add(new CID(keys[0]), 2) - wm.add(new CID(keys[1]), 1) - expect(wm).to.have.length(2) - done() - }) + wm.add(b1.cid, 2) + wm.add(b2.cid, 1) + expect(wm).to.have.length(2) }) describe('remove', () => { - it('removes with a single ref', (done) => { + it('removes with a single ref', () => { const b = blocks[0] - b.key((err, key) => { - expect(err).to.not.exist - wm.add(new CID(key), 1) - wm.remove(new CID(key)) - expect(wm).to.have.length(0) - done() - }) + wm.add(b.cid, 1) + wm.remove(b.cid) + expect(wm).to.have.length(0) }) - it('removes with multiple refs', (done) => { + it('removes with multiple refs', () => { const b1 = blocks[0] const b2 = blocks[1] - map([ - b1, - b2 - ], - (b, cb) => b.key(cb), - (err, keys) => { - expect(err).to.not.exist - const cid1 = new CID(keys[0]) - const cid2 = new CID(keys[1]) - - wm.add(cid1, 1) - wm.add(cid2, 2) + wm.add(b1.cid, 1) + wm.add(b2.cid, 2) - expect(wm).to.have.length(2) + expect(wm).to.have.length(2) - wm.remove(cid2) + wm.remove(b2.cid) - expect(wm).to.have.length(1) + expect(wm).to.have.length(1) - wm.add(cid1, 2) - wm.remove(cid1) + wm.add(b1.cid, 2) + wm.remove(b1.cid) - expect(wm).to.have.length(1) + expect(wm).to.have.length(1) - wm.remove(cid1) - expect(wm).to.have.length(0) - done() - }) + wm.remove(b1.cid) + expect(wm).to.have.length(0) }) - it('ignores non existing removes', (done) => { + it('ignores non existing removes', () => { const b = blocks[0] - b.key((err, key) => { - expect(err).to.not.exist - const cid = new CID(key) - wm.add(cid, 1) - wm.remove(cid) - wm.remove(cid) + wm.add(b.cid, 1) + wm.remove(b.cid) + wm.remove(b.cid) - expect(wm).to.have.length(0) - done() - }) + expect(wm).to.have.length(0) }) }) - it('entries', (done) => { + it('entries', () => { const b = blocks[0] - b.key((err, key) => { - expect(err).to.not.exist - const cid = new CID(key) - wm.add(cid, 2) - expect( - Array.from(wm.entries()) - ).to.be.eql([[ - cid.buffer.toString(), - new Wantlist.Entry(cid, 2) - ]]) - done() - }) + wm.add(b.cid, 2) + expect( + Array.from(wm.entries()) + ).to.be.eql([[ + b.cid.buffer.toString(), + new Wantlist.Entry(b.cid, 2) + ]]) }) - it('sortedEntries', (done) => { + it('sortedEntries', () => { const b1 = blocks[0] const b2 = blocks[1] - map([ - b1, - b2 - ], - (b, cb) => b.key(cb), - (err, keys) => { - expect(err).to.not.exist - const cid1 = new CID(keys[0]) - const cid2 = new CID(keys[1]) - - wm.add(cid1, 1) - wm.add(cid2, 1) + wm.add(b1.cid, 1) + wm.add(b2.cid, 1) - expect( - Array.from(wm.sortedEntries()) - ).to.be.eql([ - [cid1.buffer.toString(), new Wantlist.Entry(cid1, 1)], - [cid2.buffer.toString(), new Wantlist.Entry(cid2, 1)] - ]) - done() - }) + expect( + Array.from(wm.sortedEntries()) + ).to.be.eql([ + [b1.cid.buffer.toString(), new Wantlist.Entry(b1.cid, 1)], + [b2.cid.buffer.toString(), new Wantlist.Entry(b2.cid, 1)] + ]) }) - it('contains', (done) => { + it('contains', () => { const b1 = blocks[0] const b2 = blocks[1] - map([ - b1, - b2 - ], - (b, cb) => b.key(cb), - (err, keys) => { - expect(err).to.not.exist - const cid1 = new CID(keys[0]) - const cid2 = new CID(keys[1]) - - wm.add(cid1, 2) + wm.add(b1.cid, 2) - expect(wm.contains(cid1)).to.exist - expect(wm.contains(cid2)).to.not.exist - done() - }) + expect(wm.contains(b1.cid)).to.exist() + expect(wm.contains(b2.cid)).to.not.exist() }) it('with cidV1', (done) => { const b = blocks[0] - b.key((err, key) => { - expect(err).to.not.exist - const cid = new CID(1, 'dag-pb', key) + multihashing(b.data, 'sha2-256', (err, hash) => { + expect(err).to.not.exist() + const cid = new CID(1, 'dag-pb', hash) wm.add(cid, 2) expect( diff --git a/test/utils.js b/test/utils.js index 84868ca9..7a5da3d7 100644 --- a/test/utils.js +++ b/test/utils.js @@ -4,16 +4,20 @@ const each = require('async/each') const eachSeries = require('async/eachSeries') const map = require('async/map') const parallel = require('async/parallel') +const series = require('async/series') const _ = require('lodash') const PeerId = require('peer-id') const PeerInfo = require('peer-info') // const PeerBook = require('peer-book') const multiaddr = require('multiaddr') -const Bitswap = require('../src') const Node = require('libp2p-ipfs-nodejs') const os = require('os') const Repo = require('ipfs-repo') -const Store = require('interface-pull-blob-store') +const multihashing = require('multihashing-async') +const CID = require('cids') +const Block = require('ipfs-block') + +const Bitswap = require('../src') exports.mockNetwork = (calls, done) => { done = done || (() => {}) @@ -137,19 +141,35 @@ exports.genBitswapNetwork = (n, callback) => { const tmpDir = os.tmpdir() netArray.forEach((net, i) => { const repoPath = tmpDir + '/' + net.peerInfo.id.toB58String() - net.repo = new Repo(repoPath, { stores: Store }) + net.repo = new Repo(repoPath) }) - // start every libp2pNode each(netArray, (net, cb) => { - net.libp2p.start(cb) + const repoPath = tmpDir + '/' + net.peerInfo.id.toB58String() + net.repo = new Repo(repoPath) + + series([ + (cb) => net.repo.init({}, cb), + (cb) => net.repo.open(cb) + ], cb) }, (err) => { if (err) { throw err } - createBitswaps() + startLibp2p() }) + function startLibp2p () { + // start every libp2pNode + each(netArray, (net, cb) => { + net.libp2p.start(cb) + }, (err) => { + if (err) { + throw err + } + createBitswaps() + }) + } // create every BitSwap function createBitswaps () { netArray.forEach((net) => { @@ -181,3 +201,13 @@ exports.genBitswapNetwork = (n, callback) => { } }) } + +exports.makeBlock = (cb) => { + const data = new Buffer(`hello world ${Math.random()}`) + multihashing(data, 'sha2-256', (err, hash) => { + if (err) { + return cb(err) + } + cb(null, new Block(data, new CID(hash))) + }) +}