/usr/lib/python3/dist-packages/kafka/util.py is in python3-kafka 0.9.5-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 | import binascii
import collections
import struct
import sys
from threading import Thread, Event
import six
from kafka.common import BufferUnderflowError
def crc32(data):
    """Return the CRC-32 checksum of ``data`` as an unsigned 32-bit integer."""
    # Masking to 32 bits yields the same non-negative value on every
    # Python version, including those where binascii.crc32 is signed.
    checksum = binascii.crc32(data)
    return checksum & 0xffffffff
def write_int_string(s):
    """
    Encode ``s`` with a 4-byte big-endian signed length prefix.

    Arguments:
        s: a bytes instance, or None

    Returns:
        The packed length followed by the payload; None is encoded as a
        bare length of -1 with no payload.

    Raises:
        TypeError: if ``s`` is neither None nor bytes
    """
    if s is None:
        return struct.pack('>i', -1)

    if not isinstance(s, six.binary_type):
        raise TypeError('Expected "%s" to be bytes\n'
                        'data=%s' % (type(s), repr(s)))

    length = len(s)
    return struct.pack('>i%ds' % length, length, s)
def write_short_string(s):
    """
    Encode ``s`` with a 2-byte big-endian signed length prefix.

    Arguments:
        s: a bytes instance, or None

    Returns:
        The packed length followed by the payload; None is encoded as a
        bare length of -1 with no payload.

    Raises:
        TypeError: if ``s`` is neither None nor bytes
        struct.error: if the length does not fit the 2-byte prefix
    """
    if s is None:
        return struct.pack('>h', -1)

    if not isinstance(s, six.binary_type):
        raise TypeError('Expected "%s" to be bytes\n'
                        'data=%s' % (type(s), repr(s)))

    size = len(s)
    if size > 32767 and sys.version_info < (2, 7):
        # Python 2.6 issues a deprecation warning instead of a struct error
        raise struct.error(size)

    return struct.pack('>h%ds' % size, size, s)
def read_short_string(data, cur):
    """
    Read a string prefixed by a 2-byte big-endian signed length.

    Arguments:
        data: buffer to read from
        cur: offset in ``data`` at which the length prefix starts

    Returns:
        A ``(value, new_cur)`` tuple. ``value`` is None when the encoded
        length is -1; ``new_cur`` is the offset just past what was consumed.

    Raises:
        BufferUnderflowError: if ``data`` is too short for the prefix or
            for the payload it announces
    """
    body_start = cur + 2
    if len(data) < body_start:
        raise BufferUnderflowError("Not enough data left")

    strlen = struct.unpack('>h', data[cur:body_start])[0]
    if strlen == -1:
        return None, body_start

    body_end = body_start + strlen
    if len(data) < body_end:
        raise BufferUnderflowError("Not enough data left")

    return data[body_start:body_end], body_end
def read_int_string(data, cur):
    """
    Read a string prefixed by a 4-byte big-endian signed length.

    Arguments:
        data: buffer to read from
        cur: offset in ``data`` at which the length prefix starts

    Returns:
        A ``(value, new_cur)`` tuple. ``value`` is None when the encoded
        length is -1; ``new_cur`` is the offset just past what was consumed.

    Raises:
        BufferUnderflowError: if ``data`` is too short for the prefix or
            for the payload it announces
    """
    body_start = cur + 4
    available = len(data)
    if available < body_start:
        raise BufferUnderflowError(
            "Not enough data left to read string len (%d < %d)" %
            (available, body_start))

    strlen = struct.unpack('>i', data[cur:body_start])[0]
    if strlen == -1:
        return None, body_start

    body_end = body_start + strlen
    if available < body_end:
        raise BufferUnderflowError("Not enough data left")

    return data[body_start:body_end], body_end
