Skip to content

Commit 31bee50

Browse files
committed
Add timeout parameter to all consume method calls
1 parent b9cf763 commit 31bee50

File tree

5 files changed

+84
-37
lines changed

5 files changed

+84
-37
lines changed

lib/kafka-consumer.js

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,18 @@ function KafkaConsumer(conf, topicConf) {
8787

8888
this.globalConfig = conf;
8989
this.topicConfig = topicConf;
90+
91+
this._consumeTimeout = 1000;
9092
}
9193

94+
/**
95+
* Set the default consume timeout provided to c++land
96+
* @param {number} timeoutMs - number of milliseconds to wait for a message to be fetched
97+
*/
98+
KafkaConsumer.prototype.setDefaultConsumeTimeout = function(timeoutMs) {
99+
this._consumeTimeout = timeoutMs;
100+
};
101+
92102
/**
93103
* Get a stream representation of this KafkaConsumer
94104
*
@@ -201,6 +211,8 @@ KafkaConsumer.prototype.unsubscribe = function() {
201211
* is fetched.
202212
*/
203213
KafkaConsumer.prototype.consume = function(topics, cb) {
214+
var timeoutMs = this._consumeTimeout || 1000;
215+
204216
var self = this;
205217
// If topics is set and is an array or a string, or if we have both
206218
// parameters, run it like this.
@@ -216,7 +228,7 @@ KafkaConsumer.prototype.consume = function(topics, cb) {
216228
cb = function() {};
217229
}
218230

219-
this._consumeLoop(topics, cb);
231+
this._consumeLoop(timeoutMs, topics, cb);
220232

221233
} else if ((topics && typeof topics === 'number') || (topics && cb)) {
222234

@@ -227,7 +239,7 @@ KafkaConsumer.prototype.consume = function(topics, cb) {
227239
throw new TypeError('Callback must be a function');
228240
}
229241

230-
this._consumeNum(numMessages, cb);
242+
this._consumeNum(timeoutMs, numMessages, cb);
231243

232244
} else {
233245
if (topics === undefined) {
@@ -238,7 +250,7 @@ KafkaConsumer.prototype.consume = function(topics, cb) {
238250
cb = topics;
239251
}
240252

241-
this._consumeOne(cb);
253+
this._consumeOne(timeoutMs, cb);
242254
}
243255
return this;
244256
};
@@ -251,7 +263,7 @@ KafkaConsumer.prototype.consume = function(topics, cb) {
251263
* @private
252264
* @see consume
253265
*/
254-
KafkaConsumer.prototype._consumeLoop = function(topics, cb) {
266+
KafkaConsumer.prototype._consumeLoop = function(timeoutMs, topics, cb) {
255267
var self = this;
256268

257269
this._client.subscribe(topics, function subscribeCallback(err) {
@@ -261,7 +273,7 @@ KafkaConsumer.prototype._consumeLoop = function(topics, cb) {
261273
cb(err);
262274
} else {
263275

264-
self._client.consumeLoop(function readCallback(err, message) {
276+
self._client.consumeLoop(timeoutMs, function readCallback(err, message) {
265277

266278
if (err) {
267279
// A few different types of errors here
@@ -297,11 +309,11 @@ KafkaConsumer.prototype._consumeLoop = function(topics, cb) {
297309
* @private
298310
* @see consume
299311
*/
300-
KafkaConsumer.prototype._consumeNum = function(numMessages, cb) {
312+
KafkaConsumer.prototype._consumeNum = function(timeoutMs, numMessages, cb) {
301313
var self = this;
302314

303315
try {
304-
this._client.consume(numMessages, function(err, messages) {
316+
this._client.consume(timeoutMs, numMessages, function(err, messages) {
305317
if (err) {
306318
err = new LibrdKafkaError(err);
307319
if (cb) {
@@ -342,12 +354,12 @@ KafkaConsumer.prototype._consumeNum = function(numMessages, cb) {
342354
* @private
343355
* @see consume
344356
*/
345-
KafkaConsumer.prototype._consumeOne = function(cb) {
357+
KafkaConsumer.prototype._consumeOne = function(timeoutMs, cb) {
346358
// Otherwise, we run this method
347359
var self = this;
348360

349361
try {
350-
this._client.consume(function(err, message) {
362+
this._client.consume(timeoutMs, function(err, message) {
351363
if (err) {
352364
err = new LibrdKafkaError(err);
353365
if (cb) {

src/consumer.cc

Lines changed: 41 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,7 @@ Baton Consumer::Subscribe(std::vector<std::string> topics) {
244244
return Baton(RdKafka::ERR_NO_ERROR);
245245
}
246246

247-
NodeKafka::Message* Consumer::Consume() {
247+
NodeKafka::Message* Consumer::Consume(int timeout_ms) {
248248
NodeKafka::Message* m;
249249

250250
if (IsConnected()) {
@@ -254,7 +254,7 @@ NodeKafka::Message* Consumer::Consume() {
254254
} else {
255255
RdKafka::KafkaConsumer* consumer =
256256
dynamic_cast<RdKafka::KafkaConsumer*>(m_client);
257-
m = new NodeKafka::Message(consumer->consume(1000));
257+
m = new NodeKafka::Message(consumer->consume(timeout_ms));
258258

259259
if (m->ConsumerShouldStop()) {
260260
Unsubscribe();
@@ -720,39 +720,64 @@ NAN_METHOD(Consumer::NodeSubscribeSync) {
720720
NAN_METHOD(Consumer::NodeConsumeLoop) {
721721
Nan::HandleScope scope;
722722

723-
if (info.Length() < 1) {
723+
if (info.Length() < 2) {
724724
// Just throw an exception
725725
return Nan::ThrowError("Invalid number of parameters");
726726
}
727727

728-
if (!info[0]->IsFunction()) {
728+
if (!info[0]->IsNumber()) {
729+
return Nan::ThrowError("Need to specify a timeout");
730+
}
731+
732+
if (!info[1]->IsFunction()) {
729733
return Nan::ThrowError("Need to specify a callback");
730734
}
731735

736+
int timeout_ms;
737+
Nan::Maybe<uint32_t> maybeTimeout =
738+
Nan::To<uint32_t>(info[0].As<v8::Number>());
739+
740+
if (maybeTimeout.IsNothing()) {
741+
timeout_ms = 1000;
742+
} else {
743+
timeout_ms = static_cast<int>(maybeTimeout.FromJust());
744+
}
745+
732746
Consumer* consumer = ObjectWrap::Unwrap<Consumer>(info.This());
733747

734-
v8::Local<v8::Function> cb = info[0].As<v8::Function>();
748+
v8::Local<v8::Function> cb = info[1].As<v8::Function>();
735749

736750
Nan::Callback *callback = new Nan::Callback(cb);
737-
Nan::AsyncQueueWorker(new Workers::ConsumerConsumeLoop(callback, consumer));
751+
Nan::AsyncQueueWorker(
752+
new Workers::ConsumerConsumeLoop(callback, consumer, timeout_ms));
738753

739754
info.GetReturnValue().Set(Nan::Null());
740755
}
741756

742757
NAN_METHOD(Consumer::NodeConsume) {
743758
Nan::HandleScope scope;
744759

745-
if (info.Length() < 1) {
760+
if (info.Length() < 2) {
746761
// Just throw an exception
747762
return Nan::ThrowError("Invalid number of parameters");
748763
}
749764

750-
if (info[0]->IsNumber()) {
751-
if (!info[1]->IsFunction()) {
765+
int timeout_ms;
766+
Nan::Maybe<uint32_t> maybeTimeout =
767+
Nan::To<uint32_t>(info[0].As<v8::Number>());
768+
769+
if (maybeTimeout.IsNothing()) {
770+
timeout_ms = 1000;
771+
} else {
772+
timeout_ms = static_cast<int>(maybeTimeout.FromJust());
773+
}
774+
775+
if (info[1]->IsNumber()) {
776+
if (!info[2]->IsFunction()) {
752777
return Nan::ThrowError("Need to specify a callback");
753778
}
754779

755-
v8::Local<v8::Number> numMessagesNumber = info[0].As<v8::Number>();
780+
v8::Local<v8::Number> numMessagesNumber = info[1].As<v8::Number>();
756781
Nan::Maybe<uint32_t> numMessagesMaybe = Nan::To<uint32_t>(numMessagesNumber); // NOLINT
757782

758783
uint32_t numMessages;
@@ -764,21 +789,22 @@ NAN_METHOD(Consumer::NodeConsume) {
764789

765790
Consumer* consumer = ObjectWrap::Unwrap<Consumer>(info.This());
766791

767-
v8::Local<v8::Function> cb = info[1].As<v8::Function>();
792+
v8::Local<v8::Function> cb = info[2].As<v8::Function>();
768793
Nan::Callback *callback = new Nan::Callback(cb);
769794
Nan::AsyncQueueWorker(
770-
new Workers::ConsumerConsumeNum(callback, consumer, numMessages));
795+
new Workers::ConsumerConsumeNum(callback, consumer, numMessages, timeout_ms)); // NOLINT
771796

772797
} else {
773-
if (!info[0]->IsFunction()) {
798+
if (!info[1]->IsFunction()) {
774799
return Nan::ThrowError("Need to specify a callback");
775800
}
776801

777802
Consumer* consumer = ObjectWrap::Unwrap<Consumer>(info.This());
778803

779-
v8::Local<v8::Function> cb = info[0].As<v8::Function>();
804+
v8::Local<v8::Function> cb = info[1].As<v8::Function>();
780805
Nan::Callback *callback = new Nan::Callback(cb);
781-
Nan::AsyncQueueWorker(new Workers::ConsumerConsume(callback, consumer));
806+
Nan::AsyncQueueWorker(
807+
new Workers::ConsumerConsume(callback, consumer, timeout_ms));
782808
}
783809

784810
info.GetReturnValue().Set(Nan::Null());

src/consumer.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ class Consumer : public Connection {
7070
std::string Name();
7171

7272
Baton Subscribe(std::vector<std::string>);
73-
NodeKafka::Message* Consume();
73+
NodeKafka::Message* Consume(int timeout_ms);
7474

7575
void ActivateDispatchers();
7676
void DeactivateDispatchers();

src/workers.cc

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -388,16 +388,18 @@ void ConsumerUnsubscribe::HandleErrorCallback() {
388388
*/
389389

390390
ConsumerConsumeLoop::ConsumerConsumeLoop(Nan::Callback *callback,
391-
Consumer* consumer) :
391+
Consumer* consumer,
392+
const int & timeout_ms) :
392393
MessageWorker(callback),
393-
consumer(consumer) {}
394+
consumer(consumer),
395+
m_timeout_ms(timeout_ms) {}
394396

395397
ConsumerConsumeLoop::~ConsumerConsumeLoop() {}
396398

397399
void ConsumerConsumeLoop::Execute(const ExecutionMessageBus& bus) {
398400
// Do one check here before we move forward
399401
while (consumer->IsConnected()) {
400-
NodeKafka::Message* message = consumer->Consume();
402+
NodeKafka::Message* message = consumer->Consume(m_timeout_ms);
401403
if (message->errcode() == RdKafka::ERR__PARTITION_EOF) {
402404
delete message;
403405
usleep(1*1000);
@@ -466,18 +468,20 @@ void ConsumerConsumeLoop::HandleErrorCallback() {
466468

467469
ConsumerConsumeNum::ConsumerConsumeNum(Nan::Callback *callback,
468470
Consumer* consumer,
469-
const uint32_t & num_messages) :
471+
const uint32_t & num_messages,
472+
const int & timeout_ms) :
470473
ErrorAwareWorker(callback),
471474
m_consumer(consumer),
472-
m_num_messages(num_messages) {}
475+
m_num_messages(num_messages),
476+
m_timeout_ms(timeout_ms) {}
473477

474478
ConsumerConsumeNum::~ConsumerConsumeNum() {}
475479

476480
void ConsumerConsumeNum::Execute() {
477481
const int max = static_cast<int>(m_num_messages);
478482
for (int i = 0; i < max; i++) {
479483
// Get a message
480-
NodeKafka::Message* message = m_consumer->Consume();
484+
NodeKafka::Message* message = m_consumer->Consume(m_timeout_ms);
481485
if (message->IsError()) {
482486
if (message->errcode() != RdKafka::ERR__TIMED_OUT &&
483487
message->errcode() != RdKafka::ERR__PARTITION_EOF) {
@@ -543,14 +547,16 @@ void ConsumerConsumeNum::HandleErrorCallback() {
543547
*/
544548

545549
ConsumerConsume::ConsumerConsume(Nan::Callback *callback,
546-
Consumer* consumer) :
550+
Consumer* consumer,
551+
const int & timeout_ms) :
547552
ErrorAwareWorker(callback),
548-
consumer(consumer) {}
553+
consumer(consumer),
554+
m_timeout_ms(timeout_ms) {}
549555

550556
ConsumerConsume::~ConsumerConsume() {}
551557

552558
void ConsumerConsume::Execute() {
553-
_message = consumer->Consume();
559+
_message = consumer->Consume(m_timeout_ms);
554560
if (_message->IsError()) {
555561
if (_message->errcode() != RdKafka::ERR__TIMED_OUT ||
556562
_message->errcode() != RdKafka::ERR__PARTITION_EOF) {

src/workers.h

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,7 @@ class ConsumerUnsubscribe : public ErrorAwareWorker {
259259

260260
class ConsumerConsumeLoop : public MessageWorker {
261261
public:
262-
ConsumerConsumeLoop(Nan::Callback*, NodeKafka::Consumer*);
262+
ConsumerConsumeLoop(Nan::Callback*, NodeKafka::Consumer*, const int &);
263263
~ConsumerConsumeLoop();
264264

265265
void Execute(const ExecutionMessageBus&);
@@ -268,24 +268,26 @@ class ConsumerConsumeLoop : public MessageWorker {
268268
void HandleMessageCallback(NodeKafka::Message*);
269269
private:
270270
NodeKafka::Consumer * consumer;
271+
const int m_timeout_ms;
271272
};
272273

273274
class ConsumerConsume : public ErrorAwareWorker {
274275
public:
275-
ConsumerConsume(Nan::Callback*, NodeKafka::Consumer*);
276+
ConsumerConsume(Nan::Callback*, NodeKafka::Consumer*, const int &);
276277
~ConsumerConsume();
277278

278279
void Execute();
279280
void HandleOKCallback();
280281
void HandleErrorCallback();
281282
private:
282283
NodeKafka::Consumer * consumer;
284+
const int m_timeout_ms;
283285
NodeKafka::Message* _message;
284286
};
285287

286288
class ConsumerConsumeNum : public ErrorAwareWorker {
287289
public:
288-
ConsumerConsumeNum(Nan::Callback*, NodeKafka::Consumer*, const uint32_t &);
290+
ConsumerConsumeNum(Nan::Callback*, NodeKafka::Consumer*, const uint32_t &, const int &); // NOLINT
289291
~ConsumerConsumeNum();
290292

291293
void Execute();
@@ -294,6 +296,7 @@ class ConsumerConsumeNum : public ErrorAwareWorker {
294296
private:
295297
NodeKafka::Consumer * m_consumer;
296298
const uint32_t m_num_messages;
299+
const int m_timeout_ms;
297300
std::vector<NodeKafka::Message*> m_messages;
298301
};
299302

0 commit comments

Comments
 (0)