/usr/lib/python2.7/dist-packages/celery/tests/concurrency/test_gevent.py is in python-celery 3.1.20-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 | from __future__ import absolute_import
from celery.concurrency.gevent import (
Schedule,
Timer,
TaskPool,
apply_timeout,
)
from celery.tests.case import (
AppCase, Mock, SkipTest, mock_module, patch, patch_many, skip_if_pypy,
)
gevent_modules = (
'gevent',
'gevent.monkey',
'gevent.greenlet',
'gevent.pool',
'greenlet',
)
class GeventCase(AppCase):
@skip_if_pypy
def setup(self):
try:
self.gevent = __import__('gevent')
except ImportError:
raise SkipTest(
'gevent not installed, skipping related tests.')
class test_gevent_patch(GeventCase):
def test_is_patched(self):
with mock_module(*gevent_modules):
with patch('gevent.monkey.patch_all', create=True) as patch_all:
import gevent
gevent.version_info = (1, 0, 0)
from celery import maybe_patch_concurrency
maybe_patch_concurrency(['x', '-P', 'gevent'])
self.assertTrue(patch_all.called)
class test_Schedule(AppCase):
def test_sched(self):
with mock_module(*gevent_modules):
with patch_many('gevent.greenlet',
'gevent.greenlet.GreenletExit') as (greenlet,
GreenletExit):
greenlet.Greenlet = object
x = Schedule()
greenlet.Greenlet = Mock()
x._Greenlet.spawn_later = Mock()
x._GreenletExit = KeyError
entry = Mock()
g = x._enter(1, 0, entry)
self.assertTrue(x.queue)
x._entry_exit(g)
g.kill.assert_called_with()
self.assertFalse(x._queue)
x._queue.add(g)
x.clear()
x._queue.add(g)
g.kill.side_effect = KeyError()
x.clear()
g = x._Greenlet()
g.cancel()
class test_TaskPool(AppCase):
def test_pool(self):
with mock_module(*gevent_modules):
with patch_many('gevent.spawn_raw', 'gevent.pool.Pool') as (
spawn_raw, Pool):
x = TaskPool()
x.on_start()
x.on_stop()
x.on_apply(Mock())
x._pool = None
x.on_stop()
x._pool = Mock()
x._pool._semaphore.counter = 1
x._pool.size = 1
x.grow()
self.assertEqual(x._pool.size, 2)
self.assertEqual(x._pool._semaphore.counter, 2)
x.shrink()
self.assertEqual(x._pool.size, 1)
self.assertEqual(x._pool._semaphore.counter, 1)
x._pool = [4, 5, 6]
self.assertEqual(x.num_processes, 3)
class test_Timer(AppCase):
def test_timer(self):
with mock_module(*gevent_modules):
x = Timer()
x.ensure_started()
x.schedule = Mock()
x.start()
x.stop()
x.schedule.clear.assert_called_with()
class test_apply_timeout(AppCase):
def test_apply_timeout(self):
class Timeout(Exception):
value = None
def __init__(self, value):
self.__class__.value = value
def __enter__(self):
return self
def __exit__(self, *exc_info):
pass
timeout_callback = Mock(name='timeout_callback')
apply_target = Mock(name='apply_target')
apply_timeout(
Mock(), timeout=10, callback=Mock(name='callback'),
timeout_callback=timeout_callback,
apply_target=apply_target, Timeout=Timeout,
)
self.assertEqual(Timeout.value, 10)
self.assertTrue(apply_target.called)
apply_target.side_effect = Timeout(10)
apply_timeout(
Mock(), timeout=10, callback=Mock(),
timeout_callback=timeout_callback,
apply_target=apply_target, Timeout=Timeout,
)
timeout_callback.assert_called_with(False, 10)
|