Skip to content

Commit fb023fe

Browse files
committed
Prefer python-lz4 over lz4f if available
1 parent 3b899de commit fb023fe

File tree

4 files changed

+37
-15
lines changed

4 files changed

+37
-15
lines changed

docs/index.rst

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,9 +113,8 @@ Compression
113113
***********
114114

115115
kafka-python supports gzip compression/decompression natively. To produce or
116-
consume lz4 compressed messages, you must install lz4tools and xxhash (modules
117-
may not work on python2.6). To enable snappy, install python-snappy (also
118-
requires snappy library).
116+
consume lz4 compressed messages, you should install python-lz4 (pip install lz4).
117+
To enable snappy, install python-snappy (also requires snappy library).
119118
See `Installation <install.html#optional-snappy-install>`_ for more information.
120119

121120

docs/install.rst

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,10 @@ Bleeding-Edge
2626
Optional LZ4 install
2727
********************
2828

29-
To enable LZ4 compression/decompression, install lz4tools and xxhash:
29+
To enable LZ4 compression/decompression, install python-lz4:
3030

31-
>>> pip install lz4tools
32-
>>> pip install xxhash
31+
>>> pip install lz4
3332

34-
*Note*: these modules do not support python2.6
3533

3634
Optional Snappy install
3735
***********************

kafka/codec.py

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,21 @@
1616
except ImportError:
1717
snappy = None
1818

19+
try:
20+
import lz4.frame as lz4
21+
except ImportError:
22+
lz4 = None
23+
1924
try:
2025
import lz4f
21-
import xxhash
2226
except ImportError:
2327
lz4f = None
2428

29+
try:
30+
import xxhash
31+
except ImportError:
32+
xxhash = None
33+
2534
PYPY = bool(platform.python_implementation() == 'PyPy')
2635

2736
def has_gzip():
@@ -33,7 +42,11 @@ def has_snappy():
3342

3443

3544
def has_lz4():
36-
return lz4f is not None
45+
if lz4 is not None:
46+
return True
47+
if lz4f is not None:
48+
return True
49+
return False
3750

3851

3952
def gzip_encode(payload, compresslevel=None):
@@ -181,13 +194,15 @@ def snappy_decode(payload):
181194
return snappy.decompress(payload)
182195

183196

184-
def lz4_encode(payload):
185-
"""Encode payload using interoperable LZ4 framing. Requires Kafka >= 0.10"""
186-
# pylint: disable-msg=no-member
187-
return lz4f.compressFrame(payload)
197+
if lz4:
198+
lz4_encode = lz4.compress # pylint: disable-msg=no-member
199+
elif lz4f:
200+
lz4_encode = lz4f.compressFrame # pylint: disable-msg=no-member
201+
else:
202+
lz4_encode = None
188203

189204

190-
def lz4_decode(payload):
205+
def lz4f_decode(payload):
191206
"""Decode payload using interoperable LZ4 framing. Requires Kafka >= 0.10"""
192207
# pylint: disable-msg=no-member
193208
ctx = lz4f.createDecompContext()
@@ -201,8 +216,17 @@ def lz4_decode(payload):
201216
return data['decomp']
202217

203218

219+
if lz4:
220+
lz4_decode = lz4.decompress # pylint: disable-msg=no-member
221+
elif lz4f:
222+
lz4_decode = lz4f_decode
223+
else:
224+
lz4_decode = None
225+
226+
204227
def lz4_encode_old_kafka(payload):
205228
"""Encode payload for 0.8/0.9 brokers -- requires an incorrect header checksum."""
229+
assert xxhash is not None
206230
data = lz4_encode(payload)
207231
header_size = 7
208232
if isinstance(data[4], int):
@@ -224,6 +248,7 @@ def lz4_encode_old_kafka(payload):
224248

225249

226250
def lz4_decode_old_kafka(payload):
251+
assert xxhash is not None
227252
# Kafka's LZ4 code has a bug in its header checksum implementation
228253
header_size = 7
229254
if isinstance(payload[4], int):

tox.ini

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ deps =
1717
pytest-mock
1818
mock
1919
python-snappy
20-
lz4tools
20+
lz4
2121
xxhash
2222
py26: unittest2
2323
commands =

0 commit comments

Comments
 (0)