diff --git a/kafka/admin/client.py b/kafka/admin/client.py index cc126c6d6..7b1fbb2cb 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -470,14 +470,48 @@ def delete_topics(self, topics, timeout_ms=None): .format(version)) return response - # list topics functionality is in ClusterMetadata - # Note: if implemented here, send the request to the least_loaded_node() - # describe topics functionality is in ClusterMetadata - # Note: if implemented here, send the request to the controller + def _get_cluster_metadata(self, topics=None, auto_topic_creation=False): + """ + topics == None means "get all topics" + """ + version = self._matching_api_version(MetadataRequest) + if version <= 3: + if auto_topic_creation: + raise IncompatibleBrokerVersion( + "auto_topic_creation requires MetadataRequest >= v4, which" + " is not supported by Kafka {}" + .format(self.config['api_version'])) - # describe cluster functionality is in ClusterMetadata - # Note: if implemented here, send the request to the least_loaded_node() + request = MetadataRequest[version](topics=topics) + elif version <= 5: + request = MetadataRequest[version]( + topics=topics, + allow_auto_topic_creation=auto_topic_creation + ) + + future = self._send_request_to_node( + self._client.least_loaded_node(), + request + ) + self._wait_for_futures([future]) + return future.value + + def list_topics(self): + metadata = self._get_cluster_metadata(topics=None) + obj = metadata.to_object() + return [t['topic'] for t in obj['topics']] + + def describe_topics(self, topics=None): + metadata = self._get_cluster_metadata(topics=topics) + obj = metadata.to_object() + return obj['topics'] + + def describe_cluster(self): + metadata = self._get_cluster_metadata() + obj = metadata.to_object() + obj.pop('topics') # We have 'describe_topics' for this + return obj @staticmethod def _convert_describe_acls_response_to_acls(describe_response): diff --git a/kafka/protocol/api.py b/kafka/protocol/api.py index efaf63ea2..55bb58fdd 100644 --- a/kafka/protocol/api.py +++ b/kafka/protocol/api.py @@ -3,7 +3,7 @@ import abc from kafka.protocol.struct import Struct -from kafka.protocol.types import Int16, Int32, String, Schema +from kafka.protocol.types import Int16, Int32, String, Schema, Array class RequestHeader(Struct): @@ -47,6 +47,9 @@ def expect_response(self): """Override this method if an api request does not always generate a response""" return True + def to_object(self): + return _to_object(self.SCHEMA, self.fields) + class Response(Struct): __metaclass__ = abc.ABCMeta @@ -65,3 +68,30 @@ def API_VERSION(self): def SCHEMA(self): """An instance of Schema() representing the response structure""" pass + + def to_object(self): + return _to_object(self.SCHEMA, self.fields) + + +def _to_object(schema, data): + obj = {} + for idx, (name, _type) in enumerate(zip(schema.names, schema.fields)): + if isinstance(data, dict): + val = data[name] + else: + val = data[idx] + + if isinstance(_type, Schema): + obj[name] = _to_object(_type, val) + elif isinstance(_type, Array): + if isinstance(_type.array_of, (Array, Schema)): + obj[name] = [ + _to_object(_type.array_of, x) + for x in val + ] + else: + obj[name] = val + else: + obj[name] = val + + return obj diff --git a/kafka/protocol/struct.py b/kafka/protocol/struct.py index 676de1ba4..608ffc40a 100644 --- a/kafka/protocol/struct.py +++ b/kafka/protocol/struct.py @@ -12,14 +12,17 @@ class Struct(AbstractType): SCHEMA = Schema() def __init__(self, *args, **kwargs): + self.fields = {} if len(args) == len(self.SCHEMA.fields): for i, name in enumerate(self.SCHEMA.names): self.__dict__[name] = args[i] + self.fields[name] = args[i] elif len(args) > 0: raise ValueError('Args must be empty or mirror schema') else: for name in self.SCHEMA.names: self.__dict__[name] = kwargs.pop(name, None) + self.fields[name] = self.__dict__[name] if kwargs: raise ValueError('Keyword(s) not in schema %s: %s' % (list(self.SCHEMA.names), diff --git a/test/test_object_conversion.py b/test/test_object_conversion.py new file mode 100644 index 000000000..a1af43756 --- /dev/null +++ b/test/test_object_conversion.py @@ -0,0 +1,223 @@ +from kafka.protocol.admin import Request +from kafka.protocol.admin import Response +from kafka.protocol.types import Schema +from kafka.protocol.types import Array +from kafka.protocol.types import Int16 +from kafka.protocol.types import String + +import pytest + +@pytest.mark.parametrize('superclass', (Request, Response)) +class TestObjectConversion: + def test_with_empty_schema(self, superclass): + class TestClass(superclass): + API_KEY = 0 + API_VERSION = 0 + RESPONSE_TYPE = None # To satisfy the Request ABC + SCHEMA = Schema() + + tc = TestClass() + tc.encode() + assert tc.to_object() == {} + + def test_with_basic_schema(self, superclass): + class TestClass(superclass): + API_KEY = 0 + API_VERSION = 0 + RESPONSE_TYPE = None # To satisfy the Request ABC + SCHEMA = Schema( + ('myobject', Int16)) + + tc = TestClass(myobject=0) + tc.encode() + assert tc.to_object() == {'myobject': 0} + + def test_with_basic_array_schema(self, superclass): + class TestClass(superclass): + API_KEY = 0 + API_VERSION = 0 + RESPONSE_TYPE = None # To satisfy the Request ABC + SCHEMA = Schema( + ('myarray', Array(Int16))) + + tc = TestClass(myarray=[1,2,3]) + tc.encode() + assert tc.to_object()['myarray'] == [1, 2, 3] + + def test_with_complex_array_schema(self, superclass): + class TestClass(superclass): + API_KEY = 0 + API_VERSION = 0 + RESPONSE_TYPE = None # To satisfy the Request ABC + SCHEMA = Schema( + ('myarray', Array( + ('subobject', Int16), + ('othersubobject', String('utf-8'))))) + + tc = TestClass( + myarray=[[10, 'hello']] + ) + tc.encode() + obj = tc.to_object() + assert len(obj['myarray']) == 1 + assert obj['myarray'][0]['subobject'] == 10 + assert obj['myarray'][0]['othersubobject'] == 'hello' + + def test_with_array_and_other(self, superclass): + class TestClass(superclass): + API_KEY = 0 + API_VERSION = 0 + RESPONSE_TYPE = None # To satisfy the Request ABC + SCHEMA = Schema( + ('myarray', Array( + ('subobject', Int16), + ('othersubobject', String('utf-8')))), + ('notarray', Int16)) + + tc = TestClass( + myarray=[[10, 'hello']], + notarray=42 + ) + + obj = tc.to_object() + assert len(obj['myarray']) == 1 + assert obj['myarray'][0]['subobject'] == 10 + assert obj['myarray'][0]['othersubobject'] == 'hello' + assert obj['notarray'] == 42 + + def test_with_nested_array(self, superclass): + class TestClass(superclass): + API_KEY = 0 + API_VERSION = 0 + RESPONSE_TYPE = None # To satisfy the Request ABC + SCHEMA = Schema( + ('myarray', Array( + ('subarray', Array(Int16)), + ('otherobject', Int16)))) + + tc = TestClass( + myarray=[ + [[1, 2], 2], + [[2, 3], 4], + ] + ) + print(tc.encode()) + + + obj = tc.to_object() + assert len(obj['myarray']) == 2 + assert obj['myarray'][0]['subarray'] == [1, 2] + assert obj['myarray'][0]['otherobject'] == 2 + assert obj['myarray'][1]['subarray'] == [2, 3] + assert obj['myarray'][1]['otherobject'] == 4 + + def test_with_complex_nested_array(self, superclass): + class TestClass(superclass): + API_KEY = 0 + API_VERSION = 0 + RESPONSE_TYPE = None # To satisfy the Request ABC + SCHEMA = Schema( + ('myarray', Array( + ('subarray', Array( + ('innertest', String('utf-8')), + ('otherinnertest', String('utf-8')))), + ('othersubarray', Array(Int16)))), + ('notarray', String('utf-8'))) + + tc = TestClass( + myarray=[ + [[['hello', 'hello'], ['hello again', 'hello again']], [0]], + [[['hello', 'hello again']], [1]], + ], + notarray='notarray' + ) + tc.encode() + + obj = tc.to_object() + + assert obj['notarray'] == 'notarray' + myarray = obj['myarray'] + assert len(myarray) == 2 + + assert myarray[0]['othersubarray'] == [0] + assert len(myarray[0]['subarray']) == 2 + assert myarray[0]['subarray'][0]['innertest'] == 'hello' + assert myarray[0]['subarray'][0]['otherinnertest'] == 'hello' + assert myarray[0]['subarray'][1]['innertest'] == 'hello again' + assert myarray[0]['subarray'][1]['otherinnertest'] == 'hello again' + + assert myarray[1]['othersubarray'] == [1] + assert len(myarray[1]['subarray']) == 1 + assert myarray[1]['subarray'][0]['innertest'] == 'hello' + assert myarray[1]['subarray'][0]['otherinnertest'] == 'hello again' + +def test_with_metadata_response(): + from kafka.protocol.metadata import MetadataResponse_v5 + tc = MetadataResponse_v5( + throttle_time_ms=0, + brokers=[ + [0, 'testhost0', 9092, 'testrack0'], + [1, 'testhost1', 9092, 'testrack1'], + ], + cluster_id='abcd', + controller_id=0, + topics=[ + [0, 'testtopic1', False, [ + [0, 0, 0, [0, 1], [0, 1], []], + [0, 1, 1, [1, 0], [1, 0], []], + ], + ], [0, 'other-test-topic', True, [ + [0, 0, 0, [0, 1], [0, 1], []], + ] + ]] + ) + tc.encode() # Make sure this object encodes successfully + + + obj = tc.to_object() + + assert obj['throttle_time_ms'] == 0 + + assert len(obj['brokers']) == 2 + assert obj['brokers'][0]['node_id'] == 0 + assert obj['brokers'][0]['host'] == 'testhost0' + assert obj['brokers'][0]['port'] == 9092 + assert obj['brokers'][0]['rack'] == 'testrack0' + assert obj['brokers'][1]['node_id'] == 1 + assert obj['brokers'][1]['host'] == 'testhost1' + assert obj['brokers'][1]['port'] == 9092 + assert obj['brokers'][1]['rack'] == 'testrack1' + + assert obj['cluster_id'] == 'abcd' + assert obj['controller_id'] == 0 + + assert len(obj['topics']) == 2 + assert obj['topics'][0]['error_code'] == 0 + assert obj['topics'][0]['topic'] == 'testtopic1' + assert obj['topics'][0]['is_internal'] == False + assert len(obj['topics'][0]['partitions']) == 2 + assert obj['topics'][0]['partitions'][0]['error_code'] == 0 + assert obj['topics'][0]['partitions'][0]['partition'] == 0 + assert obj['topics'][0]['partitions'][0]['leader'] == 0 + assert obj['topics'][0]['partitions'][0]['replicas'] == [0, 1] + assert obj['topics'][0]['partitions'][0]['isr'] == [0, 1] + assert obj['topics'][0]['partitions'][0]['offline_replicas'] == [] + assert obj['topics'][0]['partitions'][1]['error_code'] == 0 + assert obj['topics'][0]['partitions'][1]['partition'] == 1 + assert obj['topics'][0]['partitions'][1]['leader'] == 1 + assert obj['topics'][0]['partitions'][1]['replicas'] == [1, 0] + assert obj['topics'][0]['partitions'][1]['isr'] == [1, 0] + assert obj['topics'][0]['partitions'][1]['offline_replicas'] == [] + + assert obj['topics'][1]['error_code'] == 0 + assert obj['topics'][1]['topic'] == 'other-test-topic' + assert obj['topics'][1]['is_internal'] == True + assert len(obj['topics'][1]['partitions']) == 1 + assert obj['topics'][1]['partitions'][0]['error_code'] == 0 + assert obj['topics'][1]['partitions'][0]['partition'] == 0 + assert obj['topics'][1]['partitions'][0]['leader'] == 0 + assert obj['topics'][1]['partitions'][0]['replicas'] == [0, 1] + assert obj['topics'][1]['partitions'][0]['isr'] == [0, 1] + assert obj['topics'][1]['partitions'][0]['offline_replicas'] == [] + + tc.encode()