This file is indexed.

/usr/share/pyshared/pika/adapters/select_connection.py is in python-pika 0.9.5-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
# ***** BEGIN LICENSE BLOCK *****
#
# For copyright and licensing please refer to COPYING.
#
# ***** END LICENSE BLOCK *****

import select
import time

from pika.adapters.base_connection import BaseConnection
import pika.log

# One of select, epoll, kqueue or poll
SELECT_TYPE = None

# Use epoll's constants to keep life easy
READ = 0x0001
WRITE = 0x0004
ERROR = 0x0008


class SelectConnection(BaseConnection):

    def __init__(self, parameters=None, on_open_callback=None,
                 reconnection_strategy=None):
        # Run our base connection init
        BaseConnection.__init__(self, parameters, on_open_callback,
                                reconnection_strategy)

    def _adapter_connect(self, host, port):
        """
        Connect to the given host and port
        """
        BaseConnection._adapter_connect(self, host, port)

        # Setup the IOLoop
        self.ioloop = IOLoop()

        # Setup our and start our IOLoop and Poller
        self.ioloop.fileno = self.socket.fileno()
        self.ioloop.start_poller(self._handle_events, self.event_state)

        # Let everyone know we're connected
        self._on_connected()

    def _flush_outbound(self):
        """
        Call the state manager who will figure out that we need to write then
        call the poller's poll function to force it to process events.
        """
        self._manage_event_state()

        # Force our poller to come up for air
        self.ioloop.poller.poll()


class IOLoop(object):
    """
    Singlton wrapper that decides which type of poller to use, creates an
    instance of it in start_poller and keeps the invoking application in a
    blocking state by calling the pollers start method. Poller should keep
    looping until IOLoop.instance().stop() is called or there is a socket
    error.

    Also provides a convenient pass-through for add_timeout and set_events
    """
    def __init__(self):
        self.fileno = None

    def add_timeout(self, deadline, handler):
        """
        Pass through a deadline and handler to the active poller
        """
        return self.poller.add_timeout(deadline, handler)

    def remove_timeout(self, timeout_id):
        """
        Remove a timeout if it's still in the timeout stack of our poller
        """
        self.poller.remove_timeout(timeout_id)

    @property
    def poller_type(self):
        return self.poller.__class__.__name__

    def start_poller(self, handler, events):
        """
        Start the Poller, once started will take over for IOLoop.start()
        """
        # By default we don't have a poller type
        self.poller = None

        # Decide what poller to use and set it up as appropriate
        if hasattr(select, 'poll'):
            if not SELECT_TYPE or SELECT_TYPE == 'poll':
                self.poller = PollPoller(self.fileno, handler, events)

        if not self.poller and hasattr(select, 'epoll'):
            if not SELECT_TYPE or SELECT_TYPE == 'epoll':
                self.poller = EPollPoller(self.fileno, handler, events)

        if not self.poller and hasattr(select, 'kqueue'):
            if not SELECT_TYPE or SELECT_TYPE == 'kqueue':
                self.poller = KQueuePoller(self.fileno, handler, events)

        # We couldn't satisfy epoll, kqueue or poll
        if not self.poller:
            self.poller = SelectPoller(self.fileno, handler, events)

    def update_handler(self, fileno, events):
        """
        Pass in the events we want to process
        """
        self.poller.update_handler(fileno, events)

    def start(self):
        """
        Wait until we have a poller
        """
        while not self.poller:
            time.sleep(SelectPoller.TIMEOUT)

        # Loop on the poller
        self.poller.start()

    def stop(self):
        """
        Stop the poller's event loop
        """
        self.poller.update_handler(self.fileno, 0)
        self.poller.open = False


class SelectPoller(object):
    """
    Default behavior is to use Select since it's the widest supported and has
    all of the methods we need for child classes as well. One should only need
    to override the update_handler and start methods for additional types.
    """
    # How many seconds to wait until we try and process timeouts
    TIMEOUT = 1

    def __init__(self, fileno, handler, events):
        self.fileno = fileno
        self.events = events
        self.open = True
        self._handler = handler
        self._timeouts = dict()

    def update_handler(self, fileno, events):
        """
        Set our events to our current events
        """
        self.events = events

    def add_timeout(self, deadline, handler):
        """
        Add a timeout to the stack by deadline
        """
        timeout_id = '%.8f' % time.time()
        self._timeouts[timeout_id] = {'deadline': deadline,
                                      'handler': handler}
        return timeout_id

    def remove_timeout(self, timeout_id):
        """
        Remove a timeout from the stack
        """
        if timeout_id in self._timeouts:
            del self._timeouts[timeout_id]

    def process_timeouts(self):
        """
        Process our self._timeouts event stack
        """
        # Process our timeout events
        keys = self._timeouts.keys()

        start_time = time.time()
        for timeout_id in keys:
            if timeout_id in self._timeouts and \
               self._timeouts[timeout_id]['deadline'] <= start_time:
                pika.log.debug('%s: Timeout calling %s',
                               self.__class__.__name__,
                               self._timeouts[timeout_id]['handler'])
                self._timeouts[timeout_id]['handler']()
                del(self._timeouts[timeout_id])

    def start(self):
        """
        Start the main poller loop. It will loop here until self.closed
        """
        while self.open:

            # Call our poller
            self.poll()

            # Process our timeouts
            self.process_timeouts()

    def poll(self):
        # Build our values to pass into select
        input_fileno, output_fileno, error_fileno = [], [], []

        if self.events & READ:
            input_fileno = [self.fileno]
        if self.events & WRITE:
            output_fileno = [self.fileno]
        if self.events & ERROR:
            error_fileno = [self.fileno]

        # Wait on select to let us know what's up
        try:
            read, write, error = select.select(input_fileno,
                                               output_fileno,
                                               error_fileno,
                                               SelectPoller.TIMEOUT)
        except select.error, error:
            return self._handler(self.fileno, ERROR, error)

        # Build our events bit mask
        events = 0
        if read:
            events |= READ
        if write:
            events |= WRITE
        if error:
            events |= ERROR

        if events:
            pika.log.debug("%s: Calling %s", self.__class__.__name__,
                           self._handler)
            self._handler(self.fileno, events)


