diff --git a/package.json b/package.json
index 268b7db5..fcc3574c 100644
--- a/package.json
+++ b/package.json
@@ -66,7 +66,6 @@
     "lodash": "^4.17.5",
     "multihashes": "~0.4.13",
     "multihashing-async": "~0.4.8",
-    "pull-async-values": "^1.0.3",
     "pull-batch": "^1.0.0",
     "pull-block": "^1.4.0",
     "pull-cat": "^1.1.11",
@@ -75,6 +74,7 @@
     "pull-pause": "0.0.2",
     "pull-pushable": "^2.2.0",
     "pull-stream": "^3.6.2",
+    "pull-through": "^1.0.18",
     "pull-traverse": "^1.0.3",
     "pull-write": "^1.1.4",
     "sparse-array": "^1.3.1"
diff --git a/src/exporter/file.js b/src/exporter/file.js
index 02b592a1..50787fc9 100644
--- a/src/exporter/file.js
+++ b/src/exporter/file.js
@@ -1,7 +1,6 @@
 'use strict'
 
 const traverse = require('pull-traverse')
-const traverseSlice = require('./traverse-slice')
 const UnixFS = require('ipfs-unixfs')
 const CID = require('cids')
 const pull = require('pull-stream')
@@ -9,23 +8,6 @@ const paramap = require('pull-paramap')
 
 // Logic to export a single (possibly chunked) unixfs file.
 module.exports = (node, name, path, pathRest, resolve, size, dag, parent, depth, begin, end) => {
-  function getData (node) {
-    try {
-      const file = UnixFS.unmarshal(node.data)
-      return file.data || Buffer.alloc(0)
-    } catch (err) {
-      throw new Error('Failed to unmarshal node')
-    }
-  }
-
-  function visitor (node) {
-    return pull(
-      pull.values(node.links),
-      paramap((link, cb) => dag.get(new CID(link.multihash), cb)),
-      pull.map((result) => result.value)
-    )
-  }
-
   const accepts = pathRest[0]
 
   if (accepts !== undefined && accepts !== path) {
@@ -34,17 +16,7 @@ module.exports = (node, name, path, pathRest, resolve, size, dag, parent, depth,
 
   const file = UnixFS.unmarshal(node.data)
   const fileSize = size || file.fileSize()
-
-  let content
-
-  if (!isNaN(begin)) {
-    content = traverseSlice(node, dag, begin, end)
-  } else {
-    content = pull(
-      traverse.depthFirst(node, visitor),
-      pull.map(getData)
-    )
-  }
+  const content = streamBytes(dag, node, fileSize, findByteRange(fileSize, begin, end))
 
   return pull.values([{
     depth: depth,
@@ -56,3 +28,118 @@ module.exports = (node, name, path, pathRest, resolve, size, dag, parent, depth,
     type: 'file'
   }])
 }
+
+function findByteRange (fileSize, begin, end) {
+  if (!begin) {
+    begin = 0
+  }
+
+  if (!end || end > fileSize) {
+    end = fileSize
+  }
+
+  if (begin < 0) {
+    begin = fileSize + begin
+  }
+
+  if (end < 0) {
+    end = fileSize + end
+  }
+
+  return {
+    begin, end
+  }
+}
+
+function streamBytes (dag, node, fileSize, { begin, end }) {
+  if (begin === end) {
+    return pull.empty()
+  }
+
+  let streamPosition = 0
+
+  function getData ({ node, start }) {
+    if (!node || !node.data) {
+      return
+    }
+
+    try {
+      const file = UnixFS.unmarshal(node.data)
+
+      if (!file.data) {
+        return
+      }
+
+      const block = extractDataFromBlock(file.data, start, begin, end)
+
+      return block
+    } catch (err) {
+      throw new Error('Failed to unmarshal node')
+    }
+  }
+
+  function visitor ({ node }) {
+    const file = UnixFS.unmarshal(node.data)
+
+    // work out which child nodes contain the requested data
+    const filteredLinks = node.links
+      .map((link, index) => {
+        const child = {
+          link: link,
+          start: streamPosition,
+          end: streamPosition + file.blockSizes[index]
+        }
+
+        streamPosition = child.end
+
+        return child
+      })
+      .filter((child, index) => {
+        return (begin >= child.start && begin < child.end) || // child has begin byte
+          (end > child.start && end <= child.end) || // child has end byte
+          (begin < child.start && end > child.end) // child is between begin and end bytes
+      })
+
+    if (filteredLinks.length) {
+      // move stream position to the first node we're going to return data from
+      streamPosition = filteredLinks[0].start
+    }
+
+    return pull(
+      pull.values(filteredLinks),
+      paramap((child, cb) => {
+        dag.get(new CID(child.link.multihash), (error, result) => cb(error, {
+          start: child.start,
+          end: child.end,
+          node: result && result.value
+        }))
+      })
+    )
+  }
+
+  return pull(
+    traverse.depthFirst({
+      node,
+      start: 0,
+      end: fileSize
+    }, visitor),
+    pull.map(getData),
+    pull.filter(Boolean)
+  )
+}
+
+function extractDataFromBlock (block, streamPosition, begin, end) {
+  const blockLength = block.length
+
+  if (end - streamPosition < blockLength) {
+    // If the end byte is in the current block, truncate the block to the end byte
+    block = block.slice(0, end - streamPosition)
+  }
+
+  if (begin > streamPosition && begin < (streamPosition + blockLength)) {
+    // If the start byte is in the current block, skip to the start byte
+    block = block.slice(begin - streamPosition)
+  }
+
+  return block
+}
diff --git a/src/exporter/traverse-slice.js b/src/exporter/traverse-slice.js
deleted file mode 100644
index 598ca091..00000000
--- a/src/exporter/traverse-slice.js
+++ /dev/null
@@ -1,104 +0,0 @@
-'use strict'
-
-const CID = require('cids')
-const pull = require('pull-stream')
-const asyncValues = require('pull-async-values')
-const asyncMap = require('pull-stream/throughs/async-map')
-const map = require('pull-stream/throughs/map')
-const UnixFS = require('ipfs-unixfs')
-const waterfall = require('async/waterfall')
-
-module.exports = (fileNode, dag, begin = 0, end) => {
-  let streamPosition = 0
-
-  return pull(
-    asyncValues((cb) => {
-      const meta = UnixFS.unmarshal(fileNode.data)
-
-      if (meta.type !== 'file') {
-        return cb(new Error(`Node ${fileNode} was not a file (was ${meta.type}), can only read files`))
-      }
-
-      const fileSize = meta.fileSize()
-
-      if (!end || end > fileSize) {
-        end = fileSize
-      }
-
-      if (begin < 0) {
-        begin = fileSize + begin
-      }
-
-      if (end < 0) {
-        end = fileSize + end
-      }
-
-      const links = fileNode.links
-
-      if (!links || !links.length) {
-        if (meta.data && meta.data.length) {
-          // file was small enough to fit in one DAGNode so has no links
-          return cb(null, [(done) => done(null, meta.data)])
-        }
-
-        return cb(new Error(`Path ${fileNode} had no links or data`))
-      }
-
-      const linkedDataSize = links.reduce((acc, curr) => acc + curr.size, 0)
-      const overhead = (linkedDataSize - meta.fileSize()) / links.length
-
-      // create an array of functions to fetch link data
-      cb(null, links.map((link) => (done) => {
-        // DAGNode Links report unixfs object data sizes $overhead bytes (typically 14)
-        // larger than they actually are due to the protobuf wrapper
-        const bytesInLinkedObjectData = link.size - overhead
-
-        if (begin > (streamPosition + bytesInLinkedObjectData)) {
-          // Start byte is after this block so skip it
-          streamPosition += bytesInLinkedObjectData
-
-          return done()
-        }
-
-        if (end < streamPosition) {
-          // End byte was before this block so skip it
-          streamPosition += bytesInLinkedObjectData
-
-          return done()
-        }
-
-        // transform the multihash to a cid, the cid to a node and the node to some data
-        waterfall([
-          (next) => dag.get(new CID(link.multihash), next),
-          (node, next) => next(null, node.value.data),
-          (data, next) => next(null, UnixFS.unmarshal(data).data)
-        ], done)
-      }))
-    }),
-    asyncMap((loadLinkData, cb) => loadLinkData(cb)),
-    pull.filter(Boolean),
-    map((data) => {
-      const block = extractDataFromBlock(data, streamPosition, begin, end)
-
-      streamPosition += data.length
-
-      return block
-    })
-  )
-}
-
-function extractDataFromBlock (block, streamPosition, begin, end) {
-  const blockLength = block.length
-
-  if (end - streamPosition < blockLength) {
-    // If the end byte is in the current block, truncate the block to the end byte
-    block = block.slice(0, end - streamPosition)
-  }
-
-  if (begin > streamPosition && begin < (streamPosition + blockLength)) {
-    // If the start byte is in the current block, skip to the start byte
-    block = block.slice(begin - streamPosition)
-  }
-
-  return block
-}
diff --git a/test/exporter.js b/test/exporter.js
index 83fe9f52..02ac4e2f 100644
--- a/test/exporter.js
+++ b/test/exporter.js
@@ -12,6 +12,8 @@ const pull = require('pull-stream')
 const zip = require('pull-zip')
 const CID = require('cids')
 const loadFixture = require('aegir/fixtures')
+const doUntil = require('async/doUntil')
+const waterfall = require('async/waterfall')
 
 const unixFSEngine = require('./../src')
 const exporter = unixFSEngine.exporter
@@ -23,35 +25,45 @@ module.exports = (repo) => {
   describe('exporter', () => {
     let ipld
 
-    function addAndReadTestFile ({file, begin, end, strategy = 'balanced', path = '/foo'}, cb) {
+    function addTestFile ({file, strategy = 'balanced', path = '/foo', maxChunkSize}, cb) {
       pull(
         pull.values([{
           path,
          content: file
        }]),
        importer(ipld, {
-          strategy
+          strategy,
+          chunkerOptions: {
+            maxChunkSize
+          }
        }),
         pull.collect((error, nodes) => {
-          expect(error).to.not.exist()
-          expect(nodes.length).to.be.eql(1)
-
-          pull(
-            exporter(nodes[0].multihash, ipld, {
-              begin, end
-            }),
-            pull.collect((error, files) => {
-              if (error) {
-                return cb(error)
-              }
-
-              readFile(files[0], cb)
-            })
-          )
+          cb(error, nodes && nodes[0] && nodes[0].multihash)
         })
       )
     }
 
+    function addAndReadTestFile ({file, begin, end, strategy = 'balanced', path = '/foo', maxChunkSize}, cb) {
+      addTestFile({file, strategy, path, maxChunkSize}, (error, multihash) => {
+        if (error) {
+          return cb(error)
+        }
+
+        pull(
+          exporter(multihash, ipld, {
+            begin, end
+          }),
+          pull.collect((error, files) => {
+            if (error) {
+              return cb(error)
+            }
+
+            readFile(files[0], cb)
+          })
+        )
+      })
+    }
+
     function checkBytesThatSpanBlocks (strategy, cb) {
       const bytesInABlock = 262144
       const bytes = Buffer.alloc(bytesInABlock + 100, 0)
@@ -106,7 +118,7 @@
       })
     })
 
-    it('export a file with no links', (done) => {
+    it('exports a file with no links', (done) => {
       const hash = 'QmQmZQxSKQppbsWfVzBvg59Cn3DKtsNVQ94bjAxg2h3Lb8'
 
       pull(
@@ -127,7 +139,7 @@
       )
     })
 
-    it('export a chunk of a file with no links', (done) => {
+    it('exports a chunk of a file with no links', (done) => {
       const hash = 'QmQmZQxSKQppbsWfVzBvg59Cn3DKtsNVQ94bjAxg2h3Lb8'
       const begin = 0
       const end = 5
@@ -154,9 +166,10 @@
       )
     })
 
-    it('export a small file with links', function (done) {
+    it('exports a small file with links', function (done) {
       this.timeout(30 * 1000)
       const hash = 'QmW7BDxEbGqxxSYVtn3peNPQgdDXbWkoQ6J1EFYAEuQV3Q'
+
       pull(
         exporter(hash, ipld),
         pull.collect((err, files) => {
@@ -186,7 +199,7 @@
       )
     })
 
-    it('export a small file with links using CID instead of multihash', function (done) {
+    it('exports a small file with links using CID instead of multihash', function (done) {
      this.timeout(30 * 1000)
       const cid = new CID('QmW7BDxEbGqxxSYVtn3peNPQgdDXbWkoQ6J1EFYAEuQV3Q')
 
@@ -219,7 +232,7 @@
       )
     })
 
-    it('export a large file > 5mb', function (done) {
+    it('exports a large file > 5mb', function (done) {
       this.timeout(30 * 1000)
       const hash = 'QmRQgufjp9vLE8XK2LGKZSsPCFCF6e4iynCQtNB5X2HBKE'
       pull(
@@ -274,7 +287,7 @@
       )
     })
 
-    it('export a directory', function (done) {
+    it('exports a directory', function (done) {
       this.timeout(30 * 1000)
       const hash = 'QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN'
 
@@ -315,7 +328,7 @@
       )
     })
 
-    it('export a directory one deep', function (done) {
+    it('exports a directory one deep', function (done) {
       this.timeout(30 * 1000)
       const hash = 'QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN'
 
@@ -370,10 +383,7 @@
         file: Buffer.from([0, 1, 2, 3]),
         begin: 1
       }, (error, data) => {
-        if (error) {
-          return done(error)
-        }
-
+        expect(error).to.not.exist()
         expect(data).to.deep.equal(Buffer.from([1, 2, 3]))
 
         done()
@@ -385,10 +395,7 @@
         file: Buffer.from([0, 1, 2, 3]),
         begin: -1
       }, (error, data) => {
-        if (error) {
-          return done(error)
-        }
-
+        expect(error).to.not.exist()
         expect(data).to.deep.equal(Buffer.from([3]))
 
         done()
@@ -401,10 +408,7 @@
         file: Buffer.from([0, 1, 2, 3]),
         begin: 0, end: 1
       }, (error, data) => {
-        if (error) {
-          return done(error)
-        }
-
+        expect(error).to.not.exist()
         expect(data).to.deep.equal(Buffer.from([0]))
 
         done()
@@ -417,10 +421,7 @@
         file: Buffer.from([0, 1, 2, 3]),
         begin: 2, end: -1
      }, (error, data) => {
-        if (error) {
-          return done(error)
-        }
-
+        expect(error).to.not.exist()
         expect(data).to.deep.equal(Buffer.from([2, 3]))
 
         done()
@@ -433,10 +434,7 @@
         file: Buffer.from([0, 1, 2, 3]),
         begin: 1, end: 4
       }, (error, data) => {
-        if (error) {
-          return done(error)
-        }
-
+        expect(error).to.not.exist()
         expect(data).to.deep.equal(Buffer.from([1, 2, 3]))
 
         done()
@@ -449,10 +447,7 @@
         file: Buffer.from([0, 1, 2, 3]),
         begin: -1, end: -1
      }, (error, data) => {
-        if (error) {
-          return done(error)
-        }
-
+        expect(error).to.not.exist()
         expect(data).to.deep.equal(Buffer.from([]))
 
         done()
@@ -465,10 +460,7 @@
         file: Buffer.from([0, 1, 2, 3]),
         begin: -2, end: -1
      }, (error, data) => {
-        if (error) {
-          return done(error)
-        }
-
+        expect(error).to.not.exist()
         expect(data).to.deep.equal(Buffer.from([3]))
 
         done()
@@ -481,16 +473,77 @@
         file: Buffer.from([0, 1, 2, 3]),
         begin: 0, end: 100
       }, (error, data) => {
-        if (error) {
-          return done(error)
-        }
-
+        expect(error).to.not.exist()
         expect(data).to.deep.equal(Buffer.from([0, 1, 2, 3]))
 
         done()
       })
     })
 
+    it('reads files that are split across lots of nodes', function (done) {
+      this.timeout(30 * 1000)
+
+      addAndReadTestFile({
+        file: bigFile,
+        begin: 0,
+        end: bigFile.length,
+        maxChunkSize: 1024
+      }, (error, data) => {
+        expect(error).to.not.exist()
+        expect(data).to.deep.equal(bigFile)
+
+        done()
+      })
+    })
+
+    it('reads files in multiple steps that are split across lots of nodes in really small chunks', function (done) {
+      this.timeout(600 * 1000)
+
+      let results = []
+      let chunkSize = 1024
+      let begin = 0
+
+      addTestFile({
+        file: bigFile,
+        maxChunkSize: 1024
+      }, (error, multihash) => {
+        expect(error).to.not.exist()
+
+        doUntil(
+          (cb) => {
+            waterfall([
+              (next) => {
+                pull(
+                  exporter(multihash, ipld, {
+                    begin,
+                    end: begin + chunkSize
+                  }),
+                  pull.collect(next)
+                )
+              },
+              (files, next) => readFile(files[0], next)
+            ], cb)
+          },
+          (result) => {
+            results.push(result)
+
+            begin += result.length
+
+            return begin >= bigFile.length
+          },
+          (error) => {
+            expect(error).to.not.exist()
+
+            const buffer = Buffer.concat(results)
+
+            expect(buffer).to.deep.equal(bigFile)
+
+            done()
+          }
+        )
+      })
+    })
+
     it('reads bytes with an offset and a length that span blocks using balanced layout', (done) => {
       checkBytesThatSpanBlocks('balanced', done)
     })
@@ -536,6 +589,7 @@ function fileEql (actual, expected, done) {
     } catch (err) {
       return done(err)
     }
+
     done()
   })
 }
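
For context, a minimal usage sketch of the byte-range export this patch implements — not part of the patch itself. It assumes the module is consumed as `ipfs-unixfs-engine` and that an `ipld` instance and a file `multihash` are already available, exactly as in test/exporter.js; as the tests above demonstrate, `begin` is inclusive, `end` is exclusive, and either value may be negative to count back from the end of the file.

const pull = require('pull-stream')
const exporter = require('ipfs-unixfs-engine').exporter

// Ask the exporter for bytes [0, 10) of a previously imported file
// (`multihash` and `ipld` are assumed to come from the importer/IPLD setup)
pull(
  exporter(multihash, ipld, { begin: 0, end: 10 }),
  pull.collect((error, files) => {
    if (error) {
      throw error
    }

    // each exported entry exposes its data as a pull-stream of Buffers on `content`
    pull(
      files[0].content,
      pull.collect((error, chunks) => {
        if (error) {
          throw error
        }

        console.log(Buffer.concat(chunks)) // only the requested byte range is emitted
      })
    )
  })
)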