This file is indexed.

/usr/share/pyshared/carrot/backends/pystomp.py is in python-carrot 0.10.7-1ubuntu1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
import time
import socket
from itertools import count

from stompy import Client
from stompy import Empty as QueueEmpty

from carrot.backends.base import BaseMessage, BaseBackend

# Fallback broker port: the backend connects to this port when the
# connection object does not specify one.
DEFAULT_PORT = 61613


class Message(BaseMessage):
    """Wrapper around a frame delivered by the STOMP broker.

    Message objects are normally handed to you by a
    :class:`carrot.messaging.Consumer`; you rarely need to instantiate
    one yourself.

    :param backend: see :attr:`backend`.
    :param frame: see :attr:`_frame`.

    .. attribute:: body

        The message body.

    .. attribute:: delivery_tag

        The message delivery tag, uniquely identifying this message.

    .. attribute:: backend

        The message backend used.
        A subclass of :class:`carrot.backends.base.BaseBackend`.

    .. attribute:: _frame

        The frame received by the STOMP client. This is considered a private
        variable and should never be used in production code.

    """

    def __init__(self, backend, frame, **kwargs):
        self._frame = frame
        self.backend = backend

        # Values taken from the frame always win over caller-supplied
        # keyword arguments of the same name.
        payload = frame.body
        headers = frame.headers
        kwargs.update(
            body=payload,
            delivery_tag=headers["message-id"],
            content_type=headers.get("content-type"),
            content_encoding=headers.get("content-encoding"),
            priority=headers.get("priority"))

        super(Message, self).__init__(backend, **kwargs)

    def ack(self):
        """Acknowledge this message as being processed.

        This will remove the message from the queue.

        :raises MessageStateError: If the message has already been
            acknowledged/requeued/rejected.

        """
        if not self.acknowledged:
            self.backend.ack(self._frame)
            self._state = "ACK"
            return
        raise self.MessageStateError(
            "Message already acknowledged with state: %s" % self._state)

    def reject(self):
        """Not supported by this backend; always raises.

        :raises NotImplementedError: always.

        """
        raise NotImplementedError(
            "The STOMP backend does not implement basic.reject")

    def requeue(self):
        """Not supported by this backend; always raises.

        :raises NotImplementedError: always.

        """
        raise NotImplementedError(
            "The STOMP backend does not implement requeue")


