This repository was archived by the owner on Apr 29, 2020. It is now read-only.

perf: concurrent file import #41

Merged

merged 3 commits on Nov 27, 2019
14 changes: 7 additions & 7 deletions README.md
@@ -124,26 +124,26 @@ The input's file paths and directory structure will be preserved in the [`dag-pb`
- `chunker` (string, defaults to `"fixed"`): the chunking strategy. Supports:
- `fixed`
- `rabin`
- `chunkerOptions` (object, optional): the options for the chunker. Defaults to an object with the following properties:
- `avgChunkSize` (positive integer, defaults to `262144`): the average chunk size (rabin chunker only)
- `minChunkSize` (positive integer): the minimum chunk size (rabin chunker only)
- `maxChunkSize` (positive integer, defaults to `262144`): the maximum chunk size
- `avgChunkSize` (positive integer, defaults to `262144`): the average chunk size (rabin chunker only)
- `minChunkSize` (positive integer): the minimum chunk size (rabin chunker only)
- `maxChunkSize` (positive integer, defaults to `262144`): the maximum chunk size
Contributor

Please can we get an accompanying PR to js-ipfs?

Contributor

dirBuilder options (listed later here) have not been flattened...intentional?

Collaborator Author

@achingbrain Nov 27, 2019

Just a mistake. Internally it's been flattened for ages so the dirBuilder options haven't worked for a while. Oops.

- `strategy` (string, defaults to `"balanced"`): the DAG builder strategy name. Supports:
- `flat`: flat list of chunks
- `balanced`: builds a balanced tree
- `trickle`: builds [a trickle tree](https://github.com/ipfs/specs/pull/57#issuecomment-265205384)
- `maxChildrenPerNode` (positive integer, defaults to `174`): the maximum children per node for the `balanced` and `trickle` DAG builder strategies
- `layerRepeat` (positive integer, defaults to `4`): the maximum repetition of parent nodes for each layer of the tree (only applicable to the `trickle` DAG builder strategy)
- `reduceSingleLeafToSelf` (boolean, defaults to `true`): an optimization that, when a reduction set contains a single node, reduces it to that node itself
- `dirBuilder` (object): the options for the directory builder
- `hamt` (object): the options for the HAMT sharded directory builder
- bits (positive integer, defaults to `8`): the number of bits at each bucket of the HAMT
- `hamtHashFn` (async function(string) Buffer): a function that hashes file names to create HAMT shards
- `hamtBucketBits` (positive integer, defaults to `8`): the number of bits at each bucket of the HAMT
- `progress` (function): a function that will be called with the byte length of chunks as a file is added to ipfs.
- `onlyHash` (boolean, defaults to false): Only chunk and hash - do not write to disk
- `hashAlg` (string): multihash hashing algorithm to use
- `cidVersion` (integer, default 0): the CID version to use when storing the data (storage keys are based on the CID, _including_ its version)
- `rawLeaves` (boolean, defaults to false): When a file would span multiple DAGNodes, if this is true the leaf nodes will not be wrapped in `UnixFS` protobufs and will instead contain the raw file bytes
- `leafType` (string, defaults to `'file'`) what type of UnixFS node leaves should be - can be `'file'` or `'raw'` (ignored when `rawLeaves` is `true`)
- `blockWriteConcurrency` (positive integer, defaults to `10`): how many blocks to hash and write to the block store concurrently. For small numbers of large files this should be high (e.g. 50).
- `fileImportConcurrency` (number, defaults to `50`): how many files to import concurrently. For large numbers of small files this should be high (e.g. 50).
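
Taken together, the flattened options above can be passed straight to the importer. A minimal sketch, assuming the module is consumed as an async iterable and that a configured `ipld` instance is supplied by the caller; the file content and paths are placeholders:

```js
const importer = require('ipfs-unixfs-importer')

async function addFiles (ipld) {
  // a single small file as the import source; content can be any
  // (async) iterable of Buffers
  const source = [{
    path: 'docs/readme.txt',
    content: [Buffer.from('hello world')]
  }]

  const options = {
    chunker: 'fixed',
    maxChunkSize: 262144,       // chunker options are now top-level
    strategy: 'balanced',
    blockWriteConcurrency: 10,  // blocks hashed/written in parallel per file
    fileImportConcurrency: 50   // files imported in parallel
  }

  for await (const entry of importer(source, ipld, options)) {
    console.log(entry.path, entry.cid.toString())
  }
}
```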

[ipld-resolver instance]: https://github.com/ipld/js-ipld-resolver
[UnixFS]: https://github.com/ipfs/specs/tree/master/unixfs
17 changes: 9 additions & 8 deletions package.json
@@ -38,33 +38,34 @@
"homepage": "https://github.com/ipfs/js-ipfs-unixfs-importer#readme",
"devDependencies": {
"aegir": "^20.0.0",
"async-iterator-buffer-stream": "^1.0.0",
"async-iterator-last": "^1.0.0",
"chai": "^4.2.0",
"cids": "~0.7.1",
"deep-extend": "~0.6.0",
"detect-node": "^2.0.4",
"dirty-chai": "^2.0.1",
"ipfs-unixfs-exporter": "^0.39.0",
"ipld": "^0.25.0",
"ipld-in-memory": "^3.0.0",
"it-buffer-stream": "^1.0.0",
"it-last": "^1.0.0",
"multihashes": "~0.4.14",
"nyc": "^14.0.0",
"sinon": "^7.1.0"
},
"dependencies": {
"async-iterator-all": "^1.0.0",
"async-iterator-batch": "~0.0.1",
"async-iterator-first": "^1.0.0",
Contributor

It would be good to get PRs to js-ipfs and js-ipfs-http-client (and any other projects in our dependency tree) to switch them there as well, so we don't end up with multiple copies of the same thing.

Collaborator Author

Sure - I'll do that in the PR that adds support for the new options

"bl": "^4.0.0",
"deep-extend": "~0.6.0",
"err-code": "^2.0.0",
"hamt-sharding": "~0.0.2",
"ipfs-unixfs": "^0.2.0",
"ipld-dag-pb": "^0.18.0",
"it-all": "^1.0.1",
"it-batch": "^1.0.3",
"it-first": "^1.0.1",
"it-parallel-batch": "1.0.2",
"merge-options": "^2.0.0",
"multicodec": "~0.5.1",
"multihashing-async": "^0.8.0",
"rabin-wasm": "~0.0.8",
"superstruct": "^0.8.2"
"rabin-wasm": "~0.0.8"
},
"contributors": [
"Alan Shaw <[email protected]>",
2 changes: 1 addition & 1 deletion src/dag-builder/file/balanced.js
@@ -1,6 +1,6 @@
'use strict'

const batch = require('async-iterator-batch')
const batch = require('it-batch')

async function * balanced (source, reduce, options) {
yield await reduceToParents(source, reduce, options)
10 changes: 2 additions & 8 deletions src/dag-builder/file/flat.js
@@ -1,13 +1,7 @@
'use strict'

const batch = require('async-iterator-batch')
const all = require('it-all')

module.exports = async function * (source, reduce) {
const roots = []

for await (const chunk of batch(source, Infinity)) {
roots.push(await reduce(chunk))
}

yield roots[0]
yield await reduce(await all(source))
}
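
In the new flat strategy the whole chunk stream is drained with `it-all` and reduced in a single pass. A tiny sketch of what that helper does, using placeholder values:

```js
// it-all collects an (async) iterable into an array, which is what lets
// flat.js hand every leaf to a single reduce() call
const all = require('it-all')

async function * leaves () {
  yield 'leaf-1'
  yield 'leaf-2'
}

async function demo () {
  const collected = await all(leaves())
  console.log(collected) // [ 'leaf-1', 'leaf-2' ]
}
```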
70 changes: 39 additions & 31 deletions src/dag-builder/file/index.js
@@ -7,54 +7,62 @@ const {
DAGNode,
DAGLink
} = require('ipld-dag-pb')
const all = require('async-iterator-all')
const all = require('it-all')
const parallelBatch = require('it-parallel-batch')

const dagBuilders = {
flat: require('./flat'),
balanced: require('./balanced'),
trickle: require('./trickle')
}

async function * buildFile (file, source, ipld, options) {
let count = -1
let previous

async function * importBuffer (file, source, ipld, options) {
for await (const buffer of source) {
count++
options.progress(buffer.length)
let node
let unixfs
yield async () => {
options.progress(buffer.length)
let node
let unixfs

const opts = {
...options
}
const opts = {
...options
}

if (options.rawLeaves) {
node = buffer
if (options.rawLeaves) {
node = buffer

opts.codec = 'raw'
opts.cidVersion = 1
} else {
unixfs = new UnixFS(options.leafType, buffer)
opts.codec = 'raw'
opts.cidVersion = 1
} else {
unixfs = new UnixFS(options.leafType, buffer)

if (file.mtime) {
unixfs.mtime = file.mtime
}
if (file.mtime) {
unixfs.mtime = file.mtime
}

if (file.mode) {
unixfs.mode = file.mode
}

if (file.mode) {
unixfs.mode = file.mode
node = new DAGNode(unixfs.marshal())
}

node = new DAGNode(unixfs.marshal())
const cid = await persist(node, ipld, opts)

return {
cid: cid,
unixfs,
node
}
}
}
}

const cid = await persist(node, ipld, opts)
async function * buildFileBatch (file, source, ipld, options) {
let count = -1
let previous

const entry = {
cid: cid,
unixfs,
node
}
for await (const entry of parallelBatch(importBuffer(file, source, ipld, options), options.blockWriteConcurrency)) {
count++

if (count === 0) {
previous = entry
@@ -149,7 +157,7 @@ const fileBuilder = async (file, source, ipld, options) => {
throw errCode(new Error(`Unknown importer build strategy name: ${options.strategy}`), 'ERR_BAD_STRATEGY')
}

const roots = await all(dagBuilder(buildFile(file, source, ipld, options), reduce(file, ipld, options), options.builderOptions))
const roots = await all(dagBuilder(buildFileBatch(file, source, ipld, options), reduce(file, ipld, options), options))

if (roots.length > 1) {
throw errCode(new Error('expected a maximum of 1 roots and got ' + roots.length), 'ETOOMANYROOTS')
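
The reworked `importBuffer` yields async functions instead of finished entries, so `it-parallel-batch` can run up to `blockWriteConcurrency` of them at once while keeping their results in order. A reduced sketch of that pattern, with the block hashing and persisting replaced by a placeholder:

```js
const parallelBatch = require('it-parallel-batch')

// the producer defers its work by yielding thunks; nothing runs until the
// consumer invokes them
async function * work (buffers) {
  for (const buffer of buffers) {
    yield async () => {
      // stand-in for hashing the buffer and persisting the block
      return { size: buffer.length }
    }
  }
}

async function run () {
  const buffers = [Buffer.alloc(10), Buffer.alloc(20), Buffer.alloc(30)]

  // up to 10 thunks execute concurrently, mirroring blockWriteConcurrency
  for await (const entry of parallelBatch(work(buffers), 10)) {
    console.log(entry.size)
  }
}
```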
2 changes: 1 addition & 1 deletion src/dag-builder/file/trickle.js
@@ -1,6 +1,6 @@
'use strict'

const batch = require('async-iterator-batch')
const batch = require('it-batch')

module.exports = function * trickleReduceToRoot (source, reduce, options) {
yield trickleStream(source, reduce, options)
6 changes: 3 additions & 3 deletions src/dag-builder/index.js
@@ -30,13 +30,13 @@ async function * dagBuilder (source, ipld, options) {
}
}

const chunker = createChunker(options.chunker, validateChunks(source), options.chunkerOptions)
const chunker = createChunker(options.chunker, validateChunks(source), options)

// item is a file
yield fileBuilder(entry, chunker, ipld, options)
yield () => fileBuilder(entry, chunker, ipld, options)
} else {
// item is a directory
yield dirBuilder(entry, ipld, options)
yield () => dirBuilder(entry, ipld, options)
}
}
}
14 changes: 9 additions & 5 deletions src/dir-sharded.js
@@ -9,7 +9,7 @@ const multihashing = require('multihashing-async')
const Dir = require('./dir')
const persist = require('./utils/persist')
const Bucket = require('hamt-sharding')
const extend = require('deep-extend')
const mergeOptions = require('merge-options').bind({ ignoreUndefined: true })

const hashFn = async function (value) {
const hash = await multihashing(Buffer.from(value, 'utf8'), 'murmur3-128')
@@ -31,16 +31,20 @@ const hashFn = async function (value) {
hashFn.code = 0x22 // TODO: get this from multihashing-async?

const defaultOptions = {
hashFn: hashFn
hamtHashFn: hashFn,
hamtBucketBits: 8
}

class DirSharded extends Dir {
constructor (props, options) {
options = extend({}, defaultOptions, options)
options = mergeOptions(defaultOptions, options)

super(props, options)

this._bucket = Bucket(options)
this._bucket = Bucket({
hashFn: options.hamtHashFn,
bits: options.hamtBucketBits
})
}

async put (name, value) {
@@ -139,7 +143,7 @@ async function * flush (path, bucket, ipld, shardRoot, options) {
const data = Buffer.from(children.bitField().reverse())
const dir = new UnixFS('hamt-sharded-directory', data)
dir.fanout = bucket.tableSize()
dir.hashType = options.hashFn.code
dir.hashType = options.hamtHashFn.code

if (shardRoot && shardRoot.mtime) {
dir.mtime = shardRoot.mtime
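
Replacing `deep-extend` with `merge-options` bound with `ignoreUndefined: true` means a caller that passes `undefined` for an option no longer overwrites the default. A small sketch of that behaviour, using the new HAMT option names from this diff:

```js
const mergeOptions = require('merge-options').bind({ ignoreUndefined: true })

const defaultOptions = {
  hamtBucketBits: 8
}

// an explicitly-undefined value is ignored rather than clobbering the default
const merged = mergeOptions(defaultOptions, { hamtBucketBits: undefined })

console.log(merged.hamtBucketBits) // 8
```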
89 changes: 26 additions & 63 deletions src/index.js
@@ -1,78 +1,41 @@
'use strict'

const { superstruct } = require('superstruct')
const dagBuilder = require('./dag-builder')
const treeBuilder = require('./tree-builder')
const mh = require('multihashes')
const parallelBatch = require('it-parallel-batch')
const mergeOptions = require('merge-options').bind({ ignoreUndefined: true })

const struct = superstruct({
types: {
codec: v => ['dag-pb', 'dag-cbor', 'raw'].includes(v),
hashAlg: v => Object.keys(mh.names).includes(v),
leafType: v => ['file', 'raw'].includes(v)
}
})

const ChunkerOptions = struct({
minChunkSize: 'number?',
maxChunkSize: 'number?',
avgChunkSize: 'number?',
window: 'number?',
polynomial: 'number?'
}, {
maxChunkSize: 262144,
avgChunkSize: 262144,
window: 16,
polynomial: 17437180132763653 // https://github.com/ipfs/go-ipfs-chunker/blob/d0125832512163708c0804a3cda060e21acddae4/rabin.go#L11
})

const BuilderOptions = struct({
maxChildrenPerNode: 'number?',
layerRepeat: 'number?'
}, {
maxChildrenPerNode: 174,
layerRepeat: 4
})

const Options = struct({
chunker: struct.enum(['fixed', 'rabin']),
rawLeaves: 'boolean?',
hashOnly: 'boolean?',
strategy: struct.enum(['balanced', 'flat', 'trickle']),
reduceSingleLeafToSelf: 'boolean?',
codec: 'codec?',
format: 'codec?',
hashAlg: 'hashAlg?',
leafType: 'leafType?',
cidVersion: 'number?',
progress: 'function?',
wrapWithDirectory: 'boolean?',
shardSplitThreshold: 'number?',
onlyHash: 'boolean?',
chunkerOptions: ChunkerOptions,
builderOptions: BuilderOptions,

wrap: 'boolean?',
pin: 'boolean?',
recursive: 'boolean?',
ignore: 'array?',
hidden: 'boolean?',
preload: 'boolean?'
}, {
const defaultOptions = {
chunker: 'fixed',
strategy: 'balanced',
strategy: 'balanced', // 'flat', 'trickle'
rawLeaves: false,
onlyHash: false,
reduceSingleLeafToSelf: true,
codec: 'dag-pb',
hashAlg: 'sha2-256',
leafType: 'file',
leafType: 'file', // 'raw'
cidVersion: 0,
progress: () => () => {},
shardSplitThreshold: 1000
})
shardSplitThreshold: 1000,
fileImportConcurrency: 50,
blockWriteConcurrency: 10,
minChunkSize: 262144,
maxChunkSize: 262144,
avgChunkSize: 262144,
window: 16,
polynomial: 17437180132763653, // https://github.com/ipfs/go-ipfs-chunker/blob/d0125832512163708c0804a3cda060e21acddae4/rabin.go#L11
maxChildrenPerNode: 174,
layerRepeat: 4,
wrapWithDirectory: false,
pin: true,
recursive: false,
ignore: null, // []
hidden: false,
preload: true
}

module.exports = async function * (source, ipld, options = {}) {
const opts = Options(options)
const opts = mergeOptions(defaultOptions, options)

if (options.cidVersion > 0 && options.rawLeaves === undefined) {
// if the cid version is 1 or above, use raw leaves as this is
@@ -93,10 +93,10 @@ module.exports = async function * (source, ipld, options = {}) {
}

if (options.format) {
options.codec = options.format
opts.codec = options.format
}

for await (const entry of treeBuilder(dagBuilder(source, ipld, opts), ipld, opts)) {
for await (const entry of treeBuilder(parallelBatch(dagBuilder(source, ipld, opts), opts.fileImportConcurrency), ipld, opts)) {
yield {
cid: entry.cid,
path: entry.path,
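
The exported generator now layers the two concurrency settings: `dagBuilder` yields one thunk per file or directory, `it-parallel-batch` runs up to `fileImportConcurrency` of them at once, and each file applies `blockWriteConcurrency` to its own blocks internally. A simplified sketch of that top-level wiring, with the per-file work reduced to a placeholder:

```js
const parallelBatch = require('it-parallel-batch')

// stand-in for chunking, hashing and persisting one file's blocks
async function importFile (file) {
  return { path: file.path, cid: 'placeholder-cid' }
}

// yields thunks so the consumer decides how many files import at once
async function * dagBuilder (files) {
  for (const file of files) {
    yield () => importFile(file)
  }
}

async function * importAll (files, options) {
  for await (const entry of parallelBatch(dagBuilder(files), options.fileImportConcurrency)) {
    yield entry
  }
}
```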