/usr/share/pyshared/txlongpoll/client.py is in python-txlongpoll 0.3.1+bzr86-0ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 | # Copyright 2005-2011 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""
Asynchronous client for AMQP using txAMQP.
"""
import os.path
from twisted.internet.defer import maybeDeferred
from twisted.internet.protocol import ReconnectingClientFactory
from txamqp.client import TwistedDelegate
from txamqp.protocol import AMQClient
from txamqp.queue import Closed
from txamqp.spec import load as load_spec
class AMQClientWithCallback(AMQClient):
"""
An C{AMQClient} that notifies connections with a callback.
@ivar connected_callback: callback called when C{connectionMade} is
called. It takes one argument, the protocol instance itself.
"""
def __init__(self, connected_callback, *args, **kwargs):
AMQClient.__init__(self, *args, **kwargs)
self.connected_callback = connected_callback
def connectionMade(self):
AMQClient.connectionMade(self)
self.connected_callback(self)
_base_dir = os.path.dirname(os.path.abspath(__file__))
AMQP0_8_SPEC = load_spec(os.path.join(_base_dir, "specs", "amqp0-8.xml"))
del _base_dir
class AMQFactory(ReconnectingClientFactory):
"""
A C{ClientFactory} for C{AMQClient} protocol with reconnecting facilities.
@ivar user: the user name to use to connect to the AMQP server.
@ivar password: the corresponding password of the user.
@ivar vhost: the AMQP vhost to create connections against.
@ivar connected_callback: callback called when a successful connection
happened. It takes one argument, the channel opened for the connection.
@ivar disconnected_callback: callback called when a previously connected
connection was lost. It takes no argument.
"""
protocol = AMQClientWithCallback
initialDelay = 0.01
def __init__(self, user, password, vhost, connected_callback,
disconnected_callback, failed_callback, spec=None):
self.user = user
self.password = password
self.vhost = vhost
self.delegate = TwistedDelegate()
if spec is None:
spec = AMQP0_8_SPEC
self.spec = spec
self.connected_callback = connected_callback
self.disconnected_callback = disconnected_callback
self.failed_callback = failed_callback
def buildProtocol(self, addr):
"""
Create the protocol instance and returns it for letting Twisted
connect it to the transport.
@param addr: the attributed address, unused for now.
"""
protocol = self.protocol(self.clientConnectionMade, self.delegate,
self.vhost, spec=self.spec)
protocol.factory = self
return protocol
def clientConnectionMade(self, client):
"""
Called when a connection succeeds: login to the server, and open a
channel against it.
"""
self.resetDelay()
def started(ignored):
# We don't care about authenticate result as long as it succeeds
return client.channel(1).addCallback(got_channel)
def got_channel(channel):
return channel.channel_open().addCallback(opened, channel)
def opened(ignored, channel):
deferred = maybeDeferred(
self.connected_callback, (client, channel))
deferred.addErrback(catch_closed)
def catch_closed(failure):
failure.trap(Closed)
deferred = client.authenticate(self.user, self.password)
return deferred.addCallback(started)
def clientConnectionLost(self, connector, reason):
ReconnectingClientFactory.clientConnectionLost(self, connector, reason)
self.disconnected_callback()
def clientConnectionFailed(self, connector, reason):
ReconnectingClientFactory.clientConnectionFailed(
self, connector, reason)
self.failed_callback((connector, reason))
|