/usr/share/pyshared/celery/backends/cache.py is in python-celery 2.5.3-4.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# -*- coding: utf-8 -*-
from __future__ import absolute_import
from ..datastructures import LRUCache
from ..exceptions import ImproperlyConfigured
from ..utils import cached_property
from .base import KeyValueStoreBackend
# One-slot memo for import_best_memcache(): holds None until the first
# successful lookup, then the ``(is_pylibmc, module)`` pair.
_imp = [None]


def import_best_memcache():
    """Return ``(is_pylibmc, module)`` for the preferred memcache library.

    ``pylibmc`` is tried first and ``memcache`` second.  The result of the
    first successful lookup is stored in ``_imp`` and returned unchanged on
    every later call.

    Raises ``ImproperlyConfigured`` when neither library can be imported.
    """
    if _imp[0] is not None:
        return _imp[0]

    try:
        import pylibmc as memcache
    except ImportError:
        is_pylibmc = False
        try:
            import memcache  # noqa
        except ImportError:
            raise ImproperlyConfigured(
                "Memcached backend requires either the 'pylibmc' "
                "or 'memcache' library")
    else:
        is_pylibmc = True

    _imp[0] = (is_pylibmc, memcache)
    return _imp[0]
def get_best_memcache(*args, **kwargs):
    """Build a client using the library chosen by import_best_memcache().

    The ``behaviors`` keyword is removed from ``kwargs`` before the client
    is constructed; all remaining arguments are forwarded to the library's
    ``Client``.  The behaviors are assigned to the client only when pylibmc
    is the library in use and a non-``None`` value was supplied.
    """
    behaviors = kwargs.pop("behaviors", None)
    using_pylibmc, library = import_best_memcache()
    client = library.Client(*args, **kwargs)
    if behaviors is None or not using_pylibmc:
        return client
    client.behaviors = behaviors
    return client
class DummyClient(object):
    """In-process stand-in for a cache client, storing values in an
    ``LRUCache`` limited to 5000 entries."""

    def __init__(self, *args, **kwargs):
        # Constructor arguments are accepted but not used.
        self.cache = LRUCache(limit=5000)

    def get(self, key, *args, **kwargs):
        """Return what the cache's ``get`` yields for ``key``; extra
        arguments are ignored."""
        store = self.cache
        return store.get(key)

    def get_multi(self, keys):
        """Return a dict holding only those ``keys`` present in the cache."""
        store = self.cache
        found = {}
        for name in keys:
            if name in store:
                found[name] = store[name]
        return found

    def set(self, key, value, *args, **kwargs):
        """Store ``value`` under ``key``; extra arguments are ignored."""
        store = self.cache
        store[key] = value

    def delete(self, key, *args, **kwargs):
        """Remove ``key`` from the cache; a missing key is not an error."""
        store = self.cache
        store.pop(key, None)

    def incr(self, key, delta=1):
        """Delegate to the cache's own ``incr`` and return its result."""
        store = self.cache
        return store.incr(key, delta)
