Skip to content

Move integration tests and fixtures to test/integration/; simplify unit fixtures #2588

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ test: build-integration
pytest $(PYTESTS)

fixture: build-integration
python -m test.fixtures kafka
python -m test.integration.fixtures kafka

cov-local: build-integration
pytest --pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF --cov=kafka \
Expand Down Expand Up @@ -99,7 +99,7 @@ servers/%/kafka-bin: servers/dist/$$(call kafka_artifact_name,$$*) | servers/dis
if [[ "$*" < "1" ]]; then make servers/patch-libs/$*; fi

servers/%/api_versions: servers/$$*/kafka-bin
KAFKA_VERSION=$* python -m test.fixtures get_api_versions >$@
KAFKA_VERSION=$* python -m test.integration.fixtures get_api_versions >$@

servers/%/messages: servers/$$*/kafka-bin
cd servers/$*/ && jar xvf kafka-bin/libs/kafka-clients-$*.jar common/message/
Expand Down
179 changes: 7 additions & 172 deletions test/conftest.py
Original file line number Diff line number Diff line change
@@ -1,147 +1,17 @@
from __future__ import absolute_import

import os
import uuid

import pytest

from kafka.vendor.six.moves.urllib.parse import urlparse # pylint: disable=E0611,F0401
from test.testutil import env_kafka_version, random_string
from test.fixtures import KafkaFixture, ZookeeperFixture


@pytest.fixture(scope="module")
def zookeeper():
"""Return a Zookeeper fixture"""
if "ZOOKEEPER_URI" in os.environ:
parse = urlparse(os.environ["ZOOKEEPER_URI"])
(host, port) = (parse.hostname, parse.port)
yield ZookeeperFixture.instance(host=host, port=port, external=True)
else:
zk_instance = ZookeeperFixture.instance()
yield zk_instance
zk_instance.close()


@pytest.fixture(scope="module")
def kafka_broker(kafka_broker_factory):
"""Return a Kafka broker fixture"""
if "KAFKA_URI" in os.environ:
parse = urlparse(os.environ["KAFKA_URI"])
(host, port) = (parse.hostname, parse.port)
return KafkaFixture.instance(0, host=host, port=port, external=True)
else:
return kafka_broker_factory()


@pytest.fixture(scope="module")
def kafka_broker_factory():
"""Return a Kafka broker fixture factory"""
assert env_kafka_version(), 'KAFKA_VERSION must be specified to run integration tests'

_brokers = []
def factory(**broker_params):
params = {} if broker_params is None else broker_params.copy()
params.setdefault('partitions', 4)
node_id = params.pop('node_id', 0)
broker = KafkaFixture.instance(node_id, **params)
_brokers.append(broker)
return broker

yield factory

zks = set()
for broker in _brokers:
zks.add(broker.zookeeper)
broker.close()
for zk in zks:
if zk:
zk.close()


@pytest.fixture
def kafka_client(kafka_broker, request):
"""Return a KafkaClient fixture"""
(client,) = kafka_broker.get_clients(cnt=1, client_id='%s_client' % (request.node.name,))
yield client
client.close()


@pytest.fixture
def kafka_consumer(kafka_consumer_factory):
"""Return a KafkaConsumer fixture"""
return kafka_consumer_factory()


@pytest.fixture
def kafka_consumer_factory(kafka_broker, topic, request):
"""Return a KafkaConsumer factory fixture"""
_consumer = [None]

def factory(topics=(topic,), **kafka_consumer_params):
params = {} if kafka_consumer_params is None else kafka_consumer_params.copy()
params.setdefault('client_id', 'consumer_%s' % (request.node.name,))
params.setdefault('auto_offset_reset', 'earliest')
_consumer[0] = next(kafka_broker.get_consumers(cnt=1, topics=list(topics), **params))
return _consumer[0]

yield factory

if _consumer[0]:
_consumer[0].close()


@pytest.fixture
def kafka_producer(kafka_producer_factory):
"""Return a KafkaProducer fixture"""
yield kafka_producer_factory()


@pytest.fixture
def kafka_producer_factory(kafka_broker, request):
"""Return a KafkaProduce factory fixture"""
_producer = [None]

def factory(**kafka_producer_params):
params = {} if kafka_producer_params is None else kafka_producer_params.copy()
params.setdefault('client_id', 'producer_%s' % (request.node.name,))
_producer[0] = next(kafka_broker.get_producers(cnt=1, **params))
return _producer[0]

yield factory

if _producer[0]:
_producer[0].close()


@pytest.fixture
def kafka_admin_client(kafka_admin_client_factory):
"""Return a KafkaAdminClient fixture"""
yield kafka_admin_client_factory()


