From f748a1d01777cafed0b288bc2c5861c0aa0be866 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Wed, 4 Mar 2020 19:01:39 +0000 Subject: [PATCH 1/5] fix: race condition when requesting the same block twice When we call `blockstore.putMany`, some implementations will batch up all the `put`s and write them at once. This means that `blockstore.has` might not return `true` for a little while - if another request for a given block comes in before `blockstore.has` returns `true` it'll get added to the want list. If the block then finishes its batch and finally a remote peer supplies the wanted block, the notifications that complete the second block request will never get sent and the process will hang indefinitely. The change made here is to separate the sending of notifications out from putting things into the blockstore. If the blockstore has a block, but the block is still in the wantlist, send notifications that we now have the block. --- package.json | 1 + src/index.js | 25 ++++++++++++++++++------ test/bitswap.js | 51 +++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 71 insertions(+), 6 deletions(-) diff --git a/package.json b/package.json index 10c587a7..0c86bc8f 100644 --- a/package.json +++ b/package.json @@ -69,6 +69,7 @@ "promisify-es6": "^1.0.3", "rimraf": "^3.0.0", "safe-buffer": "^5.1.2", + "sinon": "^9.0.0", "stats-lite": "^2.2.0", "uuid": "^3.3.2" }, diff --git a/src/index.js b/src/index.js index dca53913..e9d23bb3 100644 --- a/src/index.js +++ b/src/index.js @@ -101,6 +101,10 @@ class Bitswap { this._updateReceiveCounters(peerId.toB58String(), block, has) if (has || !wasWanted) { + if (wasWanted) { + this._sendHaveBlockNotifications(block) + } + return } @@ -287,16 +291,25 @@ class Bitswap { yield block - self.notifications.hasBlock(block) - self.engine.receivedBlocks([block.cid]) - // Note: Don't wait for provide to finish before returning - self.network.provide(block.cid).catch((err) => { - self._log.error('Failed to provide: %s', 
err.message) - }) + self._sendHaveBlockNotifications(block) } }()) } + /** + * Sends notifications about the arrival of a block + * + * @param {Block} block + */ + _sendHaveBlockNotifications (block) { + this.notifications.hasBlock(block) + this.engine.receivedBlocks([block.cid]) + // Note: Don't wait for provide to finish before returning + this.network.provide(block.cid).catch((err) => { + this._log.error('Failed to provide: %s', err.message) + }) + } + /** * Get the current list of wants. * diff --git a/test/bitswap.js b/test/bitswap.js index de4150db..206c5a49 100644 --- a/test/bitswap.js +++ b/test/bitswap.js @@ -5,6 +5,8 @@ const chai = require('chai') chai.use(require('dirty-chai')) const expect = chai.expect const delay = require('delay') +const PeerId = require('peer-id') +const sinon = require('sinon') const Bitswap = require('../src') @@ -12,6 +14,7 @@ const createTempRepo = require('./utils/create-temp-repo-nodejs') const createLibp2pNode = require('./utils/create-libp2p-node') const makeBlock = require('./utils/make-block') const orderedFinish = require('./utils/helpers').orderedFinish +const Message = require('../src/types/message') // Creates a repo + libp2pNode + Bitswap with or without DHT async function createThing (dht) { @@ -70,6 +73,54 @@ describe('bitswap without DHT', function () { finish.assert() }) + + it('wants a block, receives a block, wants it again before the blockstore has it, receives it after the blockstore has it', async () => { + // the block we want + const block = await makeBlock() + + // id of a peer with the block we want + const peerId = await PeerId.create({ bits: 512 }) + + // incoming message with requested block from the other peer + const message = new Message(false) + message.addEntry(block.cid, 1, false) + message.addBlock(block) + + // slow blockstore + nodes[0].bitswap.blockstore = { + has: sinon.stub().withArgs(block.cid).returns(false), + putMany: async function * (source) { // eslint-disable-line require-await + 
yield * source + } + } + + // add the block to our want list + const wantBlockPromise1 = nodes[0].bitswap.get(block.cid) + + // oh look, a peer has sent it to us - this will trigger a `blockstore.putMany` which + // for our purposes is a batch operation so `self.blockstore.has(cid)` will still return + // false even though we've just yielded a block with that cid + await nodes[0].bitswap._receiveMessage(peerId, message) + + // block store did not have it + expect(nodes[0].bitswap.blockstore.has.calledWith(block.cid)).to.be.true() + + // another context wants the same block + const wantBlockPromise2 = nodes[0].bitswap.get(block.cid) + + // meanwhile the blockstore finishes it's batch + nodes[0].bitswap.blockstore.has = sinon.stub().withArgs(block.cid).returns(true) + + // here it comes again + await nodes[0].bitswap._receiveMessage(peerId, message) + + // block store had it this time + expect(nodes[0].bitswap.blockstore.has.calledWith(block.cid)).to.be.true() + + // both requests should get the block + expect(await wantBlockPromise1).to.deep.equal(block) + expect(await wantBlockPromise2).to.deep.equal(block) + }) }) describe('bitswap with DHT', function () { From 2d8c89214ff70663d1130b1e45964bee6e6e6def Mon Sep 17 00:00:00 2001 From: achingbrain Date: Thu, 5 Mar 2020 14:57:17 +0000 Subject: [PATCH 2/5] chore: use blockstore.put instead of putMany --- src/index.js | 16 +++++++--------- test/bitswap.js | 10 ++++------ 2 files changed, 11 insertions(+), 15 deletions(-) diff --git a/src/index.js b/src/index.js index e9d23bb3..c81964c7 100644 --- a/src/index.js +++ b/src/index.js @@ -283,17 +283,15 @@ class Bitswap { async putMany (blocks) { // eslint-disable-line require-await const self = this - return this.blockstore.putMany(async function * () { - for await (const block of blocks) { - if (await self.blockstore.has(block.cid)) { - continue - } + for await (const block of blocks) { + if (await self.blockstore.has(block.cid)) { + continue + } - yield block + await 
this.blockstore.put(block) - self._sendHaveBlockNotifications(block) - } - }()) + self._sendHaveBlockNotifications(block) + } } /** diff --git a/test/bitswap.js b/test/bitswap.js index 206c5a49..3b8ea128 100644 --- a/test/bitswap.js +++ b/test/bitswap.js @@ -89,17 +89,15 @@ describe('bitswap without DHT', function () { // slow blockstore nodes[0].bitswap.blockstore = { has: sinon.stub().withArgs(block.cid).returns(false), - putMany: async function * (source) { // eslint-disable-line require-await - yield * source - } + put: sinon.stub() } // add the block to our want list const wantBlockPromise1 = nodes[0].bitswap.get(block.cid) - // oh look, a peer has sent it to us - this will trigger a `blockstore.putMany` which - // for our purposes is a batch operation so `self.blockstore.has(cid)` will still return - // false even though we've just yielded a block with that cid + // oh look, a peer has sent it to us - this will trigger a `blockstore.put` which + // is an async operation so `self.blockstore.has(cid)` will still return false + // until the write has completed await nodes[0].bitswap._receiveMessage(peerId, message) // block store did not have it From 4b22ceb8cf7652ef4f63292b28270eb2a1d58bb9 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Thu, 5 Mar 2020 17:22:01 -0500 Subject: [PATCH 3/5] feat: use bitswap fetch queue --- src/index.js | 96 +++++++++++++++++++++++++++++----------------------- 1 file changed, 53 insertions(+), 43 deletions(-) diff --git a/src/index.js b/src/index.js index c81964c7..9054bbc9 100644 --- a/src/index.js +++ b/src/index.js @@ -58,6 +58,8 @@ class Bitswap { this.wm = new WantManager(this.peerInfo.id, this.network, this._stats) this.notifications = new Notifications(this.peerInfo.id) + + this.fetcher = new Fetcher(this) } get peerInfo () { @@ -101,10 +103,6 @@ class Bitswap { this._updateReceiveCounters(peerId.toB58String(), block, has) if (has || !wasWanted) { - if (wasWanted) { - this._sendHaveBlockNotifications(block) - } - return 
} @@ -193,45 +191,7 @@ class Bitswap { * @returns {Promise>} */ async * getMany (cids) { - let pendingStart = cids.length - const wantList = [] - let promptedNetwork = false - - const fetchFromNetwork = async (cid) => { - wantList.push(cid) - - const blockP = this.notifications.wantBlock(cid) - - if (!pendingStart) { - this.wm.wantBlocks(wantList) - } - - const block = await blockP - this.wm.cancelWants([cid]) - - return block - } - - for (const cid of cids) { - const has = await this.blockstore.has(cid) - pendingStart-- - if (has) { - if (!pendingStart) { - this.wm.wantBlocks(wantList) - } - yield this.blockstore.get(cid) - - continue - } - - if (!promptedNetwork) { - promptedNetwork = true - this.network.findAndConnect(cids[0]).catch((err) => this._log.error(err)) - } - - // we don't have the block locally so fetch it from the network - yield fetchFromNetwork(cid) - } + yield * this.fetcher.fetchBlocks(cids) } /** @@ -359,4 +319,54 @@ class Bitswap { } } +class Fetcher { + constructor (bitswap) { + this.bitswap = bitswap + this.live = new Map() + } + + async * fetchBlocks (cids) { + const req = { rootCid: cids[0] } + for (const cid of cids) { + this.enqueueFetch(cid, req) + } + + for (const cid of cids) { + yield this.live.get(cid.toString()) + } + } + + enqueueFetch (cid, req) { + const cidstr = cid.toString() + const existing = this.live.get(cidstr) + if (existing) { + return existing + } + + const block = this.get(cid, req) + this.live.set(cidstr, block) + block.finally(() => { + this.live.delete(cidstr) + }) + } + + async get (cid, req) { + const has = await this.bitswap.blockstore.has(cid) + if (has) { + return this.bitswap.blockstore.get(cid) + } + + if (!req.promptedNetwork) { + this.bitswap.network.findAndConnect(req.rootCid).catch((err) => this.bitswap._log.error(err)) + req.promptedNetwork = true + } + + const block = this.bitswap.notifications.wantBlock(cid) + this.bitswap.wm.wantBlocks([cid]) + block.then(() => this.bitswap.wm.cancelWants([cid])) + + 
return block + } +} + module.exports = Bitswap From 1be4483cea4ab07c7ea03a9a5932f117b444b877 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Fri, 6 Mar 2020 13:56:05 -0500 Subject: [PATCH 4/5] refactor: simplify fetcher --- src/index.js | 60 +++++++++++++++++++++++++++++++++++++------------ test/bitswap.js | 42 ++++++++++++++++++++++------------ 2 files changed, 74 insertions(+), 28 deletions(-) diff --git a/src/index.js b/src/index.js index 9054bbc9..615e9394 100644 --- a/src/index.js +++ b/src/index.js @@ -6,6 +6,7 @@ const DecisionEngine = require('./decision-engine') const Notifications = require('./notifications') const logger = require('./utils').logger const Stats = require('./stats') +const CID = require('cids') const defaultOptions = { statsEnabled: false, @@ -190,7 +191,7 @@ class Bitswap { * @param {Iterable} cids * @returns {Promise>} */ - async * getMany (cids) { + async * getMany (cids) { // eslint-disable-line require-await yield * this.fetcher.fetchBlocks(cids) } @@ -319,37 +320,68 @@ class Bitswap { } } +/** + * Fetcher fetches blocks from the blockstore or the network, allowing + * multiple concurrent fetches for the same cid. + * @param {Bitswap} bitswap + */ class Fetcher { constructor (bitswap) { this.bitswap = bitswap this.live = new Map() } - async * fetchBlocks (cids) { + /** + * Fetch the list of cids. + * + * @param {Array} cids + * @returns {Iterator>} + */ + async * fetchBlocks (cids) { // eslint-disable-line require-await const req = { rootCid: cids[0] } - for (const cid of cids) { - this.enqueueFetch(cid, req) - } + // Queue up the requests for each CID for (const cid of cids) { - yield this.live.get(cid.toString()) + yield this.enqueueFetch(cid, req) } } + /** + * Add a cid to the fetch queue. 
+ * + * @param {CID} cid + * @param {Object} req - used to keep state across a request for several blocks + * @returns {Promise} + */ enqueueFetch (cid, req) { + if (!CID.isCID(cid)) { + throw new Error('Not a valid cid') + } + + // Check if there is already a live fetch for the block const cidstr = cid.toString() - const existing = this.live.get(cidstr) - if (existing) { - return existing + let blockFetch = this.live.get(cidstr) + if (blockFetch) { + return blockFetch } - const block = this.get(cid, req) - this.live.set(cidstr, block) - block.finally(() => { - this.live.delete(cidstr) - }) + // If not, add one + blockFetch = this.get(cid, req) + this.live.set(cidstr, blockFetch) + + // Clean up the promise once the fetch has completed + blockFetch.finally(() => this.live.delete(cidstr)) + + return blockFetch } + /** + * Get a block from the blockstore or the network. + * + * @param {CID} cid + * @param {Object} req - used to keep state across a request for several blocks + * @returns {Promise} + */ async get (cid, req) { const has = await this.bitswap.blockstore.has(cid) if (has) { diff --git a/test/bitswap.js b/test/bitswap.js index 3b8ea128..c989955c 100644 --- a/test/bitswap.js +++ b/test/bitswap.js @@ -86,34 +86,48 @@ describe('bitswap without DHT', function () { message.addEntry(block.cid, 1, false) message.addBlock(block) - // slow blockstore + // Control when the put completes + const realBlockstore = nodes[0].bitswap.blockstore + let putResolver + const allowPutToProceed = () => { + putResolver(realBlockstore.put(block)) + } + // Create a promise that resolves when the put starts + let onPutCalled + const blockstorePutCalled = new Promise((resolve) => { + onPutCalled = resolve + }) + const unresolvedPut = new Promise((resolve) => { + onPutCalled() + putResolver = resolve + }) nodes[0].bitswap.blockstore = { - has: sinon.stub().withArgs(block.cid).returns(false), - put: sinon.stub() + ...nodes[0].bitswap.blockstore, + put: 
sinon.stub().withArgs(block).returns(unresolvedPut) } // add the block to our want list const wantBlockPromise1 = nodes[0].bitswap.get(block.cid) // oh look, a peer has sent it to us - this will trigger a `blockstore.put` which - // is an async operation so `self.blockstore.has(cid)` will still return false - // until the write has completed - await nodes[0].bitswap._receiveMessage(peerId, message) + // is an async operation + nodes[0].bitswap._receiveMessage(peerId, message) - // block store did not have it - expect(nodes[0].bitswap.blockstore.has.calledWith(block.cid)).to.be.true() + // Wait for the call to blockstore.put() + // (but don't allow it to proceed yet) + await blockstorePutCalled // another context wants the same block const wantBlockPromise2 = nodes[0].bitswap.get(block.cid) - // meanwhile the blockstore finishes it's batch - nodes[0].bitswap.blockstore.has = sinon.stub().withArgs(block.cid).returns(true) + // Allow the first put to proceed + allowPutToProceed() - // here it comes again - await nodes[0].bitswap._receiveMessage(peerId, message) + // Restore the real blockstore + nodes[0].bitswap.blockstore = realBlockstore - // block store had it this time - expect(nodes[0].bitswap.blockstore.has.calledWith(block.cid)).to.be.true() + // receive the block again + await nodes[0].bitswap._receiveMessage(peerId, message) // both requests should get the block expect(await wantBlockPromise1).to.deep.equal(block) From 0cdb2bb7f3bad4791ca321765e20dbda12a6948b Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Fri, 6 Mar 2020 16:03:39 -0500 Subject: [PATCH 5/5] fix: catch sync issues between wantlist and notifier --- src/index.js | 25 ++++++++++++++++++++----- test/bitswap-mock-internals.js | 18 +++++++++++++++++- test/bitswap.js | 11 ++++++----- 3 files changed, 43 insertions(+), 11 deletions(-) diff --git a/src/index.js b/src/index.js index 615e9394..f5e71073 100644 --- a/src/index.js +++ b/src/index.js @@ -104,6 +104,11 @@ class Bitswap { 
this._updateReceiveCounters(peerId.toB58String(), block, has) if (has || !wasWanted) { + // When fetching a block, we register with the notifier and then check + // the blockstore, to catch syncing issues between the blockstore and + // wantlist. So inform the notifier that we got the block even though + // it may not have been in the wantlist. + this.notifications.hasBlock(block) return } @@ -188,7 +193,7 @@ class Bitswap { * Fetch a a list of blocks by cid. If the blocks are in the local * blockstore they are returned, otherwise the blocks are added to the wantlist and returned once another node sends them to us. * - * @param {Iterable} cids + * @param {AsyncIterable} cids * @returns {Promise>} */ async * getMany (cids) { // eslint-disable-line require-await @@ -341,7 +346,7 @@ class Fetcher { const req = { rootCid: cids[0] } // Queue up the requests for each CID - for (const cid of cids) { + for await (const cid of cids) { yield this.enqueueFetch(cid, req) } } @@ -383,21 +388,31 @@ class Fetcher { * @returns {Promise} */ async get (cid, req) { + // Register with the notifier, in case the block arrives while we're + // checking the blockstore for it + const blockNotification = this.bitswap.notifications.wantBlock(cid) + + // If the block is in the local blockstore, return it const has = await this.bitswap.blockstore.has(cid) if (has) { + // Deregister with the notifier + this.bitswap.notifications.unwantBlock(cid) + // Get the block from the blockstore return this.bitswap.blockstore.get(cid) } + // Otherwise query content routing for the block if (!req.promptedNetwork) { this.bitswap.network.findAndConnect(req.rootCid).catch((err) => this.bitswap._log.error(err)) req.promptedNetwork = true } - const block = this.bitswap.notifications.wantBlock(cid) + // Add the block CID to the wantlist this.bitswap.wm.wantBlocks([cid]) - block.then(() => this.bitswap.wm.cancelWants([cid])) + // Remove it from the wantlist when the block arrives + blockNotification.then(() => 
this.bitswap.wm.cancelWants([cid])) - return block + return blockNotification } } diff --git a/test/bitswap-mock-internals.js b/test/bitswap-mock-internals.js index f4e56e00..0f52364c 100644 --- a/test/bitswap-mock-internals.js +++ b/test/bitswap-mock-internals.js @@ -190,7 +190,7 @@ describe('bitswap with mocks', function () { expect(retrievedBlocks).to.be.eql([b1, b2, b3]) }) - it('getMany', async () => { + it('multiple get', async () => { const b1 = blocks[5] const b2 = blocks[6] const b3 = blocks[7] @@ -208,6 +208,22 @@ describe('bitswap with mocks', function () { expect(block3).to.eql(b3) }) + it('getMany with iterator', async () => { + const blocks = await makeBlock(3) + + await repo.blocks.putMany(blocks) + const bs = new Bitswap(mockLibp2pNode(), repo.blocks) + + function * it () { + for (const b of blocks) { + yield b.cid + } + } + const fetched = await all(bs.getMany(it())) + + expect(fetched).to.eql(blocks) + }) + it('block is added locally afterwards', async () => { const finish = orderedFinish(2) const block = blocks[9] diff --git a/test/bitswap.js b/test/bitswap.js index c989955c..8acbfa75 100644 --- a/test/bitswap.js +++ b/test/bitswap.js @@ -120,18 +120,19 @@ describe('bitswap without DHT', function () { // another context wants the same block const wantBlockPromise2 = nodes[0].bitswap.get(block.cid) - // Allow the first put to proceed - allowPutToProceed() - // Restore the real blockstore nodes[0].bitswap.blockstore = realBlockstore + // Allow the first put to proceed + allowPutToProceed() + // receive the block again await nodes[0].bitswap._receiveMessage(peerId, message) // both requests should get the block - expect(await wantBlockPromise1).to.deep.equal(block) - expect(await wantBlockPromise2).to.deep.equal(block) + const res = await Promise.all([wantBlockPromise1, wantBlockPromise2]) + expect(res[0]).to.deep.equal(block) + expect(res[1]).to.deep.equal(block) }) })