Skip to content

Commit 42273c7

Browse files
refactor: migrate to pull-streams
BREAKING: - Rename `datastore` to `blockstore` and `datastore-legacy` to `datastore`. - Stores now need to adhere to the [interface-pull-blob-store](https://github.com/ipfs/interface-pull-blob-store) definition.
1 parent 6c293ea commit 42273c7

15 files changed

+425
-412
lines changed

.travis.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ sudo: false
22
language: node_js
33
node_js:
44
- 4
5-
- 5
5+
- stable
66

77
# Make sure we have new NPM.
88
before_install:

package.json

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,26 +31,23 @@
3131
"devDependencies": {
3232
"abstract-blob-store": "^3.2.0",
3333
"aegir": "^6.0.1",
34-
"async": "^2.0.1",
3534
"buffer-loader": "^0.0.1",
3635
"chai": "^3.5.0",
37-
"fs-blob-store": "^5.2.1",
38-
"idb-plus-blob-store": "^1.1.2",
36+
"fs-pull-blob-store": "^0.3.0",
37+
"idb-pull-blob-store": "^0.3.0",
3938
"lodash": "^4.15.0",
4039
"ncp": "^2.0.0",
4140
"pre-commit": "^1.1.3",
42-
"rimraf": "^2.5.4",
43-
"run-series": "^1.1.4"
41+
"rimraf": "^2.5.4"
4442
},
4543
"dependencies": {
4644
"babel-runtime": "^6.11.6",
47-
"bl": "^1.1.2",
48-
"concat-stream": "^1.5.1",
4945
"ipfs-block": "^0.3.0",
5046
"lock": "^0.1.2",
51-
"lockfile": "^1.0.1",
5247
"multihashes": "^0.2.2",
53-
"xtend": "^4.0.1"
48+
"pull-stream": "^3.4.3",
49+
"run-series": "^1.1.4",
50+
"safe-buffer": "^5.0.1"
5451
},
5552
"license": "MIT",
5653
"contributors": [

src/index.js

Lines changed: 41 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,53 +1,55 @@
11
'use strict'
22

3+
const assert = require('assert')
4+
35
const stores = require('./stores')
46

5-
function Repo (repoPath, options) {
6-
if (!(this instanceof Repo)) {
7-
return new Repo(repoPath, options)
7+
module.exports = class Repo {
8+
constructor (repoPath, options) {
9+
assert.equal(typeof repoPath, 'string', 'missing repoPath')
10+
assert(options, 'missing options')
11+
assert(options.stores, 'missing options.stores')
12+
13+
this.path = repoPath
14+
15+
const blobStores = initializeBlobStores(options.stores)
16+
17+
const setup = (name, needsLocks, needsConfig) => {
18+
const args = [repoPath, blobStores[name]]
19+
if (needsLocks) {
20+
args.push(this.locks)
21+
}
22+
23+
if (needsConfig) {
24+
args.push(this.config)
25+
}
26+
27+
return stores[name].setUp.apply(stores[name], args)
28+
}
29+
30+
this.locks = setup('locks')
31+
this.version = setup('version', true)
32+
this.config = setup('config', true)
33+
this.keys = setup('keys', true, true)
34+
this.blockstore = setup('blockstore', true)
35+
}
36+
37+
exists (callback) {
38+
this.version.exists(callback)
839
}
9-
if (!options) { throw new Error('missing options param') }
10-
if (!options.stores) { throw new Error('missing options.stores param') }
11-
12-
// If options.stores is an abstract-blob-store instead of a map, use it for
13-
// all stores.
14-
if (options.stores.prototype && options.stores.prototype.createWriteStream) {
15-
const store = options.stores
16-
options.stores = {
40+
}
41+
42+
function initializeBlobStores (store) {
43+
if (store.constructor) {
44+
return {
1745
keys: store,
1846
config: store,
19-
datastore: store,
47+
blockstore: store,
2048
logs: store,
2149
locks: store,
2250
version: store
2351
}
2452
}
2553

26-
this.path = repoPath
27-
28-
this.locks = stores
29-
.locks
30-
.setUp(repoPath, options.stores.locks)
31-
32-
this.exists = (callback) => {
33-
this.version.exists(callback)
34-
}
35-
36-
this.version = stores
37-
.version
38-
.setUp(repoPath, options.stores.version, this.locks)
39-
40-
this.config = stores
41-
.config
42-
.setUp(repoPath, options.stores.config, this.locks)
43-
44-
this.keys = stores
45-
.keys
46-
.setUp(repoPath, options.stores.keys, this.locks, this.config)
47-
48-
this.datastore = stores
49-
.datastore
50-
.setUp(repoPath, options.stores.datastore, this.locks)
54+
return store
5155
}
52-
53-
exports = module.exports = Repo

src/stores/blockstore.js

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
'use strict'
2+
3+
const Block = require('ipfs-block')
4+
const pull = require('pull-stream')
5+
const Lock = require('lock')
6+
7+
const PREFIX_LENGTH = 8
8+
9+
exports = module.exports
10+
11+
function multihashToPath (multihash, extension) {
12+
extension = extension || 'data'
13+
const filename = `${multihash.toString('hex')}.${extension}`
14+
const folder = filename.slice(0, PREFIX_LENGTH)
15+
const path = folder + '/' + filename
16+
17+
return path
18+
}
19+
20+
exports.setUp = (basePath, BlobStore, locks) => {
21+
const store = new BlobStore(basePath + '/blocks')
22+
const lock = new Lock()
23+
24+
return {
25+
get (key, extension, cb) {
26+
if (typeof extension === 'function') {
27+
cb = extension
28+
extension = 'data'
29+
}
30+
31+
if (!key) {
32+
return cb(new Error('Invalid key'))
33+
}
34+
const p = multihashToPath(key, extension)
35+
36+
pull(
37+
store.read(p),
38+
pull.collect((err, values) => {
39+
if (err) {
40+
return cb(err)
41+
}
42+
43+
if (extension === 'data') {
44+
extension = 'protobuf'
45+
}
46+
47+
const data = Buffer.concat(values)
48+
49+
cb(null, new Block(data, extension))
50+
})
51+
)
52+
},
53+
54+
put (block, cb) {
55+
if (!block || !block.data) {
56+
return cb(new Error('Invalid block'))
57+
}
58+
59+
const key = multihashToPath(block.key, block.extension)
60+
61+
lock(key, (release) => pull(
62+
pull.values([block.data]),
63+
store.write(key, release((err) => {
64+
if (err) {
65+
return cb(err)
66+
}
67+
cb(null, {key})
68+
}))
69+
))
70+
},
71+
72+
has (key, extension, cb) {
73+
if (typeof extension === 'function') {
74+
cb = extension
75+
extension = undefined
76+
}
77+
78+
if (!key) {
79+
return cb(new Error('Invalid key'))
80+
}
81+
82+
const path = multihashToPath(key, extension)
83+
store.exists(path, cb)
84+
},
85+
86+
delete (key, extension, cb) {
87+
if (typeof extension === 'function') {
88+
cb = extension
89+
extension = undefined
90+
}
91+
92+
if (!key) {
93+
return cb(new Error('Invalid key'))
94+
}
95+
96+
const path = multihashToPath(key, extension)
97+
store.remove(path, cb)
98+
}
99+
}
100+
}

src/stores/config.js

Lines changed: 29 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,51 @@
11
'use strict'
22

3-
const bl = require('bl')
3+
const pull = require('pull-stream')
4+
const series = require('run-series')
45

56
exports = module.exports
67

7-
exports.setUp = (basePath, blobStore, locks) => {
8-
const store = blobStore(basePath)
8+
exports.setUp = (basePath, BlobStore, locks) => {
9+
const store = new BlobStore(basePath)
10+
const configFile = 'config'
11+
912
return {
10-
get: (callback) => {
11-
store
12-
.createReadStream('config')
13-
.pipe(bl((err, config) => {
13+
get (callback) {
14+
pull(
15+
store.read(configFile),
16+
pull.collect((err, values) => {
1417
if (err) {
1518
return callback(err)
1619
}
20+
21+
const config = Buffer.concat(values)
1722
let result
1823
try {
1924
result = JSON.parse(config.toString())
2025
} catch (err) {
2126
return callback(err)
2227
}
28+
2329
callback(null, result)
24-
}))
30+
})
31+
)
2532
},
2633

27-
set: (config, callback) => {
28-
locks.lock((err) => {
29-
if (err) {
30-
return callback(err)
34+
set (config, callback) {
35+
series([
36+
(cb) => locks.lock(cb),
37+
(cb) => {
38+
pull(
39+
pull.values([
40+
new Buffer(JSON.stringify(config, null, 2))
41+
]),
42+
store.write(configFile, cb)
43+
)
3144
}
32-
33-
store.createWriteStream('config')
34-
.once('finish', () => {
35-
locks.unlock(callback)
36-
})
37-
.end(JSON.stringify(config, null, 2))
45+
], (err) => {
46+
locks.unlock((err2) => {
47+
callback(err || err2)
48+
})
3849
})
3950
}
4051
}

src/stores/datastore-legacy.js

Lines changed: 0 additions & 26 deletions
This file was deleted.

0 commit comments

Comments
 (0)