/usr/lib/python2.7/dist-packages/kombu/transport/zookeeper.py is in python-kombu 4.1.0-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 | """Zookeeper transport.
:copyright: (c) 2010 - 2013 by Mahendra M.
:license: BSD, see LICENSE for more details.
**Synopsis**
Connects to a zookeeper node as <server>:<port>/<vhost>
The <vhost> becomes the base for all the other znodes. So we can use
it like a vhost.
This uses the built-in kazoo recipe for queues
**References**
- https://zookeeper.apache.org/doc/trunk/recipes.html#sc_recipes_Queues
- https://kazoo.readthedocs.io/en/latest/api/recipe/queue.html
**Limitations**
This queue does not offer reliable consumption. An entry is removed from
the queue prior to being processed. So if an error occurs, the consumer
has to re-queue the item or it will be lost.
"""
from __future__ import absolute_import, unicode_literals
import os
import socket
from kombu.five import Empty
from kombu.utils.encoding import bytes_to_str, ensure_bytes
from kombu.utils.json import dumps, loads
from . import virtual
try:
import kazoo
from kazoo.client import KazooClient
from kazoo.recipe.queue import Queue
KZ_CONNECTION_ERRORS = (
kazoo.exceptions.SystemErrorException,
kazoo.exceptions.ConnectionLossException,
kazoo.exceptions.MarshallingErrorException,
kazoo.exceptions.UnimplementedException,
kazoo.exceptions.OperationTimeoutException,
kazoo.exceptions.NoAuthException,
kazoo.exceptions.InvalidACLException,
kazoo.exceptions.AuthFailedException,
kazoo.exceptions.SessionExpiredException,
)
KZ_CHANNEL_ERRORS = (
kazoo.exceptions.RuntimeInconsistencyException,
kazoo.exceptions.DataInconsistencyException,
kazoo.exceptions.BadArgumentsException,
kazoo.exceptions.MarshallingErrorException,
kazoo.exceptions.UnimplementedException,
kazoo.exceptions.OperationTimeoutException,
kazoo.exceptions.ApiErrorException,
kazoo.exceptions.NoNodeException,
kazoo.exceptions.NoAuthException,
kazoo.exceptions.NodeExistsException,
kazoo.exceptions.NoChildrenForEphemeralsException,
kazoo.exceptions.NotEmptyException,
kazoo.exceptions.SessionExpiredException,
kazoo.exceptions.InvalidCallbackException,
socket.error,
)
except ImportError:
kazoo = None # noqa
KZ_CONNECTION_ERRORS = KZ_CHANNEL_ERRORS = () # noqa
DEFAULT_PORT = 2181
__author__ = 'Mahendra M <mahendra.m@gmail.com>'
class Channel(virtual.Channel):
"""Zookeeper Channel."""
_client = None
_queues = {}
def __init__(self, connection, **kwargs):
super(Channel, self).__init__(connection, **kwargs)
vhost = self.connection.client.virtual_host
self._vhost = '/{}'.format(vhost.strip('/'))
def _get_path(self, queue_name):
return os.path.join(self._vhost, queue_name)
def _get_queue(self, queue_name):
queue = self._queues.get(queue_name, None)
if queue is None:
queue = Queue(self.client, self._get_path(queue_name))
self._queues[queue_name] = queue
# Ensure that the queue is created
len(queue)
return queue
def _put(self, queue, message, **kwargs):
return self._get_queue(queue).put(
ensure_bytes(dumps(message)),
priority=self._get_message_priority(message, reverse=True),
)
def _get(self, queue):
queue = self._get_queue(queue)
msg = queue.get()
if msg is None:
raise Empty()
return loads(bytes_to_str(msg))
def _purge(self, queue):
count = 0
queue = self._get_queue(queue)
while True:
msg = queue.get()
if msg is None:
break
count += 1
return count
def _delete(self, queue, *args, **kwargs):
if self._has_queue(queue):
self._purge(queue)
self.client.delete(self._get_path(queue))
def _size(self, queue):
queue = self._get_queue(queue)
return len(queue)
def _new_queue(self, queue, **kwargs):
if not self._has_queue(queue):
queue = self._get_queue(queue)
def _has_queue(self, queue):
return self.client.exists(self._get_path(queue)) is not None
def _open(self):
conninfo = self.connection.client
hosts = []
if conninfo.alt:
for host_port in conninfo.alt:
if host_port.startswith('zookeeper://'):
host_port = host_port[len('zookeeper://'):]
if not host_port:
continue
try:
host, port = host_port.split(':', 1)
host_port = (host, int(port))
except ValueError:
if host_port == conninfo.hostname:
host_port = (host_port, conninfo.port or DEFAULT_PORT)
else:
host_port = (host_port, DEFAULT_PORT)
hosts.append(host_port)
host_port = (conninfo.hostname, conninfo.port or DEFAULT_PORT)
if host_port not in hosts:
hosts.insert(0, host_port)
conn_str = ','.join(['%s:%s' % (h, p) for h, p in hosts])
conn = KazooClient(conn_str)
conn.start()
return conn
@property
def client(self):
if self._client is None:
self._client = self._open()
return self._client
class Transport(virtual.Transport):
"""Zookeeper Transport."""
Channel = Channel
polling_interval = 1
default_port = DEFAULT_PORT
connection_errors = (
virtual.Transport.connection_errors + KZ_CONNECTION_ERRORS
)
channel_errors = (
virtual.Transport.channel_errors + KZ_CHANNEL_ERRORS
)
driver_type = 'zookeeper'
driver_name = 'kazoo'
def __init__(self, *args, **kwargs):
if kazoo is None:
raise ImportError('The kazoo library is not installed')
super(Transport, self).__init__(*args, **kwargs)
def driver_version(self):
return kazoo.__version__
|