/usr/lib/python2.7/dist-packages/celery/app/utils.py is in python-celery 3.1.6-1ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# -*- coding: utf-8 -*-
"""
celery.app.utils
~~~~~~~~~~~~~~~~
App utilities: Compat settings, bugreport tool, pickling apps.
"""
from __future__ import absolute_import
import os
import platform as _platform
import re
from collections import Mapping
from types import ModuleType
from celery.datastructures import ConfigurationView
from celery.five import items, string_t, values
from celery.platforms import pyimplementation
from celery.utils.text import pretty
from celery.utils.imports import import_from_cwd, symbol_by_name, qualname
from .defaults import find
#: Public names of this module.
__all__ = ['Settings', 'appstr', 'bugreport',
           'filter_hidden_settings', 'find_app']
#: Format used to generate bugreport information.
#: The ``{placeholders}`` are filled in by :func:`bugreport`.
BUGREPORT_INFO = """
software -> celery:{celery_v} kombu:{kombu_v} py:{py_v}
billiard:{billiard_v} {driver_v}
platform -> system:{system} arch:{arch} imp:{py_i}
loader -> {loader}
settings -> transport:{transport} results:{results}
{human_settings}
"""
#: Case-insensitive pattern searched for in setting names; string values of
#: matching settings are replaced by a mask in :func:`filter_hidden_settings`.
HIDDEN_SETTINGS = re.compile(
    'API|TOKEN|KEY|SECRET|PASS|PROFANITIES_LIST|SIGNATURE|DATABASE',
    re.IGNORECASE,
)
def appstr(app):
    """Return the ``name:0xADDRESS`` label used to identify an app instance.

    The name is ``app.main``, or ``'__main__'`` when that is falsy; the
    address part is ``id(app)`` rendered in lowercase hexadecimal.
    """
    label = app.main
    if not label:
        label = '__main__'
    return '{0}:0x{1:x}'.format(label, id(app))
class Settings(ConfigurationView):
    """Celery settings object.

    Extends :class:`ConfigurationView` with properties for settings that
    are known under more than one name, and with helpers for looking up
    options and rendering the configuration as a dict or as text.
    """

    @property
    def CELERY_RESULT_BACKEND(self):
        """Result of ``first`` over the result backend setting names."""
        names = ('CELERY_RESULT_BACKEND', 'CELERY_BACKEND')
        return self.first(*names)

    @property
    def BROKER_TRANSPORT(self):
        """Result of ``first`` over the broker transport setting names."""
        names = ('BROKER_TRANSPORT', 'BROKER_BACKEND', 'CARROT_BACKEND')
        return self.first(*names)

    @property
    def BROKER_BACKEND(self):
        """Deprecated compat alias to :attr:`BROKER_TRANSPORT`."""
        return self.BROKER_TRANSPORT

    @property
    def BROKER_URL(self):
        """Broker URL; a non-empty ``CELERY_BROKER_URL`` environment
        variable takes precedence over the configured settings."""
        from_env = os.environ.get('CELERY_BROKER_URL')
        if from_env:
            return from_env
        return self.first('BROKER_URL', 'BROKER_HOST')

    @property
    def CELERY_TIMEZONE(self):
        """Time zone setting."""
        # ``TIME_ZONE`` is consulted too, so django's time zone works.
        return self.first('CELERY_TIMEZONE', 'TIME_ZONE')

    def without_defaults(self):
        """Return the current configuration, but without defaults."""
        # The default settings are the last stash in ``_order``; drop it.
        explicit_stashes = self._order[:-1]
        return Settings({}, explicit_stashes)

    def value_set_for(self, key):
        """Return true if *key* is present once defaults are excluded."""
        explicit = self.without_defaults()
        return key in explicit

    def find_option(self, name, namespace='celery'):
        """Search for option by name.

        Will return ``(namespace, key, type)`` tuple, e.g.::

            >>> from proj.celery import app
            >>> app.conf.find_option('disable_rate_limits')
            ('CELERY', 'DISABLE_RATE_LIMITS',
             <Option: type->bool default->False>)

        :param name: Name of option, cannot be partial.
        :keyword namespace: Preferred namespace (``CELERY`` by default).

        """
        return find(name, namespace)

    def find_value_for_key(self, name, namespace='celery'):
        """Shortcut to ``get_by_parts(*find_option(name)[:-1])``"""
        # Drop the trailing element of the lookup result; the rest is
        # the path of the setting.
        path = self.find_option(name, namespace)[:-1]
        return self.get_by_parts(*path)

    def get_by_parts(self, *parts):
        """Return the current value for setting specified as a path.

        Falsy parts are skipped and the rest are joined with ``_``.

        Example::

            >>> from proj.celery import app
            >>> app.conf.get_by_parts('CELERY', 'DISABLE_RATE_LIMITS')
            False

        """
        setting_name = '_'.join(filter(None, parts))
        return self[setting_name]

    def table(self, with_defaults=False, censored=True):
        """Return the configuration as a dict.

        Only keys that are upper-case and do not start with an underscore
        are included.

        :keyword with_defaults: Include default values as well.
        :keyword censored: Pass the result through
            :func:`filter_hidden_settings`.

        """
        source = self if with_defaults else self.without_defaults()
        visible = {}
        for name, setting in items(source):
            if name.isupper() and not name.startswith('_'):
                visible[name] = setting
        if censored:
            return filter_hidden_settings(visible)
        return visible

    def humanize(self, with_defaults=False, censored=True):
        """Return a human readable string showing changes to the
        configuration."""
        rows = [
            '{0}: {1}'.format(name, pretty(setting, width=50))
            for name, setting in items(self.table(with_defaults, censored))
        ]
        return '\n'.join(rows)
