Skip to content
This repository was archived by the owner on Feb 12, 2024. It is now read-only.

fix: Do not load all of a DAG into memory when pinning #2387

Merged
merged 1 commit into from
Aug 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@
"is-stream": "^2.0.0",
"iso-url": "~0.4.6",
"joi": "^14.3.1",
"just-flatten-it": "^2.1.0",
"just-safe-set": "^2.1.0",
"kind-of": "^6.0.2",
"libp2p": "~0.25.4",
Expand Down
34 changes: 0 additions & 34 deletions src/core/components/dag.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@ const promisify = require('promisify-es6')
const CID = require('cids')
const pull = require('pull-stream')
const iterToPull = require('async-iterator-to-pull-stream')
const mapAsync = require('async/map')
const setImmediate = require('async/setImmediate')
const flattenDeep = require('just-flatten-it')
const errCode = require('err-code')
const multicodec = require('multicodec')

Expand Down Expand Up @@ -180,38 +178,6 @@ module.exports = function dag (self) {
iterToPull(self._ipld.tree(cid, path, options)),
pull.collect(callback)
)
}),

// TODO - use IPLD selectors once they are implemented
/**
 * Resolves the node identified by `multihash` and, recursively, every node
 * reachable through its `Links`, yielding them all as one flat array.
 *
 * Note that the whole tree is accumulated in memory before the callback fires.
 *
 * @param {*} multihash - anything the `CID` constructor accepts
 * @param {Object} [options] - forwarded unchanged to `self.dag.get`
 * @param {Function} callback - called with `(err, nodes)`
 */
_getRecursive: promisify((multihash, options, callback) => {
  // `options` may be omitted, in which case the callback arrives second
  if (typeof options === 'function') {
    callback = options
    options = {}
  }

  if (!options) {
    options = {}
  }

  let rootCid

  try {
    rootCid = new CID(multihash)
  } catch (cidErr) {
    // report asynchronously so the callback never fires in the caller's tick
    return setImmediate(() => callback(errCode(cidErr, 'ERR_INVALID_CID')))
  }

  // fetches the complete subtree hanging off a single link
  const fetchSubtree = (link, done) => {
    self.dag._getRecursive(link.Hash, options, done)
  }

  self.dag.get(rootCid, '', options, (getErr, result) => {
    if (getErr) { return callback(getErr) }

    const node = result.value

    mapAsync(node.Links, fetchSubtree, (mapErr, subtrees) => {
      if (mapErr) { return callback(mapErr) }

      // subtrees is an array of (already flat) arrays - collapse it together
      // with the current node into a single list
      callback(null, flattenDeep([node, subtrees]))
    })
  })
})
}
}
4 changes: 2 additions & 2 deletions src/core/components/pin.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ module.exports = (self) => {

// entire graph of nested links should be pinned,
// so make sure we have all the objects
dag._getRecursive(key, { preload: options.preload }, (err) => {
pinManager.fetchCompleteDag(key, { preload: options.preload }, (err) => {
if (err) { return cb(err) }
// found all objects, we can add the pin
return cb(null, key)
Expand Down Expand Up @@ -242,7 +242,7 @@ module.exports = (self) => {
)
}
if (type === PinTypes.indirect || type === PinTypes.all) {
pinManager.getIndirectKeys((err, indirects) => {
pinManager.getIndirectKeys(options, (err, indirects) => {
if (err) { return callback(err) }
pins = pins
// if something is pinned both directly and indirectly,
Expand Down
72 changes: 49 additions & 23 deletions src/core/components/pin/pin-manager.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
/* eslint max-nested-callbacks: ["error", 8] */
'use strict'

const { DAGNode, DAGLink, util } = require('ipld-dag-pb')
const { DAGNode, DAGLink } = require('ipld-dag-pb')
const CID = require('cids')
const map = require('async/map')
const series = require('async/series')
const parallel = require('async/parallel')
const eachLimit = require('async/eachLimit')
const waterfall = require('async/waterfall')
const detectLimit = require('async/detectLimit')
const queue = require('async/queue')
const { Key } = require('interface-datastore')
const errCode = require('err-code')
const multicodec = require('multicodec')
Expand Down Expand Up @@ -43,6 +43,34 @@ class PinManager {
this.recursivePins = new Set()
}

_walkDag ({ cid, preload = false, onCid = () => {} }, cb) {
const q = queue(({ cid }, done) => {
this.dag.get(cid, { preload }, (err, result) => {
if (err) {
return done(err)
}

onCid(cid)

if (result.value.Links) {
q.push(result.value.Links.map(link => ({
cid: link.Hash
})))
}

done()
})
}, concurrencyLimit)
q.drain = () => {
cb()
}
q.error = (err) => {
q.kill()
cb(err)
}
q.push({ cid })
}

directKeys () {
return Array.from(this.directPins, key => new CID(key).buffer)
}
Expand All @@ -51,30 +79,21 @@ class PinManager {
return Array.from(this.recursivePins, key => new CID(key).buffer)
}

getIndirectKeys (callback) {
getIndirectKeys ({ preload }, callback) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to preload when getting indirect keys?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As implemented, we leave it up to the caller; it's an undocumented option that defaults to false.

In general I think we want to be really careful with preloading because you can end up inadvertently sending huge amounts of data over the wire, maxing out the number of allowed connections, etc.

const indirectKeys = new Set()
eachLimit(this.recursiveKeys(), concurrencyLimit, (multihash, cb) => {
this.dag._getRecursive(multihash, (err, nodes) => {
if (err) {
return cb(err)
}

map(nodes, (node, cb) => util.cid(util.serialize(node), {
cidVersion: 0
}).then(cid => cb(null, cid), cb), (err, cids) => {
if (err) {
return cb(err)
this._walkDag({
cid: new CID(multihash),
preload: preload || false,
onCid: (cid) => {
cid = cid.toString()

// recursive pins pre-empt indirect pins
if (!this.recursivePins.has(cid)) {
indirectKeys.add(cid)
}

cids
.map(cid => cid.toString())
// recursive pins pre-empt indirect pins
.filter(key => !this.recursivePins.has(key))
.forEach(key => indirectKeys.add(key))

cb()
})
})
}
}, cb)
}, (err) => {
if (err) { return callback(err) }
callback(null, Array.from(indirectKeys))
Expand Down Expand Up @@ -283,6 +302,13 @@ class PinManager {
})
}

fetchCompleteDag (cid, options, callback) {
this._walkDag({
cid,
preload: options.preload
}, callback)
}

// Returns an error if the pin type is invalid
static checkPinType (type) {
if (typeof type !== 'string' || !Object.keys(PinTypes).includes(type)) {
Expand Down