Skip to content

Commit a74fa06

Browse files
committed
Merge pull request #397 from dpkp/test_logging
Test logging
2 parents 742cd5e + 0e416d5 commit a74fa06

File tree

6 files changed

+36
-17
lines changed

6 files changed

+36
-17
lines changed

test/service.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,8 @@ def _spawn(self):
5959
self.alive = True
6060

6161
def _despawn(self):
62-
self.child.terminate()
62+
if self.child.poll() is None:
63+
self.child.terminate()
6364
self.alive = False
6465
for _ in range(50):
6566
if self.child.poll() is not None:

test/test_conn.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import logging
12
import socket
23
import struct
34
from threading import Thread
@@ -10,6 +11,10 @@
1011

1112
class ConnTest(unittest.TestCase):
1213
def setUp(self):
14+
15+
# kafka.conn debug logging is verbose, so only enable in conn tests
16+
logging.getLogger('kafka.conn').setLevel(logging.DEBUG)
17+
1318
self.config = {
1419
'host': 'localhost',
1520
'port': 9090,

test/test_consumer_integration.py

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -170,11 +170,11 @@ def test_simple_consumer__seek(self):
170170
def test_simple_consumer_blocking(self):
171171
consumer = self.consumer()
172172

173-
# Ask for 5 messages, nothing in queue, block 5 seconds
173+
# Ask for 5 messages, nothing in queue, block 1 second
174174
with Timer() as t:
175-
messages = consumer.get_messages(block=True, timeout=5)
175+
messages = consumer.get_messages(block=True, timeout=1)
176176
self.assert_message_count(messages, 0)
177-
self.assertGreaterEqual(t.interval, 5)
177+
self.assertGreaterEqual(t.interval, 1)
178178

179179
self.send_messages(0, range(0, 10))
180180

@@ -184,11 +184,11 @@ def test_simple_consumer_blocking(self):
184184
self.assert_message_count(messages, 5)
185185
self.assertLessEqual(t.interval, 1)
186186

187-
# Ask for 10 messages, get 5 back, block 5 seconds
187+
# Ask for 10 messages, get 5 back, block 1 second
188188
with Timer() as t:
189-
messages = consumer.get_messages(count=10, block=True, timeout=5)
189+
messages = consumer.get_messages(count=10, block=True, timeout=1)
190190
self.assert_message_count(messages, 5)
191-
self.assertGreaterEqual(t.interval, 5)
191+
self.assertGreaterEqual(t.interval, 1)
192192

193193
consumer.stop()
194194

@@ -236,12 +236,12 @@ def test_multi_process_consumer(self):
236236
def test_multi_process_consumer_blocking(self):
237237
consumer = self.consumer(consumer = MultiProcessConsumer)
238238

239-
# Ask for 5 messages, No messages in queue, block 5 seconds
239+
# Ask for 5 messages, No messages in queue, block 1 second
240240
with Timer() as t:
241-
messages = consumer.get_messages(block=True, timeout=5)
241+
messages = consumer.get_messages(block=True, timeout=1)
242242
self.assert_message_count(messages, 0)
243243

244-
self.assertGreaterEqual(t.interval, 5)
244+
self.assertGreaterEqual(t.interval, 1)
245245

246246
# Send 10 messages
247247
self.send_messages(0, range(0, 10))
@@ -252,11 +252,11 @@ def test_multi_process_consumer_blocking(self):
252252
self.assert_message_count(messages, 5)
253253
self.assertLessEqual(t.interval, 1)
254254

255-
# Ask for 10 messages, 5 in queue, block 5 seconds
255+
# Ask for 10 messages, 5 in queue, block 1 second
256256
with Timer() as t:
257-
messages = consumer.get_messages(count=10, block=True, timeout=5)
257+
messages = consumer.get_messages(count=10, block=True, timeout=1)
258258
self.assert_message_count(messages, 5)
259-
self.assertGreaterEqual(t.interval, 4.95)
259+
self.assertGreaterEqual(t.interval, 1)
260260

261261
consumer.stop()
262262

@@ -450,7 +450,7 @@ def test_kafka_consumer__blocking(self):
450450
consumer = self.kafka_consumer(auto_offset_reset='smallest',
451451
consumer_timeout_ms=TIMEOUT_MS)
452452

453-
# Ask for 5 messages, nothing in queue, block 5 seconds
453+
# Ask for 5 messages, nothing in queue, block 500ms
454454
with Timer() as t:
455455
with self.assertRaises(ConsumerTimeout):
456456
msg = consumer.next()
@@ -467,7 +467,7 @@ def test_kafka_consumer__blocking(self):
467467
self.assertEqual(len(messages), 5)
468468
self.assertLess(t.interval, TIMEOUT_MS / 1000.0 )
469469

470-
# Ask for 10 messages, get 5 back, block 5 seconds
470+
# Ask for 10 messages, get 5 back, block 500ms
471471
messages = set()
472472
with Timer() as t:
473473
with self.assertRaises(ConsumerTimeout):

test/test_failover_integration.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,10 +98,14 @@ def test_switch_leader_async(self):
9898

9999
# Test the base class Producer -- send_messages to a specific partition
100100
producer = Producer(self.client, async=True,
101-
req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT)
101+
batch_send_every_n=15,
102+
batch_send_every_t=3,
103+
req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT,
104+
async_log_messages_on_error=False)
102105

103106
# Send 10 random messages
104107
self._send_random_messages(producer, topic, partition, 10)
108+
self._send_random_messages(producer, topic, partition + 1, 10)
105109

106110
# kill leader for partition
107111
self._kill_leader(topic, partition)
@@ -110,9 +114,11 @@ def test_switch_leader_async(self):
110114

111115
# in async mode, this should return immediately
112116
producer.send_messages(topic, partition, b'success')
117+
producer.send_messages(topic, partition + 1, b'success')
113118

114119
# send to new leader
115120
self._send_random_messages(producer, topic, partition, 10)
121+
self._send_random_messages(producer, topic, partition + 1, 10)
116122

117123
# Stop the producer and wait for it to shutdown
118124
producer.stop()
@@ -129,6 +135,8 @@ def test_switch_leader_async(self):
129135
# Should be equal to 10 before + 1 recovery + 10 after
130136
self.assert_message_count(topic, 21, partitions=(partition,),
131137
at_least=True)
138+
self.assert_message_count(topic, 21, partitions=(partition + 1,),
139+
at_least=True)
132140

133141
@kafka_versions("all")
134142
def test_switch_leader_keyed_producer(self):

test/testutil.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,3 +113,8 @@ def __exit__(self, *args):
113113
self.interval = self.end - self.start
114114

115115
logging.basicConfig(level=logging.DEBUG)
116+
logging.getLogger('test.fixtures').setLevel(logging.ERROR)
117+
logging.getLogger('test.service').setLevel(logging.ERROR)
118+
119+
# kafka.conn debug logging is verbose, disable in tests by default
120+
logging.getLogger('kafka.conn').setLevel(logging.INFO)

tox.ini

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ deps =
1111
mock
1212
python-snappy
1313
commands =
14-
nosetests {posargs:-v --with-id --id-file={envdir}/.noseids --with-timer --timer-top-n 10 --with-coverage --cover-erase --cover-package kafka}
14+
nosetests {posargs:-v -x --with-id --id-file={envdir}/.noseids --with-timer --timer-top-n 10 --with-coverage --cover-erase --cover-package kafka}
1515
setenv =
1616
PROJECT_ROOT = {toxinidir}
1717
passenv = KAFKA_VERSION

0 commit comments

Comments
 (0)