/usr/share/pyshared/celery/task/http.py is in python-celery 2.4.6-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 | # -*- coding: utf-8 -*-
"""
celery.task.http
~~~~~~~~~~~~~~~~
Task webhooks implementation.
:copyright: (c) 2009 - 2011 by Ask Solem.
:license: BSD, see LICENSE for more details.
"""
from __future__ import absolute_import
import sys
import urllib2
from urllib import urlencode
from urlparse import urlparse
try:
from urlparse import parse_qsl
except ImportError: # pragma: no cover
from cgi import parse_qsl # noqa
from anyjson import deserialize
from .. import __version__ as celery_version
from .base import Task as BaseTask
GET_METHODS = frozenset(["GET", "HEAD"])
class InvalidResponseError(Exception):
"""The remote server gave an invalid response."""
class RemoteExecuteError(Exception):
"""The remote task gave a custom error."""
class UnknownStatusError(InvalidResponseError):
"""The remote server gave an unknown status."""
def maybe_utf8(value):
"""Encode to utf-8, only if the value is Unicode."""
if isinstance(value, unicode):
return value.encode("utf-8")
return value
if sys.version_info >= (3, 0):
def utf8dict(tup):
if not isinstance(tup, dict):
return dict(tup)
return tup
else:
def utf8dict(tup): # noqa
"""With a dict's items() tuple return a new dict with any utf-8
keys/values encoded."""
return dict((key.encode("utf-8"), maybe_utf8(value))
for key, value in tup)
def extract_response(raw_response):
"""Extract the response text from a raw JSON response."""
if not raw_response:
raise InvalidResponseError("Empty response")
try:
payload = deserialize(raw_response)
except ValueError, exc:
raise InvalidResponseError(str(exc))
status = payload["status"]
if status == "success":
return payload["retval"]
elif status == "failure":
raise RemoteExecuteError(payload.get("reason"))
else:
raise UnknownStatusError(str(status))
class MutableURL(object):
"""Object wrapping a Uniform Resource Locator.
Supports editing the query parameter list.
You can convert the object back to a string, the query will be
properly urlencoded.
Examples
>>> url = URL("http://www.google.com:6580/foo/bar?x=3&y=4#foo")
>>> url.query
{'x': '3', 'y': '4'}
>>> str(url)
'http://www.google.com:6580/foo/bar?y=4&x=3#foo'
>>> url.query["x"] = 10
>>> url.query.update({"George": "Costanza"})
>>> str(url)
'http://www.google.com:6580/foo/bar?y=4&x=10&George=Costanza#foo'
"""
def __init__(self, url):
self.parts = urlparse(url)
self.query = dict(parse_qsl(self.parts[4]))
def __str__(self):
scheme, netloc, path, params, query, fragment = self.parts
query = urlencode(utf8dict(self.query.items()))
components = ["%s://" % scheme,
"%s" % netloc,
path and "%s" % path or "/",
params and ";%s" % params or None,
query and "?%s" % query or None,
fragment and "#%s" % fragment or None]
return "".join(filter(None, components))
def __repr__(self):
return "<%s: %s>" % (self.__class__.__name__, str(self))
class HttpDispatch(object):
"""Make task HTTP request and collect the task result.
:param url: The URL to request.
:param method: HTTP method used. Currently supported methods are `GET`
and `POST`.
:param task_kwargs: Task keyword arguments.
:param logger: Logger used for user/system feedback.
"""
user_agent = "celery/%s" % celery_version
timeout = 5
def __init__(self, url, method, task_kwargs, logger):
self.url = url
self.method = method
self.task_kwargs = task_kwargs
self.logger = logger
def make_request(self, url, method, params):
"""Makes an HTTP request and returns the response."""
request = urllib2.Request(url, params)
for key, val in self.http_headers.items():
request.add_header(key, val)
response = urllib2.urlopen(request) # user catches errors.
return response.read()
def dispatch(self):
"""Dispatch callback and return result."""
url = MutableURL(self.url)
params = None
if self.method in GET_METHODS:
url.query.update(self.task_kwargs)
else:
params = urlencode(utf8dict(self.task_kwargs.items()))
raw_response = self.make_request(str(url), self.method, params)
return extract_response(raw_response)
@property
def http_headers(self):
headers = {"User-Agent": self.user_agent}
return headers
class HttpDispatchTask(BaseTask):
"""Task dispatching to an URL.
:keyword url: The URL location of the HTTP callback task.
:keyword method: Method to use when dispatching the callback. Usually
`GET` or `POST`.
:keyword \*\*kwargs: Keyword arguments to pass on to the HTTP callback.
.. attribute:: url
If this is set, this is used as the default URL for requests.
Default is to require the user of the task to supply the url as an
argument, as this attribute is intended for subclasses.
.. attribute:: method
If this is set, this is the default method used for requests.
Default is to require the user of the task to supply the method as an
argument, as this attribute is intended for subclasses.
"""
url = None
method = None
def run(self, url=None, method="GET", **kwargs):
url = url or self.url
method = method or self.method
logger = self.get_logger(**kwargs)
return HttpDispatch(url, method, kwargs, logger).dispatch()
class URL(MutableURL):
"""HTTP Callback URL
Supports requesting an URL asynchronously.
:param url: URL to request.
:keyword dispatcher: Class used to dispatch the request.
By default this is :class:`HttpDispatchTask`.
"""
dispatcher = HttpDispatchTask
def __init__(self, url, dispatcher=None):
super(URL, self).__init__(url)
self.dispatcher = dispatcher or self.dispatcher
def get_async(self, **kwargs):
return self.dispatcher.delay(str(self), "GET", **kwargs)
def post_async(self, **kwargs):
return self.dispatcher.delay(str(self), "POST", **kwargs)
|