From 824ef6f1ad3b931a4331997b1ddca496d348121e Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 14 Mar 2025 10:52:43 -0700 Subject: [PATCH] Fix external kafka/zk fixtures for testing --- test/conftest.py | 22 +++++++--- test/fixtures.py | 108 ++++++++++++++++------------------------------- test/service.py | 10 ++++- 3 files changed, 62 insertions(+), 78 deletions(-) diff --git a/test/conftest.py b/test/conftest.py index d54a91243..bf1fa6687 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -1,24 +1,36 @@ from __future__ import absolute_import +import os import uuid import pytest +from kafka.vendor.six.moves.urllib.parse import urlparse # pylint: disable=E0611,F0401 from test.testutil import env_kafka_version, random_string from test.fixtures import KafkaFixture, ZookeeperFixture @pytest.fixture(scope="module") def zookeeper(): """Return a Zookeeper fixture""" - zk_instance = ZookeeperFixture.instance() - yield zk_instance - zk_instance.close() + if "ZOOKEEPER_URI" in os.environ: + parse = urlparse(os.environ["ZOOKEEPER_URI"]) + (host, port) = (parse.hostname, parse.port) + yield ZookeeperFixture.instance(host=host, port=port, external=True) + else: + zk_instance = ZookeeperFixture.instance() + yield zk_instance + zk_instance.close() @pytest.fixture(scope="module") -def kafka_broker(kafka_broker_factory): +def kafka_broker(kafka_broker_factory, zookeeper): """Return a Kafka broker fixture""" - return kafka_broker_factory()[0] + if "KAFKA_URI" in os.environ: + parse = urlparse(os.environ["KAFKA_URI"]) + (host, port) = (parse.hostname, parse.port) + return KafkaFixture.instance(0, zookeeper, host=host, port=port, external=True) + else: + return kafka_broker_factory()[0] @pytest.fixture(scope="module") diff --git a/test/fixtures.py b/test/fixtures.py index c9f138ef5..9843d5a2b 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -10,8 +10,7 @@ import uuid import py -from kafka.vendor.six.moves import urllib, range -from kafka.vendor.six.moves.urllib.parse import urlparse # pylint: disable=E0611,F0401 +from kafka.vendor.six.moves import range from kafka import errors, KafkaAdminClient, KafkaClient, KafkaConsumer, KafkaProducer from kafka.errors import InvalidReplicationFactorError, KafkaTimeoutError @@ -74,43 +73,6 @@ def __init__(self): if not os.path.isdir(self.kafka_root): raise FileNotFoundError(self.kafka_root) - @classmethod - def download_official_distribution(cls, - kafka_version=None, - scala_version=None, - output_dir=None): - if not kafka_version: - kafka_version = cls.kafka_version - if not scala_version: - scala_version = cls.scala_version - if not output_dir: - output_dir = os.path.join(cls.project_root, 'servers', 'dist') - - distfile = 'kafka_%s-%s' % (scala_version, kafka_version,) - url_base = 'https://archive.apache.org/dist/kafka/%s/' % (kafka_version,) - output_file = os.path.join(output_dir, distfile + '.tgz') - - if os.path.isfile(output_file): - log.info("Found file already on disk: %s", output_file) - return output_file - - # New tarballs are .tgz, older ones are sometimes .tar.gz - try: - url = url_base + distfile + '.tgz' - log.info("Attempting to download %s", url) - response = urllib.request.urlopen(url) - except urllib.error.HTTPError: - log.exception("HTTP Error") - url = url_base + distfile + '.tar.gz' - log.info("Attempting to download %s", url) - response = urllib.request.urlopen(url) - - log.info("Saving distribution file to %s", output_file) - with open(output_file, 'w') as output_file_fd: - output_file_fd.write(response.read()) - - return output_file - @classmethod def test_resource(cls, filename): path = os.path.join(cls.project_root, "servers", cls.kafka_version, "resources", filename) @@ -169,23 +131,18 @@ def dump_logs(self): class ZookeeperFixture(Fixture): @classmethod - def instance(cls): - if "ZOOKEEPER_URI" in os.environ: - parse = urlparse(os.environ["ZOOKEEPER_URI"]) - (host, port) = (parse.hostname, parse.port) - fixture = ExternalService(host, port) - else: - (host, port) = ("127.0.0.1", None) - fixture = cls(host, port) - + def instance(cls, host=None, port=None, external=False): + if host is None: + host = "127.0.0.1" + fixture = cls(host, port, external=external) fixture.open() return fixture - def __init__(self, host, port, tmp_dir=None): + def __init__(self, host, port, external=False, tmp_dir=None): super(ZookeeperFixture, self).__init__() self.host = host self.port = port - + self.running = external self.tmp_dir = tmp_dir def kafka_run_class_env(self): @@ -198,6 +155,8 @@ def out(self, message): log.info("*** Zookeeper [%s:%s]: %s", self.host, self.port or '(auto)', message) def open(self): + if self.running: + return if self.tmp_dir is None: self.tmp_dir = py.path.local.mkdtemp() #pylint: disable=no-member self.tmp_dir.ensure(dir=True) @@ -262,34 +221,30 @@ class KafkaFixture(Fixture): @classmethod def instance(cls, broker_id, zookeeper, zk_chroot=None, - host=None, port=None, - transport='PLAINTEXT', replicas=1, partitions=2, + host=None, port=None, external=False, + transport='PLAINTEXT', replicas=1, partitions=4, sasl_mechanism=None, auto_create_topic=True, tmp_dir=None): if zk_chroot is None: zk_chroot = "kafka-python_" + str(uuid.uuid4()).replace("-", "_") - if "KAFKA_URI" in os.environ: - parse = urlparse(os.environ["KAFKA_URI"]) - (host, port) = (parse.hostname, parse.port) - fixture = ExternalService(host, port) - else: - if host is None: - host = "localhost" - fixture = KafkaFixture(host, port, broker_id, - zookeeper, zk_chroot, - transport=transport, - replicas=replicas, partitions=partitions, - sasl_mechanism=sasl_mechanism, - auto_create_topic=auto_create_topic, - tmp_dir=tmp_dir) - - fixture.open() + if host is None: + host = "localhost" + fixture = KafkaFixture(host, port, broker_id, + zookeeper, zk_chroot, + external=external, + transport=transport, + replicas=replicas, partitions=partitions, + sasl_mechanism=sasl_mechanism, + auto_create_topic=auto_create_topic, + tmp_dir=tmp_dir) + + fixture.open() return fixture def __init__(self, host, port, broker_id, zookeeper, zk_chroot, replicas=1, partitions=2, transport='PLAINTEXT', sasl_mechanism=None, auto_create_topic=True, - tmp_dir=None): + tmp_dir=None, external=False): super(KafkaFixture, self).__init__() self.host = host @@ -321,9 +276,16 @@ def __init__(self, host, port, broker_id, zookeeper, zk_chroot, self.partitions = partitions self.tmp_dir = tmp_dir - self.running = False + self.external = external + + if self.external: + self.child = ExternalService(self.host, self.port) + (self._client,) = self.get_clients(1, client_id='_internal_client') + self.running = True + else: + self._client = None + self.running = False - self._client = None self.sasl_config = '' self.jaas_config = '' @@ -416,6 +378,8 @@ def _create_zk_chroot(self): self.out("Kafka chroot created in Zookeeper!") def start(self): + if self.running: + return True # Configure Kafka child process properties = self.tmp_dir.join("kafka.properties") jaas_conf = self.tmp_dir.join("kafka_server_jaas.conf") @@ -515,6 +479,8 @@ def __del__(self): self.close() def stop(self): + if self.external: + return if not self.running: self.out("Instance already stopped") return diff --git a/test/service.py b/test/service.py index e4e89f8fe..a53fab8da 100644 --- a/test/service.py +++ b/test/service.py @@ -29,6 +29,11 @@ def open(self): def close(self): pass + def dump_logs(self): + pass + + def wait_for(self, pattern, timeout=30): + pass class SpawnedService(threading.Thread): def __init__(self, args=None, env=None): @@ -52,8 +57,8 @@ def __init__(self, args=None, env=None): log.debug(" {key}={value}".format(key=key, value=value)) def _spawn(self): - if self.alive: return - if self.child and self.child.poll() is None: return + if self.alive or (self.child and self.child.poll() is None): + return self.child = subprocess.Popen( self.args, @@ -76,6 +81,7 @@ def _despawn(self): else: self.child.kill() + # via threading.Thread def run(self): self._spawn() while True: