/usr/share/pyshared/carrot/backends/pystomp.py is in python-carrot 0.10.7-1ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 | import time
import socket
from itertools import count
from stompy import Client
from stompy import Empty as QueueEmpty
from carrot.backends.base import BaseMessage, BaseBackend
DEFAULT_PORT = 61613
class Message(BaseMessage):
"""A message received by the STOMP broker.
Usually you don't insantiate message objects yourself, but receive
them using a :class:`carrot.messaging.Consumer`.
:param backend: see :attr:`backend`.
:param frame: see :attr:`_frame`.
.. attribute:: body
The message body.
.. attribute:: delivery_tag
The message delivery tag, uniquely identifying this message.
.. attribute:: backend
The message backend used.
A subclass of :class:`carrot.backends.base.BaseBackend`.
.. attribute:: _frame
The frame received by the STOMP client. This is considered a private
variable and should never be used in production code.
"""
def __init__(self, backend, frame, **kwargs):
self._frame = frame
self.backend = backend
kwargs["body"] = frame.body
kwargs["delivery_tag"] = frame.headers["message-id"]
kwargs["content_type"] = frame.headers.get("content-type")
kwargs["content_encoding"] = frame.headers.get("content-encoding")
kwargs["priority"] = frame.headers.get("priority")
super(Message, self).__init__(backend, **kwargs)
def ack(self):
"""Acknowledge this message as being processed.,
This will remove the message from the queue.
:raises MessageStateError: If the message has already been
acknowledged/requeued/rejected.
"""
if self.acknowledged:
raise self.MessageStateError(
"Message already acknowledged with state: %s" % self._state)
self.backend.ack(self._frame)
self._state = "ACK"
def reject(self):
raise NotImplementedError(
"The STOMP backend does not implement basic.reject")
def requeue(self):
raise NotImplementedError(
"The STOMP backend does not implement requeue")
class Backend(BaseBackend):
Stomp = Client
Message = Message
default_port = DEFAULT_PORT
def __init__(self, connection, **kwargs):
self.connection = connection
self.default_port = kwargs.get("default_port", self.default_port)
self._channel = None
self._consumers = {} # open consumers by consumer tag
self._callbacks = {}
def establish_connection(self):
conninfo = self.connection
if not conninfo.port:
conninfo.port = self.default_port
stomp = self.Stomp(conninfo.hostname, conninfo.port)
stomp.connect(username=conninfo.userid, password=conninfo.password)
stomp.drain_events = self.drain_events
return stomp
def close_connection(self, connection):
try:
connection.disconnect()
except socket.error:
pass
def queue_exists(self, queue):
return True
def queue_purge(self, queue, **kwargs):
for purge_count in count(0):
try:
frame = self.channel.get_nowait()
except QueueEmpty:
return purge_count
else:
self.channel.ack(frame)
def declare_consumer(self, queue, no_ack, callback, consumer_tag,
**kwargs):
ack = no_ack and "auto" or "client"
self.channel.subscribe(queue, ack=ack)
self._consumers[consumer_tag] = queue
self._callbacks[queue] = callback
def drain_events(self, timeout=None):
start_time = time.time()
while True:
frame = self.channel.get()
if frame:
break
if time.time() - time_start > timeout:
raise socket.timeout("the operation timed out.")
queue = frame.headers.get("destination")
if not queue or queue not in self._callbacks:
return
self._callbacks[queue](frame)
def consume(self, limit=None):
"""Returns an iterator that waits for one message at a time."""
for total_message_count in count():
if limit and total_message_count >= limit:
raise StopIteration
self.drain_events()
yield True
def queue_declare(self, queue, *args, **kwargs):
self.channel.subscribe(queue, ack="client")
def get(self, queue, no_ack=False):
try:
frame = self.channel.get_nowait()
except QueueEmpty:
return None
else:
return self.message_to_python(frame)
def ack(self, frame):
self.channel.ack(frame)
def message_to_python(self, raw_message):
"""Convert encoded message body back to a Python value."""
return self.Message(backend=self, frame=raw_message)
def prepare_message(self, message_data, delivery_mode, priority=0,
content_type=None, content_encoding=None):
persistent = "false"
if delivery_mode == 2:
persistent = "true"
priority = priority or 0
return {"body": message_data,
"persistent": persistent,
"priority": priority,
"content-encoding": content_encoding,
"content-type": content_type}
def publish(self, message, exchange, routing_key, **kwargs):
message["destination"] = exchange
self.channel.stomp.send(message)
def cancel(self, consumer_tag):
if not self._channel or consumer_tag not in self._consumers:
return
queue = self._consumers.pop(consumer_tag)
self.channel.unsubscribe(queue)
def close(self):
for consumer_tag in self._consumers.keys():
self.cancel(consumer_tag)
if self._channel:
try:
self._channel.disconnect()
except socket.error:
pass
@property
def channel(self):
if not self._channel:
# Sorry, but the python-stomp library needs one connection
# for each channel.
self._channel = self.establish_connection()
return self._channel
|