@pytest.fixture
def kafka_admin_client_factory(kafka_broker):
"""Return a KafkaAdminClient factory fixture"""
_admin_client = [None]

def factory(**kafka_admin_client_params):
params = {} if kafka_admin_client_params is None else kafka_admin_client_params.copy()
_admin_client[0] = next(kafka_broker.get_admin_clients(cnt=1, **params))
return _admin_client[0]

yield factory

if _admin_client[0]:
_admin_client[0].close()

def metrics():
from kafka.metrics import Metrics

@pytest.fixture
def topic(kafka_broker, request):
"""Return a topic fixture"""
topic_name = '%s_%s' % (request.node.name, random_string(10))
kafka_broker.create_topics([topic_name])
return topic_name
metrics = Metrics()
try:
yield metrics
finally:
metrics.close()


@pytest.fixture
Expand Down Expand Up @@ -173,41 +43,6 @@ def _set_conn_state(state):
return conn


@pytest.fixture()
def send_messages(topic, kafka_producer, request):
"""A factory that returns a send_messages function with a pre-populated
topic topic / producer."""

def _send_messages(number_range, partition=0, topic=topic, producer=kafka_producer, request=request):
"""
messages is typically `range(0,100)`
partition is an int
"""
messages_and_futures = [] # [(message, produce_future),]
for i in number_range:
# request.node.name provides the test name (including parametrized values)
encoded_msg = '{}-{}-{}'.format(i, request.node.name, uuid.uuid4()).encode('utf-8')
future = kafka_producer.send(topic, value=encoded_msg, partition=partition)
messages_and_futures.append((encoded_msg, future))
kafka_producer.flush()
for (msg, f) in messages_and_futures:
assert f.succeeded()
return [msg for (msg, f) in messages_and_futures]

return _send_messages


@pytest.fixture
def metrics():
from kafka.metrics import Metrics

metrics = Metrics()
try:
yield metrics
finally:
metrics.close()


@pytest.fixture
def client(conn, mocker):
from kafka import KafkaClient
Expand Down
168 changes: 168 additions & 0 deletions test/integration/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
from __future__ import absolute_import

import os
import uuid

import pytest

from kafka.vendor.six.moves.urllib.parse import urlparse # pylint: disable=E0611,F0401
from test.testutil import env_kafka_version, random_string
from test.integration.fixtures import KafkaFixture, ZookeeperFixture


@pytest.fixture(scope="module")
def zookeeper():
    """Yield a Zookeeper fixture: external if ZOOKEEPER_URI is set, else a locally managed instance."""
    uri = os.environ.get("ZOOKEEPER_URI")
    if uri is not None:
        # Reuse an externally managed Zookeeper; we do not own its lifecycle, so no close().
        parsed = urlparse(uri)
        yield ZookeeperFixture.instance(host=parsed.hostname, port=parsed.port, external=True)
    else:
        instance = ZookeeperFixture.instance()
        yield instance
        instance.close()


@pytest.fixture(scope="module")
def kafka_broker(kafka_broker_factory):
    """Return a Kafka broker fixture: external if KAFKA_URI is set, else one from the factory."""
    uri = os.environ.get("KAFKA_URI")
    if uri is None:
        return kafka_broker_factory()
    # Attach to an externally managed broker instead of spawning one.
    parsed = urlparse(uri)
    return KafkaFixture.instance(0, host=parsed.hostname, port=parsed.port, external=True)


@pytest.fixture(scope="module")
def kafka_broker_factory():
    """Return a KafkaFixture broker factory.

    Every broker created through the factory is closed at module teardown,
    followed by each distinct zookeeper instance backing those brokers.
    """
    assert env_kafka_version(), 'KAFKA_VERSION must be specified to run integration tests'

    _brokers = []

    def factory(**broker_params):
        # **kwargs always binds a dict (never None), so copy directly.
        params = broker_params.copy()
        params.setdefault('partitions', 4)
        node_id = params.pop('node_id', 0)
        broker = KafkaFixture.instance(node_id, **params)
        _brokers.append(broker)
        return broker

    yield factory

    # Close brokers first, then their zookeepers; dedupe zks since
    # multiple brokers may share one zookeeper.
    zks = set()
    for broker in _brokers:
        zks.add(broker.zookeeper)
        broker.close()
    for zk in zks:
        if zk:
            zk.close()


@pytest.fixture
def kafka_client(kafka_broker, request):
    """Yield a KafkaClient connected to the broker fixture; closed at teardown."""
    client_id = '%s_client' % (request.node.name,)
    (client,) = kafka_broker.get_clients(cnt=1, client_id=client_id)
    yield client
    client.close()