class Backend(BaseBackend):
    """Carrot backend that talks to a STOMP broker through ``stompy``.

    A single STOMP client (the "channel") is created lazily on first use
    of :attr:`channel` and reused for every subsequent operation.

    :param connection: connection info object; ``hostname``, ``port``,
        ``userid`` and ``password`` are read from it when connecting.
    :keyword default_port: port to use when ``connection.port`` is not
        set. Defaults to the class attribute :attr:`default_port`.

    """
    Stomp = Client
    Message = Message
    default_port = DEFAULT_PORT

    def __init__(self, connection, **kwargs):
        self.connection = connection
        self.default_port = kwargs.get("default_port", self.default_port)
        self._channel = None
        self._consumers = {}  # open consumers: consumer tag -> queue
        self._callbacks = {}  # queue -> callback registered for it

    def establish_connection(self):
        """Create, connect and return a new STOMP client.

        If the connection info has no port, :attr:`default_port` is
        written back to it before connecting.

        """
        conninfo = self.connection
        if not conninfo.port:
            conninfo.port = self.default_port
        stomp = self.Stomp(conninfo.hostname, conninfo.port)
        stomp.connect(username=conninfo.userid, password=conninfo.password)
        # Expose drain_events on the client object itself so that code
        # holding only the client can still drain events.
        stomp.drain_events = self.drain_events
        return stomp

    def close_connection(self, connection):
        """Disconnect ``connection``, ignoring socket errors."""
        try:
            connection.disconnect()
        except socket.error:
            # Best effort: failing to say goodbye on a socket that is
            # already broken is not worth reporting.
            pass

    def queue_exists(self, queue):
        """Always report the queue as existing."""
        return True

    def queue_purge(self, queue, **kwargs):
        """Fetch and acknowledge frames until none are pending.

        The ``queue`` argument is not consulted; frames are taken from
        the channel as they come.

        :returns: the number of frames that were acknowledged.

        """
        for purge_count in count(0):
            try:
                frame = self.channel.get_nowait()
            except QueueEmpty:
                return purge_count
            else:
                self.channel.ack(frame)

    def declare_consumer(self, queue, no_ack, callback, consumer_tag,
            **kwargs):
        """Subscribe to ``queue`` and register ``callback`` for it.

        ``no_ack`` selects the ``"auto"`` ack mode, otherwise
        ``"client"`` is used.

        """
        ack = "auto" if no_ack else "client"
        self.channel.subscribe(queue, ack=ack)
        self._consumers[consumer_tag] = queue
        self._callbacks[queue] = callback

    def drain_events(self, timeout=None):
        """Wait for one frame and dispatch it to its queue's callback.

        The frame's ``destination`` header selects the callback; frames
        without a destination, or for a queue with no registered
        callback, are silently dropped.

        :keyword timeout: maximum number of seconds to keep polling, or
            ``None`` (the default) to wait indefinitely. The deadline is
            only checked between calls to ``channel.get()``.

        :raises socket.timeout: if ``timeout`` elapsed without a frame.

        """
        start_time = time.time()
        while True:
            frame = self.channel.get()
            if frame:
                break
            # Only enforce a deadline when one was actually requested;
            # comparing against None is meaningless.
            if timeout is not None and time.time() - start_time > timeout:
                raise socket.timeout("the operation timed out.")
        queue = frame.headers.get("destination")
        if not queue or queue not in self._callbacks:
            return
        self._callbacks[queue](frame)

    def consume(self, limit=None):
        """Returns an iterator that waits for one message at a time.

        Each iteration drains one event and yields ``True``.

        :keyword limit: stop after this many iterations. A falsy value
            means no limit.

        """
        for total_message_count in count():
            if limit and total_message_count >= limit:
                # A plain return ends the generator; raising
                # StopIteration here is an error under PEP 479.
                return
            self.drain_events()
            yield True

    def queue_declare(self, queue, *args, **kwargs):
        """Subscribe to ``queue`` using the ``"client"`` ack mode."""
        self.channel.subscribe(queue, ack="client")

    def get(self, queue, no_ack=False):
        """Return the next pending message without blocking.

        The ``queue`` and ``no_ack`` arguments are not consulted.

        :returns: a :attr:`Message` instance, or ``None`` if no frame
            is pending.

        """
        try:
            frame = self.channel.get_nowait()
        except QueueEmpty:
            return None
        else:
            return self.message_to_python(frame)

    def ack(self, frame):
        """Acknowledge ``frame`` on the channel."""
        self.channel.ack(frame)

    def message_to_python(self, raw_message):
        """Wrap a raw frame in this backend's :attr:`Message` class."""
        return self.Message(backend=self, frame=raw_message)

    def prepare_message(self, message_data, delivery_mode, priority=0,
            content_type=None, content_encoding=None):
        """Build the dict describing a message to be published.

        A ``delivery_mode`` of ``2`` marks the message persistent. A
        falsy ``priority`` is normalised to ``0``.

        """
        persistent = "true" if delivery_mode == 2 else "false"
        priority = priority or 0
        return {"body": message_data,
                "persistent": persistent,
                "priority": priority,
                "content-encoding": content_encoding,
                "content-type": content_type}

    def publish(self, message, exchange, routing_key, **kwargs):
        """Send ``message`` with ``exchange`` as its destination.

        ``message`` is modified in place; ``routing_key`` is not used.

        """
        message["destination"] = exchange
        self.channel.stomp.send(message)

    def cancel(self, consumer_tag):
        """Unsubscribe the consumer registered as ``consumer_tag``.

        Does nothing if no channel is open or the tag is unknown.

        """
        if not self._channel or consumer_tag not in self._consumers:
            return
        queue = self._consumers.pop(consumer_tag)
        self.channel.unsubscribe(queue)

    def close(self):
        """Cancel all consumers and disconnect the channel, if any."""
        # Iterate over a snapshot: cancel() removes entries from
        # self._consumers while we loop.
        for consumer_tag in list(self._consumers):
            self.cancel(consumer_tag)
        if self._channel:
            try:
                self._channel.disconnect()
            except socket.error:
                # Best effort, same as close_connection().
                pass

    @property
    def channel(self):
        """The STOMP client, connected on first access."""
        if not self._channel:
            # Sorry, but the python-stomp library needs one connection
            # for each channel.
            self._channel = self.establish_connection()
        return self._channel