This file is indexed.

/usr/share/pyshared/pika/adapters/asyncore_connection.py is in python-pika 0.9.5-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
# ***** BEGIN LICENSE BLOCK *****
#
# For copyright and licensing please refer to COPYING.
#
# ***** END LICENSE BLOCK *****

import asyncore
import select
import socket
import time

from pika.adapters.base_connection import BaseConnection


class AsyncoreDispatcher(asyncore.dispatcher):
    """
    We extend asyncore.dispatcher here and throw in everything we need to
    handle both asyncore's needs and pika's. In the async adapter structure
    we expect a ioloop behavior which includes timeouts and a start and stop
    function.
    """

    def __init__(self, host, port):
        """
        Initialize the dispatcher, socket and our defaults. We turn of nageling
        in the socket to allow for faster throughput.
        """
        asyncore.dispatcher.__init__(self)

        # Create the socket, turn off nageling and connect
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
        self.connect((host, port))

        # Setup defaults
        self.connecting = True
        self.connection = None
        self._timeouts = dict()
        self.writable_ = False
        self.map = None

    def handle_connect(self):
        """
        asyncore required method. Is called on connection.
        """
        self.connecting = False
        self.connection._on_connected()

        # Make our own map to pass in places
        self.map = dict({self.socket.fileno(): self})

    def handle_close(self):
        """
        asyncore required method. Is called on close.
        """
        # If we're not already closing or closed, disconnect the Connection
        if not self.connection.closing and not self.connection.closed:
            self.connection._adapter_disconnect()

    def handle_read(self):
        """
        Read from the socket and call our on_data_available with the data
        """
        try:
            data = self.recv(self.suggested_buffer_size)
        except socket.timeout:
            raise
        except socket.error, error:
            return self._handle_error(error)

        # We received no data, so disconnect
        if not data:
            return self.connection._adapter_disconnect()

        # Pass the data into our top level frame dispatching method
        self.connection._on_data_available(data)

    def handle_write(self):
        """
        asyncore required function, is called when we can write to the socket
        """
        data = self.connection.outbound_buffer.read(self.suggested_buffer_size)
        try:
            bytes_written = self.send(data)
        except socket.timeout:
            raise
        except socket.error, error:
            return self._handle_error(error)

        # Remove the content we used from our buffer
        if not bytes_written:
            return self.connection._adapter_disconnect()

        # Remove what we wrote from the outbound buffer
        self.connection.outbound_buffer.consume(bytes_written)

        # If our buffer is empty, turn off writing
        if not self.connection.outbound_buffer.size:
            self.writable_ = False

    def writable(self):
        """
        asyncore required function, used to toggle the write bit on the
        select poller. For some reason, if we return false while connecting
        asyncore hangs, so we check for that explicitly and tell it that
        it can write while it's connecting.
        """
        if not self.connected:
            return True

        # Flag maintained by AsyncoreConneciton.flush_outbound and
        # self.handle_write
        return self.writable_

    # IOLoop Compatibility
    def add_timeout(self, deadline, handler):
        """
        Add a timeout to the stack by deadline
        """
        timeout_id = 'id%.8f' % time.time()
        self._timeouts[timeout_id] = {'deadline': deadline,
                                      'handler': handler}
        return timeout_id

    def remove_timeout(self, timeout_id):
        """
        Remove a timeout from the stack
        """
        if timeout_id in self._timeouts:
            del self._timeouts[timeout_id]

    def _process_timeouts(self):
        """
        Process our self._timeouts event stack
        """
        # Process our timeout events
        keys = self._timeouts.keys()
        start_time = time.time()
        for timeout_id in keys:
            if timeout_id in self._timeouts and \
               self._timeouts[timeout_id]['deadline'] <= start_time:
                self._timeouts[timeout_id]['handler']()
                del(self._timeouts[timeout_id])

    def start(self):
        """
        Pika Adapter IOLoop start function. This blocks until we are no longer
        connected.
        """
        while self.connected or self.connecting:
            try:
                # Use our socket map if we've made it, makes things less buggy
                if self.map:
                    asyncore.loop(timeout=1, map=self.map, count=1)
                else:
                    asyncore.loop(timeout=1, count=1)
            except select.error, e:
                if e[0] == 9:
                    break
            self._process_timeouts()

    def stop(self):
        """
        Pika Adapter IOLoop stop function. When called, it will close an open
        connection, exiting us out of the IOLoop running in start.
        """
        self.close()


class AsyncoreConnection(BaseConnection):

    def _adapter_connect(self, host, port):
        """
        Connect to our RabbitMQ boker using AsyncoreDispatcher, then setting
        Pika's suggested buffer size for socket reading and writing. We pass
        the handle to self so that the AsyncoreDispatcher object can call back
        into our various state methods.
        """
        self.ioloop = AsyncoreDispatcher(host, port)

        # Map some core values for compatibility
        self.ioloop._handle_error = self._handle_error
        self.ioloop.connection = self
        self.ioloop.suggested_buffer_size = self._suggested_buffer_size
        self.socket = self.ioloop.socket

    def _flush_outbound(self):
        """
        We really can't flush the socket in asyncore, so instead just use this
        to toggle a flag that lets it know we want to write to the socket.
        """
        if self.outbound_buffer.size:
            self.ioloop.writable_ = True