diff --git a/Makefile b/Makefile index a624b833f..30da9cf91 100644 --- a/Makefile +++ b/Makefile @@ -24,7 +24,7 @@ test: build-integration pytest $(PYTESTS) fixture: build-integration - python -m test.fixtures kafka + python -m test.integration.fixtures kafka cov-local: build-integration pytest --pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF --cov=kafka \ @@ -99,7 +99,7 @@ servers/%/kafka-bin: servers/dist/$$(call kafka_artifact_name,$$*) | servers/dis if [[ "$*" < "1" ]]; then make servers/patch-libs/$*; fi servers/%/api_versions: servers/$$*/kafka-bin - KAFKA_VERSION=$* python -m test.fixtures get_api_versions >$@ + KAFKA_VERSION=$* python -m test.integration.fixtures get_api_versions >$@ servers/%/messages: servers/$$*/kafka-bin cd servers/$*/ && jar xvf kafka-bin/libs/kafka-clients-$*.jar common/message/ diff --git a/test/conftest.py b/test/conftest.py index ba76d6cc5..b65593a86 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -1,147 +1,17 @@ from __future__ import absolute_import -import os -import uuid - import pytest -from kafka.vendor.six.moves.urllib.parse import urlparse # pylint: disable=E0611,F0401 -from test.testutil import env_kafka_version, random_string -from test.fixtures import KafkaFixture, ZookeeperFixture - - -@pytest.fixture(scope="module") -def zookeeper(): - """Return a Zookeeper fixture""" - if "ZOOKEEPER_URI" in os.environ: - parse = urlparse(os.environ["ZOOKEEPER_URI"]) - (host, port) = (parse.hostname, parse.port) - yield ZookeeperFixture.instance(host=host, port=port, external=True) - else: - zk_instance = ZookeeperFixture.instance() - yield zk_instance - zk_instance.close() - - -@pytest.fixture(scope="module") -def kafka_broker(kafka_broker_factory): - """Return a Kafka broker fixture""" - if "KAFKA_URI" in os.environ: - parse = urlparse(os.environ["KAFKA_URI"]) - (host, port) = (parse.hostname, parse.port) - return KafkaFixture.instance(0, host=host, port=port, external=True) - else: - return kafka_broker_factory() - - -@pytest.fixture(scope="module") -def kafka_broker_factory(): - """Return a Kafka broker fixture factory""" - assert env_kafka_version(), 'KAFKA_VERSION must be specified to run integration tests' - - _brokers = [] - def factory(**broker_params): - params = {} if broker_params is None else broker_params.copy() - params.setdefault('partitions', 4) - node_id = params.pop('node_id', 0) - broker = KafkaFixture.instance(node_id, **params) - _brokers.append(broker) - return broker - - yield factory - - zks = set() - for broker in _brokers: - zks.add(broker.zookeeper) - broker.close() - for zk in zks: - if zk: - zk.close() - - -@pytest.fixture -def kafka_client(kafka_broker, request): - """Return a KafkaClient fixture""" - (client,) = kafka_broker.get_clients(cnt=1, client_id='%s_client' % (request.node.name,)) - yield client - client.close() - - -@pytest.fixture -def kafka_consumer(kafka_consumer_factory): - """Return a KafkaConsumer fixture""" - return kafka_consumer_factory() - - -@pytest.fixture -def kafka_consumer_factory(kafka_broker, topic, request): - """Return a KafkaConsumer factory fixture""" - _consumer = [None] - - def factory(topics=(topic,), **kafka_consumer_params): - params = {} if kafka_consumer_params is None else kafka_consumer_params.copy() - params.setdefault('client_id', 'consumer_%s' % (request.node.name,)) - params.setdefault('auto_offset_reset', 'earliest') - _consumer[0] = next(kafka_broker.get_consumers(cnt=1, topics=list(topics), **params)) - return _consumer[0] - - yield factory - - if _consumer[0]: - _consumer[0].close() - - -@pytest.fixture -def kafka_producer(kafka_producer_factory): - """Return a KafkaProducer fixture""" - yield kafka_producer_factory() - - -@pytest.fixture -def kafka_producer_factory(kafka_broker, request): - """Return a KafkaProduce factory fixture""" - _producer = [None] - - def factory(**kafka_producer_params): - params = {} if kafka_producer_params is None else kafka_producer_params.copy() - params.setdefault('client_id', 'producer_%s' % (request.node.name,)) - _producer[0] = next(kafka_broker.get_producers(cnt=1, **params)) - return _producer[0] - - yield factory - - if _producer[0]: - _producer[0].close() - - -@pytest.fixture -def kafka_admin_client(kafka_admin_client_factory): - """Return a KafkaAdminClient fixture""" - yield kafka_admin_client_factory() - @pytest.fixture -def kafka_admin_client_factory(kafka_broker): - """Return a KafkaAdminClient factory fixture""" - _admin_client = [None] - - def factory(**kafka_admin_client_params): - params = {} if kafka_admin_client_params is None else kafka_admin_client_params.copy() - _admin_client[0] = next(kafka_broker.get_admin_clients(cnt=1, **params)) - return _admin_client[0] - - yield factory - - if _admin_client[0]: - _admin_client[0].close() - +def metrics(): + from kafka.metrics import Metrics -@pytest.fixture -def topic(kafka_broker, request): - """Return a topic fixture""" - topic_name = '%s_%s' % (request.node.name, random_string(10)) - kafka_broker.create_topics([topic_name]) - return topic_name + metrics = Metrics() + try: + yield metrics + finally: + metrics.close() @pytest.fixture @@ -173,41 +43,6 @@ def _set_conn_state(state): return conn -@pytest.fixture() -def send_messages(topic, kafka_producer, request): - """A factory that returns a send_messages function with a pre-populated - topic topic / producer.""" - - def _send_messages(number_range, partition=0, topic=topic, producer=kafka_producer, request=request): - """ - messages is typically `range(0,100)` - partition is an int - """ - messages_and_futures = [] # [(message, produce_future),] - for i in number_range: - # request.node.name provides the test name (including parametrized values) - encoded_msg = '{}-{}-{}'.format(i, request.node.name, uuid.uuid4()).encode('utf-8') - future = kafka_producer.send(topic, value=encoded_msg, partition=partition) - messages_and_futures.append((encoded_msg, future)) - kafka_producer.flush() - for (msg, f) in messages_and_futures: - assert f.succeeded() - return [msg for (msg, f) in messages_and_futures] - - return _send_messages - - -@pytest.fixture -def metrics(): - from kafka.metrics import Metrics - - metrics = Metrics() - try: - yield metrics - finally: - metrics.close() - - @pytest.fixture def client(conn, mocker): from kafka import KafkaClient diff --git a/test/integration/conftest.py b/test/integration/conftest.py new file mode 100644 index 000000000..8af729296 --- /dev/null +++ b/test/integration/conftest.py @@ -0,0 +1,168 @@ +from __future__ import absolute_import + +import os +import uuid + +import pytest + +from kafka.vendor.six.moves.urllib.parse import urlparse # pylint: disable=E0611,F0401 +from test.testutil import env_kafka_version, random_string +from test.integration.fixtures import KafkaFixture, ZookeeperFixture + + +@pytest.fixture(scope="module") +def zookeeper(): + """Return a Zookeeper fixture""" + if "ZOOKEEPER_URI" in os.environ: + parse = urlparse(os.environ["ZOOKEEPER_URI"]) + (host, port) = (parse.hostname, parse.port) + yield ZookeeperFixture.instance(host=host, port=port, external=True) + else: + zk_instance = ZookeeperFixture.instance() + yield zk_instance + zk_instance.close() + + +@pytest.fixture(scope="module") +def kafka_broker(kafka_broker_factory): + """Return a Kafka broker fixture""" + if "KAFKA_URI" in os.environ: + parse = urlparse(os.environ["KAFKA_URI"]) + (host, port) = (parse.hostname, parse.port) + return KafkaFixture.instance(0, host=host, port=port, external=True) + else: + return kafka_broker_factory() + + +@pytest.fixture(scope="module") +def kafka_broker_factory(): + """Return a Kafka broker fixture factory""" + assert env_kafka_version(), 'KAFKA_VERSION must be specified to run integration tests' + + _brokers = [] + def factory(**broker_params): + params = {} if broker_params is None else broker_params.copy() + params.setdefault('partitions', 4) + node_id = params.pop('node_id', 0) + broker = KafkaFixture.instance(node_id, **params) + _brokers.append(broker) + return broker + + yield factory + + zks = set() + for broker in _brokers: + zks.add(broker.zookeeper) + broker.close() + for zk in zks: + if zk: + zk.close() + + +@pytest.fixture +def kafka_client(kafka_broker, request): + """Return a KafkaClient fixture""" + (client,) = kafka_broker.get_clients(cnt=1, client_id='%s_client' % (request.node.name,)) + yield client + client.close() + + +@pytest.fixture +def kafka_consumer(kafka_consumer_factory): + """Return a KafkaConsumer fixture""" + return kafka_consumer_factory() + + +@pytest.fixture +def kafka_consumer_factory(kafka_broker, topic, request): + """Return a KafkaConsumer factory fixture""" + _consumer = [None] + + def factory(topics=(topic,), **kafka_consumer_params): + params = {} if kafka_consumer_params is None else kafka_consumer_params.copy() + params.setdefault('client_id', 'consumer_%s' % (request.node.name,)) + params.setdefault('auto_offset_reset', 'earliest') + _consumer[0] = next(kafka_broker.get_consumers(cnt=1, topics=list(topics), **params)) + return _consumer[0] + + yield factory + + if _consumer[0]: + _consumer[0].close() + + +@pytest.fixture +def kafka_producer(kafka_producer_factory): + """Return a KafkaProducer fixture""" + yield kafka_producer_factory() + + +@pytest.fixture +def kafka_producer_factory(kafka_broker, request): + """Return a KafkaProduce factory fixture""" + _producer = [None] + + def factory(**kafka_producer_params): + params = {} if kafka_producer_params is None else kafka_producer_params.copy() + params.setdefault('client_id', 'producer_%s' % (request.node.name,)) + _producer[0] = next(kafka_broker.get_producers(cnt=1, **params)) + return _producer[0] + + yield factory + + if _producer[0]: + _producer[0].close() + + +@pytest.fixture +def kafka_admin_client(kafka_admin_client_factory): + """Return a KafkaAdminClient fixture""" + yield kafka_admin_client_factory() + + +@pytest.fixture +def kafka_admin_client_factory(kafka_broker): + """Return a KafkaAdminClient factory fixture""" + _admin_client = [None] + + def factory(**kafka_admin_client_params): + params = {} if kafka_admin_client_params is None else kafka_admin_client_params.copy() + _admin_client[0] = next(kafka_broker.get_admin_clients(cnt=1, **params)) + return _admin_client[0] + + yield factory + + if _admin_client[0]: + _admin_client[0].close() + + +@pytest.fixture +def topic(kafka_broker, request): + """Return a topic fixture""" + topic_name = '%s_%s' % (request.node.name, random_string(10)) + kafka_broker.create_topics([topic_name]) + return topic_name + + +@pytest.fixture() +def send_messages(topic, kafka_producer, request): + """A factory that returns a send_messages function with a pre-populated + topic topic / producer.""" + + def _send_messages(number_range, partition=0, topic=topic, producer=kafka_producer, request=request): + """ + messages is typically `range(0,100)` + partition is an int + """ + messages_and_futures = [] # [(message, produce_future),] + for i in number_range: + # request.node.name provides the test name (including parametrized values) + encoded_msg = '{}-{}-{}'.format(i, request.node.name, uuid.uuid4()).encode('utf-8') + future = kafka_producer.send(topic, value=encoded_msg, partition=partition) + messages_and_futures.append((encoded_msg, future)) + kafka_producer.flush() + for (msg, f) in messages_and_futures: + assert f.succeeded() + return [msg for (msg, f) in messages_and_futures] + + return _send_messages diff --git a/test/fixtures.py b/test/integration/fixtures.py similarity index 99% rename from test/fixtures.py rename to test/integration/fixtures.py index 3adb87a97..b9baf5223 100644 --- a/test/fixtures.py +++ b/test/integration/fixtures.py @@ -66,7 +66,7 @@ class Fixture(object): kafka_version = os.environ.get('KAFKA_VERSION', '0.11.0.2') scala_version = os.environ.get("SCALA_VERSION", '2.8.0') project_root = os.environ.get('PROJECT_ROOT', - os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) + os.path.abspath(os.path.join(os.path.dirname(__file__), "../.."))) kafka_root = os.environ.get("KAFKA_ROOT", os.path.join(project_root, 'servers', kafka_version, "kafka-bin")) diff --git a/test/test_admin_integration.py b/test/integration/test_admin_integration.py similarity index 100% rename from test/test_admin_integration.py rename to test/integration/test_admin_integration.py diff --git a/test/test_consumer_group.py b/test/integration/test_consumer_group.py similarity index 100% rename from test/test_consumer_group.py rename to test/integration/test_consumer_group.py diff --git a/test/test_consumer_integration.py b/test/integration/test_consumer_integration.py similarity index 100% rename from test/test_consumer_integration.py rename to test/integration/test_consumer_integration.py diff --git a/test/test_producer.py b/test/integration/test_producer_integration.py similarity index 100% rename from test/test_producer.py rename to test/integration/test_producer_integration.py diff --git a/test/test_sasl_integration.py b/test/integration/test_sasl_integration.py similarity index 100% rename from test/test_sasl_integration.py rename to test/integration/test_sasl_integration.py diff --git a/test/test_client_async.py b/test/test_client_async.py index 276926116..acc400f9c 100644 --- a/test/test_client_async.py +++ b/test/test_client_async.py @@ -23,12 +23,29 @@ @pytest.fixture -def cli(mocker, conn): +def client_poll_mocked(mocker): + cli = KafkaClient(request_timeout_ms=9999999, + reconnect_backoff_ms=2222, + connections_max_idle_ms=float('inf'), + api_version=(0, 9)) + mocker.patch.object(cli, '_poll') + ttl = mocker.patch.object(cli.cluster, 'ttl') + ttl.return_value = 0 + try: + yield cli + finally: + cli._close() + + +@pytest.fixture +def client_selector_mocked(mocker, conn): client = KafkaClient(api_version=(0, 9)) mocker.patch.object(client, '_selector') client.poll(future=client.cluster.request_update()) - return client - + try: + yield client + finally: + client._close() def test_bootstrap(mocker, conn): conn.state = ConnectionStates.CONNECTED @@ -49,185 +66,181 @@ def test_bootstrap(mocker, conn): BrokerMetadata(1, 'bar', 34, None)]) -def test_can_connect(cli, conn): +def test_can_connect(client_selector_mocked, conn): # Node is not in broker metadata - can't connect - assert not cli._can_connect(2) + assert not client_selector_mocked._can_connect(2) # Node is in broker metadata but not in _conns - assert 0 not in cli._conns - assert cli._can_connect(0) + assert 0 not in client_selector_mocked._conns + assert client_selector_mocked._can_connect(0) # Node is connected, can't reconnect - assert cli._init_connect(0) is True - assert not cli._can_connect(0) + assert client_selector_mocked._init_connect(0) is True + assert not client_selector_mocked._can_connect(0) # Node is disconnected, can connect - cli._conns[0].state = ConnectionStates.DISCONNECTED - assert cli._can_connect(0) + client_selector_mocked._conns[0].state = ConnectionStates.DISCONNECTED + assert client_selector_mocked._can_connect(0) # Node is disconnected, but blacked out conn.blacked_out.return_value = True - assert not cli._can_connect(0) + assert not client_selector_mocked._can_connect(0) -def test_init_connect(cli, conn): +def test_init_connect(client_selector_mocked, conn): # Node not in metadata, return False - assert not cli._init_connect(2) + assert not client_selector_mocked._init_connect(2) # New node_id creates a conn object - assert 0 not in cli._conns + assert 0 not in client_selector_mocked._conns conn.state = ConnectionStates.DISCONNECTED conn.connect.side_effect = lambda: conn._set_conn_state(ConnectionStates.CONNECTING) - assert cli._init_connect(0) is True - assert cli._conns[0] is conn + assert client_selector_mocked._init_connect(0) is True + assert client_selector_mocked._conns[0] is conn -def test_conn_state_change(mocker, cli, conn): - sel = cli._selector +def test_conn_state_change(client_selector_mocked, conn): + sel = client_selector_mocked._selector node_id = 0 - cli._conns[node_id] = conn + client_selector_mocked._conns[node_id] = conn conn.state = ConnectionStates.CONNECTING sock = conn._sock - cli._conn_state_change(node_id, sock, conn) - assert node_id in cli._connecting + client_selector_mocked._conn_state_change(node_id, sock, conn) + assert node_id in client_selector_mocked._connecting sel.register.assert_called_with(sock, selectors.EVENT_WRITE, conn) conn.state = ConnectionStates.CONNECTED - cli._conn_state_change(node_id, sock, conn) - assert node_id not in cli._connecting + client_selector_mocked._conn_state_change(node_id, sock, conn) + assert node_id not in client_selector_mocked._connecting sel.modify.assert_called_with(sock, selectors.EVENT_READ, conn) # Failure to connect should trigger metadata update - assert cli.cluster._need_update is False + assert client_selector_mocked.cluster._need_update is False conn.state = ConnectionStates.DISCONNECTED - cli._conn_state_change(node_id, sock, conn) - assert node_id not in cli._connecting - assert cli.cluster._need_update is True + client_selector_mocked._conn_state_change(node_id, sock, conn) + assert node_id not in client_selector_mocked._connecting + assert client_selector_mocked.cluster._need_update is True sel.unregister.assert_called_with(sock) conn.state = ConnectionStates.CONNECTING - cli._conn_state_change(node_id, sock, conn) - assert node_id in cli._connecting + client_selector_mocked._conn_state_change(node_id, sock, conn) + assert node_id in client_selector_mocked._connecting conn.state = ConnectionStates.DISCONNECTED - cli._conn_state_change(node_id, sock, conn) - assert node_id not in cli._connecting + client_selector_mocked._conn_state_change(node_id, sock, conn) + assert node_id not in client_selector_mocked._connecting -def test_ready(mocker, cli, conn): - maybe_connect = mocker.patch.object(cli, 'maybe_connect') +def test_ready(mocker, client_selector_mocked, conn): + maybe_connect = mocker.patch.object(client_selector_mocked, 'maybe_connect') node_id = 1 - cli.ready(node_id) + client_selector_mocked.ready(node_id) maybe_connect.assert_called_with(node_id) -def test_is_ready(mocker, cli, conn): - cli._init_connect(0) - cli._init_connect(1) +def test_is_ready(client_selector_mocked, conn): + client_selector_mocked._init_connect(0) + client_selector_mocked._init_connect(1) # metadata refresh blocks ready nodes - assert cli.is_ready(0) - assert cli.is_ready(1) - cli._metadata_refresh_in_progress = True - assert not cli.is_ready(0) - assert not cli.is_ready(1) + assert client_selector_mocked.is_ready(0) + assert client_selector_mocked.is_ready(1) + client_selector_mocked._metadata_refresh_in_progress = True + assert not client_selector_mocked.is_ready(0) + assert not client_selector_mocked.is_ready(1) # requesting metadata update also blocks ready nodes - cli._metadata_refresh_in_progress = False - assert cli.is_ready(0) - assert cli.is_ready(1) - cli.cluster.request_update() - cli.cluster.config['retry_backoff_ms'] = 0 - assert not cli._metadata_refresh_in_progress - assert not cli.is_ready(0) - assert not cli.is_ready(1) - cli.cluster._need_update = False + client_selector_mocked._metadata_refresh_in_progress = False + assert client_selector_mocked.is_ready(0) + assert client_selector_mocked.is_ready(1) + client_selector_mocked.cluster.request_update() + client_selector_mocked.cluster.config['retry_backoff_ms'] = 0 + assert not client_selector_mocked._metadata_refresh_in_progress + assert not client_selector_mocked.is_ready(0) + assert not client_selector_mocked.is_ready(1) + client_selector_mocked.cluster._need_update = False # if connection can't send more, not ready - assert cli.is_ready(0) + assert client_selector_mocked.is_ready(0) conn.can_send_more.return_value = False - assert not cli.is_ready(0) + assert not client_selector_mocked.is_ready(0) conn.can_send_more.return_value = True # disconnected nodes, not ready - assert cli.is_ready(0) + assert client_selector_mocked.is_ready(0) conn.state = ConnectionStates.DISCONNECTED - assert not cli.is_ready(0) + assert not client_selector_mocked.is_ready(0) -def test_close(mocker, cli, conn): - mocker.patch.object(cli, '_selector') - +def test_close(client_selector_mocked, conn): call_count = conn.close.call_count # Unknown node - silent - cli.close(2) + client_selector_mocked.close(2) call_count += 0 assert conn.close.call_count == call_count # Single node close - cli._init_connect(0) + client_selector_mocked._init_connect(0) assert conn.close.call_count == call_count - cli.close(0) + client_selector_mocked.close(0) call_count += 1 assert conn.close.call_count == call_count # All node close - cli._init_connect(1) - cli.close() + client_selector_mocked._init_connect(1) + client_selector_mocked.close() # +2 close: node 1, node bootstrap (node 0 already closed) call_count += 2 assert conn.close.call_count == call_count -def test_is_disconnected(cli, conn): +def test_is_disconnected(client_selector_mocked, conn): # False if not connected yet conn.state = ConnectionStates.DISCONNECTED - assert not cli.is_disconnected(0) + assert not client_selector_mocked.is_disconnected(0) - cli._init_connect(0) - assert cli.is_disconnected(0) + client_selector_mocked._init_connect(0) + assert client_selector_mocked.is_disconnected(0) conn.state = ConnectionStates.CONNECTING - assert not cli.is_disconnected(0) + assert not client_selector_mocked.is_disconnected(0) conn.state = ConnectionStates.CONNECTED - assert not cli.is_disconnected(0) + assert not client_selector_mocked.is_disconnected(0) -def test_send(cli, conn): +def test_send(client_selector_mocked, conn): # Send to unknown node => raises AssertionError try: - cli.send(2, None) + client_selector_mocked.send(2, None) assert False, 'Exception not raised' except AssertionError: pass # Send to disconnected node => NodeNotReady conn.state = ConnectionStates.DISCONNECTED - f = cli.send(0, None) + f = client_selector_mocked.send(0, None) assert f.failed() assert isinstance(f.exception, Errors.NodeNotReadyError) conn.state = ConnectionStates.CONNECTED - cli._init_connect(0) + client_selector_mocked._init_connect(0) # ProduceRequest w/ 0 required_acks -> no response request = ProduceRequest[0](0, 0, []) assert request.expect_response() is False - ret = cli.send(0, request) + ret = client_selector_mocked.send(0, request) conn.send.assert_called_with(request, blocking=False, request_timeout_ms=None) assert isinstance(ret, Future) request = MetadataRequest[0]([]) - cli.send(0, request) + client_selector_mocked.send(0, request) conn.send.assert_called_with(request, blocking=False, request_timeout_ms=None) -def test_poll(mocker): - metadata = mocker.patch.object(KafkaClient, '_maybe_refresh_metadata') - ifr_request_timeout = mocker.patch.object(KafkaClient, '_next_ifr_request_timeout_ms') - _poll = mocker.patch.object(KafkaClient, '_poll') - cli = KafkaClient(api_version=(0, 9)) +def test_poll(mocker, client_poll_mocked): + metadata = mocker.patch.object(client_poll_mocked, '_maybe_refresh_metadata') + ifr_request_timeout = mocker.patch.object(client_poll_mocked, '_next_ifr_request_timeout_ms') now = time.time() t = mocker.patch('time.time') t.return_value = now @@ -235,18 +248,18 @@ def test_poll(mocker): # metadata timeout wins ifr_request_timeout.return_value = float('inf') metadata.return_value = 1000 - cli.poll() - _poll.assert_called_with(1.0) + client_poll_mocked.poll() + client_poll_mocked._poll.assert_called_with(1.0) # user timeout wins - cli.poll(timeout_ms=250) - _poll.assert_called_with(0.25) + client_poll_mocked.poll(timeout_ms=250) + client_poll_mocked._poll.assert_called_with(0.25) # ifr request timeout wins ifr_request_timeout.return_value = 30000 metadata.return_value = 1000000 - cli.poll() - _poll.assert_called_with(30.0) + client_poll_mocked.poll() + client_poll_mocked._poll.assert_called_with(30.0) def test__poll(): @@ -287,80 +300,66 @@ def test_set_topics(mocker): request_update.assert_not_called() -@pytest.fixture -def client(mocker): - _poll = mocker.patch.object(KafkaClient, '_poll') - - cli = KafkaClient(request_timeout_ms=9999999, - reconnect_backoff_ms=2222, - connections_max_idle_ms=float('inf'), - api_version=(0, 9)) - - ttl = mocker.patch.object(cli.cluster, 'ttl') - ttl.return_value = 0 - return cli - - -def test_maybe_refresh_metadata_ttl(mocker, client): - client.cluster.ttl.return_value = 1234 +def test_maybe_refresh_metadata_ttl(client_poll_mocked): + client_poll_mocked.cluster.ttl.return_value = 1234 - client.poll(timeout_ms=12345678) - client._poll.assert_called_with(1.234) + client_poll_mocked.poll(timeout_ms=12345678) + client_poll_mocked._poll.assert_called_with(1.234) -def test_maybe_refresh_metadata_backoff(mocker, client): - mocker.patch.object(client, 'least_loaded_node', return_value=None) - mocker.patch.object(client, 'least_loaded_node_refresh_ms', return_value=4321) +def test_maybe_refresh_metadata_backoff(mocker, client_poll_mocked): + mocker.patch.object(client_poll_mocked, 'least_loaded_node', return_value=None) + mocker.patch.object(client_poll_mocked, 'least_loaded_node_refresh_ms', return_value=4321) now = time.time() t = mocker.patch('time.time') t.return_value = now - client.poll(timeout_ms=12345678) - client._poll.assert_called_with(4.321) + client_poll_mocked.poll(timeout_ms=12345678) + client_poll_mocked._poll.assert_called_with(4.321) -def test_maybe_refresh_metadata_in_progress(mocker, client): - client._metadata_refresh_in_progress = True +def test_maybe_refresh_metadata_in_progress(client_poll_mocked): + client_poll_mocked._metadata_refresh_in_progress = True - client.poll(timeout_ms=12345678) - client._poll.assert_called_with(9999.999) # request_timeout_ms + client_poll_mocked.poll(timeout_ms=12345678) + client_poll_mocked._poll.assert_called_with(9999.999) # request_timeout_ms -def test_maybe_refresh_metadata_update(mocker, client): - mocker.patch.object(client, 'least_loaded_node', return_value='foobar') - mocker.patch.object(client, '_can_send_request', return_value=True) - send = mocker.patch.object(client, 'send') - client.cluster.need_all_topic_metadata = True +def test_maybe_refresh_metadata_update(mocker, client_poll_mocked): + mocker.patch.object(client_poll_mocked, 'least_loaded_node', return_value='foobar') + mocker.patch.object(client_poll_mocked, '_can_send_request', return_value=True) + send = mocker.patch.object(client_poll_mocked, 'send') + client_poll_mocked.cluster.need_all_topic_metadata = True - client.poll(timeout_ms=12345678) - client._poll.assert_called_with(9999.999) # request_timeout_ms - assert client._metadata_refresh_in_progress + client_poll_mocked.poll(timeout_ms=12345678) + client_poll_mocked._poll.assert_called_with(9999.999) # request_timeout_ms + assert client_poll_mocked._metadata_refresh_in_progress request = MetadataRequest[0]([]) send.assert_called_once_with('foobar', request, wakeup=False) -def test_maybe_refresh_metadata_cant_send(mocker, client): - mocker.patch.object(client, 'least_loaded_node', return_value='foobar') - mocker.patch.object(client, '_can_send_request', return_value=False) - mocker.patch.object(client, '_can_connect', return_value=True) - mocker.patch.object(client, '_init_connect', return_value=True) +def test_maybe_refresh_metadata_cant_send(mocker, client_poll_mocked): + mocker.patch.object(client_poll_mocked, 'least_loaded_node', return_value='foobar') + mocker.patch.object(client_poll_mocked, '_can_send_request', return_value=False) + mocker.patch.object(client_poll_mocked, '_can_connect', return_value=True) + mocker.patch.object(client_poll_mocked, '_init_connect', return_value=True) now = time.time() t = mocker.patch('time.time') t.return_value = now # first poll attempts connection - client.poll() - client._poll.assert_called() - client._init_connect.assert_called_once_with('foobar') + client_poll_mocked.poll() + client_poll_mocked._poll.assert_called() + client_poll_mocked._init_connect.assert_called_once_with('foobar') # poll while connecting should not attempt a new connection - client._connecting.add('foobar') - client._can_connect.reset_mock() - client.poll() - client._poll.assert_called() - assert not client._can_connect.called - assert not client._metadata_refresh_in_progress + client_poll_mocked._connecting.add('foobar') + client_poll_mocked._can_connect.reset_mock() + client_poll_mocked.poll() + client_poll_mocked._poll.assert_called() + assert not client_poll_mocked._can_connect.called + assert not client_poll_mocked._metadata_refresh_in_progress def test_schedule(): diff --git a/test/test_coordinator.py b/test/test_coordinator.py index 00a929399..8c114c90f 100644 --- a/test/test_coordinator.py +++ b/test/test_coordinator.py @@ -24,14 +24,6 @@ from kafka.structs import OffsetAndMetadata, TopicPartition from kafka.util import WeakMethod -@pytest.fixture -def client(conn, mocker): - cli = KafkaClient(api_version=(0, 9)) - mocker.patch.object(cli, '_init_connect', return_value=True) - try: - yield cli - finally: - cli._close() @pytest.fixture def coordinator(client, metrics, mocker): diff --git a/test/test_fetcher.py b/test/test_fetcher.py index 3fc0c55ae..80bd0e42d 100644 --- a/test/test_fetcher.py +++ b/test/test_fetcher.py @@ -26,11 +26,6 @@ from kafka.structs import OffsetAndMetadata, OffsetAndTimestamp, TopicPartition -@pytest.fixture -def client(): - return KafkaClient(bootstrap_servers=(), api_version=(0, 9)) - - @pytest.fixture def subscription_state(): return SubscriptionState() diff --git a/test/test_metrics.py b/test/test_metrics.py index 308ea5831..07c0e838a 100644 --- a/test/test_metrics.py +++ b/test/test_metrics.py @@ -19,23 +19,6 @@ def time_keeper(): return TimeKeeper() -@pytest.fixture -def config(): - return MetricConfig() - - -@pytest.fixture -def reporter(): - return DictReporter() - - -@pytest.fixture -def metrics(request, config, reporter): - metrics = Metrics(config, [reporter], enable_expiration=True) - yield metrics - metrics.close() - - def test_MetricName(): # The Java test only cover the differences between the deprecated # constructors, so I'm skipping them but doing some other basic testing. @@ -82,8 +65,9 @@ def test_MetricName(): assert name.tags == tags -def test_simple_stats(mocker, time_keeper, config, metrics): +def test_simple_stats(mocker, time_keeper, metrics): mocker.patch('time.time', side_effect=time_keeper.time) + config = metrics._config measurable = ConstantMeasurable()