Skip to content

Commit 1db98da

Browse files
natzcammafintosh
andauthored
Add monitor (#25)
* add monitor * remove solo * remove open * add close test * only ever use one listener * add mon.destroy --------- Co-authored-by: Mathias Buus <[email protected]>
1 parent 7cc126c commit 1db98da

File tree

4 files changed

+215
-0
lines changed

4 files changed

+215
-0
lines changed

index.js

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ const mutexify = require('mutexify')
22
const b4a = require('b4a')
33

44
const { BlobReadStream, BlobWriteStream } = require('./lib/streams')
5+
const Monitor = require('./lib/monitor')
56

67
const DEFAULT_BLOCK_SIZE = 2 ** 16
78

@@ -12,6 +13,11 @@ module.exports = class Hyperblobs {
1213

1314
this._lock = mutexify()
1415
this._core = core
16+
this._monitors = new Set()
17+
18+
this._boundUpdatePeers = this._updatePeers.bind(this)
19+
this._boundOnUpload = this._onUpload.bind(this)
20+
this._boundOnDownload = this._onDownload.bind(this)
1521
}
1622

1723
get feed () {
@@ -67,4 +73,42 @@ module.exports = class Hyperblobs {
6773
const core = (opts && opts.core) ? opts.core : this._core
6874
return new BlobWriteStream(core, this._lock, opts)
6975
}
76+
77+
monitor (id) {
78+
const monitor = new Monitor(this, id)
79+
if (this._monitors.size === 0) this._startListening()
80+
this._monitors.add(monitor)
81+
return monitor
82+
}
83+
84+
_removeMonitor (mon) {
85+
this._monitors.delete(mon)
86+
if (this._monitors.size === 0) this._stopListening()
87+
}
88+
89+
_updatePeers () {
90+
for (const m of this._monitors) m._updatePeers()
91+
}
92+
93+
_onUpload (index, bytes, from) {
94+
for (const m of this._monitors) m._onUpload(index, bytes, from)
95+
}
96+
97+
_onDownload (index, bytes, from) {
98+
for (const m of this._monitors) m._onDownload(index, bytes, from)
99+
}
100+
101+
_startListening () {
102+
this.core.on('peer-add', this._boundUpdatePeers)
103+
this.core.on('peer-remove', this._boundUpdatePeers)
104+
this.core.on('upload', this._boundOnUpload)
105+
this.core.on('download', this._boundOnDownload)
106+
}
107+
108+
_stopListening () {
109+
this.core.off('peer-add', this._boundUpdatePeers)
110+
this.core.off('peer-remove', this._boundUpdatePeers)
111+
this.core.off('upload', this._boundOnUpload)
112+
this.core.off('download', this._boundOnDownload)
113+
}
70114
}

lib/monitor.js

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
const EventEmitter = require('events')
2+
const speedometer = require('speedometer')
3+
4+
module.exports = class Monitor extends EventEmitter {
5+
constructor (blobs, id) {
6+
super()
7+
8+
if (!id) throw new Error('id is required')
9+
10+
this.blobs = blobs
11+
this.id = id
12+
this.peers = 0
13+
this.uploadSpeedometer = null
14+
this.downloadSpeedometer = null
15+
16+
const stats = {
17+
startTime: 0,
18+
percentage: 0,
19+
peers: 0,
20+
speed: 0,
21+
blocks: 0,
22+
totalBytes: 0, // local + bytes loaded during monitoring
23+
monitoringBytes: 0, // bytes loaded during monitoring
24+
targetBytes: 0,
25+
targetBlocks: 0
26+
}
27+
28+
this.uploadStats = { ...stats }
29+
this.downloadStats = { ...stats }
30+
this.uploadStats.targetBytes = this.downloadStats.targetBytes = this.id.byteLength
31+
this.uploadStats.targetBlocks = this.downloadStats.targetBlocks = this.id.blockLength
32+
this.uploadStats.peers = this.downloadStats.peers = this.peers = this.blobs.core.peers.length
33+
34+
this.uploadSpeedometer = speedometer()
35+
this.downloadSpeedometer = speedometer()
36+
37+
// Handlers
38+
}
39+
40+
// just an alias
41+
destroy () {
42+
return this.close()
43+
}
44+
45+
close () {
46+
this.blobs._removeMonitor(this)
47+
}
48+
49+
_onUpload (index, bytes, from) {
50+
this._updateStats(this.uploadSpeedometer, this.uploadStats, index, bytes, from)
51+
}
52+
53+
_onDownload (index, bytes, from) {
54+
this._updateStats(this.downloadSpeedometer, this.downloadStats, index, bytes, from)
55+
}
56+
57+
_updatePeers () {
58+
this.uploadStats.peers = this.downloadStats.peers = this.peers = this.blobs.core.peers.length
59+
this.emit('update')
60+
}
61+
62+
_updateStats (speed, stats, index, bytes) {
63+
if (this.closing) return
64+
if (!isWithinRange(index, this.id)) return
65+
66+
if (!stats.startTime) stats.startTime = Date.now()
67+
68+
stats.speed = speed(bytes)
69+
stats.blocks++
70+
stats.totalBytes += bytes
71+
stats.monitoringBytes += bytes
72+
stats.percentage = toFixed(stats.blocks / stats.targetBlocks * 100)
73+
74+
this.emit('update')
75+
}
76+
77+
downloadSpeed () {
78+
return this.downloadSpeedometer ? this.downloadSpeedometer() : 0
79+
}
80+
81+
uploadSpeed () {
82+
return this.uploadSpeedometer ? this.uploadSpeedometer() : 0
83+
}
84+
}
85+
86+
function isWithinRange (index, { blockOffset, blockLength }) {
87+
return index >= blockOffset && index < blockOffset + blockLength
88+
}
89+
90+
function toFixed (n) {
91+
return Math.round(n * 100) / 100
92+
}

package.json

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,18 @@
2525
"index.js",
2626
"lib/**.js"
2727
],
28+
"imports": {
29+
"events": {
30+
"bare": "bare-events",
31+
"default": "events"
32+
}
33+
},
2834
"dependencies": {
2935
"b4a": "^1.6.1",
36+
"bare-events": "^2.5.0",
3037
"hypercore-errors": "^1.1.1",
3138
"mutexify": "^1.4.0",
39+
"speedometer": "^1.1.0",
3240
"streamx": "^2.13.2"
3341
},
3442
"devDependencies": {

test/all.js

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,77 @@ test('clear with diff option', async function (t) {
305305
t.is(cleared3.blocks, 0)
306306
})
307307

308+
test('upload/download can be monitored', async (t) => {
309+
t.plan(30)
310+
311+
const [a, b] = await createPair()
312+
const blobsA = new Hyperblobs(a)
313+
const blobsB = new Hyperblobs(b)
314+
315+
const bytes = 1024 * 100 // big enough to trigger more than one update event
316+
const buf = Buffer.alloc(bytes, '0')
317+
const id = await blobsA.put(buf)
318+
319+
// add another blob which should not be monitored
320+
const controlId = await blobsA.put(buf)
321+
322+
{
323+
const expectedBlocks = [2, 1]
324+
const expectedBytes = [bytes, 65536]
325+
const expectedPercentage = [100, 50]
326+
327+
// Start monitoring upload
328+
const monitor = blobsA.monitor(id)
329+
monitor.on('update', () => {
330+
t.is(monitor.uploadStats.blocks, expectedBlocks.pop())
331+
t.is(monitor.uploadStats.monitoringBytes, expectedBytes.pop())
332+
t.is(monitor.uploadStats.targetBlocks, 2)
333+
t.is(monitor.uploadStats.targetBytes, bytes)
334+
t.is(monitor.uploadSpeed(), monitor.uploadStats.speed)
335+
t.is(monitor.uploadStats.percentage, expectedPercentage.pop())
336+
t.absent(monitor.downloadStats.blocks)
337+
})
338+
}
339+
340+
{
341+
// Start monitoring download
342+
const expectedBlocks = [2, 1]
343+
const expectedBytes = [bytes, 65536]
344+
const expectedPercentage = [100, 50]
345+
346+
const monitor = blobsB.monitor(id)
347+
monitor.on('update', () => {
348+
t.is(monitor.downloadStats.blocks, expectedBlocks.pop())
349+
t.is(monitor.downloadStats.monitoringBytes, expectedBytes.pop())
350+
t.is(monitor.downloadStats.targetBlocks, 2)
351+
t.is(monitor.downloadStats.targetBytes, bytes)
352+
t.is(monitor.downloadSpeed(), monitor.downloadStats.speed)
353+
t.is(monitor.downloadStats.percentage, expectedPercentage.pop())
354+
t.absent(monitor.uploadStats.blocks)
355+
})
356+
}
357+
358+
const res = await blobsB.get(id)
359+
t.alike(res, buf)
360+
361+
// should not generate events
362+
const controRes = await blobsB.get(controlId)
363+
t.alike(controRes, buf)
364+
})
365+
366+
test('monitor is removed from the Set on close', async (t) => {
367+
const core = new Hypercore(RAM)
368+
const blobs = new Hyperblobs(core)
369+
370+
const bytes = 1024 * 100 // big enough to trigger more than one update event
371+
const buf = Buffer.alloc(bytes, '0')
372+
const id = await blobs.put(buf)
373+
const monitor = blobs.monitor(id)
374+
t.is(blobs._monitors.size, 1)
375+
monitor.close()
376+
t.is(blobs._monitors.size, 0)
377+
})
378+
308379
async function createPair () {
309380
const a = new Hypercore(RAM)
310381
await a.ready()

0 commit comments

Comments
 (0)