diff --git a/README.md b/README.md
index 674f3e4..da32517 100644
--- a/README.md
+++ b/README.md
@@ -24,6 +24,7 @@
 - [Example](#example)
 - [API](#api)
   - [const import = importer(source, ipld [, options])](#const-import--importersource-ipld--options)
+- [Overriding internals](#overriding-internals)
 - [Contribute](#contribute)
 - [License](#license)
@@ -145,8 +146,32 @@ The input's file paths and directory structure will be preserved in the [`dag-pb
 - `blockWriteConcurrency` (positive integer, defaults to 10) How many blocks to hash and write to the block store concurrently. For small numbers of large files this should be high (e.g. 50).
 - `fileImportConcurrency` (number, defaults to 50) How many files to import concurrently. For large numbers of small files this should be high (e.g. 50).
+
+## Overriding internals
+
+Several aspects of the importer are overridable by specifying functions as part of the options object with these keys:
+
+- `chunkValidator` (function): Optional function that supports the signature `async function * (source, options)`
+  - This function takes input from the `content` field of imported entries and should yield `Buffer` objects constructed from that input, throwing an `Error` if it cannot
+- `chunker` (function): Optional function that supports the signature `async function * (source, options)` where `source` is an async generator and `options` is an options object
+  - It is passed the output of the `chunkValidator` and should yield `Buffer` objects.
+- `bufferImporter` (function): Optional function that supports the signature `async function * (entry, source, ipld, options)`
+  - This function should read `Buffer`s from `source` and persist them using `ipld.put` or similar
+  - `entry` is the `{ path, content }` entry, `source` is an async generator that yields `Buffer`s
+  - It should yield functions that return a Promise that resolves to an object with the properties `{ cid, unixfs, size }` where `cid` is a [CID], `unixfs` is a [UnixFS] entry and `size` is a `Number` that represents the serialized size of the [IPLD] node that holds the buffer data.
+  - Values will be pulled from this generator in parallel - the amount of parallelisation is controlled by the `blockWriteConcurrency` option (default: 10)
+- `dagBuilder` (function): Optional function that supports the signature `async function * (source, ipld, options)`
+  - This function should read `{ path, content }` entries from `source` and turn them into DAGs
+  - It should yield a `function` that returns a `Promise` that resolves to `{ cid, path, unixfs, size }` where `cid` is a [CID], `path` is a string, `unixfs` is a [UnixFS] entry and `size` is a `Number`.
+  - Values will be pulled from this generator in parallel - the amount of parallelisation is controlled by the `fileImportConcurrency` option (default: 50)
+- `treeBuilder` (function): Optional function that supports the signature `async function * (source, ipld, options)`
+  - This function should read `{ cid, path, unixfs, size }` entries from `source` and place them in a directory structure
+  - It should yield an object with the properties `{ cid, path, unixfs, size }` where `cid` is a [CID], `path` is a string, `unixfs` is a [UnixFS] entry and `size` is a `Number`.
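+
+For example, you can replace the default chunking strategy by passing a custom `chunker`. The sketch below is illustrative rather than a drop-in implementation - the `fixedSizeChunker` name and the 1 KiB chunk size are arbitrary, and `files` and `ipld` are assumed to be set up as in the [Example](#example) above:
+
+```js
+// Reads Buffers from the (already validated) source and re-chunks
+// them into Buffers of at most 1024 bytes
+async function * fixedSizeChunker (source, options) {
+  const chunkSize = 1024
+
+  for await (const buffer of source) {
+    for (let offset = 0; offset < buffer.length; offset += chunkSize) {
+      yield buffer.slice(offset, offset + chunkSize)
+    }
+  }
+}
+
+for await (const entry of importer(files, ipld, { chunker: fixedSizeChunker })) {
+  console.info(entry)
+}
+```
+
+Because the hooks form a pipeline - `chunkValidator` feeds `chunker`, whose output is consumed by `bufferImporter` - a custom `chunker` only needs to handle `Buffer`s; input normalisation has already happened by the time it runs.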
+
 [ipld-resolver instance]: https://github.com/ipld/js-ipld-resolver
 [UnixFS]: https://github.com/ipfs/specs/tree/master/unixfs
+[IPLD]: https://github.com/ipld/js-ipld
+[CID]: https://github.com/multiformats/js-cid
 
 ## Contribute
diff --git a/src/dag-builder/dir.js b/src/dag-builder/dir.js
index c3f8381..42cce15 100644
--- a/src/dag-builder/dir.js
+++ b/src/dag-builder/dir.js
@@ -21,7 +21,7 @@ const dirBuilder = async (item, ipld, options) => {
     cid,
     path,
     unixfs,
-    node
+    size: node.size
   }
 }
diff --git a/src/dag-builder/file/buffer-importer.js b/src/dag-builder/file/buffer-importer.js
new file mode 100644
index 0000000..88d89bd
--- /dev/null
+++ b/src/dag-builder/file/buffer-importer.js
@@ -0,0 +1,50 @@
+'use strict'
+
+const UnixFS = require('ipfs-unixfs')
+const persist = require('../../utils/persist')
+const {
+  DAGNode
+} = require('ipld-dag-pb')
+
+async function * bufferImporter (file, source, ipld, options) {
+  for await (const buffer of source) {
+    yield async () => {
+      options.progress(buffer.length)
+      let node
+      let unixfs
+      let size
+
+      const opts = {
+        ...options
+      }
+
+      if (options.rawLeaves) {
+        node = buffer
+        size = buffer.length
+
+        opts.codec = 'raw'
+        opts.cidVersion = 1
+      } else {
+        unixfs = new UnixFS({
+          type: options.leafType,
+          data: buffer,
+          mtime: file.mtime,
+          mode: file.mode
+        })
+
+        node = new DAGNode(unixfs.marshal())
+        size = node.size
+      }
+
+      const cid = await persist(node, ipld, opts)
+
+      return {
+        cid: cid,
+        unixfs,
+        size
+      }
+    }
+  }
+}
+
+module.exports = bufferImporter
diff --git a/src/dag-builder/file/index.js b/src/dag-builder/file/index.js
index f9df4fd..0f32c33 100644
--- a/src/dag-builder/file/index.js
+++ b/src/dag-builder/file/index.js
@@ -16,49 +16,18 @@ const dagBuilders = {
   trickle: require('./trickle')
 }
 
-async function * importBuffer (file, source, ipld, options) {
-  for await (const buffer of source) {
-    yield async () => {
-      options.progress(buffer.length)
-      let node
-      let unixfs
-
-      const opts = {
-        ...options
-      }
-
-      if (options.rawLeaves) {
-        node = buffer
-
-        opts.codec = 'raw'
-        opts.cidVersion = 1
-      } else {
-        unixfs = new UnixFS({
-          type: options.leafType,
-          data: buffer,
-          mtime: file.mtime,
-          mode: file.mode
-        })
-
-        node = new DAGNode(unixfs.marshal())
-      }
-
-      const cid = await persist(node, ipld, opts)
-
-      return {
-        cid: cid,
-        unixfs,
-        node
-      }
-    }
-  }
-}
-
 async function * buildFileBatch (file, source, ipld, options) {
   let count = -1
   let previous
+  let bufferImporter
+
+  if (typeof options.bufferImporter === 'function') {
+    bufferImporter = options.bufferImporter
+  } else {
+    bufferImporter = require('./buffer-importer')
+  }
 
-  for await (const entry of parallelBatch(importBuffer(file, source, ipld, options), options.blockWriteConcurrency)) {
+  for await (const entry of parallelBatch(bufferImporter(file, source, ipld, options), options.blockWriteConcurrency)) {
     count++
 
     if (count === 0) {
@@ -86,9 +55,8 @@ const reduce = (file, ipld, options) => {
     return {
       cid: leaf.cid,
       path: file.path,
-      name: (file.path || '').split('/').pop(),
       unixfs: leaf.unixfs,
-      node: leaf.node
+      size: leaf.size
     }
   }
 
@@ -101,7 +69,7 @@ const reduce = (file, ipld, options) => {
 
     const links = leaves
       .filter(leaf => {
-        if (leaf.cid.codec === 'raw' && leaf.node.length) {
+        if (leaf.cid.codec === 'raw' && leaf.size) {
          return true
        }
 
@@ -114,9 +82,9 @@
       .map((leaf) => {
         if (leaf.cid.codec === 'raw') {
           // node is a leaf buffer
-          f.addBlockSize(leaf.node.length)
+          f.addBlockSize(leaf.size)
 
-          return new DAGLink(leaf.name, leaf.node.length, leaf.cid)
+          return new DAGLink(leaf.name, leaf.size, leaf.cid)
         }
 
         if (!leaf.unixfs.data) {
@@ -127,7 +95,7 @@
           f.addBlockSize(leaf.unixfs.data.length)
         }
 
-        return new DAGLink(leaf.name, leaf.node.size, leaf.cid)
+        return new DAGLink(leaf.name, leaf.size, leaf.cid)
       })
 
     const node = new DAGNode(f.marshal(), links)
@@ -137,7 +105,6 @@
       cid,
       path: file.path,
       unixfs: f,
-      node,
       size: node.size
     }
   }
diff --git a/src/dag-builder/index.js b/src/dag-builder/index.js
index bc2f4b2..a55888d 100644
--- a/src/dag-builder/index.js
+++ b/src/dag-builder/index.js
@@ -2,8 +2,6 @@
 
 const dirBuilder = require('./dir')
 const fileBuilder = require('./file')
-const createChunker = require('../chunker')
-const validateChunks = require('./validate-chunks')
 
 async function * dagBuilder (source, ipld, options) {
   for await (const entry of source) {
@@ -30,10 +28,26 @@
         }
       }
 
-      const chunker = createChunker(options.chunker, validateChunks(source), options)
+      let chunker
+
+      if (typeof options.chunker === 'function') {
+        chunker = options.chunker
+      } else if (options.chunker === 'rabin') {
+        chunker = require('../chunker/rabin')
+      } else {
+        chunker = require('../chunker/fixed-size')
+      }
+
+      let chunkValidator
+
+      if (typeof options.chunkValidator === 'function') {
+        chunkValidator = options.chunkValidator
+      } else {
+        chunkValidator = require('./validate-chunks')
+      }
 
       // item is a file
-      yield () => fileBuilder(entry, chunker, ipld, options)
+      yield () => fileBuilder(entry, chunker(chunkValidator(source, options), options), ipld, options)
     } else {
       // item is a directory
       yield () => dirBuilder(entry, ipld, options)
diff --git a/src/dir-flat.js b/src/dir-flat.js
index 266901b..5086604 100644
--- a/src/dir-flat.js
+++ b/src/dir-flat.js
@@ -65,7 +65,7 @@ class DirFlat extends Dir {
         }
       }
 
-      links.push(new DAGLink(children[i], child.node.length || child.node.size, child.cid))
+      links.push(new DAGLink(children[i], child.size, child.cid))
     }
 
     const unixfs = new UnixFS({
@@ -84,7 +84,7 @@ class DirFlat extends Dir {
       cid,
       unixfs,
       path,
-      node
+      size: node.size
     }
   }
 }
diff --git a/src/dir-sharded.js b/src/dir-sharded.js
index 26065c7..e295984 100644
--- a/src/dir-sharded.js
+++ b/src/dir-sharded.js
@@ -107,7 +107,7 @@ async function * flush (path, bucket, ipld, shardRoot, options) {
         shard = subShard
       }
 
-      links.push(new DAGLink(labelPrefix, shard.node.size, shard.cid))
+      links.push(new DAGLink(labelPrefix, shard.size, shard.cid))
     } else if (typeof child.value.flush === 'function') {
       const dir = child.value
       let flushedDir
@@ -119,7 +119,7 @@ async function * flush (path, bucket, ipld, shardRoot, options) {
       }
 
       const label = labelPrefix + child.key
-      links.push(new DAGLink(label, flushedDir.node.size, flushedDir.cid))
+      links.push(new DAGLink(label, flushedDir.size, flushedDir.cid))
     } else {
       const value = child.value
 
@@ -155,8 +155,8 @@ async function * flush (path, bucket, ipld, shardRoot, options) {
 
   yield {
     cid,
-    node,
     unixfs: dir,
-    path
+    path,
+    size: node.size
   }
 }
diff --git a/src/index.js b/src/index.js
index e1240e9..052acff 100644
--- a/src/index.js
+++ b/src/index.js
@@ -1,7 +1,5 @@
 'use strict'
 
-const dagBuilder = require('./dag-builder')
-const treeBuilder = require('./tree-builder')
 const parallelBatch = require('it-parallel-batch')
 const mergeOptions = require('merge-options').bind({ ignoreUndefined: true })
 
@@ -30,7 +28,9 @@ const defaultOptions = {
   pin: true,
   recursive: false,
   hidden: false,
-  preload: true
+  preload: true,
+  chunkValidator: null,
+  bufferImporter: null
 }
 
 module.exports = async function * (source, ipld, options = {}) {
@@ -58,6 +58,22 @@ module.exports = async function * (source, ipld, options = {}) {
     opts.codec = options.format
   }
 
+  let dagBuilder
+
+  if (typeof options.dagBuilder === 'function') {
+    dagBuilder = options.dagBuilder
+  } else {
+    dagBuilder = require('./dag-builder')
+  }
+
+  let treeBuilder
+
+  if (typeof options.treeBuilder === 'function') {
+    treeBuilder = options.treeBuilder
+  } else {
+    treeBuilder = require('./tree-builder')
+  }
+
   for await (const entry of treeBuilder(parallelBatch(dagBuilder(source, ipld, opts), opts.fileImportConcurrency), ipld, opts)) {
     yield {
       cid: entry.cid,
diff --git a/test/chunker-custom.spec.js b/test/chunker-custom.spec.js
new file mode 100644
index 0000000..2dd00f0
--- /dev/null
+++ b/test/chunker-custom.spec.js
@@ -0,0 +1,73 @@
+/* eslint-env mocha */
+'use strict'
+
+const importer = require('../src')
+
+const chai = require('chai')
+chai.use(require('dirty-chai'))
+const expect = chai.expect
+const IPLD = require('ipld')
+const inMemory = require('ipld-in-memory')
+const mc = require('multicodec')
+
+// eslint bug https://github.com/eslint/eslint/issues/12459
+// eslint-disable-next-line require-await
+const iter = async function * () {
+  yield Buffer.from('one')
+  yield Buffer.from('two')
+}
+
+describe('custom chunker', function () {
+  let inmem
+
+  const fromPartsTest = (iter, size) => async () => {
+    for await (const part of importer([{
+      content: iter()
+    }], inmem, {
+      chunkValidator: source => source,
+      chunker: source => source,
+      bufferImporter: async function * (file, source, ipld, options) {
+        for await (const item of source) {
+          yield () => Promise.resolve(item)
+        }
+      }
+    })) {
+      expect(part.size).to.equal(size)
+    }
+  }
+
+  before(async () => {
+    inmem = await inMemory(IPLD)
+  })
+
+  it('keeps custom chunking', async () => {
+    const chunker = source => source
+    const content = iter()
+    for await (const part of importer([{ path: 'test', content }], inmem, {
+      chunker
+    })) {
+      expect(part.size).to.equal(116)
+    }
+  })
+
+  // eslint bug https://github.com/eslint/eslint/issues/12459
+  const multi = async function * () {
+    yield {
+      size: 11,
+      cid: await inmem.put(Buffer.from('hello world'), mc.RAW)
+    }
+    yield {
+      size: 11,
+      cid: await inmem.put(Buffer.from('hello world'), mc.RAW)
+    }
+  }
+  it('works with multiple parts', fromPartsTest(multi, 120))
+
+  const single = async function * () {
+    yield {
+      size: 11,
+      cid: await inmem.put(Buffer.from('hello world'), mc.RAW)
+    }
+  }
+  it('works with single part', fromPartsTest(single, 11))
+})
diff --git a/test/importer.spec.js b/test/importer.spec.js
index 50bef67..cf6784e 100644
--- a/test/importer.spec.js
+++ b/test/importer.spec.js
@@ -924,3 +924,64 @@ strategies.forEach((strategy) => {
     })
   })
 })
+
+describe('configuration', () => {
+  it('allows configuring with custom dag and tree builder', async () => {
+    let builtTree = false
+    const ipld = 'ipld'
+    const entries = await all(importer([{
+      path: 'path',
+      content: 'content'
+    }], ipld, {
+      dagBuilder: async function * (source, ipld, opts) { // eslint-disable-line require-await
+        yield function () {
+          return Promise.resolve({
+            cid: 'cid',
+            path: 'path',
+            unixfs: 'unixfs'
+          })
+        }
+      },
+      treeBuilder: async function * (source, ipld, opts) { // eslint-disable-line require-await
+        builtTree = true
+        yield * source
+      }
+    }))
+
+    expect(entries).to.have.lengthOf(1)
+    expect(entries).to.have.nested.property('[0].cid', 'cid')
+    expect(entries).to.have.nested.property('[0].path', 'path')
+    expect(entries).to.have.nested.property('[0].unixfs', 'unixfs')
+
+    expect(builtTree).to.be.true()
+  })
+
+  it('allows configuring with custom chunker', async () => {
+    let validated = false
+    let chunked = false
+    const ipld = {
+      put: () => 'cid'
+    }
+    const entries = await all(importer([{
+      path: 'path',
+      content: 'content'
+    }], ipld, {
+      chunkValidator: async function * (source, opts) { // eslint-disable-line require-await
+        validated = true
+        yield * source
+      },
+      chunker: async function * (source, opts) { // eslint-disable-line require-await
+        chunked = true
+        yield * source
+      }
+    }))
+
+    expect(entries).to.have.lengthOf(1)
+    expect(entries).to.have.nested.property('[0].cid', 'cid')
+    expect(entries).to.have.nested.property('[0].path', 'path')
+    expect(entries).to.have.nested.property('[0].unixfs')
+
+    expect(validated).to.be.true()
+    expect(chunked).to.be.true()
+  })
+})