class AppPickler(object):
    """Old application pickler/unpickler (< 3.1).

    Calling an instance with ``(cls, *args)`` turns the positional state
    into keyword arguments, constructs ``cls`` from them and then applies
    the pickled configuration changes to the new app.
    """

    def __call__(self, cls, *args):
        """Rebuild and return an app of type *cls* from pickled *args*."""
        options = self.build_kwargs(*args)
        instance = self.construct(cls, **options)
        self.prepare(instance, **options)
        return instance

    def prepare(self, app, **kwargs):
        """Update ``app.conf`` with the ``changes`` entry of *kwargs*."""
        changes = kwargs['changes']
        app.conf.update(changes)

    def build_kwargs(self, *args):
        """Map positional pickle state to constructor keyword arguments."""
        return self.build_standard_kwargs(*args)

    def build_standard_kwargs(self, main, changes, loader, backend, amqp,
                              events, log, control, accept_magic_kwargs,
                              config_source=None):
        """Return the keyword arguments dict for the app constructor.

        ``set_as_current`` is always set to ``False``.
        """
        return {
            'main': main,
            'loader': loader,
            'backend': backend,
            'amqp': amqp,
            'changes': changes,
            'events': events,
            'log': log,
            'control': control,
            'set_as_current': False,
            'accept_magic_kwargs': accept_magic_kwargs,
            'config_source': config_source,
        }

    def construct(self, cls, **kwargs):
        """Instantiate *cls* with the given keyword arguments."""
        return cls(**kwargs)
def _unpickle_app(cls, pickler, *args):
    """Rebuild app for versions 2.5+

    *pickler* is called with no arguments to obtain the unpickler, which
    is then called with ``cls`` followed by the pickled *args*.
    """
    unpickler = pickler()
    return unpickler(cls, *args)
def _unpickle_app_v2(cls, kwargs):
    """Rebuild app for versions 3.1+

    Note that *kwargs* is modified in place: ``set_as_current`` is forced
    to ``False`` before ``cls`` is instantiated with it.
    """
    kwargs['set_as_current'] = False
    rebuilt = cls(**kwargs)
    return rebuilt
