Skip to content
This repository was archived by the owner on Aug 12, 2020. It is now read-only.

Commit f8fe961

Browse files
committed
Merge pull request #43 from noffle/stream-to-content
Rename "stream" to "content" in tuples.
2 parents 194e19a + 3f07067 commit f8fe961

File tree

6 files changed

+128
-54
lines changed

6 files changed

+128
-54
lines changed

README.md

Lines changed: 41 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
IPFS unixFS Engine
22
===================
33

4-
> Import data into an IPFS DAG Service.
4+
> Import & Export data to/from an [IPFS DAG Service][]
55
66
[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://ipn.io)
77
[![](https://img.shields.io/badge/freenode-%23ipfs-blue.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23ipfs)
@@ -47,8 +47,8 @@ const res = []
4747

4848
const rs = fs.createReadStream(file)
4949
const rs2 = fs.createReadStream(file2)
50-
const input = {path: /tmp/foo/bar, stream: rs}
51-
const input2 = {path: /tmp/foo/quxx, stream: rs2}
50+
const input = {path: '/tmp/foo/bar', content: rs}
51+
const input2 = {path: '/tmp/foo/quxx', content: rs2}
5252

5353
// Listen for the data event from the importer stream
5454

@@ -74,41 +74,47 @@ When run, the stat of DAG Node is outputted for each file on data event until th
7474

7575
```
7676
{ multihash: <Buffer 12 20 bd e2 2b 57 3f 6f bd 7c cc 5a 11 7f 28 6c a2 9a 9f c0 90 e1 d4 16 d0 5f 42 81 ec 0c 2a 7f 7f 93>,
77-
Size: 39243,
77+
size: 39243,
7878
path: '/tmp/foo/bar' }
7979
8080
{ multihash: <Buffer 12 20 bd e2 2b 57 3f 6f bd 7c cc 5a 11 7f 28 6c a2 9a 9f c0 90 e1 d4 16 d0 5f 42 81 ec 0c 2a 7f 7f 93>,
81-
Size: 59843,
81+
size: 59843,
8282
path: '/tmp/foo/quxx' }
8383
8484
{ multihash: <Buffer 12 20 bd e2 2b 57 3f 6f bd 7c cc 5a 11 7f 28 6c a2 9a 9f c0 90 e1 d4 16 d0 5f 42 81 ec 0c 2a 7f 7f 93>,
85-
Size: 93242,
85+
size: 93242,
8686
path: '/tmp/foo' }
8787
8888
{ multihash: <Buffer 12 20 bd e2 2b 57 3f 6f bd 7c cc 5a 11 7f 28 6c a2 9a 9f c0 90 e1 d4 16 d0 5f 42 81 ec 0c 2a 7f 7f 93>,
89-
Size: 94234,
89+
size: 94234,
9090
path: '/tmp' }
9191
9292
```
9393

94-
## API
94+
## Importer API
9595

9696
```js
9797
const Importer = require('ipfs-unixfs-engine').importer
9898
```
9999

100100
### const add = new Importer(dag)
101101

102-
The importer is a duplex stream in object mode that writes inputs of tuples
103-
of path and readable streams of data. You can stream an array of files to the
104-
importer, just call the 'end' function to signal that you are done inputting file/s.
105-
Listen to the 'data' for the returned informtion 'multihash, size and path' for
106-
each file added. Listen to the 'end' event from the stream to know when the
107-
importer has finished importing files. Input file paths with directory structure
108-
will preserve the hierarchy in the dag node.
102+
The importer is an object Transform stream that accepts objects of the form
103+
104+
```js
105+
{
106+
path: 'a name',
107+
content: (Buffer or Readable stream)
108+
}
109+
```
110+
111+
The stream will output IPFS DAG Node stats for the nodes it has added to the DAG
112+
Service. When stats on a node are emitted they are guaranteed to have been
113+
written into the DAG Service's storage mechanism.
114+
115+
The input's file paths and directory structure will be preserved in the DAG
116+
Nodes.
109117

110-
Uses the [DAG Service](https://github.com/vijayee/js-ipfs-merkle-dag/) instance
111-
`dagService`.
112118

113119
## Example Exporter
114120

@@ -133,22 +139,35 @@ exportEvent.on('data', (result) => {
133139
}
134140
```
135141

136-
##API
142+
## Exporter: API
137143
```js
138-
const Importer = require('ipfs-unixfs-engine').exporter
144+
const Exporter = require('ipfs-unixfs-engine').exporter
139145
```
140146

141-
The exporter is a readable stream in object mode that returns an object ```{ stream: stream, path: 'path' }``` by the multihash of the file from the dag service.
147+
The exporter is a readable stream in object mode that outputs objects of the
148+
form
142149

150+
```js
151+
{
152+
path: 'a name',
153+
content: (Buffer or Readable stream)
154+
}
155+
```
156+
157+
by the multihash of the file from the DAG Service.
143158

144-
## install
159+
160+
## Install
145161

146162
With [npm](https://npmjs.org/) installed, run
147163

148164
```
149165
$ npm install ipfs-unixfs-engine
150166
```
151167

152-
## license
168+
## License
153169

154170
ISC
171+
172+
173+
[IPFS DAG Service]: https://github.com/vijayee/js-ipfs-merkle-dag/

package.json

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
"name": "ipfs-unixfs-engine",
33
"version": "0.8.0",
44
"description": "JavaScript implementation of the unixfs Engine used by IPFS",
5-
"main": "lib/index.js",
5+
"main": "src/index.js",
66
"jsnext:main": "src/index.js",
77
"scripts": {
88
"lint": "aegir-lint",
@@ -56,8 +56,10 @@
5656
"debug": "^2.2.0",
5757
"ipfs-merkle-dag": "^0.5.0",
5858
"ipfs-unixfs": "^0.1.0",
59+
"isstream": "^0.1.2",
5960
"readable-stream": "^1.1.13",
6061
"run-series": "^1.1.4",
62+
"streamifier": "^0.1.1",
6163
"through2": "^2.0.0"
6264
},
6365
"contributors": [
@@ -68,4 +70,4 @@
6870
"greenkeeperio-bot <[email protected]>",
6971
"nginnever <[email protected]>"
7072
]
71-
}
73+
}

src/exporter.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ function Exporter (hash, dagService, options) {
4242
rs.push(unmarshaledData.data)
4343
rs.push(null)
4444
}
45-
this.push({ stream: rs, path: name })
45+
this.push({ content: rs, path: name })
4646
callback()
4747
return
4848
} else {
@@ -75,7 +75,7 @@ function Exporter (hash, dagService, options) {
7575
return
7676
})
7777
}
78-
this.push({ stream: rs, path: name })
78+
this.push({ content: rs, path: name })
7979
callback()
8080
return
8181
}
@@ -97,7 +97,7 @@ function Exporter (hash, dagService, options) {
9797
rs.push(node.data)
9898
rs.push(null)
9999
}
100-
this.push({stream: null, path: name})
100+
this.push({content: null, path: name})
101101
callback()
102102
return
103103
} else {

src/importer.js

Lines changed: 41 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ const UnixFS = require('ipfs-unixfs')
1010
const util = require('util')
1111
const bs58 = require('bs58')
1212
const Duplex = require('readable-stream').Duplex
13+
const isStream = require('isstream')
14+
const streamifier = require('streamifier')
1315

1416
exports = module.exports = Importer
1517

@@ -36,7 +38,7 @@ function Importer (dagService, options) {
3638
this._write = (fl, enc, next) => {
3739
this.read()
3840
counter++
39-
if (!fl.stream) {
41+
if (!fl.content) {
4042
// 1. create the empty dir dag node
4143
// 2. write it to the dag store
4244
// 3. add to the files array {path: <>, hash: <>}
@@ -63,8 +65,20 @@ function Importer (dagService, options) {
6365
return
6466
}
6567

68+
// Convert a buffer to a readable stream
69+
if (Buffer.isBuffer(fl.content)) {
70+
const r = streamifier.createReadStream(fl.content)
71+
fl.content = r
72+
}
73+
74+
// Bail if 'content' is not readable
75+
if (!isStream.isReadable(fl.content)) {
76+
this.emit('error', new Error('"content" is not a Buffer nor Readable stream'))
77+
return
78+
}
79+
6680
const leaves = []
67-
fl.stream
81+
fl.content
6882
.pipe(fsc(CHUNK_SIZE))
6983
.pipe(through2((chunk, enc, cb) => {
7084
// 1. create the unixfs merkledag node
@@ -224,13 +238,15 @@ function Importer (dagService, options) {
224238
// If the value is not an object
225239
// add as a link to the dirNode
226240

227-
function traverse (tree, base) {
241+
let pendingWrites = 0
242+
243+
function traverse (tree, path, done) {
228244
const keys = Object.keys(tree)
229245
let tmpTree = tree
230246
keys.map((key) => {
231247
if (typeof tmpTree[key] === 'object' &&
232248
!Buffer.isBuffer(tmpTree[key])) {
233-
tmpTree[key] = traverse.call(this, tmpTree[key], base ? base + '/' + key : key)
249+
tmpTree[key] = traverse.call(this, tmpTree[key], path ? path + '/' + key : key, done)
234250
}
235251
})
236252

@@ -250,28 +266,39 @@ function Importer (dagService, options) {
250266
})
251267

252268
n.data = d.marshal()
269+
270+
pendingWrites++
253271
dagService.add(n, (err) => {
272+
pendingWrites--
254273
if (err) {
255274
this.push({error: 'failed to store dirNode'})
275+
} else if (path) {
276+
const el = {
277+
path: path,
278+
multihash: n.multihash(),
279+
yes: 'no',
280+
size: n.size()
281+
}
282+
this.push(el)
283+
}
284+
285+
if (pendingWrites <= 0) {
286+
done()
256287
}
257288
})
258289

259-
if (!base) {
290+
if (!path) {
260291
return
261292
}
262293

263-
const el = {
264-
path: base,
265-
multihash: n.multihash(),
266-
size: n.size()
267-
}
268-
this.push(el)
269-
270294
mhIndex[bs58.encode(n.multihash())] = { size: n.size() }
271295
return n.multihash()
272296
}
273-
/* const rootHash = */ traverse.call(this, fileTree)
274-
this.push(null)
297+
298+
let self = this
299+
/* const rootHash = */ traverse.call(this, fileTree, null, function () {
300+
self.push(null)
301+
})
275302
}
276303
}
277304
}

test/test-exporter.js

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ module.exports = function (repo) {
3333
expect(err).to.not.exist
3434
const testExport = exporter(hash, ds)
3535
testExport.on('data', (file) => {
36-
file.stream.pipe(bl((err, bldata) => {
36+
file.content.pipe(bl((err, bldata) => {
3737
expect(err).to.not.exist
3838
expect(bldata).to.deep.equal(unmarsh.data)
3939
done()
@@ -48,7 +48,7 @@ module.exports = function (repo) {
4848
const ds = new DAGService(bs)
4949
const testExport = exporter(hash, ds)
5050
testExport.on('data', (file) => {
51-
file.stream.pipe(bl((err, bldata) => {
51+
file.content.pipe(bl((err, bldata) => {
5252
expect(bldata).to.deep.equal(bigFile)
5353
expect(err).to.not.exist
5454
done()
@@ -63,7 +63,7 @@ module.exports = function (repo) {
6363
const testExport = exporter(hash, ds)
6464
testExport.on('data', (file) => {
6565
expect(file.path).to.equal('QmRQgufjp9vLE8XK2LGKZSsPCFCF6e4iynCQtNB5X2HBKE')
66-
file.stream.pipe(bl((err, bldata) => {
66+
file.content.pipe(bl((err, bldata) => {
6767
expect(err).to.not.exist
6868
done()
6969
}))
@@ -94,7 +94,7 @@ module.exports = function (repo) {
9494
const ds = new DAGService(bs)
9595
const testExport = exporter(hash, ds)
9696
testExport.on('data', (dir) => {
97-
expect(dir.stream).to.equal(null)
97+
expect(dir.content).to.equal(null)
9898
done()
9999
})
100100
})

0 commit comments

Comments
 (0)