Add 'codec' parameter to Producer #166

Merged: 4 commits, merged May 20, 2014

6 changes: 6 additions & 0 deletions kafka/common.py
@@ -170,6 +170,11 @@ class ConsumerNoMoreData(KafkaError):
class ProtocolError(KafkaError):
    pass


class UnsupportedCodecError(KafkaError):
    pass


kafka_errors = {
    -1 : UnknownError,
    1 : OffsetOutOfRangeError,
@@ -187,6 +192,7 @@ class ProtocolError(KafkaError):
    13 : StaleLeaderEpochCodeError,
}


def check_error(response):
    error = kafka_errors.get(response.error)
    if error:
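For context, a minimal sketch of how the new exception slots into existing error handling (only the class itself is from this diff; the rest is illustrative): since UnsupportedCodecError subclasses KafkaError, callers with a broad `except KafkaError` clause catch it without changes.

```python
from kafka.common import KafkaError, UnsupportedCodecError

try:
    raise UnsupportedCodecError("Codec 0x04 unsupported")
except KafkaError:
    # UnsupportedCodecError inherits from KafkaError, so generic
    # KafkaError handlers also catch the new validation error.
    pass
```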
33 changes: 23 additions & 10 deletions kafka/producer.py
@@ -9,9 +9,11 @@
from itertools import cycle
from multiprocessing import Queue, Process

from kafka.common import ProduceRequest, TopicAndPartition
from kafka.common import (
    ProduceRequest, TopicAndPartition, UnsupportedCodecError
)
from kafka.partitioner import HashedPartitioner
from kafka.protocol import create_message
from kafka.protocol import CODEC_NONE, ALL_CODECS, create_message_set

log = logging.getLogger("kafka")

@@ -21,7 +23,7 @@
STOP_ASYNC_PRODUCER = -1


def _send_upstream(queue, client, batch_time, batch_size,
def _send_upstream(queue, client, codec, batch_time, batch_size,
                   req_acks, ack_timeout):
    """
    Listen on the queue for a specified number of messages or till
@@ -62,7 +64,8 @@ def _send_upstream(queue, client, batch_time, batch_size,

        # Send collected requests upstream
        reqs = []
        for topic_partition, messages in msgset.items():
        for topic_partition, msg in msgset.items():
            messages = create_message_set(msg, codec)
            req = ProduceRequest(topic_partition.topic,
                                 topic_partition.partition,
                                 messages)
@@ -102,6 +105,7 @@ class Producer(object):
    def __init__(self, client, async=False,
                 req_acks=ACK_AFTER_LOCAL_WRITE,
                 ack_timeout=DEFAULT_ACK_TIMEOUT,
                 codec=None,
                 batch_send=False,
                 batch_send_every_n=BATCH_SEND_MSG_COUNT,
                 batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
@@ -119,11 +123,19 @@ def __init__(self, client, async=False,
        self.req_acks = req_acks
        self.ack_timeout = ack_timeout

        if codec is None:
            codec = CODEC_NONE
        elif codec not in ALL_CODECS:
            raise UnsupportedCodecError("Codec 0x%02x unsupported" % codec)

        self.codec = codec

        if self.async:
            self.queue = Queue()  # Messages are sent through this queue
            self.proc = Process(target=_send_upstream,
                                args=(self.queue,
                                      self.client.copy(),
                                      self.codec,
                                      batch_send_every_t,
                                      batch_send_every_n,
                                      self.req_acks,
@@ -139,11 +151,10 @@ def send_messages(self, topic, partition, *msg):
        """
        if self.async:
            for m in msg:
                self.queue.put((TopicAndPartition(topic, partition),
                                create_message(m)))
                self.queue.put((TopicAndPartition(topic, partition), m))
            resp = []
        else:
            messages = [create_message(m) for m in msg]
            messages = create_message_set(msg, self.codec)
            req = ProduceRequest(topic, partition, messages)
            try:
                resp = self.client.send_produce_request([req], acks=self.req_acks,
@@ -168,7 +179,7 @@ def stop(self, timeout=1):

class SimpleProducer(Producer):
    """
    A simple, round-robbin producer. Each message goes to exactly one partition
    A simple, round-robin producer. Each message goes to exactly one partition

    Params:
    client - The Kafka client instance to use
@@ -189,14 +200,15 @@ class SimpleProducer(Producer):
    def __init__(self, client, async=False,
                 req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
                 ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
                 codec=None,
                 batch_send=False,
                 batch_send_every_n=BATCH_SEND_MSG_COUNT,
                 batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
                 random_start=False):
        self.partition_cycles = {}
        self.random_start = random_start
        super(SimpleProducer, self).__init__(client, async, req_acks,
                                             ack_timeout, batch_send,
                                             ack_timeout, codec, batch_send,
                                             batch_send_every_n,
                                             batch_send_every_t)

@@ -241,6 +253,7 @@ class KeyedProducer(Producer):
    def __init__(self, client, partitioner=None, async=False,
                 req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
                 ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
                 codec=None,
                 batch_send=False,
                 batch_send_every_n=BATCH_SEND_MSG_COUNT,
                 batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
@@ -250,7 +263,7 @@ def __init__(self, client, partitioner=None, async=False,
        self.partitioners = {}

        super(KeyedProducer, self).__init__(client, async, req_acks,
                                            ack_timeout, batch_send,
                                            ack_timeout, codec, batch_send,
                                            batch_send_every_n,
                                            batch_send_every_t)

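A usage sketch for the new `codec` parameter. This is a hedged example, not part of the diff: the topic name and the 0.8-era `KafkaClient(host, port)` constructor are illustrative and may differ by version; only the `codec=` keyword and the fail-fast `UnsupportedCodecError` behavior come from this PR.

```python
from kafka.client import KafkaClient
from kafka.common import UnsupportedCodecError
from kafka.producer import SimpleProducer
from kafka.protocol import CODEC_GZIP

client = KafkaClient("localhost", 9092)  # illustrative host/port

# Each batch is compressed into a single gzip-wrapped Kafka message
# via create_message_set() before being sent upstream.
producer = SimpleProducer(client, codec=CODEC_GZIP)
producer.send_messages("my-topic", "message 1", "message 2")

# Unrecognized codec values now fail fast at construction time.
try:
    SimpleProducer(client, codec=0x04)
except UnsupportedCodecError as e:
    print(e)  # Codec 0x04 unsupported
```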
42 changes: 30 additions & 12 deletions kafka/protocol.py
@@ -9,7 +9,8 @@
    BrokerMetadata, PartitionMetadata, Message, OffsetAndMessage,
    ProduceResponse, FetchResponse, OffsetResponse,
    OffsetCommitResponse, OffsetFetchResponse, ProtocolError,
    BufferUnderflowError, ChecksumError, ConsumerFetchSizeTooSmall
    BufferUnderflowError, ChecksumError, ConsumerFetchSizeTooSmall,
    UnsupportedCodecError
)
from kafka.util import (
    read_short_string, read_int_string, relative_unpack,
@@ -18,6 +19,12 @@

log = logging.getLogger("kafka")

ATTRIBUTE_CODEC_MASK = 0x03
CODEC_NONE = 0x00
CODEC_GZIP = 0x01
CODEC_SNAPPY = 0x02
ALL_CODECS = (CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY)


class KafkaProtocol(object):
    """
@@ -32,11 +39,6 @@ class KafkaProtocol(object):
    OFFSET_COMMIT_KEY = 8
    OFFSET_FETCH_KEY = 9

    ATTRIBUTE_CODEC_MASK = 0x03
    CODEC_NONE = 0x00
    CODEC_GZIP = 0x01
    CODEC_SNAPPY = 0x02

    ###################
    #   Private API   #
    ###################
@@ -150,17 +152,17 @@ def _decode_message(cls, data, offset):
        (key, cur) = read_int_string(data, cur)
        (value, cur) = read_int_string(data, cur)

        codec = att & KafkaProtocol.ATTRIBUTE_CODEC_MASK
        codec = att & ATTRIBUTE_CODEC_MASK

        if codec == KafkaProtocol.CODEC_NONE:
        if codec == CODEC_NONE:
            yield (offset, Message(magic, att, key, value))

        elif codec == KafkaProtocol.CODEC_GZIP:
        elif codec == CODEC_GZIP:
            gz = gzip_decode(value)
            for (offset, msg) in KafkaProtocol._decode_message_set_iter(gz):
                yield (offset, msg)

        elif codec == KafkaProtocol.CODEC_SNAPPY:
        elif codec == CODEC_SNAPPY:
            snp = snappy_decode(value)
            for (offset, msg) in KafkaProtocol._decode_message_set_iter(snp):
                yield (offset, msg)
@@ -543,7 +545,7 @@ def create_gzip_message(payloads, key=None):
        [create_message(payload) for payload in payloads])

    gzipped = gzip_encode(message_set)
    codec = KafkaProtocol.ATTRIBUTE_CODEC_MASK & KafkaProtocol.CODEC_GZIP
    codec = ATTRIBUTE_CODEC_MASK & CODEC_GZIP

    return Message(0, 0x00 | codec, key, gzipped)

@@ -564,6 +566,22 @@ def create_snappy_message(payloads, key=None):
        [create_message(payload) for payload in payloads])

    snapped = snappy_encode(message_set)
    codec = KafkaProtocol.ATTRIBUTE_CODEC_MASK & KafkaProtocol.CODEC_SNAPPY
    codec = ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY

    return Message(0, 0x00 | codec, key, snapped)


def create_message_set(messages, codec=CODEC_NONE):
    """Create a message set using the given codec.

    If codec is CODEC_NONE, return a list of raw Kafka messages. Otherwise,
    return a list containing a single codec-encoded message.
    """
    if codec == CODEC_NONE:
        return [create_message(m) for m in messages]
    elif codec == CODEC_GZIP:
        return [create_gzip_message(messages)]
    elif codec == CODEC_SNAPPY:
        return [create_snappy_message(messages)]
    else:
        raise UnsupportedCodecError("Codec 0x%02x unsupported" % codec)
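The helper's contract, pinned down by the new tests below: CODEC_NONE yields one Message per payload, while a compressed codec folds the whole batch into a single wrapped message. A quick sketch using only names introduced in this diff:

```python
from kafka.protocol import (
    ATTRIBUTE_CODEC_MASK, CODEC_GZIP, CODEC_NONE, create_message_set
)

payloads = ["v1", "v2", "v3"]

# CODEC_NONE: one plain Message per payload.
plain = create_message_set(payloads, CODEC_NONE)
assert len(plain) == len(payloads)

# CODEC_GZIP: the whole batch becomes a single gzip-encoded Message,
# with the codec recorded in the low attribute bits.
compressed = create_message_set(payloads, CODEC_GZIP)
assert len(compressed) == 1
assert compressed[0].attributes & ATTRIBUTE_CODEC_MASK == CODEC_GZIP
```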
69 changes: 60 additions & 9 deletions test/test_protocol.py
@@ -1,22 +1,30 @@
import contextlib
from contextlib import contextmanager
import struct
import unittest2

import mock
from mock import sentinel

from kafka import KafkaClient
from kafka.common import (
    OffsetRequest, OffsetCommitRequest, OffsetFetchRequest,
    OffsetResponse, OffsetCommitResponse, OffsetFetchResponse,
    ProduceRequest, FetchRequest, Message, ChecksumError,
    ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse,
    OffsetAndMessage, BrokerMetadata, PartitionMetadata,
    TopicAndPartition, KafkaUnavailableError, ProtocolError,
    LeaderUnavailableError, PartitionUnavailableError
    ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse, OffsetAndMessage,
    BrokerMetadata, PartitionMetadata, TopicAndPartition, KafkaUnavailableError,
    ProtocolError, LeaderUnavailableError, PartitionUnavailableError,
    UnsupportedCodecError
)
from kafka.codec import (
    has_snappy, gzip_encode, gzip_decode,
    snappy_encode, snappy_decode
)
import kafka.protocol
from kafka.protocol import (
    create_gzip_message, create_message, create_snappy_message, KafkaProtocol
    ATTRIBUTE_CODEC_MASK, CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY, KafkaProtocol,
    create_message, create_gzip_message, create_snappy_message,
    create_message_set
)

class TestProtocol(unittest2.TestCase):
@@ -33,8 +41,7 @@ def test_create_gzip(self):
        payloads = ["v1", "v2"]
        msg = create_gzip_message(payloads)
        self.assertEqual(msg.magic, 0)
        self.assertEqual(msg.attributes, KafkaProtocol.ATTRIBUTE_CODEC_MASK &
                         KafkaProtocol.CODEC_GZIP)
        self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_GZIP)
        self.assertEqual(msg.key, None)
        # Need to decode to check since gzipped payload is non-deterministic
        decoded = gzip_decode(msg.value)
@@ -63,8 +70,7 @@ def test_create_snappy(self):
        payloads = ["v1", "v2"]
        msg = create_snappy_message(payloads)
        self.assertEqual(msg.magic, 0)
        self.assertEqual(msg.attributes, KafkaProtocol.ATTRIBUTE_CODEC_MASK &
                         KafkaProtocol.CODEC_SNAPPY)
        self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY)
        self.assertEqual(msg.key, None)
        decoded = snappy_decode(msg.value)
        expect = "".join([
@@ -692,3 +698,48 @@ def test_decode_offset_fetch_response(self):
            OffsetFetchResponse(topic = 'topic1', partition = 2, offset = 4, error = 0, metadata = "meta"),
            OffsetFetchResponse(topic = 'topic1', partition = 4, offset = 8, error = 0, metadata = "meta"),
        ]))

    @contextmanager
    def mock_create_message_fns(self):
        patches = contextlib.nested(
            mock.patch.object(kafka.protocol, "create_message",
                              return_value=sentinel.message),
            mock.patch.object(kafka.protocol, "create_gzip_message",
                              return_value=sentinel.gzip_message),
            mock.patch.object(kafka.protocol, "create_snappy_message",
                              return_value=sentinel.snappy_message),
        )

        with patches:
            yield

    def test_create_message_set(self):
        messages = [1, 2, 3]

        # Default codec is CODEC_NONE. Expect list of regular messages.
        expect = [sentinel.message] * len(messages)
        with self.mock_create_message_fns():
            message_set = create_message_set(messages)
        self.assertEqual(message_set, expect)

        # CODEC_NONE: Expect list of regular messages.
        expect = [sentinel.message] * len(messages)
        with self.mock_create_message_fns():
            message_set = create_message_set(messages, CODEC_NONE)
        self.assertEqual(message_set, expect)

        # CODEC_GZIP: Expect list of one gzip-encoded message.
        expect = [sentinel.gzip_message]
        with self.mock_create_message_fns():
            message_set = create_message_set(messages, CODEC_GZIP)
        self.assertEqual(message_set, expect)

        # CODEC_SNAPPY: Expect list of one snappy-encoded message.
        expect = [sentinel.snappy_message]
        with self.mock_create_message_fns():
            message_set = create_message_set(messages, CODEC_SNAPPY)
        self.assertEqual(message_set, expect)

        # Unknown codec should raise UnsupportedCodecError.
        with self.assertRaises(UnsupportedCodecError):
            create_message_set(messages, -1)