@@ -5,16 +5,16 @@ const pull = require('pull-stream')
5
5
const Lock = require ( 'lock' )
6
6
const base32 = require ( 'base32.js' )
7
7
const path = require ( 'path' )
8
- const write = require ( 'pull-write' )
8
+ const pullWrite = require ( 'pull-write' )
9
9
const parallel = require ( 'run-parallel' )
10
- const defer = require ( 'pull-defer/source' )
10
+ const pullDefer = require ( 'pull-defer/source' )
11
11
12
12
const PREFIX_LENGTH = 5
13
13
14
14
exports = module . exports
15
15
16
- function multihashToPath ( multihash , extension ) {
17
- extension = extension || 'data'
16
+ function multihashToPath ( multihash ) {
17
+ const extension = 'data'
18
18
const encoder = new base32 . Encoder ( )
19
19
const hash = encoder . write ( multihash ) . finalize ( )
20
20
const filename = `${ hash } .${ extension } `
@@ -27,82 +27,105 @@ exports.setUp = (basePath, BlobStore, locks) => {
27
27
const store = new BlobStore ( basePath + '/blocks' )
28
28
const lock = new Lock ( )
29
29
30
- function writeBlock ( block , cb ) {
30
+ function writeBlock ( block , callback ) {
31
31
if ( ! block || ! block . data ) {
32
- return cb ( new Error ( 'Invalid block' ) )
32
+ return callback ( new Error ( 'Invalid block' ) )
33
33
}
34
34
35
- const key = multihashToPath ( block . key , block . extension )
36
-
37
- lock ( key , ( release ) => pull (
38
- pull . values ( [ block . data ] ) ,
39
- store . write ( key , release ( ( err ) => {
40
- if ( err ) {
41
- return cb ( err )
42
- }
43
- cb ( null , { key} )
44
- } ) )
45
- ) )
35
+ const key = multihashToPath ( block . key ( ) )
36
+
37
+ lock ( key , ( release ) => {
38
+ pull (
39
+ pull . values ( [
40
+ block . data
41
+ ] ) ,
42
+ store . write ( key , release ( released ) )
43
+ )
44
+ } )
45
+
46
+ // called once the lock is released
47
+ function released ( err ) {
48
+ if ( err ) {
49
+ return callback ( err )
50
+ }
51
+ callback ( null , { key : key } )
52
+ }
46
53
}
47
54
48
55
return {
49
- getStream ( key , extension ) {
56
+ // returns a pull-stream of one block being read
57
+ getStream ( key ) {
50
58
if ( ! key ) {
51
59
return pull . error ( new Error ( 'Invalid key' ) )
52
60
}
53
61
54
- const p = multihashToPath ( key , extension )
55
- const deferred = defer ( )
62
+ const blockPath = multihashToPath ( key )
63
+ const deferred = pullDefer ( )
56
64
57
- lock ( p , ( release ) => {
58
- const ext = extension === 'data' ? 'protobuf' : extension
65
+ lock ( blockPath , ( release ) => {
59
66
pull (
60
- store . read ( p ) ,
61
- pull . collect ( release ( ( err , data ) => {
62
- if ( err ) {
63
- return deferred . abort ( err )
64
- }
65
-
66
- deferred . resolve ( pull . values ( [
67
- new Block ( Buffer . concat ( data ) , ext )
68
- ] ) )
69
- } ) )
67
+ store . read ( blockPath ) ,
68
+ pull . collect ( release ( released ) )
70
69
)
71
70
} )
72
71
72
+ function released ( err , data ) {
73
+ if ( err ) {
74
+ return deferred . abort ( err )
75
+ }
76
+
77
+ deferred . resolve (
78
+ pull . values ( [
79
+ new Block ( Buffer . concat ( data ) )
80
+ ] )
81
+ )
82
+ }
83
+
73
84
return deferred
74
85
} ,
75
86
87
+ // returns a pull-stream to write blocks into
88
+ // TODO use a more explicit name, given that getStream is just for
89
+ // one block, multiple blocks should have different naming
76
90
putStream ( ) {
77
91
let ended = false
78
92
let written = [ ]
79
93
let push = null
80
94
81
- const sink = write ( ( blocks , cb ) => {
82
- parallel ( blocks . map ( ( block ) => ( cb ) => {
83
- writeBlock ( block , ( err , meta ) => {
84
- if ( err ) {
85
- return cb ( err )
86
- }
87
-
88
- if ( push ) {
89
- const read = push
90
- push = null
91
- read ( null , meta )
92
- return cb ( )
93
- }
94
-
95
- written . push ( meta )
96
- cb ( )
97
- } )
98
- } ) , cb )
95
+ const sink = pullWrite ( ( blocks , cb ) => {
96
+ const tasks = blocks . map ( ( block ) => {
97
+ return ( cb ) => {
98
+ writeBlock ( block , ( err , meta ) => {
99
+ if ( err ) {
100
+ return cb ( err )
101
+ }
102
+
103
+ if ( push ) {
104
+ const read = push
105
+ push = null
106
+ read ( null , meta )
107
+ return cb ( )
108
+ }
109
+
110
+ written . push ( meta )
111
+ cb ( )
112
+ } )
113
+ }
114
+ } )
115
+
116
+ parallel ( tasks , cb )
99
117
} , null , 100 , ( err ) => {
100
118
ended = err || true
101
- if ( push ) push ( ended )
119
+ if ( push ) {
120
+ push ( ended )
121
+ }
102
122
} )
103
123
124
+ // TODO ??Why does a putStream need to be a source as well??
104
125
const source = ( end , cb ) => {
105
- if ( end ) ended = end
126
+ if ( end ) {
127
+ ended = end
128
+ }
106
129
if ( ended ) {
107
130
return cb ( ended )
108
131
}
@@ -114,35 +137,25 @@ exports.setUp = (basePath, BlobStore, locks) => {
114
137
push = cb
115
138
}
116
139
117
- return { source, sink}
140
+ return { source : source , sink : sink }
118
141
} ,
119
142
120
- has ( key , extension , cb ) {
121
- if ( typeof extension === 'function' ) {
122
- cb = extension
123
- extension = undefined
124
- }
125
-
143
+ has ( key , callback ) {
126
144
if ( ! key ) {
127
- return cb ( new Error ( 'Invalid key' ) )
145
+ return callback ( new Error ( 'Invalid key' ) )
128
146
}
129
147
130
- const p = multihashToPath ( key , extension )
131
- store . exists ( p , cb )
148
+ const blockPath = multihashToPath ( key )
149
+ store . exists ( blockPath , callback )
132
150
} ,
133
151
134
- delete ( key , extension , cb ) {
135
- if ( typeof extension === 'function' ) {
136
- cb = extension
137
- extension = undefined
138
- }
139
-
152
+ delete ( key , callback ) {
140
153
if ( ! key ) {
141
- return cb ( new Error ( 'Invalid key' ) )
154
+ return callback ( new Error ( 'Invalid key' ) )
142
155
}
143
156
144
- const p = multihashToPath ( key , extension )
145
- store . remove ( p , cb )
157
+ const blockPath = multihashToPath ( key )
158
+ store . remove ( blockPath , callback )
146
159
}
147
160
}
148
161
}
0 commit comments