diff --git a/benchmarks/consumer_performance.py b/benchmarks/consumer_performance.py
index 3e879ae58..5ffd3f5f6 100755
--- a/benchmarks/consumer_performance.py
+++ b/benchmarks/consumer_performance.py
@@ -10,6 +10,8 @@
 import threading
 import traceback
 
+from kafka.vendor.six.moves import range
+
 from kafka import KafkaConsumer, KafkaProducer
 from test.fixtures import KafkaFixture, ZookeeperFixture
 
@@ -64,7 +66,7 @@ def run(args):
         record = bytes(bytearray(args.record_size))
         producer = KafkaProducer(compression_type=args.fixture_compression,
                                  **props)
-        for i in xrange(args.num_records):
+        for i in range(args.num_records):
             producer.send(topic=args.topic, value=record)
         producer.flush()
         producer.close()
diff --git a/benchmarks/producer_performance.py b/benchmarks/producer_performance.py
index e9587358e..0c29cbc24 100755
--- a/benchmarks/producer_performance.py
+++ b/benchmarks/producer_performance.py
@@ -9,6 +9,8 @@
 import threading
 import traceback
 
+from kafka.vendor.six.moves import range
+
 from kafka import KafkaProducer
 from test.fixtures import KafkaFixture, ZookeeperFixture
 
@@ -77,7 +79,7 @@ def run(args):
         print('-> OK!')
         print()
 
-        for i in xrange(args.num_records):
+        for i in range(args.num_records):
             producer.send(topic=args.topic, value=record)
         producer.flush()
 
diff --git a/benchmarks/varint_speed.py b/benchmarks/varint_speed.py
index 2c5cd620d..624a12a42 100644
--- a/benchmarks/varint_speed.py
+++ b/benchmarks/varint_speed.py
@@ -1,7 +1,7 @@
 #!/usr/bin/env python
 from __future__ import print_function
 import perf
-import six
+from kafka.vendor import six
 
 
 test_data = [
diff --git a/kafka/codec.py b/kafka/codec.py
index 4d180ddd3..aa9fc8291 100644
--- a/kafka/codec.py
+++ b/kafka/codec.py
@@ -6,7 +6,7 @@
 import struct
 
 from kafka.vendor import six
-from kafka.vendor.six.moves import xrange # pylint: disable=import-error
+from kafka.vendor.six.moves import range
 
 _XERIAL_V1_HEADER = (-126, b'S', b'N', b'A', b'P', b'P', b'Y', 0, 1, 1)
 _XERIAL_V1_FORMAT = 'bccccccBii'
@@ -150,7 +150,7 @@ def snappy_encode(payload, xerial_compatible=True, xerial_blocksize=32*1024):
         chunker = lambda payload, i, size: memoryview(payload)[i:size+i].tobytes()
 
     for chunk in (chunker(payload, i, xerial_blocksize)
-                  for i in xrange(0, len(payload), xerial_blocksize)):
+                  for i in range(0, len(payload), xerial_blocksize)):
 
         block = snappy.compress(chunk)
         block_size = len(block)
diff --git a/kafka/producer/simple.py b/kafka/producer/simple.py
index 91e0abc4c..e06e65954 100644
--- a/kafka/producer/simple.py
+++ b/kafka/producer/simple.py
@@ -4,7 +4,7 @@
 import logging
 import random
 
-from kafka.vendor.six.moves import xrange # pylint: disable=import-error
+from kafka.vendor.six.moves import range
 
 from kafka.producer.base import Producer
 
@@ -39,7 +39,7 @@ def _next_partition(self, topic):
             # Randomize the initial partition that is returned
             if self.random_start:
                 num_partitions = len(self.client.get_partition_ids_for_topic(topic))
-                for _ in xrange(random.randint(0, num_partitions-1)):
+                for _ in range(random.randint(0, num_partitions-1)):
                     next(self.partition_cycles[topic])
 
         return next(self.partition_cycles[topic])
diff --git a/test/fixtures.py b/test/fixtures.py
index 493a664a5..08cc951a2 100644
--- a/test/fixtures.py
+++ b/test/fixtures.py
@@ -12,8 +12,8 @@
 import uuid
 
 import py
-from six.moves import urllib, xrange
-from six.moves.urllib.parse import urlparse # pylint: disable=E0611,F0401
+from kafka.vendor.six.moves import urllib, range
+from kafka.vendor.six.moves.urllib.parse import urlparse # pylint: disable=E0611,F0401
 
 from kafka import errors, KafkaConsumer, KafkaProducer, SimpleClient
 from kafka.client_async import KafkaClient
@@ -24,7 +24,7 @@
 log = logging.getLogger(__name__)
 
 def random_string(length):
-    return "".join(random.choice(string.ascii_letters) for i in xrange(length))
+    return "".join(random.choice(string.ascii_letters) for i in range(length))
 
 def version_str_to_list(version_str):
     return tuple(map(int, version_str.split('.'))) # e.g., (0, 8, 1, 1)
diff --git a/test/test_client.py b/test/test_client.py
index c53983c94..1c689789b 100644
--- a/test/test_client.py
+++ b/test/test_client.py
@@ -2,7 +2,7 @@
 from mock import ANY, MagicMock, patch
 from operator import itemgetter
-import six
+from kafka.vendor import six
 
 from . import unittest
 
 from kafka import SimpleClient
diff --git a/test/test_codec.py b/test/test_codec.py
index d31fc8674..e132c1d47 100644
--- a/test/test_codec.py
+++ b/test/test_codec.py
@@ -4,7 +4,7 @@
 import struct
 
 import pytest
-from six.moves import xrange
+from kafka.vendor.six.moves import range
 
 from kafka.codec import (
     has_snappy, has_gzip, has_lz4,
@@ -18,7 +18,7 @@
 
 
 def test_gzip():
-    for i in xrange(1000):
+    for i in range(1000):
         b1 = random_string(100).encode('utf-8')
         b2 = gzip_decode(gzip_encode(b1))
         assert b1 == b2
@@ -26,7 +26,7 @@ def test_gzip():
 
 @pytest.mark.skipif(not has_snappy(), reason="Snappy not available")
 def test_snappy():
-    for i in xrange(1000):
+    for i in range(1000):
         b1 = random_string(100).encode('utf-8')
         b2 = snappy_decode(snappy_encode(b1))
         assert b1 == b2
@@ -86,7 +86,7 @@ def test_snappy_encode_xerial():
 @pytest.mark.skipif(not has_lz4() or platform.python_implementation() == 'PyPy',
                     reason="python-lz4 crashes on old versions of pypy")
 def test_lz4():
-    for i in xrange(1000):
+    for i in range(1000):
         b1 = random_string(100).encode('utf-8')
         b2 = lz4_decode(lz4_encode(b1))
         assert len(b1) == len(b2)
@@ -96,7 +96,7 @@ def test_lz4():
 @pytest.mark.skipif(not has_lz4() or platform.python_implementation() == 'PyPy',
                     reason="python-lz4 crashes on old versions of pypy")
 def test_lz4_old():
-    for i in xrange(1000):
+    for i in range(1000):
         b1 = random_string(100).encode('utf-8')
         b2 = lz4_decode_old_kafka(lz4_encode_old_kafka(b1))
         assert len(b1) == len(b2)
@@ -106,7 +106,7 @@ def test_lz4_old():
 @pytest.mark.skipif(not has_lz4() or platform.python_implementation() == 'PyPy',
                     reason="python-lz4 crashes on old versions of pypy")
 def test_lz4_incremental():
-    for i in xrange(1000):
+    for i in range(1000):
         # lz4 max single block size is 4MB
         # make sure we test with multiple-blocks
         b1 = random_string(100).encode('utf-8') * 50000
diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py
index f9a41a46a..55cf6625d 100644
--- a/test/test_consumer_group.py
+++ b/test/test_consumer_group.py
@@ -4,7 +4,7 @@
 import time
 
 import pytest
-import six
+from kafka.vendor import six
 
 from kafka import SimpleClient
 from kafka.conn import ConnectionStates
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index e6f140598..ce934ea1c 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -6,8 +6,8 @@
 
 import kafka.codec
 import pytest
-from six.moves import xrange
-import six
+from kafka.vendor.six.moves import range
+from kafka.vendor import six
 
 from . import unittest
 from kafka import (
@@ -473,7 +473,7 @@ def test_offset_behavior__resuming_behavior(self):
         )
 
         # Grab the first 195 messages
-        output_msgs1 = [ consumer1.get_message().message.value for _ in xrange(195) ]
+        output_msgs1 = [ consumer1.get_message().message.value for _ in range(195) ]
         self.assert_message_count(output_msgs1, 195)
 
         # The total offset across both partitions should be at 180
@@ -603,7 +603,7 @@ def test_kafka_consumer__offset_commit_resume(self):
 
         # Grab the first 180 messages
         output_msgs1 = []
-        for _ in xrange(180):
+        for _ in range(180):
             m = next(consumer1)
             output_msgs1.append(m)
         self.assert_message_count(output_msgs1, 180)
@@ -619,7 +619,7 @@
 
         # 181-200
         output_msgs2 = []
-        for _ in xrange(20):
+        for _ in range(20):
             m = next(consumer2)
             output_msgs2.append(m)
         self.assert_message_count(output_msgs2, 20)
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py
index 6533cfabb..35ce0d7a5 100644
--- a/test/test_producer_integration.py
+++ b/test/test_producer_integration.py
@@ -3,7 +3,7 @@
 import uuid
 
 import pytest
-from six.moves import range
+from kafka.vendor.six.moves import range
 
 from kafka import (
     SimpleProducer, KeyedProducer,
diff --git a/test/test_producer_legacy.py b/test/test_producer_legacy.py
index 6d00116c3..ab80ee707 100644
--- a/test/test_producer_legacy.py
+++ b/test/test_producer_legacy.py
@@ -16,7 +16,7 @@
 from kafka.structs import (
     ProduceResponsePayload, RetryOptions, TopicPartition)
 
-from six.moves import queue, xrange
+from kafka.vendor.six.moves import queue, range
 
 
 class TestKafkaProducer(unittest.TestCase):
@@ -84,7 +84,7 @@ def test_producer_async_queue_overfilled(self, mock):
         message_list = [message] * (queue_size + 1)
 
         producer.send_messages(topic, partition, *message_list)
         self.assertEqual(producer.queue.qsize(), queue_size)
-        for _ in xrange(producer.queue.qsize()):
+        for _ in range(producer.queue.qsize()):
             producer.queue.get()
 
@@ -253,5 +253,5 @@ def send_side_effect(reqs, *args, **kwargs):
         self.assertEqual(self.client.send_produce_request.call_count, 5)
 
     def tearDown(self):
-        for _ in xrange(self.queue.qsize()):
+        for _ in range(self.queue.qsize()):
             self.queue.get()
diff --git a/test/test_protocol.py b/test/test_protocol.py
index d96365026..7abcefb46 100644
--- a/test/test_protocol.py
+++ b/test/test_protocol.py
@@ -3,7 +3,7 @@
 import struct
 
 import pytest
-import six
+from kafka.vendor import six
 
 from kafka.protocol.api import RequestHeader
 from kafka.protocol.commit import GroupCoordinatorRequest
diff --git a/test/test_protocol_legacy.py b/test/test_protocol_legacy.py
index d705e3a15..1341af003 100644
--- a/test/test_protocol_legacy.py
+++ b/test/test_protocol_legacy.py
@@ -2,7 +2,7 @@
 from contextlib import contextmanager
 import struct
 
-import six
+from kafka.vendor import six
 from mock import patch, sentinel
 
 from . import unittest
diff --git a/test/test_util.py b/test/test_util.py
index fb592e8e6..a4dbaa5ab 100644
--- a/test/test_util.py
+++ b/test/test_util.py
@@ -1,7 +1,7 @@
 # -*- coding: utf-8 -*-
 import struct
 
-import six
+from kafka.vendor import six
 from . import unittest
 
 import kafka.errors
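
Note: both halves of this change are mechanical. Importing `six` from `kafka.vendor` drops the test and benchmark dependency on a system-installed `six`, and `six.moves.range` behaves identically on both interpreters: it resolves to the built-in `range` on Python 3 and to `xrange` on Python 2, so every converted loop stays lazy (the benchmarks' bare `xrange` calls were also a NameError under Python 3). A minimal sketch of the idiom, assuming a kafka-python checkout on the path so that `kafka.vendor.six` is importable:

    # Sketch of the compatibility idiom this diff standardizes on.
    # On Python 3, six.moves.range is the built-in range; on Python 2 it
    # resolves to xrange. Either way the loop below is lazy and never
    # materializes a million-element list in memory.
    from kafka.vendor.six.moves import range

    total = 0
    for i in range(1000000):
        total += i
    assert total == 499999500000  # sum of 0..999999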