/usr/share/pyshared/pika/adapters/base_connection.py is in python-pika 0.9.5-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# ***** BEGIN LICENSE BLOCK *****
#
# For copyright and licensing please refer to COPYING.
#
# ***** END LICENSE BLOCK *****
"""
Pika provides multiple adapters to connect to RabbitMQ:
- adapters.select_connection.SelectConnection: A native event based connection
adapter that implements select, kqueue, poll and epoll.
- adapters.asyncore_connection.AsyncoreConnection: Legacy adapter kept for
convenience of previous Pika users. It is recommended to use the
SelectConnection instead of AsyncoreConnection.
- adapters.tornado_connection.TornadoConnection: Connection adapter for use
with the Tornado web framework.
- adapters.blocking_connection.BlockingConnection: Enables blocking,
synchronous operation on top of library for simple uses. This is not
recommended and is included for legacy reasons only.
"""
import errno
import socket
import sys
import time

from pika.connection import Connection
import pika.log
# Event bitmask flags; the numeric values are the same as epoll's so they can
# be handed to an epoll based loop unchanged
READ = 1 << 0
WRITE = 1 << 2
ERROR = 1 << 3
class BaseConnection(Connection):
    """
    Socket and event handling shared by the IOLoop driven adapters.

    The class owns a TCP socket (self.socket) and an event bitmask
    (self.event_state, built from READ, WRITE and ERROR). self.ioloop is only
    initialised to None here; it has to be replaced with a real IOLoop object
    before any of the methods that call into it are used.
    """

    def __init__(self, parameters=None, on_open_callback=None,
                 reconnection_strategy=None):
        """
        Set the adapter defaults, then hand over to Connection.__init__.

        All three arguments are passed through to Connection.__init__
        untouched.
        """
        # Set our defaults
        self.fd = None
        self.ioloop = None
        self.socket = None

        # Event states (base and current)
        self.base_events = READ | ERROR
        self.event_state = self.base_events

        # Call our parent's __init__ last so the attributes above already
        # exist by the time it runs
        Connection.__init__(self, parameters, on_open_callback,
                            reconnection_strategy)

    def _adapter_connect(self, host, port):
        """
        Base connection function to be extended as needed.

        Creates an IPv4 TCP socket with TCP_NODELAY set, connects it to
        (host, port) while it is still blocking and then switches it to
        non-blocking mode. Errors raised by the socket calls propagate to
        the caller.
        """
        # SOL_TCP is not defined by the socket module on every platform;
        # IPPROTO_TCP is the portable option level for TCP_NODELAY
        tcp_level = getattr(socket, 'SOL_TCP', socket.IPPROTO_TCP)

        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        self.socket.setsockopt(tcp_level, socket.TCP_NODELAY, 1)
        self.socket.connect((host, port))
        self.socket.setblocking(0)

    def add_timeout(self, delay_sec, callback):
        """
        Ask the IOLoop to call callback delay_sec seconds from now.

        Returns whatever the IOLoop's add_timeout returns, which is the value
        to pass to remove_timeout.
        """
        deadline = time.time() + delay_sec
        return self.ioloop.add_timeout(deadline, callback)

    def remove_timeout(self, timeout_id):
        """
        Remove a timeout previously returned by add_timeout from the IOLoop.
        """
        self.ioloop.remove_timeout(timeout_id)

    def _erase_credentials(self):
        """
        Intentionally a no-op in the base adapter.
        """
        pass

    def _flush_outbound(self):
        """
        Call the state manager who will figure out that we need to write.
        """
        self._manage_event_state()

    def _adapter_disconnect(self):
        """
        Called if we are forced to disconnect for some reason from Connection
        """
        # Stop the IOLoop
        self.ioloop.stop()

        # Close our socket
        self.socket.close()

    def _handle_disconnect(self):
        """
        Called internally when we know our socket is disconnected already
        """
        # Stop the IOLoop
        self.ioloop.stop()

        # Close up our Connection state
        # NOTE(review): the meaning of the (None, True) arguments is defined
        # by Connection._on_connection_closed, which is not in this file
        self._on_connection_closed(None, True)

    def _handle_error(self, error):
        """
        Internal error handling method. Here we expect a socket.error coming
        in and will handle different socket errors differently.

        error may also be None: _handle_events passes its default of None
        when it is called with an ERROR event but without an exception. That
        case is treated as an unspecified error, i.e. logged and followed by
        a disconnect.
        """
        if error is None:
            # No exception object to inspect (previously this crashed with a
            # TypeError on error[0])
            error_code = None
        elif hasattr(error, 'errno'):  # Python >= 2.6
            error_code = error.errno
        else:
            error_code = error[0]  # Python <= 2.5

        # Ok errors, just continue what we were doing before
        if error_code in (errno.EWOULDBLOCK, errno.EAGAIN, errno.EINTR):
            return
        # Socket is closed, so log it and fall through to the disconnect
        elif error_code == errno.EBADF:
            pika.log.error("%s: Socket is closed", self.__class__.__name__)
        else:
            # Haven't run into this one yet, log it.
            pika.log.error("%s: Socket Error on %d: %s",
                           self.__class__.__name__,
                           self.socket.fileno(),
                           error_code)

        # Disconnect from our IOLoop and let Connection know what's up
        self._handle_disconnect()

    def _handle_events(self, fd, events, error=None):
        """
        Our IO/Event loop have called us with events, so process them

        fd is the file descriptor the events were reported for, events is a
        bitmask of READ, WRITE and ERROR and error is the exception (if any)
        that accompanies an ERROR event.
        """
        if not self.socket:
            # self.socket is falsy here, so report the descriptor we were
            # called with instead of asking the missing socket for it
            pika.log.error("%s: Got events for closed stream %d",
                           self.__class__.__name__, fd)
            return

        if events & READ:
            self._handle_read()

        if events & ERROR:
            self._handle_error(error)

        if events & WRITE:
            self._handle_write()

            # Call our event state manager who will decide if we reset our
            # event state due to having an empty outbound buffer
            self._manage_event_state()

    def _handle_read(self):
        """
        Read from the socket and call our on_data_available with the data

        socket.timeout is re-raised to the caller, any other socket.error is
        handed to _handle_error and an empty read triggers
        _adapter_disconnect.
        """
        try:
            data = self.socket.recv(self._suggested_buffer_size)
        except socket.timeout:
            raise
        except socket.error:
            # sys.exc_info() rather than binding the exception in the except
            # clause: that syntax differs between old and new Python versions
            return self._handle_error(sys.exc_info()[1])

        # We received no data, so disconnect
        if not data:
            return self._adapter_disconnect()

        # Pass the data into our top level frame dispatching method
        self._on_data_available(data)

    def _handle_write(self):
        """
        We only get here when we have data to write, so try and send
        Pika's suggested buffer size of data (be nice to Windows)

        socket.timeout is re-raised to the caller and any other socket.error
        is handed to _handle_error; in both cases nothing is consumed from
        the outbound buffer.
        """
        data = self.outbound_buffer.read(self._suggested_buffer_size)
        try:
            bytes_written = self.socket.send(data)
        except socket.timeout:
            raise
        except socket.error:
            # See _handle_read for why sys.exc_info() is used here
            return self._handle_error(sys.exc_info()[1])

        # Remove only what was actually sent from our output buffer
        self.outbound_buffer.consume(bytes_written)

    def _manage_event_state(self):
        """
        We use this to manage the bitmask for reading/writing/error which
        we want to use to have our io/event handler tell us when we can
        read/write, etc
        """
        # Do we have data pending in the outbound buffer?
        if self.outbound_buffer.size:

            # If we don't already have write in our event state append it
            # otherwise do nothing
            if not self.event_state & WRITE:

                # We can assume that we're in our base_event state
                self.event_state |= WRITE

                # Update the IOLoop
                self.ioloop.update_handler(self.socket.fileno(),
                                           self.event_state)

        # We don't have data in the outbound buffer
        elif self.event_state & WRITE:

            # Set our event state to the base events
            self.event_state = self.base_events

            # Update the IOLoop
            self.ioloop.update_handler(self.socket.fileno(), self.event_state)
|