/usr/lib/python2.7/dist-packages/cl/models.py is in python-cl 0.0.3-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 | """cl.models"""
from __future__ import absolute_import
from kombu import Consumer, Queue
from kombu.utils import gen_unique_id
from . import Actor
_all__ = ["ModelActor", "ModelConsumer"]
class ModelConsumer(Consumer):
model = None
field = "name"
auto_delete = True
def __init__(self, channel, exchange, *args, **kwargs):
model = kwargs.pop("model", None)
self.model = model if model is not None else self.model
self.exchange = exchange
self.prepare_signals(kwargs.pop("sigmap", None))
queues = self.sync_queues(kwargs.pop("queues", []))
super(ModelConsumer, self).__init__(channel, queues, *args, **kwargs)
def prepare_signals(self, sigmap=None):
for callback, connect in (sigmap or {}).iteritems():
if isinstance(callback, basestring):
callback = getattr(self, callback)
connect(callback)
def create_queue(self, field_value):
return Queue(gen_unique_id(), self.exchange, field_value,
auto_delete=self.auto_delete)
def sync_queues(self, keep_queues=[]):
expected = [getattr(obj, self.field)
for obj in self.model._default_manager.enabled()]
queues = set()
create = self.create_queue
for v in expected:
queues.add(create(v))
for queue in queues:
if queue.routing_key not in expected:
queues.discard(v)
return list(keep_queues) + list(queues)
def on_create(self, instance=None, **kwargs):
fv = getattr(instance, self.field)
if not self.find_queue_by_rkey(fv):
self.add_queue(self.create_queue(fv))
self.consume()
def on_delete(self, instance=None, **kwargs):
fv = getattr(instance, self.field)
queue = self.find_queue_by_rkey(fv)
if queue:
self.cancel_by_queue(queue.name)
def find_queue_by_rkey(self, rkey):
for queue in self.queues:
if queue.routing_key == rkey:
return queue
class ModelActor(Actor):
#: The model this actor is a controller for (*required*).
model = None
#: Map of signals to connect and corresponding actions.
sigmap = {}
def __init__(self, connection=None, id=None, name=None, *args, **kwargs):
if self.model is None:
raise NotImplementedError(
"ModelActors must define the 'model' attribute!")
if not name or self.name:
name = self.model.__name__
super(ModelActor, self).__init__(connection, id, name, *args, **kwargs)
def Consumer(self, channel, **kwargs):
return ModelConsumer(channel, self.exchange,
callbacks=[self.on_message],
sigmap=self.sigmap, model=self.model,
queues=[self.get_scatter_queue(),
self.get_rr_queue()],
**kwargs)
|