diff --git a/kafka/client_async.py b/kafka/client_async.py index 7121ce7a7..4de05b33e 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -27,7 +27,7 @@ from kafka.metrics.stats.rate import TimeUnit from kafka.protocol.broker_api_versions import BROKER_API_VERSIONS from kafka.protocol.metadata import MetadataRequest -from kafka.util import Dict, WeakMethod +from kafka.util import Dict, WeakMethod, ensure_valid_topic_name # Although this looks unused, it actually monkey-patches socket.socketpair() # and should be left in as long as we're using socket.socketpair() in this file from kafka.vendor import socketpair # noqa: F401 @@ -909,7 +909,13 @@ def add_topic(self, topic): Returns: Future: resolves after metadata request/response + + Raises: + TypeError: if topic is not a string + ValueError: if topic is invalid: must be chars (a-zA-Z0-9._-), and less than 250 length """ + ensure_valid_topic_name(topic) + if topic in self._topics: return Future().success(set(self._topics)) diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py index abe37fb86..2b2bcb477 100644 --- a/kafka/consumer/subscription_state.py +++ b/kafka/consumer/subscription_state.py @@ -9,6 +9,7 @@ from kafka.errors import IllegalStateError from kafka.protocol.list_offsets import OffsetResetStrategy from kafka.structs import OffsetAndMetadata +from kafka.util import ensure_valid_topic_name log = logging.getLogger(__name__) @@ -43,10 +44,6 @@ class SubscriptionState(object): " (2) subscribe to topics matching a regex pattern," " (3) assign itself specific topic-partitions.") - # Taken from: https://github.com/apache/kafka/blob/39eb31feaeebfb184d98cc5d94da9148c2319d81/clients/src/main/java/org/apache/kafka/common/internals/Topic.java#L29 - _MAX_NAME_LENGTH = 249 - _TOPIC_LEGAL_CHARS = re.compile('^[a-zA-Z0-9._-]+$') - def __init__(self, offset_reset_strategy='earliest'): """Initialize a SubscriptionState instance @@ -123,24 +120,6 @@ def subscribe(self, topics=(), pattern=None, listener=None): raise TypeError('listener must be a ConsumerRebalanceListener') self.listener = listener - def _ensure_valid_topic_name(self, topic): - """ Ensures that the topic name is valid according to the kafka source. """ - - # See Kafka Source: - # https://github.com/apache/kafka/blob/39eb31feaeebfb184d98cc5d94da9148c2319d81/clients/src/main/java/org/apache/kafka/common/internals/Topic.java - if topic is None: - raise TypeError('All topics must not be None') - if not isinstance(topic, six.string_types): - raise TypeError('All topics must be strings') - if len(topic) == 0: - raise ValueError('All topics must be non-empty strings') - if topic == '.' or topic == '..': - raise ValueError('Topic name cannot be "." or ".."') - if len(topic) > self._MAX_NAME_LENGTH: - raise ValueError('Topic name is illegal, it can\'t be longer than {0} characters, topic: "{1}"'.format(self._MAX_NAME_LENGTH, topic)) - if not self._TOPIC_LEGAL_CHARS.match(topic): - raise ValueError('Topic name "{0}" is illegal, it contains a character other than ASCII alphanumerics, ".", "_" and "-"'.format(topic)) - def change_subscription(self, topics): """Change the topic subscription. @@ -166,7 +145,7 @@ def change_subscription(self, topics): return for t in topics: - self._ensure_valid_topic_name(t) + ensure_valid_topic_name(t) log.info('Updating subscribed topics to: %s', topics) self.subscription = set(topics) diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index b8ace0fc1..8da14af1c 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -22,6 +22,7 @@ from kafka.record.legacy_records import LegacyRecordBatchBuilder from kafka.serializer import Serializer from kafka.structs import TopicPartition +from kafka.util import ensure_valid_topic_name log = logging.getLogger(__name__) @@ -593,11 +594,15 @@ def send(self, topic, value=None, key=None, headers=None, partition=None, timest Raises: KafkaTimeoutError: if unable to fetch topic metadata, or unable to obtain memory buffer prior to configured max_block_ms + TypeError: if topic is not a string + ValueError: if topic is invalid: must be chars (a-zA-Z0-9._-), and less than 250 length + AssertionError: if KafkaProducer is closed, or key and value are both None """ assert not self._closed, 'KafkaProducer already closed!' assert value is not None or self.config['api_version'] >= (0, 8, 1), ( 'Null messages require kafka >= 0.8.1') assert not (value is None and key is None), 'Need at least one: key or value' + ensure_valid_topic_name(topic) key_bytes = value_bytes = None try: assigned_partition = None diff --git a/kafka/util.py b/kafka/util.py index d067a063d..470200b1b 100644 --- a/kafka/util.py +++ b/kafka/util.py @@ -1,6 +1,7 @@ from __future__ import absolute_import import binascii +import re import time import weakref @@ -43,6 +44,29 @@ def inner_timeout_ms(fallback=None): return inner_timeout_ms +# Taken from: https://github.com/apache/kafka/blob/39eb31feaeebfb184d98cc5d94da9148c2319d81/clients/src/main/java/org/apache/kafka/common/internals/Topic.java#L29 +TOPIC_MAX_LENGTH = 249 +TOPIC_LEGAL_CHARS = re.compile('^[a-zA-Z0-9._-]+$') + +def ensure_valid_topic_name(topic): + """ Ensures that the topic name is valid according to the kafka source. """ + + # See Kafka Source: + # https://github.com/apache/kafka/blob/39eb31feaeebfb184d98cc5d94da9148c2319d81/clients/src/main/java/org/apache/kafka/common/internals/Topic.java + if topic is None: + raise TypeError('All topics must not be None') + if not isinstance(topic, six.string_types): + raise TypeError('All topics must be strings') + if len(topic) == 0: + raise ValueError('All topics must be non-empty strings') + if topic == '.' or topic == '..': + raise ValueError('Topic name cannot be "." or ".."') + if len(topic) > TOPIC_MAX_LENGTH: + raise ValueError('Topic name is illegal, it can\'t be longer than {0} characters, topic: "{1}"'.format(TOPIC_MAX_LENGTH, topic)) + if not TOPIC_LEGAL_CHARS.match(topic): + raise ValueError('Topic name "{0}" is illegal, it contains a character other than ASCII alphanumerics, ".", "_" and "-"'.format(topic)) + + class WeakMethod(object): """ Callable that weakly references a method and the object it is bound to. It diff --git a/test/test_subscription_state.py b/test/test_util.py similarity index 83% rename from test/test_subscription_state.py rename to test/test_util.py index 9718f6af4..875b252aa 100644 --- a/test/test_subscription_state.py +++ b/test/test_util.py @@ -3,7 +3,7 @@ import pytest -from kafka.consumer.subscription_state import SubscriptionState +from kafka.util import ensure_valid_topic_name @pytest.mark.parametrize(('topic_name', 'expectation'), [ (0, pytest.raises(TypeError)), @@ -20,6 +20,5 @@ ('name+with+plus', pytest.raises(ValueError)), ]) def test_topic_name_validation(topic_name, expectation): - state = SubscriptionState() with expectation: - state._ensure_valid_topic_name(topic_name) + ensure_valid_topic_name(topic_name)