/usr/lib/python2.7/dist-packages/pika/adapters/twisted_connection.py is in python-pika 0.9.14-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
"""Using Pika with a Twisted reactor.
Supports two methods of establishing the connection, using TwistedConnection
or TwistedProtocolConnection. For details about each method, see the docstrings
of the corresponding classes.
The interfaces in this module are Deferred-based when possible. This means that
the connection.channel() method and most of the channel methods return
Deferreds instead of taking a callback argument and that basic_consume()
returns a Twisted DeferredQueue where messages from the server will be
stored. Refer to the docstrings for TwistedConnection.channel() and the
TwistedChannel class for details.
"""
import functools
from twisted.internet import defer, error, reactor
from twisted.python import log
from pika import exceptions
from pika.adapters import base_connection
class ClosableDeferredQueue(defer.DeferredQueue):
    """A DeferredQueue that can be shut down with an error.

    After close() is called with an exception instance, every Deferred
    still waiting on get() is errbacked with it, buffered items are
    dropped, and later get()/put() calls return ``defer.fail()`` of that
    same exception.
    """

    def __init__(self, size=None, backlog=None):
        # None means "still open"; close() stores the closing reason here.
        self.closed = None
        super(ClosableDeferredQueue, self).__init__(size, backlog)

    def put(self, obj):
        """Add obj to the queue unless the queue has been closed."""
        if not self.closed:
            return defer.DeferredQueue.put(self, obj)
        return defer.fail(self.closed)

    def get(self):
        """Fetch the next item unless the queue has been closed."""
        if not self.closed:
            return defer.DeferredQueue.get(self)
        return defer.fail(self.closed)

    def close(self, reason):
        """Close the queue.

        :param reason: value stored in ``closed``; it is handed to the
            errback of every waiting Deferred and used to fail later
            get()/put() calls
        """
        self.closed = reason
        # Fail the waiters one by one, newest first (list.pop()).
        while self.waiting:
            waiter = self.waiting.pop()
            waiter.errback(reason)
        # Drop whatever was buffered but never fetched.
        self.pending = []
