/usr/share/pyshared/kombu/common.py is in python-kombu 1.4.3-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
"""
kombu.common
============
Common Utilities.
:copyright: (c) 2009 - 2011 by Ask Solem.
:license: BSD, see LICENSE for more details.
"""
from kombu import entity
from kombu.utils import uuid
from kombu.utils.compat import defaultdict
# Registry of entities that have already been declared, keyed by the
# client object of the connection; each key lazily gets an empty set.
declared_entities = defaultdict(set)
def maybe_declare(entity, channel):
    """Declare ``entity`` unless it is already recorded as declared.

    The record is kept in the module-level ``declared_entities`` mapping
    under ``channel.connection.client``.

    :param entity: Object exposing ``is_bound`` and ``declare()``.  When
        it is not bound it is first called with ``channel`` and the
        result of that call is used in its place.
    :param channel: Channel used to bind the entity and to look up the
        set of already declared entities.

    :returns: :const:`True` if ``declare()`` was called by this
        invocation, :const:`False` if the entity was already recorded.

    """
    seen = declared_entities[channel.connection.client]
    bound = entity if entity.is_bound else entity(channel)
    if bound in seen:
        return False
    bound.declare()
    seen.add(bound)
    return True
class Broadcast(entity.Queue):
    r"""Convenience class used to define broadcast queues.

    Every queue instance gets a unique name by default, and both the
    queue and the exchange are configured with auto deletion.

    :keyword name: Used as the name of the fanout exchange, and as the
        default value of the ``alias`` keyword.
    :keyword queue: Custom queue name.  When omitted (or otherwise
        false) a unique ``bcast.<uuid>`` name is generated for every
        instance.
    :keyword \*\*kwargs: See :class:`~kombu.entity.Queue` for a list
        of additional keyword arguments supported.  Explicitly passed
        ``exchange``, ``auto_delete`` and ``alias`` values take
        precedence over the defaults set here.

    """

    def __init__(self, name=None, queue=None, **kwargs):
        # Only build the default fanout exchange when the caller did not
        # pass one; a supplied ``exchange`` (even ``None``) is kept as-is.
        if "exchange" not in kwargs:
            kwargs["exchange"] = entity.Exchange(name, type="fanout",
                                                 auto_delete=True)
        kwargs.setdefault("auto_delete", True)
        kwargs.setdefault("alias", name)
        # ``uuid()`` is only invoked when no queue name was supplied.
        queue_name = queue or "bcast.%s" % (uuid(), )
        super(Broadcast, self).__init__(name=queue_name, **kwargs)
def entry_to_queue(queue, **options):
    """Build a :class:`~kombu.entity.Queue` from flat keyword options.

    :param queue: Passed as the first argument to the queue.

    Option keys read from ``options`` (a missing key yields ``None``):

    * ``exchange``, ``exchange_type``, ``delivery_mode``,
      ``routing_key`` and ``exchange_arguments`` configure the exchange.
    * ``binding_key`` is the queue's routing key; ``routing_key`` is
      used when ``binding_key`` is missing or false.
    * ``exchange_durable`` / ``queue_durable`` fall back to ``durable``
      when they are ``None``.
    * ``exchange_auto_delete`` / ``queue_auto_delete`` fall back to
      ``auto_delete`` when they are ``None``.
    * ``exclusive``, ``no_ack``, ``queue_arguments`` and
      ``binding_arguments`` are passed to the queue.

    :returns: The new queue, with the new exchange attached.

    """
    lookup = options.get

    def specific_or_shared(specific, shared):
        # An explicit False for the specific key must win, so only a
        # value of None triggers the fallback to the shared key.
        value = lookup(specific)
        return lookup(shared) if value is None else value

    exchange = entity.Exchange(
        lookup("exchange"),
        type=lookup("exchange_type"),
        delivery_mode=lookup("delivery_mode"),
        routing_key=lookup("routing_key"),
        durable=specific_or_shared("exchange_durable", "durable"),
        auto_delete=specific_or_shared("exchange_auto_delete",
                                       "auto_delete"),
        arguments=lookup("exchange_arguments"),
    )
    return entity.Queue(
        queue,
        exchange=exchange,
        routing_key=lookup("binding_key") or lookup("routing_key"),
        durable=specific_or_shared("queue_durable", "durable"),
        exclusive=lookup("exclusive"),
        auto_delete=specific_or_shared("queue_auto_delete", "auto_delete"),
        no_ack=lookup("no_ack"),
        queue_arguments=lookup("queue_arguments"),
        binding_arguments=lookup("binding_arguments"),
    )
|