/usr/lib/python2.7/dist-packages/celery/tests/utils/test_saferef.py is in python-celery 3.1.20-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 | from __future__ import absolute_import
from celery.five import range
from celery.utils.dispatch.saferef import safe_ref
from celery.tests.case import Case
class Class1(object):
def x(self):
pass
def fun(obj):
pass
class Class2(object):
def __call__(self, obj):
pass
class SaferefTests(Case):
def setUp(self):
ts = []
ss = []
for x in range(5000):
t = Class1()
ts.append(t)
s = safe_ref(t.x, self._closure)
ss.append(s)
ts.append(fun)
ss.append(safe_ref(fun, self._closure))
for x in range(30):
t = Class2()
ts.append(t)
s = safe_ref(t, self._closure)
ss.append(s)
self.ts = ts
self.ss = ss
self.closureCount = 0
def tearDown(self):
del self.ts
del self.ss
def test_in(self):
"""test_in
Test the "in" operator for safe references (cmp)
"""
for t in self.ts[:50]:
self.assertTrue(safe_ref(t.x) in self.ss)
def test_valid(self):
"""test_value
Test that the references are valid (return instance methods)
"""
for s in self.ss:
self.assertTrue(s())
def test_shortcircuit(self):
"""test_shortcircuit
Test that creation short-circuits to reuse existing references
"""
sd = {}
for s in self.ss:
sd[s] = 1
for t in self.ts:
if hasattr(t, 'x'):
self.assertIn(safe_ref(t.x), sd)
else:
self.assertIn(safe_ref(t), sd)
def test_representation(self):
"""test_representation
Test that the reference object's representation works
XXX Doesn't currently check the results, just that no error
is raised
"""
repr(self.ss[-1])
def _closure(self, ref):
"""Dumb utility mechanism to increment deletion counter"""
self.closureCount += 1
|