Skip to content

Commit d02276e

Browse files
committed
Merge pull request #337 from dpkp/gzip_context
Use context managers in gzip_encode / gzip_decode
2 parents 1a5cb03 + fa43388 commit d02276e

File tree

1 file changed

+32
-23
lines changed

1 file changed

+32
-23
lines changed

kafka/codec.py

Lines changed: 32 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,54 +1,63 @@
1-
from io import BytesIO
21
import gzip
2+
from io import BytesIO
33
import struct
44

5-
import six
65
from six.moves import xrange
76

87
_XERIAL_V1_HEADER = (-126, b'S', b'N', b'A', b'P', b'P', b'Y', 0, 1, 1)
98
_XERIAL_V1_FORMAT = 'bccccccBii'
109

1110
try:
1211
import snappy
13-
_has_snappy = True
12+
_HAS_SNAPPY = True
1413
except ImportError:
15-
_has_snappy = False
14+
_HAS_SNAPPY = False
1615

1716

1817
def has_gzip():
1918
return True
2019

2120

2221
def has_snappy():
23-
return _has_snappy
22+
return _HAS_SNAPPY
2423

2524

2625
def gzip_encode(payload):
27-
buffer = BytesIO()
28-
handle = gzip.GzipFile(fileobj=buffer, mode="w")
29-
handle.write(payload)
30-
handle.close()
31-
buffer.seek(0)
32-
result = buffer.read()
33-
buffer.close()
26+
with BytesIO() as buf:
27+
28+
# Gzip context manager introduced in python 2.6
29+
# so old-fashioned way until we decide to not support 2.6
30+
gzipper = gzip.GzipFile(fileobj=buf, mode="w")
31+
try:
32+
gzipper.write(payload)
33+
finally:
34+
gzipper.close()
35+
36+
result = buf.getvalue()
37+
3438
return result
3539

3640

3741
def gzip_decode(payload):
38-
buffer = BytesIO(payload)
39-
handle = gzip.GzipFile(fileobj=buffer, mode='r')
40-
result = handle.read()
41-
handle.close()
42-
buffer.close()
42+
with BytesIO(payload) as buf:
43+
44+
# Gzip context manager introduced in python 2.6
45+
# so old-fashioned way until we decide to not support 2.6
46+
gzipper = gzip.GzipFile(fileobj=buf, mode='r')
47+
try:
48+
result = gzipper.read()
49+
finally:
50+
gzipper.close()
51+
4352
return result
4453

4554

4655
def snappy_encode(payload, xerial_compatible=False, xerial_blocksize=32 * 1024):
4756
"""Encodes the given data with snappy if xerial_compatible is set then the
4857
stream is encoded in a fashion compatible with the xerial snappy library
4958
50-
The block size (xerial_blocksize) controls how frequent the blocking occurs
51-
32k is the default in the xerial library.
59+
The block size (xerial_blocksize) controls how frequent the blocking
60+
occurs 32k is the default in the xerial library.
5261
5362
The format winds up being
5463
+-------------+------------+--------------+------------+--------------+
@@ -63,7 +72,7 @@ def snappy_encode(payload, xerial_compatible=False, xerial_blocksize=32 * 1024):
6372
length will always be <= blocksize.
6473
"""
6574

66-
if not _has_snappy:
75+
if not has_snappy():
6776
raise NotImplementedError("Snappy codec is not available")
6877

6978
if xerial_compatible:
@@ -74,7 +83,7 @@ def _chunker():
7483
out = BytesIO()
7584

7685
header = b''.join([struct.pack('!' + fmt, dat) for fmt, dat
77-
in zip(_XERIAL_V1_FORMAT, _XERIAL_V1_HEADER)])
86+
in zip(_XERIAL_V1_FORMAT, _XERIAL_V1_HEADER)])
7887

7988
out.write(header)
8089
for chunk in _chunker():
@@ -113,13 +122,13 @@ def _detect_xerial_stream(payload):
113122
"""
114123

115124
if len(payload) > 16:
116-
header = header = struct.unpack('!' + _XERIAL_V1_FORMAT, bytes(payload)[:16])
125+
header = struct.unpack('!' + _XERIAL_V1_FORMAT, bytes(payload)[:16])
117126
return header == _XERIAL_V1_HEADER
118127
return False
119128

120129

121130
def snappy_decode(payload):
122-
if not _has_snappy:
131+
if not has_snappy():
123132
raise NotImplementedError("Snappy codec is not available")
124133

125134
if _detect_xerial_stream(payload):

0 commit comments

Comments
 (0)