Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#include <string>
#include <vector>
#include <list>

#include "src/config.h"

Expand All @@ -25,6 +26,17 @@ namespace NodeKafka {

namespace Config {

void DumpConfig(std::list<std::string> *dump) {
for (std::list<std::string>::iterator it = dump->begin();
it != dump->end(); ) {
std::cout << *it << " = ";
it++;
std::cout << *it << std::endl;
it++;
}
std::cout << std::endl;
}

template<typename T>
void LoadParameter(v8::Local<v8::Object> object, std::string field, const T &to) { // NOLINT
to = GetParameter<T>(object, field, to);
Expand Down
2 changes: 2 additions & 0 deletions src/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@
#include <nan.h>
#include <iostream>
#include <vector>
#include <list>

#include "deps/librdkafka/src-cpp/rdkafkacpp.h"
#include "src/common.h"

namespace NodeKafka {
namespace Config {

void DumpConfig(std::list<std::string> *);
template<typename T> void LoadParameter(v8::Local<v8::Object>, std::string, T &); // NOLINT
std::string GetValue(RdKafka::Conf*, const std::string);
RdKafka::Conf* Create(RdKafka::Conf::ConfType, v8::Local<v8::Object>, std::string &); // NOLINT
Expand Down
19 changes: 4 additions & 15 deletions src/connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
*/

#include <string>
#include <list>

#include "src/connection.h"
#include "src/workers.h"
Expand Down Expand Up @@ -57,13 +56,14 @@ Connection::Connection(RdKafka::Conf* gconfig, RdKafka::Conf* tconfig):

Connection::~Connection() {
uv_mutex_destroy(&m_connection_lock);
if (m_gconfig) {
delete m_gconfig;
}

if (m_tconfig) {
delete m_tconfig;
}

if (m_gconfig) {
delete m_gconfig;
}
}

RdKafka::TopicPartition* Connection::GetPartition(std::string &topic) {
Expand All @@ -78,17 +78,6 @@ bool Connection::IsConnected() {
return !m_is_closing && m_client != NULL;
}

void Connection::DumpConfig(std::list<std::string> *dump) {
for (std::list<std::string>::iterator it = dump->begin();
it != dump->end(); ) {
std::cout << *it << " = ";
it++;
std::cout << *it << std::endl;
it++;
}
std::cout << std::endl;
}

RdKafka::Handle* Connection::GetClient() {
return m_client;
}
Expand Down
3 changes: 0 additions & 3 deletions src/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
#include <nan.h>
#include <iostream>
#include <string>
#include <list>

#include "deps/librdkafka/src-cpp/rdkafkacpp.h"

Expand Down Expand Up @@ -67,8 +66,6 @@ class Connection : public Nan::ObjectWrap {
Connection(RdKafka::Conf*, RdKafka::Conf*);
~Connection();

void DumpConfig(std::list<std::string> *);

static Nan::Persistent<v8::Function> constructor;
static void New(const Nan::FunctionCallbackInfo<v8::Value>& info);

Expand Down
2 changes: 0 additions & 2 deletions src/consumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,6 @@ Baton Consumer::Disconnect() {

delete m_client;
m_client = NULL;

RdKafka::wait_destroyed(1000);
}
}

Expand Down
22 changes: 2 additions & 20 deletions src/producer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ Producer::Producer(RdKafka::Conf* gconfig, RdKafka::Conf* tconfig):
m_partitioner_cb() {
std::string errstr;

m_gconfig->set("default_topic_conf", m_tconfig, errstr);
m_gconfig->set("dr_cb", &m_dr_cb, errstr);
}

Expand Down Expand Up @@ -124,7 +125,7 @@ void Producer::New(const Nan::FunctionCallbackInfo<v8::Value>& info) {
return Nan::ThrowError(errstr.c_str());
}

Producer* producer = new Producer(gconfig, gconfig);
Producer* producer = new Producer(gconfig, tconfig);

// Wrap it
producer->Wrap(info.This());
Expand Down Expand Up @@ -190,32 +191,13 @@ void Producer::Disconnect() {
delete m_client;
m_client = NULL;
}

RdKafka::wait_destroyed(1000);
}

Baton Producer::Produce(ProducerMessage* msg) {
return Produce(msg->Payload(), msg->Size(), msg->GetTopic(),
msg->partition, msg->key);
}

Baton Producer::Produce(void* message, size_t size, std::string topic_name,
int32_t partition, std::string *key) {
std::string errstr;

RdKafka::Topic* topic =
RdKafka::Topic::create(m_client, topic_name, m_tconfig, errstr);

if (errstr.empty()) {
// Cede ownership of the pointer to this function
return Produce(message, size, topic, partition, key);
}

// We own the pointer here so we need to free it
free(message);
return Baton(RdKafka::ERR__INVALID_ARG);
}

Baton Producer::Produce(void* message, size_t size, RdKafka::Topic* topic,
int32_t partition, std::string *key) {
RdKafka::ErrorCode response_code;
Expand Down
1 change: 0 additions & 1 deletion src/producer.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ class Producer : public Connection {
void Poll();

Baton Produce(ProducerMessage* msg);
Baton Produce(void*, size_t, std::string, int32_t, std::string*);
Baton Produce(void*, size_t, RdKafka::Topic*, int32_t, std::string*);
std::string Name();

Expand Down