This file is indexed.

/usr/share/doc/python-amqplib-doc/examples/tests/client_0_8/test_basic_message.py is in python-amqplib-doc 1.0.2-1.

This file is owned by root:root, with mode 0o755.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
#!/usr/bin/env python
"""
Test the amqplib.client_0_8.basic_message module.

"""
# Copyright (C) 2007-2008 Barry Pederson <bp@barryp.org>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301

from datetime import datetime
from decimal import Decimal
import unittest

# Prefer cPickle when it is importable (Python 2) and fall back to the
# plain pickle module otherwise.  Only ImportError is caught so that
# unrelated problems (KeyboardInterrupt, SystemExit, ...) are not hidden.
try:
    import cPickle as pickle
except ImportError:
    import pickle


import settings

from amqplib.client_0_8.basic_message import Message


class TestBasicMessage(unittest.TestCase):
    """
    Tests for Message: equality, pickling and round-tripping of the
    content properties through their serialized form.

    """

    def check_proplist(self, msg):
        """
        Assert that the properties of `msg` survive a serialize/load cycle.

        The properties are serialized, loaded into a freshly created
        Message, the body is copied over, and the two messages are
        then expected to compare equal.

        """
        serialized = msg._serialize_properties()

        clone = Message()
        clone._load_properties(serialized)
        clone.body = msg.body

        self.assertEqual(msg, clone)


    def test_eq(self):
        # A real Message is never equal to None.
        genuine = Message('hello', content_type='text/plain')
        self.assertNotEqual(genuine, None)

        # An unrelated object that only superficially resembles a
        # Message (it carries a `properties` attribute) must not make
        # the comparison blow up with an AttributeError; the two are
        # simply expected to be unequal.
        class FakeMsg(object):
            pass

        impostor = FakeMsg()
        impostor.properties = {'content_type': 'text/plain'}

        self.assertNotEqual(genuine, impostor)


    def test_pickle(self):
        # A Message with a large body and nested application headers
        # must compare equal to its pickled-then-unpickled copy.
        headers = {'foo': 7, 'bar': 'baz', 'd2': {'foo2': 'xxx', 'foo3': -1}}
        original = Message(
            'some body' * 200000,
            content_type='text/plain',
            content_encoding='utf-8',
            application_headers=headers,
            delivery_mode=1,
            priority=7)

        pickled = pickle.dumps(original)
        restored = pickle.loads(pickled)

        self.assertEqual(original, restored)


    def test_roundtrip(self):
        """
        Check round-trip processing of content-properties.

        Every Message built below is handed to check_proplist(), going
        from an empty message to ones carrying nested tables, datetime
        values, Decimal values and a timestamp property.

        """
        verify = self.check_proplist

        # No properties at all, then a single property.
        verify(Message())
        verify(Message(content_type='text/plain'))

        # Several properties plus a nested application_headers table.
        nested_headers = {
            'foo': 7,
            'bar': 'baz',
            'd2': {'foo2': 'xxx', 'foo3': -1},
            }
        verify(Message(
            content_type='text/plain',
            content_encoding='utf-8',
            application_headers=nested_headers,
            delivery_mode=1,
            priority=7))

        # Fixed datetimes, one in November and one in July.
        fixed_dates = {
            'regular': datetime(2007, 11, 12, 12, 34, 56),
            'dst': datetime(2007, 7, 12, 12, 34, 56),
            }
        verify(Message(application_headers=fixed_dates))

        # AMQP only does timestamps to 1-second resolution, so the
        # microseconds are dropped before comparing.
        current = datetime.now().replace(microsecond=0)
        verify(Message(application_headers={'foo': current}))

        # Positive and negative Decimal header values.
        for literal in ('10.1', '-1987654.193'):
            verify(Message(application_headers={'foo': Decimal(literal)}))

        # The timestamp content-property itself.
        verify(Message(timestamp=datetime(1980, 1, 2, 3, 4, 6)))


def main():
    """
    Run all tests of TestBasicMessage with a text test runner.

    The runner is created with the keyword arguments stored in
    settings.test_args.

    Returns the result object produced by the runner, so that callers
    can inspect the outcome (for example via wasSuccessful()).

    """
    suite = unittest.TestLoader().loadTestsFromTestCase(TestBasicMessage)
    return unittest.TextTestRunner(**settings.test_args).run(suite)


if __name__ == '__main__':
    import sys

    # Propagate the outcome through the exit status: 0 when every test
    # passed, 1 otherwise, so shells and CI jobs can detect failures.
    sys.exit(0 if main().wasSuccessful() else 1)