/usr/lib/python2.7/dist-packages/kafka/codec.py is in python-kafka 1.3.2-0ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 | from __future__ import absolute_import
import gzip
import io
import platform
import struct
from kafka.vendor import six
from kafka.vendor.six.moves import xrange # pylint: disable=import-error
# Magic header written at the start of a xerial-framed snappy stream.  The
# values line up one-to-one with the struct codes in _XERIAL_V1_FORMAT:
# marker byte, the six characters of b'SNAPPY', a pad byte, then two int32
# fields (version and compat, per the _detect_xerial_stream docstring).
_XERIAL_V1_HEADER = (-126, b'S', b'N', b'A', b'P', b'P', b'Y', 0, 1, 1)
_XERIAL_V1_FORMAT = 'bccccccBii'

# Optional codecs: when a dependency cannot be imported its module name is
# bound to None so the has_*() helpers can report availability.
try:
    import snappy
except ImportError:
    snappy = None

try:
    import lz4f
    import xxhash
except ImportError:
    # Bind BOTH names.  If only lz4f were reset, a failed import would leave
    # xxhash undefined and the *_old_kafka helpers would die with NameError.
    lz4f = None
    xxhash = None

# True when running on the PyPy interpreter; snappy_encode picks a different
# chunking strategy there.
PYPY = bool(platform.python_implementation() == 'PyPy')
def has_gzip():
    """Report whether the gzip codec can be used.

    The gzip module is imported unconditionally by this file, so the answer
    is always True.
    """
    available = True
    return available
def has_snappy():
    """Report whether the optional snappy module was imported.

    Returns:
        bool: False when the module-level ``snappy`` name is None (the
        import fallback value), True otherwise.
    """
    if snappy is None:
        return False
    return True
def has_lz4():
    """Report whether the optional lz4f module was imported.

    Returns:
        bool: False when the module-level ``lz4f`` name is None (the import
        fallback value), True otherwise.
    """
    if lz4f is None:
        return False
    return True
def gzip_encode(payload, compresslevel=None):
    """Compress payload into a complete gzip stream.

    Arguments:
        payload (bytes): data to compress.
        compresslevel (int, optional): gzip compression level, 0 (store
            without compressing) through 9 (smallest output).  None selects
            the default of 9.

    Returns:
        bytes: the gzip-framed data.
    """
    # Only None means "use the default".  A truthiness test here would turn
    # the valid level 0 (no compression) into 9 behind the caller's back.
    if compresslevel is None:
        compresslevel = 9

    buf = io.BytesIO()

    # Gzip context manager introduced in python 2.7
    # so old-fashioned way until we decide to not support 2.6
    gzipper = gzip.GzipFile(fileobj=buf, mode="w", compresslevel=compresslevel)
    try:
        gzipper.write(payload)
    finally:
        # close() flushes the gzip trailer into buf; it must run before the
        # buffer contents are read.
        gzipper.close()

    return buf.getvalue()
def gzip_decode(payload):
    """Decompress a gzip stream held in memory.

    Arguments:
        payload (bytes): gzip-framed data.

    Returns:
        bytes: the decompressed contents.
    """
    # No "with" block here: the gzip context manager needs python 2.7 and
    # 2.6 is still supported, so the reader is closed by hand.
    reader = gzip.GzipFile(fileobj=io.BytesIO(payload), mode='r')
    try:
        decoded = reader.read()
    finally:
        reader.close()
    return decoded
