This file is indexed.

/usr/lib/python2.7/dist-packages/Pyro4/socketserver/threadpoolserver.py is in python2-pyro4 4.23-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
"""
Socket server based on a worker thread pool. Doesn't use select.

Uses a single worker thread per client connection.

Pyro - Python Remote Objects.  Copyright by Irmen de Jong (irmen@razorvine.net).
"""

from __future__ import with_statement
import socket, logging, sys, os
import struct
from Pyro4 import socketutil, errors
import Pyro4.tpjobqueue

log=logging.getLogger("Pyro4.socketserver.threadpool")


class ClientConnectionJob(object):
    """
    Takes care of a single client connection and all requests
    that may arrive during its life span.
    """
    def __init__(self, clientSocket, clientAddr, daemon):
        self.csock = socketutil.SocketConnection(clientSocket)
        self.caddr = clientAddr
        self.daemon = daemon

    def __call__(self):
        log.debug("job call() %s", self.caddr)  # XXX remove this after issue #19 is fixed
        if self.handleConnection():
            try:
                while True:
                    try:
                        self.daemon.handleRequest(self.csock)
                    except (socket.error, errors.ConnectionClosedError):
                        # client went away.
                        log.debug("disconnected %s", self.caddr)
                        break
                    except errors.SecurityError:
                        log.debug("security error on client %s", self.caddr)
                        break
                    # other errors simply crash the client worker thread (and close its connection)
            finally:
                self.csock.close()

    def handleConnection(self):
        # connection handshake
        try:
            if self.daemon._handshake(self.csock):
                return True
        except:
            ex_t, ex_v, ex_tb = sys.exc_info()
            tb = Pyro4.util.formatTraceback(ex_t, ex_v, ex_tb)
            log.warning("error during connect/handshake: %s; %s", ex_v, "\n".join(tb))
            self.csock.close()
        return False

    def interrupt(self):
        """attempt to interrupt the worker's request loop"""
        try:
            self.csock.sock.shutdown(socket.SHUT_RDWR)
            self.csock.sock.setblocking(False)
        except (OSError, socket.error):
            pass
        if hasattr(socket, "SO_RCVTIMEO"):
            # setting a recv timeout seems to break the blocking call to recv() on some systems
            try:
                self.csock.sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVTIMEO, struct.pack("ii", 1,1))
            except socket.error:
                pass
        self.csock.close()



class SocketServer_Threadpool(object):
    """transport server for socket connections, worker thread pool version."""
    def init(self, daemon, host, port, unixsocket=None):
        log.info("starting thread pool socketserver")
        self.daemon = daemon
        self.sock=None
        bind_location=unixsocket if unixsocket else (host,port)
        self.sock=socketutil.createSocket(bind=bind_location, reuseaddr=Pyro4.config.SOCK_REUSE, timeout=Pyro4.config.COMMTIMEOUT, noinherit=True)
        self._socketaddr=self.sock.getsockname()
        if self._socketaddr[0].startswith("127."):
            if host is None or host.lower()!="localhost" and not host.startswith("127."):
                log.warning("weird DNS setup: %s resolves to localhost (127.x.x.x)", host)
        if unixsocket:
            self.locationStr="./u:"+unixsocket
        else:
            host=host or self._socketaddr[0]
            port=port or self._socketaddr[1]
            if ":" in host:  # ipv6
                self.locationStr="[%s]:%d" % (host, port)
            else:
                self.locationStr="%s:%d" % (host, port)
        self.jobqueue = Pyro4.tpjobqueue.ThreadPooledJobQueue()
        log.info("%d workers started", self.jobqueue.workercount)

    def __del__(self):
        if self.sock is not None:
            self.sock.close()
        if self.jobqueue is not None:
            self.jobqueue.close()

    def __repr__(self):
        return "<%s on %s, %d workers, %d jobs>" % (self.__class__.__name__, self.locationStr,
            self.jobqueue.workercount, self.jobqueue.jobcount)

    def loop(self, loopCondition=lambda: True):
        log.debug("threadpool server requestloop")
        while (self.sock is not None) and loopCondition():
            try:
                self.events([self.sock])
            except socket.error:
                x=sys.exc_info()[1]
                err=getattr(x, "errno", x.args[0])
                if not loopCondition():
                    # swallow the socket error if loop terminates anyway
                    # this can occur if we are asked to shutdown, socket can be invalid then
                    break
                if err in socketutil.ERRNO_RETRIES:
                    continue
                else:
                    raise
            except KeyboardInterrupt:
                log.debug("stopping on break signal")
                break
        log.debug("threadpool server exits requestloop")

    def events(self, eventsockets):
        """used for external event loops: handle events that occur on one of the sockets of this server"""
        # we only react on events on our own server socket.
        # all other (client) sockets are owned by their individual threads.
        assert self.sock in eventsockets
        try:
            csock, caddr=self.sock.accept()
            log.debug("connected %s", caddr)
            if Pyro4.config.COMMTIMEOUT:
                csock.settimeout(Pyro4.config.COMMTIMEOUT)
            self.jobqueue.process(ClientConnectionJob(csock, caddr, self.daemon))
        except socket.timeout:
            pass  # just continue the loop on a timeout on accept

    def close(self, joinWorkers=True):
        log.debug("closing threadpool server")
        if self.sock:
            sockname=None
            try:
                sockname=self.sock.getsockname()
            except socket.error:
                pass
            try:
                self.sock.close()
                if type(sockname) is str:
                    # it was a Unix domain socket, remove it from the filesystem
                    if os.path.exists(sockname):
                        os.remove(sockname)
            except Exception:
                pass
            self.sock=None
        self.jobqueue.close()
        for worker in self.jobqueue.busy.copy():
            if worker.job is not None:
                worker.job.interrupt()   # try to break all busy workers
        if joinWorkers:
            self.jobqueue.drain()

    @property
    def sockets(self):
        # the server socket is all we care about, all client sockets are running in their own threads
        return [self.sock]

    def wakeup(self):
        interruptSocket(self._socketaddr)


def interruptSocket(address):
    """bit of a hack to trigger a blocking server to get out of the loop, useful at clean shutdowns"""
    try:
        sock=socketutil.createSocket(connect=address, keepalive=False, timeout=None)
        socketutil.triggerSocket(sock)
        try:
            sock.shutdown(socket.SHUT_RDWR)
        except (OSError, socket.error):
            pass
        sock.close()
    except socket.error:
        pass