/usr/lib/python2.7/dist-packages/celery/tests/concurrency/test_eventlet.py is in python-celery 3.1.20-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 | from __future__ import absolute_import
import sys
from celery.app.defaults import is_pypy
from celery.concurrency.eventlet import (
apply_target,
Schedule,
Timer,
TaskPool,
)
from celery.tests.case import (
AppCase, Mock, SkipTest, mock_module, patch, patch_many, skip_if_pypy,
)
class EventletCase(AppCase):
@skip_if_pypy
def setup(self):
if is_pypy:
raise SkipTest('mock_modules not working on PyPy1.9')
try:
self.eventlet = __import__('eventlet')
except ImportError:
raise SkipTest(
'eventlet not installed, skipping related tests.')
@skip_if_pypy
def teardown(self):
for mod in [mod for mod in sys.modules if mod.startswith('eventlet')]:
try:
del(sys.modules[mod])
except KeyError:
pass
class test_aaa_eventlet_patch(EventletCase):
def test_aaa_is_patched(self):
with patch('eventlet.monkey_patch', create=True) as monkey_patch:
from celery import maybe_patch_concurrency
maybe_patch_concurrency(['x', '-P', 'eventlet'])
monkey_patch.assert_called_with()
eventlet_modules = (
'eventlet',
'eventlet.debug',
'eventlet.greenthread',
'eventlet.greenpool',
'greenlet',
)
class test_Schedule(EventletCase):
def test_sched(self):
with mock_module(*eventlet_modules):
with patch_many('eventlet.greenthread.spawn_after',
'greenlet.GreenletExit') as (spawn_after,
GreenletExit):
x = Schedule()
x.GreenletExit = KeyError
entry = Mock()
g = x._enter(1, 0, entry)
self.assertTrue(x.queue)
x._entry_exit(g, entry)
g.wait.side_effect = KeyError()
x._entry_exit(g, entry)
entry.cancel.assert_called_with()
self.assertFalse(x._queue)
x._queue.add(g)
x.clear()
x._queue.add(g)
g.cancel.side_effect = KeyError()
x.clear()
class test_TaskPool(EventletCase):
def test_pool(self):
with mock_module(*eventlet_modules):
with patch_many('eventlet.greenpool.GreenPool',
'eventlet.greenthread') as (GreenPool,
greenthread):
x = TaskPool()
x.on_start()
x.on_stop()
x.on_apply(Mock())
x._pool = None
x.on_stop()
self.assertTrue(x.getpid())
@patch('celery.concurrency.eventlet.base')
def test_apply_target(self, base):
apply_target(Mock(), getpid=Mock())
self.assertTrue(base.apply_target.called)
class test_Timer(EventletCase):
def test_timer(self):
x = Timer()
x.ensure_started()
x.schedule = Mock()
x.start()
x.stop()
x.schedule.clear.assert_called_with()
tref = Mock()
x.cancel(tref)
x.schedule.GreenletExit = KeyError
tref.cancel.side_effect = KeyError()
x.cancel(tref)
|