/usr/lib/python3/dist-packages/celery/backends/consul.py is in python3-celery 4.1.0-2ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 | # -*- coding: utf-8 -*-
"""Consul result store backend.
- :class:`ConsulBackend` implements KeyValueStoreBackend to store results
in the key-value store of Consul.
"""
from __future__ import absolute_import, unicode_literals
from kombu.utils.url import parse_url
from celery.exceptions import ImproperlyConfigured
from celery.backends.base import KeyValueStoreBackend, PY3
from celery.utils.log import get_logger
try:
import consul
except ImportError:
consul = None
logger = get_logger(__name__)
__all__ = ['ConsulBackend']
CONSUL_MISSING = """\
You need to install the python-consul library in order to use \
the Consul result store backend."""
class ConsulBackend(KeyValueStoreBackend):
"""Consul.io K/V store backend for Celery."""
consul = consul
supports_autoexpire = True
client = None
consistency = 'consistent'
path = None
def __init__(self, *args, **kwargs):
super(ConsulBackend, self).__init__(*args, **kwargs)
if self.consul is None:
raise ImproperlyConfigured(CONSUL_MISSING)
self._init_from_params(**parse_url(self.url))
def _init_from_params(self, hostname, port, virtual_host, **params):
logger.debug('Setting on Consul client to connect to %s:%d',
hostname, port)
self.path = virtual_host
self.client = consul.Consul(host=hostname, port=port,
consistency=self.consistency)
def _key_to_consul_key(self, key):
if PY3:
key = key.encode('utf-8')
return key if self.path is None else '{0}/{1}'.format(self.path, key)
def get(self, key):
key = self._key_to_consul_key(key)
logger.debug('Trying to fetch key %s from Consul', key)
try:
_, data = self.client.kv.get(key)
return data['Value']
except TypeError:
pass
def mget(self, keys):
for key in keys:
yield self.get(key)
def set(self, key, value):
"""Set a key in Consul.
Before creating the key it will create a session inside Consul
where it creates a session with a TTL
The key created afterwards will reference to the session's ID.
If the session expires it will remove the key so that results
can auto expire from the K/V store
"""
session_name = key
if PY3:
session_name = key.decode('utf-8')
key = self._key_to_consul_key(key)
logger.debug('Trying to create Consul session %s with TTL %d',
session_name, self.expires)
session_id = self.client.session.create(name=session_name,
behavior='delete',
ttl=self.expires)
logger.debug('Created Consul session %s', session_id)
logger.debug('Writing key %s to Consul', key)
return self.client.kv.put(key=key,
value=value,
acquire=session_id)
def delete(self, key):
key = self._key_to_consul_key(key)
logger.debug('Removing key %s from Consul', key)
return self.client.kv.delete(key)
|