Optimize replication by sending multiple smaller requests to the server #2961

Merged 1 commit on Feb 22, 2017
66 changes: 50 additions & 16 deletions lib/persisted-model.js
@@ -15,6 +15,7 @@ var deprecated = require('depd')('loopback');
var debug = require('debug')('loopback:persisted-model');
var PassThrough = require('stream').PassThrough;
var utils = require('./utils');
var REPLICATION_CHUNK_SIZE = -1; // -1 disables chunking unless the model settings provide a size

module.exports = function(registry) {
  var Model = registry.getModel('Model');
@@ -1192,6 +1193,11 @@ module.exports = function(registry) {
  var Change = sourceModel.getChangeModel();
  var TargetChange = targetModel.getChangeModel();
  var changeTrackingEnabled = Change && TargetChange;
  var replicationChunkSize = REPLICATION_CHUNK_SIZE;

  if (sourceModel.settings && sourceModel.settings.replicationChunkSize) {
    replicationChunkSize = sourceModel.settings.replicationChunkSize;
  }

  assert(
    changeTrackingEnabled,
@@ -1211,7 +1217,13 @@
  async.waterfall(tasks, done);

  function getSourceChanges(cb) {
-   sourceModel.changes(since.source, options.filter, debug.enabled ? log : cb);
+   utils.downloadInChunks(
+     options.filter,
+     replicationChunkSize,
+     function(filter, pagingCallback) {
+       sourceModel.changes(since.source, filter, pagingCallback);
+     },
+     debug.enabled ? log : cb);

    function log(err, result) {
      if (err) return cb(err);
@@ -1222,7 +1234,13 @@
  }

  function getDiffFromTarget(sourceChanges, cb) {
-   targetModel.diff(since.target, sourceChanges, debug.enabled ? log : cb);
+   utils.uploadInChunks(
+     sourceChanges,
+     replicationChunkSize,
+     function(smallArray, chunkCallback) {
+       return targetModel.diff(since.target, smallArray, chunkCallback);
+     },
+     debug.enabled ? log : cb);

    function log(err, result) {
      if (err) return cb(err);
@@ -1241,9 +1259,16 @@
  function createSourceUpdates(_diff, cb) {
    diff = _diff;
    diff.conflicts = diff.conflicts || [];

    if (diff && diff.deltas && diff.deltas.length) {
      debug('\tbuilding a list of updates');
-     sourceModel.createUpdates(diff.deltas, cb);
+     utils.uploadInChunks(
+       diff.deltas,
+       replicationChunkSize,
+       function(smallArray, chunkCallback) {
+         return sourceModel.createUpdates(smallArray, chunkCallback);
+       },
+       cb);
    } else {
      // nothing to replicate
      done();
@@ -1253,20 +1278,29 @@
  function bulkUpdate(_updates, cb) {
    debug('\tstarting bulk update');
    updates = _updates;
-   targetModel.bulkUpdate(updates, options, function(err) {
-     var conflicts = err && err.details && err.details.conflicts;
-     if (conflicts && err.statusCode == 409) {
-       diff.conflicts = conflicts;
-       // filter out updates that were not applied
-       updates = updates.filter(function(u) {
-         return conflicts
-           .filter(function(d) { return d.modelId === u.change.modelId; })
-           .length === 0;
-       });
-       return cb();
-     }
-     cb(err);
-   });
+   utils.uploadInChunks(
+     updates,
+     replicationChunkSize,
+     function(smallArray, chunkCallback) {
+       return targetModel.bulkUpdate(smallArray, options, function(err) {
+         // bulk update is a special case where we want to process all chunks and aggregate all errors
+         chunkCallback(null, err);
+       });
+     },
+     function(notUsed, err) {
+       var conflicts = err && err.details && err.details.conflicts;
+       if (conflicts && err.statusCode == 409) {
+         diff.conflicts = conflicts;
+         // filter out updates that were not applied
+         updates = updates.filter(function(u) {
+           return conflicts
+             .filter(function(d) { return d.modelId === u.change.modelId; })
+             .length === 0;
+         });
+         return cb();
+       }
+       cb(err);
+     });
  }

  function checkpoints() {
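
For reference, the new chunk size is read from the model's settings (see `sourceModel.settings.replicationChunkSize` above). A minimal sketch of how a model might opt in; the model name and property are purely hypothetical and not part of this PR:

```js
// Hypothetical example: the third argument to createModel becomes
// Model.settings, which replicate() consults for replicationChunkSize.
var loopback = require('loopback');

var Car = loopback.createModel('Car', {make: String}, {
  trackChanges: true,        // change tracking is required for replication
  replicationChunkSize: 100, // replicate in batches of at most 100 items
});
```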
120 changes: 120 additions & 0 deletions lib/utils.js
@@ -4,8 +4,14 @@
// License text available at https://opensource.org/licenses/MIT

'use strict';

exports.createPromiseCallback = createPromiseCallback;
exports.uploadInChunks = uploadInChunks;
exports.downloadInChunks = downloadInChunks;
exports.concatResults = concatResults;

var Promise = require('bluebird');
var async = require('async');

function createPromiseCallback() {
  var cb;
@@ -18,3 +24,117 @@ function createPromiseCallback() {
  cb.promise = promise;
  return cb;
}

function throwPromiseNotDefined() {
  throw new Error(
    'Your Node runtime does not support ES6 Promises. ' +
    'Set "global.Promise" to your preferred implementation of promises.');
}

/**
 * Divide an async call with a large array into multiple calls using smaller chunks
 * @param {Array} largeArray - the large array to be chunked
 * @param {Number} chunkSize - the size of each chunk
 * @param {Function} processFunction - the function to be called multiple times
 * @param {Function} cb - the callback
 */
function uploadInChunks(largeArray, chunkSize, processFunction, cb) {
  var chunkArrays = [];

  if (!chunkSize || chunkSize < 1 || largeArray.length <= chunkSize) {
    // chunking is not required
    processFunction(largeArray, cb);
  } else {
    // copy so that largeArray is not modified by the splice below
    var copyOfLargeArray = [].concat(largeArray);

    // split into smaller arrays
    while (copyOfLargeArray.length > 0) {
      chunkArrays.push(copyOfLargeArray.splice(0, chunkSize));
    }

    var tasks = chunkArrays.map(function(chunkArray) {
      return function(previousResults, chunkCallback) {
        var lastArg = arguments[arguments.length - 1];

        if (typeof lastArg === 'function') {
          chunkCallback = lastArg;
        }

        processFunction(chunkArray, function(err, results) {
          if (err) {
            return chunkCallback(err);
          }

          // if this is the first waterfall task, or the previous results were not defined
          if (typeof previousResults === 'function' || typeof previousResults === 'undefined' ||
            previousResults === null) {
            previousResults = results;
          } else if (results) {
            previousResults = concatResults(previousResults, results);
          }

          chunkCallback(err, previousResults);
        });
      };
    });

    async.waterfall(tasks, cb);
  }
}
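
To illustrate how `uploadInChunks` behaves, here is a hypothetical call (not part of the PR) that splits a five-element array into chunks of two and aggregates the per-chunk results via `concatResults`:

```js
var utils = require('./utils');

utils.uploadInChunks([1, 2, 3, 4, 5], 2, function(chunk, chunkCallback) {
  // stands in for a real call such as targetModel.bulkUpdate(chunk, ...)
  chunkCallback(null, chunk.map(function(n) { return n * 10; }));
}, function(err, results) {
  if (err) throw err;
  console.log(results); // [10, 20, 30, 40, 50] -- chunk results concatenated
});
```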

/**
 * Page an async download into multiple calls using smaller chunks
 * @param {Object} filter - the filter object used for the async call
 * @param {Number} chunkSize - the size of each chunk (page)
 * @param {Function} processFunction - the function to be called multiple times
 * @param {Function} cb - the callback
 */
function downloadInChunks(filter, chunkSize, processFunction, cb) {
  var results = [];
  filter = filter ? JSON.parse(JSON.stringify(filter)) : {};

  if (!chunkSize || chunkSize < 1) {
    // chunking is not required
    processFunction(filter, cb);
  } else {
    filter.skip = 0;
    filter.limit = chunkSize;

    processFunction(JSON.parse(JSON.stringify(filter)), pageAndConcatResults);
  }

  function pageAndConcatResults(err, pagedResults) {
    if (err) {
      return cb(err);
    } else {
      results = concatResults(results, pagedResults);
      if (pagedResults.length >= chunkSize) {
        filter.skip += pagedResults.length;
        processFunction(JSON.parse(JSON.stringify(filter)), pageAndConcatResults);
      } else {
        cb(null, results);
      }
    }
  }
}
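
Similarly, `downloadInChunks` keeps issuing the same call with an increasing `skip` until a page comes back smaller than `chunkSize`. A hypothetical illustration (not part of the PR), reading two records per page from an in-memory array:

```js
var utils = require('./utils');
var data = ['a', 'b', 'c', 'd', 'e'];

utils.downloadInChunks({}, 2, function(filter, pagingCallback) {
  // stands in for a real call such as sourceModel.changes(since, filter, cb)
  pagingCallback(null, data.slice(filter.skip, filter.skip + filter.limit));
}, function(err, results) {
  if (err) throw err;
  console.log(results); // ['a', 'b', 'c', 'd', 'e'] -- all pages concatenated
});
```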

/**
 * Concatenate the current results into the previous results
 * Assumes that the previous results and current results are homogeneous
 * @param {Object|Array} previousResults
 * @param {Object|Array} currentResults
 */
function concatResults(previousResults, currentResults) {
  if (Array.isArray(currentResults)) {
    previousResults = previousResults.concat(currentResults);
  } else if (typeof currentResults === 'object') {
    Object.keys(currentResults).forEach(function(key) {
      previousResults[key] = concatResults(previousResults[key], currentResults[key]);
    });
  } else {
    previousResults = currentResults;
  }

  return previousResults;
}
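
For clarity, `concatResults` merges arrays by concatenation, objects key by key (recursively), and overwrites scalars. A hypothetical illustration (not part of the PR):

```js
var utils = require('./utils');

console.log(utils.concatResults([1, 2], [3]));
// -> [1, 2, 3]
console.log(utils.concatResults({deltas: [1], count: 1}, {deltas: [2], count: 2}));
// -> { deltas: [1, 2], count: 2 }
```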