/usr/share/pyshared/carrot/connection.py is in python-carrot 0.10.7-1ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 | """
Getting a connection to the AMQP server.
"""
import socket
import warnings
from collections import deque
from copy import copy
from Queue import Queue, Empty as QueueEmpty
from amqplib.client_0_8.connection import AMQPConnectionException
from carrot.backends import get_backend_cls
DEFAULT_CONNECT_TIMEOUT = 5 # seconds
SETTING_PREFIX = "BROKER"
COMPAT_SETTING_PREFIX = "AMQP"
ARG_TO_DJANGO_SETTING = {
"hostname": "HOST",
"userid": "USER",
"password": "PASSWORD",
"virtual_host": "VHOST",
"port": "PORT",
}
SETTING_DEPRECATED_FMT = "Setting %s has been renamed to %s and is " \
"scheduled for removal in version 1.0."
class BrokerConnection(object):
"""A network/socket connection to an AMQP message broker.
:param hostname: see :attr:`hostname`.
:param userid: see :attr:`userid`.
:param password: see :attr:`password`.
:keyword virtual_host: see :attr:`virtual_host`.
:keyword port: see :attr:`port`.
:keyword insist: see :attr:`insist`.
:keyword connect_timeout: see :attr:`connect_timeout`.
:keyword ssl: see :attr:`ssl`.
.. attribute:: hostname
The hostname to the AMQP server
.. attribute:: userid
A valid username used to authenticate to the server.
.. attribute:: password
The password used to authenticate to the server.
.. attribute:: virtual_host
The name of the virtual host to work with. This virtual host must
exist on the server, and the user must have access to it. Consult
your brokers manual for help with creating, and mapping
users to virtual hosts.
Default is ``"/"``.
.. attribute:: port
The port of the AMQP server. Default is ``5672`` (amqp).
.. attribute:: insist
Insist on connecting to a server. In a configuration with multiple
load-sharing servers, the insist option tells the server that the
client is insisting on a connection to the specified server.
Default is ``False``.
.. attribute:: connect_timeout
The timeout in seconds before we give up connecting to the server.
The default is no timeout.
.. attribute:: ssl
Use SSL to connect to the server.
The default is ``False``.
.. attribute:: backend_cls
The messaging backend class used. Defaults to the ``pyamqplib``
backend.
"""
virtual_host = "/"
port = None
insist = False
connect_timeout = DEFAULT_CONNECT_TIMEOUT
ssl = False
_closed = True
backend_cls = None
ConnectionException = AMQPConnectionException
@property
def host(self):
"""The host as a hostname/port pair separated by colon."""
return ":".join([self.hostname, str(self.port)])
def __init__(self, hostname=None, userid=None, password=None,
virtual_host=None, port=None, pool=None, **kwargs):
self.hostname = hostname
self.userid = userid
self.password = password
self.virtual_host = virtual_host or self.virtual_host
self.port = port or self.port
self.insist = kwargs.get("insist", self.insist)
self.pool = pool
self.connect_timeout = kwargs.get("connect_timeout",
self.connect_timeout)
self.ssl = kwargs.get("ssl", self.ssl)
self.backend_cls = kwargs.get("backend_cls", None)
self._closed = None
self._connection = None
def __copy__(self):
return self.__class__(self.hostname, self.userid, self.password,
self.virtual_host, self.port,
insist=self.insist,
connect_timeout=self.connect_timeout,
ssl=self.ssl,
backend_cls=self.backend_cls,
pool=self.pool)
@property
def connection(self):
if self._closed == True:
return
if not self._connection:
self._connection = self._establish_connection()
self._closed = False
return self._connection
def __enter__(self):
return self
def __exit__(self, e_type, e_value, e_trace):
if e_type:
raise e_type(e_value)
self.close()
def _establish_connection(self):
return self.create_backend().establish_connection()
def get_backend_cls(self):
"""Get the currently used backend class."""
backend_cls = self.backend_cls
if not backend_cls or isinstance(backend_cls, basestring):
backend_cls = get_backend_cls(backend_cls)
return backend_cls
def create_backend(self):
"""Create a new instance of the current backend in
:attr:`backend_cls`."""
backend_cls = self.get_backend_cls()
return backend_cls(connection=self)
def get_channel(self):
"""Request a new AMQP channel."""
return self.connection.channel()
def connect(self):
"""Establish a connection to the AMQP server."""
self._closed = False
return self.connection
def drain_events(self, **kwargs):
return self.connection.drain_events(**kwargs)
def close(self):
"""Close the currently open connection."""
try:
if self._connection:
backend = self.create_backend()
backend.close_connection(self._connection)
except socket.error:
pass
self._closed = True
def release(self):
if not self.pool:
raise NotImplementedError(
"Trying to release connection not part of a pool")
self.pool.release(self)
# For backwards compatability.
AMQPConnection = BrokerConnection
class ConnectionLimitExceeded(Exception):
"""The maximum number of pool connections has been exceeded."""
class ConnectionPool(object):
def __init__(self, source_connection, min=2, max=None, preload=True):
self.source_connection = source_connection
self.min = min
self.max = max
self.preload = preload
self.source_connection.pool = self
self._connections = Queue()
self._dirty = deque()
self._connections.put(self.source_connection)
for i in range(min - 1):
self._connections.put_nowait(self._new_connection())
def acquire(self, block=False, timeout=None, connect_timeout=None):
try:
conn = self._connections.get(block=block, timeout=timeout)
except QueueEmpty:
conn = self._new_connection()
self._dirty.append(conn)
if connect_timeout is not None:
conn.connect_timeout = connect_timeout
return conn
def release(self, connection):
self._dirty.remove(connection)
self._connections.put_nowait(connection)
def _new_connection(self):
if len(self._dirty) >= self.max:
raise ConnectionLimitExceeded(self.max)
return copy(self.source_connection)
def get_django_conninfo(settings=None):
# FIXME can't wait to remove this mess in 1.0 [askh]
ci = {}
if settings is None:
from django.conf import settings
ci["backend_cls"] = getattr(settings, "CARROT_BACKEND", None)
for arg_name, setting_name in ARG_TO_DJANGO_SETTING.items():
setting = "%s_%s" % (SETTING_PREFIX, setting_name)
compat_setting = "%s_%s" % (COMPAT_SETTING_PREFIX, setting_name)
if hasattr(settings, setting):
ci[arg_name] = getattr(settings, setting, None)
elif hasattr(settings, compat_setting):
ci[arg_name] = getattr(settings, compat_setting, None)
warnings.warn(DeprecationWarning(SETTING_DEPRECATED_FMT % (
compat_setting, setting)))
if "hostname" not in ci:
if hasattr(settings, "AMQP_SERVER"):
ci["hostname"] = settings.AMQP_SERVER
warnings.warn(DeprecationWarning(
"AMQP_SERVER has been renamed to BROKER_HOST and is"
"scheduled for removal in version 1.0."))
return ci
class DjangoBrokerConnection(BrokerConnection):
"""A version of :class:`BrokerConnection` that takes configuration
from the Django ``settings.py`` module.
:keyword hostname: The hostname of the AMQP server to connect to,
if not provided this is taken from ``settings.BROKER_HOST``.
:keyword userid: The username of the user to authenticate to the server
as. If not provided this is taken from ``settings.BROKER_USER``.
:keyword password: The users password. If not provided this is taken
from ``settings.BROKER_PASSWORD``.
:keyword virtual_host: The name of the virtual host to work with.
This virtual host must exist on the server, and the user must
have access to it. Consult your brokers manual for help with
creating, and mapping users to virtual hosts. If not provided
this is taken from ``settings.BROKER_VHOST``.
:keyword port: The port the AMQP server is running on. If not provided
this is taken from ``settings.BROKER_PORT``, or if that is not set,
the default is ``5672`` (amqp).
"""
def __init__(self, *args, **kwargs):
settings = kwargs.pop("settings", None)
kwargs = dict(get_django_conninfo(settings), **kwargs)
super(DjangoBrokerConnection, self).__init__(*args, **kwargs)
# For backwards compatability.
DjangoAMQPConnection = DjangoBrokerConnection
|