/usr/lib/python3/dist-packages/celery/loaders/default.py is in python3-celery 4.1.0-2ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# -*- coding: utf-8 -*-
"""The default loader used when no custom app has been initialized."""
from __future__ import absolute_import, unicode_literals
import os
import warnings
from celery.exceptions import NotConfigured
from celery.utils.collections import DictAttribute
from celery.utils.serialization import strtobool
from .base import BaseLoader
# Public names exported by this module.
__all__ = ['Loader', 'DEFAULT_CONFIG_MODULE']
#: Configuration module name used by :meth:`Loader.read_configuration`
#: when :envvar:`CELERY_CONFIG_MODULE` is not set.
DEFAULT_CONFIG_MODULE = 'celeryconfig'
#: Whether a missing configuration module should produce a warning.
#: Read once at import time from :envvar:`C_WNOCONF` (default ``False``)
#: and passed through ``strtobool``.
C_WNOCONF = strtobool(os.environ.get('C_WNOCONF', False))
class Loader(BaseLoader):
    """Loader used by the default app."""

    def setup_settings(self, settingsdict):
        """Return ``settingsdict`` wrapped in a :class:`DictAttribute`."""
        wrapped = DictAttribute(settingsdict)
        return wrapped

    def read_configuration(self, fail_silently=True):
        """Read configuration from :file:`celeryconfig.py`.

        The module name is taken from :envvar:`CELERY_CONFIG_MODULE`,
        falling back to ``DEFAULT_CONFIG_MODULE``.

        When importing the module raises :exc:`ImportError`:

        * the error is re-raised if ``fail_silently`` is false;
        * otherwise an empty settings object is returned, after emitting a
          ``NotConfigured`` warning if ``C_WNOCONF`` is enabled and
          :envvar:`FORKED_BY_MULTIPROCESSING` is not set.

        On success ``self.configured`` is set to ``True`` and the imported
        configuration is returned via :meth:`setup_settings`.
        """
        module_name = os.environ.get(
            'CELERY_CONFIG_MODULE', DEFAULT_CONFIG_MODULE,
        )

        try:
            loaded = self._import_config_module(module_name)
        except ImportError:
            if not fail_silently:
                raise
            if C_WNOCONF:
                # billiard sets this if forked using execv
                if not os.environ.get('FORKED_BY_MULTIPROCESSING'):
                    message = (
                        'No {module} module found! Please make sure it exists and '
                        'is available to Python.'
                    ).format(module=module_name)
                    warnings.warn(NotConfigured(message))
            return self.setup_settings({})

        # Only reached when the import succeeded: the handler above always
        # returns or re-raises.
        self.configured = True
        return self.setup_settings(loaded)
|