/usr/lib/python2.7/dist-packages/amqp/abstract_channel.py is in python-amqp 2.2.2-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 | """Code common to Connection and Channel objects."""
# Copyright (C) 2007-2008 Barry Pederson <bp@barryp.org>)
from __future__ import absolute_import, unicode_literals
from vine import ensure_promise, promise
from .exceptions import AMQPNotImplementedError, RecoverableConnectionError
from .five import bytes_if_py2
from .serialization import dumps, loads
__all__ = ['AbstractChannel']
class AbstractChannel(object):
"""Superclass for Connection and Channel.
The connection is treated as channel 0, then comes
user-created channel objects.
The subclasses must have a _METHOD_MAP class property, mapping
between AMQP method signatures and Python methods.
"""
def __init__(self, connection, channel_id):
self.connection = connection
self.channel_id = channel_id
connection.channels[channel_id] = self
self.method_queue = [] # Higher level queue for methods
self.auto_decode = False
self._pending = {}
self._callbacks = {}
self._setup_listeners()
def __enter__(self):
return self
def __exit__(self, *exc_info):
self.close()
def send_method(self, sig,
format=None, args=None, content=None,
wait=None, callback=None, returns_tuple=False):
p = promise()
conn = self.connection
if conn is None:
raise RecoverableConnectionError('connection already closed')
args = dumps(format, args) if format else bytes_if_py2('')
try:
conn.frame_writer(1, self.channel_id, sig, args, content)
except StopIteration:
raise RecoverableConnectionError('connection already closed')
# TODO temp: callback should be after write_method ... ;)
if callback:
p.then(callback)
p()
if wait:
return self.wait(wait, returns_tuple=returns_tuple)
return p
def close(self):
"""Close this Channel or Connection."""
raise NotImplementedError('Must be overriden in subclass')
def wait(self, method, callback=None, timeout=None, returns_tuple=False):
p = ensure_promise(callback)
pending = self._pending
prev_p = []
if not isinstance(method, list):
method = [method]
for m in method:
prev_p.append(pending.get(m))
pending[m] = p
try:
while not p.ready:
self.connection.drain_events(timeout=timeout)
if p.value:
args, kwargs = p.value
return args if returns_tuple else (args and args[0])
finally:
for i, m in enumerate(method):
if prev_p[i] is not None:
pending[m] = prev_p[i]
else:
pending.pop(m, None)
def dispatch_method(self, method_sig, payload, content):
if content and \
self.auto_decode and \
hasattr(content, 'content_encoding'):
try:
content.body = content.body.decode(content.content_encoding)
except Exception:
pass
try:
amqp_method = self._METHODS[method_sig]
except KeyError:
raise AMQPNotImplementedError(
'Unknown AMQP method {0!r}'.format(method_sig))
try:
listeners = [self._callbacks[method_sig]]
except KeyError:
listeners = None
try:
one_shot = self._pending.pop(method_sig)
except KeyError:
if not listeners:
return
else:
if listeners is None:
listeners = [one_shot]
else:
listeners.append(one_shot)
args = []
if amqp_method.args:
args, _ = loads(amqp_method.args, payload, 4)
if amqp_method.content:
args.append(content)
for listener in listeners:
listener(*args)
#: Placeholder, the concrete implementations will have to
#: supply their own versions of _METHOD_MAP
_METHODS = {}
|