This file is indexed.

/usr/lib/python2.7/dist-packages/swiftclient/multithreading.py is in python-swiftclient 1:2.0.3-0ubuntu1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
# Copyright (c) 2010-2012 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from itertools import chain
import sys
from time import sleep
from Queue import Queue
from threading import Thread
from traceback import format_exception

from swiftclient.exceptions import ClientException


class StopWorkerThreadSignal(object):
    pass


class QueueFunctionThread(Thread):
    """
    Calls `func`` for each item in ``queue``; ``func`` is called with a
    de-queued item as the first arg followed by ``*args`` and ``**kwargs``.

    Any exceptions raised by ``func`` are stored in :attr:`self.exc_infos`.

    If the optional kwarg ``store_results`` is specified, it must be a list and
    each result of invoking ``func`` will be appended to that list.

    Putting a :class:`StopWorkerThreadSignal` instance into queue will cause
    this thread to exit.
    """

    def __init__(self, queue, func, *args, **kwargs):
        """
        :param queue: A :class:`Queue` object from which work jobs will be
                      pulled.
        :param func: A callable which will be invoked with a dequeued item
                     followed by ``*args`` and ``**kwargs``.
        :param \*args: Optional positional arguments for ``func``.
        :param \*\*kwargs: Optional kwargs for func.  If the kwarg
                           ``store_results`` is specified, its value must be a
                           list, and every result from invoking ``func`` will
                           be appended to the supplied list.  The kwarg
                           ``store_results`` will not be passed into ``func``.
        """
        Thread.__init__(self)
        self.queue = queue
        self.func = func
        self.args = args
        self.kwargs = kwargs
        self.exc_infos = []
        self.store_results = kwargs.pop('store_results', None)

    def run(self):
        while True:
            item = self.queue.get()
            if isinstance(item, StopWorkerThreadSignal):
                break
            try:
                result = self.func(item, *self.args, **self.kwargs)
                if self.store_results is not None:
                    self.store_results.append(result)
            except Exception:
                self.exc_infos.append(sys.exc_info())


class QueueFunctionManager(object):
    """
    A context manager to handle the life-cycle of a single :class:`Queue`
    and a list of associated :class:`QueueFunctionThread` instances.

    This class is not usually instantiated directly.  Instead, call the
    :meth:`MultiThreadingManager.queue_manager` object method,
    which will return an instance of this class.

    When entering the context, ``thread_count`` :class:`QueueFunctionThread`
    instances are created and started.  The input queue is returned.  Inside
    the context, any work item put into the queue will get worked on by one of
    the :class:`QueueFunctionThread` instances.

    When the context is exited, all threads are sent a
    :class:`StopWorkerThreadSignal` instance and then all threads are waited
    upon.  Finally, any exceptions from any of the threads are reported on via
    the supplied ``thread_manager``'s :meth:`error` method.  If an
    ``error_counter`` list was supplied on instantiation, its first element is
    incremented once for every exception which occurred.
    """

    def __init__(self, func, thread_count, thread_manager, thread_args=None,
                 thread_kwargs=None, error_counter=None,
                 connection_maker=None):
        """
        :param func: The worker function which will be passed into each
                     :class:`QueueFunctionThread`'s constructor.
        :param thread_count: The number of worker threads to run.
        :param thread_manager: An instance of :class:`MultiThreadingManager`.
        :param thread_args: Optional positional arguments to be passed into
                            each invocation of ``func`` after the de-queued
                            work item.
        :param thread_kwargs: Optional keyword arguments to be passed into each
                              invocation of ``func``.  If a list is supplied as
                              the ``store_results`` keyword argument, it will
                              be filled with every result of invoking ``func``
                              in all threads.
        :param error_counter: Optional list containing one integer.  If
                              supplied, the list's first element will be
                              incremented once for each exception in any
                              thread.  This happens only when exiting the
                              context.
        :param connection_maker: Optional callable.  If supplied, this callable
                                 will be invoked once per created thread, and
                                 the result will be passed into func after the
                                 de-queued work item but before ``thread_args``
                                 and ``thread_kwargs``.  This is used to ensure
                                 each thread has its own connection to Swift.
        """
        self.func = func
        self.thread_count = thread_count
        self.thread_manager = thread_manager
        self.error_counter = error_counter
        self.connection_maker = connection_maker
        self.queue = Queue(10000)
        self.thread_list = []
        self.thread_args = thread_args if thread_args else ()
        self.thread_kwargs = thread_kwargs if thread_kwargs else {}

    def __enter__(self):
        for _junk in range(self.thread_count):
            if self.connection_maker:
                thread_args = (self.connection_maker(),) + self.thread_args
            else:
                thread_args = self.thread_args
            qf_thread = QueueFunctionThread(self.queue, self.func,
                                            *thread_args, **self.thread_kwargs)
            qf_thread.start()
            self.thread_list.append(qf_thread)
        return self.queue

    def __exit__(self, exc_type, exc_value, traceback):
        for thread in [t for t in self.thread_list if t.isAlive()]:
            self.queue.put(StopWorkerThreadSignal())

        while any(map(QueueFunctionThread.is_alive, self.thread_list)):
            sleep(0.05)

        for thread in self.thread_list:
            for info in thread.exc_infos:
                if self.error_counter:
                    self.error_counter[0] += 1
                if isinstance(info[1], ClientException):
                    self.thread_manager.error(str(info[1]))
                else:
                    self.thread_manager.error(''.join(format_exception(*info)))


