This file is indexed.

/usr/share/pyshared/ZEO/zrpc/trigger.py is in python-zodb 1:3.10.5-0ubuntu3.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
##############################################################################
#
# Copyright (c) 2001-2005 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################

from __future__ import with_statement

import asyncore
import os
import socket
import thread
import errno

from ZODB.utils import positive_id

# Original comments follow; they're hard to follow in the context of
# ZEO's use of triggers.  TODO:  rewrite from a ZEO perspective.

# Wake up a call to select() running in the main thread.
#
# This is useful in a context where you are using Medusa's I/O
# subsystem to deliver data, but the data is generated by another
# thread.  Normally, if Medusa is in the middle of a call to
# select(), new output data generated by another thread will have
# to sit until the call to select() either times out or returns.
# If the trigger is 'pulled' by another thread, it should immediately
# generate a READ event on the trigger object, which will force the
# select() invocation to return.
#
# A common use for this facility: letting Medusa manage I/O for a
# large number of connections; but routing each request through a
# thread chosen from a fixed-size thread pool.  When a thread is
# acquired, a transaction is performed, but output data is
# accumulated into buffers that will be emptied more efficiently
# by Medusa. [picture a server that can process database queries
# rapidly, but doesn't want to tie up threads waiting to send data
# to low-bandwidth connections]
#
# The other major feature provided by this class is the ability to
# move work back into the main thread: if you call pull_trigger()
# with a thunk argument, when select() wakes up and receives the
# event it will call your thunk from within that thread.  The main
# purpose of this is to remove the need to wrap thread locks around
# Medusa's data structures, which normally do not need them.  [To see
# why this is true, imagine this scenario: A thread tries to push some
# new data onto a channel's outgoing data queue at the same time that
# the main thread is trying to remove some]

class _triggerbase(object):
    """OS-independent base class for OS-dependent trigger class."""

    kind = None  # subclass must set to "pipe" or "loopback"; used by repr

    def __init__(self):
        self._closed = False

        # `lock` protects the `thunks` list from being traversed and
        # appended to simultaneously.
        self.lock = thread.allocate_lock()

        # List of no-argument callbacks to invoke when the trigger is
        # pulled.  These run in the thread running the asyncore mainloop,
        # regardless of which thread pulls the trigger.
        self.thunks = []

    def readable(self):
        return 1

    def writable(self):
        return 0

    def handle_connect(self):
        pass

    def handle_close(self):
        self.close()

    # Override the asyncore close() method, because it doesn't know about
    # (so can't close) all the gimmicks we have open.  Subclass must
    # supply a _close() method to do platform-specific closing work.  _close()
    # will be called iff we're not already closed.
    def close(self):
        if not self._closed:
            self._closed = True
            self.del_channel()
            self._close()  # subclass does OS-specific stuff

    def _close(self):    # see close() above; subclass must supply
        raise NotImplementedError

    def pull_trigger(self, *thunk):
        if thunk:
            with self.lock:
                self.thunks.append(thunk)
        try:
            self._physical_pull()
        except Exception:
            if not self._closed:
                raise

    # Subclass must supply _physical_pull, which does whatever the OS
    # needs to do to provoke the "write" end of the trigger.
    def _physical_pull(self):
        raise NotImplementedError

    def handle_read(self):
        try:
            self.recv(8192)
        except socket.error:
            return

        while 1:
            with self.lock:
                if self.thunks:
                    thunk = self.thunks.pop(0)
                else:
                    return
            try:
                thunk[0](*thunk[1:])
            except:
                nil, t, v, tbinfo = asyncore.compact_traceback()
                print ('exception in trigger thunk:'
                       ' (%s:%s %s)' % (t, v, tbinfo))

    def __repr__(self):
        return '<select-trigger (%s) at %x>' % (self.kind, positive_id(self))

if os.name == 'posix':

    class trigger(_triggerbase, asyncore.file_dispatcher):
        kind = "pipe"

        def __init__(self, map=None):
            _triggerbase.__init__(self)
            r, self.trigger = os.pipe()
            asyncore.file_dispatcher.__init__(self, r, map)

            if self.fd != r:
                # Starting in Python 2.6, the descriptor passed to
                # file_dispatcher gets duped and assigned to
                # self.fd. This breals the instantiation semantics and
                # is a bug imo.  I dount it will get fixed, but maybe
                # it will. Who knows. For that reason, we test for the
                # fd changing rather than just checking the Python version.
                os.close(r)

        def _close(self):
            os.close(self.trigger)
            asyncore.file_dispatcher.close(self)

        def _physical_pull(self):
            os.write(self.trigger, 'x')

else:
    # Windows version; uses just sockets, because a pipe isn't select'able
    # on Windows.

    class BindError(Exception):
        pass

    class trigger(_triggerbase, asyncore.dispatcher):
        kind = "loopback"

        def __init__(self, map=None):
            _triggerbase.__init__(self)

            # Get a pair of connected sockets.  The trigger is the 'w'
            # end of the pair, which is connected to 'r'.  'r' is put
            # in the asyncore socket map.  "pulling the trigger" then
            # means writing something on w, which will wake up r.

            w = socket.socket()
            # Disable buffering -- pulling the trigger sends 1 byte,
            # and we want that sent immediately, to wake up asyncore's
            # select() ASAP.
            w.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

            count = 0
            while 1:
               count += 1
               # Bind to a local port; for efficiency, let the OS pick
               # a free port for us.
               # Unfortunately, stress tests showed that we may not
               # be able to connect to that port ("Address already in
               # use") despite that the OS picked it.  This appears
               # to be a race bug in the Windows socket implementation.
               # So we loop until a connect() succeeds (almost always
               # on the first try).  See the long thread at
               # http://mail.zope.org/pipermail/zope/2005-July/160433.html
               # for hideous details.
               a = socket.socket()
               a.bind(("127.0.0.1", 0))
               connect_address = a.getsockname()  # assigned (host, port) pair
               a.listen(1)
               try:
                   w.connect(connect_address)
                   break    # success
               except socket.error, detail:
                   if detail[0] != errno.WSAEADDRINUSE:
                       # "Address already in use" is the only error
                       # I've seen on two WinXP Pro SP2 boxes, under
                       # Pythons 2.3.5 and 2.4.1.
                       raise
                   # (10048, 'Address already in use')
                   # assert count <= 2 # never triggered in Tim's tests
                   if count >= 10:  # I've never seen it go above 2
                       a.close()
                       w.close()
                       raise BindError("Cannot bind trigger!")
                   # Close `a` and try again.  Note:  I originally put a short
                   # sleep() here, but it didn't appear to help or hurt.
                   a.close()

            r, addr = a.accept()  # r becomes asyncore's (self.)socket
            a.close()
            self.trigger = w
            asyncore.dispatcher.__init__(self, r, map)

        def _close(self):
            # self.socket is r, and self.trigger is w, from __init__
            self.socket.close()
            self.trigger.close()

        def _physical_pull(self):
            self.trigger.send('x')