/usr/share/pyshared/djcelery/backends/database.py is in python-django-celery 2.5.5-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 | from __future__ import absolute_import
from celery import current_app
from celery.backends.base import BaseDictBackend
from celery.utils.timeutils import maybe_timedelta
from ..models import TaskMeta, TaskSetMeta
class DatabaseBackend(BaseDictBackend):
"""The database backend.
Using Django models to store task state.
"""
TaskModel = TaskMeta
TaskSetModel = TaskSetMeta
expires = current_app.conf.CELERY_TASK_RESULT_EXPIRES
create_django_tables = True
subpolling_interval = 0.5
def _store_result(self, task_id, result, status, traceback=None):
"""Store return value and status of an executed task."""
self.TaskModel._default_manager.store_result(task_id, result, status,
traceback=traceback)
return result
def _save_taskset(self, taskset_id, result):
"""Store the result of an executed taskset."""
self.TaskSetModel._default_manager.store_result(taskset_id, result)
return result
def _get_task_meta_for(self, task_id):
"""Get task metadata for a task by id."""
return self.TaskModel._default_manager.get_task(task_id).to_dict()
def _restore_taskset(self, taskset_id):
"""Get taskset metadata for a taskset by id."""
meta = self.TaskSetModel._default_manager.restore_taskset(taskset_id)
if meta:
return meta.to_dict()
def _delete_taskset(self, taskset_id):
self.TaskSetModel._default_manager.delete_taskset(taskset_id)
def _forget(self, task_id):
try:
self.TaskModel._default_manager.get(task_id=task_id).delete()
except self.TaskModel.DoesNotExist:
pass
def cleanup(self):
"""Delete expired metadata."""
expires = maybe_timedelta(self.expires)
for model in self.TaskModel, self.TaskSetModel:
model._default_manager.delete_expired(expires)
|