/usr/share/pyshared/celery/backends/tyrant.py is in python-celery 2.4.6-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# -*- coding: utf-8 -*-
"""celery.backends.tyrant"""
from __future__ import absolute_import
try:
import pytyrant
except ImportError:
pytyrant = None # noqa
from ..exceptions import ImproperlyConfigured
from .base import KeyValueStoreBackend
class TyrantBackend(KeyValueStoreBackend):
    """Tokyo Cabinet based task backend store.

    Key/value operations are delegated to a :class:`pytyrant.PyTyrant`
    connection that is opened lazily by :meth:`open` and cached until
    :meth:`close` is called.

    .. attribute:: tyrant_host

        The hostname to the Tokyo Tyrant server.

    .. attribute:: tyrant_port

        The port to the Tokyo Tyrant server.

    """
    tyrant_host = None
    tyrant_port = None

    def __init__(self, tyrant_host=None, tyrant_port=None, **kwargs):
        """Initialize Tokyo Tyrant backend instance.

        Host and port are each resolved in this order: the explicit
        argument, the :setting:`TT_HOST` / :setting:`TT_PORT` entry of the
        app configuration, then the class-level attribute.  A port that
        is set is converted with :func:`int`.

        :param tyrant_host: hostname of the Tokyo Tyrant server.
        :param tyrant_port: port of the Tokyo Tyrant server.
        :param kwargs: forwarded unchanged to the parent backend.

        Raises :class:`celery.exceptions.ImproperlyConfigured` if the
        ``pytyrant`` library is not installed, or if
        :setting:`TT_HOST` or :setting:`TT_PORT` is not set.

        """
        super(TyrantBackend, self).__init__(**kwargs)

        # ``pytyrant`` is bound to None at import time when the library
        # is missing, so fail here with a configuration error instead.
        if not pytyrant:
            raise ImproperlyConfigured(
                "You need to install the pytyrant library to use the "
                "Tokyo Tyrant backend.")

        self.tyrant_host = (tyrant_host or
                            self.app.conf.get("TT_HOST") or
                            self.tyrant_host)
        self.tyrant_port = (tyrant_port or
                            self.app.conf.get("TT_PORT") or
                            self.tyrant_port)
        if self.tyrant_port:
            # The port may have been given as a string; normalize it.
            self.tyrant_port = int(self.tyrant_port)

        if not self.tyrant_host or not self.tyrant_port:
            raise ImproperlyConfigured(
                "To use the Tokyo Tyrant backend, you have to "
                "set the TT_HOST and TT_PORT settings in your settings.py")

        # Created on first use by open().
        self._connection = None

    def open(self):
        """Get :class:`pytyrant.PyTyrant` instance with the current
        server configuration.

        The connection is then cached until you do an
        explicit :meth:`close`.

        """
        # connection overrides bool(), so compare against None explicitly
        # rather than relying on truthiness.
        if self._connection is None:
            self._connection = pytyrant.PyTyrant.open(self.tyrant_host,
                                                      self.tyrant_port)
        return self._connection

    def close(self):
        """Close the tyrant connection and remove the cache."""
        # connection overrides bool(), so compare against None explicitly
        # rather than relying on truthiness.
        if self._connection is not None:
            self._connection.close()
            self._connection = None

    def process_cleanup(self):
        """Release the cached connection by calling :meth:`close`."""
        self.close()

    def get(self, key):
        """Return whatever the connection's ``get`` yields for `key`."""
        return self.open().get(key)

    def set(self, key, value):
        """Store `value` under `key` on the server."""
        self.open()[key] = value

    def delete(self, key):
        """Remove `key`; the ``None`` default makes ``pop`` tolerate
        a key that is not present."""
        self.open().pop(key, None)

    def __reduce__(self, args=(), kwargs=None):
        """Pickle support: add the connection settings to the keyword
        arguments handed to the parent ``__reduce__``.

        :param args: positional arguments passed through to the parent.
        :param kwargs: optional extra keyword arguments; copied, never
            modified in place.

        """
        # Build a fresh dict on every call.  A mutable ``{}`` default would
        # be shared between calls (and instances), so a later call could
        # alter the kwargs referenced by an earlier reduce result.
        merged = dict(kwargs) if kwargs else {}
        merged.update(tyrant_host=self.tyrant_host,
                      tyrant_port=self.tyrant_port)
        return super(TyrantBackend, self).__reduce__(args, merged)
|