diff --git a/src/config.cc b/src/config.cc index adbe6e78..b9f41b6f 100644 --- a/src/config.cc +++ b/src/config.cc @@ -9,6 +9,7 @@ #include #include +#include #include "src/config.h" @@ -25,6 +26,17 @@ namespace NodeKafka { namespace Config { +void DumpConfig(std::list *dump) { + for (std::list::iterator it = dump->begin(); + it != dump->end(); ) { + std::cout << *it << " = "; + it++; + std::cout << *it << std::endl; + it++; + } + std::cout << std::endl; +} + template void LoadParameter(v8::Local object, std::string field, const T &to) { // NOLINT to = GetParameter(object, field, to); diff --git a/src/config.h b/src/config.h index 210140a2..6e1928e2 100644 --- a/src/config.h +++ b/src/config.h @@ -13,6 +13,7 @@ #include #include #include +#include #include "deps/librdkafka/src-cpp/rdkafkacpp.h" #include "src/common.h" @@ -20,6 +21,7 @@ namespace NodeKafka { namespace Config { +void DumpConfig(std::list *); template void LoadParameter(v8::Local, std::string, T &); // NOLINT std::string GetValue(RdKafka::Conf*, const std::string); RdKafka::Conf* Create(RdKafka::Conf::ConfType, v8::Local, std::string &); // NOLINT diff --git a/src/connection.cc b/src/connection.cc index 93ebe8ef..3f4aa692 100644 --- a/src/connection.cc +++ b/src/connection.cc @@ -8,7 +8,6 @@ */ #include -#include #include "src/connection.h" #include "src/workers.h" @@ -57,13 +56,14 @@ Connection::Connection(RdKafka::Conf* gconfig, RdKafka::Conf* tconfig): Connection::~Connection() { uv_mutex_destroy(&m_connection_lock); - if (m_gconfig) { - delete m_gconfig; - } if (m_tconfig) { delete m_tconfig; } + + if (m_gconfig) { + delete m_gconfig; + } } RdKafka::TopicPartition* Connection::GetPartition(std::string &topic) { @@ -78,17 +78,6 @@ bool Connection::IsConnected() { return !m_is_closing && m_client != NULL; } -void Connection::DumpConfig(std::list *dump) { - for (std::list::iterator it = dump->begin(); - it != dump->end(); ) { - std::cout << *it << " = "; - it++; - std::cout << *it << std::endl; - it++; - } - std::cout << std::endl; -} - RdKafka::Handle* Connection::GetClient() { return m_client; } diff --git a/src/connection.h b/src/connection.h index f576a501..e3ec9dff 100644 --- a/src/connection.h +++ b/src/connection.h @@ -13,7 +13,6 @@ #include #include #include -#include #include "deps/librdkafka/src-cpp/rdkafkacpp.h" @@ -67,8 +66,6 @@ class Connection : public Nan::ObjectWrap { Connection(RdKafka::Conf*, RdKafka::Conf*); ~Connection(); - void DumpConfig(std::list *); - static Nan::Persistent constructor; static void New(const Nan::FunctionCallbackInfo& info); diff --git a/src/consumer.cc b/src/consumer.cc index bb00a7cc..8175c27c 100644 --- a/src/consumer.cc +++ b/src/consumer.cc @@ -95,8 +95,6 @@ Baton Consumer::Disconnect() { delete m_client; m_client = NULL; - - RdKafka::wait_destroyed(1000); } } diff --git a/src/producer.cc b/src/producer.cc index faee0258..d4813a0f 100644 --- a/src/producer.cc +++ b/src/producer.cc @@ -34,6 +34,7 @@ Producer::Producer(RdKafka::Conf* gconfig, RdKafka::Conf* tconfig): m_partitioner_cb() { std::string errstr; + m_gconfig->set("default_topic_conf", m_tconfig, errstr); m_gconfig->set("dr_cb", &m_dr_cb, errstr); } @@ -124,7 +125,7 @@ void Producer::New(const Nan::FunctionCallbackInfo& info) { return Nan::ThrowError(errstr.c_str()); } - Producer* producer = new Producer(gconfig, gconfig); + Producer* producer = new Producer(gconfig, tconfig); // Wrap it producer->Wrap(info.This()); @@ -190,8 +191,6 @@ void Producer::Disconnect() { delete m_client; m_client = NULL; } - - RdKafka::wait_destroyed(1000); } Baton Producer::Produce(ProducerMessage* msg) { @@ -199,23 +198,6 @@ Baton Producer::Produce(ProducerMessage* msg) { msg->partition, msg->key); } -Baton Producer::Produce(void* message, size_t size, std::string topic_name, - int32_t partition, std::string *key) { - std::string errstr; - - RdKafka::Topic* topic = - RdKafka::Topic::create(m_client, topic_name, m_tconfig, errstr); - - if (errstr.empty()) { - // Cede ownership of the pointer to this function - return Produce(message, size, topic, partition, key); - } - - // We own the pointer here so we need to free it - free(message); - return Baton(RdKafka::ERR__INVALID_ARG); -} - Baton Producer::Produce(void* message, size_t size, RdKafka::Topic* topic, int32_t partition, std::string *key) { RdKafka::ErrorCode response_code; diff --git a/src/producer.h b/src/producer.h index d9b37148..edcee4dd 100644 --- a/src/producer.h +++ b/src/producer.h @@ -56,7 +56,6 @@ class Producer : public Connection { void Poll(); Baton Produce(ProducerMessage* msg); - Baton Produce(void*, size_t, std::string, int32_t, std::string*); Baton Produce(void*, size_t, RdKafka::Topic*, int32_t, std::string*); std::string Name();