This file is indexed.

/usr/lib/python3/dist-packages/kombu/pools.py is in python3-kombu 3.0.33-1ubuntu2.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
"""
kombu.pools
===========

Public resource pools.

"""
from __future__ import absolute_import

import os

from itertools import chain

from .connection import Resource
from .five import range, values
from .messaging import Producer
from .utils import EqualityDict
from .utils.functional import lazy

__all__ = ['ProducerPool', 'PoolGroup', 'register_group',
           'connections', 'producers', 'get_limit', 'set_limit', 'reset']
_limit = [200]
_used = [False]
_groups = []
use_global_limit = object()
disable_limit_protection = os.environ.get('KOMBU_DISABLE_LIMIT_PROTECTION')


class ProducerPool(Resource):
    Producer = Producer

    def __init__(self, connections, *args, **kwargs):
        self.connections = connections
        self.Producer = kwargs.pop('Producer', None) or self.Producer
        super(ProducerPool, self).__init__(*args, **kwargs)

    def _acquire_connection(self):
        return self.connections.acquire(block=True)

    def create_producer(self):
        conn = self._acquire_connection()
        try:
            return self.Producer(conn)
        except BaseException:
            conn.release()
            raise

    def new(self):
        return lazy(self.create_producer)

    def setup(self):
        if self.limit:
            for _ in range(self.limit):
                self._resource.put_nowait(self.new())

    def close_resource(self, resource):
        pass

    def prepare(self, p):
        if callable(p):
            p = p()
        if p._channel is None:
            conn = self._acquire_connection()
            try:
                p.revive(conn)
            except BaseException:
                conn.release()
                raise
        return p

    def release(self, resource):
        if resource.__connection__:
            resource.__connection__.release()
        resource.channel = None
        super(ProducerPool, self).release(resource)


class PoolGroup(EqualityDict):

    def __init__(self, limit=None):
        self.limit = limit

    def create(self, resource, limit):
        raise NotImplementedError('PoolGroups must define ``create``')

    def __missing__(self, resource):
        limit = self.limit
        if limit is use_global_limit:
            limit = get_limit()
        if not _used[0]:
            _used[0] = True
        k = self[resource] = self.create(resource, limit)
        return k


def register_group(group):
    _groups.append(group)
    return group


class Connections(PoolGroup):

    def create(self, connection, limit):
        return connection.Pool(limit=limit)
connections = register_group(Connections(limit=use_global_limit))


class Producers(PoolGroup):

    def create(self, connection, limit):
        return ProducerPool(connections[connection], limit=limit)
producers = register_group(Producers(limit=use_global_limit))


def _all_pools():
    return chain(*[(values(g) if g else iter([])) for g in _groups])


def get_limit():
    return _limit[0]


def set_limit(limit, force=False, reset_after=False):
    limit = limit or 0
    glimit = _limit[0] or 0
    if limit < glimit:
        if not disable_limit_protection and (_used[0] and not force):
            raise RuntimeError("Can't lower limit after pool in use.")
        reset_after = True
    if limit != glimit:
        _limit[0] = limit
        for pool in _all_pools():
            pool.limit = limit
        if reset_after:
            reset()
    return limit


def reset(*args, **kwargs):
    for pool in _all_pools():
        try:
            pool.force_close_all()
        except Exception:
            pass
    for group in _groups:
        group.clear()
    _used[0] = False

try:
    from multiprocessing.util import register_after_fork
    register_after_fork(connections, reset)
except ImportError:  # pragma: no cover
    pass