/usr/lib/python2.7/dist-packages/pycassa/pool.py is in python-pycassa 1.11.1-2.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

""" Connection pooling for Cassandra connections. """

from __future__ import with_statement

import time
import threading
import random
import socket
import sys

if 'gevent.monkey' in sys.modules:
    from gevent import queue as Queue
else:
    import Queue  # noqa

from thrift import Thrift
from thrift.transport.TTransport import TTransportException
from connection import (Connection, default_socket_factory,
        default_transport_factory)
from logging.pool_logger import PoolLogger
from util import as_interface
from cassandra.ttypes import TimedOutException, UnavailableException

_BASE_BACKOFF = 0.01

__all__ = ['QueuePool', 'ConnectionPool', 'PoolListener',
           'ConnectionWrapper', 'AllServersUnavailable',
           'MaximumRetryException', 'NoConnectionAvailable',
           'InvalidRequestError']

class ConnectionWrapper(Connection):
    """
    Creates a wrapper for a :class:`~.pycassa.connection.Connection`
    object, adding pooling related functionality while still allowing
    access to the thrift API calls.

    These should not be created directly, only obtained through
    Pool's :meth:`~.ConnectionPool.get()` method.
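
    Example Usage (a minimal sketch; assumes a :class:`ConnectionPool`
    named ``pool`` has already been created):

    .. code-block:: python

        >>> conn = pool.get()
        >>> ks_def = conn.describe_keyspace(pool.keyspace)
        >>> conn.return_to_pool()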
    """

    # These mark the state of the connection so that we can
    # check to see that they are not returned, checked out,
    # or disposed twice (or from the wrong state).
    _IN_QUEUE = 0
    _CHECKED_OUT = 1
    _DISPOSED = 2

    def __init__(self, pool, max_retries, *args, **kwargs):
        self._pool = pool
        self._retry_count = 0
        self.max_retries = max_retries
        self.info = {}
        self.starttime = time.time()
        self.operation_count = 0
        self._state = ConnectionWrapper._CHECKED_OUT
        Connection.__init__(self, *args, **kwargs)
        self._pool._notify_on_connect(self)

        # For testing purposes only
        self._should_fail = False
        self._original_meth = self.send_batch_mutate

    def return_to_pool(self):
        """
        Returns this to the pool.

        This has the same effect as calling :meth:`ConnectionPool.put()`
        on the wrapper.

        """
        self._pool.put(self)

    def _checkin(self):
        if self._state == ConnectionWrapper._IN_QUEUE:
            raise InvalidRequestError("A connection has been returned to "
                    "the connection pool twice.")
        elif self._state == ConnectionWrapper._DISPOSED:
            raise InvalidRequestError("A disposed connection has been returned "
                    "to the connection pool.")
        self._state = ConnectionWrapper._IN_QUEUE

    def _checkout(self):
        if self._state != ConnectionWrapper._IN_QUEUE:
            raise InvalidRequestError("A connection has been checked "
                    "out twice.")
        self._state = ConnectionWrapper._CHECKED_OUT

    def _is_in_queue_or_disposed(self):
        ret = self._state == ConnectionWrapper._IN_QUEUE or \
              self._state == ConnectionWrapper._DISPOSED
        return ret

    def _dispose_wrapper(self, reason=None):
        if self._state == ConnectionWrapper._DISPOSED:
            raise InvalidRequestError("A connection has been disposed twice.")
        self._state = ConnectionWrapper._DISPOSED

        self.close()
        self._pool._notify_on_dispose(self, msg=reason)

    def _replace(self, new_conn_wrapper):
        """
        Get another wrapper from the pool and replace our own contents
        with its contents.

        """
        self.server = new_conn_wrapper.server
        self.transport = new_conn_wrapper.transport
        self._iprot = new_conn_wrapper._iprot
        self._oprot = new_conn_wrapper._oprot
        self.info = new_conn_wrapper.info
        self.starttime = new_conn_wrapper.starttime
        self.operation_count = new_conn_wrapper.operation_count
        self._state = ConnectionWrapper._CHECKED_OUT
        self._should_fail = new_conn_wrapper._should_fail

    @classmethod
    def _retry(cls, f):
        def new_f(self, *args, **kwargs):
            self.operation_count += 1
            self.info['request'] = {'method': f.__name__, 'args': args, 'kwargs': kwargs}
            try:
                allow_retries = kwargs.pop('allow_retries', True)
                if kwargs.pop('reset', False):
                    self._pool._replace_wrapper() # puts a new wrapper in the queue
                    self._replace(self._pool.get()) # swaps out transport
                result = f(self, *args, **kwargs)
                self._retry_count = 0 # reset the count after a success
                return result
            except Thrift.TApplicationException:
                self.close()
                self._pool._decrement_overflow()
                self._pool._clear_current()
                raise
            except (TimedOutException, UnavailableException,
                    TTransportException,
                    socket.error, IOError, EOFError), exc:
                self._pool._notify_on_failure(exc, server=self.server, connection=self)

                self.close()
                self._pool._decrement_overflow()
                self._pool._clear_current()

                self._retry_count += 1
                if (not allow_retries or
                    (self.max_retries != -1 and self._retry_count > self.max_retries)):
                    raise MaximumRetryException('Retried %d times. Last failure was %s: %s' %
                                                (self._retry_count, exc.__class__.__name__, exc))
                # Exponential backoff
                time.sleep(_BASE_BACKOFF * (2 ** self._retry_count))

                kwargs['reset'] = True
                return new_f(self, *args, **kwargs)

        new_f.__name__ = f.__name__
        return new_f

    def _fail_once(self, *args, **kwargs):
        if self._should_fail:
            self._should_fail = False
            raise TimedOutException
        else:
            return self._original_meth(*args, **kwargs)

    def get_keyspace_description(self, keyspace=None, use_dict_for_col_metadata=False):
        """
        Describes the given keyspace.

        If `use_dict_for_col_metadata` is ``True``, the column metadata will be stored
        as a dictionary instead of a list.

        A dictionary of the form ``{column_family_name: CfDef}`` is returned.
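
        Example Usage (a sketch; assumes ``conn`` was obtained from a
        :class:`ConnectionPool` and that the keyspace contains a column
        family named ``Standard1``):

        .. code-block:: python

            >>> cf_defs = conn.get_keyspace_description()
            >>> 'Standard1' in cf_defs
            True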

        """
        if keyspace is None:
            keyspace = self.keyspace

        ks_def = self.describe_keyspace(keyspace)
        cf_defs = dict()
        for cf_def in ks_def.cf_defs:
            cf_defs[cf_def.name] = cf_def
            if use_dict_for_col_metadata:
                old_metadata = cf_def.column_metadata
                new_metadata = dict()
                for datum in old_metadata:
                    new_metadata[datum.name] = datum
                cf_def.column_metadata = new_metadata
        return cf_defs

    def __str__(self):
        return "<ConnectionWrapper %s@%s>" % (self.keyspace, self.server)

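# Wrap each of the Thrift API methods below with ConnectionWrapper._retry so
# that transient failures (timeouts, unavailable nodes, transport errors) are
# retried transparently; see ConnectionPool.max_retries.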
retryable = ('get', 'get_slice', 'multiget_slice', 'get_count', 'multiget_count',
             'get_range_slices', 'get_indexed_slices', 'batch_mutate', 'add',
             'insert', 'remove', 'remove_counter', 'truncate', 'describe_keyspace',
             'atomic_batch_mutate')
for fname in retryable:
    new_f = ConnectionWrapper._retry(getattr(Connection, fname))
    setattr(ConnectionWrapper, fname, new_f)

class ConnectionPool(object):
    """A pool that maintains a queue of open connections."""

    _max_overflow = 0

    def _get_max_overflow(self):
        return self._max_overflow

    def _set_max_overflow(self, max_overflow):
        with self._pool_lock:
            self._max_overflow = max_overflow
            self._overflow_enabled = max_overflow > 0 or max_overflow == -1
            if max_overflow == -1:
                self._max_conns = (2 ** 31) - 1
            else:
                self._max_conns = self._pool_size + max_overflow

    max_overflow = property(_get_max_overflow, _set_max_overflow)
    """ Whether or not a new connection may be opened when the
    pool is empty is controlled by `max_overflow`.  This specifies how many
    additional connections may be opened after the pool has reached `pool_size`;
    keep in mind that these extra connections will be discarded upon checkin
    until the pool is below `pool_size`.  This may be set to -1 to indicate no
    overflow limit. The default value is 0, which does not allow for overflow. """

    pool_timeout = 30
    """ If ``pool_size + max_overflow`` connections have already been checked
    out, an attempt to retrieve a new connection from the pool will wait
    up to `pool_timeout` seconds for a connection to be returned to the
    pool before giving up. Note that this setting is only meaningful when you
    are accessing the pool concurrently, such as with multiple threads.
    This may be set to 0 to fail immediately or -1 to wait forever.
    The default value is 30. """

    recycle = 10000
    """ After performing `recycle` number of operations, connections will
    be replaced when checked back in to the pool.  This may be set to
    -1 to disable connection recycling. The default value is 10,000. """

    max_retries = 5
    """ When an operation on a connection fails due to an :exc:`~.TimedOutException`
    or :exc:`~.UnavailableException`, which tend to indicate single or
    multiple node failure, the operation will be retried on different nodes
    up to `max_retries` times before a :exc:`~.MaximumRetryException` is raised.
    Setting this to 0 disables retries and setting to -1 allows unlimited retries.
    The default value is 5. """

    logging_name = None
    """ By default, each pool identifies itself in the logs using ``id(self)``.
    If multiple pools are in use for different purposes, setting `logging_name` will
    help individual pools to be identified in the logs. """

    socket_factory = default_socket_factory
    """ A function that creates the socket for each connection in the pool.
    This function should take two arguments: `host`, the host the connection is
    being made to, and `port`, the destination port.

    By default, this function is :func:`~connection.default_socket_factory`.
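
    Example (a sketch of a custom factory; assumes Thrift's ``TSocket`` API and
    an arbitrary 10 second socket timeout):

    .. code-block:: python

        >>> from thrift.transport import TSocket
        >>> def my_socket_factory(host, port):
        ...     sock = TSocket.TSocket(host, port)
        ...     sock.setTimeout(10 * 1000)  # Thrift socket timeouts are in milliseconds
        ...     return sock
        >>> pool = pycassa.ConnectionPool('Keyspace1', socket_factory=my_socket_factory)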
    """

    transport_factory = default_transport_factory
    """ A function that creates the transport for each connection in the pool.
    This function should take three arguments: `tsocket`, a TSocket object for the
    transport, `host`, the host the connection is being made to, and `port`,
    the destination port.

    By default, this function is :func:`~connection.default_transport_factory`.
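
    Example (a sketch of a custom factory; assumes the target node accepts
    Thrift's buffered transport):

    .. code-block:: python

        >>> from thrift.transport import TTransport
        >>> def my_transport_factory(tsocket, host, port):
        ...     return TTransport.TBufferedTransport(tsocket)
        >>> pool = pycassa.ConnectionPool('Keyspace1', transport_factory=my_transport_factory)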
    """

    def __init__(self, keyspace,
                 server_list=['localhost:9160'],
                 credentials=None,
                 timeout=0.5,
                 use_threadlocal=True,
                 pool_size=5,
                 prefill=True,
                 socket_factory=default_socket_factory,
                 transport_factory=default_transport_factory,
                 **kwargs):
        """
        All connections in the pool will be opened to `keyspace`.

        `server_list` is a sequence of servers in the form ``"host:port"`` that
        the pool will connect to. The port defaults to 9160 if excluded.
        The list will be randomly shuffled before being drawn from sequentially.
        `server_list` may also be a function that returns the sequence of servers.

        If authentication or authorization is required, `credentials` must
        be supplied.  This should be a dictionary containing 'username' and
        'password' keys with appropriate string values.

        `timeout` specifies in seconds how long individual connections will
        block before timing out. If set to ``None``, connections will never
        timeout.

        If `use_threadlocal` is set to ``True``, repeated calls to
        :meth:`get()` within the same application thread will
        return the same :class:`ConnectionWrapper` object if one is
        already checked out from the pool.  Be careful when setting `use_threadlocal`
        to ``False`` in a multithreaded application, especially with retries enabled.
        Synchronization may be required to prevent the connection from changing while
        another thread is using it.

        The pool will keep up to `pool_size` open connections in the pool
        at any time.  When a connection is returned to the pool, the
        connection will be discarded if the pool already contains `pool_size`
        connections.  The total number of simultaneous connections the pool will
        allow is ``pool_size + max_overflow``,
        and the number of "sleeping" connections the pool will allow is ``pool_size``.

        A good choice for `pool_size` is a multiple of the number of servers
        passed to the Pool constructor.  If a size less than this is chosen,
        the last ``(len(server_list) - pool_size)`` servers may not be used until
        either overflow occurs, a connection is recycled, or a connection
        fails. Similarly, if a multiple of ``len(server_list)`` is not chosen,
        those same servers would have a decreased load. By default, overflow
        is disabled.

        If `prefill` is set to ``True``, `pool_size` connections will be opened
        when the pool is created.

        Example Usage:

        .. code-block:: python

            >>> pool = pycassa.ConnectionPool(keyspace='Keyspace1', server_list=['10.0.0.4:9160', '10.0.0.5:9160'], prefill=False)
            >>> cf = pycassa.ColumnFamily(pool, 'Standard1')
            >>> cf.insert('key', {'col': 'val'})
            1287785685530679

        """

        self._pool_threadlocal = use_threadlocal
        self.keyspace = keyspace
        self.credentials = credentials
        self.timeout = timeout
        self.socket_factory = socket_factory
        self.transport_factory = transport_factory
        if use_threadlocal:
            self._tlocal = threading.local()

        self._pool_size = pool_size
        self._q = Queue.Queue(pool_size)
        self._pool_lock = threading.Lock()
        self._current_conns = 0

        # Listener groups
        self.listeners = []
        self._on_connect = []
        self._on_checkout = []
        self._on_checkin = []
        self._on_dispose = []
        self._on_recycle = []
        self._on_failure = []
        self._on_server_list = []
        self._on_pool_dispose = []
        self._on_pool_max = []

        self.add_listener(PoolLogger())

        if "listeners" in kwargs:
            listeners = kwargs["listeners"]
            for l in listeners:
                self.add_listener(l)

        self.logging_name = kwargs.get("logging_name", None)
        if not self.logging_name:
            self.logging_name = id(self)

        if "max_overflow" not in kwargs:
            self._set_max_overflow(0)

        recognized_kwargs = ["pool_timeout", "recycle", "max_retries", "max_overflow"]
        for kw in recognized_kwargs:
            if kw in kwargs:
                setattr(self, kw, kwargs[kw])

        self.set_server_list(server_list)

        self._prefill = prefill
        if self._prefill:
            self.fill()

    def set_server_list(self, server_list):
        """
        Sets the server list that the pool will make connections to.

        `server_list` should be a sequence of servers in the form ``"host:port"`` that
        the pool will connect to.  The list will be randomly permuted before
        being used. `server_list` may also be a function that returns the
        sequence of servers.
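
        Example Usage (a sketch; the server addresses are hypothetical):

        .. code-block:: python

            >>> pool.set_server_list(['10.0.0.4:9160', '10.0.0.5:9160'])
            >>> def lookup_servers():
            ...     return ['10.0.0.4:9160', '10.0.0.5:9160']
            >>> pool.set_server_list(lookup_servers)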
        """
        if callable(server_list):
            self.server_list = list(server_list())
        else:
            self.server_list = list(server_list)

        random.shuffle(self.server_list)
        self._list_position = 0
        self._notify_on_server_list(self.server_list)

    def _get_next_server(self):
        """
        Gets the next 'host:port' combination from the list of
        servers and increments the position. This is not thread-safe,
        but client-side load-balancing isn't so important that this is
        a problem.
        """
        if self._list_position >= len(self.server_list):
            self._list_position = 0
        server = self.server_list[self._list_position]
        self._list_position += 1
        return server

    def _create_connection(self):
        """Creates a ConnectionWrapper, which opens a
        pycassa.connection.Connection."""
        if not self.server_list:
            raise AllServersUnavailable('Cannot connect to any servers as server list is empty!')
        failure_count = 0
        while failure_count < 2 * len(self.server_list):
            try:
                server = self._get_next_server()
                wrapper = self._get_new_wrapper(server)
                return wrapper
            except (TTransportException, socket.error, IOError, EOFError), exc:
                self._notify_on_failure(exc, server)
                failure_count += 1
        raise AllServersUnavailable('An attempt was made to connect to each of the servers ' +
                                    'twice, but none of the attempts succeeded. The last failure was %s: %s' %
                                    (exc.__class__.__name__, exc))

    def fill(self):
        """
        Adds connections to the pool until at least ``pool_size`` connections
        exist, whether they are currently checked out from the pool or not.

        .. versionadded:: 1.2.0
        """
        with self._pool_lock:
            while self._current_conns < self._pool_size:
                conn = self._create_connection()
                conn._checkin()
                self._q.put(conn, False)
                self._current_conns += 1

    def _get_new_wrapper(self, server):
        return ConnectionWrapper(self, self.max_retries,
                                 self.keyspace, server,
                                 timeout=self.timeout,
                                 credentials=self.credentials,
                                 socket_factory=self.socket_factory,
                                 transport_factory=self.transport_factory)

    def _replace_wrapper(self):
        """Try to replace the connection."""
        if not self._q.full():
            conn = self._create_connection()
            conn._checkin()

            try:
                self._q.put(conn, False)
            except Queue.Full:
                conn._dispose_wrapper(reason="pool is already full")
            else:
                with self._pool_lock:
                    self._current_conns += 1

    def _clear_current(self):
        """ If using threadlocal, clear our threadlocal current conn. """
        if self._pool_threadlocal:
            self._tlocal.current = None

    def put(self, conn):
        """ Returns a connection to the pool. """
        if not conn.transport.isOpen():
            return

        if self._pool_threadlocal:
            if hasattr(self._tlocal, 'current') and self._tlocal.current:
                conn = self._tlocal.current
                self._tlocal.current = None
            else:
                conn = None
        if conn:
            conn._retry_count = 0
            if conn._is_in_queue_or_disposed():
                raise InvalidRequestError("Connection was already checked in or disposed")

            if self.recycle > -1 and conn.operation_count > self.recycle:
                new_conn = self._create_connection()
                self._notify_on_recycle(conn, new_conn)
                conn._dispose_wrapper(reason="recycling connection")
                conn = new_conn
            conn._checkin()
            self._notify_on_checkin(conn)

            try:
                self._q.put_nowait(conn)
            except Queue.Full:
                conn._dispose_wrapper(reason="pool is already full")
                self._decrement_overflow()
    return_conn = put

    def _decrement_overflow(self):
        with self._pool_lock:
            self._current_conns -= 1

    def _new_if_required(self, max_conns, check_empty_queue=False):
        """ Creates new connection if there is room """
        with self._pool_lock:
            if (not check_empty_queue or self._q.empty()) and self._current_conns < max_conns:
                new_conn = True
                self._current_conns += 1
            else:
                new_conn = False

        if new_conn:
            try:
                return self._create_connection()
            except:
                with self._pool_lock:
                    self._current_conns -= 1
                raise
        return None

    def get(self):
        """ Gets a connection from the pool. """
        conn = None
        if self._pool_threadlocal:
            try:
                if self._tlocal.current:
                    conn = self._tlocal.current
                if conn:
                    return conn
            except AttributeError:
                pass

        conn = self._new_if_required(self._pool_size)
        if not conn:
            # if queue is empty and max_overflow is not reached, create new conn
            conn = self._new_if_required(self._max_conns, check_empty_queue=True)

        if not conn:
            # We will have to fetch from the queue, and maybe block
            timeout = self.pool_timeout
            if timeout == -1:
                timeout = None

            try:
                conn = self._q.get(timeout=timeout)
            except Queue.Empty:
                self._notify_on_pool_max(pool_max=self._max_conns)
                size_msg = "size %d" % (self._pool_size, )
                if self._overflow_enabled:
                    size_msg += "overflow %d" % (self._max_overflow)
                message = "ConnectionPool limit of %s reached, unable to obtain connection after %d seconds" \
                          % (size_msg, self.pool_timeout)
                raise NoConnectionAvailable(message)
            else:
                conn._checkout()

        if self._pool_threadlocal:
            self._tlocal.current = conn
        self._notify_on_checkout(conn)
        return conn

    def execute(self, f, *args, **kwargs):
        """
        Get a connection from the pool, execute
        `f` on it with `*args` and `**kwargs`, return the
        connection to the pool, and return the result of `f`.
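
        Example Usage (a sketch; ``pool`` is an existing :class:`ConnectionPool`):

        .. code-block:: python

            >>> ks_def = pool.execute('describe_keyspace', pool.keyspace)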
        """
        conn = None
        try:
            conn = self.get()
            return getattr(conn, f)(*args, **kwargs)
        finally:
            if conn:
                conn.return_to_pool()

    def dispose(self):
        """ Closes all checked in connections in the pool. """
        while True:
            try:
                conn = self._q.get(False)
                conn._dispose_wrapper(
                        reason="Pool %s is being disposed" % id(self))
                self._decrement_overflow()
            except Queue.Empty:
                break

        self._notify_on_pool_dispose()

    def size(self):
        """ Returns the capacity of the pool. """
        return self._pool_size

    def checkedin(self):
        """ Returns the number of connections currently in the pool. """
        return self._q.qsize()

    def overflow(self):
        """ Returns the number of overflow connections that are currently open. """
        return max(self._current_conns - self._pool_size, 0)

    def checkedout(self):
        """ Returns the number of connections currently checked out from the pool. """
        return self._current_conns - self.checkedin()

    def add_listener(self, listener):
        """
        Add a :class:`PoolListener`-like object to this pool.

        `listener` may be an object that implements some or all of
        :class:`PoolListener`, or a dictionary of callables containing implementations
        of some or all of the named methods in :class:`PoolListener`.
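
        Example Usage (a sketch using the dictionary form; the logger and the
        log messages are placeholders):

        .. code-block:: python

            >>> import logging
            >>> log = logging.getLogger('my_app')
            >>> pool.add_listener({
            ...     'connection_failed': lambda dic: log.warn("connection to %s failed", dic['server']),
            ...     'pool_at_max': lambda dic: log.warn("pool %s is at max size", dic['pool_id'])
            ... })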

        """

        listener = as_interface(listener,
            methods=('connection_created', 'connection_checked_out',
                     'connection_checked_in', 'connection_disposed',
                     'connection_recycled', 'connection_failed',
                     'obtained_server_list', 'pool_disposed',
                     'pool_at_max'))

        self.listeners.append(listener)
        if hasattr(listener, 'connection_created'):
            self._on_connect.append(listener)
        if hasattr(listener, 'connection_checked_out'):
            self._on_checkout.append(listener)
        if hasattr(listener, 'connection_checked_in'):
            self._on_checkin.append(listener)
        if hasattr(listener, 'connection_disposed'):
            self._on_dispose.append(listener)
        if hasattr(listener, 'connection_recycled'):
            self._on_recycle.append(listener)
        if hasattr(listener, 'connection_failed'):
            self._on_failure.append(listener)
        if hasattr(listener, 'obtained_server_list'):
            self._on_server_list.append(listener)
        if hasattr(listener, 'pool_disposed'):
            self._on_pool_dispose.append(listener)
        if hasattr(listener, 'pool_at_max'):
            self._on_pool_max.append(listener)

    def _notify_on_pool_dispose(self):
        if self._on_pool_dispose:
            dic = {'pool_id': self.logging_name,
                   'level': 'info'}
            for l in self._on_pool_dispose:
                l.pool_disposed(dic)

    def _notify_on_pool_max(self, pool_max):
        if self._on_pool_max:
            dic = {'pool_id': self.logging_name,
                   'level': 'info',
                   'pool_max': pool_max}
            for l in self._on_pool_max:
                l.pool_at_max(dic)

    def _notify_on_dispose(self, conn_record, msg=""):
        if self._on_dispose:
            dic = {'pool_id': self.logging_name,
                   'level': 'debug',
                   'connection': conn_record}
            if msg:
                dic['message'] = msg
            for l in self._on_dispose:
                l.connection_disposed(dic)

    def _notify_on_server_list(self, server_list):
        dic = {'pool_id': self.logging_name,
               'level': 'debug',
               'server_list': server_list}
        if self._on_server_list:
            for l in self._on_server_list:
                l.obtained_server_list(dic)

    def _notify_on_recycle(self, old_conn, new_conn):
        if self._on_recycle:
            dic = {'pool_id': self.logging_name,
                   'level': 'debug',
                   'old_conn': old_conn,
                   'new_conn': new_conn}
            for l in self._on_recycle:
                l.connection_recycled(dic)

    def _notify_on_connect(self, conn_record, msg="", error=None):
        if self._on_connect:
            dic = {'pool_id': self.logging_name,
                   'level': 'debug',
                   'connection': conn_record}
            if msg:
                dic['message'] = msg
            if error:
                dic['error'] = error
                dic['level'] = 'warn'
            for l in self._on_connect:
                l.connection_created(dic)

    def _notify_on_checkin(self, conn_record):
        if self._on_checkin:
            dic = {'pool_id': self.logging_name,
                   'level': 'debug',
                   'connection': conn_record}
            for l in self._on_checkin:
                l.connection_checked_in(dic)

    def _notify_on_checkout(self, conn_record):
        if self._on_checkout:
            dic = {'pool_id': self.logging_name,
                   'level': 'debug',
                   'connection': conn_record}
            for l in self._on_checkout:
                l.connection_checked_out(dic)

    def _notify_on_failure(self, error, server, connection=None):
        if self._on_failure:
            dic = {'pool_id': self.logging_name,
                   'level': 'info',
                   'error': error,
                   'server': server,
                   'connection': connection}
            for l in self._on_failure:
                l.connection_failed(dic)

QueuePool = ConnectionPool

class PoolListener(object):
    """Hooks into the lifecycle of connections in a :class:`ConnectionPool`.

    Usage::

        class MyListener(PoolListener):
            def connection_created(self, dic):
                '''perform connect operations'''
            # etc.

        # create a new pool with a listener
        p = ConnectionPool(..., listeners=[MyListener()])

        # or add a listener after the fact
        p.add_listener(MyListener())

    Listeners receive a dictionary that contains event information and
    is indexed by a string describing that piece of info.  For example,
    all event dictionaries include 'level', so dic['level'] will return
    the prescribed logging level.

    There is no need to subclass :class:`PoolListener` to handle events.
    Any class that implements one or more of these methods can be used
    as a pool listener.  The :class:`ConnectionPool` will inspect the methods
    provided by a listener object and add the listener to one or more
    internal event queues based on its capabilities.  In terms of
    efficiency and function call overhead, you're much better off only
    providing implementations for the hooks you'll be using.

    Each of the :class:`PoolListener` methods will be called with a
    :class:`dict` as the single parameter. This :class:`dict` may
    contain the following fields:

        * `connection`: The :class:`ConnectionWrapper` object that persistently
          manages the connection

        * `message`: The reason this event happened

        * `error`: The :class:`Exception` that caused this event

        * `pool_id`: The id of the :class:`ConnectionPool` that this event came from

        * `level`: The prescribed logging level for this event.  Can be 'debug', 'info',
          'warn', 'error', or 'critical'

    Entries in the :class:`dict` that are specific to only one event type are
    detailed with each method.


    """

    def connection_created(self, dic):
        """Called once for each new Cassandra connection.

        Fields: `pool_id`, `level`, and `connection`.
        """

    def connection_checked_out(self, dic):
        """Called when a connection is retrieved from the Pool.

        Fields: `pool_id`, `level`, and `connection`.
        """

    def connection_checked_in(self, dic):
        """Called when a connection returns to the pool.

        Fields: `pool_id`, `level`, and `connection`.
        """

    def connection_disposed(self, dic):
        """Called when a connection is closed.

        ``dic['message']``: A reason for closing the connection, if any.

        Fields: `pool_id`, `level`, `connection`, and `message`.
        """

    def connection_recycled(self, dic):
        """Called when a connection is recycled.

        ``dic['old_conn']``: The :class:`ConnectionWrapper` that is being recycled

        ``dic['new_conn']``: The :class:`ConnectionWrapper` that is replacing it

        Fields: `pool_id`, `level`, `old_conn`, and `new_conn`.
        """

    def connection_failed(self, dic):
        """Called when a connection to a single server fails.

        ``dic['server']``: The server the connection was made to.

        Fields: `pool_id`, `level`, `error`, `server`, and `connection`.
        """

    def obtained_server_list(self, dic):
        """Called when the pool finalizes its server list.

        ``dic['server_list']``: The randomly permuted list of servers that the
        pool will choose from.

        Fields: `pool_id`, `level`, and `server_list`.
        """

    def pool_disposed(self, dic):
        """Called when a pool is disposed.

        Fields: `pool_id`, and `level`.
        """

    def pool_at_max(self, dic):
        """
        Called when an attempt is made to get a new connection from the
        pool, but the pool is already at its max size.

        ``dic['pool_max']``: The max number of connections the pool will
        keep open at one time.

        Fields: `pool_id`, `pool_max`, and `level`.
        """


class AllServersUnavailable(Exception):
    """Raised when none of the servers given to a pool can be connected to."""

class NoConnectionAvailable(Exception):
    """Raised when there are no connections left in a pool."""

class MaximumRetryException(Exception):
    """
    Raised when a :class:`ConnectionWrapper` has retried the maximum
    allowed times before being returned to the pool; note that the
    retries do not all have to be on the same operation.
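
    Example (a sketch; assumes ``cf`` is a ColumnFamily backed by a pool and
    that the pool exceptions are exported at the ``pycassa`` top level):

    .. code-block:: python

        >>> try:
        ...     cf.insert('key', {'col': 'val'})
        ... except pycassa.MaximumRetryException:
        ...     pass  # every allowed retry failed; handle or re-raise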
    """

class InvalidRequestError(Exception):
    """
    Pycassa was asked to do something it can't do.

    This error generally corresponds to runtime state errors.
    """