# Registry mapping a backend scheme name to a zero-argument callable that
# returns the client factory.  The extra level of indirection means the
# factory name is only resolved when an entry is actually called.
backends = dict(
    memcache=lambda: get_best_memcache,
    memcached=lambda: get_best_memcache,
    pylibmc=lambda: get_best_memcache,
    memory=lambda: DummyClient,
)
class CacheBackend(KeyValueStoreBackend):
    """Key/value result backend that delegates storage to a cache client.

    The client factory is looked up in the module-level ``backends`` mapping
    using the scheme part of the backend URL, which has the form
    ``scheme://server1;server2/``.
    """

    # Server list parsed from the backend URL; stays ``None`` when no
    # backend URL was passed in or configured.
    servers = None
    supports_native_join = True

    def __init__(self, expires=None, backend=None, options=None, **kwargs):
        """Configure the backend from arguments and the app configuration.

        :param expires: expiry value, normalised through
            ``prepare_expires(expires, type=int)``.
        :param backend: backend URL; when empty the
            ``CELERY_CACHE_BACKEND`` setting is used instead.
        :param options: extra client options, laid over the
            ``CELERY_CACHE_BACKEND_OPTIONS`` setting.  ``None`` means no
            extra options.
        :raises ImproperlyConfigured: if the URL scheme is not a key of
            ``backends``.
        """
        # ``super()`` already binds ``self``; passing it again only added a
        # stray positional argument.
        super(CacheBackend, self).__init__(**kwargs)

        # Always build a new dict so neither the configuration mapping nor
        # the caller's ``options`` is shared with this instance.
        self.options = dict(self.app.conf.CELERY_CACHE_BACKEND_OPTIONS,
                            **(options or {}))

        self.backend = backend or self.app.conf.CELERY_CACHE_BACKEND
        if self.backend:
            # Split "scheme://a;b/" into the scheme and the server list.
            self.backend, _, servers = self.backend.partition("://")
            self.servers = servers.rstrip('/').split(";")
        self.expires = self.prepare_expires(expires, type=int)
        try:
            # Registry values are zero-argument callables that return the
            # actual client factory.
            self.Client = backends[self.backend]()
        except KeyError:
            raise ImproperlyConfigured(
                "Unknown cache backend: %s. Please use one of the "
                "following backends: %s" % (self.backend,
                                            ", ".join(backends.keys())))

    def get(self, key):
        """Return the client's value for ``key``."""
        return self.client.get(key)

    def mget(self, keys):
        """Return the client's ``get_multi`` result for ``keys``."""
        return self.client.get_multi(keys)

    def set(self, key, value):
        """Store ``value`` under ``key``, passing ``self.expires`` along."""
        return self.client.set(key, value, self.expires)

    def delete(self, key):
        """Delete ``key`` through the client."""
        return self.client.delete(key)

    def on_chord_apply(self, setid, body, result=None, **kwargs):
        """Initialise the chord counter for ``setid`` to ``'0'``.

        ``body``, ``result`` and extra keyword arguments are not used here.
        """
        key = self.get_key_for_chord(setid)
        # NOTE(review): 86400 is presumably a lifetime in seconds (one
        # day) -- confirm against the client's ``set`` signature.
        self.client.set(key, '0', time=86400)

    def on_chord_part_return(self, task, propagate=False):
        """Count one finished chord part and fire the chord body once the
        counter reaches the total number of parts.

        Does nothing when the task's request carries no taskset id.
        """
        # Imported here rather than at module level, as in the original;
        # presumably to avoid an import cycle -- TODO confirm.
        from ..task.sets import subtask
        from ..result import TaskSetResult
        setid = task.request.taskset
        if not setid:
            return
        key = self.get_key_for_chord(setid)
        deps = TaskSetResult.restore(setid, backend=task.backend)
        if self.client.incr(key) >= deps.total:
            # All parts are in: pass the joined results to the chord body,
            # then remove the stored taskset and the counter key.
            subtask(task.request.chord).delay(deps.join(propagate=propagate))
            deps.delete()
            self.client.delete(key)

    @cached_property
    def client(self):
        """Client built from the parsed servers and the merged options."""
        return self.Client(self.servers, **self.options)

    def __reduce__(self, args=(), kwargs=None):
        """Return the pickling recipe, rebuilding the backend URL from the
        stored scheme and server list.

        :param args: positional arguments forwarded to the parent's
            ``__reduce__``.
        :param kwargs: extra keyword state; it is copied, never mutated.
        """
        servers = ";".join(self.servers)
        backend = "%s://%s/" % (self.backend, servers)
        # Build a new dict on every call: updating the argument in place
        # would leak state through a shared default and would modify a
        # dict owned by the caller.
        state = dict(kwargs or {},
                     backend=backend,
                     expires=self.expires,
                     options=self.options)
        return super(CacheBackend, self).__reduce__(args, state)
|