This file is indexed.

/usr/share/pyshared/celery/task/http.py is in python-celery 2.4.6-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
# -*- coding: utf-8 -*-
"""
    celery.task.http
    ~~~~~~~~~~~~~~~~

    Task webhooks implementation.

    :copyright: (c) 2009 - 2011 by Ask Solem.
    :license: BSD, see LICENSE for more details.

"""
from __future__ import absolute_import

import sys
import urllib2

from urllib import urlencode
from urlparse import urlparse
try:
    from urlparse import parse_qsl
except ImportError:  # pragma: no cover
    from cgi import parse_qsl  # noqa

from anyjson import deserialize

from .. import __version__ as celery_version
from .base import Task as BaseTask

GET_METHODS = frozenset(["GET", "HEAD"])


class InvalidResponseError(Exception):
    """The remote server gave an invalid response."""


class RemoteExecuteError(Exception):
    """The remote task gave a custom error."""


class UnknownStatusError(InvalidResponseError):
    """The remote server gave an unknown status."""


def maybe_utf8(value):
    """Encode to utf-8, only if the value is Unicode."""
    if isinstance(value, unicode):
        return value.encode("utf-8")
    return value


if sys.version_info >= (3, 0):

    def utf8dict(tup):
        if not isinstance(tup, dict):
            return dict(tup)
        return tup
else:

    def utf8dict(tup):  # noqa
        """With a dict's items() tuple return a new dict with any utf-8
        keys/values encoded."""
        return dict((key.encode("utf-8"), maybe_utf8(value))
                        for key, value in tup)


def extract_response(raw_response):
    """Extract the response text from a raw JSON response."""
    if not raw_response:
        raise InvalidResponseError("Empty response")
    try:
        payload = deserialize(raw_response)
    except ValueError, exc:
        raise InvalidResponseError(str(exc))

    status = payload["status"]
    if status == "success":
        return payload["retval"]
    elif status == "failure":
        raise RemoteExecuteError(payload.get("reason"))
    else:
        raise UnknownStatusError(str(status))


class MutableURL(object):
    """Object wrapping a Uniform Resource Locator.

    Supports editing the query parameter list.
    You can convert the object back to a string, the query will be
    properly urlencoded.

    Examples

        >>> url = URL("http://www.google.com:6580/foo/bar?x=3&y=4#foo")
        >>> url.query
        {'x': '3', 'y': '4'}
        >>> str(url)
        'http://www.google.com:6580/foo/bar?y=4&x=3#foo'
        >>> url.query["x"] = 10
        >>> url.query.update({"George": "Costanza"})
        >>> str(url)
        'http://www.google.com:6580/foo/bar?y=4&x=10&George=Costanza#foo'

    """
    def __init__(self, url):
        self.parts = urlparse(url)
        self.query = dict(parse_qsl(self.parts[4]))

    def __str__(self):
        scheme, netloc, path, params, query, fragment = self.parts
        query = urlencode(utf8dict(self.query.items()))
        components = ["%s://" % scheme,
                      "%s" % netloc,
                      path and "%s" % path or "/",
                      params and ";%s" % params or None,
                      query and "?%s" % query or None,
                      fragment and "#%s" % fragment or None]
        return "".join(filter(None, components))

    def __repr__(self):
        return "<%s: %s>" % (self.__class__.__name__, str(self))


class HttpDispatch(object):
    """Make task HTTP request and collect the task result.

    :param url: The URL to request.
    :param method: HTTP method used. Currently supported methods are `GET`
        and `POST`.
    :param task_kwargs: Task keyword arguments.
    :param logger: Logger used for user/system feedback.

    """
    user_agent = "celery/%s" % celery_version
    timeout = 5

    def __init__(self, url, method, task_kwargs, logger):
        self.url = url
        self.method = method
        self.task_kwargs = task_kwargs
        self.logger = logger

    def make_request(self, url, method, params):
        """Makes an HTTP request and returns the response."""
        request = urllib2.Request(url, params)
        for key, val in self.http_headers.items():
            request.add_header(key, val)
        response = urllib2.urlopen(request)         # user catches errors.
        return response.read()

    def dispatch(self):
        """Dispatch callback and return result."""
        url = MutableURL(self.url)
        params = None
        if self.method in GET_METHODS:
            url.query.update(self.task_kwargs)
        else:
            params = urlencode(utf8dict(self.task_kwargs.items()))
        raw_response = self.make_request(str(url), self.method, params)
        return extract_response(raw_response)

    @property
    def http_headers(self):
        headers = {"User-Agent": self.user_agent}
        return headers


class HttpDispatchTask(BaseTask):
    """Task dispatching to an URL.

    :keyword url: The URL location of the HTTP callback task.
    :keyword method: Method to use when dispatching the callback. Usually
        `GET` or `POST`.
    :keyword \*\*kwargs: Keyword arguments to pass on to the HTTP callback.

    .. attribute:: url

        If this is set, this is used as the default URL for requests.
        Default is to require the user of the task to supply the url as an
        argument, as this attribute is intended for subclasses.

    .. attribute:: method

        If this is set, this is the default method used for requests.
        Default is to require the user of the task to supply the method as an
        argument, as this attribute is intended for subclasses.

    """

    url = None
    method = None

    def run(self, url=None, method="GET", **kwargs):
        url = url or self.url
        method = method or self.method
        logger = self.get_logger(**kwargs)
        return HttpDispatch(url, method, kwargs, logger).dispatch()


class URL(MutableURL):
    """HTTP Callback URL

    Supports requesting an URL asynchronously.

    :param url: URL to request.
    :keyword dispatcher: Class used to dispatch the request.
        By default this is :class:`HttpDispatchTask`.

    """
    dispatcher = HttpDispatchTask

    def __init__(self, url, dispatcher=None):
        super(URL, self).__init__(url)
        self.dispatcher = dispatcher or self.dispatcher

    def get_async(self, **kwargs):
        return self.dispatcher.delay(str(self), "GET", **kwargs)

    def post_async(self, **kwargs):
        return self.dispatcher.delay(str(self), "POST", **kwargs)