From f6c092911b5330c945b5802b5fdcaf97b48921e1 Mon Sep 17 00:00:00 2001 From: natzcam Date: Tue, 17 Dec 2024 00:08:19 +0800 Subject: [PATCH 1/6] add monitor --- index.js | 14 +++++++ lib/monitor.js | 107 +++++++++++++++++++++++++++++++++++++++++++++++++ package.json | 3 ++ test/all.js | 67 +++++++++++++++++++++++++++++++ 4 files changed, 191 insertions(+) create mode 100644 lib/monitor.js diff --git a/index.js b/index.js index b0ea403..3b5fcf3 100644 --- a/index.js +++ b/index.js @@ -2,6 +2,7 @@ const mutexify = require('mutexify') const b4a = require('b4a') const { BlobReadStream, BlobWriteStream } = require('./lib/streams') +const Monitor = require('./lib/monitor') const DEFAULT_BLOCK_SIZE = 2 ** 16 @@ -12,6 +13,7 @@ module.exports = class Hyperblobs { this._lock = mutexify() this._core = core + this.monitors = new Set() } get feed () { @@ -67,4 +69,16 @@ module.exports = class Hyperblobs { const core = (opts && opts.core) ? opts.core : this._core return new BlobWriteStream(core, this._lock, opts) } + + monitor (id, opts = {}) { + const monitor = new Monitor(this, id, { ...opts }) + this.monitors.add(monitor) + return monitor + } + +// closeMonitors () { +// const closing = [] +// for (const monitor of this.monitors) closing.push(monitor.close()) +// await Promise.allSettled(closing) +// } } diff --git a/lib/monitor.js b/lib/monitor.js new file mode 100644 index 0000000..fb620df --- /dev/null +++ b/lib/monitor.js @@ -0,0 +1,107 @@ +const ReadyResource = require('ready-resource') +const safetyCatch = require('safety-catch') +const speedometer = require('speedometer') + +module.exports = class Monitor extends ReadyResource { + constructor (blobs, id) { + super() + if (!blobs) throw new Error('blobs is required') + if (!id) throw new Error('id is required') + + this.blobs = blobs + this.id = id + this.peers = 0 + this.uploadSpeedometer = null + this.downloadSpeedometer = null + + this._boundOnUpload = this._onUpload.bind(this) + this._boundOnDownload = this._onDownload.bind(this) + this._boundPeerUpdate = this._updatePeers.bind(this) + + const stats = { + startTime: 0, + percentage: 0, + peers: 0, + speed: 0, + blocks: 0, + totalBytes: 0, // local + bytes loaded during monitoring + monitoringBytes: 0, // bytes loaded during monitoring + targetBytes: 0, + targetBlocks: 0 + } + + this.uploadStats = { ...stats } + this.downloadStats = { ...stats } + + this.ready().catch(safetyCatch) + } + + _open () { + this.uploadStats.targetBytes = this.downloadStats.targetBytes = this.id.byteLength + this.uploadStats.targetBlocks = this.downloadStats.targetBlocks = this.id.blockLength + + this.uploadSpeedometer = speedometer() + this.downloadSpeedometer = speedometer() + + this._updatePeers() + + // Handlers + this.blobs.core.on('peer-add', this._boundPeerUpdate) + this.blobs.core.on('peer-remove', this._boundPeerUpdate) + this.blobs.core.on('upload', this._boundOnUpload) + this.blobs.core.on('download', this._boundOnDownload) + } + + _close () { + this.blobs.core.off('peer-add', this._boundPeerUpdate) + this.blobs.core.off('peer-remove', this._boundPeerUpdate) + this.blobs.core.off('upload', this._boundOnUpload) + this.blobs.core.off('download', this._boundOnDownload) + this.blobs.monitors.delete(this) + } + + _onUpload (index, bytes, from) { + this._updateStats(this.uploadSpeedometer, this.uploadStats, index, bytes, from) + } + + _onDownload (index, bytes, from) { + this._updateStats(this.downloadSpeedometer, this.downloadStats, index, bytes, from) + } + + _updatePeers () { + this.uploadStats.peers = this.downloadStats.peers = this.peers = this.blobs.core.peers.length + this.emit('update') + } + + _updateStats (speed, stats, index, bytes) { + if (this.closing) return + if (!isWithinRange(index, this.id)) return + + if (!stats.startTime) stats.startTime = Date.now() + + stats.speed = speed(bytes) + stats.blocks++ + stats.totalBytes += bytes + stats.monitoringBytes += bytes + // NOTE: you should not rely on the percentage until the monitor is initialized with the local state of the file + stats.percentage = toFixed(stats.blocks / stats.targetBlocks * 100) + + this.emit('update') + } + + downloadSpeed () { + return this.downloadSpeedometer ? this.downloadSpeedometer() : 0 + } + + uploadSpeed () { + return this.uploadSpeedometer ? this.uploadSpeedometer() : 0 + } +} + +function isWithinRange (index, { blockOffset, blockLength }) { + return index >= blockOffset && index < blockOffset + blockLength +} + +function toFixed (n) { + return Math.round(n * 100) / 100 +} diff --git a/package.json b/package.json index 21a4162..36154e4 100644 --- a/package.json +++ b/package.json @@ -29,6 +29,9 @@ "b4a": "^1.6.1", "hypercore-errors": "^1.1.1", "mutexify": "^1.4.0", + "ready-resource": "^1.1.1", + "safety-catch": "^1.0.2", + "speedometer": "^1.1.0", "streamx": "^2.13.2" }, "devDependencies": { diff --git a/test/all.js b/test/all.js index a2d7e43..41afd68 100644 --- a/test/all.js +++ b/test/all.js @@ -305,6 +305,73 @@ test('clear with diff option', async function (t) { t.is(cleared3.blocks, 0) }) +test.solo('upload/download can be monitored', async (t) => { + t.plan(30) + + const [a, b] = await createPair() + const blobsA = new Hyperblobs(a) + const blobsB = new Hyperblobs(b) + + const bytes = 1024 * 100 // big enough to trigger more than one update event + const buf = Buffer.alloc(bytes, '0') + const id = await blobsA.put(buf) + + // add another blob which should not be monitored + const controlId = await blobsA.put(buf) + + { + const expectedBlocks = [2, 1] + const expectedBytes = [bytes, 65536] + const expectedPercentage = [100, 50] + + // Start monitoring upload + const monitor = blobsA.monitor(id) + monitor.on('update', () => { + t.is(monitor.uploadStats.blocks, expectedBlocks.pop()) + t.is(monitor.uploadStats.monitoringBytes, expectedBytes.pop()) + t.is(monitor.uploadStats.targetBlocks, 2) + t.is(monitor.uploadStats.targetBytes, bytes) + t.is(monitor.uploadSpeed(), monitor.uploadStats.speed) + t.is(monitor.uploadStats.percentage, expectedPercentage.pop()) + t.absent(monitor.downloadStats.blocks) + }) + } + + { + // Start monitoring download + const expectedBlocks = [2, 1] + const expectedBytes = [bytes, 65536] + const expectedPercentage = [100, 50] + + const monitor = blobsB.monitor(id) + monitor.on('update', () => { + t.is(monitor.downloadStats.blocks, expectedBlocks.pop()) + t.is(monitor.downloadStats.monitoringBytes, expectedBytes.pop()) + t.is(monitor.downloadStats.targetBlocks, 2) + t.is(monitor.downloadStats.targetBytes, bytes) + t.is(monitor.downloadSpeed(), monitor.downloadStats.speed) + t.is(monitor.downloadStats.percentage, expectedPercentage.pop()) + t.absent(monitor.uploadStats.blocks) + }) + } + + const res = await blobsB.get(id) + t.alike(res, buf) + + // should not generate events + const controRes = await blobsB.get(controlId) + t.alike(controRes, buf) +}) + +// test('monitor is removed from the Set on close', async (t) => { +// const { drive } = await testenv(t) +// const monitor = drive.monitor('/example.md') +// await monitor.ready() +// t.is(drive.monitors.size, 1) +// await monitor.close() +// t.is(drive.monitors.size, 0) +// }) + async function createPair () { const a = new Hypercore(RAM) await a.ready() From d3296e0ee22b1b89ca456d0681af62a28bb0ad6e Mon Sep 17 00:00:00 2001 From: natzcam Date: Tue, 17 Dec 2024 00:19:07 +0800 Subject: [PATCH 2/6] remove solo --- lib/monitor.js | 5 ++--- test/all.js | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/lib/monitor.js b/lib/monitor.js index fb620df..91967ac 100644 --- a/lib/monitor.js +++ b/lib/monitor.js @@ -39,12 +39,11 @@ module.exports = class Monitor extends ReadyResource { _open () { this.uploadStats.targetBytes = this.downloadStats.targetBytes = this.id.byteLength this.uploadStats.targetBlocks = this.downloadStats.targetBlocks = this.id.blockLength + this.uploadStats.peers = this.downloadStats.peers = this.peers = this.blobs.core.peers.length this.uploadSpeedometer = speedometer() this.downloadSpeedometer = speedometer() - this._updatePeers() - // Handlers this.blobs.core.on('peer-add', this._boundPeerUpdate) this.blobs.core.on('peer-remove', this._boundPeerUpdate) @@ -57,6 +56,7 @@ module.exports = class Monitor extends ReadyResource { this.blobs.core.off('peer-remove', this._boundPeerUpdate) this.blobs.core.off('upload', this._boundOnUpload) this.blobs.core.off('download', this._boundOnDownload) + this.blobs.monitors.delete(this) } @@ -83,7 +83,6 @@ module.exports = class Monitor extends ReadyResource { stats.blocks++ stats.totalBytes += bytes stats.monitoringBytes += bytes - // NOTE: you should not rely on the percentage until the monitor is initialized with the local state of the file stats.percentage = toFixed(stats.blocks / stats.targetBlocks * 100) this.emit('update') diff --git a/test/all.js b/test/all.js index 41afd68..3c02f93 100644 --- a/test/all.js +++ b/test/all.js @@ -305,7 +305,7 @@ test('clear with diff option', async function (t) { t.is(cleared3.blocks, 0) }) -test.solo('upload/download can be monitored', async (t) => { +test('upload/download can be monitored', async (t) => { t.plan(30) const [a, b] = await createPair() From 792f5c7f7f6364354cae47e92c913438ae9a6173 Mon Sep 17 00:00:00 2001 From: natzcam Date: Tue, 17 Dec 2024 00:30:10 +0800 Subject: [PATCH 3/6] remove open --- lib/monitor.js | 6 ------ package.json | 1 - 2 files changed, 7 deletions(-) diff --git a/lib/monitor.js b/lib/monitor.js index 91967ac..83bfd9e 100644 --- a/lib/monitor.js +++ b/lib/monitor.js @@ -1,5 +1,4 @@ const ReadyResource = require('ready-resource') -const safetyCatch = require('safety-catch') const speedometer = require('speedometer') module.exports = class Monitor extends ReadyResource { @@ -32,11 +31,6 @@ module.exports = class Monitor extends ReadyResource { this.uploadStats = { ...stats } this.downloadStats = { ...stats } - - this.ready().catch(safetyCatch) - } - - _open () { this.uploadStats.targetBytes = this.downloadStats.targetBytes = this.id.byteLength this.uploadStats.targetBlocks = this.downloadStats.targetBlocks = this.id.blockLength this.uploadStats.peers = this.downloadStats.peers = this.peers = this.blobs.core.peers.length diff --git a/package.json b/package.json index 36154e4..4c6d4a0 100644 --- a/package.json +++ b/package.json @@ -30,7 +30,6 @@ "hypercore-errors": "^1.1.1", "mutexify": "^1.4.0", "ready-resource": "^1.1.1", - "safety-catch": "^1.0.2", "speedometer": "^1.1.0", "streamx": "^2.13.2" }, From 00fae096034ba47896818c338610a4c9265a1e0b Mon Sep 17 00:00:00 2001 From: natzcam Date: Tue, 17 Dec 2024 00:40:03 +0800 Subject: [PATCH 4/6] add close test --- index.js | 6 ------ test/all.js | 20 ++++++++++++-------- 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/index.js b/index.js index 3b5fcf3..00b998c 100644 --- a/index.js +++ b/index.js @@ -75,10 +75,4 @@ module.exports = class Hyperblobs { this.monitors.add(monitor) return monitor } - -// closeMonitors () { -// const closing = [] -// for (const monitor of this.monitors) closing.push(monitor.close()) -// await Promise.allSettled(closing) -// } } diff --git a/test/all.js b/test/all.js index 3c02f93..7130e49 100644 --- a/test/all.js +++ b/test/all.js @@ -363,14 +363,18 @@ test('upload/download can be monitored', async (t) => { t.alike(controRes, buf) }) -// test('monitor is removed from the Set on close', async (t) => { -// const { drive } = await testenv(t) -// const monitor = drive.monitor('/example.md') -// await monitor.ready() -// t.is(drive.monitors.size, 1) -// await monitor.close() -// t.is(drive.monitors.size, 0) -// }) +test('monitor is removed from the Set on close', async (t) => { + const core = new Hypercore(RAM) + const blobs = new Hyperblobs(core) + + const bytes = 1024 * 100 // big enough to trigger more than one update event + const buf = Buffer.alloc(bytes, '0') + const id = await blobs.put(buf) + const monitor = blobs.monitor(id) + t.is(blobs.monitors.size, 1) + await monitor.close() + t.is(blobs.monitors.size, 0) +}) async function createPair () { const a = new Hypercore(RAM) From 0a382849537444b61168841b8484e7095d6c8aac Mon Sep 17 00:00:00 2001 From: Mathias Buus Date: Mon, 16 Dec 2024 22:29:32 +0100 Subject: [PATCH 5/6] only ever use one listener --- index.js | 44 ++++++++++++++++++++++++++++++++++++++++---- lib/monitor.js | 23 +++++------------------ package.json | 8 +++++++- test/all.js | 6 +++--- 4 files changed, 55 insertions(+), 26 deletions(-) diff --git a/index.js b/index.js index 00b998c..bcd2f9c 100644 --- a/index.js +++ b/index.js @@ -13,7 +13,11 @@ module.exports = class Hyperblobs { this._lock = mutexify() this._core = core - this.monitors = new Set() + this._monitors = new Set() + + this._boundUpdatePeers = this._updatePeers.bind(this) + this._boundOnUpload = this._onUpload.bind(this) + this._boundOnDownload = this._onDownload.bind(this) } get feed () { @@ -70,9 +74,41 @@ module.exports = class Hyperblobs { return new BlobWriteStream(core, this._lock, opts) } - monitor (id, opts = {}) { - const monitor = new Monitor(this, id, { ...opts }) - this.monitors.add(monitor) + monitor (id) { + const monitor = new Monitor(this, id) + if (this._monitors.size === 0) this._startListening() + this._monitors.add(monitor) return monitor } + + _removeMonitor (mon) { + this._monitors.delete(mon) + if (this._monitors.size === 0) this._stopListening() + } + + _updatePeers () { + for (const m of this._monitors) m._updatePeers() + } + + _onUpload (index, bytes, from) { + for (const m of this._monitors) m._onUpload(index, bytes, from) + } + + _onDownload (index, bytes, from) { + for (const m of this._monitors) m._onDownload(index, bytes, from) + } + + _startListening () { + this.core.on('peer-add', this._boundUpdatePeers) + this.core.on('peer-remove', this._boundUpdatePeers) + this.core.on('upload', this._boundOnUpload) + this.core.on('download', this._boundOnDownload) + } + + _stopListening () { + this.core.off('peer-add', this._boundUpdatePeers) + this.core.off('peer-remove', this._boundUpdatePeers) + this.core.off('upload', this._boundOnUpload) + this.core.off('download', this._boundOnDownload) + } } diff --git a/lib/monitor.js b/lib/monitor.js index 83bfd9e..5ca1c7e 100644 --- a/lib/monitor.js +++ b/lib/monitor.js @@ -1,10 +1,10 @@ -const ReadyResource = require('ready-resource') +const EventEmitter = require('events') const speedometer = require('speedometer') -module.exports = class Monitor extends ReadyResource { +module.exports = class Monitor extends EventEmitter { constructor (blobs, id) { super() - if (!blobs) throw new Error('blobs is required') + if (!id) throw new Error('id is required') this.blobs = blobs @@ -13,10 +13,6 @@ module.exports = class Monitor extends ReadyResource { this.uploadSpeedometer = null this.downloadSpeedometer = null - this._boundOnUpload = this._onUpload.bind(this) - this._boundOnDownload = this._onDownload.bind(this) - this._boundPeerUpdate = this._updatePeers.bind(this) - const stats = { startTime: 0, percentage: 0, @@ -39,19 +35,10 @@ module.exports = class Monitor extends ReadyResource { this.downloadSpeedometer = speedometer() // Handlers - this.blobs.core.on('peer-add', this._boundPeerUpdate) - this.blobs.core.on('peer-remove', this._boundPeerUpdate) - this.blobs.core.on('upload', this._boundOnUpload) - this.blobs.core.on('download', this._boundOnDownload) } - _close () { - this.blobs.core.off('peer-add', this._boundPeerUpdate) - this.blobs.core.off('peer-remove', this._boundPeerUpdate) - this.blobs.core.off('upload', this._boundOnUpload) - this.blobs.core.off('download', this._boundOnDownload) - - this.blobs.monitors.delete(this) + close () { + this.blobs._removeMonitor(this) } _onUpload (index, bytes, from) { diff --git a/package.json b/package.json index 4c6d4a0..6d5ff3c 100644 --- a/package.json +++ b/package.json @@ -25,11 +25,17 @@ "index.js", "lib/**.js" ], + "imports": { + "events": { + "bare": "bare-events", + "default": "events" + } + }, "dependencies": { "b4a": "^1.6.1", + "bare-events": "^2.5.0", "hypercore-errors": "^1.1.1", "mutexify": "^1.4.0", - "ready-resource": "^1.1.1", "speedometer": "^1.1.0", "streamx": "^2.13.2" }, diff --git a/test/all.js b/test/all.js index 7130e49..5643904 100644 --- a/test/all.js +++ b/test/all.js @@ -371,9 +371,9 @@ test('monitor is removed from the Set on close', async (t) => { const buf = Buffer.alloc(bytes, '0') const id = await blobs.put(buf) const monitor = blobs.monitor(id) - t.is(blobs.monitors.size, 1) - await monitor.close() - t.is(blobs.monitors.size, 0) + t.is(blobs._monitors.size, 1) + monitor.close() + t.is(blobs._monitors.size, 0) }) async function createPair () { From 286738494da757dc1f09dde3dde4fabd3cbc4caa Mon Sep 17 00:00:00 2001 From: Mathias Buus Date: Mon, 16 Dec 2024 22:30:11 +0100 Subject: [PATCH 6/6] add mon.destroy --- lib/monitor.js | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/lib/monitor.js b/lib/monitor.js index 5ca1c7e..4626eb4 100644 --- a/lib/monitor.js +++ b/lib/monitor.js @@ -37,6 +37,11 @@ module.exports = class Monitor extends EventEmitter { // Handlers } + // just an alias + destroy () { + return this.close() + } + close () { this.blobs._removeMonitor(this) }