/usr/share/pyshared/zope/server/taskthreads.py is in python-zope.server 3.8.6-0ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Threaded Task Dispatcher
"""
from Queue import Queue, Empty
from thread import allocate_lock, start_new_thread
from time import time, sleep
import logging
from zope.server.interfaces import ITaskDispatcher
from zope.interface import implements
# Module-level logger, named after this module.  The dispatcher uses it to
# report exceptions raised while servicing a task and to report worker
# threads that are still running when shutdown() times out.
log = logging.getLogger(__name__)
class ThreadedTaskDispatcher(object):
    """A task dispatcher that services queued tasks on a pool of threads.

    Tasks passed to addTask() are placed on a shared queue.  Worker threads,
    started and stopped through setThreadCount(), take tasks off that queue
    one at a time and call their service() method.
    """

    implements(ITaskDispatcher)

    stop_count = 0  # Number of threads that will stop soon.

    def __init__(self):
        """Create a dispatcher with an empty queue and no worker threads."""
        self.threads = {}  # { thread number -> 1 }
        self.queue = Queue()
        # Guards self.threads and self.stop_count.
        self.thread_mgmt_lock = allocate_lock()

    def handlerThread(self, thread_no):
        """Worker loop run by each thread started from setThreadCount().

        Repeatedly takes a task from the queue and calls task.service()
        for as long as thread_no is registered in self.threads.  A queued
        None is the sentinel that tells one worker to exit.

        On exit (for any reason) the worker removes its own entry from
        self.threads and decrements stop_count under the management lock.
        """
        threads = self.threads
        try:
            while threads.get(thread_no):
                task = self.queue.get()
                if task is None:
                    # Special value: kill this thread.
                    break
                try:
                    task.service()
                except:
                    # Deliberately catch everything: a failing task must
                    # never take the worker thread down with it.
                    log.exception('Exception during task')
        finally:
            mlock = self.thread_mgmt_lock
            mlock.acquire()
            try:
                # NOTE(review): stop_count is decremented on every exit
                # path, not only when a sentinel was consumed.
                self.stop_count -= 1
                threads.pop(thread_no, None)
            finally:
                mlock.release()

    def setThreadCount(self, count):
        """See zope.server.interfaces.ITaskDispatcher

        Adjust the number of worker threads to count.  Missing threads
        are started immediately; surplus threads are asked to stop by
        queueing one None sentinel per thread, so they exit only once a
        worker dequeues the sentinel.
        """
        mlock = self.thread_mgmt_lock
        mlock.acquire()
        try:
            threads = self.threads
            thread_no = 0
            # Threads already told to stop no longer count as running.
            running = len(threads) - self.stop_count
            while running < count:
                # Start threads, reusing the lowest free thread number.
                while thread_no in threads:
                    thread_no += 1
                threads[thread_no] = 1
                running += 1
                start_new_thread(self.handlerThread, (thread_no,))
                thread_no += 1
            if running > count:
                # Stop threads.
                to_stop = running - count
                self.stop_count += to_stop
                for _ in range(to_stop):
                    self.queue.put(None)
        finally:
            mlock.release()

    def addTask(self, task):
        """See zope.server.interfaces.ITaskDispatcher

        Defer the task and place it on the queue.  If deferring or
        queueing fails, the task is cancelled and the error re-raised.

        Raises ValueError if task is None (None is reserved as the
        worker-stop sentinel).
        """
        if task is None:
            raise ValueError("No task passed to addTask().")
        # assert ITask.providedBy(task)
        try:
            task.defer()
            self.queue.put(task)
        except:
            task.cancel()
            raise

    def shutdown(self, cancel_pending=True, timeout=5):
        """See zope.server.interfaces.ITaskDispatcher

        Ask all worker threads to stop and wait up to timeout seconds
        for them to exit, logging an error if some are still running.
        If cancel_pending is true, every task still sitting in the queue
        is then cancelled.
        """
        self.setThreadCount(0)
        # Ensure the threads shut down.
        threads = self.threads
        expiration = time() + timeout
        while threads:
            if time() >= expiration:
                log.error("%d thread(s) still running", len(threads))
                break
            sleep(0.1)
        if cancel_pending:
            # Cancel remaining tasks.  Use the non-blocking get: a worker
            # that outlived the timeout may still be consuming the queue,
            # and a blocking get() on an emptied queue would hang here.
            queue = self.queue
            try:
                while True:
                    task = queue.get_nowait()
                    if task is not None:
                        task.cancel()
            except Empty:
                pass

    def getPendingTasksEstimate(self):
        """See zope.server.interfaces.ITaskDispatcher

        Return the approximate number of items waiting in the queue.
        """
        return self.queue.qsize()
|