diff --git a/README.md b/README.md index 1430894b6..56857ec4e 100644 --- a/README.md +++ b/README.md @@ -91,6 +91,7 @@ So far its been tested in IE6, IE7, IE8, FF3.6 and Chrome 5. Usage: * [until](#until) * [waterfall](#waterfall) * [queue](#queue) +* [cargo](#cargo) * [auto](#auto) * [iterator](#iterator) * [apply](#apply) @@ -778,6 +779,63 @@ q.push([{name: 'baz'},{name: 'bay'},{name: 'bax'}], function (err) { --------------------------------------- + +### cargo(worker, [payload]) + +Creates a cargo object with the specified payload. Tasks added to the +cargo will be processed altogether (up to the payload limit). If the +worker is in progress, the task is queued until it is available. Once +the worker has completed some tasks, each callback of those tasks is called. + +__Arguments__ + +* worker(tasks, callback) - An asynchronous function for processing queued + tasks. +* payload - An optional integer for determining how many tasks should be + process per round, default is unlimited. + +__Cargo objects__ + +The cargo object returned by this function has the following properties and +methods: + +* length() - a function returning the number of items waiting to be processed. +* payload - an integer for determining how many tasks should be + process per round. This property can be changed after a cargo is created to + alter the payload on-the-fly. +* push(task, [callback]) - add a new task to the queue, the callback is called + once the worker has finished processing the task. + instead of a single task, an array of tasks can be submitted. the respective callback is used for every task in the list. +* saturated - a callback that is called when the queue length hits the concurrency and further tasks will be queued +* empty - a callback that is called when the last item from the queue is given to a worker +* drain - a callback that is called when the last item from the queue has returned from the worker + +__Example__ + +```js +// create a cargo object with payload 2 + +var cargo = async.cargo(function (task, callback) { + console.log('hello ' + task.name); + callback(); +}, 2); + + +// add some items + +cargo.push({name: 'foo'}, function (err) { + console.log('finished processing foo'); +}); +cargo.push({name: 'bar'}, function (err) { + console.log('finished processing bar'); +}); +cargo.push({name: 'baz'}, function (err) { + console.log('finished processing baz'); +}); +``` + +--------------------------------------- + ### auto(tasks, [callback]) diff --git a/lib/async.js b/lib/async.js index bbbd05c47..e4edb85b0 100644 --- a/lib/async.js +++ b/lib/async.js @@ -618,6 +618,71 @@ return q; }; + async.cargo = function (worker, payload) { + var working = false, + tasks = []; + + var cargo = { + tasks: tasks, + payload: payload, + saturated: null, + empty: null, + drain: null, + push: function (data, callback) { + if(data.constructor !== Array) { + data = [data]; + } + _forEach(data, function(task) { + tasks.push({ + data: task, + callback: typeof callback === 'function' ? callback : null + }); + if (cargo.saturated && tasks.length === payload) { + cargo.saturated(); + } + }); + async.nextTick(cargo.process); + }, + process: function process() { + if (working) return; + if (tasks.length === 0) { + if(cargo.drain) cargo.drain(); + return; + } + + var ts = typeof payload === 'number' + ? tasks.splice(0, payload) + : tasks.splice(0); + + var ds = _map(ts, function (task) { + return task.data; + }); + + if(cargo.empty) cargo.empty(); + working = true; + worker(ds, function () { + working = false; + + var args = arguments; + _forEach(ts, function (data) { + if (data.callback) { + data.callback.apply(null, args); + } + }); + + process(); + }); + }, + length: function () { + return tasks.length; + }, + running: function () { + return working; + } + }; + return cargo; + }; + var _console_fn = function (name) { return function (fn) { var args = Array.prototype.slice.call(arguments, 1); diff --git a/test/test-async.js b/test/test-async.js index 39ec47d90..368608ff9 100644 --- a/test/test-async.js +++ b/test/test-async.js @@ -1419,6 +1419,140 @@ exports['queue bulk task'] = function (test) { }, 800); }; +exports['cargo'] = function (test) { + var call_order = [], + delays = [160, 160, 80]; + + // worker: --12--34--5- + // order of completion: 1,2,3,4,5 + + var c = async.cargo(function (tasks, callback) { + setTimeout(function () { + call_order.push('process ' + tasks.join(' ')); + callback('error', 'arg'); + }, delays.shift()); + }, 2); + + c.push(1, function (err, arg) { + test.equal(err, 'error'); + test.equal(arg, 'arg'); + test.equal(c.length(), 3); + call_order.push('callback ' + 1); + }); + c.push(2, function (err, arg) { + test.equal(err, 'error'); + test.equal(arg, 'arg'); + test.equal(c.length(), 3); + call_order.push('callback ' + 2); + }); + + test.equal(c.length(), 2); + + // async push + setTimeout(function () { + c.push(3, function (err, arg) { + test.equal(err, 'error'); + test.equal(arg, 'arg'); + test.equal(c.length(), 1); + call_order.push('callback ' + 3); + }); + }, 60); + setTimeout(function () { + c.push(4, function (err, arg) { + test.equal(err, 'error'); + test.equal(arg, 'arg'); + test.equal(c.length(), 1); + call_order.push('callback ' + 4); + }); + test.equal(c.length(), 2); + c.push(5, function (err, arg) { + test.equal(err, 'error'); + test.equal(arg, 'arg'); + test.equal(c.length(), 0); + call_order.push('callback ' + 5); + }); + }, 120); + + + setTimeout(function () { + test.same(call_order, [ + 'process 1 2', 'callback 1', 'callback 2', + 'process 3 4', 'callback 3', 'callback 4', + 'process 5' , 'callback 5' + ]); + test.equal(c.length(), 0); + test.done(); + }, 800); +}; + +exports['cargo without callback'] = function (test) { + var call_order = [], + delays = [160,80,240,80]; + + // worker: --1-2---34-5- + // order of completion: 1,2,3,4,5 + + var c = async.cargo(function (tasks, callback) { + setTimeout(function () { + call_order.push('process ' + tasks.join(' ')); + callback('error', 'arg'); + }, delays.shift()); + }, 2); + + c.push(1); + + setTimeout(function () { + c.push(2); + }, 120); + setTimeout(function () { + c.push(3); + c.push(4); + c.push(5); + }, 180); + + setTimeout(function () { + test.same(call_order, [ + 'process 1', + 'process 2', + 'process 3 4', + 'process 5' + ]); + test.done(); + }, 800); +}; + +exports['cargo bulk task'] = function (test) { + var call_order = [], + delays = [120,40]; + + // worker: -123-4- + // order of completion: 1,2,3,4 + + var c = async.cargo(function (tasks, callback) { + setTimeout(function () { + call_order.push('process ' + tasks.join(' ')); + callback('error', tasks.join(' ')); + }, delays.shift()); + }, 3); + + c.push( [1,2,3,4], function (err, arg) { + test.equal(err, 'error'); + call_order.push('callback ' + arg); + }); + + test.equal(c.length(), 4); + + setTimeout(function () { + test.same(call_order, [ + 'process 1 2 3', 'callback 1 2 3', + 'callback 1 2 3', 'callback 1 2 3', + 'process 4', 'callback 4', + ]); + test.equal(c.length(), 0); + test.done(); + }, 800); +}; + exports['memoize'] = function (test) { test.expect(4); var call_order = [];