Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,24 @@ producer.send("key2", "this methode")
producer = KeyedProducer(kafka, "my-topic", partitioner=RoundRobinPartitioner)
```

## Multiprocess consumer
```python
from kafka.consumer import MultiProcessConsumer

# This will split the number of partitions among two processes
consumer = MultiProcessConsumer(kafka, "my-topic", "my-group", num_procs=2)

# This will spawn processes such that each handles 2 partitions max
consumer = MultiProcessConsumer(kafka, "my-topic", "my-group",
partitions_per_proc=2)

for message in consumer:
print(message)

for message in consumer.get_messages(count=5, block=True, timeout=4):
print(message)
```

## Low level

```python
Expand Down
5 changes: 3 additions & 2 deletions kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@
create_message, create_gzip_message, create_snappy_message
)
from kafka.producer import SimpleProducer
from kafka.consumer import SimpleConsumer
from kafka.consumer import SimpleConsumer, MultiProcessConsumer

__all__ = [
'KafkaClient', 'KafkaConnection', 'SimpleProducer', 'SimpleConsumer',
'KafkaClient', 'KafkaConnection', 'SimpleProducer',
'SimpleConsumer', 'MultiProcessConsumer',
'create_message', 'create_gzip_message', 'create_snappy_message'
]
14 changes: 11 additions & 3 deletions kafka/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,10 @@ def close(self):
for conn in self.conns.values():
conn.close()

def reinit(self):
    """Re-initialize every broker connection held by this client.

    Delegates to each connection object's own ``reinit`` so that stale
    sockets are torn down and rebuilt (presumably needed after a fork,
    e.g. by a multiprocess consumer — TODO confirm against callers).
    """
    # Snapshot the registered connections and rebuild each in turn.
    for connection in list(self.conns.values()):
        connection.reinit()

def send_produce_request(self, payloads=[], acks=1, timeout=1000,
fail_on_error=True, callback=None):
"""
Expand Down Expand Up @@ -221,15 +225,19 @@ def send_produce_request(self, payloads=[], acks=1, timeout=1000,
return out

def send_fetch_request(self, payloads=[], fail_on_error=True,
callback=None):
callback=None, max_wait_time=100, min_bytes=4096):
"""
Encode and send a FetchRequest

Payloads are grouped by topic and partition so they can be pipelined
to the same brokers.
"""
resps = self._send_broker_aware_request(payloads,
KafkaProtocol.encode_fetch_request,

encoder = partial(KafkaProtocol.encode_fetch_request,
max_wait_time=max_wait_time,
min_bytes=min_bytes)

resps = self._send_broker_aware_request(payloads, encoder,
KafkaProtocol.decode_fetch_response)

out = []
Expand Down
9 changes: 9 additions & 0 deletions kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,12 @@ def recv(self, requestId):
def close(self):
"Close this connection"
self._sock.close()

def reinit(self):
    """
    Re-initialize the socket connection.

    Closes the current socket and opens a fresh TCP connection to
    (self.host, self.port).

    The timeout is installed *before* connect() so that the connect
    attempt itself is bounded; previously it was set only after
    connect() returned, which let connect() block indefinitely on an
    unresponsive broker.
    """
    self._sock.close()
    self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # NOTE(review): 10s is hard-coded here — consider reusing whatever
    # timeout the connection was constructed with, if one exists.
    self._sock.settimeout(10)
    self._sock.connect((self.host, self.port))
Loading