/usr/lib/python3/dist-packages/kafka/util.py is in python3-kafka 1.3.2-0ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
from __future__ import absolute_import
import atexit
import binascii
import collections
import struct
import sys
from threading import Thread, Event
import weakref
from kafka.vendor import six
from kafka.errors import BufferUnderflowError
def crc32(data):
    """Return the CRC-32 of ``data`` as a signed 32-bit integer.

    The Kafka protocol encodes the CRC as a signed int, whereas
    ``binascii.crc32`` returns a signed value on Python 2 and an unsigned
    one on Python 3.

    Arguments:
        data: bytes-like object to checksum

    Returns:
        int in the range [-2**31, 2**31 - 1]
    """
    # Mask first so we always start from the unsigned value, whatever the
    # interpreter handed back; the conversion below then depends only on
    # the value itself and not on which Python version is running.
    crc = binascii.crc32(data) & 0xffffffff
    # Fold the upper half of the unsigned range onto the negative numbers.
    if crc >= 2**31:
        crc -= 2**32
    return crc
def write_int_string(s):
    """Encode ``s`` as a string prefixed with a 4-byte big-endian length.

    Arguments:
        s: bytes to encode, or None

    Returns:
        bytes: the packed signed 32-bit length followed by the payload;
        None is encoded as a bare length of -1.

    Raises:
        TypeError: if ``s`` is neither None nor a bytes object
    """
    # None is represented on the wire by a length of -1 and no payload.
    if s is None:
        return struct.pack('>i', -1)

    if not isinstance(s, six.binary_type):
        raise TypeError(
            'Expected "%s" to be bytes\ndata=%s' % (type(s), repr(s)))

    length = len(s)
    return struct.pack('>i%ds' % length, length, s)
def write_short_string(s):
    """Encode ``s`` as a string prefixed with a 2-byte big-endian length.

    Arguments:
        s: bytes to encode, or None

    Returns:
        bytes: the packed signed 16-bit length followed by the payload;
        None is encoded as a bare length of -1.

    Raises:
        TypeError: if ``s`` is neither None nor a bytes object
        struct.error: if the length does not fit the 16-bit length field
    """
    # None is represented on the wire by a length of -1 and no payload.
    if s is None:
        return struct.pack('>h', -1)

    if not isinstance(s, six.binary_type):
        raise TypeError(
            'Expected "%s" to be bytes\ndata=%s' % (type(s), repr(s)))

    length = len(s)
    # Python 2.6 only issues a deprecation warning for an out-of-range
    # short instead of a struct error, so raise the error by hand there.
    if length > 32767 and sys.version_info < (2, 7):
        raise struct.error(length)

    return struct.pack('>h%ds' % length, length, s)
def read_short_string(data, cur):
    """Read a string prefixed with a 2-byte big-endian length.

    Arguments:
        data: buffer to read from
        cur: offset in ``data`` at which the length prefix starts

    Returns:
        tuple: (value, new_cur) where value is the payload slice, or None
        when the encoded length is -1, and new_cur is the offset just past
        what was consumed.

    Raises:
        BufferUnderflowError: if ``data`` is too short for the length
            prefix or for the payload it announces
    """
    body_start = cur + 2
    if len(data) < body_start:
        raise BufferUnderflowError("Not enough data left")

    strlen = struct.unpack('>h', data[cur:body_start])[0]
    # A length of -1 marks a null string: nothing follows the prefix.
    # NOTE(review): other negative lengths are not rejected here.
    if strlen == -1:
        return None, body_start

    body_end = body_start + strlen
    if len(data) < body_end:
        raise BufferUnderflowError("Not enough data left")

    return data[body_start:body_end], body_end
def read_int_string(data, cur):
    """Read a string prefixed with a 4-byte big-endian length.

    Arguments:
        data: buffer to read from
        cur: offset in ``data`` at which the length prefix starts

    Returns:
        tuple: (value, new_cur) where value is the payload slice, or None
        when the encoded length is -1, and new_cur is the offset just past
        what was consumed.

    Raises:
        BufferUnderflowError: if ``data`` is too short for the length
            prefix or for the payload it announces
    """
    body_start = cur + 4
    if len(data) < body_start:
        raise BufferUnderflowError(
            "Not enough data left to read string len (%d < %d)" %
            (len(data), body_start))

    strlen = struct.unpack('>i', data[cur:body_start])[0]
    # A length of -1 marks a null string: nothing follows the prefix.
    # NOTE(review): other negative lengths are not rejected here.
    if strlen == -1:
        return None, body_start

    body_end = body_start + strlen
    if len(data) < body_end:
        raise BufferUnderflowError("Not enough data left")

    return data[body_start:body_end], body_end
def relative_unpack(fmt, data, cur):
    """Unpack ``fmt`` from ``data`` starting at offset ``cur``.

    Arguments:
        fmt: struct format string
        data: buffer to read from
        cur: offset in ``data`` at which to start unpacking

    Returns:
        tuple: (values, new_cur) where values is the tuple produced by
        ``struct.unpack`` and new_cur is the offset just past the bytes
        consumed.

    Raises:
        BufferUnderflowError: if fewer bytes remain than ``fmt`` requires
    """
    end = cur + struct.calcsize(fmt)
    if len(data) < end:
        raise BufferUnderflowError("Not enough data left")
    values = struct.unpack(fmt, data[cur:end])
    return values, end