def relative_unpack(fmt, data, cur):
    """
    Unpack the struct format ``fmt`` from ``data`` starting at offset ``cur``.

    Returns:
        A ``(values, new_cur)`` tuple, where ``values`` is the tuple produced
        by ``struct.unpack`` and ``new_cur`` is the offset just past the
        bytes consumed.

    Raises:
        BufferUnderflowError: if fewer bytes remain than ``fmt`` requires
    """
    end = cur + struct.calcsize(fmt)
    if end > len(data):
        raise BufferUnderflowError("Not enough data left")

    values = struct.unpack(fmt, data[cur:end])
    return values, end
def group_by_topic_and_partition(tuples):
    """
    Index items by topic and then by partition.

    Arguments:
        tuples: iterable of objects exposing ``topic`` and ``partition``
            attributes

    Returns:
        A ``collections.defaultdict`` mapping topic -> {partition: item}.

    Raises:
        AssertionError: if two items share the same topic and partition
    """
    out = collections.defaultdict(dict)
    for t in tuples:
        # Raised explicitly rather than through an ``assert`` statement so
        # the duplicate check still runs under ``python -O``; the exception
        # type and message are unchanged. ``get`` avoids creating an empty
        # entry for a topic that has not been seen yet.
        if t.partition in out.get(t.topic, ()):
            raise AssertionError(
                'Duplicate {0}s for {1} {2}'.format(t.__class__.__name__,
                                                    t.topic, t.partition))
        out[t.topic][t.partition] = t
    return out
def kafka_bytestring(s):
    """
    Coerce a text or bytes value to bytes.

    A bytes instance is returned unchanged; a text string is encoded as
    utf-8. Any other type raises TypeError.
    """
    if isinstance(s, six.binary_type):
        return s

    if not isinstance(s, six.string_types):
        raise TypeError(s)

    return s.encode('utf-8')
class ReentrantTimer(object):
    """
    A repeating timer that can be restarted, unlike threading.Timer

    Once started, a daemon thread invokes ``fn(*args, **kwargs)`` every
    ``t`` milliseconds until the timer is stopped or restarted.

    Arguments:

        t: timer interval in milliseconds
        fn: a callable to invoke
        args: tuple of args to be passed to function
        kwargs: keyword arguments to be passed to function

    Raises:
        ValueError: if ``t`` is not positive or ``fn`` is not callable
    """
    def __init__(self, t, fn, *args, **kwargs):
        # Assigned before validation so that __del__ -> stop() finds the
        # attributes even when the constructor raises below.
        self.thread = None
        self.active = None

        if t <= 0:
            raise ValueError('Invalid timeout value')

        if not callable(fn):
            raise ValueError('fn must be callable')

        self.t = t / 1000.0
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    def _timer(self, active):
        """Worker loop: call ``fn`` once per interval until ``active`` is set."""
        # python2.6 Event.wait() always returns None
        # python2.7 and greater returns the flag value (true/false)
        # we want the flag value, so add an 'or' here for python2.6
        # this is redundant for later python versions (FLAG OR FLAG == FLAG)
        while not (active.wait(self.t) or active.is_set()):
            self.fn(*self.args, **self.kwargs)

    def _halt(self):
        """Signal the worker thread, if any, to exit and wait for it briefly."""
        if self.thread is None:
            return

        self.active.set()
        # Bounded wait (one interval plus a second) so that a slow callback
        # cannot block the caller indefinitely.
        self.thread.join(self.t + 1)
        self.thread = None

    def start(self):
        """Start the timer, first halting any worker that is already running."""
        # Use _halt() rather than stop(): stop() also drops the callback,
        # which would leave the restarted worker with nothing to call.
        self._halt()

        self.active = Event()
        self.thread = Thread(target=self._timer, args=(self.active,))
        self.thread.daemon = True  # So the app exits when main thread exits
        self.thread.start()

    def stop(self):
        """
        Stop the timer and release the callback.

        Does nothing if the timer is not running. Because the reference to
        ``fn`` is dropped here, a timer that has been stopped must not be
        started again; call start() on a running timer to restart it.
        """
        if self.thread is None:
            return

        self._halt()
        self.fn = None

    def __del__(self):
        self.stop()
|