Skip to content
This repository was archived by the owner on Aug 12, 2020. It is now read-only.

Commit 8568cd5

Browse files
achingbraindaviddias
authored andcommitted
feat: exporter - support slicing streams stored in deeply nested DAGs (#208)
1 parent 34d08a6 commit 8568cd5

File tree

4 files changed

+227
-190
lines changed

4 files changed

+227
-190
lines changed

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,6 @@
6666
"lodash": "^4.17.5",
6767
"multihashes": "~0.4.13",
6868
"multihashing-async": "~0.4.8",
69-
"pull-async-values": "^1.0.3",
7069
"pull-batch": "^1.0.0",
7170
"pull-block": "^1.4.0",
7271
"pull-cat": "^1.1.11",
@@ -75,6 +74,7 @@
7574
"pull-pause": "0.0.2",
7675
"pull-pushable": "^2.2.0",
7776
"pull-stream": "^3.6.2",
77+
"pull-through": "^1.0.18",
7878
"pull-traverse": "^1.0.3",
7979
"pull-write": "^1.1.4",
8080
"sparse-array": "^1.3.1"

src/exporter/file.js

Lines changed: 116 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,13 @@
11
'use strict'
22

33
const traverse = require('pull-traverse')
4-
const traverseSlice = require('./traverse-slice')
54
const UnixFS = require('ipfs-unixfs')
65
const CID = require('cids')
76
const pull = require('pull-stream')
87
const paramap = require('pull-paramap')
98

109
// Logic to export a single (possibly chunked) unixfs file.
1110
module.exports = (node, name, path, pathRest, resolve, size, dag, parent, depth, begin, end) => {
12-
function getData (node) {
13-
try {
14-
const file = UnixFS.unmarshal(node.data)
15-
return file.data || Buffer.alloc(0)
16-
} catch (err) {
17-
throw new Error('Failed to unmarshal node')
18-
}
19-
}
20-
21-
function visitor (node) {
22-
return pull(
23-
pull.values(node.links),
24-
paramap((link, cb) => dag.get(new CID(link.multihash), cb)),
25-
pull.map((result) => result.value)
26-
)
27-
}
28-
2911
const accepts = pathRest[0]
3012

3113
if (accepts !== undefined && accepts !== path) {
@@ -34,17 +16,7 @@ module.exports = (node, name, path, pathRest, resolve, size, dag, parent, depth,
3416

3517
const file = UnixFS.unmarshal(node.data)
3618
const fileSize = size || file.fileSize()
37-
38-
let content
39-
40-
if (!isNaN(begin)) {
41-
content = traverseSlice(node, dag, begin, end)
42-
} else {
43-
content = pull(
44-
traverse.depthFirst(node, visitor),
45-
pull.map(getData)
46-
)
47-
}
19+
const content = streamBytes(dag, node, fileSize, findByteRange(fileSize, begin, end))
4820

4921
return pull.values([{
5022
depth: depth,
@@ -56,3 +28,118 @@ module.exports = (node, name, path, pathRest, resolve, size, dag, parent, depth,
5628
type: 'file'
5729
}])
5830
}
31+
32+
function findByteRange (fileSize, begin, end) {
33+
if (!begin) {
34+
begin = 0
35+
}
36+
37+
if (!end || end > fileSize) {
38+
end = fileSize
39+
}
40+
41+
if (begin < 0) {
42+
begin = fileSize + begin
43+
}
44+
45+
if (end < 0) {
46+
end = fileSize + end
47+
}
48+
49+
return {
50+
begin, end
51+
}
52+
}
53+
54+
function streamBytes (dag, node, fileSize, { begin, end }) {
55+
if (begin === end) {
56+
return pull.empty()
57+
}
58+
59+
let streamPosition = 0
60+
61+
function getData ({ node, start }) {
62+
if (!node || !node.data) {
63+
return
64+
}
65+
66+
try {
67+
const file = UnixFS.unmarshal(node.data)
68+
69+
if (!file.data) {
70+
return
71+
}
72+
73+
const block = extractDataFromBlock(file.data, start, begin, end)
74+
75+
return block
76+
} catch (err) {
77+
throw new Error('Failed to unmarshal node')
78+
}
79+
}
80+
81+
function visitor ({ node }) {
82+
const file = UnixFS.unmarshal(node.data)
83+
84+
// work out which child nodes contain the requested data
85+
const filteredLinks = node.links
86+
.map((link, index) => {
87+
const child = {
88+
link: link,
89+
start: streamPosition,
90+
end: streamPosition + file.blockSizes[index]
91+
}
92+
93+
streamPosition = child.end
94+
95+
return child
96+
})
97+
.filter((child, index) => {
98+
return (begin >= child.start && begin < child.end) || // child has begin byte
99+
(end > child.start && end <= child.end) || // child has end byte
100+
(begin < child.start && end > child.end) // child is between begin and end bytes
101+
})
102+
103+
if (filteredLinks.length) {
104+
// move stream position to the first node we're going to return data from
105+
streamPosition = filteredLinks[0].start
106+
}
107+
108+
return pull(
109+
pull.values(filteredLinks),
110+
paramap((child, cb) => {
111+
dag.get(new CID(child.link.multihash), (error, result) => cb(error, {
112+
start: child.start,
113+
end: child.end,
114+
node: result && result.value
115+
}))
116+
})
117+
)
118+
}
119+
120+
return pull(
121+
traverse.depthFirst({
122+
node,
123+
start: 0,
124+
end: fileSize
125+
}, visitor),
126+
pull.map(getData),
127+
pull.filter(Boolean)
128+
)
129+
}
130+
131+
function extractDataFromBlock (block, streamPosition, begin, end) {
132+
const blockLength = block.length
133+
134+
if (end - streamPosition < blockLength) {
135+
// If the end byte is in the current block, truncate the block to the end byte
136+
block = block.slice(0, end - streamPosition)
137+
}
138+
139+
if (begin > streamPosition && begin < (streamPosition + blockLength)) {
140+
// If the start byte is in the current block, skip to the start byte
141+
block = block.slice(begin - streamPosition)
142+
}
143+
144+
return block
145+
}

src/exporter/traverse-slice.js

Lines changed: 0 additions & 104 deletions
This file was deleted.

0 commit comments

Comments
 (0)