/usr/lib/python2.7/dist-packages/celery/loaders/default.py is in python-celery 3.1.23-7.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# -*- coding: utf-8 -*-
"""
celery.loaders.default
~~~~~~~~~~~~~~~~~~~~~~
The default loader used when no custom app has been initialized.
"""
from __future__ import absolute_import
import os
import warnings
from celery.datastructures import DictAttribute
from celery.exceptions import NotConfigured
from celery.utils import strtobool
from .base import BaseLoader
__all__ = ['Loader', 'DEFAULT_CONFIG_MODULE']

#: Name of the configuration module imported when the
#: :envvar:`CELERY_CONFIG_MODULE` environment variable is not set.
DEFAULT_CONFIG_MODULE = 'celeryconfig'

#: Flag parsed from :envvar:`C_WNOCONF`; when truthy, a missing
#: configuration module triggers a warning instead of being ignored.
C_WNOCONF = strtobool(os.getenv('C_WNOCONF', False))
class Loader(BaseLoader):
    """Loader for the default app, configured from a Python module."""

    def setup_settings(self, settingsdict):
        """Wrap *settingsdict* in a :class:`DictAttribute` and return it."""
        return DictAttribute(settingsdict)

    def read_configuration(self, fail_silently=True):
        """Import the configuration module and return its settings.

        The module name is taken from the :envvar:`CELERY_CONFIG_MODULE`
        environment variable, falling back to ``DEFAULT_CONFIG_MODULE``.

        :keyword fail_silently: When true (the default) an
            :exc:`ImportError` raised while importing the module is
            swallowed and empty settings are returned; when false the
            error is re-raised.

        :returns: the result of :meth:`setup_settings`, applied to the
            imported configuration on success or to an empty dict when
            the import failed.

        On a successful import ``self.configured`` is set to ``True``.
        """
        module_name = os.environ.get(
            'CELERY_CONFIG_MODULE', DEFAULT_CONFIG_MODULE,
        )
        try:
            loaded_config = self._import_config_module(module_name)
        except ImportError:
            if not fail_silently:
                raise
            if C_WNOCONF:
                # billiard sets this if forked using execv
                if not os.environ.get('FORKED_BY_MULTIPROCESSING'):
                    message = (
                        'No {module} module found! Please make sure it '
                        'exists and is available to Python.'
                    ).format(module=module_name)
                    warnings.warn(NotConfigured(message))
            return self.setup_settings({})

        # Only reached when the import succeeded.
        self.configured = True
        return self.setup_settings(loaded_config)
|