/usr/lib/python2.7/dist-packages/pika/adapters/asyncore_connection.py is in python-pika 0.9.13-1build1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 | """
Use Pika with the stdlib :py:mod:`asyncore` module.
"""
import asyncore
import logging
import time
from pika.adapters import base_connection
LOGGER = logging.getLogger(__name__)
class PikaDispatcher(asyncore.dispatcher):
# Use epoll's constants to keep life easy
READ = 0x0001
WRITE = 0x0004
ERROR = 0x0008
def __init__(self, sock=None, map=None, event_callback=None):
# Is an old style class...
asyncore.dispatcher.__init__(self, sock, map)
self._timeouts = dict()
self._event_callback = event_callback
self.events = self.READ | self.WRITE
def add_timeout(self, deadline, callback_method):
"""Add the callback_method to the IOLoop timer to fire after deadline
seconds. Returns a handle to the timeout. Do not confuse with
Tornado's timeout where you pass in the time you want to have your
callback called. Only pass in the seconds until it's to be called.
:param int deadline: The number of seconds to wait to call callback
:param method callback_method: The callback method
:rtype: str
"""
value = {'deadline': time.time() + deadline,
'callback': callback_method}
timeout_id = hash(frozenset(value.items()))
self._timeouts[timeout_id] = value
return timeout_id
def readable(self):
return bool(self.events & self.READ)
def writable(self):
return bool(self.events & self.WRITE)
def handle_read(self):
self._event_callback(self.socket, self.READ)
def handle_write(self):
self._event_callback(self.socket, self.WRITE, None, True)
def process_timeouts(self):
"""Process the self._timeouts event stack"""
start_time = time.time()
for timeout_id in self._timeouts.keys():
if self._timeouts[timeout_id]['deadline'] <= start_time:
callback = self._timeouts[timeout_id]['callback']
del self._timeouts[timeout_id]
callback()
def remove_timeout(self, timeout_id):
"""Remove a timeout if it's still in the timeout stack
:param str timeout_id: The timeout id to remove
"""
if timeout_id in self._timeouts:
del self._timeouts[timeout_id]
def start(self):
LOGGER.debug('Starting IOLoop')
asyncore.loop()
def stop(self):
LOGGER.debug('Stopping IOLoop')
self.close()
def update_handler(self, fileno_unused, events):
"""Set the events to the current events
:param int fileno_unused: The file descriptor
:param int events: The event mask
"""
self.events = events
class AsyncoreConnection(base_connection.BaseConnection):
"""The AsyncoreConnection adapter uses the stdlib asyncore module as an
IOLoop for asyncronous client development.
:param pika.connection.Parameters parameters: Connection parameters
:param method on_open_callback: Method to call on connection open
:param on_open_error_callback: Method to call if the connection cant
be opened
:type on_open_error_callback: method
:param method on_close_callback: Method to call on connection close
:param bool stop_ioloop_on_close: Call ioloop.stop() if disconnected
:raises: RuntimeError
"""
def __init__(self,
parameters=None,
on_open_callback=None,
on_open_error_callback=None,
on_close_callback=None,
stop_ioloop_on_close=True):
"""Create a new instance of the Connection object.
:param pika.connection.Parameters parameters: Connection parameters
:param method on_open_callback: Method to call on connection open
:param on_open_error_callback: Method to call if the connection cant
be opened
:type on_open_error_callback: method
:param method on_close_callback: Method to call on connection close
:param bool stop_ioloop_on_close: Call ioloop.stop() if disconnected
:raises: RuntimeError
"""
class ConnectingIOLoop(object):
def add_timeout(self, duration, callback_method):
time.sleep(duration)
return callback_method()
ioloop = ConnectingIOLoop()
super(AsyncoreConnection, self).__init__(parameters, on_open_callback,
on_open_error_callback,
on_close_callback,
ioloop,
stop_ioloop_on_close)
def _adapter_connect(self):
"""Connect to our RabbitMQ broker using AsyncoreDispatcher, then setting
Pika's suggested buffer size for socket reading and writing. We pass
the handle to self so that the AsyncoreDispatcher object can call back
into our various state methods.
"""
if super(AsyncoreConnection, self)._adapter_connect():
self.socket = PikaDispatcher(self.socket, None, self._handle_events)
self.ioloop = self.socket
self._on_connected()
|