Skip to content

Vendor six consistently #1605

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 22, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion benchmarks/consumer_performance.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import threading
import traceback

from kafka.vendor.six.moves import range

from kafka import KafkaConsumer, KafkaProducer
from test.fixtures import KafkaFixture, ZookeeperFixture

Expand Down Expand Up @@ -64,7 +66,7 @@ def run(args):
record = bytes(bytearray(args.record_size))
producer = KafkaProducer(compression_type=args.fixture_compression,
**props)
for i in xrange(args.num_records):
for i in range(args.num_records):
producer.send(topic=args.topic, value=record)
producer.flush()
producer.close()
Expand Down
4 changes: 3 additions & 1 deletion benchmarks/producer_performance.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import threading
import traceback

from kafka.vendor.six.moves import range

from kafka import KafkaProducer
from test.fixtures import KafkaFixture, ZookeeperFixture

Expand Down Expand Up @@ -77,7 +79,7 @@ def run(args):
print('-> OK!')
print()

for i in xrange(args.num_records):
for i in range(args.num_records):
producer.send(topic=args.topic, value=record)
producer.flush()

Expand Down
2 changes: 1 addition & 1 deletion benchmarks/varint_speed.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/env python
from __future__ import print_function
import perf
import six
from kafka.vendor import six


test_data = [
Expand Down
4 changes: 2 additions & 2 deletions kafka/codec.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import struct

from kafka.vendor import six
from kafka.vendor.six.moves import xrange # pylint: disable=import-error
from kafka.vendor.six.moves import range

_XERIAL_V1_HEADER = (-126, b'S', b'N', b'A', b'P', b'P', b'Y', 0, 1, 1)
_XERIAL_V1_FORMAT = 'bccccccBii'
Expand Down Expand Up @@ -150,7 +150,7 @@ def snappy_encode(payload, xerial_compatible=True, xerial_blocksize=32*1024):
chunker = lambda payload, i, size: memoryview(payload)[i:size+i].tobytes()

for chunk in (chunker(payload, i, xerial_blocksize)
for i in xrange(0, len(payload), xerial_blocksize)):
for i in range(0, len(payload), xerial_blocksize)):

block = snappy.compress(chunk)
block_size = len(block)
Expand Down
4 changes: 2 additions & 2 deletions kafka/producer/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import logging
import random

from kafka.vendor.six.moves import xrange # pylint: disable=import-error
from kafka.vendor.six.moves import range

from kafka.producer.base import Producer

Expand Down Expand Up @@ -39,7 +39,7 @@ def _next_partition(self, topic):
# Randomize the initial partition that is returned
if self.random_start:
num_partitions = len(self.client.get_partition_ids_for_topic(topic))
for _ in xrange(random.randint(0, num_partitions-1)):
for _ in range(random.randint(0, num_partitions-1)):
next(self.partition_cycles[topic])

return next(self.partition_cycles[topic])
Expand Down
6 changes: 3 additions & 3 deletions test/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
import uuid

import py
from six.moves import urllib, xrange
from six.moves.urllib.parse import urlparse # pylint: disable=E0611,F0401
from kafka.vendor.six.moves import urllib, range
from kafka.vendor.six.moves.urllib.parse import urlparse # pylint: disable=E0611,F0401

from kafka import errors, KafkaConsumer, KafkaProducer, SimpleClient
from kafka.client_async import KafkaClient
Expand All @@ -24,7 +24,7 @@
log = logging.getLogger(__name__)

def random_string(length):
return "".join(random.choice(string.ascii_letters) for i in xrange(length))
return "".join(random.choice(string.ascii_letters) for i in range(length))

def version_str_to_list(version_str):
return tuple(map(int, version_str.split('.'))) # e.g., (0, 8, 1, 1)
Expand Down
2 changes: 1 addition & 1 deletion test/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from mock import ANY, MagicMock, patch
from operator import itemgetter
import six
from kafka.vendor import six
from . import unittest

from kafka import SimpleClient
Expand Down
12 changes: 6 additions & 6 deletions test/test_codec.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import struct

import pytest
from six.moves import xrange
from kafka.vendor.six.moves import range