def snappy_encode(payload, xerial_compatible=True, xerial_blocksize=32*1024):
    """Compress payload with snappy, by default using xerial block framing.

    With xerial_compatible unset the payload is handed to snappy.compress()
    in one piece and the raw result is returned.  Otherwise the output is a
    stream readable by the xerial snappy library:

    +-------------+------------+--------------+------------+--------------+
    | Header      | Block1 len | Block1 data  | Blockn len | Blockn data  |
    +-------------+------------+--------------+------------+--------------+
    | 16 bytes    | BE int32   | snappy bytes | BE int32   | snappy bytes |
    +-------------+------------+--------------+------------+--------------+

    xerial_blocksize is the number of *uncompressed* bytes fed to snappy for
    each block (32k is the xerial default); each "len" field records the
    number of *compressed* bytes that follow it in the stream.

    Arguments:
        payload: data to compress.
        xerial_compatible (bool): emit the xerial framing shown above.
        xerial_blocksize (int): uncompressed bytes per block.

    Returns:
        bytes: the compressed data.

    Raises:
        NotImplementedError: if the snappy module is not available.
    """
    if not has_snappy():
        raise NotImplementedError("Snappy codec is not available")

    if not xerial_compatible:
        return snappy.compress(payload)

    framed = io.BytesIO()

    # '!' selects standard sizes with no alignment padding, so packing the
    # whole header in one call yields the same 16 bytes as packing each field
    # separately.
    framed.write(struct.pack('!' + _XERIAL_V1_FORMAT, *_XERIAL_V1_HEADER))

    # Pick a way of cutting one block out of the payload, ideally without
    # making an intermediate copy.
    if PYPY:
        # on pypy, snappy.compress() on a sliced buffer consumes the entire
        # buffer... likely a python-snappy bug, so just use a slice copy
        def take(data, start, size):
            return data[start:start + size]
    elif six.PY2:
        # Sliced buffer avoids additional copies
        # pylint: disable-msg=undefined-variable
        def take(data, start, size):
            return buffer(data, start, size)
    else:
        # snappy.compress does not like raw memoryviews, so the view has to
        # be converted with tobytes(), which is a copy after all.
        # pylint: disable-msg=undefined-variable
        def take(data, start, size):
            return memoryview(data)[start:start + size].tobytes()

    for offset in xrange(0, len(payload), xerial_blocksize):
        compressed = snappy.compress(take(payload, offset, xerial_blocksize))
        # Length prefix first, then the compressed block itself.
        framed.write(struct.pack('!i', len(compressed)))
        framed.write(compressed)

    return framed.getvalue()
def _detect_xerial_stream(payload):
    """Detects if the data given might have been encoded with the blocking mode
    of the xerial snappy library.

    This mode writes a magic header of the format:
        +--------+--------------+------------+---------+--------+
        | Marker | Magic String | Null / Pad | Version | Compat |
        +--------+--------------+------------+---------+--------+
        |  byte  |   c-string   |    byte    |  int32  | int32  |
        +--------+--------------+------------+---------+--------+
        |  -126  |   'SNAPPY'   |     \0     |         |        |
        +--------+--------------+------------+---------+--------+

    The pad appears to be to ensure that SNAPPY is a valid cstring
    The version is the version of this format as written by xerial,
    in the wild this is currently 1 as such we only support v1.

    Compat is there to claim the minimum supported version that
    can read a xerial block stream, presently in the wild this is
    1.

    Arguments:
        payload: the possibly-framed data.

    Returns:
        bool: True when the first 16 bytes equal the xerial v1 header,
        False otherwise (including when payload is shorter than 16 bytes).
    """
    # ">=" rather than ">": a stream consisting of the 16-byte header and no
    # blocks is what snappy_encode() produces for an empty payload, and it
    # must still be recognised as xerial-framed.
    if len(payload) >= 16:
        header = struct.unpack('!' + _XERIAL_V1_FORMAT, bytes(payload)[:16])
        return header == _XERIAL_V1_HEADER
    return False
def snappy_decode(payload):
    """Decompress snappy data, with or without xerial block framing.

    When payload starts with the xerial header, every length-prefixed block
    after the 16-byte header is decompressed and the results are
    concatenated.  Otherwise the payload is passed to snappy.decompress()
    as a single raw block.

    Arguments:
        payload: snappy-compressed data.

    Returns:
        bytes: the decompressed data.

    Raises:
        NotImplementedError: if the snappy module is not available.
        ValueError: if a xerial block declares a negative length.
    """
    if not has_snappy():
        raise NotImplementedError("Snappy codec is not available")

    if not _detect_xerial_stream(payload):
        return snappy.decompress(payload)

    # TODO ? Should become a fileobj ?
    out = io.BytesIO()
    byt = payload[16:]
    length = len(byt)
    cursor = 0

    while cursor < length:
        # Read the big-endian int32 length prefix in place.  Passing the
        # offset avoids copying the whole remaining stream for every block.
        block_size = struct.unpack_from('!i', byt, cursor)[0]
        if block_size < 0:
            # A negative length would move the cursor backwards and re-read
            # earlier bytes; treat it as a corrupt stream.
            raise ValueError(
                'Invalid xerial snappy block length: %d' % (block_size,))
        # Skip the block size
        cursor += 4
        end = cursor + block_size
        out.write(snappy.decompress(byt[cursor:end]))
        cursor = end

    return out.getvalue()