@pytest.fixture
def kafka_consumer(kafka_consumer_factory):
    """Return a KafkaConsumer built with the factory's default settings."""
    consumer = kafka_consumer_factory()
    return consumer


@pytest.fixture
def kafka_consumer_factory(kafka_broker, topic, request):
    """Return a KafkaConsumer factory fixture.

    Every consumer created through the factory is closed at teardown
    (the previous implementation only closed the most recently created
    consumer, leaking earlier ones when the factory was called twice).
    """
    _consumers = []

    def factory(topics=(topic,), **kafka_consumer_params):
        # **kwargs always binds a dict (never None), so copy directly.
        params = kafka_consumer_params.copy()
        params.setdefault('client_id', 'consumer_%s' % (request.node.name,))
        params.setdefault('auto_offset_reset', 'earliest')
        consumer = next(kafka_broker.get_consumers(cnt=1, topics=list(topics), **params))
        _consumers.append(consumer)
        return consumer

    yield factory

    for consumer in _consumers:
        consumer.close()


@pytest.fixture
def kafka_producer(kafka_producer_factory):
    """Yield a KafkaProducer built with the factory's default settings."""
    producer = kafka_producer_factory()
    yield producer


@pytest.fixture
def kafka_producer_factory(kafka_broker, request):
    """Return a KafkaProducer factory fixture.

    Every producer created through the factory is closed at teardown
    (the previous implementation only closed the most recently created
    producer, leaking earlier ones when the factory was called twice).
    """
    _producers = []

    def factory(**kafka_producer_params):
        # **kwargs always binds a dict (never None), so copy directly.
        params = kafka_producer_params.copy()
        params.setdefault('client_id', 'producer_%s' % (request.node.name,))
        producer = next(kafka_broker.get_producers(cnt=1, **params))
        _producers.append(producer)
        return producer

    yield factory

    for producer in _producers:
        producer.close()


@pytest.fixture
def kafka_admin_client(kafka_admin_client_factory):
    """Yield a KafkaAdminClient built with the factory's default settings."""
    admin_client = kafka_admin_client_factory()
    yield admin_client


@pytest.fixture
def kafka_admin_client_factory(kafka_broker):
    """Return a KafkaAdminClient factory fixture.

    Every admin client created through the factory is closed at teardown
    (the previous implementation only closed the most recently created
    client, leaking earlier ones when the factory was called twice).
    """
    _admin_clients = []

    def factory(**kafka_admin_client_params):
        # **kwargs always binds a dict (never None), so copy directly.
        params = kafka_admin_client_params.copy()
        admin_client = next(kafka_broker.get_admin_clients(cnt=1, **params))
        _admin_clients.append(admin_client)
        return admin_client

    yield factory

    for admin_client in _admin_clients:
        admin_client.close()


@pytest.fixture
def topic(kafka_broker, request):
    """Create a uniquely named topic on the broker and return its name."""
    # Include the test name so a topic can be traced back to its test.
    name = '%s_%s' % (request.node.name, random_string(10))
    kafka_broker.create_topics([name])
    return name


@pytest.fixture()
def send_messages(topic, kafka_producer, request):
    """A factory that returns a send_messages function with a pre-populated
    topic / producer."""

    def _send_messages(number_range, partition=0, topic=topic, producer=kafka_producer, request=request):
        """Send one message per value in number_range; return the encoded payloads.

        number_range is typically `range(0,100)`
        partition is an int
        """
        messages_and_futures = []  # [(message, produce_future),]
        for i in number_range:
            # request.node.name provides the test name (including parametrized values)
            encoded_msg = '{}-{}-{}'.format(i, request.node.name, uuid.uuid4()).encode('utf-8')
            # Use the `producer` parameter (not the closed-over fixture) so
            # callers passing producer=... actually get their override honored.
            future = producer.send(topic, value=encoded_msg, partition=partition)
            messages_and_futures.append((encoded_msg, future))
        producer.flush()
        for (msg, f) in messages_and_futures:
            assert f.succeeded()
        return [msg for (msg, f) in messages_and_futures]

    return _send_messages
2 changes: 1 addition & 1 deletion test/fixtures.py → test/integration/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class Fixture(object):
kafka_version = os.environ.get('KAFKA_VERSION', '0.11.0.2')
scala_version = os.environ.get("SCALA_VERSION", '2.8.0')
project_root = os.environ.get('PROJECT_ROOT',
os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
os.path.abspath(os.path.join(os.path.dirname(__file__), "../..")))
kafka_root = os.environ.get("KAFKA_ROOT",
os.path.join(project_root, 'servers', kafka_version, "kafka-bin"))

Expand Down
File renamed without changes.
File renamed without changes.
Loading
Loading