From 775213ae54739c771ab72f2894023260a0d17ce4 Mon Sep 17 00:00:00 2001 From: Hugo Dias Date: Mon, 3 Sep 2018 17:16:59 +0100 Subject: [PATCH 01/15] feat: add support for chunked uploads --- package.json | 2 + src/add2/add2.js | 161 +++++++++++++++++++++++++++++++++ src/add2/multipart2.js | 135 +++++++++++++++++++++++++++ src/utils/load-commands.js | 1 + src/utils/prepare-file.js | 74 +++++++++++---- src/utils/send-files-stream.js | 2 +- src/utils/send-request.js | 2 +- 7 files changed, 355 insertions(+), 22 deletions(-) create mode 100644 src/add2/add2.js create mode 100644 src/add2/multipart2.js diff --git a/package.json b/package.json index 3c49b0c1c..39adf7dfa 100644 --- a/package.json +++ b/package.json @@ -29,6 +29,7 @@ "async": "^2.6.1", "big.js": "^5.1.2", "bs58": "^4.0.1", + "buffer-to-stream": "^1.0.0", "cids": "~0.5.3", "concat-stream": "^1.6.2", "debug": "^3.1.0", @@ -58,6 +59,7 @@ "pump": "^3.0.0", "qs": "^6.5.2", "readable-stream": "^2.3.6", + "readable-stream-node-to-web": "^1.0.1", "stream-http": "^2.8.3", "stream-to-pull-stream": "^1.7.2", "streamifier": "~0.1.1", diff --git a/src/add2/add2.js b/src/add2/add2.js new file mode 100644 index 000000000..3070b5bef --- /dev/null +++ b/src/add2/add2.js @@ -0,0 +1,161 @@ +'use strict' + +const { Readable, Transform } = require('stream') +const toStream = require('buffer-to-stream') +const pump = require('pump') +const Multipart = require('./multipart2') +const {prepareWithHeaders} = require('./../utils/prepare-file') + +const arrayToStream = (data) => { + let i = 0 + return new Readable({ + objectMode: true, + read () { + this.push(i < data.length ? data[i++] : null) + } + }) +} + +const prepareTransform = (options) => new Transform({ + objectMode: true, + transform (chunk, encoding, callback) { + callback(null, prepareWithHeaders(chunk, options)) + } +}) + +module.exports = (send) => (files, options) => { + const multipart = new Multipart() + + // add pump + arrayToStream([].concat(files)) + .pipe(prepareTransform(options)) + .pipe(multipart) + + return sendChunked(multipart, send, options) +} + +const sendChunked = (multipartStream, send, options) => { + return new Promise((resolve, reject) => { + const boundary = multipartStream._boundary + let index = 0 + let rangeStart = 0 + let rangeEnd = 0 + let size = 0 + let ended = false + let running = false + const name = createName() + + multipartStream.on('end', () => { + ended = true + console.log('end', size) + + // if multipart already ended and no request is pending send last request + if (!running) { + // sendChunk('', -1, rangeEnd, rangeEnd, name, boundary, size) + sendChunkRequest(send, options, '', -1, rangeEnd, rangeEnd, name, boundary, size) + .then(rsp => { + resolve(rsp) + }) + } + }) + + multipartStream.on('data', (chunk) => { + console.log('Sending ', chunk.length) + multipartStream.pause() + index++ + rangeEnd = rangeStart + chunk.length + size += chunk.length + running = true + + // sendChunk(chunk, index, rangeStart, rangeEnd, name, boundary) + sendChunkRequest(send, options, chunk, index, rangeStart, rangeEnd, name, boundary) + .then(rsp => { + console.log('Response', rsp) + rangeStart = rangeEnd + multipartStream.resume() + // if multipart already ended send last request + if (ended) { + console.log('sending last') + // sendChunk('', -1, rangeEnd, rangeEnd, name, boundary, size) + sendChunkRequest(send, options, '', -1, rangeEnd, rangeEnd, name, boundary, size) + .then(rsp => { + resolve(rsp) + }) + } + running = false + }) + .catch(reject) + }) + }) +} + +const sendChunk = (chunk, id, start, end, name, boundary, size = '*') => { + const url = new URL('http://localhost') + const search = new URLSearchParams() + search.set('stream-channels', true) + url.port = 5002 + url.pathname = 'api/v0/add-chunked' + url.search = search + + return window.fetch(url.href, { + method: 'POST', + body: chunk, + headers: { + 'Content-Type': 'application/octet-stream', + 'Content-Range': `bytes ${start}-${end}/${size}`, + 'Ipfs-Chunk-Name': name, + 'Ipfs-Chunk-Id': id, + 'Ipfs-Chunk-Boundary': boundary + } + }) + .then(res => res.json()) +} + +function createName () { + const date = new Date(Date.now()).toISOString() + function chr4 () { + return Math.random().toString(16).slice(-4) + } + return date + '--' + chr4() + chr4() + + '-' + chr4() + + '-' + chr4() + + '-' + chr4() + + '-' + chr4() + chr4() + chr4() +} + +const sendChunkRequest = (send, options, chunk, id, start, end, name, boundary, size = '*') => { + return new Promise((resolve, reject) => { + const qs = { + 'cid-version': options['cid-version'], + 'raw-leaves': options['raw-leaves'], + 'only-hash': options.onlyHash, + 'wrap-with-directory': options.wrapWithDirectory, + hash: options.hashAlg || options.hash + } + const args = { + path: 'add-chunked', + qs: qs, + args: options.args, + stream: true, + // recursive: true, + // progress: options.progress, + headers: { + 'Content-Type': 'application/octet-stream', + 'Content-Range': `bytes ${start}-${end}/${size}`, + 'Ipfs-Chunk-Name': name, + 'Ipfs-Chunk-Id': id, + 'Ipfs-Chunk-Boundary': boundary + } + } + + const req = send(args, (err, res) => { + if (err) { + return reject(err) + } + + resolve(res) + }) + + pump(toStream(chunk), req) + }) +} diff --git a/src/add2/multipart2.js b/src/add2/multipart2.js new file mode 100644 index 000000000..e6a022ceb --- /dev/null +++ b/src/add2/multipart2.js @@ -0,0 +1,135 @@ +'use strict' + +const { Duplex } = require('stream') +const { isSource } = require('is-pull-stream') +const toStream = require('pull-stream-to-stream') + +const PADDING = '--' +const NEW_LINE = '\r\n' +const NEW_LINE_BUFFER = Buffer.from(NEW_LINE) + +const generateBoundary = () => { + var boundary = '--------------------------' + for (var i = 0; i < 24; i++) { + boundary += Math.floor(Math.random() * 10).toString(16) + } + + return boundary +} + +const leading = (headers = {}, boundary) => { + var leading = [PADDING + boundary] + + Object.keys(headers).forEach((header) => { + leading.push(header + ': ' + headers[header]) + }) + + leading.push('') + leading.push('') + + const leadingStr = leading.join(NEW_LINE) + + return Buffer.from(leadingStr) +} + +class Multipart extends Duplex { + constructor (options) { + super(Object.assign({}, options, { writableObjectMode: true, writableHighWaterMark: 1 })) + + this._boundary = generateBoundary() + this.source = null + this.chunkSize = 256000 + this.buffer = Buffer.alloc(this.chunkSize) + this.bufferOffset = 0 + this.running = true + } + + _read () { + if (this.source) { + this.source.resume() + } + } + + _write (file, encoding, callback) { + this.pushFile(file, () => { + this.pushChunk(Buffer.from(PADDING + this._boundary + PADDING + NEW_LINE)) + callback() + }) + } + + _final (callback) { + // Flush the rest and finish + if (this.bufferOffset) { + this.push(this.buffer.slice(0, this.bufferOffset)) + this.bufferOffset = 0 + } + this.running = false + this.push(null) + callback() + } + + pushChunk (chunk) { + const bytesNeeded = (this.chunkSize - this.bufferOffset) + let result = true + if (chunk === null) { + return this.push(null) + } + + // If we have enough bytes in this chunk to get buffer up to chunkSize, + // fill in buffer, push it, and reset its offset. + // Otherwise, just copy the entire chunk in to buffer. + if (chunk.length >= bytesNeeded) { + chunk.copy(this.buffer, this.bufferOffset, 0, bytesNeeded) + result = this.push(this.buffer) + this.bufferOffset = 0 + // Handle leftovers from the chunk + const leftovers = chunk.slice(0, chunk.length - bytesNeeded) + let size = leftovers.length + while (size >= this.chunkSize) { + result = this.push(chunk.slice(this.bufferOffset, this.bufferOffset + this.chunkSize)) + this.bufferOffset += this.chunkSize + size -= this.chunkSize + } + // if we still have anything left copy to the buffer + chunk.copy(this.buffer, 0, this.bufferOffset, this.bufferOffset + size) + this.bufferOffset = size + } else { + chunk.copy(this.buffer, this.bufferOffset) + this.bufferOffset += chunk.length + } + + return result + } + + pushFile (file, callback) { + this.pushChunk(leading(file.headers, this._boundary)) + + let content = file.content || Buffer.alloc(0) + + if (Buffer.isBuffer(content)) { + this.pushChunk(content) + this.pushChunk(NEW_LINE_BUFFER) + return callback() // early + } + + if (isSource(content)) { + content = toStream.source(content) + } + this.source = content + // From now on we assume content is a stream + + content.on('data', (data) => { + if (!this.pushChunk(data)) { + content.pause() + } + }) + content.once('error', this.emit.bind(this, 'error')) + + content.once('end', () => { + this.pushChunk(NEW_LINE_BUFFER) + callback() + }) + } +} + +module.exports = Multipart diff --git a/src/utils/load-commands.js b/src/utils/load-commands.js index 55b4570ab..0d84b198a 100644 --- a/src/utils/load-commands.js +++ b/src/utils/load-commands.js @@ -64,6 +64,7 @@ function requireCommands () { addFromStream: require('../files/add')(send), addFromURL: require('../util/url-add')(send), getEndpointConfig: require('../util/get-endpoint-config')(config), + add2: require('./../add2/add2')(send), crypto: require('libp2p-crypto'), isIPFS: require('is-ipfs') } diff --git a/src/utils/prepare-file.js b/src/utils/prepare-file.js index 738c4a4c0..9171933b3 100644 --- a/src/utils/prepare-file.js +++ b/src/utils/prepare-file.js @@ -77,30 +77,64 @@ function prepareFile (file, opts) { let files = [].concat(file) return flatmap(files, (file) => { - if (typeof file === 'string') { - if (!isNode) { - throw new Error('Can only add file paths in node') - } + return prepare(file, opts) + }) +} - return loadPaths(opts, file) +function prepare (file, opts) { + if (typeof file === 'string') { + if (!isNode) { + throw new Error('Can only add file paths in node') } - if (file.path && !file.content) { - file.dir = true - return file - } + return loadPaths(opts, file) + } - if (file.content || file.dir) { - return file - } + if (file.path && !file.content) { + file.dir = true + return file + } - return { - path: '', - symlink: false, - dir: false, - content: file - } - }) + if (file.content || file.dir) { + return file + } + + return { + path: '', + symlink: false, + dir: false, + content: file + } +} + +function prepareWithHeaders (file, opts) { + const obj = prepare(file, opts) + + obj.headers = headers(obj) + return obj } -exports = module.exports = prepareFile +function headers (file) { + const name = file.path + ? encodeURIComponent(file.path) + : '' + + // console.log('new part', file) + const header = { 'Content-Disposition': `file; filename="${name}"` } + + if (!file.content) { + header['Content-Type'] = 'application/x-directory' + } else if (file.symlink) { + header['Content-Type'] = 'application/symlink' + } else { + header['Content-Type'] = 'application/octet-stream' + } + + return header +} + +module.exports = { + prepareFile, + prepare, + prepareWithHeaders +} diff --git a/src/utils/send-files-stream.js b/src/utils/send-files-stream.js index 46a57e383..5ad545461 100644 --- a/src/utils/send-files-stream.js +++ b/src/utils/send-files-stream.js @@ -4,7 +4,7 @@ const Duplex = require('stream').Duplex const eachSeries = require('async/eachSeries') const isStream = require('is-stream') const once = require('once') -const prepareFile = require('./prepare-file') +const {prepareFile} = require('./prepare-file') const Multipart = require('./multipart') function headers (file) { diff --git a/src/utils/send-request.js b/src/utils/send-request.js index c8dfb4dd8..abdb1b107 100644 --- a/src/utils/send-request.js +++ b/src/utils/send-request.js @@ -114,7 +114,7 @@ function requestAPI (config, options, callback) { delete options.qs.followSymlinks const method = 'POST' - const headers = Object.assign({}, config.headers) + const headers = Object.assign({}, config.headers, options.headers) if (isNode) { // Browsers do not allow you to modify the user agent From 01a575e7a8234f99d4b538f6edaddcef4b855c73 Mon Sep 17 00:00:00 2001 From: Hugo Dias Date: Tue, 4 Sep 2018 00:14:44 +0100 Subject: [PATCH 02/15] fix: fix sendChunkRequest --- src/add2/add2.js | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/add2/add2.js b/src/add2/add2.js index 3070b5bef..bb5a53d44 100644 --- a/src/add2/add2.js +++ b/src/add2/add2.js @@ -49,7 +49,7 @@ const sendChunked = (multipartStream, send, options) => { ended = true console.log('end', size) - // if multipart already ended and no request is pending send last request + // multipart ended and no request is running send last request if (!running) { // sendChunk('', -1, rangeEnd, rangeEnd, name, boundary, size) sendChunkRequest(send, options, '', -1, rangeEnd, rangeEnd, name, boundary, size) @@ -137,8 +137,8 @@ const sendChunkRequest = (send, options, chunk, id, start, end, name, boundary, qs: qs, args: options.args, stream: true, - // recursive: true, - // progress: options.progress, + recursive: true, + progress: options.progress, headers: { 'Content-Type': 'application/octet-stream', 'Content-Range': `bytes ${start}-${end}/${size}`, @@ -156,6 +156,7 @@ const sendChunkRequest = (send, options, chunk, id, start, end, name, boundary, resolve(res) }) - pump(toStream(chunk), req) + req.write(Buffer.from(chunk)) + req.end() }) } From 695b23ebb9adec52d1f472806cf48d294f8658a8 Mon Sep 17 00:00:00 2001 From: Hugo Dias Date: Tue, 4 Sep 2018 12:10:38 +0100 Subject: [PATCH 03/15] fix: cleanup code, headers, chunkSize options --- package.json | 1 - src/add2/add2.js | 132 ++++++++++++++++++++--------------------- src/add2/multipart2.js | 8 +-- 3 files changed, 67 insertions(+), 74 deletions(-) diff --git a/package.json b/package.json index 39adf7dfa..7bbdf8965 100644 --- a/package.json +++ b/package.json @@ -29,7 +29,6 @@ "async": "^2.6.1", "big.js": "^5.1.2", "bs58": "^4.0.1", - "buffer-to-stream": "^1.0.0", "cids": "~0.5.3", "concat-stream": "^1.6.2", "debug": "^3.1.0", diff --git a/src/add2/add2.js b/src/add2/add2.js index bb5a53d44..4bcba63bb 100644 --- a/src/add2/add2.js +++ b/src/add2/add2.js @@ -1,7 +1,6 @@ 'use strict' const { Readable, Transform } = require('stream') -const toStream = require('buffer-to-stream') const pump = require('pump') const Multipart = require('./multipart2') const {prepareWithHeaders} = require('./../utils/prepare-file') @@ -24,106 +23,101 @@ const prepareTransform = (options) => new Transform({ }) module.exports = (send) => (files, options) => { - const multipart = new Multipart() - - // add pump - arrayToStream([].concat(files)) - .pipe(prepareTransform(options)) - .pipe(multipart) + const multipart = pump( + arrayToStream([].concat(files)), + prepareTransform(options), + new Multipart(options), + (err) => { + if (err) { + // probably better to create a rejected Promise to return + console.error(err) + } + } + ) return sendChunked(multipart, send, options) } const sendChunked = (multipartStream, send, options) => { return new Promise((resolve, reject) => { - const boundary = multipartStream._boundary - let index = 0 - let rangeStart = 0 - let rangeEnd = 0 - let size = 0 - let ended = false - let running = false - const name = createName() + const state = { + boundary: multipartStream._boundary, + id: uuid(), + index: 0, + rangeStart: 0, + rangeEnd: 0, + rangeTotal: 0, + ended: false, + running: false + } + multipartStream.on('error', reject) multipartStream.on('end', () => { - ended = true - console.log('end', size) + state.ended = true + console.log('end', state.rangeTotal) // multipart ended and no request is running send last request - if (!running) { - // sendChunk('', -1, rangeEnd, rangeEnd, name, boundary, size) - sendChunkRequest(send, options, '', -1, rangeEnd, rangeEnd, name, boundary, size) - .then(rsp => { - resolve(rsp) - }) + if (!state.running) { + sendChunkRequest(send, options, '', state) + .then(resolve) + .catch(reject) } }) multipartStream.on('data', (chunk) => { console.log('Sending ', chunk.length) + // stop producing chunks multipartStream.pause() - index++ - rangeEnd = rangeStart + chunk.length - size += chunk.length - running = true + state.index++ + state.rangeEnd = state.rangeStart + chunk.length + state.rangeTotal += chunk.length + state.running = true - // sendChunk(chunk, index, rangeStart, rangeEnd, name, boundary) - sendChunkRequest(send, options, chunk, index, rangeStart, rangeEnd, name, boundary) + sendChunkRequest(send, options, chunk, state) .then(rsp => { console.log('Response', rsp) - rangeStart = rangeEnd + state.running = false + state.rangeStart = state.rangeEnd + // resume producing chunks multipartStream.resume() + // if multipart already ended send last request - if (ended) { - console.log('sending last') - // sendChunk('', -1, rangeEnd, rangeEnd, name, boundary, size) - sendChunkRequest(send, options, '', -1, rangeEnd, rangeEnd, name, boundary, size) - .then(rsp => { - resolve(rsp) - }) + if (state.ended) { + return sendChunkRequest(send, options, '', state) + .then(resolve) } - running = false }) .catch(reject) }) }) } -const sendChunk = (chunk, id, start, end, name, boundary, size = '*') => { - const url = new URL('http://localhost') - const search = new URLSearchParams() - search.set('stream-channels', true) - url.port = 5002 - url.pathname = 'api/v0/add-chunked' - url.search = search - - return window.fetch(url.href, { - method: 'POST', - body: chunk, - headers: { - 'Content-Type': 'application/octet-stream', - 'Content-Range': `bytes ${start}-${end}/${size}`, - 'Ipfs-Chunk-Name': name, - 'Ipfs-Chunk-Id': id, - 'Ipfs-Chunk-Boundary': boundary - } - }) - .then(res => res.json()) -} - -function createName () { - const date = new Date(Date.now()).toISOString() +/** + * Poor man's uuid + * + * @returns {String} + */ +function uuid () { function chr4 () { return Math.random().toString(16).slice(-4) } - return date + '--' + chr4() + chr4() + + return chr4() + chr4() + '-' + chr4() + '-' + chr4() + '-' + chr4() + '-' + chr4() + chr4() + chr4() } -const sendChunkRequest = (send, options, chunk, id, start, end, name, boundary, size = '*') => { +/** + * Send http request + * + * @param {function} send + * @param {Object} options - http request options + * @param {Uint8Array} chunk - chunk to send + * @param {Object} {id, start, end, name, boundary, size = '*'} - uploading session state + * @returns {Promise} + */ +const sendChunkRequest = (send, options, chunk, { boundary, id, index, rangeStart, rangeEnd, rangeTotal = '*' }) => { return new Promise((resolve, reject) => { const qs = { 'cid-version': options['cid-version'], @@ -141,10 +135,10 @@ const sendChunkRequest = (send, options, chunk, id, start, end, name, boundary, progress: options.progress, headers: { 'Content-Type': 'application/octet-stream', - 'Content-Range': `bytes ${start}-${end}/${size}`, - 'Ipfs-Chunk-Name': name, - 'Ipfs-Chunk-Id': id, - 'Ipfs-Chunk-Boundary': boundary + 'Content-Range': `bytes ${rangeStart}-${rangeEnd}/${rangeTotal}`, + 'X-Ipfs-Chunk-Group-Uuid': id, + 'X-Ipfs-Chunk-Index': index, + 'X-Ipfs-Chunk-Boundary': boundary } } @@ -152,10 +146,10 @@ const sendChunkRequest = (send, options, chunk, id, start, end, name, boundary, if (err) { return reject(err) } - resolve(res) }) + // write and send req.write(Buffer.from(chunk)) req.end() }) diff --git a/src/add2/multipart2.js b/src/add2/multipart2.js index e6a022ceb..38ef65c34 100644 --- a/src/add2/multipart2.js +++ b/src/add2/multipart2.js @@ -33,12 +33,12 @@ const leading = (headers = {}, boundary) => { } class Multipart extends Duplex { - constructor (options) { - super(Object.assign({}, options, { writableObjectMode: true, writableHighWaterMark: 1 })) + constructor ({ chunkSize = 256000 } = {}) { + super({ writableObjectMode: true, writableHighWaterMark: 1 }) this._boundary = generateBoundary() this.source = null - this.chunkSize = 256000 + this.chunkSize = chunkSize this.buffer = Buffer.alloc(this.chunkSize) this.bufferOffset = 0 this.running = true @@ -116,8 +116,8 @@ class Multipart extends Duplex { content = toStream.source(content) } this.source = content - // From now on we assume content is a stream + // From now on we assume content is a stream content.on('data', (data) => { if (!this.pushChunk(data)) { content.pause() From f9df3268513815b2699cd6208ad15a326000b94c Mon Sep 17 00:00:00 2001 From: Hugo Dias Date: Tue, 4 Sep 2018 18:37:39 +0100 Subject: [PATCH 04/15] feat: support File directly from input elements --- package.json | 1 + src/add2/add2.js | 9 ++++++--- src/add2/multipart2.js | 2 +- src/utils/prepare-file.js | 14 ++++++++++++++ 4 files changed, 22 insertions(+), 4 deletions(-) diff --git a/package.json b/package.json index 7bbdf8965..3cc2dd6ba 100644 --- a/package.json +++ b/package.json @@ -33,6 +33,7 @@ "concat-stream": "^1.6.2", "debug": "^3.1.0", "detect-node": "^2.0.3", + "filereader-stream": "^2.0.0", "flatmap": "0.0.3", "glob": "^7.1.2", "ipfs-block": "~0.7.1", diff --git a/src/add2/add2.js b/src/add2/add2.js index 4bcba63bb..8acd389a4 100644 --- a/src/add2/add2.js +++ b/src/add2/add2.js @@ -58,7 +58,7 @@ const sendChunked = (multipartStream, send, options) => { // multipart ended and no request is running send last request if (!state.running) { - sendChunkRequest(send, options, '', state) + sendChunkRequest(send, options, null, state) .then(resolve) .catch(reject) } @@ -83,7 +83,7 @@ const sendChunked = (multipartStream, send, options) => { // if multipart already ended send last request if (state.ended) { - return sendChunkRequest(send, options, '', state) + return sendChunkRequest(send, options, null, state) .then(resolve) } }) @@ -150,7 +150,10 @@ const sendChunkRequest = (send, options, chunk, { boundary, id, index, rangeStar }) // write and send - req.write(Buffer.from(chunk)) + + if (chunk !== null) { + req.write(Buffer.from(chunk)) + } req.end() }) } diff --git a/src/add2/multipart2.js b/src/add2/multipart2.js index 38ef65c34..424bb6de2 100644 --- a/src/add2/multipart2.js +++ b/src/add2/multipart2.js @@ -33,7 +33,7 @@ const leading = (headers = {}, boundary) => { } class Multipart extends Duplex { - constructor ({ chunkSize = 256000 } = {}) { + constructor ({ chunkSize = 1024 * 1024 } = {}) { super({ writableObjectMode: true, writableHighWaterMark: 1 }) this._boundary = generateBoundary() diff --git a/src/utils/prepare-file.js b/src/utils/prepare-file.js index 9171933b3..6aaa9679a 100644 --- a/src/utils/prepare-file.js +++ b/src/utils/prepare-file.js @@ -2,6 +2,11 @@ const isNode = require('detect-node') const flatmap = require('flatmap') +const fileReaderStream = require('filereader-stream') + +const isBrowser = typeof window === 'object' && + typeof document === 'object' && + document.nodeType === 9 function loadPaths (opts, file) { const path = require('path') @@ -99,6 +104,15 @@ function prepare (file, opts) { return file } + if (isBrowser && file instanceof window.File) { + return { + path: '', + symlink: false, + dir: false, + content: fileReaderStream(file, opts) + } + } + return { path: '', symlink: false, From 52a6117e55f6db5c68871750892fc87551c91722 Mon Sep 17 00:00:00 2001 From: Hugo Dias Date: Thu, 6 Sep 2018 16:35:41 +0100 Subject: [PATCH 05/15] fix: error handling and stream backpressure --- src/add2/add2.js | 107 +++++++++++++++++++++++++++++--------- src/add2/multipart2.js | 48 ++++++++++++----- src/utils/prepare-file.js | 11 ++++ src/utils/send-request.js | 28 +++++----- 4 files changed, 143 insertions(+), 51 deletions(-) diff --git a/src/add2/add2.js b/src/add2/add2.js index 8acd389a4..888161d6e 100644 --- a/src/add2/add2.js +++ b/src/add2/add2.js @@ -1,9 +1,12 @@ 'use strict' const { Readable, Transform } = require('stream') +const isStream = require('is-stream') const pump = require('pump') const Multipart = require('./multipart2') -const {prepareWithHeaders} = require('./../utils/prepare-file') +const { prepareWithHeaders } = require('./../utils/prepare-file') + +const noop = () => {} const arrayToStream = (data) => { let i = 0 @@ -18,7 +21,11 @@ const arrayToStream = (data) => { const prepareTransform = (options) => new Transform({ objectMode: true, transform (chunk, encoding, callback) { - callback(null, prepareWithHeaders(chunk, options)) + try { + callback(null, prepareWithHeaders(chunk, options)) + } catch (err) { + callback(err) + } } }) @@ -29,8 +36,7 @@ module.exports = (send) => (files, options) => { new Multipart(options), (err) => { if (err) { - // probably better to create a rejected Promise to return - console.error(err) + multipart.emit('error', err) } } ) @@ -38,8 +44,17 @@ module.exports = (send) => (files, options) => { return sendChunked(multipart, send, options) } +/** + * Send multipart in chunks + * + * @param {Multipart} multipartStream + * @param {function} send + * @param {Object} options + * @returns {Promise} + */ const sendChunked = (multipartStream, send, options) => { return new Promise((resolve, reject) => { + let waiting = null const state = { boundary: multipartStream._boundary, id: uuid(), @@ -47,17 +62,29 @@ const sendChunked = (multipartStream, send, options) => { rangeStart: 0, rangeEnd: 0, rangeTotal: 0, - ended: false, - running: false + running: false, + extraBytes: 0, + totalUp: 0, + totalAdd: 0 } multipartStream.on('error', reject) multipartStream.on('end', () => { - state.ended = true - console.log('end', state.rangeTotal) - - // multipart ended and no request is running send last request - if (!state.running) { + console.log('End', state.rangeTotal) + + // wait for all chunks to be sent + // doing all this in the end should simplify future concurrent chunk uploads + if (state.running && waiting === null) { + waiting = setInterval(() => { + if (!state.running) { + clearInterval(waiting) + waiting = null + sendChunkRequest(send, options, null, state) + .then(resolve) + .catch(reject) + } + }, 100) + } else { sendChunkRequest(send, options, null, state) .then(resolve) .catch(reject) @@ -65,27 +92,21 @@ const sendChunked = (multipartStream, send, options) => { }) multipartStream.on('data', (chunk) => { - console.log('Sending ', chunk.length) + console.log('Send ', chunk.length) // stop producing chunks - multipartStream.pause() + multipartStream.pauseAll() + state.extraBytes = multipartStream.extraBytes state.index++ state.rangeEnd = state.rangeStart + chunk.length state.rangeTotal += chunk.length state.running = true - sendChunkRequest(send, options, chunk, state) .then(rsp => { console.log('Response', rsp) state.running = false state.rangeStart = state.rangeEnd // resume producing chunks - multipartStream.resume() - - // if multipart already ended send last request - if (state.ended) { - return sendChunkRequest(send, options, null, state) - .then(resolve) - } + multipartStream.resumeAll() }) .catch(reject) }) @@ -117,8 +138,23 @@ function uuid () { * @param {Object} {id, start, end, name, boundary, size = '*'} - uploading session state * @returns {Promise} */ -const sendChunkRequest = (send, options, chunk, { boundary, id, index, rangeStart, rangeEnd, rangeTotal = '*' }) => { +const sendChunkRequest = ( + send, + options, + chunk, + { + boundary, + id, + index, + rangeStart, + rangeEnd, + rangeTotal, + extraBytes, + totalUp, + totalAdd + }) => { return new Promise((resolve, reject) => { + const progressFn = options.progress || noop const qs = { 'cid-version': options['cid-version'], 'raw-leaves': options['raw-leaves'], @@ -132,7 +168,7 @@ const sendChunkRequest = (send, options, chunk, { boundary, id, index, rangeStar args: options.args, stream: true, recursive: true, - progress: options.progress, + progress: Boolean(options.progress), headers: { 'Content-Type': 'application/octet-stream', 'Content-Range': `bytes ${rangeStart}-${rangeEnd}/${rangeTotal}`, @@ -146,11 +182,32 @@ const sendChunkRequest = (send, options, chunk, { boundary, id, index, rangeStar if (err) { return reject(err) } - resolve(res) + + // we are in the last request + if (isStream(res)) { + const result = [] + res.on('data', (d) => { + if (d.path) { + // files added reporting + result.push(d) + } else { + // progress reporting + totalAdd = d.Bytes / 2 + progressFn(totalAdd + totalUp) + } + }) + res.on('error', reject) + res.on('end', () => { + resolve(result) + }) + } else { + totalUp = (rangeTotal - extraBytes) / 2 + progressFn(totalUp) + resolve(res) + } }) // write and send - if (chunk !== null) { req.write(Buffer.from(chunk)) } diff --git a/src/add2/multipart2.js b/src/add2/multipart2.js index 424bb6de2..a43f87dcb 100644 --- a/src/add2/multipart2.js +++ b/src/add2/multipart2.js @@ -41,43 +41,67 @@ class Multipart extends Duplex { this.chunkSize = chunkSize this.buffer = Buffer.alloc(this.chunkSize) this.bufferOffset = 0 - this.running = true + this.extraBytes = 0 } _read () { - if (this.source) { + if (this.source && !this.isPaused()) { this.source.resume() } } _write (file, encoding, callback) { this.pushFile(file, () => { - this.pushChunk(Buffer.from(PADDING + this._boundary + PADDING + NEW_LINE)) callback() }) } _final (callback) { + this.pushChunk(Buffer.from(PADDING + this._boundary + PADDING + NEW_LINE), true) // Flush the rest and finish - if (this.bufferOffset) { - this.push(this.buffer.slice(0, this.bufferOffset)) + if (this.bufferOffset && !this.destroyed) { + const slice = this.buffer.slice(0, this.bufferOffset) + this.push(slice) this.bufferOffset = 0 } - this.running = false this.push(null) callback() } - pushChunk (chunk) { - const bytesNeeded = (this.chunkSize - this.bufferOffset) + pauseAll () { + this.pause() + if (this.source) { + this.source.pause() + } + } + + resumeAll () { + this.resume() + if (this.source) { + this.source.resume() + } + } + /** + * Push chunk + * + * @param {Buffer} chunk + * @param {boolean} [isExtra=false] + * @return {boolean} + */ + pushChunk (chunk, isExtra = false) { let result = true if (chunk === null) { return this.push(null) } + if (isExtra) { + this.extraBytes += chunk.length + } + // If we have enough bytes in this chunk to get buffer up to chunkSize, // fill in buffer, push it, and reset its offset. // Otherwise, just copy the entire chunk in to buffer. + const bytesNeeded = (this.chunkSize - this.bufferOffset) if (chunk.length >= bytesNeeded) { chunk.copy(this.buffer, this.bufferOffset, 0, bytesNeeded) result = this.push(this.buffer) @@ -102,14 +126,14 @@ class Multipart extends Duplex { } pushFile (file, callback) { - this.pushChunk(leading(file.headers, this._boundary)) + this.pushChunk(leading(file.headers, this._boundary), true) let content = file.content || Buffer.alloc(0) if (Buffer.isBuffer(content)) { this.pushChunk(content) - this.pushChunk(NEW_LINE_BUFFER) - return callback() // early + this.pushChunk(NEW_LINE_BUFFER, true) + return callback() } if (isSource(content)) { @@ -126,7 +150,7 @@ class Multipart extends Duplex { content.once('error', this.emit.bind(this, 'error')) content.once('end', () => { - this.pushChunk(NEW_LINE_BUFFER) + this.pushChunk(NEW_LINE_BUFFER, true) callback() }) } diff --git a/src/utils/prepare-file.js b/src/utils/prepare-file.js index 6aaa9679a..539319882 100644 --- a/src/utils/prepare-file.js +++ b/src/utils/prepare-file.js @@ -1,6 +1,8 @@ 'use strict' const isNode = require('detect-node') +const { isSource } = require('is-pull-stream') +const isStream = require('is-stream') const flatmap = require('flatmap') const fileReaderStream = require('filereader-stream') @@ -113,6 +115,15 @@ function prepare (file, opts) { } } + if (!isStream(file) && !isSource(file) && !Buffer.isBuffer(file)) { + return { + path: '', + symlink: false, + dir: false, + content: Buffer.from(file) + } + } + return { path: '', symlink: false, diff --git a/src/utils/send-request.js b/src/utils/send-request.js index abdb1b107..81ad8f83e 100644 --- a/src/utils/send-request.js +++ b/src/utils/send-request.js @@ -4,6 +4,7 @@ const Qs = require('qs') const qsDefaultEncoder = require('qs/lib/utils').encode const isNode = require('detect-node') const ndjson = require('ndjson') +const { Transform } = require('stream') const pump = require('pump') const once = require('once') const streamToValue = require('./stream-to-value') @@ -52,22 +53,21 @@ function onRes (buffer, cb) { return cb(null, res) } - // Return a stream of JSON objects if (chunkedObjects && isJson) { - const outputStream = ndjson.parse() - pump(res, outputStream) - res.on('end', () => { - let err = res.trailers['x-stream-error'] - if (err) { - // Not all errors are JSON - try { - err = JSON.parse(err) - } catch (e) { - err = { Message: err } + const outputStream = pump( + res, + ndjson.parse(), + new Transform({ + objectMode: true, + transform (chunk, encoding, callback) { + if (chunk.Type && chunk.Type === 'error') { + callback(new Error(chunk.Message)) + } else { + callback(null, chunk) + } } - outputStream.emit('error', new Error(err.Message)) - } - }) + }) + ) return cb(null, outputStream) } From 0da57836e64f5303e51f781a0ca7acf911caa739 Mon Sep 17 00:00:00 2001 From: Hugo Dias Date: Fri, 7 Sep 2018 18:58:14 +0100 Subject: [PATCH 06/15] feat: refactor to send to SendStream, non-chunked, add*Stream methods --- package.json | 2 +- src/add2/add2.js | 219 +++++------------------------------- src/add2/multipart2.js | 13 ++- src/add2/send-stream.js | 220 +++++++++++++++++++++++++++++++++++++ src/utils/load-commands.js | 2 +- 5 files changed, 258 insertions(+), 198 deletions(-) create mode 100644 src/add2/send-stream.js diff --git a/package.json b/package.json index 3cc2dd6ba..c80c006fa 100644 --- a/package.json +++ b/package.json @@ -60,7 +60,7 @@ "qs": "^6.5.2", "readable-stream": "^2.3.6", "readable-stream-node-to-web": "^1.0.1", - "stream-http": "^2.8.3", + "stream-http": "hugomrdias/stream-http#fix/body-handling", "stream-to-pull-stream": "^1.7.2", "streamifier": "~0.1.1", "tar-stream": "^1.6.1" diff --git a/src/add2/add2.js b/src/add2/add2.js index 888161d6e..d909d1ca4 100644 --- a/src/add2/add2.js +++ b/src/add2/add2.js @@ -1,12 +1,10 @@ 'use strict' -const { Readable, Transform } = require('stream') -const isStream = require('is-stream') +const { Readable } = require('stream') +const toPull = require('stream-to-pull-stream') +const concatStream = require('concat-stream') const pump = require('pump') -const Multipart = require('./multipart2') -const { prepareWithHeaders } = require('./../utils/prepare-file') - -const noop = () => {} +const SendStream = require('./send-stream') const arrayToStream = (data) => { let i = 0 @@ -18,199 +16,34 @@ const arrayToStream = (data) => { }) } -const prepareTransform = (options) => new Transform({ - objectMode: true, - transform (chunk, encoding, callback) { - try { - callback(null, prepareWithHeaders(chunk, options)) - } catch (err) { - callback(err) - } - } -}) - -module.exports = (send) => (files, options) => { - const multipart = pump( - arrayToStream([].concat(files)), - prepareTransform(options), - new Multipart(options), - (err) => { - if (err) { - multipart.emit('error', err) - } - } - ) - - return sendChunked(multipart, send, options) -} - -/** - * Send multipart in chunks - * - * @param {Multipart} multipartStream - * @param {function} send - * @param {Object} options - * @returns {Promise} - */ -const sendChunked = (multipartStream, send, options) => { +const add = (send) => (files, options) => { + // check if we can receive pull-stream after callbackify + let result = [] return new Promise((resolve, reject) => { - let waiting = null - const state = { - boundary: multipartStream._boundary, - id: uuid(), - index: 0, - rangeStart: 0, - rangeEnd: 0, - rangeTotal: 0, - running: false, - extraBytes: 0, - totalUp: 0, - totalAdd: 0 - } - - multipartStream.on('error', reject) - multipartStream.on('end', () => { - console.log('End', state.rangeTotal) - - // wait for all chunks to be sent - // doing all this in the end should simplify future concurrent chunk uploads - if (state.running && waiting === null) { - waiting = setInterval(() => { - if (!state.running) { - clearInterval(waiting) - waiting = null - sendChunkRequest(send, options, null, state) - .then(resolve) - .catch(reject) - } - }, 100) - } else { - sendChunkRequest(send, options, null, state) - .then(resolve) - .catch(reject) + pump( + arrayToStream([].concat(files)), + new SendStream(send, options), + concatStream(r => (result = r)), + (err) => { + if (err) { + return reject(err) + } + resolve(result) } - }) - - multipartStream.on('data', (chunk) => { - console.log('Send ', chunk.length) - // stop producing chunks - multipartStream.pauseAll() - state.extraBytes = multipartStream.extraBytes - state.index++ - state.rangeEnd = state.rangeStart + chunk.length - state.rangeTotal += chunk.length - state.running = true - sendChunkRequest(send, options, chunk, state) - .then(rsp => { - console.log('Response', rsp) - state.running = false - state.rangeStart = state.rangeEnd - // resume producing chunks - multipartStream.resumeAll() - }) - .catch(reject) - }) + ) }) } -/** - * Poor man's uuid - * - * @returns {String} - */ -function uuid () { - function chr4 () { - return Math.random().toString(16).slice(-4) - } - return chr4() + chr4() + - '-' + chr4() + - '-' + chr4() + - '-' + chr4() + - '-' + chr4() + chr4() + chr4() +const addReadableStream = (send) => (options) => { + return new SendStream(send, options) } -/** - * Send http request - * - * @param {function} send - * @param {Object} options - http request options - * @param {Uint8Array} chunk - chunk to send - * @param {Object} {id, start, end, name, boundary, size = '*'} - uploading session state - * @returns {Promise} - */ -const sendChunkRequest = ( - send, - options, - chunk, - { - boundary, - id, - index, - rangeStart, - rangeEnd, - rangeTotal, - extraBytes, - totalUp, - totalAdd - }) => { - return new Promise((resolve, reject) => { - const progressFn = options.progress || noop - const qs = { - 'cid-version': options['cid-version'], - 'raw-leaves': options['raw-leaves'], - 'only-hash': options.onlyHash, - 'wrap-with-directory': options.wrapWithDirectory, - hash: options.hashAlg || options.hash - } - const args = { - path: 'add-chunked', - qs: qs, - args: options.args, - stream: true, - recursive: true, - progress: Boolean(options.progress), - headers: { - 'Content-Type': 'application/octet-stream', - 'Content-Range': `bytes ${rangeStart}-${rangeEnd}/${rangeTotal}`, - 'X-Ipfs-Chunk-Group-Uuid': id, - 'X-Ipfs-Chunk-Index': index, - 'X-Ipfs-Chunk-Boundary': boundary - } - } - - const req = send(args, (err, res) => { - if (err) { - return reject(err) - } - - // we are in the last request - if (isStream(res)) { - const result = [] - res.on('data', (d) => { - if (d.path) { - // files added reporting - result.push(d) - } else { - // progress reporting - totalAdd = d.Bytes / 2 - progressFn(totalAdd + totalUp) - } - }) - res.on('error', reject) - res.on('end', () => { - resolve(result) - }) - } else { - totalUp = (rangeTotal - extraBytes) / 2 - progressFn(totalUp) - resolve(res) - } - }) +const addPullStream = (send) => (options) => { + return toPull(new SendStream(send, options)) +} - // write and send - if (chunk !== null) { - req.write(Buffer.from(chunk)) - } - req.end() - }) +module.exports = { + add, + addReadableStream, + addPullStream } diff --git a/src/add2/multipart2.js b/src/add2/multipart2.js index a43f87dcb..cd7adf729 100644 --- a/src/add2/multipart2.js +++ b/src/add2/multipart2.js @@ -33,12 +33,15 @@ const leading = (headers = {}, boundary) => { } class Multipart extends Duplex { - constructor ({ chunkSize = 1024 * 1024 } = {}) { - super({ writableObjectMode: true, writableHighWaterMark: 1 }) + constructor ({ chunkSize }) { + super({ + writableObjectMode: true, + writableHighWaterMark: 1 + }) this._boundary = generateBoundary() this.source = null - this.chunkSize = chunkSize + this.chunkSize = chunkSize || 0 this.buffer = Buffer.alloc(this.chunkSize) this.bufferOffset = 0 this.extraBytes = 0 @@ -94,6 +97,10 @@ class Multipart extends Duplex { return this.push(null) } + if (!this.chunkSize) { + return this.push(chunk) + } + if (isExtra) { this.extraBytes += chunk.length } diff --git a/src/add2/send-stream.js b/src/add2/send-stream.js new file mode 100644 index 000000000..ba752005d --- /dev/null +++ b/src/add2/send-stream.js @@ -0,0 +1,220 @@ +'use strict' +const { Duplex, Transform } = require('stream') +const isStream = require('is-stream') +const pump = require('pump') +const Multipart = require('./multipart2') +const { prepareWithHeaders } = require('./../utils/prepare-file') + +const noop = () => {} +/** + * Poor man's uuid + * + * @returns {String} + */ +function uuid () { + function chr4 () { + return Math.random().toString(16).slice(-4) + } + return chr4() + chr4() + + '-' + chr4() + + '-' + chr4() + + '-' + chr4() + + '-' + chr4() + chr4() + chr4() +} + +const prepareTransform = (options) => new Transform({ + objectMode: true, + transform (chunk, encoding, callback) { + try { + callback(null, prepareWithHeaders(chunk, options)) + } catch (err) { + callback(err) + } + } +}) + +class SendStream extends Duplex { + constructor (send, options) { + super({ objectMode: true, highWaterMark: 1 }) + this.waiting = null + this.options = options + this.send = send + this.multipart = new Multipart(options) + this.boundary = this.multipart._boundary + this.id = uuid() + this.index = 0 + this.rangeStart = 0 + this.rangeEnd = 0 + this.rangeTotal = 0 + this.running = false + this.extraBytes = 0 + this.totalUp = 0 + this.qs = { + 'cid-version': this.options['cid-version'], + 'raw-leaves': this.options['raw-leaves'], + 'only-hash': this.options.onlyHash, + 'wrap-with-directory': this.options.wrapWithDirectory, + hash: this.options.hashAlg || this.options.hash + } + + this.source = prepareTransform(options) + + pump([ + this.source, + this.multipart, + !options.chunkSize && this.request() + ].filter(Boolean), (err) => { + if (err) { + this.emit('error', err) + } + }) + + if (options.chunkSize) { + this.multipart.on('end', this.onEnd.bind(this)) + this.multipart.on('data', this.onData.bind(this)) + } + } + + _write (chunk, encoding, callback) { + this.source.write(chunk) + callback() + } + + _final () { + this.source.end() + } + + _read (size) { + // read + } + + onEnd () { + console.log('End', this.rangeTotal) + + // wait for all chunks to be sent + // doing all this in the end should simplify future concurrent chunk uploads + if (this.running && this.waiting === null) { + this.waiting = setInterval(() => { + if (!this.running) { + clearInterval(this.waiting) + this.waiting = null + this.requestChunk(null) + } + }, 100) + } else { + this.requestChunk(null) + } + } + + onData (chunk) { + console.log('Send ', chunk.length) + // stop producing chunks + this.multipart.pauseAll() + this.extraBytes = this.multipart.extraBytes + this.index++ + this.rangeEnd = this.rangeStart + chunk.length + this.rangeTotal += chunk.length + this.running = true + this.requestChunk(chunk) + .then(() => { + this.running = false + this.rangeStart = this.rangeEnd + this.multipart.resumeAll() + }) + } + + requestChunk (chunk) { + const args = { + path: 'add-chunked', + qs: this.qs, + args: this.options.args, + stream: true, + recursive: true, + progress: Boolean(this.options.progress), + headers: { + 'Content-Type': 'application/octet-stream', + 'Content-Range': `bytes ${this.rangeStart}-${this.rangeEnd}/${this.rangeTotal}`, + 'X-Ipfs-Chunk-Group-Uuid': this.id, + 'X-Ipfs-Chunk-Index': this.index, + 'X-Ipfs-Chunk-Boundary': this.boundary + } + } + return new Promise((resolve, reject) => { + const progressFn = this.options.progress || noop + const req = this.send(args, (err, res) => { + if (err) { + return this.emit('error', err) + } + + // we are in the last request + if (isStream(res)) { + res.on('data', (d) => { + if (d.path) { + // files added reporting + this.push(d) + } else { + // progress add reporting + progressFn((d.Bytes / 2) + this.totalUp) + } + }) + res.on('error', err => this.emit('error', err)) + res.on('end', () => { + resolve() + this.push(null) + }) + } else { + // progress upload reporting + this.totalUp = (this.rangeTotal - this.extraBytes) / 2 + progressFn(this.totalUp) + resolve() + } + }) + + // write and send + if (chunk !== null) { + req.write(Buffer.from(chunk)) + } + req.end() + }) + } + + request (chunk) { + const args = { + path: 'add-chunked', + qs: this.qs, + args: this.options.args, + stream: true, + recursive: true, + progress: Boolean(this.options.progress), + multipart: true, + multipartBoundary: this.boundary, + // remove this when daemon supports getting boundary from content-type + headers: { + 'X-Ipfs-Chunk-Boundary': this.boundary + } + } + + const progressFn = this.options.progress || noop + return this.send(args, (err, res) => { + if (err) { + return this.emit('error', err) + } + + res.on('data', (d) => { + if (d.path) { + // files added reporting + this.push(d) + } else { + // progress add reporting + progressFn(d.Bytes) + } + }) + res.on('error', err => this.emit('error', err)) + res.on('end', () => { + this.push(null) + }) + }) + } +} + +module.exports = SendStream diff --git a/src/utils/load-commands.js b/src/utils/load-commands.js index 0d84b198a..d6f19246a 100644 --- a/src/utils/load-commands.js +++ b/src/utils/load-commands.js @@ -64,7 +64,7 @@ function requireCommands () { addFromStream: require('../files/add')(send), addFromURL: require('../util/url-add')(send), getEndpointConfig: require('../util/get-endpoint-config')(config), - add2: require('./../add2/add2')(send), + add2: require('./../add2/add2').add(send), crypto: require('libp2p-crypto'), isIPFS: require('is-ipfs') } From d58e8cb01260d4603e2c589bd44869260bf4f871 Mon Sep 17 00:00:00 2001 From: Hugo Dias Date: Mon, 10 Sep 2018 17:09:20 +0100 Subject: [PATCH 07/15] feat: improved header and curl test files --- package.json | 1 + src/add2/send-stream.js | 55 ++++++++++------------------ src/utils/prepare-file.js | 3 +- test/fixtures/chunked-add/chunk1 | Bin 0 -> 1024 bytes test/fixtures/chunked-add/chunk2 | Bin 0 -> 1024 bytes test/fixtures/chunked-add/chunk3 | 2 + test/fixtures/chunked-add/readme.md | 12 ++++++ 7 files changed, 36 insertions(+), 37 deletions(-) create mode 100644 test/fixtures/chunked-add/chunk1 create mode 100644 test/fixtures/chunked-add/chunk2 create mode 100644 test/fixtures/chunked-add/chunk3 create mode 100644 test/fixtures/chunked-add/readme.md diff --git a/package.json b/package.json index c80c006fa..3242c322d 100644 --- a/package.json +++ b/package.json @@ -31,6 +31,7 @@ "bs58": "^4.0.1", "cids": "~0.5.3", "concat-stream": "^1.6.2", + "content": "^4.0.5", "debug": "^3.1.0", "detect-node": "^2.0.3", "filereader-stream": "^2.0.0", diff --git a/src/add2/send-stream.js b/src/add2/send-stream.js index ba752005d..cf196b3d3 100644 --- a/src/add2/send-stream.js +++ b/src/add2/send-stream.js @@ -41,7 +41,7 @@ class SendStream extends Duplex { this.send = send this.multipart = new Multipart(options) this.boundary = this.multipart._boundary - this.id = uuid() + this.uuid = uuid() this.index = 0 this.rangeStart = 0 this.rangeEnd = 0 @@ -57,6 +57,17 @@ class SendStream extends Duplex { hash: this.options.hashAlg || this.options.hash } + this.args = { + path: 'add-chunked', + qs: this.qs, + args: this.options.args, + stream: true, + recursive: true, + progress: Boolean(this.options.progress), + multipart: true, + multipartBoundary: this.boundary + } + this.source = prepareTransform(options) pump([ @@ -107,7 +118,7 @@ class SendStream extends Duplex { } onData (chunk) { - console.log('Send ', chunk.length) + console.log('Send ', chunk.toString()) // stop producing chunks this.multipart.pauseAll() this.extraBytes = this.multipart.extraBytes @@ -124,24 +135,13 @@ class SendStream extends Duplex { } requestChunk (chunk) { - const args = { - path: 'add-chunked', - qs: this.qs, - args: this.options.args, - stream: true, - recursive: true, - progress: Boolean(this.options.progress), - headers: { - 'Content-Type': 'application/octet-stream', - 'Content-Range': `bytes ${this.rangeStart}-${this.rangeEnd}/${this.rangeTotal}`, - 'X-Ipfs-Chunk-Group-Uuid': this.id, - 'X-Ipfs-Chunk-Index': this.index, - 'X-Ipfs-Chunk-Boundary': this.boundary - } + this.args.headers = { + 'Content-Range': `bytes ${this.rangeStart}-${this.rangeEnd}/${this.rangeTotal}`, + 'X-Chunked-Input': `uuid="${this.uuid}"; index=${this.index}` } return new Promise((resolve, reject) => { const progressFn = this.options.progress || noop - const req = this.send(args, (err, res) => { + const req = this.send(this.args, (err, res) => { if (err) { return this.emit('error', err) } @@ -178,30 +178,15 @@ class SendStream extends Duplex { }) } - request (chunk) { - const args = { - path: 'add-chunked', - qs: this.qs, - args: this.options.args, - stream: true, - recursive: true, - progress: Boolean(this.options.progress), - multipart: true, - multipartBoundary: this.boundary, - // remove this when daemon supports getting boundary from content-type - headers: { - 'X-Ipfs-Chunk-Boundary': this.boundary - } - } - + request () { const progressFn = this.options.progress || noop - return this.send(args, (err, res) => { + return this.send(this.args, (err, res) => { if (err) { return this.emit('error', err) } res.on('data', (d) => { - if (d.path) { + if (d.hash) { // files added reporting this.push(d) } else { diff --git a/src/utils/prepare-file.js b/src/utils/prepare-file.js index 539319882..f8807bf49 100644 --- a/src/utils/prepare-file.js +++ b/src/utils/prepare-file.js @@ -108,7 +108,7 @@ function prepare (file, opts) { if (isBrowser && file instanceof window.File) { return { - path: '', + path: file.name, symlink: false, dir: false, content: fileReaderStream(file, opts) @@ -144,7 +144,6 @@ function headers (file) { ? encodeURIComponent(file.path) : '' - // console.log('new part', file) const header = { 'Content-Disposition': `file; filename="${name}"` } if (!file.content) { diff --git a/test/fixtures/chunked-add/chunk1 b/test/fixtures/chunked-add/chunk1 new file mode 100644 index 0000000000000000000000000000000000000000..b69365566a4978d92db62a44fdd5fb89de69f02d GIT binary patch literal 1024 zcmV+b1poUjEiElAEiElAEiElAEiElAEiElAEiElAEiElLF*i9dG&nLjIWsUdHZ(Od zI5{&nIW;p43PW#hbY*UIEktQ^aBp*IbZKvHIv{3gY-KwjW@&6?ZeeX@Jt86v3PW#h zbY*UIEmV1MWjY{XaBysCV_|e@Z*DJdV{~P7Epv2oWnpa%3JnTmOPEN%RU-S(Y-Ako zaHcv@84Keczg32$&^nFy(VaVMYvtq=3eq~e;lu|1It>ar#Bv2N;@r1K{rAg--s|r} zJpxzfsVHAg(ZG#k^>Te{yDumBZy7n2elx;s(+#twEL22Rh9;6ES%Do{@BUatu{!|3 z`YC2y^j|4r(D2m&G_tW{aL%Mo^oy%_9g-zYnYqrlfem99_cVo_Mh2VMEb_}dY~=+$ z11`NzIUWXA@`1k2)#Kw}?eBu`_kJ?S*PGMMi?y6KR6!sHLmFSdEH%Vv4DNYggr^F+ zV)Ydb=Jcwr=zxgcOz)&nO;pT{YX4v2F3+{&CzcFnA~igm5)z5w+wRB0!jxnlz=Y{J zBSuD~=Qk2I-}QSuP0P1Vz4Cz4^}wvPjQgh3CK%uLYu#sQ*j|y@izoXE3L^v?iRlN2 zI$4!MzB+^lH++Z2(y(>nx|^}78E&Ml9$wOf@Xh1gU>U)C5gr5PK!y1mJWjq5vvV({ z5BQhCjtPkV5H%_x3oty%G7JQ#gJ_ufn(w+Z@OgLi&4%YM`5m@?$Hb=$%?XN)zT_K2~>q!nUqx*eO$*8(Olnk&kp zoW4_`mHKeVsDQ)rnron`ut`wf5MUW%YtM%KZbwCTHDxiGeN^@jb#c{7W?5smoB2NQ zvPRgY)=lOWCz-yPMCs88`0r|9upkVST`p3I9;VrZkHCv#8oEYxp!Ks}gEboh|5zRj zIo!gdyoTW6N&OpO3@Ub8VHsY7<0ziaqww`g+qN<}&*>~^*lYz)+X+f%x1e=;8np5g ziTc>irlaN~)Pn@Y42$RgCaenG+yZ<72V4gv;JLiFx^>2VbuOICf}sURK!RzDKh0J_ zX9I?F`q{cA4^yRH2Sv_%ci*gpy^vY6p+8t#-JyQYK;Bt=#g9`F7(NX(MFq(0<<>KTP^DwLA!;kl+gO9rhAhaFg|nz u!Tp%leMQFk=Rg8J3lX3x`r>1cbASh$DB}bxPkL!paW-0u4Tcm2X&l*A)ZW1W literal 0 HcmV?d00001 diff --git a/test/fixtures/chunked-add/chunk2 b/test/fixtures/chunked-add/chunk2 new file mode 100644 index 0000000000000000000000000000000000000000..039d4586a403dc9a635e2ab8368f2381a741ee67 GIT binary patch literal 1024 zcmV+b1poVFOPEN%RU-S(Y-AkoaHcv@84Keczg32$&^nFy(VaVMYvtq=3eq~e;lu|1 zIt>ar#Bv2N;@r1K{rAg--s|r}JpxzfsVHAg(ZG#k^>Te{yDumBZy7n2elx;s(+#tw zEL22Rh9;6ES%Do{@BUatu{!|3`YC2y^j|4r(D2m&G_tW{aL%Mo^oy%_9g-zYnYqrl zfem99_cVo_Mh2VMEb_}dY~=+$11`NzIUWXA@`1k2)#Kw}?eBu`_kJ?S*PGMMi?y6K zR6!sHLmFSdEH%Vv4DNYggr^F+V)Ydb=Jcwr=zxgcOz)&nO;pT{YX4v2F3+{&CzcFn zA~igm5)z5w+wRB0!jxnlz=Y{JBSuD~=Qk2I-}QSuP0P1Vz4Cz4^}wvPjQgh3CK%uL zYu#sQ*j|y@izoXE3L^v?iRlN2I$4!MzB+^lH++Z2(y(>nx|^}78E&Ml9$wOf@Xh1g zU>U)C5gr5PK!y1mJWjq5vvV({5BQhCjtPkV5H%_x3oty%G7JQ#gJ_ufn(w+Z@OgLi z&4%YM`5m@?$Hb=$%?XN)zT z_K2~>q!nUqx*eO$*8(Olnk&kpoW4_`mHKeVsDQ)rnron`ut`wf5MUW%YtM%KZbwCT zHDxiGeN^@jb#c{7W?5smoB2NQvPRgY)=lOWCz-yPMCs88`0r|9upkVST`p3I9;VrZ zkHCv#8oEYxp!Ks}gEboh|5zRjIo!gdyoTW6N&OpO3@Ub8VHsY7<0ziaqww`g+qN<} z&*>~^*lYz)+X+f%x1e=;8np5giTc>irlaN~)Pn@Y42$RgCaenG+yZ<72V4gv;JLiF zx^>2VbuOICf}sURK!RzDKh0J_X9I?F`q{cA4^yRH2Sv_%ci*gpy^vY6p+8t#-JyQYK;Bt=#g9`F7(NX(MFq(0<<>K zTP^DwLA!;kl+gO9rhAhaFg|nz!Tp%leMQFk=Rg8J3lX3x`r>1cbASh$DB}bxPkL!p zaW-0u4Tcm2X&l)b$lOd*^@pMAjPNk}m|7AkfyE6#gQJ92wxprj_DGp>7gs69Wm)p= z5$woA8&yOA={{gRVBHI~Bgg)g84hl688P+v Date: Tue, 11 Sep 2018 10:19:33 +0100 Subject: [PATCH 08/15] fix: use nanoid for uuid --- package.json | 1 + src/add2/send-stream.js | 20 +++----------------- 2 files changed, 4 insertions(+), 17 deletions(-) diff --git a/package.json b/package.json index 3242c322d..ece153563 100644 --- a/package.json +++ b/package.json @@ -49,6 +49,7 @@ "multiaddr": "^5.0.0", "multibase": "~0.4.0", "multihashes": "~0.4.13", + "nanoid": "^1.2.3", "ndjson": "^1.5.0", "once": "^1.4.0", "peer-id": "~0.11.0", diff --git a/src/add2/send-stream.js b/src/add2/send-stream.js index cf196b3d3..15309bd00 100644 --- a/src/add2/send-stream.js +++ b/src/add2/send-stream.js @@ -1,26 +1,12 @@ 'use strict' const { Duplex, Transform } = require('stream') const isStream = require('is-stream') +const nanoid = require('nanoid') const pump = require('pump') const Multipart = require('./multipart2') const { prepareWithHeaders } = require('./../utils/prepare-file') const noop = () => {} -/** - * Poor man's uuid - * - * @returns {String} - */ -function uuid () { - function chr4 () { - return Math.random().toString(16).slice(-4) - } - return chr4() + chr4() + - '-' + chr4() + - '-' + chr4() + - '-' + chr4() + - '-' + chr4() + chr4() + chr4() -} const prepareTransform = (options) => new Transform({ objectMode: true, @@ -41,7 +27,7 @@ class SendStream extends Duplex { this.send = send this.multipart = new Multipart(options) this.boundary = this.multipart._boundary - this.uuid = uuid() + this.uuid = nanoid() this.index = 0 this.rangeStart = 0 this.rangeEnd = 0 @@ -118,7 +104,7 @@ class SendStream extends Duplex { } onData (chunk) { - console.log('Send ', chunk.toString()) + console.log('Send ', chunk.length) // stop producing chunks this.multipart.pauseAll() this.extraBytes = this.multipart.extraBytes From 3cd19e721bb529e36c53e76d835ac8625741e6eb Mon Sep 17 00:00:00 2001 From: Hugo Dias Date: Tue, 11 Sep 2018 17:25:02 +0100 Subject: [PATCH 09/15] feat: callbackify and top level jsdoc and more --- package.json | 1 + src/add2/add2.js | 85 ++++++++++++++++++++++++++++++++++++++--- src/add2/multipart2.js | 33 ++++++++++++++-- src/add2/send-stream.js | 22 ++++++++++- 4 files changed, 132 insertions(+), 9 deletions(-) diff --git a/package.json b/package.json index ece153563..34dc6e822 100644 --- a/package.json +++ b/package.json @@ -54,6 +54,7 @@ "once": "^1.4.0", "peer-id": "~0.11.0", "peer-info": "~0.14.1", + "promise-nodeify": "^3.0.1", "promisify-es6": "^1.0.3", "pull-defer": "~0.2.2", "pull-pushable": "^2.2.0", diff --git a/src/add2/add2.js b/src/add2/add2.js index d909d1ca4..c0aee05c1 100644 --- a/src/add2/add2.js +++ b/src/add2/add2.js @@ -2,10 +2,20 @@ const { Readable } = require('stream') const toPull = require('stream-to-pull-stream') +const promiseNodeify = require('promise-nodeify') const concatStream = require('concat-stream') const pump = require('pump') const SendStream = require('./send-stream') +/** @module api/add */ + +/** + * Converts an array to a stream + * + * @private + * @param {Array} data + * @returns {Readable} + */ const arrayToStream = (data) => { let i = 0 return new Readable({ @@ -16,10 +26,59 @@ const arrayToStream = (data) => { }) } -const add = (send) => (files, options) => { - // check if we can receive pull-stream after callbackify +/** + * @typedef {Object} AddOptions + * @property {number} chunkSize - Value of array element + * @property {number} [cidVersion=0] - Defaults to 0. The CID version to use when storing the data (storage keys are based on the CID, including it's version) + * @property {function(bytes: number): void} progress - function that will be called with the byte length of chunks as a file is added to ipfs. + * @property {Boolean} recursive - When a Path is passed, this option can be enabled to add recursively all the files. + * @property {string} hashAlg - Multihash hashing algorithm to use. (default: sha2-256) The list of all possible values {@link https://github.com/multiformats/js-multihash/blob/master/src/constants.js#L5-L343 hashAlg values} + * @property {Boolean} wrapWithDirectory - Adds a wrapping node around the content. + * @property {Boolean} onlyHash - Doesn't actually add the file to IPFS, but rather calculates its hash. + * @property {Boolean} [pin=true] - Defaults to true. Pin this object when adding. + * @property {Boolean} [rawLeaves=false] - Defaults to false. If true, DAG leaves will contain raw file data and not be wrapped in a protobuf + * @property {string} [chunker=size-262144] Chunking algorithm used to build ipfs DAGs. Available formats: + * - size-{size} + * - rabin + * - rabin-{avg} + * - rabin-{min}-{avg}-{max} + */ + +/** + * @typedef {Object} AddResult + * @property {string} path + * @property {string} hash + * @property {number} size + */ + +/** + * This callback is displayed as a global member. + * @callback AddCallback + * @param {Error} err + * @param {AddResult[]} res + */ + +/** @typedef {Function} PullStream */ +/** @typedef {(Object[]|Readable|File|PullStream|Buffer)} AddData */ +/** + * @typedef {function(AddData, AddOptions, AddCallback): (Promise.|void)} AddFunction + */ + +/** + * Add to data to ipfs + * + * @param {Function} send + * @returns {AddFunction} + * @memberof api/add + */ +const add = (send) => (files, options, callback) => { + if (typeof options === 'function') { + callback = options + options = {} + } + let result = [] - return new Promise((resolve, reject) => { + const r = new Promise((resolve, reject) => { pump( arrayToStream([].concat(files)), new SendStream(send, options), @@ -32,13 +91,29 @@ const add = (send) => (files, options) => { } ) }) + + return promiseNodeify(r, callback) } -const addReadableStream = (send) => (options) => { +/** + * Add to data to ipfs + * + * @param {Function} send + * @returns {function(AddOptions): Readable} + * @memberof api/add + */ +const addReadableStream = (send) => (options = {}) => { return new SendStream(send, options) } -const addPullStream = (send) => (options) => { +/** + * Add to data to ipfs + * + * @param {Function} send + * @returns {function(AddOptions): PullStream} + * @memberof api/add + */ +const addPullStream = (send) => (options = {}) => { return toPull(new SendStream(send, options)) } diff --git a/src/add2/multipart2.js b/src/add2/multipart2.js index cd7adf729..493cae220 100644 --- a/src/add2/multipart2.js +++ b/src/add2/multipart2.js @@ -4,10 +4,18 @@ const { Duplex } = require('stream') const { isSource } = require('is-pull-stream') const toStream = require('pull-stream-to-stream') +/** @private @typedef {import("./add2").AddOptions} AddOptions */ + const PADDING = '--' const NEW_LINE = '\r\n' const NEW_LINE_BUFFER = Buffer.from(NEW_LINE) +/** + * Generate a random boundary to use in a multipart request + * + * @private + * @returns {string} + */ const generateBoundary = () => { var boundary = '--------------------------' for (var i = 0; i < 24; i++) { @@ -17,6 +25,14 @@ const generateBoundary = () => { return boundary } +/** + * Generate leading section for a multipart body + * + * @private + * @param {Object} [headers={}] + * @param {string} boundary + * @returns {string} + */ const leading = (headers = {}, boundary) => { var leading = [PADDING + boundary] @@ -32,8 +48,19 @@ const leading = (headers = {}, boundary) => { return Buffer.from(leadingStr) } +/** + * Multipart class to generate a multipart body chunked and non chunked + * + * @private + * @class Multipart + * @extends {Duplex} + */ class Multipart extends Duplex { - constructor ({ chunkSize }) { + /** + * Creates an instance of Multipart. + * @param {AddOptions} options + */ + constructor (options) { super({ writableObjectMode: true, writableHighWaterMark: 1 @@ -41,7 +68,7 @@ class Multipart extends Duplex { this._boundary = generateBoundary() this.source = null - this.chunkSize = chunkSize || 0 + this.chunkSize = options.chunkSize || 0 this.buffer = Buffer.alloc(this.chunkSize) this.bufferOffset = 0 this.extraBytes = 0 @@ -97,7 +124,7 @@ class Multipart extends Duplex { return this.push(null) } - if (!this.chunkSize) { + if (this.chunkSize === 0) { return this.push(chunk) } diff --git a/src/add2/send-stream.js b/src/add2/send-stream.js index 15309bd00..ccb6c9f46 100644 --- a/src/add2/send-stream.js +++ b/src/add2/send-stream.js @@ -6,8 +6,16 @@ const pump = require('pump') const Multipart = require('./multipart2') const { prepareWithHeaders } = require('./../utils/prepare-file') +/** @private @typedef {import("./add2").AddOptions} AddOptions */ + const noop = () => {} +/** + * Factory for prepare stream + * @private + * @param {*} options + * @returns {Function} + */ const prepareTransform = (options) => new Transform({ objectMode: true, transform (chunk, encoding, callback) { @@ -19,8 +27,20 @@ const prepareTransform = (options) => new Transform({ } }) +/** + * Class to create a stream to send data to the API + * + * @private + * @class SendStream + * @extends {Duplex} + */ class SendStream extends Duplex { - constructor (send, options) { + /** + * Creates an instance of SendStream. + * @param {Function} send + * @param {AddOptions} [options={}] + */ + constructor (send, options = {}) { super({ objectMode: true, highWaterMark: 1 }) this.waiting = null this.options = options From 4191525f69433e16dd09b862821b7f8b95f5ef06 Mon Sep 17 00:00:00 2001 From: Hugo Dias Date: Wed, 12 Sep 2018 11:03:33 +0100 Subject: [PATCH 10/15] fix: integration with old add --- .../add2.js => files/add-experimental.js} | 11 ++++------- src/files/add-pull-stream.js | 10 +++++----- src/files/add-readable-stream.js | 10 +++++----- src/files/add.js | 19 +++++++++++-------- src/utils/load-commands.js | 1 - .../multipart-experimental.js} | 2 +- .../send-stream-experimental.js} | 6 +++--- 7 files changed, 29 insertions(+), 30 deletions(-) rename src/{add2/add2.js => files/add-experimental.js} (91%) rename src/{add2/multipart2.js => utils/multipart-experimental.js} (98%) rename src/{add2/send-stream.js => utils/send-stream-experimental.js} (96%) diff --git a/src/add2/add2.js b/src/files/add-experimental.js similarity index 91% rename from src/add2/add2.js rename to src/files/add-experimental.js index c0aee05c1..4bb490300 100644 --- a/src/add2/add2.js +++ b/src/files/add-experimental.js @@ -5,7 +5,7 @@ const toPull = require('stream-to-pull-stream') const promiseNodeify = require('promise-nodeify') const concatStream = require('concat-stream') const pump = require('pump') -const SendStream = require('./send-stream') +const SendStream = require('../utils/send-stream-experimental') /** @module api/add */ @@ -52,7 +52,6 @@ const arrayToStream = (data) => { */ /** - * This callback is displayed as a global member. * @callback AddCallback * @param {Error} err * @param {AddResult[]} res @@ -60,9 +59,7 @@ const arrayToStream = (data) => { /** @typedef {Function} PullStream */ /** @typedef {(Object[]|Readable|File|PullStream|Buffer)} AddData */ -/** - * @typedef {function(AddData, AddOptions, AddCallback): (Promise.|void)} AddFunction - */ +/** @typedef {function(AddData, AddOptions, AddCallback): (Promise.|void)} AddFunction */ /** * Add to data to ipfs @@ -71,7 +68,7 @@ const arrayToStream = (data) => { * @returns {AddFunction} * @memberof api/add */ -const add = (send) => (files, options, callback) => { +const add = (send) => (data, options, callback) => { if (typeof options === 'function') { callback = options options = {} @@ -80,7 +77,7 @@ const add = (send) => (files, options, callback) => { let result = [] const r = new Promise((resolve, reject) => { pump( - arrayToStream([].concat(files)), + arrayToStream([].concat(data)), new SendStream(send, options), concatStream(r => (result = r)), (err) => { diff --git a/src/files/add-pull-stream.js b/src/files/add-pull-stream.js index 2076ffa8d..fabf27622 100644 --- a/src/files/add-pull-stream.js +++ b/src/files/add-pull-stream.js @@ -4,10 +4,10 @@ const SendFilesStream = require('../utils/send-files-stream') const FileResultStreamConverter = require('../utils/file-result-stream-converter') const toPull = require('stream-to-pull-stream') -module.exports = (send) => { - return (options) => { - options = options || {} - options.converter = FileResultStreamConverter - return toPull(SendFilesStream(send, 'add')({ qs: options })) +module.exports = (send) => (options = {}) => { + if (options.experimental) { + return require('./add-experimental').addPullStream(send)(options) } + options.converter = FileResultStreamConverter + return toPull(SendFilesStream(send, 'add')({ qs: options })) } diff --git a/src/files/add-readable-stream.js b/src/files/add-readable-stream.js index 320abe692..46cddb05e 100644 --- a/src/files/add-readable-stream.js +++ b/src/files/add-readable-stream.js @@ -3,10 +3,10 @@ const SendFilesStream = require('../utils/send-files-stream') const FileResultStreamConverter = require('../utils/file-result-stream-converter') -module.exports = (send) => { - return (options) => { - options = options || {} - options.converter = FileResultStreamConverter - return SendFilesStream(send, 'add')(options) +module.exports = (send) => (options = {}) => { + if (options.experimental) { + return require('./add-experimental').addReadableStream(send)(options) } + options.converter = FileResultStreamConverter + return SendFilesStream(send, 'add')(options) } diff --git a/src/files/add.js b/src/files/add.js index f706843a8..1b594905c 100644 --- a/src/files/add.js +++ b/src/files/add.js @@ -10,8 +10,6 @@ const FileResultStreamConverter = require('../utils/file-result-stream-converter const SendFilesStream = require('../utils/send-files-stream') module.exports = (send) => { - const createAddStream = SendFilesStream(send, 'add') - const add = promisify((_files, options, _callback) => { if (typeof options === 'function') { _callback = options @@ -26,11 +24,11 @@ module.exports = (send) => { options.converter = FileResultStreamConverter const ok = Buffer.isBuffer(_files) || - isStream.readable(_files) || - Array.isArray(_files) || - OtherBuffer.isBuffer(_files) || - typeof _files === 'object' || - isSource(_files) + isStream.readable(_files) || + Array.isArray(_files) || + OtherBuffer.isBuffer(_files) || + typeof _files === 'object' || + isSource(_files) if (!ok) { return callback(new Error('first arg must be a buffer, readable stream, pull stream, an object or array of objects')) @@ -38,6 +36,7 @@ module.exports = (send) => { const files = [].concat(_files) + const createAddStream = SendFilesStream(send, 'add') const stream = createAddStream({ qs: options }) const concat = ConcatStream((result) => callback(null, result)) stream.once('error', callback) @@ -47,7 +46,11 @@ module.exports = (send) => { stream.end() }) - return function () { + return function (data, options, callback) { + if (options.experimental) { + return require('./add-experimental').add(send)(data, options, callback) + } + const args = Array.from(arguments) // If we files.add(), then promisify thinks the pull stream is diff --git a/src/utils/load-commands.js b/src/utils/load-commands.js index d6f19246a..55b4570ab 100644 --- a/src/utils/load-commands.js +++ b/src/utils/load-commands.js @@ -64,7 +64,6 @@ function requireCommands () { addFromStream: require('../files/add')(send), addFromURL: require('../util/url-add')(send), getEndpointConfig: require('../util/get-endpoint-config')(config), - add2: require('./../add2/add2').add(send), crypto: require('libp2p-crypto'), isIPFS: require('is-ipfs') } diff --git a/src/add2/multipart2.js b/src/utils/multipart-experimental.js similarity index 98% rename from src/add2/multipart2.js rename to src/utils/multipart-experimental.js index 493cae220..ce24502cb 100644 --- a/src/add2/multipart2.js +++ b/src/utils/multipart-experimental.js @@ -4,7 +4,7 @@ const { Duplex } = require('stream') const { isSource } = require('is-pull-stream') const toStream = require('pull-stream-to-stream') -/** @private @typedef {import("./add2").AddOptions} AddOptions */ +/** @private @typedef {import("../files/add-experimental").AddOptions} AddOptions */ const PADDING = '--' const NEW_LINE = '\r\n' diff --git a/src/add2/send-stream.js b/src/utils/send-stream-experimental.js similarity index 96% rename from src/add2/send-stream.js rename to src/utils/send-stream-experimental.js index ccb6c9f46..1ac3add29 100644 --- a/src/add2/send-stream.js +++ b/src/utils/send-stream-experimental.js @@ -3,10 +3,10 @@ const { Duplex, Transform } = require('stream') const isStream = require('is-stream') const nanoid = require('nanoid') const pump = require('pump') -const Multipart = require('./multipart2') -const { prepareWithHeaders } = require('./../utils/prepare-file') +const Multipart = require('./multipart-experimental') +const { prepareWithHeaders } = require('./prepare-file') -/** @private @typedef {import("./add2").AddOptions} AddOptions */ +/** @private @typedef {import("../files/add-experimental").AddOptions} AddOptions */ const noop = () => {} From 0a4f0086a455ff2f9d7ebe214ea4d7773162f6c0 Mon Sep 17 00:00:00 2001 From: Hugo Dias Date: Sun, 16 Sep 2018 12:10:56 +0100 Subject: [PATCH 11/15] feat: tests, concurrency, simplification --- package.json | 2 +- src/files/add.js | 2 +- src/utils/multipart-experimental.js | 119 +++--- src/utils/prepare-file.js | 12 +- src/utils/send-files-stream.js | 2 +- src/utils/send-stream-experimental.js | 76 ++-- test/add-experimental.spec.js | 516 ++++++++++++++++++++++++++ test/fixtures/2048 | Bin 0 -> 2048 bytes 8 files changed, 620 insertions(+), 109 deletions(-) create mode 100644 test/add-experimental.spec.js create mode 100644 test/fixtures/2048 diff --git a/package.json b/package.json index 34dc6e822..ce970121e 100644 --- a/package.json +++ b/package.json @@ -29,6 +29,7 @@ "async": "^2.6.1", "big.js": "^5.1.2", "bs58": "^4.0.1", + "buffer-to-stream": "^1.0.0", "cids": "~0.5.3", "concat-stream": "^1.6.2", "content": "^4.0.5", @@ -62,7 +63,6 @@ "pump": "^3.0.0", "qs": "^6.5.2", "readable-stream": "^2.3.6", - "readable-stream-node-to-web": "^1.0.1", "stream-http": "hugomrdias/stream-http#fix/body-handling", "stream-to-pull-stream": "^1.7.2", "streamifier": "~0.1.1", diff --git a/src/files/add.js b/src/files/add.js index 1b594905c..8ffd4d3d2 100644 --- a/src/files/add.js +++ b/src/files/add.js @@ -47,7 +47,7 @@ module.exports = (send) => { }) return function (data, options, callback) { - if (options.experimental) { + if (options && options.experimental) { return require('./add-experimental').add(send)(data, options, callback) } diff --git a/src/utils/multipart-experimental.js b/src/utils/multipart-experimental.js index ce24502cb..6e7eeb1e3 100644 --- a/src/utils/multipart-experimental.js +++ b/src/utils/multipart-experimental.js @@ -1,8 +1,10 @@ 'use strict' -const { Duplex } = require('stream') +const { Duplex, PassThrough } = require('stream') const { isSource } = require('is-pull-stream') -const toStream = require('pull-stream-to-stream') +const pump = require('pump') +const pullToStream = require('pull-stream-to-stream') +const bufferToStream = require('buffer-to-stream') /** @private @typedef {import("../files/add-experimental").AddOptions} AddOptions */ @@ -63,7 +65,8 @@ class Multipart extends Duplex { constructor (options) { super({ writableObjectMode: true, - writableHighWaterMark: 1 + writableHighWaterMark: 1, + readableHighWaterMark: options.chunkSize ? Math.max(136, options.chunkSize) : 16384 // min is 136 }) this._boundary = generateBoundary() @@ -72,12 +75,11 @@ class Multipart extends Duplex { this.buffer = Buffer.alloc(this.chunkSize) this.bufferOffset = 0 this.extraBytes = 0 + this.sourceReadable = false } _read () { - if (this.source && !this.isPaused()) { - this.source.resume() - } + // empty read } _write (file, encoding, callback) { @@ -87,30 +89,34 @@ class Multipart extends Duplex { } _final (callback) { - this.pushChunk(Buffer.from(PADDING + this._boundary + PADDING + NEW_LINE), true) // Flush the rest and finish - if (this.bufferOffset && !this.destroyed) { + const tail = Buffer.from(PADDING + this._boundary + PADDING + NEW_LINE) + if (this.chunkSize === 0) { + this.push(tail) + } else { + this.extraBytes += tail.length const slice = this.buffer.slice(0, this.bufferOffset) - this.push(slice) + this.bufferOffset = 0 + this.push(Buffer.concat([slice, tail], slice.length + tail.length)) } + this.push(null) callback() } - pauseAll () { - this.pause() - if (this.source) { - this.source.pause() - } - } + resume () { + super.resume() - resumeAll () { - this.resume() - if (this.source) { - this.source.resume() + // Chunked mode + if (this.chunkSize > 0 && this.sourceReadable) { + let chunk + while (!this.isPaused() && (chunk = this.source.read(this.chunkSize - this.bufferOffset)) !== null) { + this.pushChunk(chunk) + } } } + /** * Push chunk * @@ -119,7 +125,6 @@ class Multipart extends Duplex { * @return {boolean} */ pushChunk (chunk, isExtra = false) { - let result = true if (chunk === null) { return this.push(null) } @@ -132,61 +137,65 @@ class Multipart extends Duplex { this.extraBytes += chunk.length } - // If we have enough bytes in this chunk to get buffer up to chunkSize, - // fill in buffer, push it, and reset its offset. - // Otherwise, just copy the entire chunk in to buffer. + if (this.bufferOffset === 0 && chunk.length === this.chunkSize) { + return this.push(chunk) + } + const bytesNeeded = (this.chunkSize - this.bufferOffset) - if (chunk.length >= bytesNeeded) { - chunk.copy(this.buffer, this.bufferOffset, 0, bytesNeeded) - result = this.push(this.buffer) + // make sure we have the correct amount of bytes + if (chunk.length === bytesNeeded) { + // chunk.copy(this.buffer, this.bufferOffset, 0, bytesNeeded) + const slice = this.buffer.slice(0, this.bufferOffset) this.bufferOffset = 0 - // Handle leftovers from the chunk - const leftovers = chunk.slice(0, chunk.length - bytesNeeded) - let size = leftovers.length - while (size >= this.chunkSize) { - result = this.push(chunk.slice(this.bufferOffset, this.bufferOffset + this.chunkSize)) - this.bufferOffset += this.chunkSize - size -= this.chunkSize - } - // if we still have anything left copy to the buffer - chunk.copy(this.buffer, 0, this.bufferOffset, this.bufferOffset + size) - this.bufferOffset = size - } else { - chunk.copy(this.buffer, this.bufferOffset) - this.bufferOffset += chunk.length + return this.push(Buffer.concat([slice, chunk], slice.length + chunk.length)) } - return result + if (chunk.length > bytesNeeded) { + this.emit('error', new RangeError(`Chunk is too big needed ${bytesNeeded} got ${chunk.length}`)) + return false + } + + chunk.copy(this.buffer, this.bufferOffset) + this.bufferOffset += chunk.length + + return true } pushFile (file, callback) { this.pushChunk(leading(file.headers, this._boundary), true) - let content = file.content || Buffer.alloc(0) + this.source = file.content || Buffer.alloc(0) - if (Buffer.isBuffer(content)) { - this.pushChunk(content) - this.pushChunk(NEW_LINE_BUFFER, true) - return callback() + if (Buffer.isBuffer(this.source)) { + this.source = bufferToStream(this.source) } - if (isSource(content)) { - content = toStream.source(content) + if (isSource(file.content)) { + // pull-stream-to-stream doesn't support readable event... + this.source = pump([pullToStream.source(file.content), new PassThrough()]) } - this.source = content - // From now on we assume content is a stream - content.on('data', (data) => { - if (!this.pushChunk(data)) { - content.pause() + this.source.on('readable', () => { + this.sourceReadable = true + let chunk = null + if (this.chunkSize === 0) { + if ((chunk = this.source.read()) !== null) { + this.pushChunk(chunk) + } + } else { + while (!this.isPaused() && (chunk = this.source.read(this.chunkSize - this.bufferOffset)) !== null) { + this.pushChunk(chunk) + } } }) - content.once('error', this.emit.bind(this, 'error')) - content.once('end', () => { + this.source.on('end', () => { + this.sourceReadable = false this.pushChunk(NEW_LINE_BUFFER, true) callback() }) + + this.source.on('error', err => this.emit('error', err)) } } diff --git a/src/utils/prepare-file.js b/src/utils/prepare-file.js index f8807bf49..e4caf4915 100644 --- a/src/utils/prepare-file.js +++ b/src/utils/prepare-file.js @@ -89,15 +89,13 @@ function prepareFile (file, opts) { } function prepare (file, opts) { + // probably it should be valid and would be handled below with Buffer.from if (typeof file === 'string') { - if (!isNode) { - throw new Error('Can only add file paths in node') - } - - return loadPaths(opts, file) + throw new Error('String isn\'t valid as an input') } - if (file.path && !file.content) { + // needs to test for stream because fs.createReadStream has path prop and would handle here + if (!isStream(file) && file.path && !file.content) { file.dir = true return file } @@ -106,7 +104,7 @@ function prepare (file, opts) { return file } - if (isBrowser && file instanceof window.File) { + if (isBrowser && file instanceof self.File) { return { path: file.name, symlink: false, diff --git a/src/utils/send-files-stream.js b/src/utils/send-files-stream.js index 5ad545461..b87a94da2 100644 --- a/src/utils/send-files-stream.js +++ b/src/utils/send-files-stream.js @@ -4,7 +4,7 @@ const Duplex = require('stream').Duplex const eachSeries = require('async/eachSeries') const isStream = require('is-stream') const once = require('once') -const {prepareFile} = require('./prepare-file') +const { prepareFile } = require('./prepare-file') const Multipart = require('./multipart') function headers (file) { diff --git a/src/utils/send-stream-experimental.js b/src/utils/send-stream-experimental.js index 1ac3add29..b019dedb1 100644 --- a/src/utils/send-stream-experimental.js +++ b/src/utils/send-stream-experimental.js @@ -10,6 +10,21 @@ const { prepareWithHeaders } = require('./prepare-file') const noop = () => {} +/** + * Convert back to the proper schema + * + * @private + * @param {Object} data + * @returns {Object} + */ +const convert = (data) => { + return { + path: data.Name, + hash: data.Hash, + size: data.Size + } +} + /** * Factory for prepare stream * @private @@ -42,7 +57,6 @@ class SendStream extends Duplex { */ constructor (send, options = {}) { super({ objectMode: true, highWaterMark: 1 }) - this.waiting = null this.options = options this.send = send this.multipart = new Multipart(options) @@ -52,9 +66,6 @@ class SendStream extends Duplex { this.rangeStart = 0 this.rangeEnd = 0 this.rangeTotal = 0 - this.running = false - this.extraBytes = 0 - this.totalUp = 0 this.qs = { 'cid-version': this.options['cid-version'], 'raw-leaves': this.options['raw-leaves'], @@ -87,11 +98,14 @@ class SendStream extends Duplex { }) if (options.chunkSize) { - this.multipart.on('end', this.onEnd.bind(this)) this.multipart.on('data', this.onData.bind(this)) } } + _read () { + // empty read + } + _write (chunk, encoding, callback) { this.source.write(chunk) callback() @@ -101,42 +115,16 @@ class SendStream extends Duplex { this.source.end() } - _read (size) { - // read - } - - onEnd () { - console.log('End', this.rangeTotal) - - // wait for all chunks to be sent - // doing all this in the end should simplify future concurrent chunk uploads - if (this.running && this.waiting === null) { - this.waiting = setInterval(() => { - if (!this.running) { - clearInterval(this.waiting) - this.waiting = null - this.requestChunk(null) - } - }, 100) - } else { - this.requestChunk(null) - } - } - onData (chunk) { - console.log('Send ', chunk.length) + // this.multipart.pause() // stop producing chunks - this.multipart.pauseAll() - this.extraBytes = this.multipart.extraBytes this.index++ this.rangeEnd = this.rangeStart + chunk.length this.rangeTotal += chunk.length - this.running = true + this.rangeStart = this.rangeEnd this.requestChunk(chunk) .then(() => { - this.running = false - this.rangeStart = this.rangeEnd - this.multipart.resumeAll() + // this.multipart.resume() }) } @@ -152,15 +140,18 @@ class SendStream extends Duplex { return this.emit('error', err) } + // progress upload reporting + const totalUp = Math.max((this.rangeTotal - this.multipart.extraBytes) / 2, 0) + progressFn(totalUp) // we are in the last request if (isStream(res)) { res.on('data', (d) => { - if (d.path) { - // files added reporting - this.push(d) + if (d.Hash) { + // files added reporting + this.push(convert(d)) } else { - // progress add reporting - progressFn((d.Bytes / 2) + this.totalUp) + // progress add reporting + progressFn((d.Bytes / 2) + totalUp) } }) res.on('error', err => this.emit('error', err)) @@ -169,9 +160,6 @@ class SendStream extends Duplex { this.push(null) }) } else { - // progress upload reporting - this.totalUp = (this.rangeTotal - this.extraBytes) / 2 - progressFn(this.totalUp) resolve() } }) @@ -192,9 +180,9 @@ class SendStream extends Duplex { } res.on('data', (d) => { - if (d.hash) { + if (d.Hash) { // files added reporting - this.push(d) + this.push(convert(d)) } else { // progress add reporting progressFn(d.Bytes) diff --git a/test/add-experimental.spec.js b/test/add-experimental.spec.js new file mode 100644 index 000000000..b99522de8 --- /dev/null +++ b/test/add-experimental.spec.js @@ -0,0 +1,516 @@ +/* eslint-env mocha */ +/* eslint max-nested-callbacks: ["error", 8] */ +'use strict' + +const chai = require('chai') +const dirtyChai = require('dirty-chai') +const expect = chai.expect +chai.use(dirtyChai) +const loadFixture = require('aegir/fixtures') +const mh = require('multihashes') +const CID = require('cids') +const path = require('path') +const { Readable } = require('stream') +const pull = require('pull-stream') +const IPFSApi = require('../src') +const f = require('./utils/factory') +const expectTimeout = require('./utils/expect-timeout') +const testfile = loadFixture('test/fixtures/2048') + +// TODO: Test against all algorithms Object.keys(mh.names) +// This subset is known to work with both go-ipfs and js-ipfs as of 2017-09-05 +const HASH_ALGS = [ + 'sha1', + 'sha2-256', + 'sha2-512', + 'keccak-224', + 'keccak-256', + 'keccak-384', + 'keccak-512' +] + +const runs = [ + {name: 'chunked', options: {experimental: true, chunkSize: 2 * 1024}}, + {name: 'non-chunked', options: {experimental: true}}, + {name: 'current non-chunked', options: {}} +] + +describe.only('experimental add', function () { + this.timeout(120 * 1000) + + let ipfsd + let ipfs + + ipfs = IPFSApi('localhost', '5002') + const expectedMultihash = 'QmcfPue16BgM2UqRg7tkoqbLgW4PKZok2HKyn9YEu1Eiyz' + + // before((done) => { + // f.spawn({ initOptions: { bits: 1024 } }, (err, _ipfsd) => { + // expect(err).to.not.exist() + // ipfsd = _ipfsd + // ipfs = IPFSApi(_ipfsd.apiAddr) + // done() + // }) + // }) + + // after((done) => { + // if (!ipfsd) return done() + // ipfsd.stop(done) + // }) + runs.forEach(run => { + it(`files.add ${run.name} - file for testing`, (done) => { + ipfs.files.add(testfile, run.options, (err, res) => { + expect(err).to.not.exist() + expect(res).to.have.length(1) + expect(res[0].hash).to.equal(expectedMultihash) + expect(res[0].path).to.equal(expectedMultihash) + done() + }) + }) + + it(`files.add ${run.name} - with Buffer module`, (done) => { + let Buffer = require('buffer').Buffer + + let expectedBufferMultihash = 'QmWfVY9y3xjsixTgbd9AorQxH7VtMpzfx2HaWtsoUYecaX' + let file = Buffer.from('hello') + + ipfs.files.add(file, run.options, (err, res) => { + expect(err).to.not.exist() + + expect(res).to.have.length(1) + expect(res[0].hash).to.equal(expectedBufferMultihash) + expect(res[0].path).to.equal(expectedBufferMultihash) + done() + }) + }) + it(`files.add ${run.name} with empty path and buffer content`, (done) => { + const expectedHash = 'QmWfVY9y3xjsixTgbd9AorQxH7VtMpzfx2HaWtsoUYecaX' + const content = Buffer.from('hello') + + ipfs.files.add([{ path: '', content }], run.options, (err, res) => { + expect(err).to.not.exist() + + expect(res).to.have.length(1) + expect(res[0].hash).to.equal(expectedHash) + expect(res[0].path).to.equal(expectedHash) + done() + }) + }) + it(`files.add ${run.name} with cid-version=1 and raw-leaves=false`, (done) => { + const expectedCid = 'zdj7WjkeH54wf9wuC9MQrrNgDRuJiFq37DstbjWSwvuiSod9v' + const options = Object.assign({}, run.options, { 'cid-version': 1, 'raw-leaves': false }) + + ipfs.files.add(testfile, options, (err, res) => { + expect(err).to.not.exist() + + expect(res).to.have.length(1) + expect(res[0].hash).to.equal(expectedCid) + expect(res[0].path).to.equal(expectedCid) + done() + }) + }) + + it(`files.add ${run.name} with only-hash=true`, function () { + this.slow(10 * 1000) + const content = String(Math.random() + Date.now()) + const options = Object.assign({}, run.options, { onlyHash: true, experimental: true }) + + return ipfs.files.add(Buffer.from(content), options) + .then(files => { + expect(files).to.have.length(1) + + // 'ipfs.object.get()' should timeout because content wasn't actually added + return expectTimeout(ipfs.object.get(files[0].hash), 4000) + }) + }) + + it(`files.add ${run.name} with options`, (done) => { + const options = Object.assign({}, run.options, { pin: false }) + ipfs.files.add(testfile, options, (err, res) => { + expect(err).to.not.exist() + + expect(res).to.have.length(1) + expect(res[0].hash).to.equal(expectedMultihash) + expect(res[0].path).to.equal(expectedMultihash) + done() + }) + }) + + HASH_ALGS.forEach((name) => { + it(`files.add ${run.name} with hash=${name} and raw-leaves=false`, (done) => { + const content = String(Math.random() + Date.now()) + const file = { + path: content + '.txt', + content: Buffer.from(content) + } + + const options = Object.assign( + {}, + run.options, + { hash: name, 'raw-leaves': false, experimental: true } + ) + + ipfs.files.add([file], options, (err, res) => { + if (err) return done(err) + expect(res).to.have.length(1) + const cid = new CID(res[0].hash) + expect(mh.decode(cid.multihash).name).to.equal(name) + done() + }) + }) + }) + + it(`files.add ${run.name} file with progress option`, (done) => { + let progress + + const options = Object.assign({}, run.options, { progress: p => (progress = p) }) + ipfs.files.add(testfile, options, (err, res) => { + expect(err).to.not.exist() + + expect(res).to.have.length(1) + expect(progress).to.be.equal(testfile.byteLength) + done() + }) + }) + + // TODO: needs to be using a big file + it.skip(`files.add ${run.name} - big file with progress option`, (done) => { + let progress = 0 + const options = Object.assign({}, run.options, { progress: p => (progress = p) }) + ipfs.files.add(testfile, options, (err, res) => { + expect(err).to.not.exist() + + expect(res).to.have.length(1) + expect(progress).to.be.equal(testfile.byteLength) + done() + }) + }) + + // TODO: needs to be using a directory + it(`files.add ${run.name} - directory with progress option`, (done) => { + let progress = 0 + + const options = Object.assign({}, run.options, { progress: p => (progress = p) }) + ipfs.files.add(testfile, options, (err, res) => { + expect(err).to.not.exist() + expect(res).to.have.length(1) + expect(progress).to.be.equal(testfile.byteLength) + done() + }) + }) + + it(`files.addPullStream ${run.name} - with object chunks and pull stream content`, (done) => { + const expectedCid = 'QmRf22bZar3WKmojipms22PkXH1MZGmvsqzQtuSvQE3uhm' + pull( + pull.values([{ content: pull.values([Buffer.from('test')]) }]), + ipfs.files.addPullStream(run.options), + pull.collect((err, res) => { + if (err) return done(err) + + expect(res).to.have.length(1) + expect(res[0]).to.deep.equal({ path: expectedCid, hash: expectedCid, size: 12 }) + done() + }) + ) + }) + + it(`files.add ${run.name} - with pull stream (callback)`, (done) => { + const expectedCid = 'QmRf22bZar3WKmojipms22PkXH1MZGmvsqzQtuSvQE3uhm' + + ipfs.files.add(pull.values([Buffer.from('test')]), run.options, (err, res) => { + if (err) return done(err) + expect(res).to.have.length(1) + expect(res[0]).to.deep.equal({ path: expectedCid, hash: expectedCid, size: 12 }) + done() + }) + }) + + it(`files.add ${run.name} - with pull stream (promise)`, () => { + const expectedCid = 'QmRf22bZar3WKmojipms22PkXH1MZGmvsqzQtuSvQE3uhm' + + return ipfs.files.add(pull.values([Buffer.from('test')]), run.options) + .then((res) => { + expect(res).to.have.length(1) + expect(res[0]).to.deep.equal({ path: expectedCid, hash: expectedCid, size: 12 }) + }) + }) + + it(`files.add ${run.name} - with array of objects with pull stream content`, () => { + const expectedCid = 'QmRf22bZar3WKmojipms22PkXH1MZGmvsqzQtuSvQE3uhm' + + return ipfs.files.add( + [{ content: pull.values([Buffer.from('test')]) }], + run.options) + .then((res) => { + expect(res).to.have.length(1) + expect(res[0]).to.deep.equal({ path: expectedCid, hash: expectedCid, size: 12 }) + }) + }) + + // tests from interface-core add + it(`files.add ${run.name} - should not be able to add by path`, (done) => { + const validPath = path.join(process.cwd() + '/package.json') + + ipfs.files.add(validPath, run.options, (err, res) => { + expect(err).to.exist() + done() + }) + }) + + it(`files.add ${run.name} - should add readable stream`, (done) => { + const expectedCid = 'QmVv4Wz46JaZJeH5PMV4LGbRiiMKEmszPYY3g6fjGnVXBS' + + const rs = new Readable() + rs.push(Buffer.from('some data')) + rs.push(null) + + ipfs.files.add(rs, run.options, (err, filesAdded) => { + expect(err).to.not.exist() + + expect(filesAdded).to.be.length(1) + const file = filesAdded[0] + expect(file.path).to.equal(expectedCid) + expect(file.size).to.equal(17) + expect(file.hash).to.equal(expectedCid) + done() + }) + }) + + it(`files.add ${run.name} - should add array of objects with readable stream content`, (done) => { + const expectedCid = 'QmVv4Wz46JaZJeH5PMV4LGbRiiMKEmszPYY3g6fjGnVXBS' + + const rs = new Readable() + rs.push(Buffer.from('some data')) + rs.push(null) + + const tuple = { path: 'data.txt', content: rs } + + ipfs.files.add([tuple], run.options, (err, filesAdded) => { + expect(err).to.not.exist() + + expect(filesAdded).to.be.length(1) + const file = filesAdded[0] + expect(file.path).to.equal('data.txt') + expect(file.size).to.equal(17) + expect(file.hash).to.equal(expectedCid) + done() + }) + }) + + it(`files.add ${run.name} - should add array of objects with readable stream content`, (done) => { + const expectedCid = 'QmVv4Wz46JaZJeH5PMV4LGbRiiMKEmszPYY3g6fjGnVXBS' + + const rs = new Readable() + rs.push(Buffer.from('some data')) + rs.push(null) + + const tuple = { path: 'data.txt', content: rs } + + ipfs.files.add([tuple], run.options, (err, filesAdded) => { + expect(err).to.not.exist() + + expect(filesAdded).to.be.length(1) + const file = filesAdded[0] + expect(file.path).to.equal('data.txt') + expect(file.size).to.equal(17) + expect(file.hash).to.equal(expectedCid) + done() + }) + }) + + it(`files.add ${run.name} - should add array of objects with pull stream content (promised)`, () => { + const expectedCid = 'QmRf22bZar3WKmojipms22PkXH1MZGmvsqzQtuSvQE3uhm' + + return ipfs.files.add([{ content: pull.values([Buffer.from('test')]) }], run.options) + .then((res) => { + expect(res).to.have.length(1) + expect(res[0]).to.deep.equal({ path: expectedCid, hash: expectedCid, size: 12 }) + }) + }) + + it(`files.add ${run.name} - should add a nested directory as array of tupples`, function (done) { + const content = (name) => ({ + path: `test-folder/${name}`, + content: Buffer.from('test') + }) + + const emptyDir = (name) => ({ path: `test-folder/${name}` }) + + const dirs = [ + content('pp.txt'), + content('holmes.txt'), + content('jungle.txt'), + content('alice.txt'), + emptyDir('empty-folder'), + content('files/hello.txt'), + content('files/ipfs.txt'), + emptyDir('files/empty') + ] + + ipfs.files.add(dirs, run.options, (err, res) => { + expect(err).to.not.exist() + const root = res[res.length - 1] + + expect(root.path).to.equal('test-folder') + expect(root.hash).to.equal('QmT7pf89dqQYf4vdHryzMUPhcFLCsvW2xmzCk9DbmKiVj3') + done() + }) + }) + + it(`files.add ${run.name} - should fail when passed invalid input`, (done) => { + const nonValid = 'sfdasfasfs' + + ipfs.files.add(nonValid, run.options, (err, result) => { + expect(err).to.exist() + done() + }) + }) + + // TODO: fix current implementation fails here + it.skip(`files.add ${run.name} - should wrap content in a directory`, (done) => { + const data = { path: 'testfile.txt', content: Buffer.from('test') } + const options = Object.assign({}, run.options, { + wrapWithDirectory: true + }) + + ipfs.files.add(data, options, (err, filesAdded) => { + expect(err).to.not.exist() + expect(filesAdded).to.have.length(2) + const file = filesAdded[0] + const wrapped = filesAdded[1] + expect(file.hash).to.equal('QmRf22bZar3WKmojipms22PkXH1MZGmvsqzQtuSvQE3uhm') + expect(file.path).to.equal('testfile.txt') + expect(wrapped.path).to.equal('') + done() + }) + }) + + // tests from interface-core add-pullstream + + it(`files.add ${run.name} - should add pull stream of valid files and dirs`, function (done) { + const content = (name) => ({ + path: `test-folder/${name}`, + content: Buffer.from('test') + }) + + const emptyDir = (name) => ({ path: `test-folder/${name}` }) + + const files = [ + content('pp.txt'), + content('holmes.txt'), + content('jungle.txt'), + content('alice.txt'), + emptyDir('empty-folder'), + content('files/hello.txt'), + content('files/ipfs.txt'), + emptyDir('files/empty') + ] + + const stream = ipfs.files.addPullStream(run.options) + + pull( + pull.values(files), + stream, + pull.collect((err, filesAdded) => { + expect(err).to.not.exist() + + filesAdded.forEach((file) => { + if (file.path === 'test-folder') { + expect(file.hash).to.equal('QmT7pf89dqQYf4vdHryzMUPhcFLCsvW2xmzCk9DbmKiVj3') + done() + } + }) + }) + ) + }) + + it(`files.add ${run.name} - should add with object chunks and pull stream content`, (done) => { + const expectedCid = 'QmRf22bZar3WKmojipms22PkXH1MZGmvsqzQtuSvQE3uhm' + + pull( + pull.values([{ content: pull.values([Buffer.from('test')]) }]), + ipfs.files.addPullStream(run.options), + pull.collect((err, res) => { + if (err) return done(err) + expect(res).to.have.length(1) + expect(res[0]).to.deep.equal({ path: expectedCid, hash: expectedCid, size: 12 }) + done() + }) + ) + }) + // tests from interface-core add-readable-stream + + it(`files.add ${run.name} - should add readable stream of valid files and dirs`, function (done) { + const content = (name) => ({ + path: `test-folder/${name}`, + content: Buffer.from('test') + }) + + const emptyDir = (name) => ({ path: `test-folder/${name}` }) + + const files = [ + content('pp.txt'), + content('holmes.txt'), + content('jungle.txt'), + content('alice.txt'), + emptyDir('empty-folder'), + content('files/hello.txt'), + content('files/ipfs.txt'), + emptyDir('files/empty') + ] + + const stream = ipfs.files.addReadableStream(run.options) + + stream.on('error', (err) => { + expect(err).to.not.exist() + }) + + stream.on('data', (file) => { + if (file.path === 'test-folder') { + expect(file.hash).to.equal('QmT7pf89dqQYf4vdHryzMUPhcFLCsvW2xmzCk9DbmKiVj3') + done() + } + }) + + files.forEach((file) => stream.write(file)) + stream.end() + }) + // end runs + }) + + it.skip('files.add pins by default', (done) => { + const newContent = Buffer.from(String(Math.random())) + + ipfs.pin.ls((err, pins) => { + expect(err).to.not.exist() + const initialPinCount = pins.length + ipfs.files.add(newContent, { experimental: true }, (err, res) => { + expect(err).to.not.exist() + + ipfs.pin.ls((err, pins) => { + expect(err).to.not.exist() + expect(pins.length).to.eql(initialPinCount + 1) + done() + }) + }) + }) + }) + + it.skip('files.add with pin=false', (done) => { + const newContent = Buffer.from(String(Math.random())) + + ipfs.pin.ls((err, pins) => { + expect(err).to.not.exist() + const initialPinCount = pins.length + ipfs.files.add(newContent, { pin: false, experimental: true }, (err, res) => { + expect(err).to.not.exist() + + ipfs.pin.ls((err, pins) => { + expect(err).to.not.exist() + expect(pins.length).to.eql(initialPinCount) + done() + }) + }) + }) + }) +}) diff --git a/test/fixtures/2048 b/test/fixtures/2048 new file mode 100644 index 0000000000000000000000000000000000000000..807b97e3e1785899ae6528f5033f483952e7b54d GIT binary patch literal 2048 zcmV+b2>L93+WT!kmbl6Vv4(kex)Ctkzt+|xc8}ytrhJMn2sbgeOF(|=a z@_+FsSE6tV#esAZVNTDS|F1Q@_urEk1hQ4Z{SBJ%2TI|urrd!{Xaz8i3p$u6l~6qC zzNukwud|`;^Ky#rLswlSX_y&>0}m1pWs~G>+O=~^TF27*aPOH@<}}vExPfT94RcCQ zm!!(_1D`9|M(j9(K~RX^g-Kmv3QCf+d8)X?qs?os?IHV*!C90#xoREZf$#riQ* z6%tMat`6VizZc8xb&ioj_t>H=N3^W`(ODruCw4$lNjxDAEP5479xE;zdcWXDUq^I{ z$VX0co;m*5gC@`GbihU0h3?HUJ^SKA7C8z95|7U7ac@|sdsGJCgbC#lqr`>;MWVIf zH6PnbWT`YS8XgA3+A>m_B11tfv}l-L@Z=~ck4v?jJyMgXi!q1O4y5p(EU77!m)40Z zb{eOp5fesZ%oK>xedWNI#z%Rru<`dZE-c%UBk*lKbq_;k`$^O;NRgTX-M*4+ND<26 zMQXNJf&9mX3yjzF$dp%I+;ZRQxI={5aYYRpU(NZFOJfl4C3{-{_W%`kCRNHWwXz(q zm*+oG*Ayp*4@=r~cF70_?YxsJ<<4RkbF ztHwGE-`D{UG2+cc;b3OJGb+sOg~O$=FX~MM%$bHA>xOCS_vu{g=Kez!4P~1J z^}A!@#v0SKb;PExpC~6{>j?4Vp(sDxk0bm@F?gbNo%Xy)z@Q+HDF-6eM{r1ZSC93c zbKpmHYKZc-BaH$QecD~{4&gL_JsX@US+NpHk@A?h9$HRjW^NYkF~05Jc2qe~Uss>t zc;}9u%RXx{DGlw^cvvtMHwuK+k||+4AUF<|NQIa^RAJzfxh&o89DO1;i&zD3eiJU7 zLp|#+K(u9Wq~m#}vg}U&BPxN*_*x*LJ^vT~gOZKxr$G&C(&wdk!WYCiErMg5krDm? z*s+bB8&o+@jlJeqh`k!^7v1MJxUNYZa=VA@o@G44g!7%l)3k&5sZ?iN?)vFHisBWV z6@g>oRz7_2m61cWoDx`cb^FN!mth1DDfYB~*;_;XhiLTe_SaC-%^k?>;wrUO#cpIU z{F=iz7#F8@XP##Dz^tXfQGcSvN7P9gcb4GoN4aQ7RBLw#eEK6SyREz!oD9kY%c79X z-pv46l{vYZTFngn*c;z#R6c7(2nSRh3z*}$HEums(r2wQ!IY=PmjoC1J`KCBw5%4V z{rGB8h})4i%-2D>n7*c#ny3DTcPU})2d*9iTC0Br1lxet+EFkjZN@3aYx z)gXveo+p=!LWhPb8~7Aol`>>F@ygcD{* zWT{jt+h{<425K-pR@O$how-PV?}@X(gIg>CR(i;1CatgOK?$PKkN3Gkcf)u5H|vw$l3^b;go#t>(Rs5nS}IvDlLB zvv``+djz{-pNJiOgcf+cSJyXKB1OZR&9+1qdITCElsp*oXvN&j9)o$x7hL9gGe0l%@vMWs)f6*e-VWUQxd3wUj;NQ;G=gWCI|8 zzJn6?61to|MVm~rY# z2-uTwj6zTctnQr+46Xtg*%CPW&|iHZ+}?hn(pv_oS+MzBP>R|TR#V4+SnO!rGy&1x z04DP{Zn|S$R5w|;b1Hg4R^JxzsSTvLr&L(Vm(SV{oZGk2=`DALtBv2L6g9KQxFGiC eJI~NQDoaZA={mG>=NMdMyrbjAK Date: Mon, 17 Sep 2018 12:56:20 +0100 Subject: [PATCH 12/15] fix: remove concurrency for now --- src/utils/multipart-experimental.js | 1 - src/utils/send-stream-experimental.js | 8 ++++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/src/utils/multipart-experimental.js b/src/utils/multipart-experimental.js index 6e7eeb1e3..8a0c73966 100644 --- a/src/utils/multipart-experimental.js +++ b/src/utils/multipart-experimental.js @@ -144,7 +144,6 @@ class Multipart extends Duplex { const bytesNeeded = (this.chunkSize - this.bufferOffset) // make sure we have the correct amount of bytes if (chunk.length === bytesNeeded) { - // chunk.copy(this.buffer, this.bufferOffset, 0, bytesNeeded) const slice = this.buffer.slice(0, this.bufferOffset) this.bufferOffset = 0 return this.push(Buffer.concat([slice, chunk], slice.length + chunk.length)) diff --git a/src/utils/send-stream-experimental.js b/src/utils/send-stream-experimental.js index b019dedb1..fc4ff1d0c 100644 --- a/src/utils/send-stream-experimental.js +++ b/src/utils/send-stream-experimental.js @@ -116,16 +116,16 @@ class SendStream extends Duplex { } onData (chunk) { - // this.multipart.pause() + this.multipart.pause() // stop producing chunks this.index++ this.rangeEnd = this.rangeStart + chunk.length this.rangeTotal += chunk.length - this.rangeStart = this.rangeEnd this.requestChunk(chunk) .then(() => { - // this.multipart.resume() + this.multipart.resume() }) + this.rangeStart = this.rangeEnd } requestChunk (chunk) { @@ -145,7 +145,7 @@ class SendStream extends Duplex { progressFn(totalUp) // we are in the last request if (isStream(res)) { - res.on('data', (d) => { + res.on('data', d => { if (d.Hash) { // files added reporting this.push(convert(d)) From d5962951b141813e88de52882bbe3e1492bdaa4a Mon Sep 17 00:00:00 2001 From: Hugo Dias Date: Tue, 18 Sep 2018 16:24:33 +0100 Subject: [PATCH 13/15] fix: feedback changes --- package.json | 1 + src/files/add-experimental.js | 2 +- src/files/add-pull-stream.js | 3 +- src/files/add-readable-stream.js | 3 +- src/utils/multipart-experimental.js | 18 ++-- src/utils/prepare-file.js | 124 +++++--------------------- src/utils/send-stream-experimental.js | 10 +-- 7 files changed, 38 insertions(+), 123 deletions(-) diff --git a/package.json b/package.json index ce970121e..ecf20a945 100644 --- a/package.json +++ b/package.json @@ -35,6 +35,7 @@ "content": "^4.0.5", "debug": "^3.1.0", "detect-node": "^2.0.3", + "err-code": "^1.1.2", "filereader-stream": "^2.0.0", "flatmap": "0.0.3", "glob": "^7.1.2", diff --git a/src/files/add-experimental.js b/src/files/add-experimental.js index 4bb490300..9d8c48b9a 100644 --- a/src/files/add-experimental.js +++ b/src/files/add-experimental.js @@ -12,7 +12,7 @@ const SendStream = require('../utils/send-stream-experimental') /** * Converts an array to a stream * - * @private + * @ignore * @param {Array} data * @returns {Readable} */ diff --git a/src/files/add-pull-stream.js b/src/files/add-pull-stream.js index fabf27622..1b0875f21 100644 --- a/src/files/add-pull-stream.js +++ b/src/files/add-pull-stream.js @@ -3,10 +3,11 @@ const SendFilesStream = require('../utils/send-files-stream') const FileResultStreamConverter = require('../utils/file-result-stream-converter') const toPull = require('stream-to-pull-stream') +const { addPullStream } = require('./add-experimental') module.exports = (send) => (options = {}) => { if (options.experimental) { - return require('./add-experimental').addPullStream(send)(options) + return addPullStream(send)(options) } options.converter = FileResultStreamConverter return toPull(SendFilesStream(send, 'add')({ qs: options })) diff --git a/src/files/add-readable-stream.js b/src/files/add-readable-stream.js index 46cddb05e..6dbb687bc 100644 --- a/src/files/add-readable-stream.js +++ b/src/files/add-readable-stream.js @@ -2,10 +2,11 @@ const SendFilesStream = require('../utils/send-files-stream') const FileResultStreamConverter = require('../utils/file-result-stream-converter') +const { addReadableStream } = require('./add-experimental') module.exports = (send) => (options = {}) => { if (options.experimental) { - return require('./add-experimental').addReadableStream(send)(options) + return addReadableStream(send)(options) } options.converter = FileResultStreamConverter return SendFilesStream(send, 'add')(options) diff --git a/src/utils/multipart-experimental.js b/src/utils/multipart-experimental.js index 8a0c73966..3bab0897f 100644 --- a/src/utils/multipart-experimental.js +++ b/src/utils/multipart-experimental.js @@ -6,7 +6,7 @@ const pump = require('pump') const pullToStream = require('pull-stream-to-stream') const bufferToStream = require('buffer-to-stream') -/** @private @typedef {import("../files/add-experimental").AddOptions} AddOptions */ +/** @ignore @typedef {import("../files/add-experimental").AddOptions} AddOptions */ const PADDING = '--' const NEW_LINE = '\r\n' @@ -15,7 +15,7 @@ const NEW_LINE_BUFFER = Buffer.from(NEW_LINE) /** * Generate a random boundary to use in a multipart request * - * @private + * @ignore * @returns {string} */ const generateBoundary = () => { @@ -30,7 +30,7 @@ const generateBoundary = () => { /** * Generate leading section for a multipart body * - * @private + * @ignore * @param {Object} [headers={}] * @param {string} boundary * @returns {string} @@ -53,7 +53,7 @@ const leading = (headers = {}, boundary) => { /** * Multipart class to generate a multipart body chunked and non chunked * - * @private + * @ignore * @class Multipart * @extends {Duplex} */ @@ -125,11 +125,7 @@ class Multipart extends Duplex { * @return {boolean} */ pushChunk (chunk, isExtra = false) { - if (chunk === null) { - return this.push(null) - } - - if (this.chunkSize === 0) { + if (chunk === null || this.chunkSize === 0) { return this.push(chunk) } @@ -167,9 +163,7 @@ class Multipart extends Duplex { if (Buffer.isBuffer(this.source)) { this.source = bufferToStream(this.source) - } - - if (isSource(file.content)) { + } else if (isSource(file.content)) { // pull-stream-to-stream doesn't support readable event... this.source = pump([pullToStream.source(file.content), new PassThrough()]) } diff --git a/src/utils/prepare-file.js b/src/utils/prepare-file.js index e4caf4915..3bed54d79 100644 --- a/src/utils/prepare-file.js +++ b/src/utils/prepare-file.js @@ -1,6 +1,6 @@ 'use strict' -const isNode = require('detect-node') +const errcode = require('err-code') const { isSource } = require('is-pull-stream') const isStream = require('is-stream') const flatmap = require('flatmap') @@ -10,76 +10,6 @@ const isBrowser = typeof window === 'object' && typeof document === 'object' && document.nodeType === 9 -function loadPaths (opts, file) { - const path = require('path') - const fs = require('fs') - const glob = require('glob') - - const followSymlinks = opts.followSymlinks != null ? opts.followSymlinks : true - - file = path.resolve(file) - const stats = fs.statSync(file) - - if (stats.isDirectory() && !opts.recursive) { - throw new Error('Can only add directories using --recursive') - } - - if (stats.isDirectory() && opts.recursive) { - // glob requires a POSIX filename - file = file.split(path.sep).join('/') - const fullDir = file + (file.endsWith('/') ? '' : '/') - let dirName = fullDir.split('/') - dirName = dirName[dirName.length - 2] + '/' - const mg = new glob.sync.GlobSync('**/*', { - cwd: file, - follow: followSymlinks, - dot: opts.hidden, - ignore: opts.ignore - }) - - return mg.found - .map((name) => { - const fqn = fullDir + name - // symlinks - if (mg.symlinks[fqn] === true) { - return { - path: dirName + name, - symlink: true, - dir: false, - content: fs.readlinkSync(fqn) - } - } - - // files - if (mg.cache[fqn] === 'FILE') { - return { - path: dirName + name, - symlink: false, - dir: false, - content: fs.createReadStream(fqn) - } - } - - // directories - if (mg.cache[fqn] === 'DIR' || mg.cache[fqn] instanceof Array) { - return { - path: dirName + name, - symlink: false, - dir: true - } - } - // files inside symlinks and others - }) - // filter out null files - .filter(Boolean) - } - - return { - path: path.basename(file), - content: fs.createReadStream(file) - } -} - function prepareFile (file, opts) { let files = [].concat(file) @@ -89,45 +19,33 @@ function prepareFile (file, opts) { } function prepare (file, opts) { + const result = { + path: '', + symlink: false, + dir: false, + content: null + } // probably it should be valid and would be handled below with Buffer.from if (typeof file === 'string') { - throw new Error('String isn\'t valid as an input') + throw errcode(new Error('String isn\'t valid as an input'), 'ERR_INVALID_INPUT') } // needs to test for stream because fs.createReadStream has path prop and would handle here - if (!isStream(file) && file.path && !file.content) { - file.dir = true - return file - } - - if (file.content || file.dir) { - return file - } - - if (isBrowser && file instanceof self.File) { - return { - path: file.name, - symlink: false, - dir: false, - content: fileReaderStream(file, opts) - } + if (!isStream(file) && file.path && !file.content) { // {path, content} input with no content so we assume directory + result.dir = true + } else if (file.content || file.dir) { // {path, content} input with content or dir just copy + result.content = file.content + result.dir = file.dir + } else if (isBrowser && file instanceof self.File) { // browser File input we create a stream from it + result.path = file.name + result.content = fileReaderStream(file, opts) + } else if (!isStream(file) && !isSource(file) && !Buffer.isBuffer(file)) { // if not pull-stream, stream or buffer try to create a buffer from input + result.content = Buffer.from(file) + } else { // here we only have pull-stream, stream or buffer so we just set content to input + result.content = file } - if (!isStream(file) && !isSource(file) && !Buffer.isBuffer(file)) { - return { - path: '', - symlink: false, - dir: false, - content: Buffer.from(file) - } - } - - return { - path: '', - symlink: false, - dir: false, - content: file - } + return result } function prepareWithHeaders (file, opts) { diff --git a/src/utils/send-stream-experimental.js b/src/utils/send-stream-experimental.js index fc4ff1d0c..b5866aeba 100644 --- a/src/utils/send-stream-experimental.js +++ b/src/utils/send-stream-experimental.js @@ -6,14 +6,14 @@ const pump = require('pump') const Multipart = require('./multipart-experimental') const { prepareWithHeaders } = require('./prepare-file') -/** @private @typedef {import("../files/add-experimental").AddOptions} AddOptions */ +/** @ignore @typedef {import("../files/add-experimental").AddOptions} AddOptions */ const noop = () => {} /** * Convert back to the proper schema * - * @private + * @ignore * @param {Object} data * @returns {Object} */ @@ -27,7 +27,7 @@ const convert = (data) => { /** * Factory for prepare stream - * @private + * @ignore * @param {*} options * @returns {Function} */ @@ -45,7 +45,7 @@ const prepareTransform = (options) => new Transform({ /** * Class to create a stream to send data to the API * - * @private + * @ignore * @class SendStream * @extends {Duplex} */ @@ -103,7 +103,7 @@ class SendStream extends Duplex { } _read () { - // empty read + // duplex stream needs to implement _read() } _write (chunk, encoding, callback) { From 7489447bfc960fcb5c36373292231a6855ddd7c5 Mon Sep 17 00:00:00 2001 From: Hugo Dias Date: Tue, 18 Sep 2018 16:27:31 +0100 Subject: [PATCH 14/15] fix: add some property descriptions --- src/files/add-experimental.js | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/files/add-experimental.js b/src/files/add-experimental.js index 9d8c48b9a..53cdc3ebe 100644 --- a/src/files/add-experimental.js +++ b/src/files/add-experimental.js @@ -46,9 +46,9 @@ const arrayToStream = (data) => { /** * @typedef {Object} AddResult - * @property {string} path - * @property {string} hash - * @property {number} size + * @property {string} path - Object path + * @property {string} hash - Object CID + * @property {number} size - Object size */ /** From a3bf0a6c633cfd9d544cdcbab420c396a0dec881 Mon Sep 17 00:00:00 2001 From: Hugo Dias Date: Tue, 18 Sep 2018 16:31:54 +0100 Subject: [PATCH 15/15] fix: add jsdoc to pushFile --- src/utils/multipart-experimental.js | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/utils/multipart-experimental.js b/src/utils/multipart-experimental.js index 3bab0897f..59b83ec44 100644 --- a/src/utils/multipart-experimental.js +++ b/src/utils/multipart-experimental.js @@ -156,6 +156,13 @@ class Multipart extends Duplex { return true } + /** + * Consume file and push to readable stream + * + * @param {(Buffer|Readable|PullStream)} file + * @param {function(): void} callback + * @return {void} + */ pushFile (file, callback) { this.pushChunk(leading(file.headers, this._boundary), true)