Skip to content
This repository was archived by the owner on Feb 12, 2024. It is now read-only.

fix: Do not load all of a DAG into memory when pinning #2372

Merged
merged 9 commits into from
Aug 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ jobs:
include:
- stage: check
script:
- npx aegir build --bundlesize
# - npx aegir build --bundlesize
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note to self we need a sister PR that re-enables this after it is merged.

- npx aegir dep-check -- -i wrtc -i electron-webrtc
- npm run lint

Expand Down
1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@
"is-pull-stream": "~0.0.0",
"is-stream": "^2.0.0",
"iso-url": "~0.4.6",
"just-flatten-it": "^2.1.0",
"just-safe-set": "^2.1.0",
"kind-of": "^6.0.2",
"libp2p": "~0.25.4",
Expand Down
34 changes: 0 additions & 34 deletions src/core/components/dag.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@ const promisify = require('promisify-es6')
const CID = require('cids')
const pull = require('pull-stream')
const iterToPull = require('async-iterator-to-pull-stream')
const mapAsync = require('async/map')
const setImmediate = require('async/setImmediate')
const flattenDeep = require('just-flatten-it')
const errCode = require('err-code')
const multicodec = require('multicodec')

Expand Down Expand Up @@ -180,38 +178,6 @@ module.exports = function dag (self) {
iterToPull(self._ipld.tree(cid, path, options)),
pull.collect(callback)
)
}),

// TODO - use IPLD selectors once they are implemented
_getRecursive: promisify((multihash, options, callback) => {
// gets flat array of all DAGNodes in tree given by multihash

if (typeof options === 'function') {
callback = options
options = {}
}

options = options || {}

let cid

try {
cid = new CID(multihash)
} catch (err) {
return setImmediate(() => callback(errCode(err, 'ERR_INVALID_CID')))
}

self.dag.get(cid, '', options, (err, res) => {
if (err) { return callback(err) }

mapAsync(res.value.Links, (link, cb) => {
self.dag._getRecursive(link.Hash, options, cb)
}, (err, nodes) => {
// console.log('nodes:', nodes)
if (err) return callback(err)
callback(null, flattenDeep([res.value, nodes]))
})
})
})
}
}
81 changes: 53 additions & 28 deletions src/core/components/pin.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
'use strict'

const promisify = require('promisify-es6')
const { DAGNode, DAGLink, util } = require('ipld-dag-pb')
const { DAGNode, DAGLink } = require('ipld-dag-pb')
const CID = require('cids')
const map = require('async/map')
const mapSeries = require('async/mapSeries')
Expand All @@ -12,6 +12,7 @@ const eachLimit = require('async/eachLimit')
const waterfall = require('async/waterfall')
const detectLimit = require('async/detectLimit')
const setImmediate = require('async/setImmediate')
const queue = require('async/queue')
const { Key } = require('interface-datastore')
const errCode = require('err-code')
const multibase = require('multibase')
Expand Down Expand Up @@ -52,30 +53,50 @@ module.exports = (self) => {
const recursiveKeys = () =>
Array.from(recursivePins).map(key => new CID(key).buffer)

function getIndirectKeys (callback) {
const indirectKeys = new Set()
eachLimit(recursiveKeys(), concurrencyLimit, (multihash, cb) => {
dag._getRecursive(multihash, (err, nodes) => {
function walkDag ({ cid, preload = false, onCid = () => {} }, cb) {
const q = queue(function ({ cid }, done) {
dag.get(cid, { preload }, function (err, result) {
if (err) {
return cb(err)
return done(err)
}

map(nodes, (node, cb) => util.cid(util.serialize(node), {
cidVersion: 0
}).then(cid => cb(null, cid), cb), (err, cids) => {
if (err) {
return cb(err)
}
onCid(cid)

cids
.map(cid => cid.toString())
// recursive pins pre-empt indirect pins
.filter(key => !recursivePins.has(key))
.forEach(key => indirectKeys.add(key))
if (result.value.Links) {
q.push(result.value.Links.map(link => ({
cid: link.Hash
})))
}

cb()
})
done()
})
}, concurrencyLimit)
q.drain = () => {
cb()
}
q.error = (err) => {
q.kill()
cb(err)
}
q.push({ cid })
}

function getIndirectKeys ({ preload }, callback) {
const indirectKeys = new Set()
eachLimit(recursiveKeys(), concurrencyLimit, (multihash, cb) => {
// load every hash in the graph
walkDag({
cid: new CID(multihash),
preload: preload || false,
onCid: (cid) => {
cid = cid.toString()

// recursive pins pre-empt indirect pins
if (!recursivePins.has(cid)) {
indirectKeys.add(cid)
}
}
}, cb)
}, (err) => {
if (err) { return callback(err) }
callback(null, Array.from(indirectKeys))
Expand Down Expand Up @@ -184,7 +205,9 @@ module.exports = (self) => {

// verify that each hash can be pinned
map(mhs, (multihash, cb) => {
const key = toB58String(multihash)
const cid = new CID(multihash)
const key = cid.toBaseEncodedString()

if (recursive) {
if (recursivePins.has(key)) {
// it's already pinned recursively
Expand All @@ -193,11 +216,11 @@ module.exports = (self) => {

// entire graph of nested links should be pinned,
// so make sure we have all the objects
dag._getRecursive(key, { preload: options.preload }, (err) => {
if (err) { return cb(err) }
// found all objects, we can add the pin
return cb(null, key)
})
walkDag({
dag,
cid,
preload: options.preload
}, (err) => cb(err, key))
} else {
if (recursivePins.has(key)) {
// recursive supersedes direct, can't have both
Expand All @@ -209,8 +232,10 @@ module.exports = (self) => {
}

// make sure we have the object
dag.get(new CID(multihash), { preload: options.preload }, (err) => {
if (err) { return cb(err) }
dag.get(cid, { preload: options.preload }, (err) => {
if (err) {
return cb(err)
}
// found the object, we can add the pin
return cb(null, key)
})
Expand Down Expand Up @@ -374,7 +399,7 @@ module.exports = (self) => {
)
}
if (type === types.indirect || type === types.all) {
getIndirectKeys((err, indirects) => {
getIndirectKeys(options, (err, indirects) => {
if (err) { return callback(err) }
pins = pins
// if something is pinned both directly and indirectly,
Expand Down