def group_by_topic_and_partition(tuples):
    """Index objects by their ``topic`` and then ``partition`` attributes.

    Arguments:
        tuples: iterable of objects exposing ``topic`` and ``partition``

    Returns:
        collections.defaultdict: mapping topic -> {partition -> object}

    Raises:
        AssertionError: if two objects share the same topic and partition
    """
    out = collections.defaultdict(dict)
    for t in tuples:
        # Raised explicitly instead of via an `assert` statement so the
        # duplicate check is not stripped when running under `python -O`.
        # Membership tests do not create entries in the defaultdict.
        if t.topic in out and t.partition in out[t.topic]:
            raise AssertionError(
                'Duplicate {0}s for {1} {2}'.format(t.__class__.__name__,
                                                    t.topic, t.partition))
        out[t.topic][t.partition] = t
    return out
class ReentrantTimer(object):
    """
    A periodic timer that can be restarted, unlike threading.Timer.

    It is implemented with a daemon Thread that waits on an Event: every
    time the interval elapses without the event being set, the callback is
    invoked again.

    Calling start() on a running timer restarts it. Calling stop() halts
    the timer and drops the reference to the callback, so the instance
    should not be started again after an explicit stop().

    Arguments:

        t: timer interval in milliseconds
        fn: a callable to invoke
        args: tuple of args to be passed to function
        kwargs: keyword arguments to be passed to function
    """
    def __init__(self, t, fn, *args, **kwargs):
        # Initialised before validation so that __del__ -> stop() finds
        # these attributes even when construction fails below.
        self.thread = None
        self.active = None

        if t <= 0:
            raise ValueError('Invalid timeout value')

        if not callable(fn):
            raise ValueError('fn must be callable')

        self.t = t / 1000.0  # stored in seconds
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    def _timer(self, active):
        """Thread body: call fn every interval until ``active`` is set."""
        # python2.6 Event.wait() always returns None
        # python2.7 and greater returns the flag value (true/false)
        # we want the flag value, so add an 'or' here for python2.6
        # this is redundant for later python versions (FLAG OR FLAG == FLAG)
        while not (active.wait(self.t) or active.is_set()):
            self.fn(*self.args, **self.kwargs)

    def _halt(self):
        """Signal the worker thread to exit and wait briefly for it."""
        if self.thread is None:
            return

        self.active.set()
        # Bounded wait: one interval plus a second of slack.
        self.thread.join(self.t + 1)
        self.thread = None

    def start(self):
        """Start the timer, restarting it if it is already running."""
        # Halt without going through stop(): stop() also discards the
        # callback, which would leave the restarted thread calling None.
        self._halt()

        self.active = Event()
        self.thread = Thread(target=self._timer, args=(self.active,))
        self.thread.daemon = True  # So the app exits when main thread exits
        self.thread.start()

    def stop(self):
        """Stop the timer and release the reference to the callback."""
        if self.thread is None:
            return

        self._halt()
        self.fn = None

    def __del__(self):
        self.stop()
class WeakMethod(object):
    """
    Callable that weakly references a method and the object it is bound to. It
    is based on http://stackoverflow.com/a/24287465.

    Two instances compare equal (and hash equal) when they were built from
    the same underlying function bound to the same object, as determined
    by the ids recorded at construction time.

    Arguments:

        object_dot_method: A bound instance method (i.e. 'object.method').
    """
    def __init__(self, object_dot_method):
        # Fall back to the im_self / im_func spellings when the
        # __self__ / __func__ attributes are not available.
        try:
            self.target = weakref.ref(object_dot_method.__self__)
        except AttributeError:
            self.target = weakref.ref(object_dot_method.im_self)
        self._target_id = id(self.target())
        try:
            self.method = weakref.ref(object_dot_method.__func__)
        except AttributeError:
            self.method = weakref.ref(object_dot_method.im_func)
        self._method_id = id(self.method())

    def __call__(self, *args, **kwargs):
        """
        Calls the method on target with args and kwargs.

        The weak references are dereferenced as-is; no check is made that
        the target is still alive.
        """
        return self.method()(self.target(), *args, **kwargs)

    def __hash__(self):
        # Derived from the same ids __eq__ compares, so equal instances
        # always hash equal. Hashing the weakrefs themselves would fail
        # with TypeError when the target object is unhashable.
        return hash((self._target_id, self._method_id))

    def __eq__(self, other):
        if not isinstance(other, WeakMethod):
            return False
        return (self._target_id == other._target_id and
                self._method_id == other._method_id)

    def __ne__(self, other):
        # Python 2 does not derive != from __eq__.
        return not self.__eq__(other)
def try_method_on_system_exit(obj, method, *args, **kwargs):
    """Register a best-effort ``obj.<method>(*args, **kwargs)`` call at exit.

    Only a weak proxy to ``obj`` is handed to ``atexit``, so the
    registration by itself does not keep ``obj`` alive.

    Arguments:
        obj: object whose method should be called at interpreter exit
        method: name of the method to look up on ``obj``
        args: positional arguments forwarded to the method
        kwargs: keyword arguments forwarded to the method
    """
    def _call_quietly(_obj, _meth, *call_args, **call_kwargs):
        # ReferenceError: the proxied object is already gone.
        # AttributeError: the lookup or the call raised it. Both are
        # swallowed on purpose; this is a best-effort exit hook.
        try:
            bound = getattr(_obj, _meth)
            bound(*call_args, **call_kwargs)
        except (ReferenceError, AttributeError):
            pass

    proxy = weakref.proxy(obj)
    atexit.register(_call_quietly, proxy, method, *args, **kwargs)
|