/usr/lib/python3/dist-packages/celery/utils/threads.py is in python3-celery 3.1.13-3.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 | # -*- coding: utf-8 -*-
"""
celery.utils.threads
~~~~~~~~~~~~~~~~~~~~
Threading utilities.
"""
from __future__ import absolute_import, print_function
import os
import socket
import sys
import threading
import traceback
from contextlib import contextmanager
from celery.local import Proxy
from celery.five import THREAD_TIMEOUT_MAX, items
__all__ = ['bgThread', 'Local', 'LocalStack', 'LocalManager',
'get_ident', 'default_socket_timeout']
USE_FAST_LOCALS = os.environ.get('USE_FAST_LOCALS')
PY3 = sys.version_info[0] == 3
@contextmanager
def default_socket_timeout(timeout):
prev = socket.getdefaulttimeout()
socket.setdefaulttimeout(timeout)
yield
socket.setdefaulttimeout(prev)
class bgThread(threading.Thread):
def __init__(self, name=None, **kwargs):
super(bgThread, self).__init__()
self._is_shutdown = threading.Event()
self._is_stopped = threading.Event()
self.daemon = True
self.name = name or self.__class__.__name__
def body(self):
raise NotImplementedError('subclass responsibility')
def on_crash(self, msg, *fmt, **kwargs):
print(msg.format(*fmt), file=sys.stderr)
exc_info = sys.exc_info()
try:
traceback.print_exception(exc_info[0], exc_info[1], exc_info[2],
None, sys.stderr)
finally:
del(exc_info)
def run(self):
body = self.body
shutdown_set = self._is_shutdown.is_set
try:
while not shutdown_set():
try:
body()
except Exception as exc:
try:
self.on_crash('{0!r} crashed: {1!r}', self.name, exc)
self._set_stopped()
finally:
os._exit(1) # exiting by normal means won't work
finally:
self._set_stopped()
def _set_stopped(self):
try:
self._is_stopped.set()
except TypeError: # pragma: no cover
# we lost the race at interpreter shutdown,
# so gc collected built-in modules.
pass
def stop(self):
"""Graceful shutdown."""
self._is_shutdown.set()
self._is_stopped.wait()
if self.is_alive():
self.join(THREAD_TIMEOUT_MAX)
try:
from greenlet import getcurrent as get_ident
except ImportError: # pragma: no cover
try:
from _thread import get_ident # noqa
except ImportError:
try:
from thread import get_ident # noqa
except ImportError: # pragma: no cover
try:
from _dummy_thread import get_ident # noqa
except ImportError:
from dummy_thread import get_ident # noqa
def release_local(local):
"""Releases the contents of the local for the current context.
This makes it possible to use locals without a manager.
Example::
>>> loc = Local()
>>> loc.foo = 42
>>> release_local(loc)
>>> hasattr(loc, 'foo')
False
With this function one can release :class:`Local` objects as well
as :class:`StackLocal` objects. However it is not possible to
release data held by proxies that way, one always has to retain
a reference to the underlying local object in order to be able
to release it.
.. versionadded:: 0.6.1
"""
local.__release_local__()
class Local(object):
__slots__ = ('__storage__', '__ident_func__')
def __init__(self):
object.__setattr__(self, '__storage__', {})
object.__setattr__(self, '__ident_func__', get_ident)
def __iter__(self):
return iter(items(self.__storage__))
def __call__(self, proxy):
"""Create a proxy for a name."""
return Proxy(self, proxy)
def __release_local__(self):
self.__storage__.pop(self.__ident_func__(), None)
def __getattr__(self, name):
try:
return self.__storage__[self.__ident_func__()][name]
except KeyError:
raise AttributeError(name)
def __setattr__(self, name, value):
ident = self.__ident_func__()
storage = self.__storage__
try:
storage[ident][name] = value
except KeyError:
storage[ident] = {name: value}
def __delattr__(self, name):
try:
del self.__storage__[self.__ident_func__()][name]
except KeyError:
raise AttributeError(name)
class _LocalStack(object):
"""This class works similar to a :class:`Local` but keeps a stack
of objects instead. This is best explained with an example::
>>> ls = LocalStack()
>>> ls.push(42)
>>> ls.top
42
>>> ls.push(23)
>>> ls.top
23
>>> ls.pop()
23
>>> ls.top
42
They can be force released by using a :class:`LocalManager` or with
the :func:`release_local` function but the correct way is to pop the
item from the stack after using. When the stack is empty it will
no longer be bound to the current context (and as such released).
By calling the stack without arguments it will return a proxy that
resolves to the topmost item on the stack.
"""
def __init__(self):
self._local = Local()
def __release_local__(self):
self._local.__release_local__()
def _get__ident_func__(self):
return self._local.__ident_func__
def _set__ident_func__(self, value):
object.__setattr__(self._local, '__ident_func__', value)
__ident_func__ = property(_get__ident_func__, _set__ident_func__)
del _get__ident_func__, _set__ident_func__
def __call__(self):
def _lookup():
rv = self.top
if rv is None:
raise RuntimeError('object unbound')
return rv
return Proxy(_lookup)
def push(self, obj):
"""Pushes a new item to the stack"""
rv = getattr(self._local, 'stack', None)
if rv is None:
self._local.stack = rv = []
rv.append(obj)
return rv
def pop(self):
"""Remove the topmost item from the stack, will return the
old value or `None` if the stack was already empty.
"""
stack = getattr(self._local, 'stack', None)
if stack is None:
return None
elif len(stack) == 1:
release_local(self._local)
return stack[-1]
else:
return stack.pop()
def __len__(self):
stack = getattr(self._local, 'stack', None)
return len(stack) if stack else 0
@property
def stack(self):
"""get_current_worker_task uses this to find
the original task that was executed by the worker."""
stack = getattr(self._local, 'stack', None)
if stack is not None:
return stack
return []
@property
def top(self):
"""The topmost item on the stack. If the stack is empty,
`None` is returned.
"""
try:
return self._local.stack[-1]
except (AttributeError, IndexError):
return None
class LocalManager(object):
"""Local objects cannot manage themselves. For that you need a local
manager. You can pass a local manager multiple locals or add them
later by appending them to `manager.locals`. Everytime the manager
cleans up it, will clean up all the data left in the locals for this
context.
The `ident_func` parameter can be added to override the default ident
function for the wrapped locals.
"""
def __init__(self, locals=None, ident_func=None):
if locals is None:
self.locals = []
elif isinstance(locals, Local):
self.locals = [locals]
else:
self.locals = list(locals)
if ident_func is not None:
self.ident_func = ident_func
for local in self.locals:
object.__setattr__(local, '__ident_func__', ident_func)
else:
self.ident_func = get_ident
def get_ident(self):
"""Return the context identifier the local objects use internally
for this context. You cannot override this method to change the
behavior but use it to link other context local objects (such as
SQLAlchemy's scoped sessions) to the Werkzeug locals."""
return self.ident_func()
def cleanup(self):
"""Manually clean up the data in the locals for this context.
Call this at the end of the request or use `make_middleware()`.
"""
for local in self.locals:
release_local(local)
def __repr__(self):
return '<{0} storages: {1}>'.format(
self.__class__.__name__, len(self.locals))
class _FastLocalStack(threading.local):
def __init__(self):
self.stack = []
self.push = self.stack.append
self.pop = self.stack.pop
@property
def top(self):
try:
return self.stack[-1]
except (AttributeError, IndexError):
return None
def __len__(self):
return len(self.stack)
if USE_FAST_LOCALS: # pragma: no cover
LocalStack = _FastLocalStack
else:
# - See #706
# since each thread has its own greenlet we can just use those as
# identifiers for the context. If greenlets are not available we
# fall back to the current thread ident.
LocalStack = _LocalStack # noqa
|