/usr/share/pyshared/ZEO/tests/TestThread.py is in python-zodb 1:3.9.7-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
##############################################################################
#
# Copyright (c) 2002 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""A Thread base class for use with unittest."""
import threading
import sys
if sys.version_info[0] >= 3:
    def _reraise(exc_type, exc_value, exc_tb):
        """Re-raise a captured exception, keeping its original traceback."""
        raise exc_value.with_traceback(exc_tb)
else:
    # The three-argument ``raise`` statement is a SyntaxError on Python 3,
    # so the Python 2 spelling is compiled from a string, and only when
    # actually running on Python 2.
    exec("def _reraise(exc_type, exc_value, exc_tb):\n"
         "    raise exc_type, exc_value, exc_tb\n")


class TestThread(threading.Thread):
    """Base class for defining threads that run from unittest.

    The subclass should define a testrun() method instead of a run()
    method.

    Call cleanup() when the test is done with the thread, instead of join().
    If the thread exits with an uncaught exception, it's captured and
    re-raised when cleanup() is called.  cleanup() should be called by
    the main thread!  Trying to tell unittest that a test failed from
    another thread creates a nightmare of timing-dependent cascading
    failures and missed errors (tracebacks that show up on the screen,
    but don't cause unittest to believe the test failed).

    cleanup() also joins the thread.  If no exception was captured and the
    thread is still alive once the join timeout has elapsed, the test is
    made to fail with a "Thread did not finish: ..." message.
    """

    # Class-level default, so cleanup() can always read the attribute and
    # subclasses that assign ``_testcase`` themselves (before or after
    # calling __init__) are not overwritten.
    _testcase = None

    def __init__(self, testcase=None):
        """Create the thread.

        testcase -- optional object with a fail(message) method (typically
            the running unittest.TestCase).  When given, cleanup() reports
            a hung thread through it; otherwise cleanup() raises
            AssertionError itself.
        """
        threading.Thread.__init__(self)
        # In case this thread hangs, don't stop Python from exiting.
        # (The ``daemon`` attribute exists from Python 2.6 on; setDaemon()
        # is deprecated on current Python 3.)
        self.daemon = True
        self._exc_info = None
        if testcase is not None:
            self._testcase = testcase

    def run(self):
        """Run testrun(), stashing any exception for cleanup() to re-raise."""
        try:
            self.testrun()
        except:  # noqa: E722 -- deliberately catches everything
            # Nothing may escape in this thread; the main thread re-raises
            # whatever was caught when it calls cleanup().
            self._exc_info = sys.exc_info()

    def cleanup(self, timeout=15):
        """Join the thread and surface its outcome in the calling thread.

        timeout -- seconds to wait for the thread to finish.

        Re-raises the exception captured by run(), if any, with its
        original traceback.  Otherwise, if the thread is still alive after
        the timeout, fails through the test case passed to __init__ or,
        when there is none, raises AssertionError.
        """
        self.join(timeout)
        if self._exc_info:
            _reraise(*self._exc_info)
        # is_alive() is the only spelling left on Python 3.9+ (isAlive()
        # was removed); it is also available on Python 2.6+.
        if self.is_alive():
            message = "Thread did not finish: %s" % self
            if self._testcase is None:
                raise AssertionError(message)
            self._testcase.fail(message)
|