/usr/lib/python2.7/dist-packages/celery/tests/app/test_defaults.py is in python-celery 3.1.23-7.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 | from __future__ import absolute_import
import sys
from importlib import import_module
from celery.app.defaults import NAMESPACES
from celery.tests.case import (
AppCase, Mock, patch, pypy_version, sys_platform,
)
class test_defaults(AppCase):
    """Tests for the ``celery.app.defaults`` module.

    ``setup`` removes ``celery.app.defaults`` from ``sys.modules`` before
    each test, so the ``defaults`` property below imports it anew, and
    ``teardown`` puts the previously loaded module object back.
    """

    def setup(self):
        # Drop the cached module (if any) so the next import_module() call
        # executes it again; keep a reference for teardown to restore.
        self._prev = sys.modules.pop('celery.app.defaults', None)

    def teardown(self):
        # ``None`` is the "module was not loaded" sentinel used by setup,
        # so compare against it explicitly instead of relying on truthiness.
        if self._prev is not None:
            sys.modules['celery.app.defaults'] = self._prev

    def test_option_repr(self):
        # The repr of an option must be a non-empty string.
        self.assertTrue(repr(NAMESPACES['BROKER']['URL']))

    def test_any(self):
        # The 'any' type converter hands back the very same object.
        val = object()
        self.assertIs(self.defaults.Option.typemap['any'](val), val)

    def test_default_pool_pypy_14(self):
        # NOTE(review): sys_platform/pypy_version presumably patch the
        # detected platform and PyPy version -- confirm in celery.tests.case.
        with sys_platform('darwin'):
            with pypy_version((1, 4, 0)):
                self.assertEqual(self.defaults.DEFAULT_POOL, 'solo')

    def test_default_pool_pypy_15(self):
        with sys_platform('darwin'):
            with pypy_version((1, 5, 0)):
                self.assertEqual(self.defaults.DEFAULT_POOL, 'prefork')

    def test_deprecated(self):
        # A source carrying the CELERYD_LOG_LEVEL setting must trigger a
        # call to the (patched) deprecation warning helper.
        source = Mock()
        source.CELERYD_LOG_LEVEL = 2
        with patch('celery.utils.warn_deprecated') as warn:
            self.defaults.find_deprecated_settings(source)
            self.assertTrue(warn.called)

    def test_default_pool_jython(self):
        with sys_platform('java 1.6.51'):
            self.assertEqual(self.defaults.DEFAULT_POOL, 'threads')

    def test_find(self):
        find = self.defaults.find

        # Index 2 of the search result is the matched value: an option
        # object (with ``.default``) for the first two lookups, and the
        # plain string 'celery' for the fully qualified name.
        self.assertEqual(find('server_email')[2].default, 'celery@localhost')
        self.assertEqual(find('default_queue')[2].default, 'celery')
        self.assertEqual(find('celery_default_exchange')[2], 'celery')

    @property
    def defaults(self):
        # Imported on every access: because setup removed the module from
        # sys.modules, the first access in a test executes it afresh.
        return import_module('celery.app.defaults')
|