/usr/lib/python2.7/dist-packages/celery/backends/database/__init__.py is in python-celery 3.1.6-1ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 | # -*- coding: utf-8 -*-
"""
celery.backends.database
~~~~~~~~~~~~~~~~~~~~~~~~
SQLAlchemy result store backend.
"""
from __future__ import absolute_import
from functools import wraps
from celery import states
from celery.exceptions import ImproperlyConfigured
from celery.five import range
from celery.utils.timeutils import maybe_timedelta
from celery.backends.base import BaseBackend
from .models import Task, TaskSet
from .session import ResultSession
__all__ = ['DatabaseBackend']
def _sqlalchemy_installed():
try:
import sqlalchemy
except ImportError:
raise ImproperlyConfigured(
'The database result backend requires SQLAlchemy to be installed.'
'See http://pypi.python.org/pypi/SQLAlchemy')
return sqlalchemy
_sqlalchemy_installed()
from sqlalchemy.exc import DatabaseError, OperationalError
def retry(fun):
@wraps(fun)
def _inner(*args, **kwargs):
max_retries = kwargs.pop('max_retries', 3)
for retries in range(max_retries):
try:
return fun(*args, **kwargs)
except (DatabaseError, OperationalError):
if retries + 1 >= max_retries:
raise
return _inner
class DatabaseBackend(BaseBackend):
"""The database result backend."""
# ResultSet.iterate should sleep this much between each pool,
# to not bombard the database with queries.
subpolling_interval = 0.5
def __init__(self, dburi=None, expires=None,
engine_options=None, url=None, **kwargs):
# The `url` argument was added later and is used by
# the app to set backend by url (celery.backends.get_backend_by_url)
super(DatabaseBackend, self).__init__(**kwargs)
conf = self.app.conf
self.expires = maybe_timedelta(self.prepare_expires(expires))
self.dburi = url or dburi or conf.CELERY_RESULT_DBURI
self.engine_options = dict(
engine_options or {},
**conf.CELERY_RESULT_ENGINE_OPTIONS or {})
self.short_lived_sessions = kwargs.get(
'short_lived_sessions',
conf.CELERY_RESULT_DB_SHORT_LIVED_SESSIONS,
)
tablenames = conf.CELERY_RESULT_DB_TABLENAMES or {}
Task.__table__.name = tablenames.get('task', 'celery_taskmeta')
TaskSet.__table__.name = tablenames.get('group', 'celery_tasksetmeta')
if not self.dburi:
raise ImproperlyConfigured(
'Missing connection string! Do you have '
'CELERY_RESULT_DBURI set to a real value?')
def ResultSession(self):
return ResultSession(
dburi=self.dburi,
short_lived_sessions=self.short_lived_sessions,
**self.engine_options
)
@retry
def _store_result(self, task_id, result, status,
traceback=None, max_retries=3, **kwargs):
"""Store return value and status of an executed task."""
session = self.ResultSession()
try:
task = session.query(Task).filter(Task.task_id == task_id).first()
if not task:
task = Task(task_id)
session.add(task)
session.flush()
task.result = result
task.status = status
task.traceback = traceback
session.commit()
return result
finally:
session.close()
@retry
def _get_task_meta_for(self, task_id):
"""Get task metadata for a task by id."""
session = self.ResultSession()
try:
task = session.query(Task).filter(Task.task_id == task_id).first()
if task is None:
task = Task(task_id)
task.status = states.PENDING
task.result = None
return task.to_dict()
finally:
session.close()
@retry
def _save_group(self, group_id, result):
"""Store the result of an executed group."""
session = self.ResultSession()
try:
group = TaskSet(group_id, result)
session.add(group)
session.flush()
session.commit()
return result
finally:
session.close()
@retry
def _restore_group(self, group_id):
"""Get metadata for group by id."""
session = self.ResultSession()
try:
group = session.query(TaskSet).filter(
TaskSet.taskset_id == group_id).first()
if group:
return group.to_dict()
finally:
session.close()
@retry
def _delete_group(self, group_id):
"""Delete metadata for group by id."""
session = self.ResultSession()
try:
session.query(TaskSet).filter(
TaskSet.taskset_id == group_id).delete()
session.flush()
session.commit()
finally:
session.close()
@retry
def _forget(self, task_id):
"""Forget about result."""
session = self.ResultSession()
try:
session.query(Task).filter(Task.task_id == task_id).delete()
session.commit()
finally:
session.close()
def cleanup(self):
"""Delete expired metadata."""
session = self.ResultSession()
expires = self.expires
now = self.app.now()
try:
session.query(Task).filter(
Task.date_done < (now - expires)).delete()
session.query(TaskSet).filter(
TaskSet.date_done < (now - expires)).delete()
session.commit()
finally:
session.close()
def __reduce__(self, args=(), kwargs={}):
kwargs.update(
dict(dburi=self.dburi,
expires=self.expires,
engine_options=self.engine_options))
return super(DatabaseBackend, self).__reduce__(args, kwargs)
|