From 0b2a79dbe331d9be66cb8e5fee602678606bd3ab Mon Sep 17 00:00:00 2001 From: haad Date: Thu, 24 Nov 2016 12:56:07 +0100 Subject: [PATCH 1/8] refactor(streams): Refactor response stream handling. Add dagnode-stream to transform file results to DAGNode objects. Add stream-to-value to convert a response stream to a single value. Refactor request-api so that chunked JSON objects are not buffered anymore. This touches a bunch of the files pushing the transform function down to the individual commands. --- src/api/add.js | 18 +++-- src/api/dht.js | 7 +- src/api/get.js | 15 ++-- src/api/ping.js | 32 ++++++-- src/api/refs.js | 16 +++- src/api/util/fs-add.js | 12 +-- src/api/util/url-add.js | 12 +-- src/dagnode-stream.js | 55 ++++++++++++++ src/get-dagnode.js | 6 +- src/request-api.js | 103 ++++++++------------------ src/stream-to-json-value.js | 34 +++++++++ src/stream-to-value.js | 12 +++ src/stringlist-to-array.js | 9 +++ src/tar-stream-to-objects.js | 69 +++++++++-------- test/interface-ipfs-core/ping.spec.js | 8 ++ test/interface-ipfs-core/refs.spec.js | 1 - test/ipfs-api/util.spec.js | 1 - test/setup/spawn-daemons.js | 8 +- 18 files changed, 269 insertions(+), 149 deletions(-) create mode 100644 src/dagnode-stream.js create mode 100644 src/stream-to-json-value.js create mode 100644 src/stream-to-value.js create mode 100644 src/stringlist-to-array.js diff --git a/src/api/add.js b/src/api/add.js index 1379a4296..1b41f7578 100644 --- a/src/api/add.js +++ b/src/api/add.js @@ -1,24 +1,26 @@ 'use strict' const isStream = require('isstream') -const addToDagNodesTransform = require('../add-to-dagnode-transform') const promisify = require('promisify-es6') +const DAGNodeStream = require('../dagnode-stream') module.exports = (send) => { return promisify((files, callback) => { - const good = Buffer.isBuffer(files) || + const ok = Buffer.isBuffer(files) || isStream.isReadable(files) || Array.isArray(files) - if (!good) { - callback(new Error('"files" must be a buffer, readable stream, or array of objects')) + if (!ok) { + return callback(new Error('"files" must be a buffer, readable stream, or array of objects')) } - const sendWithTransform = send.withTransform(addToDagNodesTransform) - - return sendWithTransform({ + const request = { path: 'add', files: files - }, callback) + } + + // Transform the response stream to DAGNode values + const transform = (res, callback) => DAGNodeStream.streamToValue(send, res, callback) + send.andTransform(request, transform, callback) }) } diff --git a/src/api/dht.js b/src/api/dht.js index fe42146ec..dd78e7e0e 100644 --- a/src/api/dht.js +++ b/src/api/dht.js @@ -1,6 +1,7 @@ 'use strict' const promisify = require('promisify-es6') +const streamToValue = require('../stream-to-value') module.exports = (send) => { return { @@ -19,11 +20,13 @@ module.exports = (send) => { opts = {} } - send({ + const request = { path: 'dht/findprovs', args: args, qs: opts - }, callback) + } + + send.andTransform(request, streamToValue, callback) }), get: promisify((key, opts, callback) => { if (typeof opts === 'function' && diff --git a/src/api/get.js b/src/api/get.js index fa7ba1250..9634b7235 100644 --- a/src/api/get.js +++ b/src/api/get.js @@ -1,11 +1,11 @@ 'use strict' -const tarStreamToObjects = require('../tar-stream-to-objects') -const cleanMultihash = require('../clean-multihash') const promisify = require('promisify-es6') +const cleanMultihash = require('../clean-multihash') +const TarStreamToObjects = require('../tar-stream-to-objects') module.exports = (send) => { - return promisify(function get (path, opts, callback) { + return promisify((path, opts, callback) => { if (typeof opts === 'function' && !callback) { callback = opts @@ -26,12 +26,13 @@ module.exports = (send) => { return callback(err) } - var sendWithTransform = send.withTransform(tarStreamToObjects) - - sendWithTransform({ + const request = { path: 'get', args: path, qs: opts - }, callback) + } + + // Convert the response stream to TarStream objects + send.andTransform(request, TarStreamToObjects.from, callback) }) } diff --git a/src/api/ping.js b/src/api/ping.js index 5e7c74f6f..eeaa3125a 100644 --- a/src/api/ping.js +++ b/src/api/ping.js @@ -1,18 +1,36 @@ 'use strict' const promisify = require('promisify-es6') +const streamToValue = require('../stream-to-value') module.exports = (send) => { return promisify((id, callback) => { - send({ + const request = { path: 'ping', args: id, qs: { n: 1 } - }, function (err, res) { - if (err) { - return callback(err, null) - } - callback(null, res[1]) - }) + } + + // Transform the response stream to a value: + // { Success: , Time: , Text: } + const transform = (res, callback) => { + streamToValue(res, (err, res) => { + if (err) { + return callback(err) + } + + // go-ipfs http api currently returns 3 lines for a ping. + // they're a little messed, so take the correct values from each lines. + const pingResult = { + Success: res[1].Success, + Time: res[1].Time, + Text: res[2].Text + } + + callback(null, pingResult) + }) + } + + send.andTransform(request, transform, callback) }) } diff --git a/src/api/refs.js b/src/api/refs.js index 318ccb59f..56958ca7f 100644 --- a/src/api/refs.js +++ b/src/api/refs.js @@ -1,6 +1,7 @@ 'use strict' const promisify = require('promisify-es6') +const streamToValue = require('../stream-to-value') module.exports = (send) => { const refs = promisify((args, opts, callback) => { @@ -8,21 +9,28 @@ module.exports = (send) => { callback = opts opts = {} } - return send({ + + const request = { path: 'refs', args: args, qs: opts - }, callback) + } + + send.andTransform(request, streamToValue, callback) }) + refs.local = promisify((opts, callback) => { if (typeof (opts) === 'function') { callback = opts opts = {} } - return send({ + + const request = { path: 'refs', qs: opts - }, callback) + } + + send.andTransform(request, streamToValue, callback) }) return refs diff --git a/src/api/util/fs-add.js b/src/api/util/fs-add.js index c78dda13a..6c63cf1ab 100644 --- a/src/api/util/fs-add.js +++ b/src/api/util/fs-add.js @@ -1,8 +1,8 @@ 'use strict' const isNode = require('detect-node') -const addToDagNodesTransform = require('./../../add-to-dagnode-transform') const promisify = require('promisify-es6') +const DAGNodeStream = require('../../dagnode-stream') module.exports = (send) => { return promisify((path, opts, callback) => { @@ -28,12 +28,14 @@ module.exports = (send) => { return callback(new Error('"path" must be a string')) } - const sendWithTransform = send.withTransform(addToDagNodesTransform) - - sendWithTransform({ + const request = { path: 'add', qs: opts, files: path - }, callback) + } + + // Transform the response stream to DAGNode values + const transform = (res, callback) => DAGNodeStream.streamToValue(send, res, callback) + send.andTransform(request, transform, callback) }) } diff --git a/src/api/util/url-add.js b/src/api/util/url-add.js index 7dc7694b2..7db525ad7 100644 --- a/src/api/util/url-add.js +++ b/src/api/util/url-add.js @@ -3,9 +3,8 @@ const promisify = require('promisify-es6') const once = require('once') const parseUrl = require('url').parse - const request = require('../../request') -const addToDagNodesTransform = require('./../../add-to-dagnode-transform') +const DAGNodeStream = require('../../dagnode-stream') module.exports = (send) => { return promisify((url, opts, callback) => { @@ -28,7 +27,6 @@ module.exports = (send) => { return callback(new Error('"url" param must be an http(s) url')) } - const sendWithTransform = send.withTransform(addToDagNodesTransform) callback = once(callback) request(parseUrl(url).protocol)(url, (res) => { @@ -37,11 +35,15 @@ module.exports = (send) => { return callback(new Error(`Failed to download with ${res.statusCode}`)) } - sendWithTransform({ + const params = { path: 'add', qs: opts, files: res - }, callback) + } + + // Transform the response stream to DAGNode values + const transform = (res, callback) => DAGNodeStream.streamToValue(send, res, callback) + send.andTransform(params, transform, callback) }).end() }) } diff --git a/src/dagnode-stream.js b/src/dagnode-stream.js new file mode 100644 index 000000000..35cb12c44 --- /dev/null +++ b/src/dagnode-stream.js @@ -0,0 +1,55 @@ +'use strict' + +const TransformStream = require('readable-stream').Transform +const streamToValue = require('./stream-to-value') +const getDagNode = require('./get-dagnode') + +/* + Transforms a stream of objects to DAGNodes and outputs them as objects. + + Usage: inputStream.pipe(DAGNodeStream({ send: send })) + + Input object format: + { + Name: '/path/to/file/foo.txt', + Hash: 'Qma4hjFTnCasJ8PVp3mZbZK5g2vGDT4LByLJ7m8ciyRFZP' + } + + Output object format: + { + path: '/path/to/file/foo.txt', + hash: 'Qma4hjFTnCasJ8PVp3mZbZK5g2vGDT4LByLJ7m8ciyRFZP', + size: 20 + } +*/ +class DAGNodeStream extends TransformStream { + constructor (options) { + const opts = Object.assign(options || {}, { objectMode: true }) + super(opts) + this._send = opts.send + } + + static streamToValue (send, inputStream, callback) { + const outputStream = inputStream.pipe(new DAGNodeStream({ send: send })) + streamToValue(outputStream, callback) + } + + _transform (obj, enc, callback) { + getDagNode(this._send, obj.Hash, (err, node) => { + if (err) { + return callback(err) + } + + const dag = { + path: obj.Name, + hash: obj.Hash, + size: node.size + } + + this.push(dag) + callback(null) + }) + } +} + +module.exports = DAGNodeStream diff --git a/src/get-dagnode.js b/src/get-dagnode.js index 75cd0886c..d9942d99c 100644 --- a/src/get-dagnode.js +++ b/src/get-dagnode.js @@ -1,8 +1,8 @@ 'use strict' const DAGNode = require('ipld-dag-pb').DAGNode -const bl = require('bl') const parallel = require('async/parallel') +const streamToValue = require('./stream-to-value') module.exports = function (send, hash, callback) { // Retrieve the object and its data in parallel, then produce a DAGNode @@ -36,12 +36,12 @@ module.exports = function (send, hash, callback) { if (Buffer.isBuffer(stream)) { DAGNode.create(stream, object.Links, callback) } else { - stream.pipe(bl(function (err, data) { + streamToValue(stream, (err, data) => { if (err) { return callback(err) } DAGNode.create(data, object.Links, callback) - })) + }) } }) } diff --git a/src/request-api.js b/src/request-api.js index 5eb150269..b980bc4ac 100644 --- a/src/request-api.js +++ b/src/request-api.js @@ -1,50 +1,28 @@ 'use strict' const Qs = require('qs') -const ndjson = require('ndjson') const isNode = require('detect-node') +const ndjson = require('ndjson') const once = require('once') -const concat = require('concat-stream') - const getFilesStream = require('./get-files-stream') +const streamToValue = require('./stream-to-value') +const streamToJsonValue = require('./stream-to-json-value') const request = require('./request') // -- Internal -function parseChunkedJson (res, cb) { - res - .pipe(ndjson.parse()) - .once('error', cb) - .pipe(concat((data) => cb(null, data))) -} - -function parseRaw (res, cb) { - res - .once('error', cb) - .pipe(concat((data) => cb(null, data))) -} - -function parseJson (res, cb) { - res - .once('error', cb) - .pipe(concat((data) => { - if (!data || data.length === 0) { - return cb() - } - - if (Buffer.isBuffer(data)) { - data = data.toString() - } - - let res - try { - res = JSON.parse(data) - } catch (err) { - return cb(err) - } - - cb(null, res) - })) +function parseError (res, cb) { + const error = new Error(`Server responded with ${res.statusCode}`) + streamToJsonValue(res, (err, payload) => { + if (err) { + return cb(err) + } + if (payload) { + error.code = payload.Code + error.message = payload.Message || payload.toString() + } + cb(error) + }) } function onRes (buffer, cb) { @@ -55,33 +33,26 @@ function onRes (buffer, cb) { res.headers['content-type'].indexOf('application/json') === 0 if (res.statusCode >= 400 || !res.statusCode) { - const error = new Error(`Server responded with ${res.statusCode}`) - - parseJson(res, (err, payload) => { - if (err) { - return cb(err) - } - if (payload) { - error.code = payload.Code - error.message = payload.Message || payload.toString() - } - cb(error) - }) + return parseError(res, cb) } + // Return the response stream directly if (stream && !buffer) { return cb(null, res) } + // Return a stream of JSON objects if (chunkedObjects && isJson) { - return parseChunkedJson(res, cb) + return cb(null, res.pipe(ndjson.parse())) } + // Return a JSON object if (isJson) { - return parseJson(res, cb) + return streamToJsonValue(res, cb) } - parseRaw(res, cb) + // Return a value + return streamToValue(res, cb) } } @@ -163,7 +134,7 @@ function requestAPI (config, options, callback) { // // -- Module Interface -exports = module.exports = function getRequestAPI (config) { +exports = module.exports = (config) => { /* * options: { * path: // API path (like /add or /config) - type: string @@ -173,7 +144,7 @@ exports = module.exports = function getRequestAPI (config) { * buffer: // buffer the request before sending it - type: bool * } */ - const send = function (options, callback) { + const send = (options, callback) => { if (typeof options !== 'object') { return callback(new Error('no options were passed')) } @@ -181,25 +152,13 @@ exports = module.exports = function getRequestAPI (config) { return requestAPI(config, options, callback) } - // Wraps the 'send' function such that an asynchronous - // transform may be applied to its result before - // passing it on to either its callback or promise. - send.withTransform = function (transform) { - return function (options, callback) { - if (typeof options !== 'object') { - return callback(new Error('no options were passed')) + send.andTransform = (options, transform, callback) => { + return send(options, (err, res) => { + if (err) { + return callback(err) } - - send(options, wrap(callback)) - - function wrap (func) { - if (func) { - return function (err, res) { - transform(err, res, send, func) - } - } - } - } + transform(res, callback) + }) } return send diff --git a/src/stream-to-json-value.js b/src/stream-to-json-value.js new file mode 100644 index 000000000..e42de2fc6 --- /dev/null +++ b/src/stream-to-json-value.js @@ -0,0 +1,34 @@ +'use strict' + +const streamToValue = require('./stream-to-value') + +/* + Converts a stream to a single JSON value +*/ +function streamToJsonValue (res, cb) { + streamToValue(res, (err, data) => { + if (err) { + return cb(err) + } + + if (!data || data.length === 0) { + return cb() + } + + // TODO: check if needed, afaik JSON.parse can parse Buffers + if (Buffer.isBuffer(data)) { + data = data.toString() + } + + let res + try { + res = JSON.parse(data) + } catch (err) { + return cb(err) + } + + cb(null, res) + }) +} + +module.exports = streamToJsonValue diff --git a/src/stream-to-value.js b/src/stream-to-value.js new file mode 100644 index 000000000..66e8e2ad6 --- /dev/null +++ b/src/stream-to-value.js @@ -0,0 +1,12 @@ +'use strict' + +const concat = require('concat-stream') + +/* + Concatenate a stream to a single value. +*/ +function streamToValue (res, callback) { + res.pipe(concat((data) => callback(null, data))) +} + +module.exports = streamToValue diff --git a/src/stringlist-to-array.js b/src/stringlist-to-array.js new file mode 100644 index 000000000..df28ee6df --- /dev/null +++ b/src/stringlist-to-array.js @@ -0,0 +1,9 @@ +'use strict' + +// Converts a go-ipfs "stringList" to an array +// { Strings: ['A', 'B'] } --> ['A', 'B'] +function stringlistToArray (res, cb) { + cb(null, res.Strings || []) +} + +module.exports = stringlistToArray diff --git a/src/tar-stream-to-objects.js b/src/tar-stream-to-objects.js index acab14658..aabea2745 100644 --- a/src/tar-stream-to-objects.js +++ b/src/tar-stream-to-objects.js @@ -1,38 +1,45 @@ 'use strict' const tar = require('tar-stream') -const Readable = require('readable-stream') +const ReadableStream = require('readable-stream').Readable +/* + Transform tar stream into a stream of objects: -// transform tar stream into readable stream of -// { path: 'string', content: Readable } -module.exports = (err, res, send, done) => { - if (err) { - return done(err) + Output format: + { path: 'string', content: Readable } +*/ +class TarStreamToObjects extends ReadableStream { + constructor (options) { + const opts = Object.assign(options || {}, { objectMode: true }) + super(opts) } - const objStream = new Readable({ objectMode: true }) - objStream._read = function noop () {} - - res - .pipe(tar.extract()) - .on('entry', (header, stream, next) => { - stream.on('end', next) - - if (header.type !== 'directory') { - objStream.push({ - path: header.name, - content: stream - }) - } else { - objStream.push({ - path: header.name - }) - stream.resume() - } - }) - .on('finish', () => { - objStream.push(null) - }) - - done(null, objStream) + static from (inputStream, callback) { + let outputStream = new TarStreamToObjects() + + inputStream + .pipe(tar.extract()) + .on('entry', (header, stream, next) => { + stream.on('end', next) + + if (header.type !== 'directory') { + outputStream.push({ + path: header.name, + content: stream + }) + } else { + outputStream.push({ + path: header.name + }) + stream.resume() + } + }) + .on('finish', () => outputStream.push(null)) + + callback(null, outputStream) + } + + _read () {} } + +module.exports = TarStreamToObjects diff --git a/test/interface-ipfs-core/ping.spec.js b/test/interface-ipfs-core/ping.spec.js index bf4e2e606..a7dbb4a28 100644 --- a/test/interface-ipfs-core/ping.spec.js +++ b/test/interface-ipfs-core/ping.spec.js @@ -12,6 +12,10 @@ describe('.ping', () => { apiClients.a.ping(id.id, (err, res) => { expect(err).to.not.exist expect(res).to.have.a.property('Success') + expect(res).to.have.a.property('Time') + expect(res).to.have.a.property('Text') + expect(res.Text).to.contain('Average latency') + expect(res.Time).to.be.a('number') done() }) }) @@ -25,6 +29,10 @@ describe('.ping', () => { }) .then((res) => { expect(res).to.have.a.property('Success') + expect(res).to.have.a.property('Time') + expect(res).to.have.a.property('Text') + expect(res.Text).to.contain('Average latency') + expect(res.Time).to.be.a('number') }) }) }) diff --git a/test/interface-ipfs-core/refs.spec.js b/test/interface-ipfs-core/refs.spec.js index 5b33662b0..3a38abe1f 100644 --- a/test/interface-ipfs-core/refs.spec.js +++ b/test/interface-ipfs-core/refs.spec.js @@ -65,7 +65,6 @@ describe('.refs', () => { ipfs.refs(folder, {format: ' '}, (err, objs) => { expect(err).to.not.exist expect(objs).to.eql(result) - done() }) }) diff --git a/test/ipfs-api/util.spec.js b/test/ipfs-api/util.spec.js index e2e9dede2..d103844bd 100644 --- a/test/ipfs-api/util.spec.js +++ b/test/ipfs-api/util.spec.js @@ -65,7 +65,6 @@ describe('.util', () => { ipfs.util.addFromFs(filePath, (err, result) => { expect(err).to.not.exist expect(result.length).to.be.above(5) - done() }) }) diff --git a/test/setup/spawn-daemons.js b/test/setup/spawn-daemons.js index 5a2d3b659..2131fed82 100644 --- a/test/setup/spawn-daemons.js +++ b/test/setup/spawn-daemons.js @@ -28,9 +28,11 @@ function startDisposableDaemons (callback) { const configValues = { Bootstrap: [], Discovery: {}, - 'HTTPHeaders.Access-Control-Allow-Origin': ['*'], - 'HTTPHeaders.Access-Control-Allow-Credentials': 'true', - 'HTTPHeaders.Access-Control-Allow-Methods': ['PUT', 'POST', 'GET'] + API: { + 'HTTPHeaders.Access-Control-Allow-Origin': ['*'], + 'HTTPHeaders.Access-Control-Allow-Credentials': 'true', + 'HTTPHeaders.Access-Control-Allow-Methods': ['PUT', 'POST', 'GET'] + } } eachSeries(Object.keys(configValues), (configKey, cb) => { From ecdbf6a6d77b873775e59eb352a9a6bd28d6c31e Mon Sep 17 00:00:00 2001 From: haad Date: Mon, 19 Dec 2016 09:23:35 +0100 Subject: [PATCH 2/8] Improve function descriptions --- src/dagnode-stream.js | 3 ++- src/request-api.js | 4 ++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/src/dagnode-stream.js b/src/dagnode-stream.js index 35cb12c44..150ddfff6 100644 --- a/src/dagnode-stream.js +++ b/src/dagnode-stream.js @@ -5,7 +5,8 @@ const streamToValue = require('./stream-to-value') const getDagNode = require('./get-dagnode') /* - Transforms a stream of objects to DAGNodes and outputs them as objects. + Transforms a stream of {Name, Hash} objects to include size + of the DAG object. Usage: inputStream.pipe(DAGNodeStream({ send: send })) diff --git a/src/request-api.js b/src/request-api.js index b980bc4ac..e7389d021 100644 --- a/src/request-api.js +++ b/src/request-api.js @@ -152,6 +152,10 @@ exports = module.exports = (config) => { return requestAPI(config, options, callback) } + // Send a HTTP request and pass via a transform function + // to convert the response data to wanted format before + // returning it to the callback. + // Eg. send.andTransform({}, (e) => JSON.parse(e), (err, res) => ...) send.andTransform = (options, transform, callback) => { return send(options, (err, res) => { if (err) { From 738056fc294ad85d15a3082a93706ea32605bd55 Mon Sep 17 00:00:00 2001 From: haad Date: Mon, 19 Dec 2016 09:57:42 +0100 Subject: [PATCH 3/8] Change variable name to be more descriptive in dagnode-stream --- src/dagnode-stream.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/dagnode-stream.js b/src/dagnode-stream.js index 150ddfff6..19b9b073e 100644 --- a/src/dagnode-stream.js +++ b/src/dagnode-stream.js @@ -41,13 +41,13 @@ class DAGNodeStream extends TransformStream { return callback(err) } - const dag = { + const result = { path: obj.Name, hash: obj.Hash, size: node.size } - this.push(dag) + this.push(result) callback(null) }) } From eb8822714abb4db1e0c01b7e45063d98835c79df Mon Sep 17 00:00:00 2001 From: haad Date: Mon, 19 Dec 2016 10:05:21 +0100 Subject: [PATCH 4/8] Refactor tar-stream-to-objects to better show intent --- src/api/get.js | 2 +- src/tar-stream-to-objects.js | 63 ++++++++++++++++++------------------ 2 files changed, 33 insertions(+), 32 deletions(-) diff --git a/src/api/get.js b/src/api/get.js index 9634b7235..3b4c0c983 100644 --- a/src/api/get.js +++ b/src/api/get.js @@ -33,6 +33,6 @@ module.exports = (send) => { } // Convert the response stream to TarStream objects - send.andTransform(request, TarStreamToObjects.from, callback) + send.andTransform(request, TarStreamToObjects, callback) }) } diff --git a/src/tar-stream-to-objects.js b/src/tar-stream-to-objects.js index aabea2745..b68c887fb 100644 --- a/src/tar-stream-to-objects.js +++ b/src/tar-stream-to-objects.js @@ -2,44 +2,45 @@ const tar = require('tar-stream') const ReadableStream = require('readable-stream').Readable -/* - Transform tar stream into a stream of objects: - Output format: - { path: 'string', content: Readable } -*/ -class TarStreamToObjects extends ReadableStream { +class ObjectsStreams extends ReadableStream { constructor (options) { const opts = Object.assign(options || {}, { objectMode: true }) super(opts) } - static from (inputStream, callback) { - let outputStream = new TarStreamToObjects() - - inputStream - .pipe(tar.extract()) - .on('entry', (header, stream, next) => { - stream.on('end', next) - - if (header.type !== 'directory') { - outputStream.push({ - path: header.name, - content: stream - }) - } else { - outputStream.push({ - path: header.name - }) - stream.resume() - } - }) - .on('finish', () => outputStream.push(null)) - - callback(null, outputStream) - } + _read () {} +} - _read () {} +/* + Transform a tar stream into a stream of objects: + + Output format: + { path: 'string', content: Stream } +*/ +const TarStreamToObjects = (inputStream, callback) => { + let outputStream = new ObjectsStreams() + + inputStream + .pipe(tar.extract()) + .on('entry', (header, stream, next) => { + stream.on('end', next) + + if (header.type !== 'directory') { + outputStream.push({ + path: header.name, + content: stream + }) + } else { + outputStream.push({ + path: header.name + }) + stream.resume() + } + }) + .on('finish', () => outputStream.push(null)) + + callback(null, outputStream) } module.exports = TarStreamToObjects From 25061cfc690a9daed12a8bcebcec78543acf9349 Mon Sep 17 00:00:00 2001 From: haad Date: Mon, 19 Dec 2016 12:31:40 +0100 Subject: [PATCH 5/8] Use stream-to-value in Block command instead of bl --- src/api/block.js | 55 ++++++++++++++++++++---------------- src/api/log.js | 4 ++- src/api/object.js | 4 +-- src/dagnode-stream.js | 5 +++- src/request-api.js | 4 ++- src/stream-to-value.js | 6 +++- src/tar-stream-to-objects.js | 8 ++++-- 7 files changed, 52 insertions(+), 34 deletions(-) diff --git a/src/api/block.js b/src/api/block.js index 35cc8af3e..73d745063 100644 --- a/src/api/block.js +++ b/src/api/block.js @@ -1,10 +1,10 @@ 'use strict' const promisify = require('promisify-es6') -const bl = require('bl') const Block = require('ipfs-block') const multihash = require('multihashes') const CID = require('cids') +const streamToValue = require('../stream-to-value') module.exports = (send) => { return { @@ -21,25 +21,27 @@ module.exports = (send) => { opts = {} } - return send({ - path: 'block/get', - args: args, - qs: opts - }, (err, res) => { - if (err) { - return callback(err) - } + // Transform the response from Buffer or a Stream to a Block + const transform = (res, callback) => { if (Buffer.isBuffer(res)) { callback(null, new Block(res)) } else { - res.pipe(bl((err, data) => { + streamToValue(res, (err, data) => { if (err) { return callback(err) } callback(null, new Block(data)) - })) + }) } - }) + } + + const request = { + path: 'block/get', + args: args, + qs: opts + } + + send.andTransform(request, transform, callback) }), stat: promisify((args, opts, callback) => { // TODO this needs to be adjusted with the new go-ipfs http-api @@ -51,19 +53,22 @@ module.exports = (send) => { callback = opts opts = {} } - return send({ + + const request = { path: 'block/stat', args: args, qs: opts - }, (err, stats) => { - if (err) { - return callback(err) - } + } + + // Transform the response from { Key, Size } objects to { key, size } objects + const transform = (stats, callback) => { callback(null, { key: stats.Key, size: stats.Size }) - }) + } + + send.andTransform(request, transform, callback) }), put: promisify((block, cid, callback) => { // TODO this needs to be adjusted with the new go-ipfs http-api @@ -81,15 +86,15 @@ module.exports = (send) => { block = block.data } - return send({ + const request = { path: 'block/put', files: block - }, (err, blockInfo) => { - if (err) { - return callback(err) - } - callback(null, new Block(block)) - }) + } + + // Transform the response to a Block + const transform = (blockInfo, callback) => callback(null, new Block(block)) + + send.andTransform(request, transform, callback) }) } } diff --git a/src/api/log.js b/src/api/log.js index f7a77aafd..74c750d5e 100644 --- a/src/api/log.js +++ b/src/api/log.js @@ -1,5 +1,6 @@ 'use strict' +const pump = require('pump') const ndjson = require('ndjson') const promisify = require('promisify-es6') @@ -12,7 +13,8 @@ module.exports = (send) => { if (err) { return callback(err) } - callback(null, response.pipe(ndjson.parse())) + const outputStream = pump(response, ndjson.parse()) + callback(null, outputStream) }) }) } diff --git a/src/api/object.js b/src/api/object.js index 1f403bc03..f7cd675a8 100644 --- a/src/api/object.js +++ b/src/api/object.js @@ -5,7 +5,7 @@ const DAGNode = dagPB.DAGNode const DAGLink = dagPB.DAGLink const promisify = require('promisify-es6') const bs58 = require('bs58') -const bl = require('bl') +const streamToValue = require('../stream-to-value') const cleanMultihash = require('../clean-multihash') const LRU = require('lru-cache') const lruOptions = { @@ -188,7 +188,7 @@ module.exports = (send) => { } if (typeof result.pipe === 'function') { - result.pipe(bl(callback)) + streamToValue(result, callback) } else { callback(null, result) } diff --git a/src/dagnode-stream.js b/src/dagnode-stream.js index 19b9b073e..bba259cb3 100644 --- a/src/dagnode-stream.js +++ b/src/dagnode-stream.js @@ -1,5 +1,6 @@ 'use strict' +const pump = require('pump') const TransformStream = require('readable-stream').Transform const streamToValue = require('./stream-to-value') const getDagNode = require('./get-dagnode') @@ -31,7 +32,9 @@ class DAGNodeStream extends TransformStream { } static streamToValue (send, inputStream, callback) { - const outputStream = inputStream.pipe(new DAGNodeStream({ send: send })) + const outputStream = pump(inputStream, new DAGNodeStream({ send: send }), (err) => { + if (err) callback(err) + }) streamToValue(outputStream, callback) } diff --git a/src/request-api.js b/src/request-api.js index e7389d021..b12e44cdd 100644 --- a/src/request-api.js +++ b/src/request-api.js @@ -3,6 +3,7 @@ const Qs = require('qs') const isNode = require('detect-node') const ndjson = require('ndjson') +const pump = require('pump') const once = require('once') const getFilesStream = require('./get-files-stream') const streamToValue = require('./stream-to-value') @@ -43,7 +44,8 @@ function onRes (buffer, cb) { // Return a stream of JSON objects if (chunkedObjects && isJson) { - return cb(null, res.pipe(ndjson.parse())) + const outputStream = pump(res, ndjson.parse()) + return cb(null, outputStream) } // Return a JSON object diff --git a/src/stream-to-value.js b/src/stream-to-value.js index 66e8e2ad6..6cf0c1ec5 100644 --- a/src/stream-to-value.js +++ b/src/stream-to-value.js @@ -1,12 +1,16 @@ 'use strict' +const pump = require('pump') const concat = require('concat-stream') /* Concatenate a stream to a single value. */ function streamToValue (res, callback) { - res.pipe(concat((data) => callback(null, data))) + const done = (data) => callback(null, data) + pump(res, concat(done), (err) => { + if (err) callback(err) + }) } module.exports = streamToValue diff --git a/src/tar-stream-to-objects.js b/src/tar-stream-to-objects.js index b68c887fb..6d7765a03 100644 --- a/src/tar-stream-to-objects.js +++ b/src/tar-stream-to-objects.js @@ -1,5 +1,6 @@ 'use strict' +const pump = require('pump') const tar = require('tar-stream') const ReadableStream = require('readable-stream').Readable @@ -9,7 +10,7 @@ class ObjectsStreams extends ReadableStream { super(opts) } - _read () {} + _read () {} } /* @@ -20,9 +21,9 @@ class ObjectsStreams extends ReadableStream { */ const TarStreamToObjects = (inputStream, callback) => { let outputStream = new ObjectsStreams() + let extractStream = tar.extract() - inputStream - .pipe(tar.extract()) + extractStream .on('entry', (header, stream, next) => { stream.on('end', next) @@ -40,6 +41,7 @@ const TarStreamToObjects = (inputStream, callback) => { }) .on('finish', () => outputStream.push(null)) + pump(inputStream, extractStream) callback(null, outputStream) } From c71eb5832647f0b6a3f8163d3de40dbea8f39732 Mon Sep 17 00:00:00 2001 From: Friedel Ziegelmayer Date: Mon, 19 Dec 2016 13:15:50 +0100 Subject: [PATCH 6/8] test: fix daemon config setting --- test/setup/spawn-daemons.js | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/test/setup/spawn-daemons.js b/test/setup/spawn-daemons.js index 2131fed82..a747ffc2f 100644 --- a/test/setup/spawn-daemons.js +++ b/test/setup/spawn-daemons.js @@ -28,15 +28,13 @@ function startDisposableDaemons (callback) { const configValues = { Bootstrap: [], Discovery: {}, - API: { - 'HTTPHeaders.Access-Control-Allow-Origin': ['*'], - 'HTTPHeaders.Access-Control-Allow-Credentials': 'true', - 'HTTPHeaders.Access-Control-Allow-Methods': ['PUT', 'POST', 'GET'] - } + 'API.HTTPHeaders.Access-Control-Allow-Origin': ['*'], + 'API.HTTPHeaders.Access-Control-Allow-Credentials': 'true', + 'API.HTTPHeaders.Access-Control-Allow-Methods': ['PUT', 'POST', 'GET'] } eachSeries(Object.keys(configValues), (configKey, cb) => { - nodes[key].setConfig(`API.${configKey}`, JSON.stringify(configValues[configKey]), cb) + nodes[key].setConfig(configKey, JSON.stringify(configValues[configKey]), cb) }, (err) => { if (err) { return cb(err) From 2031362adf606b4d048131717c90617fb8df3cec Mon Sep 17 00:00:00 2001 From: haad Date: Wed, 21 Dec 2016 11:23:59 +0100 Subject: [PATCH 7/8] Fix deps after rebase and use pump 1.0.2 --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 0026989a0..4e482398a 100644 --- a/package.json +++ b/package.json @@ -23,7 +23,6 @@ }, "dependencies": { "async": "^2.1.4", - "bl": "^1.1.2", "bs58": "^4.0.0", "concat-stream": "^1.6.0", "detect-node": "^2.0.3", @@ -43,6 +42,7 @@ "peer-id": "^0.8.1", "peer-info": "^0.8.1", "promisify-es6": "^1.0.2", + "pump": "^1.0.2", "qs": "^6.3.0", "readable-stream": "1.1.14", "stream-http": "^2.5.0", From da14a60c4650311006e8f15db5436e4a9baa8582 Mon Sep 17 00:00:00 2001 From: haad Date: Wed, 21 Dec 2016 11:29:28 +0100 Subject: [PATCH 8/8] Add curlies around error callback --- src/dagnode-stream.js | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/dagnode-stream.js b/src/dagnode-stream.js index bba259cb3..15198649c 100644 --- a/src/dagnode-stream.js +++ b/src/dagnode-stream.js @@ -33,7 +33,9 @@ class DAGNodeStream extends TransformStream { static streamToValue (send, inputStream, callback) { const outputStream = pump(inputStream, new DAGNodeStream({ send: send }), (err) => { - if (err) callback(err) + if (err) { + callback(err) + } }) streamToValue(outputStream, callback) }