diff --git a/index.js b/index.js index b0ea403..bcd2f9c 100644 --- a/index.js +++ b/index.js @@ -2,6 +2,7 @@ const mutexify = require('mutexify') const b4a = require('b4a') const { BlobReadStream, BlobWriteStream } = require('./lib/streams') +const Monitor = require('./lib/monitor') const DEFAULT_BLOCK_SIZE = 2 ** 16 @@ -12,6 +13,11 @@ module.exports = class Hyperblobs { this._lock = mutexify() this._core = core + this._monitors = new Set() + + this._boundUpdatePeers = this._updatePeers.bind(this) + this._boundOnUpload = this._onUpload.bind(this) + this._boundOnDownload = this._onDownload.bind(this) } get feed () { @@ -67,4 +73,42 @@ module.exports = class Hyperblobs { const core = (opts && opts.core) ? opts.core : this._core return new BlobWriteStream(core, this._lock, opts) } + + monitor (id) { + const monitor = new Monitor(this, id) + if (this._monitors.size === 0) this._startListening() + this._monitors.add(monitor) + return monitor + } + + _removeMonitor (mon) { + this._monitors.delete(mon) + if (this._monitors.size === 0) this._stopListening() + } + + _updatePeers () { + for (const m of this._monitors) m._updatePeers() + } + + _onUpload (index, bytes, from) { + for (const m of this._monitors) m._onUpload(index, bytes, from) + } + + _onDownload (index, bytes, from) { + for (const m of this._monitors) m._onDownload(index, bytes, from) + } + + _startListening () { + this.core.on('peer-add', this._boundUpdatePeers) + this.core.on('peer-remove', this._boundUpdatePeers) + this.core.on('upload', this._boundOnUpload) + this.core.on('download', this._boundOnDownload) + } + + _stopListening () { + this.core.off('peer-add', this._boundUpdatePeers) + this.core.off('peer-remove', this._boundUpdatePeers) + this.core.off('upload', this._boundOnUpload) + this.core.off('download', this._boundOnDownload) + } } diff --git a/lib/monitor.js b/lib/monitor.js new file mode 100644 index 0000000..4626eb4 --- /dev/null +++ b/lib/monitor.js @@ -0,0 +1,92 @@ +const EventEmitter = require('events') +const speedometer = require('speedometer') + +module.exports = class Monitor extends EventEmitter { + constructor (blobs, id) { + super() + + if (!id) throw new Error('id is required') + + this.blobs = blobs + this.id = id + this.peers = 0 + this.uploadSpeedometer = null + this.downloadSpeedometer = null + + const stats = { + startTime: 0, + percentage: 0, + peers: 0, + speed: 0, + blocks: 0, + totalBytes: 0, // local + bytes loaded during monitoring + monitoringBytes: 0, // bytes loaded during monitoring + targetBytes: 0, + targetBlocks: 0 + } + + this.uploadStats = { ...stats } + this.downloadStats = { ...stats } + this.uploadStats.targetBytes = this.downloadStats.targetBytes = this.id.byteLength + this.uploadStats.targetBlocks = this.downloadStats.targetBlocks = this.id.blockLength + this.uploadStats.peers = this.downloadStats.peers = this.peers = this.blobs.core.peers.length + + this.uploadSpeedometer = speedometer() + this.downloadSpeedometer = speedometer() + + // Handlers + } + + // just an alias + destroy () { + return this.close() + } + + close () { + this.blobs._removeMonitor(this) + } + + _onUpload (index, bytes, from) { + this._updateStats(this.uploadSpeedometer, this.uploadStats, index, bytes, from) + } + + _onDownload (index, bytes, from) { + this._updateStats(this.downloadSpeedometer, this.downloadStats, index, bytes, from) + } + + _updatePeers () { + this.uploadStats.peers = this.downloadStats.peers = this.peers = this.blobs.core.peers.length + this.emit('update') + } + + _updateStats (speed, stats, index, bytes) { + if (this.closing) return + if (!isWithinRange(index, this.id)) return + + if (!stats.startTime) stats.startTime = Date.now() + + stats.speed = speed(bytes) + stats.blocks++ + stats.totalBytes += bytes + stats.monitoringBytes += bytes + stats.percentage = toFixed(stats.blocks / stats.targetBlocks * 100) + + this.emit('update') + } + + downloadSpeed () { + return this.downloadSpeedometer ? this.downloadSpeedometer() : 0 + } + + uploadSpeed () { + return this.uploadSpeedometer ? this.uploadSpeedometer() : 0 + } +} + +function isWithinRange (index, { blockOffset, blockLength }) { + return index >= blockOffset && index < blockOffset + blockLength +} + +function toFixed (n) { + return Math.round(n * 100) / 100 +} diff --git a/package.json b/package.json index 21a4162..6d5ff3c 100644 --- a/package.json +++ b/package.json @@ -25,10 +25,18 @@ "index.js", "lib/**.js" ], + "imports": { + "events": { + "bare": "bare-events", + "default": "events" + } + }, "dependencies": { "b4a": "^1.6.1", + "bare-events": "^2.5.0", "hypercore-errors": "^1.1.1", "mutexify": "^1.4.0", + "speedometer": "^1.1.0", "streamx": "^2.13.2" }, "devDependencies": { diff --git a/test/all.js b/test/all.js index a2d7e43..5643904 100644 --- a/test/all.js +++ b/test/all.js @@ -305,6 +305,77 @@ test('clear with diff option', async function (t) { t.is(cleared3.blocks, 0) }) +test('upload/download can be monitored', async (t) => { + t.plan(30) + + const [a, b] = await createPair() + const blobsA = new Hyperblobs(a) + const blobsB = new Hyperblobs(b) + + const bytes = 1024 * 100 // big enough to trigger more than one update event + const buf = Buffer.alloc(bytes, '0') + const id = await blobsA.put(buf) + + // add another blob which should not be monitored + const controlId = await blobsA.put(buf) + + { + const expectedBlocks = [2, 1] + const expectedBytes = [bytes, 65536] + const expectedPercentage = [100, 50] + + // Start monitoring upload + const monitor = blobsA.monitor(id) + monitor.on('update', () => { + t.is(monitor.uploadStats.blocks, expectedBlocks.pop()) + t.is(monitor.uploadStats.monitoringBytes, expectedBytes.pop()) + t.is(monitor.uploadStats.targetBlocks, 2) + t.is(monitor.uploadStats.targetBytes, bytes) + t.is(monitor.uploadSpeed(), monitor.uploadStats.speed) + t.is(monitor.uploadStats.percentage, expectedPercentage.pop()) + t.absent(monitor.downloadStats.blocks) + }) + } + + { + // Start monitoring download + const expectedBlocks = [2, 1] + const expectedBytes = [bytes, 65536] + const expectedPercentage = [100, 50] + + const monitor = blobsB.monitor(id) + monitor.on('update', () => { + t.is(monitor.downloadStats.blocks, expectedBlocks.pop()) + t.is(monitor.downloadStats.monitoringBytes, expectedBytes.pop()) + t.is(monitor.downloadStats.targetBlocks, 2) + t.is(monitor.downloadStats.targetBytes, bytes) + t.is(monitor.downloadSpeed(), monitor.downloadStats.speed) + t.is(monitor.downloadStats.percentage, expectedPercentage.pop()) + t.absent(monitor.uploadStats.blocks) + }) + } + + const res = await blobsB.get(id) + t.alike(res, buf) + + // should not generate events + const controRes = await blobsB.get(controlId) + t.alike(controRes, buf) +}) + +test('monitor is removed from the Set on close', async (t) => { + const core = new Hypercore(RAM) + const blobs = new Hyperblobs(core) + + const bytes = 1024 * 100 // big enough to trigger more than one update event + const buf = Buffer.alloc(bytes, '0') + const id = await blobs.put(buf) + const monitor = blobs.monitor(id) + t.is(blobs._monitors.size, 1) + monitor.close() + t.is(blobs._monitors.size, 0) +}) + async function createPair () { const a = new Hypercore(RAM) await a.ready()