This repository was archived by the owner on Apr 29, 2020. It is now read-only.

feat: allow overriding of internal functions #48

Merged · 2 commits · Jan 15, 2020
README.md (25 additions, 0 deletions)
@@ -24,6 +24,7 @@
- [Example](#example)
- [API](#api)
- [const import = importer(source, ipld [, options])](#const-import--importersource-ipld--options)
- [Overriding internals](#overriding-internals)
- [Contribute](#contribute)
- [License](#license)

@@ -145,8 +146,32 @@ The input's file paths and directory structure will be preserved in the [`dag-pb
- `blockWriteConcurrency` (positive integer, defaults to 10) How many blocks to hash and write to the block store concurrently. For small numbers of large files this should be high (e.g. 50).
- `fileImportConcurrency` (positive integer, defaults to 50) How many files to import concurrently. For large numbers of small files this should be high (e.g. 50).
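
For illustration, a call tuning both options might look like this minimal sketch (the `ipld` instance and the file contents are placeholders, assumed to be set up elsewhere):

```js
const importer = require('ipfs-unixfs-importer')

async function run (ipld) {
  const source = [{
    path: 'file.txt',
    content: Buffer.from('hello world')
  }]

  const options = {
    blockWriteConcurrency: 50, // raise for a few large files
    fileImportConcurrency: 50 // raise for many small files
  }

  for await (const entry of importer(source, ipld, options)) {
    console.log(entry.path, entry.cid.toString())
  }
}
```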

## Overriding internals

Several aspects of the importer can be overridden by passing functions in the options object under the following keys (a usage sketch follows the list):
> **Contributor:** It would be useful to know the role of each of these in addition to their signature and return type. It's great that they can be overridden, but I don't see why or how a user could use this without knowing what part these functions play in the import process.


- `chunkValidator` (function): Optional function that supports the signature `async function * (source, options)`
  - This function takes input from the `content` field of imported entries. It should yield `Buffer` objects constructed from the `source`, or throw an `Error` if it cannot
- `chunker` (function): Optional function that supports the signature `async function * (source, options)`, where `source` is an async generator of `Buffer`s (the output of `chunkValidator`) and `options` is an options object
  - It should yield `Buffer` objects
- `bufferImporter` (function): Optional function that supports the signature `async function * (entry, source, ipld, options)`
  - This function should read `Buffer`s from `source` and persist them using `ipld.put` or similar
  - `entry` is the `{ path, content }` entry, `source` is an async generator that yields `Buffer`s
  - It should yield functions that return a `Promise` that resolves to an object with the properties `{ cid, unixfs, size }`, where `cid` is a [CID], `unixfs` is a [UnixFS] entry and `size` is a `Number` representing the serialized size of the [IPLD] node that holds the buffer data
  - Values will be pulled from this generator in parallel - the amount of parallelisation is controlled by the `blockWriteConcurrency` option (default: 10)
- `dagBuilder` (function): Optional function that supports the signature `async function * (source, ipld, options)`
  - This function should read `{ path, content }` entries from `source` and turn them into DAGs
  - It should yield a `function` that returns a `Promise` that resolves to `{ cid, path, unixfs, node }`, where `cid` is a [CID], `path` is a string, `unixfs` is a [UnixFS] entry and `node` is a `DAGNode`
  - Values will be pulled from this generator in parallel - the amount of parallelisation is controlled by the `fileImportConcurrency` option (default: 50)
- `treeBuilder` (function): Optional function that supports the signature `async function * (source, ipld, options)`
  - This function should read `{ cid, path, unixfs, node }` entries from `source` and place them in a directory structure
  - It should yield an object with the properties `{ cid, path, unixfs, size }`, where `cid` is a [CID], `path` is a string, `unixfs` is a [UnixFS] entry and `size` is a `Number`
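
As a sketch of how two of these hooks plug in, the example below supplies a hypothetical `chunker` and `chunkValidator` through the options object. The 1024-byte chunk size and the accepted input types are arbitrary choices for this example, and `files` and `ipld` are assumed to be set up elsewhere:

```js
const importer = require('ipfs-unixfs-importer')

// a naive fixed-size chunker that yields 1024-byte Buffers
async function * myChunker (source, options) {
  let buffer = Buffer.alloc(0)

  for await (const chunk of source) {
    buffer = Buffer.concat([buffer, chunk])

    while (buffer.length >= 1024) {
      yield buffer.slice(0, 1024)
      buffer = buffer.slice(1024)
    }
  }

  if (buffer.length) {
    yield buffer // trailing partial chunk
  }
}

// a permissive validator that accepts Buffers and strings
async function * myChunkValidator (source, options) {
  for await (const chunk of source) {
    if (Buffer.isBuffer(chunk)) {
      yield chunk
    } else if (typeof chunk === 'string') {
      yield Buffer.from(chunk)
    } else {
      throw new Error('chunks must be Buffers or strings')
    }
  }
}

const entries = importer(files, ipld, {
  chunker: myChunker,
  chunkValidator: myChunkValidator
})
```

`entries` is consumed with `for await`, exactly as when no overrides are given.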

[ipld-resolver instance]: https://github.com/ipld/js-ipld-resolver
[UnixFS]: https://github.com/ipfs/specs/tree/master/unixfs
[IPLD]: https://github.com/ipld/js-ipld
[CID]: https://github.com/multiformats/js-cid

## Contribute

src/dag-builder/dir.js (1 addition, 1 deletion)
@@ -21,7 +21,7 @@ const dirBuilder = async (item, ipld, options) => {
     cid,
     path,
     unixfs,
-    node
+    size: node.size
   }
 }

src/dag-builder/file/buffer-importer.js (50 additions, 0 deletions)
@@ -0,0 +1,50 @@
'use strict'

const UnixFS = require('ipfs-unixfs')
const persist = require('../../utils/persist')
const {
  DAGNode
} = require('ipld-dag-pb')

async function * bufferImporter (file, source, ipld, options) {
  for await (const buffer of source) {
    yield async () => {
      options.progress(buffer.length)
      let node
      let unixfs
      let size

      const opts = {
        ...options
      }

      if (options.rawLeaves) {
        node = buffer
        size = buffer.length

        opts.codec = 'raw'
        opts.cidVersion = 1
      } else {
        unixfs = new UnixFS({
          type: options.leafType,
          data: buffer,
          mtime: file.mtime,
          mode: file.mode
        })

        node = new DAGNode(unixfs.marshal())
        size = node.size
      }

      const cid = await persist(node, ipld, opts)

      return {
        cid: cid,
        unixfs,
        size
      }
    }
  }
}

module.exports = bufferImporter
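
For comparison with the default above, a user-supplied `bufferImporter` only has to honour the yielded-function contract. The sketch below stores every leaf as a raw block, roughly what the default does when `rawLeaves` is set. It assumes the js-ipld `ipld.put(node, codec, options)` API and the `multicodec` constants, and is not part of this PR:

```js
'use strict'

const multicodec = require('multicodec')

async function * rawBufferImporter (file, source, ipld, options) {
  for await (const buffer of source) {
    yield async () => {
      options.progress(buffer.length)

      // store the chunk as-is, without a UnixFS wrapper
      const cid = await ipld.put(buffer, multicodec.RAW, {
        cidVersion: 1,
        hashAlg: multicodec.SHA2_256,
        onlyHash: options.onlyHash
      })

      // `unixfs` is omitted: raw leaves carry no UnixFS metadata
      return { cid, size: buffer.length }
    }
  }
}

module.exports = rawBufferImporter
```

It would be wired in as `importer(files, ipld, { bufferImporter: rawBufferImporter })`.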
src/dag-builder/file/index.js (13 additions, 46 deletions)
@@ -16,49 +16,18 @@ const dagBuilders = {
   trickle: require('./trickle')
 }
 
-async function * importBuffer (file, source, ipld, options) {
-  for await (const buffer of source) {
-    yield async () => {
-      options.progress(buffer.length)
-      let node
-      let unixfs
-
-      const opts = {
-        ...options
-      }
-
-      if (options.rawLeaves) {
-        node = buffer
-
-        opts.codec = 'raw'
-        opts.cidVersion = 1
-      } else {
-        unixfs = new UnixFS({
-          type: options.leafType,
-          data: buffer,
-          mtime: file.mtime,
-          mode: file.mode
-        })
-
-        node = new DAGNode(unixfs.marshal())
-      }
-
-      const cid = await persist(node, ipld, opts)
-
-      return {
-        cid: cid,
-        unixfs,
-        node
-      }
-    }
-  }
-}
-
 async function * buildFileBatch (file, source, ipld, options) {
   let count = -1
   let previous
+  let bufferImporter
+
+  if (typeof options.bufferImporter === 'function') {
+    bufferImporter = options.bufferImporter
+  } else {
+    bufferImporter = require('./buffer-importer')
+  }
 
-  for await (const entry of parallelBatch(importBuffer(file, source, ipld, options), options.blockWriteConcurrency)) {
+  for await (const entry of parallelBatch(bufferImporter(file, source, ipld, options), options.blockWriteConcurrency)) {
     count++
 
     if (count === 0) {

@@ -86,9 +55,8 @@ const reduce = (file, ipld, options) => {
     return {
       cid: leaf.cid,
       path: file.path,
-      name: (file.path || '').split('/').pop(),
       unixfs: leaf.unixfs,
-      node: leaf.node
+      size: leaf.size
     }
   }

@@ -101,7 +69,7 @@ const reduce = (file, ipld, options) => {
 
   const links = leaves
     .filter(leaf => {
-      if (leaf.cid.codec === 'raw' && leaf.node.length) {
+      if (leaf.cid.codec === 'raw' && leaf.size) {
        return true
      }

@@ -114,9 +82,9 @@
     .map((leaf) => {
       if (leaf.cid.codec === 'raw') {
         // node is a leaf buffer
-        f.addBlockSize(leaf.node.length)
+        f.addBlockSize(leaf.size)
 
-        return new DAGLink(leaf.name, leaf.node.length, leaf.cid)
+        return new DAGLink(leaf.name, leaf.size, leaf.cid)
       }
 
       if (!leaf.unixfs.data) {

@@ -127,7 +95,7 @@ const reduce = (file, ipld, options) => {
         f.addBlockSize(leaf.unixfs.data.length)
       }
 
-      return new DAGLink(leaf.name, leaf.node.size, leaf.cid)
+      return new DAGLink(leaf.name, leaf.size, leaf.cid)
     })
 
   const node = new DAGNode(f.marshal(), links)

@@ -137,7 +105,6 @@ const reduce = (file, ipld, options) => {
    cid,
    path: file.path,
    unixfs: f,
-   node,
    size: node.size
  }
}
src/dag-builder/index.js (18 additions, 4 deletions)
@@ -2,8 +2,6 @@
 
 const dirBuilder = require('./dir')
 const fileBuilder = require('./file')
-const createChunker = require('../chunker')
-const validateChunks = require('./validate-chunks')
 
 async function * dagBuilder (source, ipld, options) {
   for await (const entry of source) {

@@ -30,10 +28,26 @@
      }
    }
 
-    const chunker = createChunker(options.chunker, validateChunks(source), options)
+    let chunker
+
+    if (typeof options.chunker === 'function') {
+      chunker = options.chunker
+    } else if (options.chunker === 'rabin') {
+      chunker = require('../chunker/rabin')
+    } else {
+      chunker = require('../chunker/fixed-size')
+    }
+
+    let chunkValidator
+
+    if (typeof options.chunkValidator === 'function') {
+      chunkValidator = options.chunkValidator
+    } else {
+      chunkValidator = require('./validate-chunks')
+    }
 
     // item is a file
-    yield () => fileBuilder(entry, chunker, ipld, options)
+    yield () => fileBuilder(entry, chunker(chunkValidator(source, options), options), ipld, options)
   } else {
     // item is a directory
     yield () => dirBuilder(entry, ipld, options)
src/dir-flat.js (2 additions, 2 deletions)
@@ -65,7 +65,7 @@
        }
      }
 
-      links.push(new DAGLink(children[i], child.node.length || child.node.size, child.cid))
+      links.push(new DAGLink(children[i], child.size, child.cid))
     }
 
     const unixfs = new UnixFS({

@@ -84,7 +84,7 @@
       cid,
       unixfs,
       path,
-      node
+      size: node.size
     }
   }
 }
src/dir-sharded.js (4 additions, 4 deletions)
@@ -107,7 +107,7 @@ async function * flush (path, bucket, ipld, shardRoot, options) {
      shard = subShard
    }
 
-    links.push(new DAGLink(labelPrefix, shard.node.size, shard.cid))
+    links.push(new DAGLink(labelPrefix, shard.size, shard.cid))
   } else if (typeof child.value.flush === 'function') {
     const dir = child.value
     let flushedDir

@@ -119,7 +119,7 @@
     }
 
     const label = labelPrefix + child.key
-    links.push(new DAGLink(label, flushedDir.node.size, flushedDir.cid))
+    links.push(new DAGLink(label, flushedDir.size, flushedDir.cid))
   } else {
     const value = child.value
 

@@ -155,8 +155,8 @@
 
   yield {
     cid,
-    node,
     unixfs: dir,
-    path
+    path,
+    size: node.size
   }
 }
src/index.js (19 additions, 3 deletions)
@@ -1,7 +1,5 @@
 'use strict'
 
-const dagBuilder = require('./dag-builder')
-const treeBuilder = require('./tree-builder')
 const parallelBatch = require('it-parallel-batch')
 const mergeOptions = require('merge-options').bind({ ignoreUndefined: true })
 

@@ -30,7 +28,9 @@ const defaultOptions = {
   pin: true,
   recursive: false,
   hidden: false,
-  preload: true
+  preload: true,
+  chunkValidator: null,
+  bufferImporter: null
 }
 
 module.exports = async function * (source, ipld, options = {}) {

@@ -58,6 +58,22 @@ module.exports = async function * (source, ipld, options = {}) {
     opts.codec = options.format
   }
 
+  let dagBuilder
+
+  if (typeof options.dagBuilder === 'function') {
+    dagBuilder = options.dagBuilder
+  } else {
+    dagBuilder = require('./dag-builder')
+  }
+
+  let treeBuilder
+
+  if (typeof options.treeBuilder === 'function') {
+    treeBuilder = options.treeBuilder
+  } else {
+    treeBuilder = require('./tree-builder')
+  }
+
   for await (const entry of treeBuilder(parallelBatch(dagBuilder(source, ipld, opts), opts.fileImportConcurrency), ipld, opts)) {
     yield {
       cid: entry.cid,
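
The same pattern applies to the `dagBuilder` and `treeBuilder` hooks introduced here. As one sketch, a pass-through wrapper that logs each entry before delegating to the default builder (the internal require path is an assumption for illustration, not a documented entry point):

```js
const importer = require('ipfs-unixfs-importer')
// internal path, used here for illustration only
const defaultDagBuilder = require('ipfs-unixfs-importer/src/dag-builder')

// logs each `{ path, content }` entry as it flows past
async function * logEntries (source) {
  for await (const entry of source) {
    console.log('importing', entry.path)
    yield entry
  }
}

// delegates all actual DAG building to the default implementation
async function * loggingDagBuilder (source, ipld, options) {
  yield * defaultDagBuilder(logEntries(source), ipld, options)
}

// `files` and `ipld` are assumed to be set up elsewhere
const entries = importer(files, ipld, { dagBuilder: loggingDagBuilder })
```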