class MultiThreadingManager(object):
    """
    One object to manage context for multi-threading.  This should make
    bin/swift less error-prone and allow us to test this code.

    This object is a context manager and returns itself into the context.  When
    entering the context, two printing threads are created (see below) and they
    are waited on and cleaned up when exiting the context.

    A convenience method, :meth:`queue_manager`, is provided to create a
    :class:`QueueFunctionManager` context manager (a thread-pool with an
    associated input queue for work items).

    Also, thread-safe printing to two streams is provided.  The
    :meth:`print_msg` method will print to the supplied ``print_stream``
    (defaults to ``sys.stdout``) and the :meth:`error` method will print to the
    supplied ``error_stream`` (defaults to ``sys.stderr``).  Both of these
    printing methods will format the given string with any supplied ``*args``
    (a la printf) and encode the result to utf8 if necessary.

    The attribute :attr:`self.error_count` is incremented once per error
    message printed, so an application can tell if any worker threads
    encountered exceptions or otherwise called :meth:`error` on this instance.
    The swift command-line tool uses this to exit non-zero if any error strings
    were printed.
    """

    def __init__(self, print_stream=sys.stdout, error_stream=sys.stderr):
        """
        :param print_stream: The stream to which :meth:`print_msg` sends
                             formatted messages, encoded to utf8 if necessary.
        :param error_stream: The stream to which :meth:`error` sends formatted
                             messages, encoded to utf8 if necessary.
        """
        self.print_stream = print_stream
        self.printer = QueueFunctionManager(self._print, 1, self)
        self.error_stream = error_stream
        self.error_printer = QueueFunctionManager(self._print_error, 1, self)
        self.error_count = 0

    def __enter__(self):
        self.printer.__enter__()
        self.error_printer.__enter__()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.error_printer.__exit__(exc_type, exc_value, traceback)
        self.printer.__exit__(exc_type, exc_value, traceback)

    def queue_manager(self, func, thread_count, *args, **kwargs):
        connection_maker = kwargs.pop('connection_maker', None)
        error_counter = kwargs.pop('error_counter', None)
        return QueueFunctionManager(func, thread_count, self, thread_args=args,
                                    thread_kwargs=kwargs,
                                    connection_maker=connection_maker,
                                    error_counter=error_counter)

    def print_msg(self, msg, *fmt_args):
        if fmt_args:
            msg = msg % fmt_args
        self.printer.queue.put(msg)

    def print_items(self, items, offset=14, skip_missing=False):
        lines = []
        template = '%%%ds: %%s' % offset
        for k, v in items:
            if skip_missing and not v:
                continue
            lines.append((template % (k, v)).rstrip())
        self.print_msg('\n'.join(lines))

    def print_headers(self, headers, meta_prefix='', exclude_headers=None,
                      offset=14):
        exclude_headers = exclude_headers or []
        meta_headers = []
        other_headers = []
        template = '%%%ds: %%s' % offset
        for key, value in headers.items():
            if key.startswith(meta_prefix):
                meta_key = 'Meta %s' % key[len(meta_prefix):].title()
                meta_headers.append(template % (meta_key, value))
            elif key not in exclude_headers:
                other_headers.append(template % (key.title(), value))
        self.print_msg('\n'.join(chain(meta_headers, other_headers)))

    def error(self, msg, *fmt_args):
        if fmt_args:
            msg = msg % fmt_args
        self.error_printer.queue.put(msg)

    def _print(self, item, stream=None):
        if stream is None:
            stream = self.print_stream
        if isinstance(item, unicode):
            item = item.encode('utf8')
        print >>stream, item

    def _print_error(self, item):
        self.error_count += 1
        return self._print(item, stream=self.error_stream)