This file is indexed.

/usr/share/pyshared/txlongpoll/frontend.py is in python-txlongpoll 0.3.1+bzr86-0ubuntu1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
# Copyright 2005-2011 Canonical Ltd.  This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).

"""
Async frontend server for serving answers from background processor.
"""

import json

from twisted.internet.defer import (
    Deferred,
    inlineCallbacks,
    returnValue,
    )
from twisted.python import log
from twisted.web.http import (
    BAD_REQUEST,
    INTERNAL_SERVER_ERROR,
    NOT_FOUND,
    REQUEST_TIMEOUT,
    )
from twisted.web.resource import Resource
from twisted.web.server import NOT_DONE_YET
from txamqp.client import Closed
from txamqp.queue import (
    Closed as QueueClosed,
    Empty,
    )


# Names exported by ``from <this module> import *``.  The NotFound exception
# defined in this module is not part of this list.
__all__ = ["QueueManager", "FrontEndAjax"]


class NotFound(Exception):
    """
    Signals that the requested queue does not exist on the message server.
    """


class QueueManager(object):
    """
    An AMQP consumer which handles messages sent over a "frontend" queue to
    set up temporary queues.  The L{get_message} method should be invoked to
    retrieve one single message from those temporary queues.

    @ivar message_timeout: time to wait for a message before giving up in
        C{get_message}.
    @ivar _channel: reference to the current C{AMQChannel}, or C{None} while
        disconnected.
    @ivar _client: reference to the current C{AMQClient}, or C{None} while
        disconnected.
    @ivar _pending_requests: list of L{Deferred}s handed out by
        L{_wait_for_connection}; they are fired by L{connected}.
    @ivar _tag_form: format string taking C{(uuid, sequence)} and producing
        a consumer tag.
    @ivar _queue_form: format string taking C{uuid} and producing a queue
        name.
    """

    # The timeout must be lower than the Apache one in front, which by default
    # is 5 minutes.
    message_timeout = 270

    def __init__(self, prefix=None):
        """
        @param prefix: optional string prepended to the consumer tags and
            queue names.  C{None} or an empty string selects the bare naming
            scheme.
        """
        self._prefix = prefix
        self._channel = None
        self._client = None
        self._pending_requests = []
        # Preserve compatibility by using special forms for naming when a
        # prefix is specified.
        if self._prefix:
            # The doubled %% survive this first interpolation and become the
            # placeholders filled in later with uuid / sequence.
            self._tag_form = (
                "%s.notifications-tag.%%s.%%s" % (self._prefix,))
            self._queue_form = "%s.notifications-queue.%%s" % (self._prefix,)
        else:
            self._tag_form = "%s.%s"
            self._queue_form = "%s"

    def disconnected(self):
        """
        Called when losing the connection to broker: forget the stale client
        and channel.

        Deferreds already queued by L{_wait_for_connection} are left
        untouched; they fire on the next call to L{connected}.
        """
        self._channel = None
        self._client = None

    def connected(self, client_and_channel):
        """
        This method should be used as the C{connected_callback} parameter to
        L{AMQFactory}.

        @param client_and_channel: a C{(client, channel)} 2-tuple.
        @return: the result of C{channel.basic_qos}.
        """
        # Unpacked here rather than in the signature: tuple parameters are
        # not valid syntax on Python 3.
        client, channel = client_and_channel
        log.msg("Async frontend connected to AMQP broker")
        self._client = client
        self._channel = channel
        # Make sure we only get one message at a time, to make load balancing
        # work.
        d = channel.basic_qos(prefetch_count=1)
        # Wake up, in arrival order, every caller that was waiting for a
        # connection.
        while self._pending_requests:
            self._pending_requests.pop(0).callback(None)
        return d

    def _wait_for_connection(self):
        """
        Return a L{Deferred} which will fire when the connection is available.
        """
        pending = Deferred()
        self._pending_requests.append(pending)
        return pending

    @inlineCallbacks
    def get_message(self, uuid, sequence):
        """Consume and return one message for C{uuid}.

        @param uuid: The identifier of the queue.
        @param sequence: The sequential number for identifying the subscriber
            in the queue.
        @return: a L{Deferred} firing with a C{(body, delivery_tag)} tuple.

        If no message is received within the number of seconds in
        L{message_timeout}, then the returned Deferred will errback with
        L{Empty}.  If the channel is closed with reply code 404 it errbacks
        with L{NotFound}.
        """
        if self._channel is None:
            yield self._wait_for_connection()
        tag = self._tag_form % (uuid, sequence)
        try:
            yield self._channel.basic_consume(
                consumer_tag=tag, queue=(self._queue_form % uuid))

            log.msg("Consuming from queue '%s'" % uuid)

            queue = yield self._client.queue(tag)
            msg = yield queue.get(self.message_timeout)
        except Empty:
            # Let's wait for the cancel here
            yield self._channel.basic_cancel(consumer_tag=tag)
            self._client.queues.pop(tag, None)
            # Check for the messages arrived in the mean time
            if queue.pending:
                msg = queue.pending.pop()
                returnValue((msg.content.body, msg.delivery_tag))
            # A fresh instance is raised: the handler yielded above, so a
            # bare ``raise`` is not used here.
            raise Empty()
        except QueueClosed:
            # The queue has been closed, presumably because of a side effect.
            # Let's retry after reconnection.
            yield self._wait_for_connection()
            data = yield self.get_message(uuid, sequence)
            returnValue(data)
        except Closed as e:
            if self._client and self._client.transport:
                self._client.transport.loseConnection()
            # The first argument is expected to carry a ``reply_code``; read
            # it defensively so an unexpected payload re-raises the original
            # error instead of an AttributeError.
            reply = e.args[0] if e.args else None
            if getattr(reply, "reply_code", None) == 404:
                raise NotFound()
            raise
        except:
            # Deliberately broad: whatever went wrong, drop the connection
            # and let the original error propagate unchanged.
            if self._client and self._client.transport:
                self._client.transport.loseConnection()
            raise

        yield self._channel.basic_cancel(consumer_tag=tag, nowait=True)
        self._client.queues.pop(tag, None)

        returnValue((msg.content.body, msg.delivery_tag))

    def reject_message(self, tag):
        """
        Put back a message.

        @param tag: the delivery tag of the message to requeue.
        @return: the result of C{basic_reject} on the current channel.
        """
        return self._channel.basic_reject(tag, requeue=True)

    def ack_message(self, tag):
        """
        Confirm the reading of a message.

        @param tag: the delivery tag of the message to acknowledge.
        @return: the result of C{basic_ack} on the current channel.
        """
        return self._channel.basic_ack(tag)

    @inlineCallbacks
    def cancel_get_message(self, uuid, sequence):
        """
        Cancel a previous C{get_message} when a request is done, to be able to
        reuse the tag properly.

        Does nothing when there is no current client.

        @param uuid: The identifier of the queue.
        @param sequence: The sequential number for identifying the subscriber
            in the queue.
        """
        if self._client is not None:
            tag = self._tag_form % (uuid, sequence)
            queue = yield self._client.queue(tag)
            # NOTE(review): the Empty *class* is enqueued as an item (it is
            # neither instantiated nor raised) -- presumably to wake up a
            # get_message() blocked on this queue; confirm against txamqp's
            # queue semantics.
            queue.put(Empty)