def filter_hidden_settings(conf):
    """Return a new dict built from *conf* with sensitive values masked.

    For every ``key, value`` pair:

    * a mapping value is filtered recursively;
    * a string value whose key matches ``HIDDEN_SETTINGS`` is replaced
      by the mask (eight asterisks);
    * a value whose key contains ``BROKER_URL`` (case-insensitive) is
      rendered through ``kombu.Connection(value).as_uri(mask=...)``;
    * anything else is returned unchanged.

    Keys that are not strings are never matched against the patterns,
    so their values pass through as-is.
    """

    def maybe_censor(key, value, mask='*' * 8):
        if isinstance(value, Mapping):
            return filter_hidden_settings(value)
        if not isinstance(key, string_t):
            # A regex search on a non-string key (e.g. an int key in a
            # nested mapping) would raise TypeError; such keys cannot
            # name a hidden setting, so leave the value alone.
            return value
        if isinstance(value, string_t) and HIDDEN_SETTINGS.search(key):
            return mask
        if 'BROKER_URL' in key.upper():
            # Imported lazily, as in the rest of this module's helpers.
            from kombu import Connection
            return Connection(value).as_uri(mask=mask)
        return value

    return dict((k, maybe_censor(k, v)) for k, v in items(conf))
def bugreport(app):
    """Return a string containing information useful in bug reports.

    The report is ``BUGREPORT_INFO`` filled in with library versions,
    platform details, the app's loader and its humanized settings.
    Transport and driver details are best-effort: if obtaining them
    raises, both are left blank.
    """
    import billiard
    import celery
    import kombu

    try:
        conn = app.connection()
        driver_v = '{0}:{1}'.format(conn.transport.driver_name,
                                    conn.transport.driver_version())
        transport = conn.transport_cls
    except Exception:
        driver_v = ''
        transport = ''

    # Built in the same order the values were originally evaluated.
    fields = {
        'system': _platform.system(),
        'arch': ', '.join(filter(None, _platform.architecture())),
        'py_i': pyimplementation(),
        'celery_v': celery.VERSION_BANNER,
        'kombu_v': kombu.__version__,
        'billiard_v': billiard.__version__,
        'py_v': _platform.python_version(),
        'driver_v': driver_v,
        'transport': transport,
        'results': app.conf.CELERY_RESULT_BACKEND or 'disabled',
        'human_settings': app.conf.humanize(),
        'loader': qualname(app.loader.__class__),
    }
    return BUGREPORT_INFO.format(**fields)
def find_app(app, symbol_by_name=symbol_by_name, imp=import_from_cwd):
    """Resolve the name *app* to an object, looking for a Celery app.

    The name is first resolved with *symbol_by_name*; if that raises
    :exc:`AttributeError` it is imported as a module with *imp* instead.

    When the result is a module and *app* contains no ``':'``, the
    module is searched in this order:

    1. its ``app`` attribute (ignored if that is itself a module);
    2. its ``celery`` attribute (ignored if that is itself a module);
    3. if the module has a non-empty ``__path__``, the name
       ``'<app>.celery'`` resolved recursively (an :exc:`ImportError`
       from this step is ignored);
    4. the first value in the module namespace that is a
       :class:`Celery` instance.

    If none of these succeed the pending exception is re-raised.
    In every other case the resolved symbol itself is returned.

    :param app: Name to resolve.
    :keyword symbol_by_name: Callable used to resolve the name.
    :keyword imp: Callable used to import modules.
    """
    # Imported here rather than at module level.
    # NOTE(review): presumably to avoid a circular import -- confirm.
    from .base import Celery
    try:
        sym = symbol_by_name(app, imp=imp)
    except AttributeError:
        # last part was not an attribute, but a module
        sym = imp(app)
    if isinstance(sym, ModuleType) and ':' not in app:
        try:
            found = sym.app
            if isinstance(found, ModuleType):
                # ``app`` is a submodule, not an app: treat as missing.
                raise AttributeError()
        except AttributeError:
            try:
                found = sym.celery
                if isinstance(found, ModuleType):
                    # ``celery`` is a module, not an app: treat as missing.
                    raise AttributeError()
            except AttributeError:
                # Non-empty ``__path__``: try the ``<app>.celery`` name.
                if getattr(sym, '__path__', None):
                    try:
                        return find_app(
                            '{0}.celery'.format(app),
                            symbol_by_name=symbol_by_name, imp=imp,
                        )
                    except ImportError:
                        pass
                # Last resort: any Celery instance in the module namespace.
                for suspect in values(vars(sym)):
                    if isinstance(suspect, Celery):
                        return suspect
                # Nothing usable was found: propagate the failure.
                raise
            else:
                return found
        else:
            return found
    return sym
|