class TwistedChannel(object):
    """A wrapper around Pika's Channel.

    Channel methods that normally take a callback argument are wrapped to
    return a Deferred that fires with whatever would be passed to the
    callback. If the channel gets closed, all pending Deferreds are
    errbacked with a ChannelClosed exception. The returned Deferreds fire
    with whatever arguments the callback to the original method would
    receive.

    The basic_consume method is wrapped in a special way, see its docstring
    for details.
    """

    WRAPPED_METHODS = ('exchange_declare', 'exchange_delete',
                       'queue_declare', 'queue_bind', 'queue_purge',
                       'queue_unbind', 'basic_qos', 'basic_get',
                       'basic_recover', 'tx_select', 'tx_commit',
                       'tx_rollback', 'flow', 'basic_cancel')

    def __init__(self, channel):
        """Wrap channel and register for its close notification.

        :param channel: the Pika channel object to wrap
        """
        self.__channel = channel
        # None while open; the ChannelClosed instance once closed.
        self.__closed = None
        # Deferreds of wrapped calls that have not completed yet.
        self.__calls = set()
        # Maps queue name -> set of ClosableDeferredQueue consumers.
        self.__consumers = {}
        channel.add_on_close_callback(self.channel_closed)

    def channel_closed(self, channel, reply_code, reply_text):
        """Close callback: fail pending calls and close consumer queues.

        :param channel: the channel that was closed (unused)
        :param reply_code: passed to the ChannelClosed exception
        :param reply_text: passed to the ChannelClosed exception
        """
        # enter the closed state
        self.__closed = exceptions.ChannelClosed(reply_code, reply_text)
        # Detach the containers before touching their contents: errbacking
        # a call runs __clear_call, which mutates self.__calls, and this
        # also releases the references to the stored objects.
        calls, self.__calls = self.__calls, set()
        consumers, self.__consumers = self.__consumers, {}
        # errback all pending calls
        for d in calls:
            d.errback(self.__closed)
        # close all open queues
        for queues in consumers.values():
            for queue in queues:
                queue.close(self.__closed)

    def basic_consume(self, *args, **kwargs):
        """Consume from a server queue. Returns a Deferred that fires with a
        tuple: (queue_object, consumer_tag). The queue object is an instance
        of ClosableDeferredQueue, where data received from the queue will be
        stored. Clients should use its get() method to fetch individual
        message.

        The queue name must be passed as the ``queue`` keyword argument; a
        KeyError is raised otherwise. If the channel is closed, or the
        underlying basic_consume raises, a failed Deferred is returned.
        """
        if self.__closed:
            return defer.fail(self.__closed)

        queue = ClosableDeferredQueue()
        queue_name = kwargs['queue']
        kwargs['consumer_callback'] = lambda *message: queue.put(message)
        self.__consumers.setdefault(queue_name, set()).add(queue)

        try:
            consumer_tag = self.__channel.basic_consume(*args, **kwargs)
        except Exception:
            # The consumer was never set up, so forget its queue again
            # instead of keeping it registered until the channel closes.
            registered = self.__consumers.get(queue_name)
            if registered is not None:
                registered.discard(queue)
                if not registered:
                    del self.__consumers[queue_name]
            # defer.fail() without arguments wraps the active exception.
            return defer.fail()

        return defer.succeed((queue, consumer_tag))

    def queue_delete(self, *args, **kwargs):
        """Wraps the method the same way all the others are wrapped, but
        removes the reference to the queue object after it gets deleted on
        the server.

        The queue name must be passed as the ``queue`` keyword argument.
        """
        wrapped = self.__wrap_channel_method('queue_delete')
        queue_name = kwargs['queue']

        d = wrapped(*args, **kwargs)
        return d.addCallback(self.__clear_consumer, queue_name)

    def basic_publish(self, *args, **kwargs):
        """Make sure the channel is not closed and then publish. Return a
        Deferred that fires with the result of the channel's basic_publish.
        """
        if self.__closed:
            return defer.fail(self.__closed)
        return defer.succeed(self.__channel.basic_publish(*args, **kwargs))

    def __wrap_channel_method(self, name):
        """Wrap Pika's Channel method to make it return a Deferred that
        fires when the method completes and errbacks if the channel gets
        closed. If the original method's callback would receive more than
        one argument, the Deferred fires with a tuple of argument values.
        """
        method = getattr(self.__channel, name)

        @functools.wraps(method)
        def wrapped(*args, **kwargs):
            if self.__closed:
                return defer.fail(self.__closed)

            d = defer.Deferred()
            self.__calls.add(d)
            # Untrack the call on success *and* on failure, so that a
            # Deferred that already failed (e.g. was cancelled by the
            # caller) is not errbacked a second time on channel close.
            d.addBoth(self.__clear_call, d)

            def single_argument(*results):
                """
                Make sure that the deferred is called with a single
                argument. In case the original callback fires with more
                than one, convert to a tuple; with none, fire with None.
                """
                if len(results) > 1:
                    d.callback(tuple(results))
                elif results:
                    d.callback(results[0])
                else:
                    d.callback(None)

            kwargs['callback'] = single_argument

            try:
                method(*args, **kwargs)
            except Exception:
                # The call never started: stop tracking its Deferred and
                # report the active exception through a failed Deferred.
                self.__calls.discard(d)
                return defer.fail()
            return d

        return wrapped

    def __clear_consumer(self, ret, queue_name):
        """Forget the consumer queues of queue_name; pass ret through."""
        self.__consumers.pop(queue_name, None)
        return ret

    def __clear_call(self, ret, d):
        """Stop tracking the Deferred d; pass ret through unchanged."""
        self.__calls.discard(d)
        return ret

    def __getattr__(self, name):
        # Wrap methods defined in WRAPPED_METHODS, forward the rest of
        # accesses to the channel.
        if name in self.WRAPPED_METHODS:
            return self.__wrap_channel_method(name)
        return getattr(self.__channel, name)
