Skip to content

Cargo API #216

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jan 31, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ So far its been tested in IE6, IE7, IE8, FF3.6 and Chrome 5. Usage:
* [until](#until)
* [waterfall](#waterfall)
* [queue](#queue)
* [cargo](#cargo)
* [auto](#auto)
* [iterator](#iterator)
* [apply](#apply)
Expand Down Expand Up @@ -778,6 +779,63 @@ q.push([{name: 'baz'},{name: 'bay'},{name: 'bax'}], function (err) {

---------------------------------------

<a name="cargo" />
### cargo(worker, [payload])

Creates a cargo object with the specified payload. Tasks added to the
cargo will be processed altogether (up to the payload limit). If the
worker is in progress, the task is queued until it is available. Once
the worker has completed some tasks, each callback of those tasks is called.

__Arguments__

* worker(tasks, callback) - An asynchronous function for processing queued
tasks.
* payload - An optional integer for determining how many tasks should be
process per round, default is unlimited.

__Cargo objects__

The cargo object returned by this function has the following properties and
methods:

* length() - a function returning the number of items waiting to be processed.
* payload - an integer for determining how many tasks should be
process per round. This property can be changed after a cargo is created to
alter the payload on-the-fly.
* push(task, [callback]) - add a new task to the queue, the callback is called
once the worker has finished processing the task.
instead of a single task, an array of tasks can be submitted. the respective callback is used for every task in the list.
* saturated - a callback that is called when the queue length hits the concurrency and further tasks will be queued
* empty - a callback that is called when the last item from the queue is given to a worker
* drain - a callback that is called when the last item from the queue has returned from the worker

__Example__

```js
// create a cargo object with payload 2

var cargo = async.cargo(function (task, callback) {
console.log('hello ' + task.name);
callback();
}, 2);


// add some items

cargo.push({name: 'foo'}, function (err) {
console.log('finished processing foo');
});
cargo.push({name: 'bar'}, function (err) {
console.log('finished processing bar');
});
cargo.push({name: 'baz'}, function (err) {
console.log('finished processing baz');
});
```

---------------------------------------

<a name="auto" />
### auto(tasks, [callback])

Expand Down
65 changes: 65 additions & 0 deletions lib/async.js
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,71 @@
return q;
};

async.cargo = function (worker, payload) {
var working = false,
tasks = [];

var cargo = {
tasks: tasks,
payload: payload,
saturated: null,
empty: null,
drain: null,
push: function (data, callback) {
if(data.constructor !== Array) {
data = [data];
}
_forEach(data, function(task) {
tasks.push({
data: task,
callback: typeof callback === 'function' ? callback : null
});
if (cargo.saturated && tasks.length === payload) {
cargo.saturated();
}
});
async.nextTick(cargo.process);
},
process: function process() {
if (working) return;
if (tasks.length === 0) {
if(cargo.drain) cargo.drain();
return;
}

var ts = typeof payload === 'number'
? tasks.splice(0, payload)
: tasks.splice(0);

var ds = _map(ts, function (task) {
return task.data;
});

if(cargo.empty) cargo.empty();
working = true;
worker(ds, function () {
working = false;

var args = arguments;
_forEach(ts, function (data) {
if (data.callback) {
data.callback.apply(null, args);
}
});

process();
});
},
length: function () {
return tasks.length;
},
running: function () {
return working;
}
};
return cargo;
};

var _console_fn = function (name) {
return function (fn) {
var args = Array.prototype.slice.call(arguments, 1);
Expand Down
134 changes: 134 additions & 0 deletions test/test-async.js
Original file line number Diff line number Diff line change
Expand Up @@ -1419,6 +1419,140 @@ exports['queue bulk task'] = function (test) {
}, 800);
};

exports['cargo'] = function (test) {
var call_order = [],
delays = [160, 160, 80];

// worker: --12--34--5-
// order of completion: 1,2,3,4,5

var c = async.cargo(function (tasks, callback) {
setTimeout(function () {
call_order.push('process ' + tasks.join(' '));
callback('error', 'arg');
}, delays.shift());
}, 2);

c.push(1, function (err, arg) {
test.equal(err, 'error');
test.equal(arg, 'arg');
test.equal(c.length(), 3);
call_order.push('callback ' + 1);
});
c.push(2, function (err, arg) {
test.equal(err, 'error');
test.equal(arg, 'arg');
test.equal(c.length(), 3);
call_order.push('callback ' + 2);
});

test.equal(c.length(), 2);

// async push
setTimeout(function () {
c.push(3, function (err, arg) {
test.equal(err, 'error');
test.equal(arg, 'arg');
test.equal(c.length(), 1);
call_order.push('callback ' + 3);
});
}, 60);
setTimeout(function () {
c.push(4, function (err, arg) {
test.equal(err, 'error');
test.equal(arg, 'arg');
test.equal(c.length(), 1);
call_order.push('callback ' + 4);
});
test.equal(c.length(), 2);
c.push(5, function (err, arg) {
test.equal(err, 'error');
test.equal(arg, 'arg');
test.equal(c.length(), 0);
call_order.push('callback ' + 5);
});
}, 120);


setTimeout(function () {
test.same(call_order, [
'process 1 2', 'callback 1', 'callback 2',
'process 3 4', 'callback 3', 'callback 4',
'process 5' , 'callback 5'
]);
test.equal(c.length(), 0);
test.done();
}, 800);
};

exports['cargo without callback'] = function (test) {
var call_order = [],
delays = [160,80,240,80];

// worker: --1-2---34-5-
// order of completion: 1,2,3,4,5

var c = async.cargo(function (tasks, callback) {
setTimeout(function () {
call_order.push('process ' + tasks.join(' '));
callback('error', 'arg');
}, delays.shift());
}, 2);

c.push(1);

setTimeout(function () {
c.push(2);
}, 120);
setTimeout(function () {
c.push(3);
c.push(4);
c.push(5);
}, 180);

setTimeout(function () {
test.same(call_order, [
'process 1',
'process 2',
'process 3 4',
'process 5'
]);
test.done();
}, 800);
};

exports['cargo bulk task'] = function (test) {
var call_order = [],
delays = [120,40];

// worker: -123-4-
// order of completion: 1,2,3,4

var c = async.cargo(function (tasks, callback) {
setTimeout(function () {
call_order.push('process ' + tasks.join(' '));
callback('error', tasks.join(' '));
}, delays.shift());
}, 3);

c.push( [1,2,3,4], function (err, arg) {
test.equal(err, 'error');
call_order.push('callback ' + arg);
});

test.equal(c.length(), 4);

setTimeout(function () {
test.same(call_order, [
'process 1 2 3', 'callback 1 2 3',
'callback 1 2 3', 'callback 1 2 3',
'process 4', 'callback 4',
]);
test.equal(c.length(), 0);
test.done();
}, 800);
};

exports['memoize'] = function (test) {
test.expect(4);
var call_order = [];
Expand Down