diff --git a/.aegir.js b/.aegir.js index d096ba1a..f5a5d320 100644 --- a/.aegir.js +++ b/.aegir.js @@ -1,25 +1,33 @@ 'use strict' +const path = require('path') -module.exports = { - bundlesize: { maxSize: '68kB' }, - karma: { - files: [{ - pattern: 'test/test-data/**/*', - watched: false, - served: true, - included: false - }] - }, - webpack: { - node: { - // needed by ipfs-repo-migrations - path: true, - - // needed by dependencies of peer-id - stream: true, +/** @type {import('aegir').Options["build"]["config"]} */ +const esbuild = { + // this will inject all the named exports from 'node-globals.js' as globals + inject: [require.resolve('./scripts/node-globals.js')], + plugins: [ + { + name: 'node built ins', // this will make the bundler resolve node builtins to the respective browser polyfill + setup (build) { + build.onResolve({ filter: /^stream$/ }, () => { + return { path: require.resolve('readable-stream') } + }) + } + } + ] +} - // needed by core-util-is - Buffer: true +/** @type {import('aegir').PartialOptions} */ +module.exports = { + test: { + browser :{ + config: { + buildConfig: esbuild + } } + }, + build: { + bundlesizeMax: '61kB', + config: esbuild } } diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml new file mode 100644 index 00000000..e885421e --- /dev/null +++ b/.github/workflows/main.yml @@ -0,0 +1,78 @@ +name: ci +on: + push: + branches: + - master + - 'release/**' + pull_request: + branches: + - master + - 'release/**' + +jobs: + check: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - run: npm install + - run: npx aegir lint + - run: npx aegir build + - run: npx aegir dep-check + - uses: ipfs/aegir/actions/bundle-size@master + with: + github_token: ${{ secrets.GITHUB_TOKEN }} + test-node: + needs: check + runs-on: ${{ matrix.os }} + strategy: + matrix: + os: [windows-latest, ubuntu-latest, macos-latest] + node: [14, 15] + fail-fast: true + steps: + - uses: actions/checkout@v2 + - uses: actions/setup-node@v1 + with: + node-version: ${{ matrix.node }} + - run: npm install + - run: npx aegir test -t node --bail --cov + - uses: codecov/codecov-action@v1 + test-chrome: + needs: check + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - uses: microsoft/playwright-github-action@v1 + - run: npm install + - run: npx aegir test -t browser -t webworker --bail -- --exit # TODO remove - https://mochajs.org/#-exit + - uses: codecov/codecov-action@v1 + test-firefox: + needs: check + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - uses: microsoft/playwright-github-action@v1 + - run: npm install + - run: npx aegir test -t browser -t webworker --bail -- --browser firefox + test-webkit: + needs: check + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - uses: microsoft/playwright-github-action@v1 + - run: npm install + - run: npx aegir test -t browser -t webworker --bail --timeout 10000 -- --browser webkit + test-electron-main: + needs: check + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - run: npm install + - run: npx xvfb-maybe aegir test -t electron-main --bail + test-electron-renderer: + needs: check + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - run: npm install + - run: npx xvfb-maybe aegir test -t electron-renderer --bail diff --git a/.github/workflows/typecheck.yml b/.github/workflows/typecheck.yml new file mode 100644 index 00000000..0da155e6 --- /dev/null +++ b/.github/workflows/typecheck.yml @@ -0,0 +1,27 @@ +on: + push: + branches: + - master + - main + - default + pull_request: + branches: + - '**' + +name: Typecheck +jobs: + check: + runs-on: ubuntu-latest + strategy: + matrix: + node-version: [15.x] + steps: + - uses: actions/checkout@v1 + - name: Use Node.js ${{ matrix.node-version }} + uses: actions/setup-node@v1 + with: + node-version: ${{ matrix.node-version }} + - name: Install dependencies + run: npm install + - name: Typecheck + uses: gozala/typescript-error-reporter-action@v1.0.8 diff --git a/.travis.yml b/.travis.yml deleted file mode 100644 index 2398cfd5..00000000 --- a/.travis.yml +++ /dev/null @@ -1,59 +0,0 @@ -language: node_js -cache: npm - -branches: - only: - - master - - /^release\/.*$/ - -node_js: - - 'lts/*' - - 'node' - -stages: - - check - - test - - cov - -os: - - linux - - osx - - windows - -script: npx nyc -s npm run test:node -- --bail -after_success: npx nyc report --reporter=text-lcov > coverage.lcov && npx codecov - -jobs: - include: - - stage: check - script: - - npx aegir build --bundlesize - - npx aegir dep-check - - npm run lint - - - stage: test - name: chrome - addons: - chrome: stable - script: npx aegir test -t browser -t webworker - - - stage: test - name: firefox - addons: - firefox: latest - script: npx aegir test -t browser -t webworker -- --browsers FirefoxHeadless - - - stage: test - name: electron-main - os: osx - script: - - npx aegir test -t electron-main --bail - - - stage: test - name: electron-renderer - os: osx - script: - - npx aegir test -t electron-renderer --bail - -notifications: - email: false diff --git a/package.json b/package.json index 600d44bc..c976b2f9 100644 --- a/package.json +++ b/package.json @@ -8,21 +8,40 @@ "./test/utils/create-libp2p-node": false, "./test/utils/create-temp-repo-nodejs.js": "./test/utils/create-temp-repo-browser.js" }, + "types": "dist/src/index.d.ts", + "typesVersions": { + "*": { + "src/*": [ + "dist/src/*", + "dist/src/*/index" + ], + "src/": [ + "dist/src/index" + ] + } + }, + "eslintConfig": { + "extends": "ipfs", + "ignorePatterns": [ + "scripts/*" + ] + }, "files": [ "dist", "src" ], "scripts": { + "prepare": "aegir build --no-bundle", "test": "aegir test", "test:browser": "aegir test -t browser -t webworker", "test:node": "aegir test -t node", "lint": "aegir lint", + "check": "aegir ts -p check", "release": "aegir release", "release-minor": "aegir release --type minor", "release-major": "aegir release --type major", "bench": "node benchmarks/index", - "build": "aegir build", - "coverage": "aegir coverage --provider codecov", + "coverage": "aegir test -t node --cov && nyc report --reporter=html", "docs": "aegir docs", "benchmarks": "node test/benchmarks/get-many" }, @@ -43,50 +62,58 @@ "homepage": "https://github.com/ipfs/js-ipfs-bitswap#readme", "devDependencies": { "@nodeutils/defaults-deep": "^1.1.0", - "aegir": "^28.1.0", + "@types/debug": "^4.1.5", + "aegir": "^31.0.4", + "assert": "^2.0.0", "benchmark": "^2.1.4", - "delay": "^4.3.0", - "ipfs-repo": "^7.0.0", - "ipfs-utils": "^6.0.0", + "delay": "^5.0.0", + "ipfs-repo": "^9.0.0", + "ipfs-utils": "^6.0.1", "iso-random-stream": "^1.1.1", - "it-all": "^1.0.2", - "it-drain": "^1.0.1", - "libp2p": "^0.30.5", - "libp2p-kad-dht": "^0.20.0", - "libp2p-mplex": "^0.10.0", - "libp2p-secio": "^0.13.0", - "libp2p-tcp": "^0.15.0", + "it-all": "^1.0.5", + "it-drain": "^1.0.4", + "libp2p": "^0.30.9", + "libp2p-kad-dht": "^0.21.0", + "libp2p-mplex": "^0.10.2", + "libp2p-secio": "^0.13.1", + "libp2p-tcp": "^0.15.3", "lodash.difference": "^4.5.0", "lodash.flatten": "^4.4.0", "lodash.range": "^3.2.0", "lodash.without": "^4.4.0", "p-defer": "^3.0.0", - "p-event": "^4.1.0", - "p-wait-for": "^3.1.0", - "peer-id": "^0.14.0", + "p-event": "^4.2.0", + "p-wait-for": "^3.2.0", + "peer-id": "^0.14.3", "promisify-es6": "^1.0.3", - "rimraf": "^3.0.0", - "sinon": "^9.0.0", + "rimraf": "^3.0.2", + "sinon": "^9.2.4", "stats-lite": "^2.2.0", - "uuid": "^8.0.0" + "uuid": "^8.3.2" }, "dependencies": { "abort-controller": "^3.0.0", - "any-signal": "^2.1.1", + "any-signal": "^2.1.2", "bignumber.js": "^9.0.0", - "cids": "^1.0.0", - "debug": "^4.1.0", + "cids": "^1.1.6", + "debug": "^4.2.0", + "ipfs-core-types": "^0.3.1", "ipld-block": "^0.11.0", - "it-length-prefixed": "^3.0.0", + "it-length-prefixed": "^3.1.0", "it-pipe": "^1.1.0", "just-debounce-it": "^1.1.0", "libp2p-interfaces": "^0.8.3", - "moving-average": "^1.0.0", - "multicodec": "^2.0.0", - "multihashing-async": "^2.0.1", + "moving-average": "^1.0.1", + "multicodec": "^3.0.1", + "multihashing-async": "^2.1.2", + "native-abort-controller": "^1.0.3", + "process": "^0.11.10", "protons": "^2.0.0", - "streaming-iterables": "^5.0.2", - "uint8arrays": "^2.0.5", + "readable-stream": "^3.6.0", + "streaming-iterables": "^5.0.4", + "uint8arrays": "^2.1.3", + "url": "^0.11.0", + "util": "^0.12.3", "varint-decoder": "^1.0.0" }, "pre-push": [ @@ -114,6 +141,7 @@ "dmitriy ryajov ", "Dmitriy Ryajov ", "Bryan Stenson ", - "Richard Schneider " + "Richard Schneider ", + "Irakli Gozalishvili " ] } diff --git a/scripts/node-globals.js b/scripts/node-globals.js new file mode 100644 index 00000000..f5912dfc --- /dev/null +++ b/scripts/node-globals.js @@ -0,0 +1,4 @@ +import { Buffer } from 'buffer' +import process from "process/browser" + +export { Buffer, process } diff --git a/src/decision-engine/index.js b/src/decision-engine/index.js index b873e9dd..f2395cdd 100644 --- a/src/decision-engine/index.js +++ b/src/decision-engine/index.js @@ -1,5 +1,11 @@ 'use strict' +/** + * @typedef {import('peer-id')} PeerId + * @typedef {import('ipfs-core-types/src/block-service').Block} Block + * @typedef {import('../types/message/entry')} BitswapMessageEntry + */ + const CID = require('cids') const Message = require('../types/message') @@ -26,7 +32,16 @@ const TARGET_MESSAGE_SIZE = 16 * 1024 const MAX_SIZE_REPLACE_HAS_WITH_BLOCK = 1024 class DecisionEngine { - constructor (peerId, blockstore, network, stats, opts) { + /** + * @param {PeerId} peerId + * @param {import('ipfs-core-types/src/block-store').BlockStore} blockstore + * @param {import('../network')} network + * @param {import('../stats')} stats + * @param {Object} [opts] + * @param {number} [opts.targetMessageSize] + * @param {number} [opts.maxSizeReplaceHasWithBlock] + */ + constructor (peerId, blockstore, network, stats, opts = {}) { this._log = logger(peerId, 'engine') this.blockstore = blockstore this.network = network @@ -34,6 +49,7 @@ class DecisionEngine { this._opts = this._processOpts(opts) // A list of of ledgers by their partner id + /** @type {Map} */ this.ledgerMap = new Map() this._running = false @@ -41,6 +57,12 @@ class DecisionEngine { this._requestQueue = new RequestQueue(TaskMerger) } + /** + * @template {Object} Opts + * @param {Opts} opts + * @returns {Opts & {maxSizeReplaceHasWithBlock:number, targetMessageSize:number}} + * @private + */ _processOpts (opts) { return { maxSizeReplaceHasWithBlock: MAX_SIZE_REPLACE_HAS_WITH_BLOCK, @@ -49,14 +71,21 @@ class DecisionEngine { } } + /** + * @private + */ _scheduleProcessTasks () { setTimeout(() => { this._processTasks() }) } - // Pull tasks off the request queue and send a message to the corresponding - // peer + /** + * Pull tasks off the request queue and send a message to the corresponding + * peer + * + * @private + */ async _processTasks () { if (!this._running) { return @@ -112,7 +141,7 @@ class DecisionEngine { // If there's nothing in the message, bail out if (msg.empty) { - this._requestQueue.tasksDone(peerId, tasks) + peerId && this._requestQueue.tasksDone(peerId, tasks) // Trigger the next round of task processing this._scheduleProcessTasks() @@ -122,32 +151,37 @@ class DecisionEngine { try { // Send the message - await this.network.sendMessage(peerId, msg) + peerId && await this.network.sendMessage(peerId, msg) // Peform sent message accounting for (const block of blocks.values()) { - this.messageSent(peerId, block) + peerId && this.messageSent(peerId, block) } } catch (err) { this._log.error(err) } // Free the tasks up from the request queue - this._requestQueue.tasksDone(peerId, tasks) + peerId && this._requestQueue.tasksDone(peerId, tasks) // Trigger the next round of task processing this._scheduleProcessTasks() } + /** + * @param {PeerId} peerId + * @returns {Map} + */ wantlistForPeer (peerId) { const peerIdStr = peerId.toB58String() - if (!this.ledgerMap.has(peerIdStr)) { - return new Map() - } - - return this.ledgerMap.get(peerIdStr).wantlist.sortedEntries() + const ledger = this.ledgerMap.get(peerIdStr) + return ledger ? ledger.wantlist.sortedEntries() : new Map() } + /** + * @param {PeerId} peerId + * @returns {import('ipfs-core-types/src/bitswap').LedgerForPeer|null} + */ ledgerForPeer (peerId) { const peerIdStr = peerId.toB58String() @@ -164,12 +198,20 @@ class DecisionEngine { } } + /** + * @returns {PeerId[]} + */ peers () { return Array.from(this.ledgerMap.values()).map((l) => l.partner) } - // Receive blocks either from an incoming message from the network, or from - // blocks being added by the client on the localhost (eg IPFS add) + /** + * Receive blocks either from an incoming message from the network, or from + * blocks being added by the client on the localhost (eg IPFS add) + * + * @param {Block[]} blocks + * @returns {void} + */ receivedBlocks (blocks) { if (!blocks.length) { return @@ -211,7 +253,13 @@ class DecisionEngine { this._scheduleProcessTasks() } - // Handle incoming messages + /** + * Handle incoming messages + * + * @param {PeerId} peerId + * @param {Message} msg + * @returns {Promise} + */ async messageReceived (peerId, msg) { const ledger = this._findOrCreate(peerId) @@ -233,7 +281,9 @@ class DecisionEngine { } // Clear cancelled wants and add new wants to the ledger + /** @type {CID[]} */ const cancels = [] + /** @type {BitswapMessageEntry[]} */ const wants = [] msg.wantlist.forEach((entry) => { if (entry.cancel) { @@ -251,12 +301,24 @@ class DecisionEngine { this._scheduleProcessTasks() } + /** + * @private + * @param {PeerId} peerId + * @param {CID[]} cids + * @returns {void} + */ _cancelWants (peerId, cids) { for (const c of cids) { this._requestQueue.remove(c.toString(), peerId) } } + /** + * @private + * @param {PeerId} peerId + * @param {BitswapMessageEntry[]} wants + * @returns {Promise} + */ async _addWants (peerId, wants) { // Get the size of each wanted block const blockSizes = await this._getBlockSizes(wants.map(w => w.cid)) @@ -315,16 +377,31 @@ class DecisionEngine { } } + /** + * @private + * @param {import('../types/message/message.proto').WantType} wantType + * @param {number} blockSize + */ _sendAsBlock (wantType, blockSize) { return wantType === WantType.Block || blockSize <= this._opts.maxSizeReplaceHasWithBlock } + /** + * @private + * @param {CID[]} cids + * @returns {Promise>} + */ async _getBlockSizes (cids) { const blocks = await this._getBlocks(cids) return new Map([...blocks].map(([k, v]) => [k, v.data.length])) } + /** + * @private + * @param {CID[]} cids + * @returns {Promise>} + */ async _getBlocks (cids) { const res = new Map() await Promise.all(cids.map(async (cid) => { @@ -340,6 +417,11 @@ class DecisionEngine { return res } + /** + * @private + * @param {Map} blocksMap + * @param {Ledger} ledger + */ _updateBlockAccounting (blocksMap, ledger) { blocksMap.forEach(b => { this._log('got block (%s bytes)', b.data.length) @@ -347,7 +429,14 @@ class DecisionEngine { }) } - // Clear up all accounting things after message was sent + /** + * Clear up all accounting things after message was sent + * + * @param {PeerId} peerId + * @param {Object} [block] + * @param {Uint8Array} block.data + * @param {CID} [block.cid] + */ messageSent (peerId, block) { const ledger = this._findOrCreate(peerId) ledger.sentBytes(block ? block.data.length : 0) @@ -356,15 +445,29 @@ class DecisionEngine { } } + /** + * @param {PeerId} peerId + * @returns {number} + */ numBytesSentTo (peerId) { return this._findOrCreate(peerId).accounting.bytesSent } + /** + * @param {PeerId} peerId + * @returns {number} + */ + numBytesReceivedFrom (peerId) { return this._findOrCreate(peerId).accounting.bytesRecv } - peerDisconnected (peerId) { + /** + * + * @param {PeerId} _peerId + * @returns {void} + */ + peerDisconnected (_peerId) { // if (this.ledgerMap.has(peerId.toB58String())) { // this.ledgerMap.delete(peerId.toB58String()) // } @@ -373,10 +476,16 @@ class DecisionEngine { // in the peer request queue } + /** + * @private + * @param {PeerId} peerId + * @returns {Ledger} + */ _findOrCreate (peerId) { const peerIdStr = peerId.toB58String() - if (this.ledgerMap.has(peerIdStr)) { - return this.ledgerMap.get(peerIdStr) + const ledger = this.ledgerMap.get(peerIdStr) + if (ledger) { + return ledger } const l = new Ledger(peerId) diff --git a/src/decision-engine/ledger.js b/src/decision-engine/ledger.js index 407f6f2c..48f4fe3e 100644 --- a/src/decision-engine/ledger.js +++ b/src/decision-engine/ledger.js @@ -2,7 +2,14 @@ const Wantlist = require('../types/wantlist') +/** + * @typedef {import('cids')} CID + */ + class Ledger { + /** + * @param {import('peer-id')} peerId + */ constructor (peerId) { this.partner = peerId this.wantlist = new Wantlist() @@ -16,30 +23,55 @@ class Ledger { } } + /** + * @param {number} n + */ sentBytes (n) { this.exchangeCount++ this.lastExchange = (new Date()).getTime() this.accounting.bytesSent += n } + /** + * @param {number} n + */ receivedBytes (n) { this.exchangeCount++ this.lastExchange = (new Date()).getTime() this.accounting.bytesRecv += n } + /** + * + * @param {CID} cid + * @param {number} priority + * @param {import('../types/message/message.proto').WantType} wantType + * @returns {void} + */ wants (cid, priority, wantType) { this.wantlist.add(cid, priority, wantType) } + /** + * @param {CID} cid + * @returns {void} + */ + cancelWant (cid) { this.wantlist.remove(cid) } + /** + * @param {CID} cid + * @returns {import('../types/wantlist/entry')|void} + */ wantlistContains (cid) { return this.wantlist.contains(cid) } + /** + * @returns {number} + */ debtRatio () { return (this.accounting.bytesSent / (this.accounting.bytesRecv + 1)) // +1 is to prevent division by zero } diff --git a/src/decision-engine/req-queue.js b/src/decision-engine/req-queue.js index fe7a07d2..cbe94050 100644 --- a/src/decision-engine/req-queue.js +++ b/src/decision-engine/req-queue.js @@ -3,22 +3,26 @@ const SortedMap = require('../utils/sorted-map') /** - * @typedef {Object} Task - * @property {string} topic - a name for the Task (like an id but not necessarily unique) - * @property {number} priority - tasks are ordered by priority per peer - * @property {number} size - the size of the task, eg the number of bytes in a block - */ - -/** - * @typedef {Object} TaskMerger - * @property {function(task, tasksWithTopic)} hasNewInfo - given the existing tasks with the same topic, does the task add some new information? Used to decide whether to merge the task or ignore it. - * @property {function(task, existingTask)} merge - merge the information from the given task into the existing task (with the same topic) + * @typedef {Object} PopTaskResult + * @property {PeerId} [peerId] + * @property {Task[]} tasks + * @property {number} pendingSize + * + * @typedef {Object} PendingTask + * @property {number} created + * @property {Task} task + * + * @typedef {import('peer-id')} PeerId + * @typedef {import('./types').Task} Task + * @typedef {import('./types').TaskMerger} TaskMerger */ /** * The task merger that is used by default. * Assumes that new tasks do not add any information over existing tasks, * and doesn't try to merge. + * + * @type {TaskMerger} */ const DefaultTaskMerger = { hasNewInfo () { @@ -37,18 +41,20 @@ const DefaultTaskMerger = { */ class RequestQueue { /** - * @param {TaskMerger} taskMerger + * @param {TaskMerger} [taskMerger] */ - constructor (taskMerger) { - this._taskMerger = taskMerger || DefaultTaskMerger - this._byPeer = new SortedMap([], PeerTasks.compare, true) + constructor (taskMerger = DefaultTaskMerger) { + this._taskMerger = taskMerger + /** @type {SortedMap} */ + this._byPeer = new SortedMap([], PeerTasks.compare) } /** * Push tasks onto the queue for the given peer * * @param {PeerId} peerId - * @param {Task} tasks + * @param {Task[]} tasks + * @returns {void} */ pushTasks (peerId, tasks) { let peerTasks = this._byPeer.get(peerId.toB58String()) @@ -69,16 +75,16 @@ class RequestQueue { * actively being processed (and cannot be modified). * * @param {number} targetMinBytes - the minimum total size of tasks to pop - * @returns {Object} + * @returns {PopTaskResult} */ popTasks (targetMinBytes) { - if (this._byPeer.size === 0) { - return { tasks: [], pendingSize: 0 } - } - // Get the queue of tasks for the best peer and pop off tasks up to // targetMinBytes const peerTasks = this._head() + if (peerTasks === undefined) { + return { tasks: [], pendingSize: 0 } + } + const { tasks, pendingSize } = peerTasks.popTasks(targetMinBytes) if (tasks.length === 0) { return { tasks, pendingSize } @@ -99,10 +105,21 @@ class RequestQueue { } } + /** + * @private + * @returns {PeerTasks|undefined} + */ _head () { + // Shortcut + if (this._byPeer.size === 0) { + return undefined + } + + // eslint-disable-next-line no-unreachable-loop for (const [, v] of this._byPeer) { return v } + return undefined } @@ -111,6 +128,7 @@ class RequestQueue { * * @param {string} topic * @param {PeerId} peerId + * @returns {void} */ remove (topic, peerId) { const peerTasks = this._byPeer.get(peerId.toB58String()) @@ -122,6 +140,7 @@ class RequestQueue { * * @param {PeerId} peerId * @param {Task[]} tasks + * @returns {void} */ tasksDone (peerId, tasks) { const peerTasks = this._byPeer.get(peerId.toB58String()) @@ -161,6 +180,7 @@ class PeerTasks { * Push tasks onto the queue. * * @param {Task[]} tasks + * @returns {void} */ pushTasks (tasks) { for (const t of tasks) { @@ -168,6 +188,12 @@ class PeerTasks { } } + /** + * @private + * @param {Task} task + * @returns {void} + */ + _pushTask (task) { // If the new task doesn't add any more information over what we // already have in the active queue, then we can skip the new task @@ -196,8 +222,14 @@ class PeerTasks { this._pending.add(task) } - // Indicates whether the new task adds any more information over tasks that are - // already in the active task queue + /** + * Indicates whether the new task adds any more information over tasks that are + * already in the active task queue + * + * @private + * @param {Task} task + * @returns {boolean} + */ _taskHasMoreInfoThanActiveTasks (task) { const tasksWithTopic = [] for (const activeTask of this._active) { @@ -218,7 +250,7 @@ class PeerTasks { * Pop tasks off the queue such that the total size is at least targetMinBytes * * @param {number} targetMinBytes - * @returns {Object} + * @returns {PopTaskResult} */ popTasks (targetMinBytes) { let size = 0 @@ -248,6 +280,7 @@ class PeerTasks { * Note: must be the same reference as returned from popTasks. * * @param {Task} task + * @returns {void} */ taskDone (task) { if (this._active.has(task)) { @@ -260,6 +293,7 @@ class PeerTasks { * Remove pending tasks with the given topic * * @param {string} topic + * @returns {void} */ remove (topic) { this._pending.delete(topic) @@ -271,10 +305,17 @@ class PeerTasks { * @returns {boolean} */ isIdle () { - return this._pending.length === 0 && this._active.length === 0 + return this._pending.length === 0 && this._active.size === 0 } - // Compare PeerTasks + /** + * Compare PeerTasks + * + * @template Key + * @param {[Key, PeerTasks]} a + * @param {[Key, PeerTasks]} b + * @returns {number} + */ static compare (a, b) { // Move peers with no pending tasks to the back of the queue if (a[1]._pending.length === 0) { @@ -300,6 +341,7 @@ class PeerTasks { */ class PendingTasks { constructor () { + /** @type {SortedMap} */ this._tasks = new SortedMap([], this._compare) } @@ -307,15 +349,26 @@ class PendingTasks { return this._tasks.size } - // Sum of the size of all pending tasks + /** + * Sum of the size of all pending tasks + * + * @type {number} + **/ get totalSize () { return [...this._tasks.values()].reduce((a, t) => a + t.task.size, 0) } + /** + * @param {string} topic + * @returns {Task|void} + */ get (topic) { return (this._tasks.get(topic) || {}).task } + /** + * @param {Task} task + */ add (task) { this._tasks.set(task.topic, { created: Date.now(), @@ -323,6 +376,10 @@ class PendingTasks { }) } + /** + * @param {string} topic + * @returns {void} + */ delete (topic) { this._tasks.delete(topic) } @@ -332,7 +389,13 @@ class PendingTasks { return [...this._tasks.values()].map(i => i.task) } - // Update the priority of the task with the given topic, and update the order + /** + * Update the priority of the task with the given topic, and update the order + * + * @param {string} topic + * @param {number} priority + * @returns {void} + **/ updatePriority (topic, priority) { const obj = this._tasks.get(topic) if (!obj) { @@ -344,7 +407,14 @@ class PendingTasks { this._tasks.update(i) } - // Sort by priority desc then FIFO + /** + * Sort by priority desc then FIFO + * + * @param {[string, PendingTask]} a + * @param {[string, PendingTask]} b + * @returns {number} + * @private + */ _compare (a, b) { if (a[1].task.priority === b[1].task.priority) { // FIFO diff --git a/src/decision-engine/task-merger.js b/src/decision-engine/task-merger.js index 7f514ab3..18941f52 100644 --- a/src/decision-engine/task-merger.js +++ b/src/decision-engine/task-merger.js @@ -1,9 +1,15 @@ 'use strict' +/** + * @typedef {import('./types').Task} Task + * @typedef {import('./types').TaskMerger} TaskMergerAPI + */ + +/** @type {TaskMergerAPI} */ const TaskMerger = { /** * Indicates whether the given task has newer information than the active - * tasks with the same topic + * tasks with the same topic. * * @param {Task} task * @param {Task[]} tasksWithTopic @@ -39,47 +45,13 @@ const TaskMerger = { }, /** - * Merge the information from the task into the existing pending task + * Merge the information from the given task into the existing task (with the + * same topic) * * @param {Task} newTask * @param {Task} existingTask */ merge (newTask, existingTask) { - // Tasks look like this: - // { - // topic: "some topic", - // priority: 5, - // - // # The size of the response on the wire. This is used to calculate - // # how many tasks to pop off the request queue and add to a message. - // # If the response is - // # - a HAVE or DONT_HAVE - // # it is the size of the CID + type (HAVE/DONT_HAVE) - // # - a block - // # it is the size of the block - // size: 32, - // - // data: { - // - // # The size of the block, if known (if we don't have the block this is zero) - // blockSize: 128 * 1024, - // - // # Indicates if the request is for a block or for a HAVE - // isWantBlock: false, - // - // # Do we have the block? - // # Note: a block can have size zero. - // haveBlock: true, - // - // # Indicates whether to send a DONT_HAVE response if we don't have - // # the block. - // # If this is false and we don't have the block, we just ignore the - // # want-block request (useful for discovery where we query lots of - // # peers but don't want a response unless the peer has the block). - // sendDontHave: false - // } - // } - // // The merge function ignores the topic and priority as these don't change. // // We may receive new information about a want before the want has been @@ -97,8 +69,6 @@ const TaskMerger = { // 3. Local node receives block for CID1 from peer // In this case we should replace DONT_HAVE with the want, including // updating the task size and block size. - // - const taskData = newTask.data const existingData = existingTask.data diff --git a/src/decision-engine/types.d.ts b/src/decision-engine/types.d.ts new file mode 100644 index 00000000..1731d296 --- /dev/null +++ b/src/decision-engine/types.d.ts @@ -0,0 +1,51 @@ +export interface TaskMerger { + /** + * Given the existing tasks with the same topic, does the task add some new + * information? Used to decide whether to merge the task or ignore it. + */ + hasNewInfo: (task: Task, tasksWithTopic: Task[]) => boolean + + /** + * Merge the information from the task into the existing pending task. + */ + merge: (newTask: Task, existingTask: Task) => void +} + +export interface Task { + /** + * A name for the Task (like an id but not necessarily unique) + */ + topic: string + /** + * Priority for the Task (tasks are ordered by priority per peer). + */ + priority: number + /** + * The size of the task, e.g. the number of bytes in a block. + */ + size: number + + data: TaskData +} + +export interface TaskData { + /** + * The size of the block, if known (if we don't have the block this is zero) + */ + blockSize: number + /** + * Indicates if the request is for a block or for a HAVE. + */ + isWantBlock: boolean + /** + * Indicates if we have the block. + */ + haveBlock: boolean + /** + * Indicates whether to send a DONT_HAVE response if we don't have the block. + * If this is `false` and we don't have the block, we just ignore the + * want-block request (useful for discovery where we query lots of peers but + * don't want a response unless the peer has the block). + */ + sendDontHave: boolean +} diff --git a/src/index.js b/src/index.js index 5c39c00a..3a6af632 100644 --- a/src/index.js +++ b/src/index.js @@ -6,8 +6,19 @@ const DecisionEngine = require('./decision-engine') const Notifications = require('./notifications') const logger = require('./utils').logger const Stats = require('./stats') -const AbortController = require('abort-controller') -const anySignal = require('any-signal') +const { AbortController } = require('native-abort-controller') +const { anySignal } = require('any-signal') + +/** + * @typedef {import('ipfs-core-types/src/basic').AbortOptions} AbortOptions + * @typedef {import('ipfs-core-types/src/bitswap').Bitswap} API + * @typedef {import('ipfs-core-types/src/bitswap').WantListEntry} WantListEntry + * @typedef {import('ipfs-core-types/src/bitswap').LedgerForPeer} LedgerForPeer + * @typedef {import('ipfs-core-types/src/block-service').Block} Block + * @typedef {import('peer-id')} PeerId + * @typedef {import('./types/message')} BitswapMessage + * @typedef {import('cids')} CID + */ const defaultOptions = { statsEnabled: false, @@ -30,12 +41,18 @@ const statsKeys = [ * JavaScript implementation of the Bitswap 'data exchange' protocol * used by IPFS. * - * @param {Libp2p} libp2p - * @param {Blockstore} blockstore - * @param {Object} options + * @implements {API} */ class Bitswap { - constructor (libp2p, blockstore, options) { + /** + * @param {import('libp2p')} libp2p + * @param {import('ipfs-core-types/src/block-store').BlockStore} blockstore + * @param {Object} [options] + * @param {boolean} [options.statsEnabled=false] + * @param {number} [options.statsComputeThrottleTimeout=1000] + * @param {number} [options.statsComputeThrottleMaxQueueSize=1000] + */ + constructor (libp2p, blockstore, options = {}) { this._libp2p = libp2p this._log = logger(this.peerId) @@ -49,7 +66,7 @@ class Bitswap { }) // the network delivers messages - this.network = new Network(libp2p, this, {}, this._stats) + this.network = new Network(libp2p, this, this._stats) // local database this.blockstore = blockstore @@ -62,11 +79,20 @@ class Bitswap { this.notifications = new Notifications(this.peerId) } + /** + * @type {PeerId} + */ get peerId () { return this._libp2p.peerId } - // handle messages received through the network + /** + * handle messages received through the network + * + * @param {PeerId} peerId + * @param {BitswapMessage} incoming + * @returns {Promise} + */ async _receiveMessage (peerId, incoming) { try { // Note: this allows the engine to respond to any wants in the message. @@ -99,6 +125,13 @@ class Bitswap { })) } + /** + * @private + * @param {PeerId} peerId + * @param {Block} block + * @param {boolean} wasWanted + * @returns {Promise} + */ async _handleReceivedBlock (peerId, block, wasWanted) { this._log('received block') @@ -113,27 +146,46 @@ class Bitswap { await this.put(block) } - _updateReceiveCounters (peerId, block, exists) { - this._stats.push(peerId, 'blocksReceived', 1) - this._stats.push(peerId, 'dataReceived', block.data.length) + /** + * @private + * @param {string} peerIdStr + * @param {Block} block + * @param {boolean} exists + */ + _updateReceiveCounters (peerIdStr, block, exists) { + this._stats.push(peerIdStr, 'blocksReceived', 1) + this._stats.push(peerIdStr, 'dataReceived', block.data.length) if (exists) { - this._stats.push(peerId, 'dupBlksReceived', 1) - this._stats.push(peerId, 'dupDataReceived', block.data.length) + this._stats.push(peerIdStr, 'dupBlksReceived', 1) + this._stats.push(peerIdStr, 'dupDataReceived', block.data.length) } } - // handle errors on the receiving channel + /** + * handle errors on the receiving channel + * + * @param {Error} err + * @returns {void} + */ _receiveError (err) { this._log.error('ReceiveError: %s', err.message) } - // handle new peers + /** + * handle new peers + * + * @param {PeerId} peerId + */ _onPeerConnected (peerId) { this.wm.connected(peerId) } - // handle peers being disconnected + /** + * handle peers being disconnected + * + * @param {PeerId} peerId + */ _onPeerDisconnected (peerId) { this.wm.disconnected(peerId) this.engine.peerDisconnected(peerId) @@ -158,9 +210,10 @@ class Bitswap { * Return the current wantlist for a given `peerId` * * @param {PeerId} peerId - * @returns {Map} + * @param {AbortOptions} [_options] + * @returns {Map} */ - wantlistForPeer (peerId) { + wantlistForPeer (peerId, _options) { return this.engine.wantlistForPeer(peerId) } @@ -168,7 +221,7 @@ class Bitswap { * Return ledger information for a given `peerId` * * @param {PeerId} peerId - * @returns {Object} + * @returns {null|LedgerForPeer} */ ledgerForPeer (peerId) { return this.engine.ledgerForPeer(peerId) @@ -179,11 +232,17 @@ class Bitswap { * blockstore it is returned, otherwise the block is added to the wantlist and returned once another node sends it to us. * * @param {CID} cid - * @param {Object} options - * @param {AbortSignal} options.abortSignal + * @param {Object} [options] + * @param {AbortSignal} [options.signal] * @returns {Promise} */ async get (cid, options = {}) { + /** + * @param {CID} cid + * @param {Object} options + * @param {AbortSignal} options.signal + * @returns {Promise} + */ const fetchFromNetwork = (cid, options) => { // add it to the want list - n.b. later we will abort the AbortSignal // so no need to remove the blocks from the wantlist after we have it @@ -194,6 +253,13 @@ class Bitswap { let promptedNetwork = false + /** + * + * @param {CID} cid + * @param {Object} options + * @param {AbortSignal} options.signal + * @returns {Promise} + */ const loadOrFetchFromNetwork = async (cid, options) => { try { // have to await here as we want to handle ERR_NOT_FOUND @@ -222,7 +288,9 @@ class Bitswap { // a race condition, so register for incoming block notifications as well // as trying to get it from the datastore const controller = new AbortController() - const signal = anySignal([options.signal, controller.signal]) + const signal = options.signal + ? anySignal([options.signal, controller.signal]) + : controller.signal const block = await Promise.race([ this.notifications.wantBlock(cid, { @@ -243,10 +311,10 @@ class Bitswap { * Fetch a a list of blocks by cid. If the blocks are in the local * blockstore they are returned, otherwise the blocks are added to the wantlist and returned once another node sends them to us. * - * @param {AsyncIterator} cids - * @param {Object} options - * @param {AbortSignal} options.abortSignal - * @returns {Promise>} + * @param {AsyncIterable|Iterable} cids + * @param {Object} [options] + * @param {AbortSignal} [options.signal] + * @returns {AsyncIterable} */ async * getMany (cids, options = {}) { for await (const cid of cids) { @@ -262,16 +330,14 @@ class Bitswap { * If you want to cancel the want for a block without doing that, pass an * AbortSignal in to `.get` or `.getMany` and abort it. * - * @param {Iterable} cids + * @param {CID[]|CID} cids * @returns {void} */ unwant (cids) { - if (!Array.isArray(cids)) { - cids = [cids] - } + const cidsArray = Array.isArray(cids) ? cids : [cids] - this.wm.unwantBlocks(cids) - cids.forEach((cid) => this.notifications.unwantBlock(cid)) + this.wm.unwantBlocks(cidsArray) + cidsArray.forEach((cid) => this.notifications.unwantBlock(cid)) } /** @@ -279,14 +345,11 @@ class Bitswap { * for blocks to never resolve. If you wish these promises to abort instead * call `unwant(cids)` instead. * - * @param {Iterable} cids + * @param {CID[]|CID} cids * @returns {void} */ cancelWants (cids) { - if (!Array.isArray(cids)) { - cids = [cids] - } - this.wm.cancelWants(cids) + this.wm.cancelWants(Array.isArray(cids) ? cids : [cids]) } /** @@ -294,9 +357,10 @@ class Bitswap { * send it to nodes that have it in their wantlist. * * @param {Block} block + * @param {AbortOptions} [_options] * @returns {Promise} */ - async put (block) { + async put (block, _options) { await this.blockstore.put(block) this._sendHaveBlockNotifications(block) } @@ -305,7 +369,7 @@ class Bitswap { * Put the given blocks to the underlying blockstore and * send it to nodes that have it them their wantlist. * - * @param {AsyncIterable} blocks + * @param {AsyncIterable|Iterable} blocks * @returns {AsyncIterable} */ async * putMany (blocks) { @@ -319,6 +383,7 @@ class Bitswap { /** * Sends notifications about the arrival of a block * + * @private * @param {Block} block */ _sendHaveBlockNotifications (block) { @@ -333,7 +398,7 @@ class Bitswap { /** * Get the current list of wants. * - * @returns {Iterator} + * @returns {Iterable<[string, WantListEntry]>} */ getWantlist () { return this.wm.wantlist.entries() @@ -342,7 +407,7 @@ class Bitswap { /** * Get the current list of partners. * - * @returns {Iterator} + * @returns {PeerId[]} */ peers () { return this.engine.peers() @@ -351,7 +416,7 @@ class Bitswap { /** * Get stats about the bitswap node. * - * @returns {Object} + * @returns {import('ipfs-core-types/src/bitswap').Stats} */ stat () { return this._stats diff --git a/src/network.js b/src/network.js index 82285fb7..d6e82b2a 100644 --- a/src/network.js +++ b/src/network.js @@ -1,7 +1,7 @@ 'use strict' const lp = require('it-length-prefixed') -const pipe = require('it-pipe') +const { pipe } = require('it-pipe') const MulticodecTopology = require('libp2p-interfaces/src/topology/multicodec-topology') @@ -9,21 +9,46 @@ const Message = require('./types/message') const CONSTANTS = require('./constants') const logger = require('./utils').logger +/** + * @typedef {import('peer-id')} PeerId + * @typedef {import('cids')} CID + * @typedef {import('multiaddr')} Multiaddr + * + * @typedef {Object} Connection + * @property {string} id + * @property {PeerId} remotePeer + * + * @typedef {Object} Provider + * @property {PeerId} id + * @property {Multiaddr[]} multiaddrs + * + * @typedef {Object} Stream + * @property {AsyncIterable} source + * @property {(output:AsyncIterable) => Promise} sink + */ + const BITSWAP100 = '/ipfs/bitswap/1.0.0' const BITSWAP110 = '/ipfs/bitswap/1.1.0' const BITSWAP120 = '/ipfs/bitswap/1.2.0' class Network { - constructor (libp2p, bitswap, options, stats) { + /** + * @param {import('libp2p')} libp2p + * @param {import('./index')} bitswap + * @param {import('./stats')} stats + * @param {Object} [options] + * @param {boolean} [options.b100Only] + */ + constructor (libp2p, bitswap, stats, options = {}) { this._log = logger(libp2p.peerId, 'network') - options = options || {} - this.libp2p = libp2p - this.bitswap = bitswap - this.protocols = [BITSWAP100] + this._libp2p = libp2p + this._bitswap = bitswap + this._protocols = [BITSWAP100] + if (!options.b100Only) { // Latest bitswap first - this.protocols.unshift(BITSWAP110) - this.protocols.unshift(BITSWAP120) + this._protocols.unshift(BITSWAP110) + this._protocols.unshift(BITSWAP120) } this._stats = stats @@ -37,21 +62,21 @@ class Network { start () { this._running = true - this.libp2p.handle(this.protocols, this._onConnection) + this._libp2p.handle(this._protocols, this._onConnection) // register protocol with topology const topology = new MulticodecTopology({ - multicodecs: this.protocols, + multicodecs: this._protocols, handlers: { onConnect: this._onPeerConnect, onDisconnect: this._onPeerDisconnect } }) - this._registrarId = this.libp2p.registrar.register(topology) + this._registrarId = this._libp2p.registrar.register(topology) // All existing connections are like new ones for us - for (const peer of this.libp2p.peerStore.peers.values()) { - const conn = this.libp2p.connectionManager.get(peer.id) + for (const peer of this._libp2p.peerStore.peers.values()) { + const conn = this._libp2p.connectionManager.get(peer.id) conn && this._onPeerConnect(conn.remotePeer) } @@ -61,21 +86,23 @@ class Network { this._running = false // Unhandle both, libp2p doesn't care if it's not already handled - this.libp2p.unhandle(this.protocols) + this._libp2p.unhandle(this._protocols) // unregister protocol and handlers - this.libp2p.registrar.unregister(this._registrarId) + if (this._registrarId != null) { + this._libp2p.registrar.unregister(this._registrarId) + } } /** * Handles both types of incoming bitswap messages * * @private - * @param {object} param0 - * @param {string} param0.protocol - The protocol the stream is running - * @param {Stream} param0.stream - A duplex iterable stream - * @param {Connection} param0.connection - A libp2p Connection - * @returns {void} + * @param {object} connection + * @param {string} connection.protocol - The protocol the stream is running + * @param {Stream} connection.stream - A duplex iterable stream + * @param {Connection} connection.connection - A libp2p Connection + * @returns {Promise} */ async _onConnection ({ protocol, stream, connection }) { if (!this._running) { return } @@ -85,13 +112,16 @@ class Network { await pipe( stream, lp.decode(), + /** + * @param {AsyncIterable} source + */ async (source) => { for await (const data of source) { try { const message = await Message.deserialize(data.slice()) - await this.bitswap._receiveMessage(connection.remotePeer, message) + await this._bitswap._receiveMessage(connection.remotePeer, message) } catch (err) { - this.bitswap._receiveError(err) + this._bitswap._receiveError(err) break } } @@ -102,12 +132,21 @@ class Network { } } + /** + * @private + * @param {PeerId} peerId + */ _onPeerConnect (peerId) { - this.bitswap._onPeerConnected(peerId) + this._bitswap._onPeerConnected(peerId) } + /** + * @private + * @param {PeerId} peerId + * @returns {void} + */ _onPeerDisconnect (peerId) { - this.bitswap._onPeerDisconnected(peerId) + this._bitswap._onPeerDisconnected(peerId) } /** @@ -115,14 +154,16 @@ class Network { * * @param {CID} cid * @param {number} maxProviders - * @param {Object} options - * @param {AbortSignal} options.abortSignal - * @returns {AsyncIterable} + * @param {Object} [options] + * @param {AbortSignal} [options.signal] + * @returns {AsyncIterable} */ findProviders (cid, maxProviders, options = {}) { - return this.libp2p.contentRouting.findProviders( + return this._libp2p.contentRouting.findProviders( cid, { + // TODO: Should this be a timeout options insetad ? + // @ts-expect-error - 'maxTimeout' does not exist in type maxTimeout: CONSTANTS.providerRequestTimeout, maxNumProviders: maxProviders, signal: options.signal @@ -134,9 +175,9 @@ class Network { * Find the providers of a given `cid` and connect to them. * * @param {CID} cid - * @param {Object} options - * @param {AbortSignal} options.abortSignal - * @returns {void} + * @param {Object} [options] + * @param {AbortSignal} [options.signal] + * @returns {Promise} */ async findAndConnect (cid, options) { const connectAttempts = [] @@ -151,16 +192,23 @@ class Network { * Tell the network we can provide content for the passed CID * * @param {CID} cid - * @param {Object} options - * @param {AbortSignal} options.abortSignal + * @param {Object} [options] + * @param {AbortSignal} [options.signal] * @returns {Promise} */ async provide (cid, options) { - await this.libp2p.contentRouting.provide(cid, options) + // @ts-expect-error - contentRouting takes no options + await this._libp2p.contentRouting.provide(cid, options) } - // Connect to the given peer - // Send the given msg (instance of Message) to the given peer + /** + * Connect to the given peer + * Send the given msg (instance of Message) to the given peer + * + * @param {PeerId} peer + * @param {Message} msg + * @returns {Promise} + */ async sendMessage (peer, msg) { if (!this._running) throw new Error('network isn\'t running') @@ -169,6 +217,7 @@ class Network { const { stream, protocol } = await this._dialPeer(peer) + /** @type {Uint8Array} */ let serialized switch (protocol) { case BITSWAP100: @@ -191,9 +240,9 @@ class Network { /** * Connects to another peer * - * @param {PeerId|Multiaddr} peer - * @param {Object} options - * @param {AbortSignal} options.abortSignal + * @param {PeerId|Multiaddr|Provider} peer + * @param {Object} [options] + * @param {AbortSignal} [options.signal] * @returns {Promise} */ async connectTo (peer, options) { // eslint-disable-line require-await @@ -201,14 +250,26 @@ class Network { throw new Error('network isn\'t running') } - return this.libp2p.dial(peer, options) + // TODO: Figure out inconsistency here. + // @ts-expect-error - dial does not expects Provider + return this._libp2p.dial(peer, options) } - // Dial to the peer and try to use the most recent Bitswap + /** + * Dial to the peer and try to use the most recent Bitswap + * + * @private + * @param {PeerId|Multiaddr} peer + */ _dialPeer (peer) { - return this.libp2p.dialProtocol(peer, [BITSWAP120, BITSWAP110, BITSWAP100]) + return this._libp2p.dialProtocol(peer, [BITSWAP120, BITSWAP110, BITSWAP100]) } + /** + * @private + * @param {PeerId} peer + * @param {Map} blocks + */ _updateSentStats (peer, blocks) { const peerId = peer.toB58String() @@ -219,6 +280,12 @@ class Network { } } +/** + * + * @param {Stream} stream + * @param {Uint8Array} msg + * @param {*} log + */ async function writeMessage (stream, msg, log) { try { await pipe( diff --git a/src/notifications.js b/src/notifications.js index 5804ecff..00332fb4 100644 --- a/src/notifications.js +++ b/src/notifications.js @@ -1,24 +1,34 @@ 'use strict' -const EventEmitter = require('events').EventEmitter -const Block = require('ipld-block') +const { EventEmitter } = require('events') +const IPLDBlock = require('ipld-block') const uint8ArrayEquals = require('uint8arrays/equals') const uint8ArrayToString = require('uint8arrays/to-string') const CONSTANTS = require('./constants') const logger = require('./utils').logger +/** + * @typedef {import('ipfs-core-types/src/block-service').Block} Block + */ + +/** + * @param {CID} cid + */ const unwantEvent = (cid) => `unwant:${uint8ArrayToString(cid.multihash, 'base64')}` -const blockEvent = (cid) => `block:${uint8ArrayToString(cid.multihash, 'base64')}` /** - * Internal module used to track events about incoming blocks, - * wants and unwants. - * - * @param {PeerId} peerId - * @private + * @param {CID} cid */ +const blockEvent = (cid) => `block:${uint8ArrayToString(cid.multihash, 'base64')}` + class Notifications extends EventEmitter { + /** + * Internal module used to track events about incoming blocks, + * wants and unwants. + * + * @param {PeerId} peerId + */ constructor (peerId) { super() @@ -46,8 +56,8 @@ class Notifications extends EventEmitter { * or undefined when the block is unwanted. * * @param {CID} cid - * @param {Object} options - * @param {AbortSignal} options.abortSignal + * @param {Object} [options] + * @param {AbortSignal} [options.signal] * @returns {Promise} */ wantBlock (cid, options = {}) { @@ -65,6 +75,10 @@ class Notifications extends EventEmitter { this.removeListener(blockEvt, onBlock) reject(new Error(`Block for ${cid} unwanted`)) } + + /** + * @param {Block} block + */ const onBlock = (block) => { this.removeListener(unwantEvt, onUnwant) @@ -73,7 +87,7 @@ class Notifications extends EventEmitter { return reject(new Error(`Incorrect block received for ${cid}`)) } else if (cid.version !== block.cid.version || cid.codec !== block.cid.codec) { // right block but wrong version or codec - block = new Block(block.data, cid) + block = new IPLDBlock(block.data, cid) } resolve(block) @@ -107,3 +121,8 @@ class Notifications extends EventEmitter { } module.exports = Notifications + +/** + * @typedef {import('cids')} CID + * @typedef {import('peer-id')} PeerId + */ diff --git a/src/stats/index.js b/src/stats/index.js index a7a9e90f..cba5eb50 100644 --- a/src/stats/index.js +++ b/src/stats/index.js @@ -1,18 +1,39 @@ 'use strict' -const EventEmitter = require('events') +const { EventEmitter } = require('events') const Stat = require('./stat') +/** + * @typedef {import('peer-id')} PeerId + * @typedef {import('ipfs-core-types/src/bitswap').Stats} API + */ + +/** + * @typedef {[number, number, number]} AverageIntervals + */ const defaultOptions = { - movingAverageIntervals: [ + enabled: false, + computeThrottleTimeout: 1000, + computeThrottleMaxQueueSize: 1000, + movingAverageIntervals: /** @type {AverageIntervals} */ ([ 60 * 1000, // 1 minute 5 * 60 * 1000, // 5 minutes 15 * 60 * 1000 // 15 minutes - ] + ]) } +/** + * @implements {API} + */ class Stats extends EventEmitter { - constructor (initialCounters, _options) { + /** + * @param {string[]} [initialCounters] + * @param {Object} _options + * @param {boolean} _options.enabled + * @param {number} _options.computeThrottleTimeout + * @param {number} _options.computeThrottleMaxQueueSize + */ + constructor (initialCounters = [], _options = defaultOptions) { super() const options = Object.assign({}, defaultOptions, _options) @@ -32,6 +53,7 @@ class Stats extends EventEmitter { this._global = new Stat(initialCounters, options) this._global.on('update', (stats) => this.emit('update', stats)) + /** @type {Map} */ this._peers = new Map() } @@ -63,14 +85,24 @@ class Stats extends EventEmitter { return this._global.movingAverages } + /** + * @param {PeerId|string} peerId + * @returns {Stat|void} + */ forPeer (peerId) { - if (peerId.toB58String) { - peerId = peerId.toB58String() - } + const peerIdStr = (typeof peerId !== 'string' && peerId.toB58String) + ? peerId.toB58String() + : `${peerId}` - return this._peers.get(peerId) + return this._peers.get(peerIdStr) } + /** + * + * @param {string|null} peer + * @param {string} counter + * @param {number} inc + */ push (peer, counter, inc) { if (this._enabled) { this._global.push(counter, inc) @@ -87,6 +119,9 @@ class Stats extends EventEmitter { } } + /** + * @param {PeerId} peer + */ disconnected (peer) { const peerId = peer.toB58String() const peerStats = this._peers.get(peerId) diff --git a/src/stats/stat.js b/src/stats/stat.js index da3e461e..1ce2f1e6 100644 --- a/src/stats/stat.js +++ b/src/stats/stat.js @@ -1,25 +1,43 @@ 'use strict' -const EventEmitter = require('events') -const Big = require('bignumber.js') +const { EventEmitter } = require('events') +const Big = require('bignumber.js').default const MovingAverage = require('moving-average') +/** + * @typedef {[string, number, number]} Op + */ + class Stats extends EventEmitter { + /** + * + * @param {string[]} initialCounters + * @param {Object} options + * @param {boolean} options.enabled + * @param {number} options.computeThrottleTimeout + * @param {number} options.computeThrottleMaxQueueSize + * @param {import('.').AverageIntervals} options.movingAverageIntervals + */ constructor (initialCounters, options) { super() this._options = options + /** @type {Op[]} */ this._queue = [] + /** @type {Record} */ this._stats = {} this._frequencyLastTime = Date.now() + /** @type {Record} */ this._frequencyAccumulators = {} + + /** @type {Record>} */ this._movingAverages = {} this._update = this._update.bind(this) initialCounters.forEach((key) => { - this._stats[key] = Big(0) + this._stats[key] = new Big(0) this._movingAverages[key] = {} this._options.movingAverageIntervals.forEach((interval) => { const ma = this._movingAverages[key][interval] = MovingAverage(interval) @@ -52,6 +70,10 @@ class Stats extends EventEmitter { return Object.assign({}, this._movingAverages) } + /** + * @param {string} counter + * @param {number} inc + */ push (counter, inc) { if (this._enabled) { this._queue.push([counter, inc, Date.now()]) @@ -59,6 +81,9 @@ class Stats extends EventEmitter { } } + /** + * @private + */ _resetComputeTimeout () { if (this._timeout) { clearTimeout(this._timeout) @@ -66,12 +91,19 @@ class Stats extends EventEmitter { this._timeout = setTimeout(this._update, this._nextTimeout()) } + /** + * @private + * @returns {number} + */ _nextTimeout () { // calculate the need for an update, depending on the queue length const urgency = this._queue.length / this._options.computeThrottleMaxQueueSize return Math.max(this._options.computeThrottleTimeout * (1 - urgency), 0) } + /** + * @private + */ _update () { this._timeout = null @@ -79,15 +111,19 @@ class Stats extends EventEmitter { let last while (this._queue.length) { const op = last = this._queue.shift() - this._applyOp(op) + op && this._applyOp(op) } - this._updateFrequency(last[2]) // contains timestamp of last op + last && this._updateFrequency(last[2]) // contains timestamp of last op this.emit('update', this._stats) } } + /** + * @private + * @param {number} latestTime + */ _updateFrequency (latestTime) { const timeDiff = latestTime - this._frequencyLastTime @@ -100,6 +136,13 @@ class Stats extends EventEmitter { this._frequencyLastTime = latestTime } + /** + * @private + * @param {string} key + * @param {number} timeDiffMS + * @param {number} latestTime + * @returns {void} + */ _updateFrequencyFor (key, timeDiffMS, latestTime) { const count = this._frequencyAccumulators[key] || 0 this._frequencyAccumulators[key] = 0 @@ -118,18 +161,22 @@ class Stats extends EventEmitter { }) } + /** + * @private + * @param {Op} op + */ _applyOp (op) { const key = op[0] const inc = op[1] if (typeof inc !== 'number') { - throw new Error('invalid increment number:', inc) + throw new Error(`invalid increment number: ${inc}`) } let n if (!Object.prototype.hasOwnProperty.call(this._stats, key)) { - n = this._stats[key] = Big(0) + n = this._stats[key] = new Big(0) } else { n = this._stats[key] } diff --git a/src/types/message/entry.js b/src/types/message/entry.js index c488ea84..65a509b6 100644 --- a/src/types/message/entry.js +++ b/src/types/message/entry.js @@ -3,6 +3,13 @@ const WantlistEntry = require('../wantlist').Entry module.exports = class BitswapMessageEntry { + /** + * @param {import('cids')} cid + * @param {number} priority + * @param {import('./message.proto').WantType} wantType + * @param {boolean} [cancel] + * @param {boolean} [sendDontHave] + */ constructor (cid, priority, wantType, cancel, sendDontHave) { this.entry = new WantlistEntry(cid, priority, wantType) this.cancel = Boolean(cancel) @@ -38,6 +45,9 @@ module.exports = class BitswapMessageEntry { return `BitswapMessageEntry ${cidStr} ` } + /** + * @param {this} other + */ equals (other) { return (this.cancel === other.cancel) && (this.sendDontHave === other.sendDontHave) && diff --git a/src/types/message/index.js b/src/types/message/index.js index c4be0c58..9d2e1ead 100644 --- a/src/types/message/index.js +++ b/src/types/message/index.js @@ -1,8 +1,9 @@ 'use strict' -const Block = require('ipld-block') +const IPLDBlock = require('ipld-block') const CID = require('cids') const { getName } = require('multicodec') +// @ts-ignore const vd = require('varint-decoder') const multihashing = require('multihashing-async') const { isMapEqual } = require('../../utils') @@ -10,10 +11,18 @@ const { Message } = require('./message.proto') const Entry = require('./entry') class BitswapMessage { + /** + * @param {boolean} full + */ constructor (full) { this.full = full + /** @type {Map} */ this.wantlist = new Map() + + /** @type {Map} */ this.blocks = new Map() + + /** @type {Map} */ this.blockPresences = new Map() this.pendingBytes = 0 } @@ -24,6 +33,15 @@ class BitswapMessage { this.blockPresences.size === 0 } + /** + * + * @param {CID} cid + * @param {number} priority + * @param {import('./message.proto').WantType} [wantType] + * @param {boolean} [cancel] + * @param {boolean} [sendDontHave] + * @returns {void} + */ addEntry (cid, priority, wantType, cancel, sendDontHave) { if (wantType == null) { wantType = BitswapMessage.WantType.Block @@ -53,11 +71,18 @@ class BitswapMessage { } } + /** + * @param {import('ipfs-core-types/src/block-service').Block} block + * @returns {void} + */ addBlock (block) { const cidStr = block.cid.toString('base58btc') this.blocks.set(cidStr, block) } + /** + * @param {CID} cid + */ addHave (cid) { const cidStr = cid.toString('base58btc') if (!this.blockPresences.has(cidStr)) { @@ -65,6 +90,9 @@ class BitswapMessage { } } + /** + * @param {CID} cid + */ addDontHave (cid) { const cidStr = cid.toString('base58btc') if (!this.blockPresences.has(cidStr)) { @@ -72,21 +100,30 @@ class BitswapMessage { } } + /** + * @param {CID} cid + */ cancel (cid) { const cidStr = cid.toString('base58btc') this.wantlist.delete(cidStr) this.addEntry(cid, 0, BitswapMessage.WantType.Block, true, false) } + /** + * @param {number} size + */ setPendingBytes (size) { this.pendingBytes = size } - /* + /** * Serializes to Bitswap Message protobuf of * version 1.0.0 + * + * @returns {Uint8Array} */ serializeToBitswap100 () { + /** @type {import('./message.proto').Message100} */ const msg = { wantlist: { entries: Array.from(this.wantlist.values()).map((entry) => { @@ -108,11 +145,14 @@ class BitswapMessage { return Message.encode(msg) } - /* + /** * Serializes to Bitswap Message protobuf of * version 1.1.0 + * + * @returns {Uint8Array} */ serializeToBitswap110 () { + /** @type {import('./message.proto').Message110} */ const msg = { wantlist: { entries: Array.from(this.wantlist.values()).map((entry) => { @@ -126,7 +166,8 @@ class BitswapMessage { }) }, blockPresences: [], - payload: [] + payload: [], + pendingBytes: this.pendingBytes } if (this.full) { @@ -154,11 +195,17 @@ class BitswapMessage { return Message.encode(msg) } + /** + * @param {BitswapMessage} other + * @returns {boolean} + */ equals (other) { if (this.full !== other.full || this.pendingBytes !== other.pendingBytes || !isMapEqual(this.wantlist, other.wantlist) || !isMapEqual(this.blocks, other.blocks) || + // @TODO - Is this a bug ? + // @ts-expect-error - isMap equals map values to be objects not numbers !isMapEqual(this.blockPresences, other.blockPresences) ) { return false @@ -174,6 +221,11 @@ class BitswapMessage { } } +/** + * + * @param {Uint8Array} raw + * @returns {Promise} + */ BitswapMessage.deserialize = async (raw) => { const decoded = Message.decode(raw) @@ -205,7 +257,7 @@ BitswapMessage.deserialize = async (raw) => { await Promise.all(decoded.blocks.map(async (b) => { const hash = await multihashing(b, 'sha2-256') const cid = new CID(hash) - msg.addBlock(new Block(b, cid)) + msg.addBlock(new IPLDBlock(b, cid)) })) return msg } @@ -223,7 +275,7 @@ BitswapMessage.deserialize = async (raw) => { // const hashLen = values[3] // We haven't need to use this so far const hash = await multihashing(p.data, hashAlg) const cid = new CID(cidVersion, getName(multicodec), hash) - msg.addBlock(new Block(p.data, cid)) + msg.addBlock(new IPLDBlock(p.data, cid)) })) msg.setPendingBytes(decoded.pendingBytes) return msg @@ -232,6 +284,9 @@ BitswapMessage.deserialize = async (raw) => { return msg } +/** + * @param {CID} cid + */ BitswapMessage.blockPresenceSize = (cid) => { // It's ok if this is not exactly right: it's used to estimate the size of // the HAVE / DONT_HAVE on the wire, but when doing that calculation we leave @@ -242,11 +297,15 @@ BitswapMessage.blockPresenceSize = (cid) => { BitswapMessage.Entry = Entry BitswapMessage.WantType = { + /** @type {import('./message.proto').WantBlock} */ Block: Message.Wantlist.WantType.Block, + /** @type {import('./message.proto').HaveBlock} */ Have: Message.Wantlist.WantType.Have } BitswapMessage.BlockPresenceType = { + /** @type {import('./message.proto').Have} */ Have: Message.BlockPresenceType.Have, + /** @type {import('./message.proto').DontHave} */ DontHave: Message.BlockPresenceType.DontHave } module.exports = BitswapMessage diff --git a/src/types/message/message.proto.d.ts b/src/types/message/message.proto.d.ts new file mode 100644 index 00000000..312df051 --- /dev/null +++ b/src/types/message/message.proto.d.ts @@ -0,0 +1,73 @@ + +export interface MessageProto { + decode: (bytes: Uint8Array) => MessageData + encode: (value: Message100|Message110) => Uint8Array + BlockPresenceType: { + Have: Have + DontHave: DontHave + } + Wantlist: { + WantType: { + Block: WantBlock + Have: HaveBlock + } + } +} + +export interface MessageData { + wantlist?: WantList + blockPresences: BlockPresence[] + + blocks: Uint8Array[] + payload: Block[] + + pendingBytes: number +} + +export interface Message110 { + wantlist: WantList + blockPresences: BlockPresence[] + + payload: Block[] + pendingBytes: number +} + +export interface Message100 { + wantlist: WantList + + blocks: Uint8Array[] +} + +export interface BlockPresence { + cid: Uint8Array + type: BlockPresenceType +} + +export interface WantList { + entries: Entry[] + full?: boolean +} + +export type WantBlock = 0 +export type HaveBlock = 1 +export type WantType = WantBlock | HaveBlock + +export type Have = 0 +export type DontHave = 1 +export type BlockPresenceType = Have | DontHave + +export interface Entry { + block: Uint8Array + priority: number + cancel: boolean + wantType?: WantType + sendDontHave?: boolean +} + +export interface Block { + prefix: Uint8Array + data: Uint8Array +} + +declare var Message: MessageProto +export { Message } diff --git a/src/types/message/message.proto.js b/src/types/message/message.proto.js index bb017d27..03e0d114 100644 --- a/src/types/message/message.proto.js +++ b/src/types/message/message.proto.js @@ -1,4 +1,6 @@ +// @ts-nocheck 'use strict' + const protons = require('protons') // from: https://github.com/ipfs/go-ipfs/blob/master/exchange/bitswap/message/pb/message.proto diff --git a/src/types/wantlist/entry.js b/src/types/wantlist/entry.js index 84d68826..2e46f188 100644 --- a/src/types/wantlist/entry.js +++ b/src/types/wantlist/entry.js @@ -1,6 +1,18 @@ 'use strict' +/** + * @typedef {import('ipfs-core-types/src/bitswap').WantListEntry} API + */ + +/** + * @implements {API} + */ class WantListEntry { + /** + * @param {import('cids')} cid + * @param {number} priority + * @param {import('../message/message.proto').WantType} wantType + */ constructor (cid, priority, wantType) { // Keep track of how many requests we have for this key this._refCounter = 1 @@ -28,10 +40,16 @@ class WantListEntry { return `WantlistEntry ` } + /** + * @param {API} other + * @returns {boolean} + */ equals (other) { + // @ts-expect-error _refCounter is not specified by the interface return (this._refCounter === other._refCounter) && this.cid.equals(other.cid) && this.priority === other.priority && + // @ts-expect-error - wantType is not specified by the interface this.wantType === other.wantType } } diff --git a/src/types/wantlist/index.js b/src/types/wantlist/index.js index 3c4656b7..54408112 100644 --- a/src/types/wantlist/index.js +++ b/src/types/wantlist/index.js @@ -3,8 +3,17 @@ const { sortBy } = require('../../utils') const Entry = require('./entry') +/** + * @typedef {import('cids')} CID + */ + class Wantlist { + /** + * + * @param {import('../../stats')} [stats] + */ constructor (stats) { + /** @type {Map} */ this.set = new Map() this._stats = stats } @@ -13,6 +22,12 @@ class Wantlist { return this.set.size } + /** + * @param {CID} cid + * @param {number} priority + * @param {import('../message/message.proto').WantType} wantType + * @returns {void} + */ add (cid, priority, wantType) { // Have to import here to avoid circular reference const Message = require('../message') @@ -36,6 +51,10 @@ class Wantlist { } } + /** + * @param {CID} cid + * @returns {void} + */ remove (cid) { const cidStr = cid.toString('base58btc') const entry = this.set.get(cidStr) @@ -57,12 +76,18 @@ class Wantlist { } } + /** + * @param {string} cidStr + */ removeForce (cidStr) { if (this.set.has(cidStr)) { this.set.delete(cidStr) } } + /** + * @param {(entry:Entry, key:string) => void} fn + */ forEach (fn) { return this.set.forEach(fn) } @@ -72,9 +97,16 @@ class Wantlist { } sortedEntries () { + // TODO: Figure out if this is an actual bug. + // @ts-expect-error - Property 'key' does not exist on type 'WantListEntry' return new Map(sortBy(o => o[1].key, Array.from(this.set.entries()))) } + /** + * + * @param {CID} cid + * @returns {Entry|undefined} + */ contains (cid) { const cidStr = cid.toString('base58btc') return this.set.get(cidStr) diff --git a/src/utils/index.js b/src/utils/index.js index 2c482777..9f66f2e0 100644 --- a/src/utils/index.js +++ b/src/utils/index.js @@ -6,11 +6,8 @@ const uint8ArrayEquals = require('uint8arrays/equals') /** * Creates a logger for the given subsystem * - * @param {PeerId} [id] + * @param {import('peer-id')} [id] * @param {string} [subsystem] - * @returns {debug} - * - * @private */ const logger = (id, subsystem) => { const name = ['bitswap'] @@ -20,12 +17,19 @@ const logger = (id, subsystem) => { if (id) { name.push(`${id.toB58String().slice(0, 8)}`) } - const logger = debug(name.join(':')) - logger.error = debug(name.concat(['error']).join(':')) - return logger + return Object.assign(debug(name.join(':')), { + error: debug(name.concat(['error']).join(':')) + }) } +/** + * @template X, T + * @param {(x:X, t:T) => boolean} pred + * @param {X} x + * @param {T[]} list + * @returns {boolean} + */ const includesWith = (pred, x, list) => { let idx = 0 const len = list.length @@ -38,6 +42,12 @@ const includesWith = (pred, x, list) => { return false } +/** + * @template T + * @param {(x:T, t:T) => boolean} pred + * @param {T[]} list + * @returns {T[]} + */ const uniqWith = (pred, list) => { let idx = 0 const len = list.length @@ -54,6 +64,13 @@ const uniqWith = (pred, list) => { return result } +/** + * @template {string|number|symbol} K + * @template V + * @param {(v:V) => K} pred + * @param {V[]} list + * @returns {Record} + */ const groupBy = (pred, list) => { return list.reduce((acc, v) => { const k = pred(v) @@ -64,15 +81,28 @@ const groupBy = (pred, list) => { acc[k] = [v] } return acc - }, {}) + }, /** @type {Record} */({})) } +/** + * @template T, E + * @param {(a:T, b:E) => boolean} pred + * @param {T[]} list + * @param {E[]} values + * @returns {T[]} + */ const pullAllWith = (pred, list, values) => { return list.filter(i => { return !includesWith(pred, i, values) }) } +/** + * @template T + * @param {(v:T) => number} fn + * @param {T[]} list + * @returns {T[]} + */ const sortBy = (fn, list) => { return Array.prototype.slice.call(list, 0).sort((a, b) => { const aa = fn(a) @@ -84,8 +114,9 @@ const sortBy = (fn, list) => { /** * Is equal for Maps of BitswapMessageEntry or Blocks * - * @param {Map} a - * @param {Map} b + * @template {{data?:Uint8Array, equals?: (value:any) => boolean}} T + * @param {Map} a + * @param {Map} b * @returns {boolean} */ const isMapEqual = (a, b) => { @@ -94,18 +125,18 @@ const isMapEqual = (a, b) => { } for (const [key, valueA] of a) { - if (!b.has(key)) { + const valueB = b.get(key) + + if (valueB === undefined) { return false } - const valueB = b.get(key) - // Support BitswapMessageEntry if (typeof valueA.equals === 'function' && !valueA.equals(valueB)) { return false } // Support Blocks - if (valueA._data && !uint8ArrayEquals(valueA._data, valueB._data)) { + if (valueA.data && !(valueB.data && uint8ArrayEquals(valueA.data, valueB.data))) { return false } } diff --git a/src/utils/sorted-map.js b/src/utils/sorted-map.js index 7b0fc627..bfada300 100644 --- a/src/utils/sorted-map.js +++ b/src/utils/sorted-map.js @@ -1,16 +1,19 @@ 'use strict' /** + * @template Key, Value * SortedMap is a Map whose iterator order can be defined by the user + * @extends {Map} */ class SortedMap extends Map { /** - * @param {Array} [entries] - * @param {function(a, b)} [cmp] - compares [k1, v1] to [k2, v2] + * @param {Array<[Key, Value]>} [entries] + * @param {(a:[Key, Value], b:[Key, Value]) => number} [cmp] - compares [k1, v1] to [k2, v2] */ constructor (entries, cmp) { super() this._cmp = cmp || this._defaultSort + /** @type {Key[]} */ this._keys = [] for (const [k, v] of entries || []) { this.set(k, v) @@ -23,7 +26,8 @@ class SortedMap extends Map { * priority changes, call update. * Call indexOf() to get the index _before_ the change happens. * - * @param {Object} i - the index of entry whose position should be updated. + * @param {number} i - the index of entry whose position should be updated. + * @returns {void} */ update (i) { if (i < 0 || i >= this._keys.length) { @@ -36,6 +40,10 @@ class SortedMap extends Map { this._keys.splice(newIdx, 0, k) } + /** + * @param {Key} k + * @param {Value} v + */ set (k, v) { // If the key is already in the map, remove it from the ordering and // re-insert it below @@ -50,6 +58,8 @@ class SortedMap extends Map { // Find the correct position of the newly inserted k/v in the order const i = this._find(k) this._keys.splice(i, 0, k) + + return this } clear () { @@ -57,16 +67,23 @@ class SortedMap extends Map { this._keys = [] } + /** + * @param {Key} k + */ delete (k) { if (!this.has(k)) { - return + return false } const i = this.indexOf(k) this._keys.splice(i, 1) - super.delete(k) + return super.delete(k) } + /** + * @param {Key} k + * @returns {number} + */ indexOf (k) { if (!this.has(k)) { return -1 @@ -88,6 +105,12 @@ class SortedMap extends Map { return -1 // should never happen for existing key } + /** + * @private + * @param {Key} k + * @returns {number} + */ + _find (k) { let lower = 0 let upper = this._keys.length @@ -106,47 +129,88 @@ class SortedMap extends Map { return lower } + /** + * @returns {IterableIterator} + */ * keys () { for (const k of this._keys) { yield k } + + return undefined } + /** + * @returns {IterableIterator} + */ * values () { for (const k of this._keys) { + // @ts-ignore - return of `this.get(k)` is `Value|undefined` which is + // incompatible with `Value`. Typechecker can't that this contains values + // for all the `_keys`. ts(2322) yield this.get(k) } + + return undefined } + /** + * @returns {IterableIterator<[Key, Value]>} + */ * entries () { for (const k of this._keys) { + // @ts-ignore - return of `this.get(k)` is `Value|undefined` which is + // incompatible with `Value`. Typechecker can't that this contains values + // for all the `_keys`. ts(2322) yield [k, this.get(k)] } + + return undefined } * [Symbol.iterator] () { yield * this.entries() } + /** + * @template This + * @param {(entry:[Key, Value]) => void} cb + * @param {This} [thisArg] + */ + // @ts-expect-error - Callback in Map forEach is (V, K, Map) => void forEach (cb, thisArg) { if (!cb) { return } for (const k of this._keys) { - cb.apply(thisArg, [[k, this.get(k)]]) + cb.apply(thisArg, [[k, /** @type {Value} */(this.get(k))]]) } } + /** + * @private + * @param {[Key, Value]} a + * @param {[Key, Value]} b + * @returns {0|1|-1} + */ _defaultSort (a, b) { if (a[0] < b[0]) return -1 if (b[0] < a[0]) return 1 return 0 } + /** + * @private + * @param {Key} a + * @param {Key} b + * @returns {number} + */ _kCmp (a, b) { return this._cmp( + // @ts-ignore - get may return undefined [a, this.get(a)], + // @ts-ignore - get may return undefined [b, this.get(b)] ) } diff --git a/src/want-manager/index.js b/src/want-manager/index.js index 2e9edfba..30a26c5d 100644 --- a/src/want-manager/index.js +++ b/src/want-manager/index.js @@ -6,8 +6,20 @@ const CONSTANTS = require('../constants') const MsgQueue = require('./msg-queue') const logger = require('../utils').logger +/** + * @typedef {import('peer-id')} PeerId + * @typedef {import('cids')} CID + */ + module.exports = class WantManager { + /** + * + * @param {PeerId} peerId + * @param {import('../network')} network + * @param {import('../stats')} stats + */ constructor (peerId, network, stats) { + /** @type {Map} */ this.peers = new Map() this.wantlist = new Wantlist(stats) @@ -18,6 +30,12 @@ module.exports = class WantManager { this._log = logger(peerId, 'want') } + /** + * @private + * @param {CID[]} cids + * @param {boolean} cancel + * @param {boolean} [force] + */ _addEntries (cids, cancel, force) { const entries = cids.map((cid, i) => { return new Message.Entry(cid, CONSTANTS.kMaxPriority - i, Message.WantType.Block, cancel) @@ -27,12 +45,14 @@ module.exports = class WantManager { // add changes to our wantlist if (e.cancel) { if (force) { - this.wantlist.removeForce(e.cid) + this.wantlist.removeForce(e.cid.toString()) } else { this.wantlist.remove(e.cid) } } else { this._log('adding to wl') + // TODO: Figure out the wantType + // @ts-expect-error - requires wantType this.wantlist.add(e.cid, e.priority) } }) @@ -43,6 +63,10 @@ module.exports = class WantManager { } } + /** + * @private + * @param {PeerId} peerId + */ _startPeerHandler (peerId) { let mq = this.peers.get(peerId.toB58String()) @@ -66,6 +90,10 @@ module.exports = class WantManager { return mq } + /** + * @private + * @param {PeerId} peerId + */ _stopPeerHandler (peerId) { const mq = this.peers.get(peerId.toB58String()) @@ -81,7 +109,13 @@ module.exports = class WantManager { this.peers.delete(peerId.toB58String()) } - // add all the cids to the wantlist + /** + * add all the cids to the wantlist + * + * @param {CID[]} cids + * @param {Object} [options] + * @param {AbortSignal} [options.signal] + */ wantBlocks (cids, options = {}) { this._addEntries(cids, false) @@ -92,27 +126,46 @@ module.exports = class WantManager { } } - // remove blocks of all the given keys without respecting refcounts + /** + * Remove blocks of all the given keys without respecting refcounts + * + * @param {CID[]} cids + */ unwantBlocks (cids) { this._log('unwant blocks: %s', cids.length) this._addEntries(cids, true, true) } - // cancel wanting all of the given keys + /** + * Cancel wanting all of the given keys + * + * @param {CID[]} cids + */ cancelWants (cids) { this._log('cancel wants: %s', cids.length) this._addEntries(cids, true) } - // Returns a list of all currently connected peers + /** + * Returns a list of all currently connected peers + * + * @returns {string[]} + */ connectedPeers () { return Array.from(this.peers.keys()) } + /** + * @param {PeerId} peerId + */ connected (peerId) { this._startPeerHandler(peerId) } + /** + * @param {PeerId} peerId + */ + disconnected (peerId) { this._stopPeerHandler(peerId) } @@ -122,7 +175,5 @@ module.exports = class WantManager { stop () { this.peers.forEach((mq) => this.disconnected(mq.peerId)) - - clearInterval(this.timer) } } diff --git a/src/want-manager/msg-queue.js b/src/want-manager/msg-queue.js index 54d1e58e..c3f51bd1 100644 --- a/src/want-manager/msg-queue.js +++ b/src/want-manager/msg-queue.js @@ -1,22 +1,42 @@ 'use strict' +// @ts-ignore const debounce = require('just-debounce-it') const Message = require('../types/message') const logger = require('../utils').logger const { wantlistSendDebounceMs } = require('../constants') +/** + * @typedef {import('peer-id')} PeerId + * @typedef {import('cids')} CID + * @typedef {import('../network')} Network + */ + module.exports = class MsgQueue { + /** + * @param {PeerId} selfPeerId + * @param {PeerId} otherPeerId + * @param {Network} network + */ constructor (selfPeerId, otherPeerId, network) { this.peerId = otherPeerId this.network = network this.refcnt = 1 + /** + * @private + * @type {{cid:CID, priority:number, cancel?:boolean}[]} + */ this._entries = [] - this._log = logger(selfPeerId, 'msgqueue', otherPeerId.toB58String().slice(0, 8)) + /** @private */ + this._log = logger(selfPeerId, 'msgqueue') this.sendEntries = debounce(this._sendEntries.bind(this), wantlistSendDebounceMs) } + /** + * @param {Message} msg + */ addMessage (msg) { if (msg.empty) { return @@ -25,11 +45,17 @@ module.exports = class MsgQueue { this.send(msg) } + /** + * @param {{cid:CID, priority:number}[]} entries + */ addEntries (entries) { this._entries = this._entries.concat(entries) this.sendEntries() } + /** + * @private + */ _sendEntries () { if (!this._entries.length) { return @@ -47,6 +73,9 @@ module.exports = class MsgQueue { this.addMessage(msg) } + /** + * @param {Message} msg + */ async send (msg) { try { await this.network.connectTo(this.peerId) diff --git a/test/benchmarks/get-many.js b/test/benchmarks/get-many.js index 46c7871f..0d063aad 100644 --- a/test/benchmarks/get-many.js +++ b/test/benchmarks/get-many.js @@ -4,7 +4,7 @@ const distributionTest = require('../utils/distribution-test') const print = require('./helpers/print-swarm-results') -const EventEmitter = require('events') +const { EventEmitter } = require('events') ;(async function () { const emitter = new EventEmitter() diff --git a/test/bitswap-mock-internals.js b/test/bitswap-mock-internals.js index 07f91b76..d014b9e1 100644 --- a/test/bitswap-mock-internals.js +++ b/test/bitswap-mock-internals.js @@ -11,7 +11,7 @@ const Message = require('../src/types/message') const Bitswap = require('../src') const CID = require('cids') const Block = require('ipld-block') -const AbortController = require('abort-controller') +const { AbortController } = require('native-abort-controller') const delay = require('delay') const createTempRepo = require('./utils/create-temp-repo-nodejs') @@ -20,11 +20,11 @@ const applyNetwork = require('./utils/mocks').applyNetwork const mockLibp2pNode = require('./utils/mocks').mockLibp2pNode const storeHasBlocks = require('./utils/store-has-blocks') const makeBlock = require('./utils/make-block') -const makePeerId = require('./utils/make-peer-id') +const { makePeerIds } = require('./utils/make-peer-id') const orderedFinish = require('./utils/helpers').orderedFinish function wantsBlock (cid, bitswap) { - for (const [key, value] of bitswap.getWantlist()) { // eslint-disable-line no-unused-vars + for (const [, value] of bitswap.getWantlist()) { if (value.cid.toString() === cid.toString()) { return true } @@ -43,7 +43,7 @@ describe('bitswap with mocks', function () { before(async () => { repo = await createTempRepo() blocks = await makeBlock(15) - ids = await makePeerId(2) + ids = await makePeerIds(2) }) after(() => repo.teardown()) @@ -112,7 +112,7 @@ describe('bitswap with mocks', function () { bs.start() - const others = await makePeerId(5) + const others = await makePeerIds(5) const blocks = await makeBlock(10) const messages = await Promise.all(range(5).map((i) => { @@ -182,6 +182,7 @@ describe('bitswap with mocks', function () { it('fails on requesting empty block', async () => { const bs = new Bitswap(mockLibp2pNode(), repo.blocks) try { + // @ts-expect-error we want this to fail await bs.get(null) } catch (err) { expect(err).to.exist() @@ -243,7 +244,7 @@ describe('bitswap with mocks', function () { setTimeout(() => { finish(1) - bs.put(block, () => {}) + bs.put(block) }, 200) const res = await get @@ -330,7 +331,7 @@ describe('bitswap with mocks', function () { const p1 = bs1.get(block.cid) setTimeout(() => { - bs2.put(block, () => {}) + bs2.put(block) }, 1000) const b1 = await p1 expect(b1).to.eql(block) diff --git a/test/bitswap-stats.js b/test/bitswap-stats.js index f61ea766..c9597cb9 100644 --- a/test/bitswap-stats.js +++ b/test/bitswap-stats.js @@ -9,7 +9,7 @@ const Bitswap = require('../src') const createTempRepo = require('./utils/create-temp-repo-nodejs') const createLibp2pNode = require('./utils/create-libp2p-node') const makeBlock = require('./utils/make-block') -const makePeerId = require('./utils/make-peer-id') +const { makePeerIds } = require('./utils/make-peer-id') const expectedStats = [ 'blocksReceived', @@ -39,7 +39,7 @@ describe('bitswap stats', () => { before(async () => { const nodes = [0, 1] blocks = await makeBlock(2) - ids = await makePeerId(2) + ids = await makePeerIds(2) // create 2 temp repos repos = await Promise.all(nodes.map(() => createTempRepo())) diff --git a/test/bitswap.js b/test/bitswap.js index 727c4097..8d8e5b08 100644 --- a/test/bitswap.js +++ b/test/bitswap.js @@ -83,7 +83,7 @@ describe('bitswap without DHT', function () { // incoming message with requested block from the other peer const message = new Message(false) - message.addEntry(block.cid, 1, false) + message.addEntry(block.cid, 1, Message.WantType.Block) message.addBlock(block) // slow blockstore diff --git a/test/decision-engine/decision-engine.js b/test/decision-engine/decision-engine.js index 04b7f8c6..12edc896 100644 --- a/test/decision-engine/decision-engine.js +++ b/test/decision-engine/decision-engine.js @@ -12,12 +12,14 @@ const multihashing = require('multihashing-async') const uint8ArrayFromString = require('uint8arrays/from-string') const uint8ArrayToString = require('uint8arrays/to-string') const drain = require('it-drain') +const defer = require('p-defer') const Message = require('../../src/types/message') const DecisionEngine = require('../../src/decision-engine') +const Stats = require('../../src/stats') const createTempRepo = require('../utils/create-temp-repo-nodejs.js') const makeBlock = require('../utils/make-block') -const makePeerId = require('../utils/make-peer-id') +const { makePeerId, makePeerIds } = require('../utils/make-peer-id') const mockNetwork = require('../utils/mocks').mockNetwork const sum = (nums) => nums.reduce((a, b) => a + b, 0) @@ -31,6 +33,10 @@ function stringifyMessages (messages) { return flatten(messages.map(messageToString)) } +/** + * + * @param {import('../../src/network')} network + */ async function newEngine (network) { const results = await Promise.all([ createTempRepo(), @@ -38,7 +44,7 @@ async function newEngine (network) { ]) const blockstore = results[0].blocks const peerId = results[1] - const engine = new DecisionEngine(peerId, blockstore, network || mockNetwork()) + const engine = new DecisionEngine(peerId, blockstore, network, new Stats()) engine.start() return { peer: peerId, engine: engine } } @@ -46,8 +52,8 @@ async function newEngine (network) { describe('Engine', () => { it('consistent accounting', async () => { const res = await Promise.all([ - newEngine(false), - newEngine(false) + newEngine(mockNetwork()), + newEngine(mockNetwork()) ]) const sender = res[0] @@ -79,8 +85,8 @@ describe('Engine', () => { it('peer is added to peers when message received or sent', async () => { const res = await Promise.all([ - newEngine(false), - newEngine(false) + newEngine(mockNetwork()), + newEngine(mockNetwork()) ]) const sanfrancisco = res[0] @@ -149,19 +155,15 @@ describe('Engine', () => { const set = testcase[0] const cancels = testcase[1] const keeps = difference(set, cancels) - - let network - const done = new Promise((resolve, reject) => { - network = mockNetwork(1, (res) => { - const msgs = stringifyMessages(res.messages) - expect(msgs.sort()).to.eql(keeps.sort()) - resolve() - }) + const deferred = defer() + const network = mockNetwork(1, (res) => { + const msgs = stringifyMessages(res.messages) + expect(msgs.sort()).to.eql(keeps.sort()) + deferred.resolve() }) - const id = await PeerId.create({ bits: 512 }) const repo = await createTempRepo() - const dEngine = new DecisionEngine(id, repo.blocks, network) + const dEngine = new DecisionEngine(id, repo.blocks, network, new Stats()) dEngine.start() // Send wants then cancels for some of the wants @@ -171,14 +173,14 @@ describe('Engine', () => { // Simulate receiving blocks from the network await peerSendsBlocks(dEngine, repo, blocks, somePeer) - await done + await deferred.promise } } }) it('round-robins incoming wants', async () => { - const id = await makePeerId(1)[0] - const peers = await makePeerId(3) + const id = await makePeerId() + const peers = await makePeerIds(3) const blockSize = 256 * 1024 const blocks = await makeBlock(20, blockSize) @@ -194,76 +196,90 @@ describe('Engine', () => { const repo = await createTempRepo() await drain(repo.blocks.putMany(blocks)) - let network let rcvdBlockCount = 0 const received = new Map(peers.map(p => [p.toB58String(), { count: 0, bytes: 0 }])) - const done = new Promise((resolve) => { - network = mockNetwork(blocks.length, null, ([peer, msg]) => { - const pid = peer.toB58String() - const rcvd = received.get(pid) - - // Blocks should arrive in priority order. - // Note: we requested the blocks such that the priority order was - // highest at the start to lowest at the end. - for (const block of msg.blocks.values()) { - expect(blockIndex(block)).to.gte(rcvd.count) - } + const deferred = defer() + const network = mockNetwork(blocks.length, undefined, ([peer, msg]) => { + const pid = peer.toB58String() + const rcvd = received.get(pid) - rcvd.count += msg.blocks.size - rcvd.bytes += sum([...msg.blocks.values()].map(b => b.data.length)) + if (!rcvd) { + return deferred.reject(new Error(`Could not get received for peer ${pid}`)) + } + + // Blocks should arrive in priority order. + // Note: we requested the blocks such that the priority order was + // highest at the start to lowest at the end. + for (const block of msg.blocks.values()) { + expect(blockIndex(block)).to.gte(rcvd.count) + } - // pendingBytes should be equal to the remaining data we're expecting - expect(msg.pendingBytes).to.eql(blockSize * blocks.length - rcvd.bytes) + rcvd.count += msg.blocks.size + rcvd.bytes += sum([...msg.blocks.values()].map(b => b.data.length)) - // Expect each peer to receive blocks in a roughly round-robin fashion, - // in other words one peer shouldn't receive a bunch more blocks than - // the others at any given time. - for (const p of peers) { - if (p !== peer) { - const pCount = received.get(p.toB58String()).count - expect(rcvd.count - pCount).to.lt(blocks.length * 0.8) + // pendingBytes should be equal to the remaining data we're expecting + expect(msg.pendingBytes).to.eql(blockSize * blocks.length - rcvd.bytes) + + // Expect each peer to receive blocks in a roughly round-robin fashion, + // in other words one peer shouldn't receive a bunch more blocks than + // the others at any given time. + for (const p of peers) { + if (p !== peer) { + const peerCount = received.get(p.toB58String()) + + if (!peerCount) { + return deferred.reject(new Error(`Could not get peer count for ${p.toB58String()}`)) } + + const pCount = peerCount.count + expect(rcvd.count - pCount).to.lt(blocks.length * 0.8) } + } + + // When all peers have received all the blocks, we're done + rcvdBlockCount += msg.blocks.size + if (rcvdBlockCount === blocks.length * peers.length) { + // Make sure each peer received all blocks it was expecting + for (const peer of peers) { + const pid = peer.toB58String() + const rcvd = received.get(pid) - // When all peers have received all the blocks, we're done - rcvdBlockCount += msg.blocks.size - if (rcvdBlockCount === blocks.length * peers.length) { - // Make sure each peer received all blocks it was expecting - for (const peer of peers) { - const pid = peer.toB58String() - const rcvd = received.get(pid) - expect(rcvd.count).to.eql(blocks.length) + if (!rcvd) { + return deferred.reject(new Error(`Could not get peer count for ${pid}`)) } - resolve() + + expect(rcvd.count).to.eql(blocks.length) } - }) + + deferred.resolve() + } }) - const dEngine = new DecisionEngine(id, repo.blocks, network) + const dEngine = new DecisionEngine(id, repo.blocks, network, new Stats()) dEngine.start() // Each peer requests all blocks for (const peer of peers) { const message = new Message(false) + for (const [i, block] of blocks.entries()) { message.addEntry(block.cid, blocks.length - i) } + await dEngine.messageReceived(peer, message) } - await done + await deferred.promise }) it('sends received blocks to peers that want them', async () => { - const [id, peer] = await makePeerId(2) + const [id, peer] = await makePeerIds(2) const blocks = await makeBlock(4, 8 * 1024) - let network - const receiveMessage = new Promise(resolve => { - network = mockNetwork(blocks.length, null, resolve) - }) + const deferred = defer() + const network = mockNetwork(blocks.length, undefined, ([peer, msg]) => deferred.resolve([peer, msg])) const repo = await createTempRepo() - const dEngine = new DecisionEngine(id, repo.blocks, network, null, { maxSizeReplaceHasWithBlock: 0 }) + const dEngine = new DecisionEngine(id, repo.blocks, network, new Stats(), { maxSizeReplaceHasWithBlock: 0 }) dEngine.start() const message = new Message(false) @@ -280,7 +296,7 @@ describe('Engine', () => { await dEngine.receivedBlocks(rcvdBlocks) // Wait till the engine sends a message - const [toPeer, msg] = await receiveMessage + const [toPeer, msg] = await deferred.promise // Expect the message to be sent to the peer that wanted the blocks expect(toPeer.toB58String()).to.eql(peer.toB58String()) @@ -294,18 +310,18 @@ describe('Engine', () => { }) it('sends DONT_HAVE', async () => { - const [id, peer] = await makePeerId(2) + const [id, peer] = await makePeerIds(2) const blocks = await makeBlock(4, 8 * 1024) let onMsg const receiveMessage = () => new Promise(resolve => { onMsg = resolve }) - const network = mockNetwork(blocks.length, null, (res) => { + const network = mockNetwork(blocks.length, undefined, (res) => { onMsg(res) }) const repo = await createTempRepo() - const dEngine = new DecisionEngine(id, repo.blocks, network, null, { maxSizeReplaceHasWithBlock: 0 }) + const dEngine = new DecisionEngine(id, repo.blocks, network, new Stats(), { maxSizeReplaceHasWithBlock: 0 }) dEngine.start() const message = new Message(false) @@ -344,7 +360,7 @@ describe('Engine', () => { }) it('handles want-have and want-block', async () => { - const [id, partner] = await makePeerId(2) + const [id, partner] = await makePeerIds(2) const alphabet = 'abcdefghijklmnopqrstuvwxyz' const vowels = 'aeiou' @@ -619,14 +635,14 @@ describe('Engine', () => { dEngine._processTasks() }) } - const network = mockNetwork(blocks.length, null, ([peer, msg]) => { + const network = mockNetwork(blocks.length, undefined, ([peer, msg]) => { onMsg && onMsg(msg) onMsg = undefined }) const repo = await createTempRepo() await drain(repo.blocks.putMany(blocks)) - const dEngine = new DecisionEngine(id, repo.blocks, network, null, { maxSizeReplaceHasWithBlock: 0 }) + const dEngine = new DecisionEngine(id, repo.blocks, network, new Stats(), { maxSizeReplaceHasWithBlock: 0 }) dEngine._scheduleProcessTasks = () => {} dEngine.start() @@ -698,7 +714,7 @@ describe('Engine', () => { // who is in the network const us = await newEngine(network) - const them = await newEngine() + const them = await newEngine(mockNetwork()) // add a block to our blockstore const data = uint8ArrayFromString(`this is message ${Date.now()}`) @@ -707,15 +723,11 @@ describe('Engine', () => { const block = new Block(data, cid) await us.engine.blockstore.put(block) + const message = new Message(false) + message.addEntry(cid, 1, Message.WantType.Block, false, false) + // receive a message with a want for our block - await us.engine.messageReceived(them.peer, { - blocks: [], - wantlist: [{ - cid, - priority: 1, - wantType: 'wanty' - }] - }) + await us.engine.messageReceived(them.peer, message) // should have added a task for the remote peer const tasks = us.engine._requestQueue._byPeer.get(them.peer.toB58String()) diff --git a/test/decision-engine/req-queue.spec.js b/test/decision-engine/req-queue.spec.js index 164dde3a..a335c5cd 100644 --- a/test/decision-engine/req-queue.spec.js +++ b/test/decision-engine/req-queue.spec.js @@ -32,15 +32,33 @@ describe('Request Queue', () => { rq.pushTasks(peerIds[0], [{ topic: 'a', size: 10, - priority: 3 + priority: 3, + data: { + blockSize: 0, + haveBlock: false, + isWantBlock: false, + sendDontHave: false + } }, { topic: 'b', size: 5, - priority: 2 + priority: 2, + data: { + blockSize: 0, + haveBlock: false, + isWantBlock: false, + sendDontHave: false + } }, { topic: 'c', size: 5, - priority: 1 + priority: 1, + data: { + blockSize: 0, + haveBlock: false, + isWantBlock: false, + sendDontHave: false + } }]) const { peerId, tasks, pendingSize } = rq.popTasks(11) @@ -59,11 +77,23 @@ describe('Request Queue', () => { rq.pushTasks(peerIds[0], [{ topic: 'a', size: 1, - priority: 2 + priority: 2, + data: { + blockSize: 0, + haveBlock: false, + isWantBlock: false, + sendDontHave: false + } }, { topic: 'b', size: 1, - priority: 1 + priority: 1, + data: { + blockSize: 0, + haveBlock: false, + isWantBlock: false, + sendDontHave: false + } }]) const { tasks, pendingSize } = rq.popTasks(0) @@ -77,7 +107,13 @@ describe('Request Queue', () => { rq.pushTasks(peerIds[0], [{ topic: 'a', size: 1, - priority: 1 + priority: 1, + data: { + blockSize: 0, + haveBlock: false, + isWantBlock: false, + sendDontHave: false + } }]) const res = rq.popTasks(1) @@ -95,15 +131,33 @@ describe('Request Queue', () => { rq.pushTasks(peerIds[0], [{ topic: 'a', size: 1, - priority: 10 + priority: 10, + data: { + blockSize: 0, + haveBlock: false, + isWantBlock: false, + sendDontHave: false + } }, { topic: 'b', size: 1, - priority: 5 + priority: 5, + data: { + blockSize: 0, + haveBlock: false, + isWantBlock: false, + sendDontHave: false + } }, { topic: 'c', size: 1, - priority: 7 + priority: 7, + data: { + blockSize: 0, + haveBlock: false, + isWantBlock: false, + sendDontHave: false + } }]) const { peerId, tasks, pendingSize } = rq.popTasks(10) @@ -118,7 +172,13 @@ describe('Request Queue', () => { rq.pushTasks(peerIds[0], [{ topic: 'a', size: 1, - priority: 1 + priority: 1, + data: { + blockSize: 0, + haveBlock: false, + isWantBlock: false, + sendDontHave: false + } }]) // Pop all tasks for peer0 @@ -129,7 +189,13 @@ describe('Request Queue', () => { rq.pushTasks(peerIds[0], [{ topic: 'b', size: 1, - priority: 1 + priority: 1, + data: { + blockSize: 0, + haveBlock: false, + isWantBlock: false, + sendDontHave: false + } }]) // Pop tasks for peer0 @@ -143,29 +209,65 @@ describe('Request Queue', () => { rq.pushTasks(peerIds[0], [{ topic: 'a', size: 5, - priority: 1 + priority: 1, + data: { + blockSize: 0, + haveBlock: false, + isWantBlock: false, + sendDontHave: false + } }]) rq.pushTasks(peerIds[1], [{ topic: 'b', size: 10, - priority: 3 + priority: 3, + data: { + blockSize: 0, + haveBlock: false, + isWantBlock: false, + sendDontHave: false + } }, { topic: 'c', size: 3, - priority: 2 + priority: 2, + data: { + blockSize: 0, + haveBlock: false, + isWantBlock: false, + sendDontHave: false + } }, { topic: 'd', size: 1, - priority: 1 + priority: 1, + data: { + blockSize: 0, + haveBlock: false, + isWantBlock: false, + sendDontHave: false + } }]) rq.pushTasks(peerIds[2], [{ topic: 'e', size: 7, - priority: 2 + priority: 2, + data: { + blockSize: 0, + haveBlock: false, + isWantBlock: false, + sendDontHave: false + } }, { topic: 'f', size: 2, - priority: 1 + priority: 1, + data: { + blockSize: 0, + haveBlock: false, + isWantBlock: false, + sendDontHave: false + } }]) // Active Pending @@ -241,17 +343,35 @@ describe('Request Queue', () => { rq.pushTasks(peerIds[0], [{ topic: 'a', size: 0, - priority: 1 + priority: 1, + data: { + blockSize: 0, + haveBlock: false, + isWantBlock: false, + sendDontHave: false + } }]) rq.pushTasks(peerIds[1], [{ topic: 'a', size: 0, - priority: 1 + priority: 1, + data: { + blockSize: 0, + haveBlock: false, + isWantBlock: false, + sendDontHave: false + } }]) rq.pushTasks(peerIds[0], [{ topic: 'a', size: 1, - priority: 1 + priority: 1, + data: { + blockSize: 0, + haveBlock: false, + isWantBlock: false, + sendDontHave: false + } }]) // _byPeer map should have been resorted to put peer0 @@ -267,25 +387,55 @@ describe('Request Queue', () => { rq.pushTasks(peerIds[0], [{ topic: 'a', size: 1, - priority: 2 + priority: 2, + data: { + blockSize: 0, + haveBlock: false, + isWantBlock: false, + sendDontHave: false + } }, { topic: 'b', size: 1, - priority: 1 + priority: 1, + data: { + blockSize: 0, + haveBlock: false, + isWantBlock: false, + sendDontHave: false + } }]) rq.pushTasks(peerIds[1], [{ topic: 'a', size: 1, - priority: 3 + priority: 3, + data: { + blockSize: 0, + haveBlock: false, + isWantBlock: false, + sendDontHave: false + } }, { topic: 'b', size: 1, - priority: 2 + priority: 2, + data: { + blockSize: 0, + haveBlock: false, + isWantBlock: false, + sendDontHave: false + } }, { topic: 'c', size: 1, - priority: 1 + priority: 1, + data: { + blockSize: 0, + haveBlock: false, + isWantBlock: false, + sendDontHave: false + } }]) rq.remove('a', peerIds[0]) @@ -308,7 +458,13 @@ describe('Request Queue', () => { rq.pushTasks(peerIds[0], [{ topic: 'a', size: 1, - priority: 2 + priority: 2, + data: { + blockSize: 0, + haveBlock: false, + isWantBlock: false, + sendDontHave: false + } }]) rq.remove('a', peerIds[1]) @@ -323,7 +479,13 @@ describe('Request Queue', () => { rq.pushTasks(peerIds[0], [{ topic: 'a', size: 1, - priority: 2 + priority: 2, + data: { + blockSize: 0, + haveBlock: false, + isWantBlock: false, + sendDontHave: false + } }]) rq.remove('b', peerIds[0]) @@ -340,17 +502,35 @@ describe('Request Queue', () => { rq.pushTasks(peerIds[0], [{ topic: 'a', size: 1, - priority: 2 + priority: 2, + data: { + blockSize: 0, + haveBlock: false, + isWantBlock: false, + sendDontHave: false + } }, { topic: 'b', size: 1, - priority: 1 + priority: 1, + data: { + blockSize: 0, + haveBlock: false, + isWantBlock: false, + sendDontHave: false + } }]) rq.pushTasks(peerIds[0], [{ topic: 'b', size: 1, - priority: 3 + priority: 3, + data: { + blockSize: 0, + haveBlock: false, + isWantBlock: false, + sendDontHave: false + } }]) const { tasks } = rq.popTasks(10) @@ -363,21 +543,45 @@ describe('Request Queue', () => { rq.pushTasks(peerIds[0], [{ topic: 'a', size: 2, - priority: 2 + priority: 2, + data: { + blockSize: 0, + haveBlock: false, + isWantBlock: false, + sendDontHave: false + } }, { topic: 'b', size: 1, - priority: 1 + priority: 1, + data: { + blockSize: 0, + haveBlock: false, + isWantBlock: false, + sendDontHave: false + } }]) rq.pushTasks(peerIds[1], [{ topic: 'c', size: 1, - priority: 3 + priority: 3, + data: { + blockSize: 0, + haveBlock: false, + isWantBlock: false, + sendDontHave: false + } }, { topic: 'd', size: 1, - priority: 2 + priority: 2, + data: { + blockSize: 0, + haveBlock: false, + isWantBlock: false, + sendDontHave: false + } }]) // Pop one task for each peer diff --git a/test/network/network.node.js b/test/network/network.node.js index 4a44830c..2b72b6b7 100644 --- a/test/network/network.node.js +++ b/test/network/network.node.js @@ -3,14 +3,19 @@ const { expect, assert } = require('aegir/utils/chai') const lp = require('it-length-prefixed') -const pipe = require('it-pipe') +const { pipe } = require('it-pipe') const pDefer = require('p-defer') const createLibp2pNode = require('../utils/create-libp2p-node') const makeBlock = require('../utils/make-block') const Network = require('../../src/network') const Message = require('../../src/types/message') +const Stats = require('../../src/stats') +/** + * @returns {import('../../src')} + */ function createBitswapMock () { + // @ts-ignore return { _receiveMessage: async () => {}, _receiveError: async () => {}, @@ -46,10 +51,10 @@ describe('network', () => { bitswapMockB = createBitswapMock() bitswapMockC = createBitswapMock() - networkA = new Network(p2pA, bitswapMockA) - networkB = new Network(p2pB, bitswapMockB) + networkA = new Network(p2pA, bitswapMockA, new Stats()) + networkB = new Network(p2pB, bitswapMockB, new Stats()) // only bitswap100 - networkC = new Network(p2pC, bitswapMockC, { b100Only: true }) + networkC = new Network(p2pC, bitswapMockC, new Stats(), { b100Only: true }) networkA.start() networkB.start() @@ -165,6 +170,7 @@ describe('network', () => { const ma = `${p2pB.multiaddrs[0]}/p2p/${p2pB.peerId.toB58String()}` const { stream } = await p2pA.dialProtocol(ma, '/ipfs/bitswap/' + version.num) + await pipe( [version.serialize(msg)], lp.encode(), @@ -238,11 +244,11 @@ describe('network', () => { }) it('dials to peer using Bitswap 1.2.0', async () => { - networkA = new Network(p2pA, bitswapMockA) + networkA = new Network(p2pA, bitswapMockA, new Stats()) // only supports 1.2.0 - networkB = new Network(p2pB, bitswapMockB) - networkB.protocols = ['/ipfs/bitswap/1.2.0'] + networkB = new Network(p2pB, bitswapMockB, new Stats()) + networkB._protocols = ['/ipfs/bitswap/1.2.0'] networkA.start() networkB.start() diff --git a/test/notifications.spec.js b/test/notifications.spec.js index 2f31826e..edb6f0d1 100644 --- a/test/notifications.spec.js +++ b/test/notifications.spec.js @@ -4,13 +4,13 @@ const { expect } = require('aegir/utils/chai') const CID = require('cids') const Block = require('ipld-block') -const AbortController = require('abort-controller') +const { AbortController } = require('native-abort-controller') const uint8ArrayToString = require('uint8arrays/to-string') const Notifications = require('../src/notifications') const makeBlock = require('./utils/make-block') -const makePeerId = require('./utils/make-peer-id') +const { makePeerId } = require('./utils/make-peer-id') describe('Notifications', () => { let blocks @@ -54,7 +54,7 @@ describe('Notifications', () => { }) it('unwant block', async () => { - const n = new Notifications() + const n = new Notifications(peerId) const b = blocks[0] const p = n.wantBlock(b.cid) @@ -65,7 +65,7 @@ describe('Notifications', () => { }) it('abort block want', async () => { - const n = new Notifications() + const n = new Notifications(peerId) const b = blocks[0] const controller = new AbortController() @@ -103,7 +103,7 @@ describe('Notifications', () => { }) it('unwant block', async () => { - const n = new Notifications() + const n = new Notifications(peerId) const cid = new CID(blocks[0].cid.toV1().toString('base64')) const b = new Block(blocks[0].data, cid) diff --git a/test/swarms.js b/test/swarms.js index cef9f218..5cf93a5c 100644 --- a/test/swarms.js +++ b/test/swarms.js @@ -5,7 +5,7 @@ const stats = require('stats-lite') const distributionTest = require('./utils/distribution-test') -const EventEmitter = require('events') +const { EventEmitter } = require('events') const test = it describe.skip('swarms', () => { diff --git a/test/types/message.spec.js b/test/types/message.spec.js index d3a3f551..7dcc84af 100644 --- a/test/types/message.spec.js +++ b/test/types/message.spec.js @@ -5,7 +5,7 @@ const { expect } = require('aegir/utils/chai') const CID = require('cids') const uint8ArrayFromString = require('uint8arrays/from-string') const uint8ArrayEquals = require('uint8arrays/equals') -const loadFixture = require('aegir/fixtures') +const loadFixture = require('aegir/utils/fixtures') const testDataPath = 'test/fixtures/serialized-from-go' const rawMessageFullWantlist = loadFixture(testDataPath + '/bitswap110-message-full-wantlist') const rawMessageOneBlock = loadFixture(testDataPath + '/bitswap110-message-one-block') @@ -46,10 +46,10 @@ describe('BitswapMessage', () => { msg.addEntry(cids[0], 1, BitswapMessage.WantType.Block, false, false) msg.addEntry(cids[0], 2, BitswapMessage.WantType.Have, true, false) - expect(msg.wantlist.get(cids[0].toString('base58btc')).priority).to.eql(1) + expect(msg.wantlist.get(cids[0].toString('base58btc'))).to.have.property('priority', 1) msg.addEntry(cids[0], 2, BitswapMessage.WantType.Block, true, false) - expect(msg.wantlist.get(cids[0].toString('base58btc')).priority).to.eql(2) + expect(msg.wantlist.get(cids[0].toString('base58btc'))).to.have.property('priority', 2) }) it('only changes from dont cancel to do cancel', () => { @@ -57,11 +57,11 @@ describe('BitswapMessage', () => { msg.addEntry(cids[0], 1, BitswapMessage.WantType.Block, true, false) msg.addEntry(cids[0], 1, BitswapMessage.WantType.Block, false, false) - expect(msg.wantlist.get(cids[0].toString('base58btc')).cancel).to.eql(true) + expect(msg.wantlist.get(cids[0].toString('base58btc'))).to.have.property('cancel', true) msg.addEntry(cids[1], 1, BitswapMessage.WantType.Block, false, false) msg.addEntry(cids[1], 1, BitswapMessage.WantType.Block, true, false) - expect(msg.wantlist.get(cids[1].toString('base58btc')).cancel).to.eql(true) + expect(msg.wantlist.get(cids[1].toString('base58btc'))).to.have.property('cancel', true) }) it('only changes from dont send to do send DONT_HAVE', () => { @@ -69,11 +69,11 @@ describe('BitswapMessage', () => { msg.addEntry(cids[0], 1, BitswapMessage.WantType.Block, false, false) msg.addEntry(cids[0], 1, BitswapMessage.WantType.Block, false, true) - expect(msg.wantlist.get(cids[0].toString('base58btc')).sendDontHave).to.eql(true) + expect(msg.wantlist.get(cids[0].toString('base58btc'))).to.have.property('sendDontHave', true) msg.addEntry(cids[1], 1, BitswapMessage.WantType.Block, false, true) msg.addEntry(cids[1], 1, BitswapMessage.WantType.Block, false, false) - expect(msg.wantlist.get(cids[1].toString('base58btc')).sendDontHave).to.eql(true) + expect(msg.wantlist.get(cids[1].toString('base58btc'))).to.have.property('sendDontHave', true) }) it('only override want-have with want-block (not vice versa)', () => { @@ -81,11 +81,11 @@ describe('BitswapMessage', () => { msg.addEntry(cids[0], 1, BitswapMessage.WantType.Block, false, false) msg.addEntry(cids[0], 1, BitswapMessage.WantType.Have, false, false) - expect(msg.wantlist.get(cids[0].toString('base58btc')).wantType).to.eql(BitswapMessage.WantType.Block) + expect(msg.wantlist.get(cids[0].toString('base58btc'))).to.have.property('wantType', BitswapMessage.WantType.Block) msg.addEntry(cids[1], 1, BitswapMessage.WantType.Have, false, false) msg.addEntry(cids[1], 1, BitswapMessage.WantType.Block, false, false) - expect(msg.wantlist.get(cids[1].toString('base58btc')).wantType).to.eql(BitswapMessage.WantType.Block) + expect(msg.wantlist.get(cids[1].toString('base58btc'))).to.have.property('wantType', BitswapMessage.WantType.Block) }) }) diff --git a/test/utils.spec.js b/test/utils.spec.js index 46b8863a..f4c611ad 100644 --- a/test/utils.spec.js +++ b/test/utils.spec.js @@ -7,6 +7,7 @@ const Block = require('ipld-block') const multihashing = require('multihashing-async') const BitswapMessageEntry = require('../src/types/message/entry') const uint8ArrayFromString = require('uint8arrays/from-string') +const BitswapMessage = require('../src/types/message') const { groupBy, uniqWith, pullAllWith, includesWith, sortBy, isMapEqual } = require('../src/utils') const SortedMap = require('../src/utils/sorted-map') @@ -96,7 +97,7 @@ describe('utils spec', function () { } ] - const groupedList1 = sortBy(o => o.name, list) + const groupedList1 = sortBy(o => o.name.charCodeAt(0), list) const groupedList2 = sortBy(o => o.id, list) expect(groupedList1).to.be.deep.equal([{ id: 2, name: 'a' }, @@ -110,25 +111,25 @@ describe('utils spec', function () { describe('isMapEqual', () => { it('should on be false when !== size', () => { expect(isMapEqual( - new Map([['key1', 'value1'], ['key2', 'value2']]), - new Map([['key1', 'value1']]) + new Map([['key1', { data: uint8ArrayFromString('value1') }], ['key2', { data: uint8ArrayFromString('value2') }]]), + new Map([['key1', { data: uint8ArrayFromString('value1') }]]) )).to.be.false() }) it('should on be false if one key is missing', () => { expect(isMapEqual( - new Map([['key1', 'value1'], ['key2', 'value2']]), - new Map([['key1', 'value1'], ['key3', 'value2']]) + new Map([['key1', { data: uint8ArrayFromString('value1') }], ['key2', { data: uint8ArrayFromString('value2') }]]), + new Map([['key1', { data: uint8ArrayFromString('value1') }], ['key3', { data: uint8ArrayFromString('value2') }]]) )).to.be.false() }) - it('should on be false if BitswapMessageEntry dont match', async () => { + it('should on be false if BitswapMessageEntry don\'t match', async () => { const hash1 = await multihashing(uint8ArrayFromString('OMG!1'), 'sha2-256') const cid1 = new CID(1, 'dag-pb', hash1) expect(isMapEqual( - new Map([['key1', new BitswapMessageEntry(cid1, 1, true)], ['key2', new BitswapMessageEntry(cid1, 2, true)]]), - new Map([['key1', new BitswapMessageEntry(cid1, 1, true)], ['key2', new BitswapMessageEntry(cid1, 1, true)]]) + new Map([['key1', new BitswapMessageEntry(cid1, 1, BitswapMessage.WantType.Block)], ['key2', new BitswapMessageEntry(cid1, 2, BitswapMessage.WantType.Block)]]), + new Map([['key1', new BitswapMessageEntry(cid1, 1, BitswapMessage.WantType.Block)], ['key2', new BitswapMessageEntry(cid1, 1, BitswapMessage.WantType.Block)]]) )).to.be.false() }) @@ -137,8 +138,8 @@ describe('utils spec', function () { const cid1 = new CID(1, 'dag-pb', hash1) expect(isMapEqual( - new Map([['key1', new BitswapMessageEntry(cid1, 1, true)], ['key2', new BitswapMessageEntry(cid1, 1, true)]]), - new Map([['key1', new BitswapMessageEntry(cid1, 1, true)], ['key2', new BitswapMessageEntry(cid1, 1, true)]]) + new Map([['key1', new BitswapMessageEntry(cid1, 1, BitswapMessage.WantType.Block)], ['key2', new BitswapMessageEntry(cid1, 1, BitswapMessage.WantType.Block)]]), + new Map([['key1', new BitswapMessageEntry(cid1, 1, BitswapMessage.WantType.Block)], ['key2', new BitswapMessageEntry(cid1, 1, BitswapMessage.WantType.Block)]]) )).to.be.true() }) @@ -212,7 +213,7 @@ describe('utils spec', function () { expect(sm.get('two')).to.eql(2) - sm.clear('two') + sm.clear() expect(sm.get('two')).to.be.undefined() expect(sm.size).to.eql(0) diff --git a/test/utils/connect-all.js b/test/utils/connect-all.js index 40a34e0d..75471ec5 100644 --- a/test/utils/connect-all.js +++ b/test/utils/connect-all.js @@ -1,7 +1,12 @@ 'use strict' +// @ts-ignore const without = require('lodash.without') +/** + * + * @param {any[]} nodes + */ module.exports = async (nodes) => { for (const node of nodes) { for (const otherNode of without(nodes, node)) { diff --git a/test/utils/create-bitswap.js b/test/utils/create-bitswap.js index fd3dc32d..80edfbd1 100644 --- a/test/utils/create-bitswap.js +++ b/test/utils/create-bitswap.js @@ -1,6 +1,6 @@ 'use strict' -const Bitswap = require('../..') +const Bitswap = require('../../src') const createTempRepo = require('./create-temp-repo-nodejs') const createLibp2pNode = require('./create-libp2p-node') diff --git a/test/utils/create-libp2p-node.js b/test/utils/create-libp2p-node.js index 98649980..32eefa0a 100644 --- a/test/utils/create-libp2p-node.js +++ b/test/utils/create-libp2p-node.js @@ -1,15 +1,21 @@ 'use strict' +// @ts-ignore const TCP = require('libp2p-tcp') +// @ts-ignore const MPLEX = require('libp2p-mplex') +// @ts-ignore const SECIO = require('libp2p-secio') const libp2p = require('libp2p') const KadDHT = require('libp2p-kad-dht') const PeerId = require('peer-id') - +// @ts-ignore const defaultsDeep = require('@nodeutils/defaults-deep') class Node extends libp2p { + /** + * @param {Partial & import('libp2p').constructorOptions & { DHT?: boolean}} _options + */ constructor (_options) { const defaults = { modules: { diff --git a/test/utils/create-temp-repo-browser.js b/test/utils/create-temp-repo-browser.js index c06a7725..973bec2f 100644 --- a/test/utils/create-temp-repo-browser.js +++ b/test/utils/create-temp-repo-browser.js @@ -1,17 +1,17 @@ /* global self */ 'use strict' +// @ts-ignore const IPFSRepo = require('ipfs-repo') -const idb = self.indexedDB || - self.mozIndexedDB || - self.webkitIndexedDB || - self.msIndexedDB +// @ts-ignore +const idb = self.indexedDB || self.mozIndexedDB || self.webkitIndexedDB || self.msIndexedDB async function createTempRepo () { const date = Date.now().toString() const path = `/bitswap-tests-${date}-${Math.random()}` + /** @type {import('ipfs-core-types/src/repo').Repo & { teardown: () => void}} */ const repo = new IPFSRepo(path) await repo.init({}) await repo.open() diff --git a/test/utils/create-temp-repo-nodejs.js b/test/utils/create-temp-repo-nodejs.js index 655ac856..55567a74 100644 --- a/test/utils/create-temp-repo-nodejs.js +++ b/test/utils/create-temp-repo-nodejs.js @@ -1,14 +1,19 @@ 'use strict' +// @ts-ignore const IPFSRepo = require('ipfs-repo') const pathJoin = require('path').join const os = require('os') +// @ts-ignore const rimraf = require('rimraf') +// @ts-ignore const promisify = require('promisify-es6') async function createTempRepo () { const date = Date.now().toString() const path = pathJoin(os.tmpdir(), `bitswap-tests-${date}-${Math.random()}`) + + /** @type {import('ipfs-core-types/src/repo').Repo & { teardown: () => Promise}} */ const repo = new IPFSRepo(path) repo.teardown = async () => { diff --git a/test/utils/distribution-test.js b/test/utils/distribution-test.js index d1e68d71..29f890f1 100644 --- a/test/utils/distribution-test.js +++ b/test/utils/distribution-test.js @@ -1,5 +1,7 @@ 'use strict' +/** @type {(n:number) => any[]} */ +// @ts-ignore const range = require('lodash.range') const { expect } = require('aegir/utils/chai') @@ -7,6 +9,13 @@ const createBitswap = require('./create-bitswap') const makeBlock = require('./make-block') const connectAll = require('./connect-all') +/** + * + * @param {number} instanceCount + * @param {number} blockCount + * @param {number} repeats + * @param {*} events + */ module.exports = async (instanceCount, blockCount, repeats, events) => { let pendingRepeats = repeats diff --git a/test/utils/helpers.js b/test/utils/helpers.js index a9aad9a1..d6545c2a 100644 --- a/test/utils/helpers.js +++ b/test/utils/helpers.js @@ -1,12 +1,20 @@ 'use strict' +// @ts-ignore const range = require('lodash.range') const { expect } = require('aegir/utils/chai') +/** + * @param {number} n + */ exports.orderedFinish = (n) => { const r = range(1, n + 1) + /** @type {number[]} */ const finishes = [] + /** + * @param {number} i + */ const output = (i) => { finishes.push(i) } diff --git a/test/utils/make-block.js b/test/utils/make-block.js index 81dfc3fc..5820dd9b 100644 --- a/test/utils/make-block.js +++ b/test/utils/make-block.js @@ -3,11 +3,19 @@ const multihashing = require('multihashing-async') const CID = require('cids') const Block = require('ipld-block') +// @ts-ignore const randomBytes = require('iso-random-stream/src/random') +// @ts-ignore const range = require('lodash.range') const uint8ArrayFromString = require('uint8arrays/from-string') +// @ts-ignore const { v4: uuid } = require('uuid') +/** + * @param {number} count + * @param {number} [size] + * @returns {Promise} + */ module.exports = async (count, size) => { const blocks = await Promise.all( range(count || 1).map(async () => { diff --git a/test/utils/make-peer-id.js b/test/utils/make-peer-id.js index 87d3b5ab..70811e73 100644 --- a/test/utils/make-peer-id.js +++ b/test/utils/make-peer-id.js @@ -2,9 +2,22 @@ const PeerId = require('peer-id') -module.exports = async (count) => { +async function makePeerId () { + return (await makePeerIds(1))[0] +} + +/** + * @param {number} count + * @returns {Promise} + */ +async function makePeerIds (count) { const peerIds = await Promise.all([...new Array(count || 1)].map(() => { return PeerId.create({ bits: 512 }) })) - return count ? peerIds : peerIds[0] + return peerIds +} + +module.exports = { + makePeerId, + makePeerIds } diff --git a/test/utils/mocks.js b/test/utils/mocks.js index 7dab93de..3290c54a 100644 --- a/test/utils/mocks.js +++ b/test/utils/mocks.js @@ -8,9 +8,43 @@ const PeerStore = require('libp2p/src/peer-store') const Node = require('./create-libp2p-node').bundle const tmpdir = require('ipfs-utils/src/temp-dir') const Repo = require('ipfs-repo') -const EventEmitter = require('events') +const { EventEmitter } = require('events') +const toString = require('uint8arrays/to-string') const Bitswap = require('../../src') +const Network = require('../../src/network') +const Stats = require('../../src/stats') + +/** + * @typedef {import('../../src/types').BlockStore} BlockStore + */ + +/** + * + * @returns {BlockStore} + */ +function mockBlockStore () { + const blocks = {} + + const store = { + has: (cid) => Promise.resolve(Boolean(blocks[toString(cid.multihash)])), + get: (cid) => Promise.resolve(blocks[toString(cid.multihash)]), + put: (block) => { + blocks[toString(block.cid.multihash)] = block + + return Promise.resolve(block) + }, + putMany: async function * (blocks) { + for await (const block of blocks) { + store.put(block) + + yield block + } + } + } + + return store +} /* * Create a mock libp2p node @@ -47,12 +81,15 @@ exports.mockLibp2pNode = () => { }) } -/* +/** * Create a mock network instance + * + * @param {number} [calls] + * @param {Function} [done] + * @param {Function} [onMsg] + * @returns {import('../../src/network')} */ -exports.mockNetwork = (calls, done, onMsg) => { - done = done || (() => {}) - +exports.mockNetwork = (calls = Infinity, done = () => {}, onMsg = () => {}) => { const connects = [] const messages = [] let i = 0 @@ -60,18 +97,27 @@ exports.mockNetwork = (calls, done, onMsg) => { const finish = (msgTo) => { onMsg && onMsg(msgTo) if (++i === calls) { - done({ connects: connects, messages: messages }) + done && done({ connects: connects, messages: messages }) } } - return { - messages, - connects, + class MockNetwork extends Network { + constructor () { + super({}, new Bitswap({}, mockBlockStore()), new Stats()) + + this.connects = connects + this.messages = messages + } + + // @ts-ignore connectTo (p) { setTimeout(() => { connects.push(p) }) - }, + + return Promise.resolve({ id: '', remotePeer: '' }) + } + sendMessage (p, msg) { messages.push([p, msg]) @@ -80,20 +126,27 @@ exports.mockNetwork = (calls, done, onMsg) => { }) return Promise.resolve() - }, + } + start () { return Promise.resolve() - }, + } + stop () { return Promise.resolve() - }, + } + findAndConnect () { return Promise.resolve() - }, + } + provide () { return Promise.resolve() } } + + // @ts-ignore + return new MockNetwork() } /* @@ -115,7 +168,7 @@ exports.createMockTestNet = async (repo, count) => { connectTo (id) { return new Promise((resolve, reject) => { if (!hexIds.includes(hexIds, id.toHexString())) { - return reject(new Error('unkown peer')) + return reject(new Error('unknown peer')) } resolve() }) diff --git a/test/wantmanager/index.spec.js b/test/wantmanager/index.spec.js index d4d1f782..6adb3f51 100644 --- a/test/wantmanager/index.spec.js +++ b/test/wantmanager/index.spec.js @@ -6,16 +6,17 @@ const { expect } = require('aegir/utils/chai') const cs = require('../../src/constants') const Message = require('../../src/types/message') const WantManager = require('../../src/want-manager') +const Stats = require('../../src/stats') const mockNetwork = require('../utils/mocks').mockNetwork const makeBlock = require('../utils/make-block') -const makePeerId = require('../utils/make-peer-id') +const { makePeerIds } = require('../utils/make-peer-id') describe('WantManager', () => { it('sends wantlist to all connected peers', async function () { this.timeout(80 * 1000) - const peerIds = await makePeerId(3) + const peerIds = await makePeerIds(3) const blocks = await makeBlock(3) const cids = blocks.map((b) => b.cid) @@ -56,7 +57,7 @@ describe('WantManager', () => { resolve() }) - const wantManager = new WantManager(peerIds[2], network) + const wantManager = new WantManager(peerIds[2], network, new Stats()) wantManager.start() wantManager.wantBlocks([cid1, cid2]) diff --git a/test/wantmanager/msg-queue.spec.js b/test/wantmanager/msg-queue.spec.js index fda8cf24..7898ee33 100644 --- a/test/wantmanager/msg-queue.spec.js +++ b/test/wantmanager/msg-queue.spec.js @@ -7,9 +7,15 @@ const CID = require('cids') const multihashing = require('multihashing-async') const Message = require('../../src/types/message') const MsgQueue = require('../../src/want-manager/msg-queue') +const defer = require('p-defer') +const { + mockNetwork +} = require('../utils/mocks') describe('MessageQueue', () => { + /** @type {PeerId[]} */ let peerIds + /** @type {CID[]} */ let cids before(async () => { @@ -20,7 +26,7 @@ describe('MessageQueue', () => { cids = hashes.map((h) => new CID(h)) }) - it('connects and sends messages', () => { + it('connects and sends messages', async () => { const msg = new Message(true) const cid1 = cids[0] const cid2 = cids[1] @@ -32,60 +38,45 @@ describe('MessageQueue', () => { msg.addEntry(cid1, 3) msg.addEntry(cid2, 1) - const messages = [] - const connects = [] - let i = 0 - - return new Promise((resolve) => { - const finish = () => { - i++ - if (i === 2) { - expect(connects).to.be.eql([peerIds[1], peerIds[1]]) - - const m1 = new Message(false) - m1.addEntry(cid3, 1) - m1.addEntry(cid4, 2) - m1.cancel(cid5) - m1.cancel(cid6) - - expect( - messages - ).to.be.eql([ - [peerIds[1], msg], - [peerIds[1], m1] - ]) - - resolve() - } - } - - const network = { - async connectTo (p) { // eslint-disable-line require-await - connects.push(p) - }, - async sendMessage (p, msg) { // eslint-disable-line require-await - messages.push([p, msg]) - finish() - } - } - - const mq = new MsgQueue(peerIds[0], peerIds[1], network) - - expect(mq.refcnt).to.equal(1) - - const batch1 = [ - new Message.Entry(cid3, 1, Message.WantType.Block, false), - new Message.Entry(cid4, 2, Message.WantType.Block, false) - ] - - const batch2 = [ - new Message.Entry(cid5, 1, Message.WantType.Block, true), - new Message.Entry(cid6, 2, Message.WantType.Block, true) - ] - - mq.addEntries(batch1) - mq.addEntries(batch2) - mq.addMessage(msg) + const deferred = defer() + + const network = mockNetwork(2, ({ connects, messages }) => { + expect(connects).to.be.eql([peerIds[1], peerIds[1]]) + + const m1 = new Message(false) + m1.addEntry(cid3, 1) + m1.addEntry(cid4, 2) + m1.cancel(cid5) + m1.cancel(cid6) + + expect( + messages + ).to.be.eql([ + [peerIds[1], msg], + [peerIds[1], m1] + ]) + + deferred.resolve() }) + + const mq = new MsgQueue(peerIds[0], peerIds[1], network) + + expect(mq.refcnt).to.equal(1) + + const batch1 = [ + new Message.Entry(cid3, 1, Message.WantType.Block, false), + new Message.Entry(cid4, 2, Message.WantType.Block, false) + ] + + const batch2 = [ + new Message.Entry(cid5, 1, Message.WantType.Block, true), + new Message.Entry(cid6, 2, Message.WantType.Block, true) + ] + + mq.addEntries(batch1) + mq.addEntries(batch2) + mq.addMessage(msg) + + await deferred.promise }) }) diff --git a/tsconfig.json b/tsconfig.json new file mode 100644 index 00000000..0cbdc31a --- /dev/null +++ b/tsconfig.json @@ -0,0 +1,9 @@ +{ + "extends": "./node_modules/aegir/src/config/tsconfig.aegir.json", + "compilerOptions": { + "outDir": "dist" + }, + "include": [ + "src" + ] +}