class FrontEndAjax(Resource):
    """
    A web resource which, when rendered with a C{'uuid'} in the request
    arguments, will return the raw data from the message queue associated with
    that UUID.

    @ivar message_queue: object providing C{get_message}, C{ack_message},
        C{reject_message}, C{cancel_get_message} and a C{_prefix} attribute.
    @ivar _finished: maps C{"<uuid>-<sequence>"} to a flag.  A missing key
        means the request is still in flight; C{True} means the request ended
        before a response was produced, so a late message must be rejected;
        C{False} means this resource is producing the response itself.
    """
    isLeaf = True

    def __init__(self, message_queue):
        Resource.__init__(self)
        self.message_queue = message_queue
        self._finished = {}

    def _bad_request(self, request):
        """Mark C{request} as a plain-text 400 and return its body."""
        request.setHeader("Content-Type", "text/plain")
        request.setResponseCode(BAD_REQUEST)
        return "Invalid request"

    def render(self, request):
        """Render the request.

        It expects a page key (the UUID), and a sequence number indicating
        how many times this key has been used, and uses the page key to
        retrieve messages from the associated queue.

        @return: a plain-text body when neither argument is given or when
            the request is invalid; otherwise C{NOT_DONE_YET}, the response
            being written once the message queue answers.
        """
        args = request.args
        has_uuid = "uuid" in args
        has_sequence = "sequence" in args

        if not has_uuid and not has_sequence:
            request.setHeader("Content-Type", "text/plain")
            return "Async frontend for %s" % self.message_queue._prefix

        if not (has_uuid and has_sequence):
            return self._bad_request(request)

        uuid = args["uuid"][0]
        sequence = args["sequence"][0]
        if not uuid or not sequence:
            return self._bad_request(request)

        request_id = "%s-%s" % (uuid, sequence)

        def _finished(ignored):
            if request_id in self._finished:
                # If the request_id is already in finished, that means the
                # request terminated properly. We remove it from finished to
                # prevent from it growing indefinitely.
                self._finished.pop(request_id)
            else:
                # Otherwise, put it in finished so that the message is not sent
                # when write is called.
                self._finished[request_id] = True
                self.message_queue.cancel_get_message(uuid, sequence)

        request.notifyFinish().addBoth(_finished)

        d = self.message_queue.get_message(uuid, sequence)

        def write(data):
            result, tag = data
            if self._finished.get(request_id):
                # The request is already over: give the message back to the
                # queue instead of consuming it.
                self._finished.pop(request_id)
                self.message_queue.reject_message(tag)
                return

            self.message_queue.ack_message(tag)

            data = json.loads(result)

            if data.pop("original-uuid", None) == uuid:
                # Ignore the message for the page who emitted the job
                retry = self.message_queue.get_message(uuid, sequence)
                retry.addCallback(write)
                retry.addErrback(failed)
                return

            if "error" in data:
                request.setResponseCode(BAD_REQUEST)

            request.setHeader("Content-Type", "application/json")

            # The raw payload is forwarded, not the re-serialised dict.
            request.write(result)
            # Flag the request as answered before finishing it, so that
            # _finished() only has to drop the entry.
            self._finished[request_id] = False
            request.finish()

        def failed(error):
            if self._finished.get(request_id):
                # Nobody is listening any more; just forget the request.
                self._finished.pop(request_id)
                return

            if error.check(Empty):
                request.setResponseCode(REQUEST_TIMEOUT)
            elif error.check(NotFound):
                request.setResponseCode(NOT_FOUND)
            else:
                log.err(error, "Failed to get message")
                request.setResponseCode(INTERNAL_SERVER_ERROR)
                # The body is free-form exception text: label it as plain
                # text rather than leaving the default content type.
                request.setHeader("Content-Type", "text/plain")
                request.write(str(error.value))
            self._finished[request_id] = False
            request.finish()

        d.addCallback(write)
        d.addErrback(failed)
        return NOT_DONE_YET