This file is indexed.

/usr/lib/python2.7/dist-packages/Pyro4/socketserver/threadpool.py is in python2-pyro4 4.63-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
"""
Thread pool job processor with a variable number of worker threads (between a minimum and a maximum amount).

Pyro - Python Remote Objects.  Copyright by Irmen de Jong (irmen@razorvine.net).
"""

from __future__ import with_statement
import time
import logging
import threading
from Pyro4.configuration import config

# Module-level logger; used below for pool size/debug messages and for
# reporting exceptions that escape a job running in a worker thread.
log = logging.getLogger("Pyro4.threadpool")


class PoolError(Exception):
    """Base class for the errors raised by the worker thread pool in this module."""


class NoFreeWorkersError(PoolError):
    """Raised when a job cannot be accepted: no worker is idle and the pool may not grow any further."""


class Worker(threading.Thread):
    """
    Daemon thread that runs the jobs handed to it, one at a time.

    A job is a callable that is invoked without arguments. Handing over ``None``
    instead of a job asks the worker to leave its run loop and terminate.
    After every completed job the worker reports back via ``pool.notify_done(worker)``.
    """

    def __init__(self, pool):
        """
        Create the worker thread; it is not started here.

        :param pool: the owner of this worker; the only thing called on it from
            this class is ``notify_done(worker)``.
        """
        super(Worker, self).__init__()
        self.daemon = True
        self.name = "Pyro-Worker-%d " % id(self)
        # Signals the run loop that ``self.job`` has been (re)assigned.
        self.job_available = threading.Event()
        self.job = None
        self.pool = pool

    def process(self, job):
        """
        Hand a job to this worker and wake it up.

        :param job: a callable taking no arguments, or ``None`` to make the
            worker exit its run loop.
        """
        self.job = job
        self.job_available.set()

    def run(self):
        """
        Thread body: wait for a job, run it, notify the pool, repeat.

        The loop ends when the worker wakes up and finds ``None`` as its job.
        Exceptions (``Exception`` subclasses) raised by a job are logged and
        do not terminate the worker.
        """
        while True:
            self.job_available.wait()
            self.job_available.clear()
            # Read the job attribute exactly once per wake-up. process() can be
            # called from another thread (the pool does so with None when it is
            # closed), so checking self.job and then calling self.job() as two
            # separate reads could end up calling None.
            job = self.job
            if job is None:
                break
            try:
                job()
            except Exception as x:
                log.exception("unhandled exception from job in worker thread %s: %s", self.name, x)
            self.job = None
            self.pool.notify_done(self)
        # Drop the reference to the pool once the worker is finished.
        self.pool = None


class Pool(object):
    """
    A job processing pool that is using a pool of worker threads.
    The amount of worker threads in the pool is configurable and scales between min/max size
    (``config.THREADPOOL_SIZE_MIN`` and ``config.THREADPOOL_SIZE``).

    Workers are tracked in two sets: ``idle`` (waiting for a job) and ``busy``
    (currently assigned a job). The pool can be used as a context manager, in
    which case it is closed when the ``with`` block is left.
    """
    def __init__(self):
        """
        Validate the configured pool sizes and start the minimum number of workers.

        :raises ValueError: if one of the configured sizes is smaller than 1,
            or the minimum size is larger than the maximum size.
        """
        if config.THREADPOOL_SIZE < 1 or config.THREADPOOL_SIZE_MIN < 1:
            raise ValueError("threadpool sizes must be greater than zero")
        if config.THREADPOOL_SIZE_MIN > config.THREADPOOL_SIZE:
            raise ValueError("minimum threadpool size must be less than or equal to max size")
        self.idle = set()
        self.busy = set()
        self.closed = False
        # Create every attribute before any worker thread is started.
        # NOTE(review): count_lock is not used by any method of this class;
        # it is kept because code outside this class may reference it.
        self.count_lock = threading.Lock()
        for _ in range(config.THREADPOOL_SIZE_MIN):
            worker = Worker(self)
            self.idle.add(worker)
            worker.start()
        log.debug("worker pool created with initial size %d", self.num_workers())

    def __enter__(self):
        """Return the pool itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the pool when the ``with`` block is left; exceptions are not suppressed."""
        self.close()

    def close(self):
        """
        Shut the pool down: ask all workers to stop and briefly wait for them.

        Calling this on an already closed pool does nothing. Workers are joined
        with a short timeout only, so a worker that is still running a job is
        not waited for beyond that.
        """
        if self.closed:
            return
        log.debug("closing down")
        # Flag the pool as closed *before* signalling the workers. From this
        # point on process() refuses new jobs and notify_done() stops finished
        # workers instead of parking them as idle, which narrows the window in
        # which a job could be handed to a worker that was already told to exit.
        self.closed = True
        for w in list(self.busy):
            w.process(None)
        for w in list(self.idle):
            w.process(None)
        # Give the workers a moment to notice the stop request.
        time.sleep(0.1)
        idle, self.idle = self.idle, set()
        busy, self.busy = self.busy, set()
        # check if the threads that are joined are not the current thread,
        # otherwise Python 2.x crashes with "cannot join current thread".
        current_thread = threading.current_thread()
        # The sets are drained with pop() rather than iterated, so that a
        # late modification by a worker thread cannot break the loop.
        while idle:
            p = idle.pop()
            if p is not current_thread:
                p.join(timeout=0.1)
        while busy:
            p = busy.pop()
            if p is not current_thread:
                p.join(timeout=0.1)

    def __repr__(self):
        return "<%s.%s at 0x%x; %d busy workers; %d idle workers>" % \
               (self.__class__.__module__, self.__class__.__name__, id(self), len(self.busy), len(self.idle))

    def num_workers(self):
        """Return the total number of workers tracked by the pool (busy plus idle)."""
        return len(self.busy) + len(self.idle)

    def process(self, job):
        """
        Hand a job to a worker.

        An idle worker is used when there is one; otherwise a new worker is
        started, provided the pool has fewer than ``config.THREADPOOL_SIZE`` workers.

        :param job: the job to run; it is passed on unchanged to ``Worker.process``.
        :raises PoolError: if the pool has been closed.
        :raises NoFreeWorkersError: if no worker is idle and the pool is at its maximum size.
        """
        if self.closed:
            raise PoolError("job queue is closed")
        if self.idle:
            worker = self.idle.pop()
        elif self.num_workers() < config.THREADPOOL_SIZE:
            worker = Worker(self)
            worker.start()
        else:
            raise NoFreeWorkersError("no free workers available, increase thread pool size")
        self.busy.add(worker)
        worker.process(job)
        log.debug("worker counts: %d busy, %d idle", len(self.busy), len(self.idle))

    def notify_done(self, worker):
        """
        Called by a worker when it has finished a job.

        The worker is taken out of the busy set. It is then either told to stop
        (pool closed, or there are already ``config.THREADPOOL_SIZE_MIN`` idle
        workers) or parked in the idle set for reuse.

        :param worker: the worker that finished its job.
        """
        # discard() is a single operation that tolerates a missing element.
        # A separate membership test followed by remove() looks up self.busy
        # twice and can raise KeyError if close() swaps the set in between.
        self.busy.discard(worker)
        if self.closed:
            worker.process(None)
            return
        if len(self.idle) >= config.THREADPOOL_SIZE_MIN:
            worker.process(None)
        else:
            self.idle.add(worker)
        log.debug("worker counts: %d busy, %d idle", len(self.busy), len(self.idle))