class KQueuePoller(SelectPoller):

    def __init__(self, fileno, handler, events):
        SelectPoller.__init__(self, fileno, handler, events)
        # Make our events 0 by default for first run of update_handler
        self.events = 0
        # Create our KQueue object
        self._kqueue = select.kqueue()
        # KQueue needs us to register each event individually
        self.update_handler(fileno, events)

    def update_handler(self, fileno, events):
        # No need to update if our events are the same
        if self.events == events:
            return

        # Keep a list of the events we want to pass into _kqueue.control
        kevents = list()

        # Build our list of kevents based upon if we have to add or remove
        # events and each event gets its own operation

        # We don't want READ
        if not events & READ:

            # We did have a read last time
            if self.events & READ:

                # Remove READ
                kevents.append(select.kevent(fileno,
                                             filter=select.KQ_FILTER_READ,
                                             flags=select.KQ_EV_DELETE))
        # We do want READ
        else:

            # We did not have a read last time
            if not self.events & READ:

                # Add READ
                kevents.append(select.kevent(fileno,
                                             filter=select.KQ_FILTER_READ,
                                             flags=select.KQ_EV_ADD))

        # We don't want write events
        if not events & WRITE:

            # We had a write last time
            if self.events & WRITE:

                # Remove it
                kevents.append(select.kevent(fileno,
                                             filter=select.KQ_FILTER_WRITE,
                                             flags=select.KQ_EV_DELETE))
        # We do want write events
        else:

            # We didn't have a WRITE last time
            if not self.events & WRITE:

                # Add write
                kevents.append(select.kevent(fileno,
                                             filter=select.KQ_FILTER_WRITE,
                                             flags=select.KQ_EV_ADD))

        # Send our event changes to kqueue control
        for event in kevents:
            self._kqueue.control([event], 0)

        # Carry the state we just sent
        self.events = events

    def start(self):
        """
        Start the main poller loop. It will loop here until self.closed
        """
        while self.open:

            # Call our poll function
            self.poll()

            # Process our timeouts
            self.process_timeouts()

    def poll(self):

        # We'll build a bitmask of events that happened in kqueue
        events = 0

        # Get up to a max of 1000 events or wait until timeout
        try:
            kevents = self._kqueue.control(None, 1000,
                                           SelectPoller.TIMEOUT)
        except OSError, error:
            return self._handler(self.fileno, ERROR, error)

        # Loop through the events returned to us and build a bitmask
        for event in kevents:

            # We had a read event, data and we're listening for them
            if event.filter == select.KQ_FILTER_READ and \
               READ & self.events:
                events |= READ

            # We're clear to write so get that done
            if event.filter == select.KQ_FILTER_WRITE and \
               WRITE & self.events:
                events |= WRITE

            # Look for errors, no event registration needed
            if event.flags & select.KQ_EV_ERROR and \
                ERROR & self.events:
                events |= ERROR

        # Call our event handler if we have events in our stack
        if events:
            pika.log.debug("%s: Calling %s(%i)", self.__class__.__name__,
                           self._handler, events)
            self._handler(self.fileno, events)


class PollPoller(SelectPoller):

    def __init__(self, fileno, handler, events):
        SelectPoller.__init__(self, fileno, handler, events)
        self._poll = select.poll()
        self._poll.register(fileno, self.events)

    def update_handler(self, fileno, events):
        self.events = events
        self._poll.modify(fileno, self.events)

    def start(self):
        """
        Start the main poller loop. It will loop here until self.closed
        """
        while self.open:

            # Poll our poller
            self.poll()

            # Process our timeouts
            self.process_timeouts()

    def poll(self):

        # Poll until TIMEOUT waiting for an event
        events = self._poll.poll(int(SelectPoller.TIMEOUT * 1000))

        # If we didn't timeout pass the event to the handler
        if events:
            pika.log.debug("%s: Calling %s", self.__class__.__name__,
                           self._handler)
            self._handler(events[0][0], events[0][1])


class EPollPoller(PollPoller):
    """
    EPoll and Poll function signatures match.
    """
    def __init__(self, fileno, handler, events):
        SelectPoller.__init__(self, fileno, handler, events)
        self._poll = select.epoll()
        self._poll.register(fileno, self.events)

    def poll(self):

        # Poll until TIMEOUT waiting for an event
        events = self._poll.poll(SelectPoller.TIMEOUT)

        # If we didn't timeout pass the event to the handler
        if events:
            pika.log.debug("%s: Calling %s", self.__class__.__name__,
                           self._handler)
            self._handler(events[0][0], events[0][1])