/usr/share/pyshared/async/test/test_thread.py is in python-async 0.6.1-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 | # -*- coding: utf-8 -*-
""" Test thead classes and functions"""
from lib import *
from async.thread import *
from Queue import Queue
import time
class TestWorker(WorkerThread):
def __init__(self, *args, **kwargs):
super(TestWorker, self).__init__(*args, **kwargs)
self.reset()
def fun(self, arg):
self.called = True
self.arg = arg
return True
def make_assertion(self):
assert self.called
assert self.arg
self.reset()
def reset(self):
self.called = False
self.arg = None
class TestThreads(TestBase):
@terminate_threads
def test_worker_thread(self):
worker = TestWorker()
assert isinstance(worker.start(), WorkerThread)
# test different method types
standalone_func = lambda *args, **kwargs: worker.fun(*args, **kwargs)
for function in (TestWorker.fun, worker.fun, standalone_func):
worker.inq.put((function, 1))
time.sleep(0.01)
worker.make_assertion()
# END for each function type
worker.stop_and_join()
|