/usr/lib/python3/dist-packages/celery/tests/utils/test_dispatcher.py is in python3-celery 3.1.20-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
from __future__ import absolute_import
import gc
import sys
import time
from celery.utils.dispatch import Signal
from celery.tests.case import Case
# Pick a ``garbage_collect`` implementation once, at import time, based on
# the interpreter the tests are running on.
if sys.platform.startswith('java'):

    def garbage_collect():
        """Run a collection, then pause so JVM finalizers can complete."""
        # Some JVM GCs will execute finalizers in a different thread, meaning
        # we need to wait for that to complete before we go on looking for the
        # effects of that.
        gc.collect()
        time.sleep(0.1)

elif hasattr(sys, 'pypy_version_info'):

    def garbage_collect():  # noqa
        """Run two collection passes back to back."""
        # Collecting weakreferences can take two collections on PyPy.
        gc.collect()
        gc.collect()

else:

    def garbage_collect():  # noqa
        """Run a single garbage collection pass."""
        gc.collect()
def receiver_1_arg(val, **kwargs):
    """Signal receiver used by the tests: return ``val`` unchanged.

    Any additional keyword arguments passed along with the call are
    accepted and ignored.
    """
    return val
class Callable(object):
    """Receiver helper usable either as a callable instance or via ``a``.

    Both entry points return ``val`` unchanged and ignore any extra
    keyword arguments.
    """

    def __call__(self, val, **kwargs):
        """Return ``val`` when the instance itself is called."""
        return val

    def a(self, val, **kwargs):
        """Return ``val``; lets tests connect a bound method as receiver."""
        return val
# Module-level signal shared by every test in DispatcherTests; each test
# connects its own receivers to it and checks that none are left afterwards.
a_signal = Signal(providing_args=['val'])
class DispatcherTests(Case):
    """Test suite for dispatcher (barely started)"""

    def _testIsClean(self, signal):
        """Assert that everything has been cleaned up automatically.

        The receiver list is reset whether or not the assertion passes, so
        a failing test cannot leave stale receivers on the shared signal
        for the tests that run after it.
        """
        try:
            self.assertEqual(signal.receivers, [])
        finally:
            # force cleanup just in case
            signal.receivers = []

    def test_exact(self):
        # A receiver connected for a specific sender is called when that
        # same sender sends, and send() reports (receiver, return value).
        a_signal.connect(receiver_1_arg, sender=self)
        expected = [(receiver_1_arg, 'test')]
        result = a_signal.send(sender=self, val='test')
        self.assertEqual(result, expected)
        a_signal.disconnect(receiver_1_arg, sender=self)
        self._testIsClean(a_signal)

    def test_ignored_sender(self):
        # A receiver connected without a sender is still called when a
        # send specifies one.
        a_signal.connect(receiver_1_arg)
        expected = [(receiver_1_arg, 'test')]
        result = a_signal.send(sender=self, val='test')
        self.assertEqual(result, expected)
        a_signal.disconnect(receiver_1_arg)
        self._testIsClean(a_signal)

    def test_garbage_collected(self):
        # Once the object owning a connected bound method is deleted and
        # collected, sending is expected to reach no receivers at all.
        a = Callable()
        a_signal.connect(a.a, sender=self)
        expected = []
        del a
        garbage_collect()
        result = a_signal.send(sender=self, val='test')
        self.assertEqual(result, expected)
        self._testIsClean(a_signal)

    def test_multiple_registration(self):
        # Connecting the same receiver repeatedly must register it once.
        a = Callable()
        for _ in range(6):
            a_signal.connect(a)
        result = a_signal.send(sender=self, val='test')
        self.assertEqual(len(result), 1)
        self.assertEqual(len(a_signal.receivers), 1)
        # ``result`` holds (receiver, response) pairs and so still refers
        # to ``a``; drop both names before collecting.
        del a
        del result
        garbage_collect()
        self._testIsClean(a_signal)

    def test_uid_registration(self):
        # Two different receivers connected under the same dispatch_uid
        # must occupy a single receiver slot.
        def uid_based_receiver_1(**kwargs):
            pass

        def uid_based_receiver_2(**kwargs):
            pass

        a_signal.connect(uid_based_receiver_1, dispatch_uid='uid')
        a_signal.connect(uid_based_receiver_2, dispatch_uid='uid')
        self.assertEqual(len(a_signal.receivers), 1)
        a_signal.disconnect(dispatch_uid='uid')
        self._testIsClean(a_signal)

    def test_robust(self):
        # send_robust() is expected to hand back the exception raised by
        # a receiver as that receiver's response.
        def fails(val, **kwargs):
            raise ValueError('this')

        a_signal.connect(fails)
        result = a_signal.send_robust(sender=self, val='test')
        err = result[0][1]
        self.assertIsInstance(err, ValueError)
        self.assertEqual(err.args, ('this',))
        a_signal.disconnect(fails)
        self._testIsClean(a_signal)

    def test_disconnection(self):
        # Receivers 1 and 3 are removed with explicit disconnect() calls;
        # receiver 2 is only deleted and garbage collected. The signal is
        # expected to end up with no receivers.
        receiver_1 = Callable()
        receiver_2 = Callable()
        receiver_3 = Callable()
        a_signal.connect(receiver_1)
        a_signal.connect(receiver_2)
        a_signal.connect(receiver_3)
        a_signal.disconnect(receiver_1)
        del receiver_2
        garbage_collect()
        a_signal.disconnect(receiver_3)
        self._testIsClean(a_signal)
|