This repository was archived by the owner on Apr 29, 2020. It is now read-only.

perf: concurrent file import #41

Merged: 3 commits, Nov 27, 2019 (changes shown from 1 commit)
9 changes: 5 additions & 4 deletions README.md
@@ -124,10 +124,9 @@ The input's file paths and directory structure will be preserved in the [`dag-pb
- `chunker` (string, defaults to `"fixed"`): the chunking strategy. Supports:
- `fixed`
- `rabin`
- - `chunkerOptions` (object, optional): the options for the chunker. Defaults to an object with the following properties:
-   - `avgChunkSize` (positive integer, defaults to `262144`): the average chunk size (rabin chunker only)
-   - `minChunkSize` (positive integer): the minimum chunk size (rabin chunker only)
-   - `maxChunkSize` (positive integer, defaults to `262144`): the maximum chunk size
+ - `avgChunkSize` (positive integer, defaults to `262144`): the average chunk size (rabin chunker only)
+ - `minChunkSize` (positive integer): the minimum chunk size (rabin chunker only)
+ - `maxChunkSize` (positive integer, defaults to `262144`): the maximum chunk size
Contributor: Please can we get an accompanying PR to js-ipfs?

Contributor: dirBuilder options (listed later here) have not been flattened...intentional?

@achingbrain (Collaborator, author) on Nov 27, 2019: Just a mistake. Internally it's been flattened for ages so the dirBuilder options haven't worked for a while. Oops.

- `strategy` (string, defaults to `"balanced"`): the DAG builder strategy name. Supports:
- `flat`: flat list of chunks
- `balanced`: builds a balanced tree
@@ -144,6 +143,8 @@ The input's file paths and directory structure will be preserved in the [`dag-pb
- `cidVersion` (integer, default 0): the CID version to use when storing the data (storage keys are based on the CID, _including_ its version)
- `rawLeaves` (boolean, defaults to false): When a file would span multiple DAGNodes, if this is true the leaf nodes will not be wrapped in `UnixFS` protobufs and will instead contain the raw file bytes
- `leafType` (string, defaults to `'file'`): what type of UnixFS node leaves should be - can be `'file'` or `'raw'` (ignored when `rawLeaves` is `true`)
+ - `blockWriteConcurrency` (positive integer, defaults to 10): how many blocks to hash and write to the block store concurrently. For small numbers of large files this should be high (e.g. 50).
+ - `fileImportConcurrency` (positive integer, defaults to 50): how many files to import concurrently. For large numbers of small files this should be high (e.g. 50).

[ipld-resolver instance]: https://github.com/ipld/js-ipld-resolver
[UnixFS]: https://github.com/ipfs/specs/tree/master/unixfs
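
As a usage sketch of the flattened options above (illustrative only - the `files` source and `ipld` instance are placeholders, and the option values are examples rather than recommendations):

const importer = require('ipfs-unixfs-importer')

// `files` is any async iterable of { path, content }, `ipld` an IPLD instance
async function run (files, ipld) {
  for await (const entry of importer(files, ipld, {
    chunker: 'rabin',
    avgChunkSize: 262144, // chunker options now sit at the top level, not under `chunkerOptions`
    blockWriteConcurrency: 10, // blocks hashed and written concurrently within a file
    fileImportConcurrency: 50 // files imported concurrently
  })) {
    console.log(entry.path, entry.cid.toString())
  }
}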
18 changes: 10 additions & 8 deletions package.json
@@ -38,33 +38,35 @@
"homepage": "https://github.com/ipfs/js-ipfs-unixfs-importer#readme",
"devDependencies": {
"aegir": "^20.0.0",
"async-iterator-buffer-stream": "^1.0.0",
"async-iterator-last": "^1.0.0",
"chai": "^4.2.0",
"cids": "~0.7.1",
"deep-extend": "~0.6.0",
"detect-node": "^2.0.4",
"dirty-chai": "^2.0.1",
"ipfs-unixfs-exporter": "^0.39.0",
"ipld": "^0.25.0",
"ipld-in-memory": "^3.0.0",
"it-buffer-stream": "^1.0.0",
"it-last": "^1.0.0",
"multihashes": "~0.4.14",
"nyc": "^14.0.0",
"sinon": "^7.1.0"
},
"dependencies": {
"async-iterator-all": "^1.0.0",
"async-iterator-batch": "~0.0.1",
"async-iterator-first": "^1.0.0",
Contributor: It would be good to get PRs to js-ipfs and js-ipfs-http-client (and any other projects in our dependency tree) to switch them there also so we don't get multiple of the same thing.

@achingbrain (Collaborator, author): Sure - I'll do that in the PR that adds support for the new options

"bl": "^4.0.0",
"deep-extend": "~0.6.0",
"err-code": "^2.0.0",
"hamt-sharding": "~0.0.2",
"ipfs-unixfs": "^0.2.0",
"ipld-dag-pb": "^0.18.0",
"it-all": "^1.0.1",
"it-batch": "^1.0.3",
"it-first": "^1.0.1",
"it-flat-batch": "^1.0.2",
"it-parallel-batch": "1.0.2",
"merge-options": "^2.0.0",
"multicodec": "~0.5.1",
"multihashing-async": "^0.8.0",
"rabin-wasm": "~0.0.8",
"superstruct": "^0.8.2"
"rabin-wasm": "~0.0.8"
},
"contributors": [
"Alan Shaw <[email protected]>",
2 changes: 1 addition & 1 deletion src/dag-builder/file/balanced.js
@@ -1,6 +1,6 @@
'use strict'

- const batch = require('async-iterator-batch')
+ const batch = require('it-flat-batch')

async function * balanced (source, reduce, options) {
yield await reduceToParents(source, reduce, options)
4 changes: 2 additions & 2 deletions src/dag-builder/file/flat.js
@@ -1,11 +1,11 @@
'use strict'

- const batch = require('async-iterator-batch')
+ const batch = require('it-flat-batch')

module.exports = async function * (source, reduce) {
const roots = []

- for await (const chunk of batch(source, Infinity)) {
+ for await (const chunk of batch(source, Number.MAX_SAFE_INTEGER)) {
Contributor: Just use it-all?

Contributor: ...I mean unless you actually want to receive a batch of Number.MAX_SAFE_INTEGER items at maximum 🤪

@achingbrain (Collaborator, author):
Historically I think the assumption has been that source could return variable length arrays so we use a batcher that flattens them first, though the tests seem to pass with a regular batcher so I'm not sure that it's true any more.

In fact I think it only ever worked because of how Array.concat treats its arguments (e.g. not rejecting non-array arguments).

it-batch (internal buffer uses Array.push and yields when the buffer gets big enough)

input                         batcher                         output
child, child, child, child -> [child, child, child, child] -> [child, child, child, child] 
it-flat-batch (internal buffer uses Array.concat)

input                             batcher                         output
[child, child, child], [child] -> [child, child, child, child] -> [child, child, child, child] 
it-all (like it-batch but with an unbounded buffer size)

input                         output
child, child, child, child -> [child, child, child, child]

What fun.

TBH I don't know why we have this builder, it's not terribly useful and you can construct the same DAG with the balanced builder and a really high maxChildrenPerNode value.

roots.push(await reduce(chunk))
}

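For readers following the batcher discussion above, a minimal sketch of the flattening behaviour being described (an illustration only, not the actual it-flat-batch source):

// Re-batches a source that may yield single items or arrays of items into
// arrays of at most `size` items. Array.concat flattens array arguments and
// passes scalars through, which is the behaviour the comment above relies on.
async function * flatBatch (source, size) {
  let buffer = []

  for await (const chunk of source) {
    buffer = buffer.concat(chunk)

    while (buffer.length >= size) {
      yield buffer.slice(0, size)
      buffer = buffer.slice(size)
    }
  }

  if (buffer.length) {
    yield buffer
  }
}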
70 changes: 39 additions & 31 deletions src/dag-builder/file/index.js
@@ -7,54 +7,62 @@ const {
DAGNode,
DAGLink
} = require('ipld-dag-pb')
- const all = require('async-iterator-all')
+ const all = require('it-all')
+ const parallelBatch = require('it-parallel-batch')

const dagBuilders = {
flat: require('./flat'),
balanced: require('./balanced'),
trickle: require('./trickle')
}

async function * buildFile (file, source, ipld, options) {
let count = -1
let previous

async function * importBuffer (file, source, ipld, options) {
for await (const buffer of source) {
count++
options.progress(buffer.length)
let node
let unixfs
yield async () => {
options.progress(buffer.length)
let node
let unixfs

const opts = {
...options
}
const opts = {
...options
}

if (options.rawLeaves) {
node = buffer
if (options.rawLeaves) {
node = buffer

opts.codec = 'raw'
opts.cidVersion = 1
} else {
unixfs = new UnixFS(options.leafType, buffer)
opts.codec = 'raw'
opts.cidVersion = 1
} else {
unixfs = new UnixFS(options.leafType, buffer)

if (file.mtime) {
unixfs.mtime = file.mtime
}
if (file.mtime) {
unixfs.mtime = file.mtime
}

if (file.mode) {
unixfs.mode = file.mode
}

if (file.mode) {
unixfs.mode = file.mode
node = new DAGNode(unixfs.marshal())
}

node = new DAGNode(unixfs.marshal())
const cid = await persist(node, ipld, opts)

return {
cid: cid,
unixfs,
node
}
}
}
}

const cid = await persist(node, ipld, opts)
async function * buildFileBatch (file, source, ipld, options) {
let count = -1
let previous

const entry = {
cid: cid,
unixfs,
node
}
for await (const entry of parallelBatch(importBuffer(file, source, ipld, options), options.blockWriteConcurrency)) {
count++

if (count === 0) {
previous = entry
@@ -149,7 +157,7 @@ const fileBuilder = async (file, source, ipld, options) => {
throw errCode(new Error(`Unknown importer build strategy name: ${options.strategy}`), 'ERR_BAD_STRATEGY')
}

- const roots = await all(dagBuilder(buildFile(file, source, ipld, options), reduce(file, ipld, options), options.builderOptions))
+ const roots = await all(dagBuilder(buildFileBatch(file, source, ipld, options), reduce(file, ipld, options), options))

if (roots.length > 1) {
throw errCode(new Error('expected a maximum of 1 roots and got ' + roots.length), 'ETOOMANYROOTS')
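The change above is the heart of this PR: importBuffer now yields zero-argument async functions ("thunks"), and it-parallel-batch runs up to `blockWriteConcurrency` of them at once while still yielding results in source order. A standalone sketch of the pattern, assuming only it-parallel-batch's (source, batchSize) signature:

const parallelBatch = require('it-parallel-batch')

// A source of thunks - nothing runs until parallelBatch invokes them
async function * jobs () {
  for (let i = 0; i < 20; i++) {
    yield async () => {
      // stand-in for hashing a block and writing it to the block store
      return i
    }
  }
}

async function main () {
  // Up to 10 thunks run concurrently; results still arrive in order
  for await (const result of parallelBatch(jobs(), 10)) {
    console.log(result)
  }
}

main()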
2 changes: 1 addition & 1 deletion src/dag-builder/file/trickle.js
@@ -1,6 +1,6 @@
'use strict'

- const batch = require('async-iterator-batch')
+ const batch = require('it-flat-batch')

module.exports = function * trickleReduceToRoot (source, reduce, options) {
yield trickleStream(source, reduce, options)
6 changes: 3 additions & 3 deletions src/dag-builder/index.js
@@ -30,13 +30,13 @@ async function * dagBuilder (source, ipld, options) {
}
}

- const chunker = createChunker(options.chunker, validateChunks(source), options.chunkerOptions)
+ const chunker = createChunker(options.chunker, validateChunks(source), options)

// item is a file
- yield fileBuilder(entry, chunker, ipld, options)
+ yield () => fileBuilder(entry, chunker, ipld, options)
} else {
// item is a directory
- yield dirBuilder(entry, ipld, options)
+ yield () => dirBuilder(entry, ipld, options)
}
}
}
4 changes: 2 additions & 2 deletions src/dir-sharded.js
@@ -9,7 +9,7 @@ const multihashing = require('multihashing-async')
const Dir = require('./dir')
const persist = require('./utils/persist')
const Bucket = require('hamt-sharding')
- const extend = require('deep-extend')
+ const mergeOptions = require('merge-options').bind({ ignoreUndefined: true })

const hashFn = async function (value) {
const hash = await multihashing(Buffer.from(value, 'utf8'), 'murmur3-128')
@@ -36,7 +36,7 @@

class DirSharded extends Dir {
constructor (props, options) {
- options = extend({}, defaultOptions, options)
+ options = mergeOptions(defaultOptions, options)

super(props, options)

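The deep-extend to merge-options swap above is mostly about the bound { ignoreUndefined: true } flag. A small sketch of the difference this makes (per merge-options' documented API; the default shown is a placeholder):

const mergeOptions = require('merge-options').bind({ ignoreUndefined: true })

const defaults = { shardSplitThreshold: 1000 }

// An explicitly-undefined user value no longer overwrites the default
const opts = mergeOptions(defaults, { shardSplitThreshold: undefined })

console.log(opts.shardSplitThreshold) // 1000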
89 changes: 26 additions & 63 deletions src/index.js
@@ -1,78 +1,41 @@
'use strict'

- const { superstruct } = require('superstruct')
const dagBuilder = require('./dag-builder')
const treeBuilder = require('./tree-builder')
- const mh = require('multihashes')
+ const parallelBatch = require('it-parallel-batch')
+ const mergeOptions = require('merge-options').bind({ ignoreUndefined: true })

- const struct = superstruct({
-   types: {
-     codec: v => ['dag-pb', 'dag-cbor', 'raw'].includes(v),
-     hashAlg: v => Object.keys(mh.names).includes(v),
-     leafType: v => ['file', 'raw'].includes(v)
-   }
- })
-
- const ChunkerOptions = struct({
-   minChunkSize: 'number?',
-   maxChunkSize: 'number?',
-   avgChunkSize: 'number?',
-   window: 'number?',
-   polynomial: 'number?'
- }, {
-   maxChunkSize: 262144,
-   avgChunkSize: 262144,
-   window: 16,
-   polynomial: 17437180132763653 // https://github.com/ipfs/go-ipfs-chunker/blob/d0125832512163708c0804a3cda060e21acddae4/rabin.go#L11
- })
-
- const BuilderOptions = struct({
-   maxChildrenPerNode: 'number?',
-   layerRepeat: 'number?'
- }, {
-   maxChildrenPerNode: 174,
-   layerRepeat: 4
- })
-
- const Options = struct({
-   chunker: struct.enum(['fixed', 'rabin']),
-   rawLeaves: 'boolean?',
-   hashOnly: 'boolean?',
-   strategy: struct.enum(['balanced', 'flat', 'trickle']),
-   reduceSingleLeafToSelf: 'boolean?',
-   codec: 'codec?',
-   format: 'codec?',
-   hashAlg: 'hashAlg?',
-   leafType: 'leafType?',
-   cidVersion: 'number?',
-   progress: 'function?',
-   wrapWithDirectory: 'boolean?',
-   shardSplitThreshold: 'number?',
-   onlyHash: 'boolean?',
-   chunkerOptions: ChunkerOptions,
-   builderOptions: BuilderOptions,
-
-   wrap: 'boolean?',
-   pin: 'boolean?',
-   recursive: 'boolean?',
-   ignore: 'array?',
-   hidden: 'boolean?',
-   preload: 'boolean?'
- }, {
+ const defaultOptions = {
chunker: 'fixed',
- strategy: 'balanced',
+ strategy: 'balanced', // 'flat', 'trickle'
rawLeaves: false,
onlyHash: false,
reduceSingleLeafToSelf: true,
codec: 'dag-pb',
hashAlg: 'sha2-256',
- leafType: 'file',
+ leafType: 'file', // 'raw'
cidVersion: 0,
progress: () => () => {},
- shardSplitThreshold: 1000
- })
+ shardSplitThreshold: 1000,
+ fileImportConcurrency: 50,
+ blockWriteConcurrency: 10,
+ minChunkSize: 262144,
+ maxChunkSize: 262144,
+ avgChunkSize: 262144,
+ window: 16,
+ polynomial: 17437180132763653, // https://github.com/ipfs/go-ipfs-chunker/blob/d0125832512163708c0804a3cda060e21acddae4/rabin.go#L11
+ maxChildrenPerNode: 174,
+ layerRepeat: 4,
+ wrapWithDirectory: false,
+ pin: true,
+ recursive: false,
+ ignore: null, // []
+ hidden: false,
+ preload: true
+ }

module.exports = async function * (source, ipld, options = {}) {
- const opts = Options(options)
+ const opts = mergeOptions(defaultOptions, options)

if (options.cidVersion > 0 && options.rawLeaves === undefined) {
// if the cid version is 1 or above, use raw leaves as this is
@@ -93,10 +56,10 @@ module.exports = async function * (source, ipld, options = {}) {
}

if (options.format) {
- options.codec = options.format
+ opts.codec = options.format
}

- for await (const entry of treeBuilder(dagBuilder(source, ipld, opts), ipld, opts)) {
+ for await (const entry of treeBuilder(parallelBatch(dagBuilder(source, ipld, opts), opts.fileImportConcurrency), ipld, opts)) {
yield {
cid: entry.cid,
path: entry.path,
5 changes: 4 additions & 1 deletion src/tree-builder.js
@@ -5,7 +5,7 @@ const flatToShard = require('./flat-to-shard')
const Dir = require('./dir')
const toPathComponents = require('./utils/to-path-components')
const errCode = require('err-code')
- const first = require('async-iterator-first')
+ const first = require('it-first')

async function addToTree (elem, tree, options) {
const pathElems = toPathComponents(elem.path || '')
@@ -61,6 +61,9 @@ async function * treeBuilder (source, ipld, options) {
}, options)

for await (const entry of source) {
+ if (!entry) {
+   continue
+ }
tree = await addToTree(entry, tree, options)

yield entry
4 changes: 2 additions & 2 deletions test/benchmark.spec.js
@@ -5,8 +5,8 @@ const importer = require('../src')

const IPLD = require('ipld')
const inMemory = require('ipld-in-memory')
- const bufferStream = require('async-iterator-buffer-stream')
- const all = require('async-iterator-all')
+ const bufferStream = require('it-buffer-stream')
+ const all = require('it-all')

const REPEATS = 10
const FILE_SIZE = Math.pow(2, 20) * 500 // 500MB
2 changes: 1 addition & 1 deletion test/builder-balanced.spec.js
@@ -5,7 +5,7 @@ const chai = require('chai')
chai.use(require('dirty-chai'))
const expect = chai.expect
const builder = require('../src/dag-builder/file/balanced')
- const all = require('async-iterator-all')
+ const all = require('it-all')

function reduce (leaves) {
if (leaves.length > 1) {
4 changes: 2 additions & 2 deletions test/builder-dir-sharding.spec.js
@@ -9,8 +9,8 @@ chai.use(require('dirty-chai'))
const expect = chai.expect
const IPLD = require('ipld')
const inMemory = require('ipld-in-memory')
- const all = require('async-iterator-all')
- const last = require('async-iterator-last')
+ const all = require('it-all')
+ const last = require('it-last')

describe('builder: directory sharding', () => {
let ipld