This file is indexed.

/usr/share/pyshared/txlongpoll/tests/test_client.py is in python-txlongpoll 0.3.1+bzr86-0ubuntu1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# Copyright 2005-2011 Canonical Ltd.  This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).

"""
Tests for C{AMQFactory}.
"""

from unittest import (
    defaultTestLoader,
    TestCase,
    )

from testtools.deferredruntest import flush_logged_errors
from twisted.internet.defer import Deferred
from txamqp.protocol import AMQChannel
from txamqp.queue import Closed
from txamqp.spec import Spec
from txlongpoll.client import AMQFactory
from txlongpoll.testing.client import AMQTest


class AMQFactoryTest(TestCase):
    """Tests for building an L{AMQFactory} without connecting it."""

    def test_init(self):
        """
        The user, password and vhost given to the constructor are exposed
        as attributes, and C{spec} is a L{Spec} instance.
        """
        # The three callables are placeholder callbacks that ignore their
        # arguments; only the stored attributes are checked here.
        factory = AMQFactory("guest", "secret", "localhost", lambda x: None,
                             lambda: None, lambda x: None)
        self.assertEqual(factory.user, "guest")
        self.assertEqual(factory.password, "secret")
        self.assertEqual(factory.vhost, "localhost")
        self.assertIsInstance(factory.spec, Spec)


class AMQFactoryConnectedTest(AMQTest):
    """
    Tests for the L{AMQFactory} callbacks, using the factory, connector and
    channel provided by the L{AMQTest} fixture.
    """

    def test_connected_callback(self):
        """The channel made available by the fixture is an L{AMQChannel}."""
        self.assertIsInstance(self.channel, AMQChannel)

    def test_disconnected_callback(self):
        """
        After the connector is disconnected, the factory's
        C{disconnected_callback} gets called.
        """
        d = Deferred()

        def disconnected():
            # Firing the Deferred is what lets the test complete.
            d.callback(None)

        self.factory.disconnected_callback = disconnected
        self.connector.disconnect()
        return d

    def test_reconnection(self):
        """
        After a disconnection, C{connected_callback} is called again with a
        channel that is not the one from the initial connection.
        """
        d = Deferred()

        def connected(client_and_channel):
            # The callback receives a single (client, channel) pair; unpack
            # it in the body, since tuple parameters are not valid syntax
            # on Python 3.
            _client, channel = client_and_channel
            self.assertIsInstance(channel, AMQChannel)
            self.assertIsNot(channel, self.channel)
            d.callback(None)

        self.factory.connected_callback = connected
        # Presumably the cap on the factory's reconnection delay; kept tiny
        # so the reconnection happens quickly.
        self.factory.maxDelay = 0.01
        self.connector.disconnect()
        return d


class AMQClosingTest(AMQTest):
    """Tests the L{AMQFactory} when the connection is closing."""

    # Zero until C{amq_connected} has raised once; the increment below
    # stores the new value on the instance, so the guard is per test.
    count = 0

    def amq_connected(self, client_and_channel):
        """
        Run the inherited C{amq_connected}, then raise L{Closed} the first
        time this is called on the test instance.

        @param client_and_channel: a C{(client, channel)} pair, handed on to
            the parent implementation as a tuple.
        @raise Closed: on the first call only.
        """
        # Unpack in the body rather than in the signature: tuple parameters
        # are not valid syntax on Python 3.
        client, channel = client_and_channel
        super(AMQClosingTest, self).amq_connected((client, channel))
        if not self.count:
            self.count += 1
            raise Closed()

    def test_catch_closed(self):
        """
        This test ensures that L{Closed} exception raised by C{amq_connected}
        is swallowed by L{AMQFactory}.
        """
        errors = flush_logged_errors()
        self.assertEqual(len(errors), 0)


def test_suite():
    """Return the suite of tests that can be loaded from this module."""
    module_name = __name__
    suite = defaultTestLoader.loadTestsFromName(module_name)
    return suite