Skip to content
This repository was archived by the owner on Feb 12, 2024. It is now read-only.

Commit bbc74ff

Browse files
committed
refactor: move EventEmitter from GCLock to test
1 parent 793c668 commit bbc74ff

File tree

3 files changed

+59
-33
lines changed

3 files changed

+59
-33
lines changed

src/core/components/pin/gc-lock.js

Lines changed: 21 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -3,24 +3,19 @@
33
const pull = require('pull-stream/pull')
44
const pullThrough = require('pull-stream/throughs/through')
55
const pullAsyncMap = require('pull-stream/throughs/async-map')
6-
const EventEmitter = require('events')
76
const Mutex = require('../../../utils/mutex')
87
const log = require('debug')('ipfs:gc:lock')
98

10-
class GCLock extends EventEmitter {
9+
class GCLock {
1110
constructor (repoOwner) {
12-
super()
13-
1411
this.mutex = new Mutex(repoOwner, { log })
1512
}
1613

1714
readLock (lockedFn, cb) {
18-
this.emit(`readLock request`)
1915
return this.mutex.readLock(lockedFn, cb)
2016
}
2117

2218
writeLock (lockedFn, cb) {
23-
this.emit(`writeLock request`)
2419
return this.mutex.writeLock(lockedFn, cb)
2520
}
2621

@@ -33,7 +28,7 @@ class GCLock extends EventEmitter {
3328
}
3429

3530
pullLock (type, lockedPullFn) {
36-
const pullLocker = new PullLocker(this, this.mutex, type, this.lockId++)
31+
const pullLocker = new PullLocker(this.mutex, type)
3732

3833
return pull(
3934
pullLocker.take(),
@@ -44,8 +39,7 @@ class GCLock extends EventEmitter {
4439
}
4540

4641
class PullLocker {
47-
constructor (emitter, mutex, type) {
48-
this.emitter = emitter
42+
constructor (mutex, type) {
4943
this.mutex = mutex
5044
this.type = type
5145

@@ -54,26 +48,30 @@ class PullLocker {
5448
}
5549

5650
take () {
57-
return pull(
58-
pullAsyncMap((i, cb) => {
59-
if (this.lockRequested) {
60-
return cb(null, i)
61-
}
62-
this.lockRequested = true
63-
64-
this.emitter.emit(`${this.type} request`)
65-
66-
this.mutex[this.type]((releaseLock) => {
67-
cb(null, i)
68-
this.releaseLock = releaseLock
69-
})
51+
return pullAsyncMap((i, cb) => {
52+
// Check if the lock has already been acquired.
53+
// Note: new items will only come through the pull stream once the first
54+
// item has acquired a lock.
55+
if (this.releaseLock) {
56+
// The lock has been acquired so return immediately
57+
return cb(null, i)
58+
}
59+
60+
// Request the lock
61+
this.mutex[this.type]((releaseLock) => {
62+
// The lock has been granted, so run the locked piece of code
63+
cb(null, i)
64+
65+
// Save the release function to be called when the stream completes
66+
this.releaseLock = releaseLock
7067
})
71-
)
68+
})
7269
}
7370

7471
// Releases the lock
7572
release () {
7673
return pullThrough(null, (err) => {
74+
// When the stream completes, release the lock
7775
this.releaseLock(err)
7876
})
7977
}

src/utils/mutex.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ const noop = () => {}
88

99
// Wrap mortice to present a callback interface
1010
class Mutex {
11-
constructor (repoOwner, options) {
11+
constructor (repoOwner, options = {}) {
1212
// Ensure that we get a different mutex for each instance of the lock
1313
const randId = nanoid()
1414
this.mutex = mortice(randId, {

test/core/gc.spec.js

Lines changed: 37 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,28 @@ const pEvent = require('p-event')
1212
const env = require('ipfs-utils/src/env')
1313
const IPFS = require('../../src/core')
1414

15+
// We need to detect when a readLock or writeLock is requested for the tests
16+
// so we override the Mutex class to emit an event
17+
const EventEmitter = require('events')
18+
const Mutex = require('../../src/utils/mutex')
19+
20+
class MutexEmitter extends Mutex {
21+
constructor (repoOwner) {
22+
super(repoOwner)
23+
this.emitter = new EventEmitter()
24+
}
25+
26+
readLock (lockedFn, cb) {
27+
this.emitter.emit('readLock request')
28+
return super.readLock(lockedFn, cb)
29+
}
30+
31+
writeLock (lockedFn, cb) {
32+
this.emitter.emit('writeLock request')
33+
return super.writeLock(lockedFn, cb)
34+
}
35+
}
36+
1537
describe('gc', function () {
1638
const fixtures = [{
1739
path: 'test/my/path1',
@@ -29,6 +51,7 @@ describe('gc', function () {
2951

3052
let ipfsd
3153
let ipfs
54+
let lockEmitter
3255

3356
before(function (done) {
3457
this.timeout(40 * 1000)
@@ -48,6 +71,11 @@ describe('gc', function () {
4871
ipfsd = node
4972
ipfs = ipfsd.api
5073

74+
// Replace the Mutex with one that emits events when a readLock or
75+
// writeLock is requested (needed in the tests below)
76+
ipfs._gcLock.mutex = new MutexEmitter(ipfs._options.repoOwner)
77+
lockEmitter = ipfs._gcLock.mutex.emitter
78+
5179
done()
5280
})
5381
})
@@ -79,13 +107,13 @@ describe('gc', function () {
79107
it(`garbage collection should wait for pending ${test.name} to finish`, async () => {
80108
// Add blocks to IPFS
81109
// Note: add operation will take a read lock
82-
const addLockRequested = pEvent(ipfs._gcLock, 'readLock request')
110+
const addLockRequested = pEvent(lockEmitter, 'readLock request')
83111
const add1 = test.add1()
84112

85113
// Once add lock has been requested, start GC
86114
await addLockRequested
87115
// Note: GC will take a write lock
88-
const gcStarted = pEvent(ipfs._gcLock, 'writeLock request')
116+
const gcStarted = pEvent(lockEmitter, 'writeLock request')
89117
const gc = ipfs.repo.gc()
90118

91119
// Once GC has started, start second add
@@ -109,13 +137,13 @@ describe('gc', function () {
109137
it('garbage collection should wait for pending add + pin to finish', async () => {
110138
// Add blocks to IPFS
111139
// Note: add operation will take a read lock
112-
const addLockRequested = pEvent(ipfs._gcLock, 'readLock request')
140+
const addLockRequested = pEvent(lockEmitter, 'readLock request')
113141
const add1 = ipfs.add(fixtures[2], { pin: true })
114142

115143
// Once add lock has been requested, start GC
116144
await addLockRequested
117145
// Note: GC will take a write lock
118-
const gcStarted = pEvent(ipfs._gcLock, 'writeLock request')
146+
const gcStarted = pEvent(lockEmitter, 'writeLock request')
119147
const gc = ipfs.repo.gc()
120148

121149
// Once GC has started, start second add
@@ -142,13 +170,13 @@ describe('gc', function () {
142170

143171
// Remove first block from IPFS
144172
// Note: block rm will take a write lock
145-
const rmLockRequested = pEvent(ipfs._gcLock, 'writeLock request')
173+
const rmLockRequested = pEvent(lockEmitter, 'writeLock request')
146174
const rm1 = ipfs.block.rm(cid1)
147175

148176
// Once rm lock has been requested, start GC
149177
await rmLockRequested
150178
// Note: GC will take a write lock
151-
const gcStarted = pEvent(ipfs._gcLock, 'writeLock request')
179+
const gcStarted = pEvent(lockEmitter, 'writeLock request')
152180
const gc = ipfs.repo.gc()
153181

154182
// Once GC has started, start second rm
@@ -185,7 +213,7 @@ describe('gc', function () {
185213

186214
// Pin first block
187215
// Note: pin add will take a read lock
188-
const pinLockRequested = pEvent(ipfs._gcLock, 'readLock request')
216+
const pinLockRequested = pEvent(lockEmitter, 'readLock request')
189217
const pin1 = ipfs.pin.add(cid1)
190218

191219
// Once pin lock has been requested, start GC
@@ -222,13 +250,13 @@ describe('gc', function () {
222250

223251
// Unpin first block
224252
// Note: pin rm will take a read lock
225-
const pinLockRequested = pEvent(ipfs._gcLock, 'readLock request')
253+
const pinLockRequested = pEvent(lockEmitter, 'readLock request')
226254
const pinRm1 = ipfs.pin.rm(cid1)
227255

228256
// Once pin lock has been requested, start GC
229257
await pinLockRequested
230258
// Note: GC will take a write lock
231-
const gcStarted = pEvent(ipfs._gcLock, 'writeLock request')
259+
const gcStarted = pEvent(lockEmitter, 'writeLock request')
232260
const gc = ipfs.repo.gc()
233261

234262
// Once GC has started, start second pin rm

0 commit comments

Comments
 (0)