Skip to content

Use context managers in gzip_encode / gzip_decode #337

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 29, 2015
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 32 additions & 23 deletions kafka/codec.py
Original file line number Diff line number Diff line change
@@ -1,54 +1,63 @@
from io import BytesIO
import gzip
from io import BytesIO
import struct

import six
from six.moves import xrange

_XERIAL_V1_HEADER = (-126, b'S', b'N', b'A', b'P', b'P', b'Y', 0, 1, 1)
_XERIAL_V1_FORMAT = 'bccccccBii'

try:
import snappy
_has_snappy = True
_HAS_SNAPPY = True
except ImportError:
_has_snappy = False
_HAS_SNAPPY = False


def has_gzip():
return True


def has_snappy():
return _has_snappy
return _HAS_SNAPPY


def gzip_encode(payload):
buffer = BytesIO()
handle = gzip.GzipFile(fileobj=buffer, mode="w")
handle.write(payload)
handle.close()
buffer.seek(0)
result = buffer.read()
buffer.close()
with BytesIO() as buf:

# Gzip context manager introduced in python 2.6
# so old-fashioned way until we decide to not support 2.6
gzipper = gzip.GzipFile(fileobj=buf, mode="w")
try:
gzipper.write(payload)
finally:
gzipper.close()

result = buf.getvalue()

return result


def gzip_decode(payload):
buffer = BytesIO(payload)
handle = gzip.GzipFile(fileobj=buffer, mode='r')
result = handle.read()
handle.close()
buffer.close()
with BytesIO(payload) as buf:

# Gzip context manager introduced in python 2.6
# so old-fashioned way until we decide to not support 2.6
gzipper = gzip.GzipFile(fileobj=buf, mode='r')
try:
result = gzipper.read()
finally:
gzipper.close()

return result


def snappy_encode(payload, xerial_compatible=False, xerial_blocksize=32 * 1024):
"""Encodes the given data with snappy if xerial_compatible is set then the
stream is encoded in a fashion compatible with the xerial snappy library

The block size (xerial_blocksize) controls how frequent the blocking occurs
32k is the default in the xerial library.
The block size (xerial_blocksize) controls how frequent the blocking
occurs 32k is the default in the xerial library.

The format winds up being
+-------------+------------+--------------+------------+--------------+
Expand All @@ -63,7 +72,7 @@ def snappy_encode(payload, xerial_compatible=False, xerial_blocksize=32 * 1024):
length will always be <= blocksize.
"""

if not _has_snappy:
if not has_snappy():
raise NotImplementedError("Snappy codec is not available")

if xerial_compatible:
Expand All @@ -74,7 +83,7 @@ def _chunker():
out = BytesIO()

header = b''.join([struct.pack('!' + fmt, dat) for fmt, dat
in zip(_XERIAL_V1_FORMAT, _XERIAL_V1_HEADER)])
in zip(_XERIAL_V1_FORMAT, _XERIAL_V1_HEADER)])

out.write(header)
for chunk in _chunker():
Expand Down Expand Up @@ -113,13 +122,13 @@ def _detect_xerial_stream(payload):
"""

if len(payload) > 16:
header = header = struct.unpack('!' + _XERIAL_V1_FORMAT, bytes(payload)[:16])
header = struct.unpack('!' + _XERIAL_V1_FORMAT, bytes(payload)[:16])
return header == _XERIAL_V1_HEADER
return False


def snappy_decode(payload):
if not _has_snappy:
if not has_snappy():
raise NotImplementedError("Snappy codec is not available")

if _detect_xerial_stream(payload):
Expand Down