This file is indexed.

/usr/lib/python2.7/dist-packages/amqp/abstract_channel.py is in python-amqp 2.2.2-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
"""Code common to Connection and Channel objects."""
# Copyright (C) 2007-2008 Barry Pederson <bp@barryp.org>
from __future__ import absolute_import, unicode_literals

from vine import ensure_promise, promise

from .exceptions import AMQPNotImplementedError, RecoverableConnectionError
from .five import bytes_if_py2
from .serialization import dumps, loads

__all__ = ['AbstractChannel']


class AbstractChannel(object):
    """Behaviour shared by Connection and Channel objects.

    The connection is treated as channel 0; channels created by the
    user come after it.

    Subclasses are expected to provide:

    * a ``_METHODS`` class attribute mapping AMQP method signatures to
      method descriptors (``dispatch_method`` reads the ``args`` and
      ``content`` attributes of those descriptors),
    * a ``_setup_listeners()`` method, which ``__init__`` calls,
    * a working ``close()``.
    """

    #: Placeholder only: concrete implementations have to supply their
    #: own ``_METHODS`` table (looked up by ``dispatch_method``).
    _METHODS = {}

    def __init__(self, connection, channel_id):
        """Attach this object to ``connection`` under ``channel_id``."""
        self.connection = connection
        self.channel_id = channel_id
        # Register ourselves so the connection can find us by id.
        connection.channels[channel_id] = self
        # Higher level queue for methods.
        self.method_queue = []
        # When true, dispatch_method tries to decode message bodies.
        self.auto_decode = False
        # method signature -> one-shot listener (consumed on dispatch).
        self._pending = {}
        # method signature -> persistent listener.
        self._callbacks = {}

        self._setup_listeners()

    def __enter__(self):
        """Return the object itself for use in a ``with`` block."""
        return self

    def __exit__(self, *exc_info):
        """Close on leaving the ``with`` block, error or not."""
        self.close()

    def send_method(self, sig,
                    format=None, args=None, content=None,
                    wait=None, callback=None, returns_tuple=False):
        """Write an AMQP method on this channel.

        :param sig: signature of the method to send.
        :param format: serialization format for ``args``; when falsy an
            empty byte string is sent as the argument payload.
        :param args: values serialized according to ``format``.
        :param content: optional content handed to the frame writer.
        :param wait: method signature (or list of them) to wait for
            after the write; when falsy nothing is awaited.
        :param callback: called once the method has been written.
        :param returns_tuple: forwarded to :meth:`wait`.
        :returns: the result of :meth:`wait` when ``wait`` is given,
            otherwise the (already fulfilled) promise.
        :raises RecoverableConnectionError: if there is no connection,
            or the frame writer signals ``StopIteration``.
        """
        sent = promise()
        connection = self.connection
        if connection is None:
            raise RecoverableConnectionError('connection already closed')

        if format:
            packed = dumps(format, args)
        else:
            packed = bytes_if_py2('')

        try:
            connection.frame_writer(1, self.channel_id, sig, packed, content)
        except StopIteration:
            raise RecoverableConnectionError('connection already closed')

        # TODO temp: callback should be after write_method ... ;)
        if callback:
            sent.then(callback)
        sent()

        if not wait:
            return sent
        return self.wait(wait, returns_tuple=returns_tuple)

    def close(self):
        """Close this Channel or Connection."""
        raise NotImplementedError('Must be overriden in subclass')

    def wait(self, method, callback=None, timeout=None, returns_tuple=False):
        """Drain connection events until one of ``method`` arrives.

        :param method: a method signature, or a ``list`` of signatures.
            Anything that is not a ``list`` counts as one signature.
        :param callback: optional callable/promise, passed through
            ``ensure_promise`` and used as the one-shot listener.
        :param timeout: forwarded to ``connection.drain_events``.
        :param returns_tuple: when true return all received arguments,
            otherwise only the first one (or the empty argument
            sequence itself when there are none).
        :returns: see ``returns_tuple``; ``None`` if the promise ended
            up with a falsy value.
        """
        waiter = ensure_promise(callback)
        pending = self._pending
        methods = method if isinstance(method, list) else [method]

        # Install the waiter, remembering what it displaces so the
        # previous one-shot listeners can be put back afterwards.
        displaced = []
        for sig in methods:
            displaced.append(pending.get(sig))
            pending[sig] = waiter

        try:
            while not waiter.ready:
                self.connection.drain_events(timeout=timeout)

            outcome = waiter.value
            if outcome:
                received, _kwargs = outcome
                if returns_tuple:
                    return received
                return received and received[0]
        finally:
            # Always undo our registration, also on timeout/error.
            for sig, previous in zip(methods, displaced):
                if previous is None:
                    pending.pop(sig, None)
                else:
                    pending[sig] = previous

    def dispatch_method(self, method_sig, payload, content):
        """Hand a received method to the listeners registered for it.

        The persistent callback (if any) is called first, then the
        pending one-shot listener (if any), which is removed.  When
        neither exists the method is dropped silently.

        :param method_sig: signature of the received method.
        :param payload: serialized method arguments.
        :param content: content that came with the method, if any.
        :raises AMQPNotImplementedError: if ``method_sig`` is not in
            ``_METHODS``.
        """
        if (content and
                self.auto_decode and
                hasattr(content, 'content_encoding')):
            # Best effort: a body that cannot be decoded is left as is.
            try:
                content.body = content.body.decode(content.content_encoding)
            except Exception:
                pass

        try:
            amqp_method = self._METHODS[method_sig]
        except KeyError:
            raise AMQPNotImplementedError(
                'Unknown AMQP method {0!r}'.format(method_sig))

        listeners = []
        try:
            listeners.append(self._callbacks[method_sig])
        except KeyError:
            pass
        try:
            # One-shot listeners are consumed by the dispatch.
            listeners.append(self._pending.pop(method_sig))
        except KeyError:
            pass
        if not listeners:
            return

        if amqp_method.args:
            # NOTE(review): offset 4 presumably skips the method
            # signature at the start of the payload -- confirm.
            received, _offset = loads(amqp_method.args, payload, 4)
        else:
            received = []
        if amqp_method.content:
            received.append(content)

        for notify in listeners:
            notify(*received)