/usr/share/pyshared/carrot/backends/base.py is in python-carrot 0.10.7-1ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
"""
Backend base classes.
"""
from carrot import serialization
# States in which a message counts as already settled; ``BaseMessage``
# refuses to ack/reject/requeue a message whose state is in this set.
ACKNOWLEDGED_STATES = frozenset(("ACK", "REJECTED", "REQUEUED"))
class MessageStateError(Exception):
    """The message has already been acknowledged.

    Raised by ``BaseMessage.ack``, ``BaseMessage.reject`` and
    ``BaseMessage.requeue`` when the message is already in one of the
    ``ACKNOWLEDGED_STATES``.

    """
class BaseMessage(object):
    """Base class for received messages.

    Stores the raw body and delivery metadata of a message together with
    the backend it came from, and tracks whether the message has been
    acknowledged, rejected or requeued so that none of those operations
    is carried out twice.

    """
    # Class-level default; every instance starts out as "RECEIVED".
    _state = None

    # Exposed on the class so callers can catch ``message.MessageStateError``.
    MessageStateError = MessageStateError

    def __init__(self, backend, **kwargs):
        """Create a message bound to ``backend``.

        :param backend: Object whose ``ack``, ``reject`` and ``requeue``
            methods are called with this message's delivery tag.
        :keyword body: The raw message body.
        :keyword delivery_tag: Tag passed back to the backend when the
            message is acknowledged, rejected or requeued.
        :keyword content_type: Passed to the deserializer by :meth:`decode`.
        :keyword content_encoding: Passed to the deserializer by
            :meth:`decode`.
        :keyword delivery_info: Additional delivery information
            (defaults to an empty dict).

        """
        self.backend = backend
        self.body = kwargs.get("body")
        self.delivery_tag = kwargs.get("delivery_tag")
        self.content_type = kwargs.get("content_type")
        self.content_encoding = kwargs.get("content_encoding")
        self.delivery_info = kwargs.get("delivery_info", {})
        self._decoded_cache = None
        self._state = "RECEIVED"

    def decode(self):
        """Deserialize the message body, returning the original
        python structure sent by the publisher."""
        return serialization.decode(self.body, self.content_type,
                                    self.content_encoding)

    @property
    def payload(self):
        """The decoded message.

        The body is decoded on first access and the result is cached for
        subsequent accesses.

        """
        # Compare with None instead of testing truthiness: a falsy payload
        # such as {}, [], 0 or "" is a valid decoded value and must be
        # cached as well, not decoded again on every access.
        if self._decoded_cache is None:
            self._decoded_cache = self.decode()
        return self._decoded_cache

    def _raise_if_acknowledged(self):
        """Raise :exc:`MessageStateError` if the message is already
        acknowledged/requeued/rejected."""
        if self.acknowledged:
            raise self.MessageStateError(
                "Message already acknowledged with state: %s" % self._state)

    def ack(self):
        """Acknowledge this message as being processed.

        This will remove the message from the queue.

        :raises MessageStateError: If the message has already been
            acknowledged/requeued/rejected.

        """
        self._raise_if_acknowledged()
        self.backend.ack(self.delivery_tag)
        # Only recorded once the backend call has returned without raising.
        self._state = "ACK"

    def reject(self):
        """Reject this message.

        The message will be discarded by the server.

        :raises MessageStateError: If the message has already been
            acknowledged/requeued/rejected.

        """
        self._raise_if_acknowledged()
        self.backend.reject(self.delivery_tag)
        self._state = "REJECTED"

    def requeue(self):
        """Reject this message and put it back on the queue.

        You must not use this method as a means of selecting messages
        to process.

        :raises MessageStateError: If the message has already been
            acknowledged/requeued/rejected.

        """
        self._raise_if_acknowledged()
        self.backend.requeue(self.delivery_tag)
        self._state = "REQUEUED"

    @property
    def acknowledged(self):
        """``True`` if the message state is one of
        ``ACKNOWLEDGED_STATES`` (acknowledged, rejected or requeued)."""
        return self._state in ACKNOWLEDGED_STATES
class BaseBackend(object):
    """Base class for backends.

    Every operation defined here is a placeholder: unless stated
    otherwise the method does nothing and returns ``None``.

    """
    default_port = None
    extra_options = None

    def __init__(self, connection, **kwargs):
        """Remember ``connection`` and the optional ``extra_options``
        keyword argument (``None`` when it is not given)."""
        self.extra_options = kwargs.get("extra_options")
        self.connection = connection

    def queue_declare(self, *args, **kwargs):
        """Declare a named queue (no-op here)."""

    def queue_delete(self, *args, **kwargs):
        """Delete a named queue (no-op here)."""

    def exchange_declare(self, *args, **kwargs):
        """Declare a named exchange (no-op here)."""

    def queue_bind(self, *args, **kwargs):
        """Bind a queue to an exchange (no-op here)."""

    def get(self, *args, **kwargs):
        """Pop one message off the queue (no-op here)."""

    def declare_consumer(self, *args, **kwargs):
        """Placeholder for declaring a consumer (no-op here)."""

    def consume(self, *args, **kwargs):
        """Iterate over the declared consumers (no-op here)."""

    def cancel(self, *args, **kwargs):
        """Cancel the consumer (no-op here)."""

    def ack(self, delivery_tag):
        """Acknowledge the message with ``delivery_tag`` (no-op here)."""

    def queue_purge(self, queue, **kwargs):
        """Discard all messages in ``queue``, leaving it empty.

        The base implementation discards nothing and returns ``0``.

        """
        return 0

    def reject(self, delivery_tag):
        """Reject the message with ``delivery_tag`` (no-op here)."""

    def requeue(self, delivery_tag):
        """Requeue the message with ``delivery_tag`` (no-op here)."""

    def purge(self, queue, **kwargs):
        """Discard all messages in ``queue`` (no-op here)."""

    def message_to_python(self, raw_message):
        """Convert a received message body to a python datastructure.

        The base implementation returns ``raw_message`` unchanged.

        """
        return raw_message

    def prepare_message(self, message_data, delivery_mode, **kwargs):
        """Prepare a message for sending.

        The base implementation returns ``message_data`` unchanged.

        """
        return message_data

    def publish(self, message, exchange, routing_key, **kwargs):
        """Publish ``message`` (no-op here)."""

    def close(self):
        """Close the backend (no-op here)."""

    def establish_connection(self):
        """Establish a connection to the backend (no-op here)."""

    def close_connection(self, connection):
        """Close ``connection`` (no-op here)."""

    def flow(self, active):
        """Enable or disable flow from the peer (no-op here)."""

    def qos(self, prefetch_size, prefetch_count, apply_global=False):
        """Request a specific Quality of Service (no-op here)."""
|