Cleanups #394

Merged: 6 commits, merged Jun 9, 2015
45 changes: 23 additions & 22 deletions kafka/client.py
@@ -4,8 +4,8 @@
import functools
import logging
import time
import kafka.common

import kafka.common
from kafka.common import (TopicAndPartition, BrokerMetadata,
ConnectionError, FailedPayloadsError,
KafkaTimeoutError, KafkaUnavailableError,
@@ -22,7 +22,7 @@

class KafkaClient(object):

CLIENT_ID = b"kafka-python"
CLIENT_ID = b'kafka-python'

# NOTE: The timeout given to the client should always be greater than the
# one passed to SimpleConsumer.get_message(), otherwise you can get a
@@ -50,7 +50,7 @@ def __init__(self, hosts, client_id=CLIENT_ID,
##################

def _get_conn(self, host, port):
"Get or create a connection to a broker using host and port"
"""Get or create a connection to a broker using host and port"""
host_key = (host, port)
if host_key not in self.conns:
self.conns[host_key] = KafkaConnection(
@@ -111,6 +111,7 @@ def _send_broker_unaware_request(self, payloads, encoder_fn, decoder_fn):
"""
for (host, port) in self.hosts:
requestId = self._next_id()
log.debug('Request %s: %s', requestId, payloads)
try:
conn = self._get_conn(host, port)
request = encoder_fn(client_id=self.client_id,
@@ -119,13 +120,15 @@ def _send_broker_unaware_request(self, payloads, encoder_fn, decoder_fn):

conn.send(requestId, request)
response = conn.recv(requestId)
return decoder_fn(response)
decoded = decoder_fn(response)
log.debug('Response %s: %s', requestId, decoded)
return decoded

except Exception:
log.exception("Could not send request [%r] to server %s:%i, "
"trying next server" % (requestId, host, port))
log.exception('Error sending request [%s] to server %s:%s, '
'trying next server', requestId, host, port)

raise KafkaUnavailableError("All servers failed to process request")
raise KafkaUnavailableError('All servers failed to process request')

def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
"""
@@ -150,9 +153,6 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):

List of response objects in the same order as the supplied payloads
"""

log.debug("Sending Payloads: %s" % payloads)

# Group the requests by topic+partition
brokers_for_payloads = []
payloads_by_broker = collections.defaultdict(list)
@@ -170,6 +170,7 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
broker_failures = []
for broker, payloads in payloads_by_broker.items():
requestId = self._next_id()
log.debug('Request %s to %s: %s', requestId, broker, payloads)
request = encoder_fn(client_id=self.client_id,
correlation_id=requestId, payloads=payloads)

@@ -180,7 +181,7 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):

except ConnectionError as e:
broker_failures.append(broker)
log.warning("Could not send request [%s] to server %s: %s",
log.warning('Could not send request [%s] to server %s: %s',
binascii.b2a_hex(request), broker, e)

for payload in payloads:
@@ -201,15 +202,14 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
response = conn.recv(requestId)
except ConnectionError as e:
broker_failures.append(broker)
log.warning("Could not receive response to request [%s] "
"from server %s: %s",
log.warning('Could not receive response to request [%s] '
'from server %s: %s',
binascii.b2a_hex(request), conn, e)

for payload in payloads:
responses_by_broker[broker].append(FailedPayloadsError(payload))

else:

for payload_response in decoder_fn(response):
responses_by_broker[broker].append(payload_response)

@@ -223,7 +223,6 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
# Return responses in the same order as provided
responses_by_payload = [responses_by_broker[broker].pop(0)
for broker in brokers_for_payloads]
log.debug('Responses: %s' % responses_by_payload)
return responses_by_payload

def __repr__(self):
@@ -254,8 +253,11 @@ def close(self):

def copy(self):
"""
Create an inactive copy of the client object
A reinit() has to be done on the copy before it can be used again
Create an inactive copy of the client object, suitable for passing
to a separate thread.

Note that the copied connections are not initialized, so reinit() must
be called on the returned copy.
"""
c = copy.deepcopy(self)
for key in c.conns:
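
For readers of the new docstring, the intended pattern looks roughly like this (thread body and broker address are illustrative, not part of the diff):

    import threading

    from kafka.client import KafkaClient

    client = KafkaClient('localhost:9092')  # illustrative broker address

    def worker(client_copy):
        # The copy arrives with uninitialized connections, so reconnect first.
        client_copy.reinit()
        client_copy.load_metadata_for_topics()
        # ... issue requests from this thread only ...

    t = threading.Thread(target=worker, args=(client.copy(),))
    t.start()
    t.join()
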
@@ -297,7 +299,7 @@ def ensure_topic_exists(self, topic, timeout = 30):

while not self.has_metadata_for_topic(topic):
if time.time() > start_time + timeout:
raise KafkaTimeoutError("Unable to create topic {0}".format(topic))
raise KafkaTimeoutError('Unable to create topic {0}'.format(topic))
try:
self.load_metadata_for_topics(topic)
except LeaderNotAvailableError:
@@ -345,8 +347,8 @@ def load_metadata_for_topics(self, *topics):

resp = self.send_metadata_request(topics)

log.debug("Received new broker metadata: %s", resp.brokers)
log.debug("Received new topic metadata: %s", resp.topics)
log.debug('Received new broker metadata: %s', resp.brokers)
log.debug('Received new topic metadata: %s', resp.topics)

self.brokers = dict([(broker.nodeId, broker)
for broker in resp.brokers])
@@ -365,7 +367,7 @@ def load_metadata_for_topics(self, *topics):
raise

# Otherwise, just log a warning
log.error("Error loading topic metadata for %s: %s", topic, type(e))
log.error('Error loading topic metadata for %s: %s', topic, type(e))
continue

self.topic_partitions[topic] = {}
@@ -406,7 +408,6 @@ def load_metadata_for_topics(self, *topics):

def send_metadata_request(self, payloads=[], fail_on_error=True,
callback=None):

encoder = KafkaProtocol.encode_metadata_request
decoder = KafkaProtocol.decode_metadata_response

8 changes: 5 additions & 3 deletions kafka/conn.py
@@ -161,9 +161,11 @@ def recv(self, request_id):

def copy(self):
"""
Create an inactive copy of the connection object
A reinit() has to be done on the copy before it can be used again
return a new KafkaConnection object
Create an inactive copy of the connection object, suitable for
passing to a background thread.

The returned copy is not connected; you must call reinit() before
using.
"""
c = copy.deepcopy(self)
# Python 3 doesn't copy custom attributes of the threadlocal subclass
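
Same pattern one level down, as a short sketch (host and port are illustrative):

    from kafka.conn import KafkaConnection

    conn = KafkaConnection('localhost', 9092)  # illustrative host/port

    conn_copy = conn.copy()  # inactive copy: no live socket attached
    conn_copy.reinit()       # opens a fresh socket; required before first use
    # conn_copy can now send/recv independently of the original connection
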
15 changes: 7 additions & 8 deletions kafka/consumer/multiprocess.py
@@ -1,22 +1,21 @@
from __future__ import absolute_import

import logging
import time

from collections import namedtuple
import logging
from multiprocessing import Process, Manager as MPManager

try:
from Queue import Empty, Full
except ImportError: # python 2
from queue import Empty, Full
from Queue import Empty, Full # python 3
except ImportError:
from queue import Empty, Full # python 2
import time

from .base import (
Consumer,
AUTO_COMMIT_MSG_COUNT, AUTO_COMMIT_INTERVAL,
NO_MESSAGES_WAIT_TIME_SECONDS,
FULL_QUEUE_WAIT_TIME_SECONDS
)
from .simple import Consumer, SimpleConsumer
from .simple import SimpleConsumer


log = logging.getLogger(__name__)
27 changes: 13 additions & 14 deletions kafka/consumer/simple.py
@@ -2,25 +2,18 @@

try:
from itertools import zip_longest as izip_longest, repeat # pylint: disable-msg=E0611
except ImportError: # python 2
from itertools import izip_longest as izip_longest, repeat
except ImportError:
from itertools import izip_longest as izip_longest, repeat # python 2
import logging
try:
from Queue import Empty, Queue # python 3
except ImportError:
from queue import Empty, Queue # python 2
import sys
import time

import six
import sys

try:
from Queue import Empty, Queue
except ImportError: # python 2
from queue import Empty, Queue

from kafka.common import (
FetchRequest, OffsetRequest,
ConsumerFetchSizeTooSmall, ConsumerNoMoreData,
UnknownTopicOrPartitionError, NotLeaderForPartitionError,
OffsetOutOfRangeError, FailedPayloadsError, check_error
)
from .base import (
Consumer,
FETCH_DEFAULT_BLOCK_TIMEOUT,
@@ -33,6 +26,12 @@
ITER_TIMEOUT_SECONDS,
NO_MESSAGES_WAIT_TIME_SECONDS
)
from ..common import (
FetchRequest, OffsetRequest,
ConsumerFetchSizeTooSmall, ConsumerNoMoreData,
UnknownTopicOrPartitionError, NotLeaderForPartitionError,
OffsetOutOfRangeError, FailedPayloadsError, check_error
)


log = logging.getLogger(__name__)
2 changes: 2 additions & 0 deletions kafka/producer/base.py
@@ -206,6 +206,8 @@ class Producer(object):

Arguments:
client (KafkaClient): instance to use for broker communications.
If async=True, the background thread will use client.copy(),
which is expected to return a thread-safe object.
codec (kafka.protocol.ALL_CODECS): compression codec to use.
req_acks (int, optional): A value indicating the acknowledgements that
the server must receive before responding to the request,
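
To make the added note concrete: with async=True the producer hands client.copy() to its background thread, so the caller's client is never shared across threads. A rough usage sketch (broker address, topic, and message are illustrative):

    from kafka.client import KafkaClient
    from kafka.producer import SimpleProducer

    client = KafkaClient('localhost:9092')         # illustrative broker address
    producer = SimpleProducer(client, async=True)  # background thread works on client.copy()

    producer.send_messages(b'my-topic', b'hello')  # queued; returns without waiting for acks
    producer.stop()                                # drain the async queue before exiting
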
53 changes: 10 additions & 43 deletions kafka/producer/keyed.py
@@ -3,14 +3,10 @@
import logging
import warnings

from kafka.partitioner import HashedPartitioner
from kafka.util import kafka_bytestring
from .base import Producer
from ..partitioner import HashedPartitioner
from ..util import kafka_bytestring

from .base import (
Producer, BATCH_SEND_DEFAULT_INTERVAL,
BATCH_SEND_MSG_COUNT, ASYNC_QUEUE_MAXSIZE, ASYNC_QUEUE_PUT_TIMEOUT,
ASYNC_RETRY_LIMIT, ASYNC_RETRY_BACKOFF_MS, ASYNC_RETRY_ON_TIMEOUTS
)

log = logging.getLogger(__name__)

@@ -19,46 +15,17 @@ class KeyedProducer(Producer):
"""
A producer which distributes messages to partitions based on the key

Arguments:
client: The kafka client instance
See Producer class for Arguments

Keyword Arguments:
Additional Arguments:
partitioner: A partitioner class that will be used to get the partition
to send the message to. Must be derived from Partitioner
async: If True, the messages are sent asynchronously via another
thread (process). We will not wait for a response to these
ack_timeout: Value (in milliseconds) indicating a timeout for waiting
for an acknowledgement
batch_send: If True, messages are send in batches
batch_send_every_n: If set, messages are send in batches of this size
batch_send_every_t: If set, messages are send after this timeout
to send the message to. Must be derived from Partitioner.
Defaults to HashedPartitioner.
"""
def __init__(self, client, partitioner=None, async=False,
req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
codec=None,
batch_send=False,
batch_send_every_n=BATCH_SEND_MSG_COUNT,
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
async_retry_limit=ASYNC_RETRY_LIMIT,
async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS,
async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS,
async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT):
if not partitioner:
partitioner = HashedPartitioner
self.partitioner_class = partitioner
def __init__(self, *args, **kwargs):
self.partitioner_class = kwargs.pop('partitioner', HashedPartitioner)
self.partitioners = {}

super(KeyedProducer, self).__init__(client, req_acks, ack_timeout,
codec, async, batch_send,
batch_send_every_n,
batch_send_every_t,
async_retry_limit,
async_retry_backoff_ms,
async_retry_on_timeouts,
async_queue_maxsize,
async_queue_put_timeout)
super(KeyedProducer, self).__init__(*args, **kwargs)

def _next_partition(self, topic, key):
if topic not in self.partitioners:
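
Call sites are unaffected by the *args/**kwargs rewrite; partitioner is still an ordinary keyword argument that defaults to HashedPartitioner. A quick sketch (broker address, topic, and keys are illustrative):

    from kafka.client import KafkaClient
    from kafka.partitioner import RoundRobinPartitioner
    from kafka.producer import KeyedProducer

    client = KafkaClient('localhost:9092')  # illustrative broker address

    # 'partitioner' is popped from **kwargs; all other arguments pass through to Producer
    producer = KeyedProducer(client, partitioner=RoundRobinPartitioner)

    producer.send_messages(b'my-topic', b'user-1', b'payload for user-1')
    producer.send_messages(b'my-topic', b'user-2', b'payload for user-2')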