def lz4_encode(payload):
    """Encode payload using interoperable LZ4 framing. Requires Kafka >= 0.10

    Arguments:
        payload: data to compress.

    Returns:
        The value returned by lz4f.compressFrame(payload).

    Raises:
        NotImplementedError: if the lz4f module is not available.
    """
    # Same guard as the snappy codec: fail with a clear message instead of an
    # AttributeError on None when the optional dependency is missing.
    if not has_lz4():
        raise NotImplementedError("LZ4 codec is not available")

    # pylint: disable-msg=no-member
    return lz4f.compressFrame(payload)
def lz4_decode(payload):
    """Decode payload using interoperable LZ4 framing. Requires Kafka >= 0.10

    Arguments:
        payload: a complete LZ4 frame.

    Returns:
        The 'decomp' entry of the lz4f.decompressFrame() result.

    Raises:
        NotImplementedError: if the lz4f module is not available.
        RuntimeError: if lz4f reports that the frame was not fully
            decompressed.
    """
    # Same guard as the snappy codec: fail with a clear message instead of an
    # AttributeError on None when the optional dependency is missing.
    if not has_lz4():
        raise NotImplementedError("LZ4 codec is not available")

    # pylint: disable-msg=no-member
    ctx = lz4f.createDecompContext()
    try:
        data = lz4f.decompressFrame(payload, ctx)
    finally:
        # Release the decompression context on both the success and the
        # error path; it was previously never freed.
        lz4f.freeDecompContext(ctx)

    # lz4f python module does not expose how much of the payload was
    # actually read if the decompression was only partial.
    if data['next'] != 0:
        raise RuntimeError('lz4f unable to decompress full payload')
    return data['decomp']
def lz4_encode_old_kafka(payload):
    """Encode payload for 0.8/0.9 brokers -- requires an incorrect header checksum.

    The payload is first encoded with lz4_encode(); the last byte of the
    frame header (the header checksum) is then replaced with one computed the
    way those brokers expect.

    Arguments:
        payload: data to compress.

    Returns:
        bytes: the LZ4 frame with the substituted header checksum byte.
    """
    framed = lz4_encode(payload)

    # Offset 4 holds the flag byte.  Indexing yields an int on python 3 and a
    # one-character string on python 2, hence the ord() fallback.
    flag_byte = framed[4]
    flg = flag_byte if isinstance(flag_byte, int) else ord(flag_byte)

    # The header is 7 bytes, or 15 when bit 3 of the flag byte (content size
    # present) is set.
    header_size = 7
    if (flg >> 3) & 1:
        header_size += 8

    checksum_at = header_size - 1
    header = framed[0:checksum_at]

    # This is the incorrect hc: it hashes the header starting at offset 0,
    # whereas lz4_decode_old_kafka computes the correct one from offset 4.
    hc = xxhash.xxh32(header).digest()[-2:-1]  # pylint: disable-msg=no-member

    return b''.join([header, hc, framed[header_size:]])
def lz4_decode_old_kafka(payload):
    """Decode an LZ4 frame written with the old Kafka header checksum.

    Kafka's LZ4 code has a bug in its header checksum implementation, so the
    checksum byte is recomputed and substituted before the frame is passed to
    lz4_decode().

    Arguments:
        payload: LZ4 frame carrying the old-style header checksum.

    Returns:
        The result of lz4_decode() on the repaired frame.
    """
    # Offset 4 holds the flag byte.  Indexing yields an int on python 3 and a
    # one-character string on python 2, hence the ord() fallback.
    flag_byte = payload[4]
    flg = flag_byte if isinstance(flag_byte, int) else ord(flag_byte)

    # The header is 7 bytes, or 15 when bit 3 of the flag byte (content size
    # present) is set.
    header_size = 7
    if (flg >> 3) & 1:
        header_size += 8

    checksum_at = header_size - 1

    # This should be the correct hc: the hash starts at offset 4, skipping
    # the first four bytes of the frame.
    hc = xxhash.xxh32(payload[4:checksum_at]).digest()[-2:-1]  # pylint: disable-msg=no-member

    repaired = b''.join([
        payload[0:checksum_at],
        hc,
        payload[header_size:],
    ])
    return lz4_decode(repaired)
|