/usr/lib/python2.7/dist-packages/djcelery/views.py is in python-django-celery 3.1.17-3.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
from __future__ import absolute_import, unicode_literals
from functools import wraps
from django.http import HttpResponse, Http404
from anyjson import serialize
from celery import states
from celery.registry import tasks
from celery.result import AsyncResult
from celery.utils import get_full_cls_name, kwdict
from celery.utils.encoding import safe_repr
# Ensure built-in tasks are loaded for task_list view
import celery.task # noqa
def JsonResponse(response):
    """Serialize ``response`` and wrap it in an HTTP response.

    The value is encoded with :func:`anyjson.serialize` and returned as
    an :class:`~django.http.HttpResponse` whose content type is
    ``application/json``.
    """
    body = serialize(response)
    return HttpResponse(body, content_type='application/json')
def task_view(task):
    """Build a view function that applies ``task`` asynchronously.

    The returned view collects the task's keyword arguments from the
    request parameters; keyword arguments handed to the view itself
    (via URLconf, etc.) take precedence over GET or POST parameters of
    the same name.

    The view responds with a JSON dictionary containing the keys ``ok``
    and ``task_id``.
    """
    def _applier(request, **options):
        # POST data is used only for POST requests that actually carry
        # data; in every other case the GET parameters are used.
        posted = request.POST if request.method == 'POST' else None
        params = kwdict(posted if posted else request.GET)
        # Iterating over items() keeps a single value per key
        # (no multivalue).
        task_kwargs = {key: value for key, value in params.items()}
        # View keyword arguments win over request parameters.
        task_kwargs.update(options)
        async_result = task.apply_async(kwargs=task_kwargs)
        return JsonResponse({'ok': 'true', 'task_id': async_result.task_id})
    return _applier
def apply(request, task_name):
    """View that looks up a task by name and applies it.

    The task is fetched from the task registry under ``task_name`` and
    run through :func:`task_view` with the incoming request. An
    :exc:`~django.http.Http404` is raised when the registry lookup
    fails with :exc:`KeyError`.

    **Note:** Use this with caution. It lets a request trigger any
    registered task, so it should not be publicly accessible unless
    you are sure your code is safe!
    """
    try:
        registered = tasks[task_name]
    except KeyError:
        raise Http404('apply: no such task')
    view = task_view(registered)
    return view(request)
def is_task_successful(request, task_id):
    """Report as JSON whether the task with ``task_id`` succeeded.

    The response has the form
    ``{"task": {"id": <task_id>, "executed": <successful()>}}``.
    """
    succeeded = AsyncResult(task_id).successful()
    info = {'id': task_id, 'executed': succeeded}
    return JsonResponse({'task': info})
def task_status(request, task_id):
    """Return the status and result of a task as JSON.

    The response wraps a dictionary with the keys ``id``, ``status``
    and ``result`` under the key ``task``. When the task state is one
    of ``states.EXCEPTION_STATES`` the result is replaced by its safe
    repr, and the keys ``exc`` (full class name of the result) and
    ``traceback`` are added.
    """
    async_result = AsyncResult(task_id)
    status = async_result.state
    value = async_result.result
    payload = {'id': task_id, 'status': status, 'result': value}
    if status in states.EXCEPTION_STATES:
        # Here the result is the raised exception: report it in a
        # printable form together with its class and traceback.
        trace = async_result.traceback
        payload['result'] = safe_repr(value)
        payload['exc'] = get_full_cls_name(value.__class__)
        payload['traceback'] = trace
    return JsonResponse({'task': payload})
def registered_tasks(request):
    """View listing the names of all registered tasks as a JSON object.

    The object has two keys, ``regular`` and ``periodic``, each holding
    the list of keys from the corresponding registry view.
    """
    regular_names = list(tasks.regular().keys())
    periodic_names = list(tasks.periodic().keys())
    return JsonResponse({
        'regular': regular_names,
        'periodic': periodic_names,
    })
def task_webhook(fun):
    """Decorator turning a function into a task webhook.

    The decorated function always answers with a JSON response. If the
    wrapped function raises an exception, the response has ``status``
    set to ``failure`` and ``reason`` set to the safe repr of the
    exception; otherwise ``status`` is ``success`` and ``retval``
    holds the function's return value.

    Example:

    .. code-block:: python

        @task_webhook
        def add(request):
            x = int(request.GET['x'])
            y = int(request.GET['y'])
            return x + y

        def view(request):
            response = add(request)
            print(response.content)

    For a request where ``x + y`` is 100, the response content is a
    JSON object along the lines of::

        {"status": "success", "retval": 100}

    """
    @wraps(fun)
    def _inner(*args, **kwargs):
        # Only the call to ``fun`` is guarded; serialization happens
        # afterwards, so errors raised there are not reported as a
        # webhook failure.
        try:
            payload = {'status': 'success', 'retval': fun(*args, **kwargs)}
        except Exception as error:
            payload = {'status': 'failure', 'reason': safe_repr(error)}
        return JsonResponse(payload)
    return _inner
|