from kafka.codec import (
has_snappy, has_gzip, has_lz4,
Expand All @@ -18,15 +18,15 @@


def test_gzip():
for i in xrange(1000):
for i in range(1000):
b1 = random_string(100).encode('utf-8')
b2 = gzip_decode(gzip_encode(b1))
assert b1 == b2


@pytest.mark.skipif(not has_snappy(), reason="Snappy not available")
def test_snappy():
for i in xrange(1000):
for i in range(1000):
b1 = random_string(100).encode('utf-8')
b2 = snappy_decode(snappy_encode(b1))
assert b1 == b2
Expand Down Expand Up @@ -86,7 +86,7 @@ def test_snappy_encode_xerial():
@pytest.mark.skipif(not has_lz4() or platform.python_implementation() == 'PyPy',
reason="python-lz4 crashes on old versions of pypy")
def test_lz4():
for i in xrange(1000):
for i in range(1000):
b1 = random_string(100).encode('utf-8')
b2 = lz4_decode(lz4_encode(b1))
assert len(b1) == len(b2)
Expand All @@ -96,7 +96,7 @@ def test_lz4():
@pytest.mark.skipif(not has_lz4() or platform.python_implementation() == 'PyPy',
reason="python-lz4 crashes on old versions of pypy")
def test_lz4_old():
for i in xrange(1000):
for i in range(1000):
b1 = random_string(100).encode('utf-8')
b2 = lz4_decode_old_kafka(lz4_encode_old_kafka(b1))
assert len(b1) == len(b2)
Expand All @@ -106,7 +106,7 @@ def test_lz4_old():
@pytest.mark.skipif(not has_lz4() or platform.python_implementation() == 'PyPy',
reason="python-lz4 crashes on old versions of pypy")
def test_lz4_incremental():
for i in xrange(1000):
for i in range(1000):
# lz4 max single block size is 4MB
# make sure we test with multiple-blocks
b1 = random_string(100).encode('utf-8') * 50000
Expand Down
2 changes: 1 addition & 1 deletion test/test_consumer_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import time

import pytest
import six
from kafka.vendor import six

from kafka import SimpleClient
from kafka.conn import ConnectionStates
Expand Down
10 changes: 5 additions & 5 deletions test/test_consumer_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
import kafka.codec

import pytest
from six.moves import xrange
import six
from kafka.vendor.six.moves import range
from kafka.vendor import six

from . import unittest
from kafka import (
Expand Down Expand Up @@ -473,7 +473,7 @@ def test_offset_behavior__resuming_behavior(self):
)

# Grab the first 195 messages
output_msgs1 = [ consumer1.get_message().message.value for _ in xrange(195) ]
output_msgs1 = [ consumer1.get_message().message.value for _ in range(195) ]
self.assert_message_count(output_msgs1, 195)

# The total offset across both partitions should be at 180
Expand Down Expand Up @@ -603,7 +603,7 @@ def test_kafka_consumer__offset_commit_resume(self):

# Grab the first 180 messages
output_msgs1 = []
for _ in xrange(180):
for _ in range(180):
m = next(consumer1)
output_msgs1.append(m)
self.assert_message_count(output_msgs1, 180)
Expand All @@ -619,7 +619,7 @@ def test_kafka_consumer__offset_commit_resume(self):

# 181-200
output_msgs2 = []
for _ in xrange(20):
for _ in range(20):
m = next(consumer2)
output_msgs2.append(m)
self.assert_message_count(output_msgs2, 20)
Expand Down
2 changes: 1 addition & 1 deletion test/test_producer_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import uuid

import pytest
from six.moves import range
from kafka.vendor.six.moves import range

from kafka import (
SimpleProducer, KeyedProducer,
Expand Down
6 changes: 3 additions & 3 deletions test/test_producer_legacy.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from kafka.structs import (
ProduceResponsePayload, RetryOptions, TopicPartition)

from six.moves import queue, xrange
from kafka.vendor.six.moves import queue, range


class TestKafkaProducer(unittest.TestCase):
Expand Down Expand Up @@ -84,7 +84,7 @@ def test_producer_async_queue_overfilled(self, mock):
message_list = [message] * (queue_size + 1)
producer.send_messages(topic, partition, *message_list)
self.assertEqual(producer.queue.qsize(), queue_size)
for _ in xrange(producer.queue.qsize()):
for _ in range(producer.queue.qsize()):
producer.queue.get()

def test_producer_sync_fail_on_error(self):
Expand Down Expand Up @@ -253,5 +253,5 @@ def send_side_effect(reqs, *args, **kwargs):
self.assertEqual(self.client.send_produce_request.call_count, 5)

def tearDown(self):
for _ in xrange(self.queue.qsize()):
for _ in range(self.queue.qsize()):
self.queue.get()
2 changes: 1 addition & 1 deletion test/test_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import struct

import pytest
import six
from kafka.vendor import six

from kafka.protocol.api import RequestHeader
from kafka.protocol.commit import GroupCoordinatorRequest
Expand Down
2 changes: 1 addition & 1 deletion test/test_protocol_legacy.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from contextlib import contextmanager
import struct

import six
from kafka.vendor import six
from mock import patch, sentinel
from . import unittest

Expand Down
2 changes: 1 addition & 1 deletion test/test_util.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
import struct

import six
from kafka.vendor import six
from . import unittest

import kafka.errors
Expand Down