class IOLoopReactorAdapter(object):
    """An adapter providing Pika's IOLoop interface using a Twisted reactor.

    Accepts a TwistedConnection object and a Twisted reactor object.
    """

    def __init__(self, connection, reactor):
        """Store the connection and reactor this adapter works with.

        :param connection: object registered with the reactor as reader
            and/or writer; must expose READ and WRITE event flags
        :param reactor: the Twisted reactor to delegate to
        """
        self.connection = connection
        self.reactor = reactor
        # True only while a start() issued through this adapter is active.
        self.started = False

    def add_timeout(self, deadline, callback_method):
        """Add the callback_method to the IOLoop timer to fire after
        deadline seconds. Returns a handle to the timeout. Do not confuse
        with Tornado's timeout where you pass in the time you want to have
        your callback called. Only pass in the seconds until it's to be
        called.

        :param int deadline: The number of seconds to wait to call callback
        :param method callback_method: The callback method
        :rtype: twisted.internet.interfaces.IDelayedCall
        """
        return self.reactor.callLater(deadline, callback_method)

    def remove_timeout(self, call):
        """Remove a call

        Does nothing when the call is no longer active (it has already run
        or has already been cancelled), since cancelling such a call would
        raise instead of being a harmless no-op.

        :param twisted.internet.interfaces.IDelayedCall call: The call to
            cancel
        """
        if call.active():
            call.cancel()

    def stop(self):
        """Stop the reactor, but only if start() was used to run it."""
        # Guard against stopping the reactor multiple times
        if not self.started:
            return
        self.started = False
        self.reactor.stop()

    def start(self):
        """Run the reactor unless this adapter has already started it."""
        # Guard against starting the reactor multiple times
        if self.started:
            return
        self.started = True
        self.reactor.run()

    def remove_handler(self, _):
        """Unregister the connection from the reactor (fileno ignored)."""
        # The fileno is irrelevant, as it's the connection's job to provide
        # it to the reactor when asked to do so. Removing the handler from
        # the ioloop is removing it from the reactor in Twisted's parlance.
        self.reactor.removeReader(self.connection)
        self.reactor.removeWriter(self.connection)

    def update_handler(self, _, event_state):
        """Re-register the connection according to event_state.

        :param _: ignored file descriptor
        :param event_state: bit mask tested against the connection's READ
            and WRITE flags
        """
        # Same as in remove_handler, the fileno is irrelevant. First remove
        # the connection entirely from the reactor, then add it back
        # depending on the event state.
        self.reactor.removeReader(self.connection)
        self.reactor.removeWriter(self.connection)

        if event_state & self.connection.READ:
            self.reactor.addReader(self.connection)

        if event_state & self.connection.WRITE:
            self.reactor.addWriter(self.connection)
class TwistedConnection(base_connection.BaseConnection):
    """A standard Pika connection adapter driven by the Twisted reactor.

    Instantiate it with the connection parameters and the connected
    callback; once that callback is invoked the connection can be used.

    Be aware that the connection is established with the blocking socket
    module. If, for example, the target host sits behind a misconfigured
    firewall that silently drops packets, the whole process freezes until
    the connection timeout passes. TwistedProtocolConnection avoids that
    problem, but read its docstring first.

    Instances are put into the Twisted reactor, which notifies them when
    the socket becomes readable or writable. Besides the BaseConnection
    interface they therefore also provide Twisted's IReadWriteDescriptor
    interface.
    """

    def __init__(self, parameters=None,
                 on_open_callback=None,
                 on_open_error_callback=None,
                 on_close_callback=None,
                 stop_ioloop_on_close=False):
        # The ioloop handed to the base class is a thin adapter around the
        # global Twisted reactor.
        ioloop_adapter = IOLoopReactorAdapter(self, reactor)
        super(TwistedConnection, self).__init__(
            parameters=parameters,
            on_open_callback=on_open_callback,
            on_open_error_callback=on_open_error_callback,
            on_close_callback=on_close_callback,
            ioloop=ioloop_adapter,
            stop_ioloop_on_close=stop_ioloop_on_close)

    def _adapter_connect(self):
        """Connect to the RabbitMQ broker"""
        # Connect (blockingly!) to the server
        connect_error = super(TwistedConnection, self)._adapter_connect()
        if connect_error:
            return connect_error
        # Register for the I/O events we are waiting for. Passing None as
        # the file descriptor is fine: IOLoopReactorAdapter ignores it.
        self.ioloop.update_handler(None, self.event_state)
        return connect_error

    def _adapter_disconnect(self):
        """Called when the adapter should disconnect"""
        # Leave the reactor first, then close the socket.
        self.ioloop.remove_handler(None)
        self.socket.close()

    def _handle_disconnect(self):
        """Fire the disconnect callbacks without stopping the reactor;
        stopping it would cause the entire process to exit.
        """
        self._on_connection_closed(None, True)

    def _on_connected(self):
        """Run the superclass handler, then refresh the event state so the
        outgoing frame gets flushed. Commit
        50d842526d9f12d32ad9f3c4910ef60b8c301f59 removed a
        self._flush_outbound call from _send_frame which used to make this
        step unnecessary.
        """
        super(TwistedConnection, self)._on_connected()
        self._manage_event_state()

    def channel(self, channel_number=None):
        """Return a Deferred that fires with a TwistedChannel wrapping the
        Pika Channel that gets opened.
        """
        opened = defer.Deferred()
        base_connection.BaseConnection.channel(self, opened.callback,
                                               channel_number)
        opened.addCallback(TwistedChannel)
        return opened

    # IReadWriteDescriptor methods

    def fileno(self):
        return self.socket.fileno()

    def logPrefix(self):
        return "twisted-pika"

    def connectionLost(self, reason):
        # Anything other than a clean close gets logged as an error.
        closed_cleanly = reason.check(error.ConnectionDone)
        if not closed_cleanly:
            log.err(reason)
        self._handle_disconnect()

    def doRead(self):
        self._handle_read()

    def doWrite(self):
        self._handle_write()
        self._manage_event_state()
