/usr/lib/python2.7/dist-packages/txlongpoll/tests/test_client.py is in python-txlongpoll 0.3.1+bzr86-0ubuntu3.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 | # Copyright 2005-2011 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""
Tests for C{AMQFactory}.
"""
from unittest import (
defaultTestLoader,
TestCase,
)
from testtools.deferredruntest import flush_logged_errors
from twisted.internet.defer import Deferred
from txamqp.protocol import AMQChannel
from txamqp.queue import Closed
from txamqp.spec import Spec
from txlongpoll.client import AMQFactory
from txlongpoll.testing.client import AMQTest
class AMQFactoryTest(TestCase):
def test_init(self):
factory = AMQFactory("guest", "secret", "localhost", lambda x: None,
lambda: None, lambda x: None)
self.assertEquals(factory.user, "guest")
self.assertEquals(factory.password, "secret")
self.assertEquals(factory.vhost, "localhost")
self.assertTrue(isinstance(factory.spec, Spec))
class AMQFactoryConnectedTest(AMQTest):
def test_connected_callback(self):
self.assertTrue(isinstance(self.channel, AMQChannel))
def test_disconnected_callback(self):
d = Deferred()
def disconnected():
d.callback(None)
self.factory.disconnected_callback = disconnected
self.connector.disconnect()
return d
def test_reconnection(self):
d = Deferred()
def connected((client, channel)):
self.assertTrue(isinstance(channel, AMQChannel))
self.assertIsNot(channel, self.channel)
d.callback(None)
self.factory.connected_callback = connected
self.factory.maxDelay = 0.01
self.connector.disconnect()
return d
class AMQClosingTest(AMQTest):
"""Tests the L{AMQFactory} when the connection is closing."""
count = 0
def amq_connected(self, (client, channel)):
super(AMQClosingTest, self).amq_connected((client, channel))
if not self.count:
self.count += 1
raise Closed()
def test_catch_closed(self):
"""
This test ensures that L{Closed} exception raised by C{amq_connected}
is swallowed by L{AMQFactory}.
"""
errors = flush_logged_errors()
self.assertEquals(len(errors), 0)
def test_suite():
return defaultTestLoader.loadTestsFromName(__name__)
|