This file is indexed.

/usr/lib/python2.7/dist-packages/pika/adapters/asyncore_connection.py is in python-pika 0.9.13-1build1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
"""
Use Pika with the stdlib :py:mod:`asyncore` module.

"""
import asyncore
import logging
import time

from pika.adapters import base_connection

LOGGER = logging.getLogger(__name__)


class PikaDispatcher(asyncore.dispatcher):
    """asyncore dispatcher that also acts as a minimal IOLoop for Pika.

    Besides relaying asyncore read/write notifications to a single event
    callback, it keeps a table of timeouts (``add_timeout`` /
    ``remove_timeout`` / ``process_timeouts``) and exposes ``start``,
    ``stop`` and ``update_handler`` so it can be used where an ioloop
    object is expected.

    NOTE(review): nothing in this module calls ``process_timeouts()``;
    confirm that whoever drives the loop invokes it, otherwise registered
    timeouts never fire.

    """

    # Use epoll's constants to keep life easy
    READ = 0x0001
    WRITE = 0x0004
    ERROR = 0x0008

    def __init__(self, sock=None, map=None, event_callback=None):
        """Create the dispatcher.

        :param sock: Socket handed to :py:class:`asyncore.dispatcher`
        :param dict map: asyncore socket map to register in (None selects
                         asyncore's default map)
        :param method event_callback: Called from handle_read/handle_write

        """
        # Is an old style class...
        asyncore.dispatcher.__init__(self, sock, map)
        self._timeouts = dict()
        self._event_callback = event_callback
        # Start out interested in both reading and writing
        self.events = self.READ | self.WRITE

    def add_timeout(self, deadline, callback_method):
        """Add the callback_method to the IOLoop timer to fire after deadline
        seconds. Returns a handle to the timeout. Do not confuse with
        Tornado's timeout where you pass in the time you want to have your
        callback called. Only pass in the seconds until it's to be called.

        The handle is the hash of the (absolute deadline, callback) pair, so
        callback_method must be hashable.

        :param int deadline: The number of seconds to wait to call callback
        :param method callback_method: The callback method
        :rtype: int

        """
        value = {'deadline': time.time() + deadline,
                 'callback': callback_method}
        timeout_id = hash(frozenset(value.items()))
        self._timeouts[timeout_id] = value
        return timeout_id

    def readable(self):
        """Tell asyncore whether READ is set in the current event mask.

        :rtype: bool

        """
        return bool(self.events & self.READ)

    def writable(self):
        """Tell asyncore whether WRITE is set in the current event mask.

        :rtype: bool

        """
        return bool(self.events & self.WRITE)

    def handle_read(self):
        """Relay an asyncore read notification to the event callback."""
        self._event_callback(self.socket, self.READ)

    def handle_write(self):
        """Relay an asyncore write notification to the event callback."""
        # NOTE(review): the two trailing positional arguments are presumably
        # error=None and write_only=True of the connection's event handler;
        # confirm against the callback's signature.
        self._event_callback(self.socket, self.WRITE, None, True)

    def process_timeouts(self):
        """Fire and discard every timeout whose deadline has passed.

        Callbacks are free to add or remove timeouts while this runs: the
        ids are snapshotted up front and each one is re-checked against the
        live table right before it fires.

        """
        start_time = time.time()
        # Snapshot the ids; mutating a dict while iterating it directly
        # raises RuntimeError on Python 3.
        for timeout_id in list(self._timeouts):
            timeout = self._timeouts.get(timeout_id)
            # An earlier callback in this pass may have removed this entry
            if timeout is None or timeout['deadline'] > start_time:
                continue
            # Drop the entry before calling so the callback can re-register
            del self._timeouts[timeout_id]
            timeout['callback']()

    def remove_timeout(self, timeout_id):
        """Remove a timeout if it's still in the timeout stack

        :param int timeout_id: The handle returned by add_timeout

        """
        if timeout_id in self._timeouts:
            del self._timeouts[timeout_id]

    def start(self):
        """Run asyncore's loop with its default arguments."""
        LOGGER.debug('Starting IOLoop')
        asyncore.loop()

    def stop(self):
        """Close this dispatcher."""
        LOGGER.debug('Stopping IOLoop')
        self.close()

    def update_handler(self, fileno_unused, events):
        """Set the events to the current events

        :param int fileno_unused: The file descriptor (ignored)
        :param int events: The event mask

        """
        self.events = events


class AsyncoreConnection(base_connection.BaseConnection):
    """Connection adapter that runs on the stdlib asyncore module, for
    asynchronous client development.

    :param pika.connection.Parameters parameters: Connection parameters
    :param method on_open_callback: Method to call on connection open
    :param on_open_error_callback: Method to call if the connection can't
                                   be opened
    :type on_open_error_callback: method
    :param method on_close_callback: Method to call on connection close
    :param bool stop_ioloop_on_close: Call ioloop.stop() if disconnected
    :raises: RuntimeError

    """
    def __init__(self,
                 parameters=None,
                 on_open_callback=None,
                 on_open_error_callback=None,
                 on_close_callback=None,
                 stop_ioloop_on_close=True):
        """Build the connection, handing the base class a stand-in ioloop.

        :param pika.connection.Parameters parameters: Connection parameters
        :param method on_open_callback: Method to call on connection open
        :param on_open_error_callback: Method to call if the connection can't
                                       be opened
        :type on_open_error_callback: method
        :param method on_close_callback: Method to call on connection close
        :param bool stop_ioloop_on_close: Call ioloop.stop() if disconnected
        :raises: RuntimeError

        """
        class ConnectingIOLoop(object):
            """Stand-in ioloop offering only a blocking add_timeout."""

            def add_timeout(self, duration, callback_method):
                # Blocks for the whole duration, then runs the callback
                # inline and hands back whatever it returns.
                time.sleep(duration)
                return callback_method()

        super(AsyncoreConnection, self).__init__(
            parameters,
            on_open_callback,
            on_open_error_callback,
            on_close_callback,
            ConnectingIOLoop(),
            stop_ioloop_on_close)

    def _adapter_connect(self):
        """Run the base class connect and, if it reports a truthy result,
        wrap the socket in a PikaDispatcher that reports events to
        self._handle_events. That dispatcher then replaces both self.socket
        and self.ioloop before self._on_connected() is invoked.

        """
        connected = super(AsyncoreConnection, self)._adapter_connect()
        if not connected:
            return
        # The dispatcher doubles as the ioloop from here on
        self.socket = PikaDispatcher(self.socket, None, self._handle_events)
        self.ioloop = self.socket
        self._on_connected()