class TwistedProtocolConnection(base_connection.BaseConnection):
    """A cross between a Pika Connection and a Twisted Protocol, which lets
    Twisted's non-blocking connectTCP/connectSSL establish the connection
    to the server.

    One caveat: instances carry a ``ready`` attribute, a Deferred that
    fires once the connection is usable (the initial AMQP handshake is
    done). You *have* to wait for it to fire before requesting a channel.

    Because Twisted establishes the connection, no connect callbacks are
    accepted; implement those on the Twisted side. For the same reason the
    host, port and ssl values of the connection parameters are ignored.
    """

    def __init__(self, parameters):
        # Fired by connectionReady, failed by connectionFailed or
        # connectionLost; replaced by None after the first of those.
        self.ready = defer.Deferred()
        ioloop_adapter = IOLoopReactorAdapter(self, reactor)
        super(TwistedProtocolConnection, self).__init__(
            parameters=parameters,
            on_open_callback=self.connectionReady,
            on_open_error_callback=self.connectionFailed,
            on_close_callback=None,
            ioloop=ioloop_adapter,
            stop_ioloop_on_close=False)

    def _take_ready(self):
        """Return the current ``ready`` value and reset it to None."""
        current, self.ready = self.ready, None
        return current

    def connect(self):
        # Twisted opens the connection asynchronously, so all that is left
        # of connect() is recording the connection state.
        self._set_connection_state(self.CONNECTION_INIT)

    def _adapter_connect(self):
        # connect() is overridden and Twisted builds the TCP connection, so
        # this should never run; it exists only to keep the interface.
        return False

    def _adapter_disconnect(self):
        # Disconnect from the server
        self.transport.loseConnection()

    def _send_frame(self, frame_value):
        """Send data the Twisted way: write it to the transport, which
        takes care of buffering on its own.

        :param frame_value: The frame to write
        :type frame_value: pika.frame.Frame|pika.frame.ProtocolHeader
        """
        if self.is_closed:
            raise exceptions.ConnectionClosed
        payload = frame_value.marshal()
        self.bytes_sent += len(payload)
        self.frames_sent += 1
        self.transport.write(payload)

    def channel(self, channel_number=None):
        """Create a new channel, either with the next available channel
        number or with the one passed in. A number given explicitly must be
        non-zero, but letting Pika manage channel numbers is recommended.

        Return a Deferred that fires with an instance of a wrapper around
        the Pika Channel class.

        :param int channel_number: The channel number to use, defaults to
            the next available.
        """
        opened = defer.Deferred()
        base_connection.BaseConnection.channel(self, opened.callback,
                                               channel_number)
        opened.addCallback(TwistedChannel)
        return opened

    # IProtocol methods

    def dataReceived(self, data):
        # Pass the bytes to Pika for parsing
        self._on_data_available(data)

    def connectionLost(self, reason):
        # Let the caller know there's been an error
        waiting = self._take_ready()
        if waiting:
            waiting.errback(reason)

    def makeConnection(self, transport):
        self.transport = transport
        self.connectionMade()

    def connectionMade(self):
        # Tell everyone we're connected
        self._on_connected()

    # Our own methods

    def connectionReady(self, res):
        waiting = self._take_ready()
        if waiting:
            waiting.callback(res)

    def connectionFailed(self, connection_unused, error_message=None):
        waiting = self._take_ready()
        if not waiting:
            return
        # The error carries the configured number of connection attempts.
        attempts = self.params.connection_attempts
        waiting.errback(exceptions.AMQPConnectionError(attempts))
|