/usr/share/pyshared/dtest/strategy.py is in python-dtest 0.4.0-0ubuntu2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 | # Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
==========================
Parallelization Strategies
==========================
This module contains all the classes necessary for identifying
parallelization strategies. A parallelization strategy provides
support for alternate modes of parallelizing multiple-result tests,
i.e., tests on which @repeat() has been used or which are generators
providing lists of other test functions to execute. This module
contains SerialStrategy, UnlimitedParallelStrategy, and
LimitedParallelStrategy.
"""
import dtest
from eventlet import spawn_n
from eventlet.event import Event
from eventlet.semaphore import Semaphore
class SerialStrategy(object):
"""
SerialStrategy
==============
The SerialStrategy class is a parallelization strategy that causes
spawned tests to be executed serially, one after another.
"""
def prepare(self):
"""
Prepares the SerialStrategy object to spawn a set of tests.
Since SerialStrategy "spawns" tests by running them
synchronously, this function is a no-op.
"""
pass
def spawn(self, call, *args, **kwargs):
"""
Spawn a function. The callable ``call`` will be executed with
the provided positional and keyword arguments. Since
SerialStrategy "spawns" tests by running them synchronously,
this function simply calls ``call`` directly.
"""
call(*args, **kwargs)
def wait(self):
"""
Waits for spawned tests to complete. Since SerialStrategy
"spawns" tests by running them synchronously, this function is
a no-op.
"""
pass
class UnlimitedParallelStrategy(object):
"""
UnlimitedParallelStrategy
=========================
The UnlimitedParallelStrategy class is a parallelization strategy
that causes spawned tests to be executed in parallel, with no
limit on the maximum number of tests that can be executing at one
time.
"""
def prepare(self):
"""
Prepares the UnlimitedParallelStrategy object to spawn a set
of tests. Simply initializes a counter to zero and sets up an
event to be signaled when all tests are done.
"""
# Initialize the counter and the event
self.count = 0
self.lock = Semaphore()
self.event = None
# Save the output and test for the status stream
self.output = dtest.status.output
self.test = dtest.status.test
def spawn(self, call, *args, **kwargs):
"""
Spawn a function. The callable ``call`` will be executed with
the provided positional and keyword arguments. The ``call``
will be executed in a separate thread.
"""
# Spawn our internal function in a separate thread
self.count += 1
spawn_n(self._spawn, call, args, kwargs)
def _spawn(self, call, args, kwargs):
"""
Executes ``call`` in a separate thread of control. This
helper method maintains the count and arranges for the event
to be signaled when appropriate.
"""
# Initialize the status stream
dtest.status.setup(self.output, self.test)
# Call the call
call(*args, **kwargs)
# Decrement the count
self.count -= 1
# Signal the event, if necessary
with self.lock:
if self.count == 0 and self.event is not None:
self.event.send()
def wait(self):
"""
Waits for spawned tests to complete.
"""
# Check for completion...
with self.lock:
if self.count == 0:
# No tests still going, so just return
return
# OK, let's initialize the event...
self.event = Event()
# Now we wait on the event
self.event.wait()
# End by clearing the event
self.event = None
class LimitedParallelStrategy(UnlimitedParallelStrategy):
"""
LimitedParallelStrategy
=======================
The LimitedParallelStrategy class is an extension of the
UnlimitedParallelStrategy that additionally limits the maximum
number of threads that may be executing at any given time.
"""
def __init__(self, limit):
"""
Initializes a LimitedParallelStrategy object. The ``limit``
parameter specifies the maximum number of threads that may
execute at any given time.
"""
# Save the limit
self.limit = limit
def prepare(self):
"""
Prepares the LimitedParallelStrategy to spawn a set of tests.
In addition to the tasks performed by
UnlimitedParallelStrategy.prepare(), sets up a semaphore to
limit the maximum number of threads that may execute at once.
"""
# Call our superclass prepare method
super(LimitedParallelStrategy, self).prepare()
# Also initialize a limiting semaphore
self.limit_sem = Semaphore(self.limit)
def _spawn(self, call, args, kwargs):
"""
Executes ``call`` in a separate thread of control. This
helper method extends UnlimitedParallelStrategy._spawn() to
acquire the limiting semaphore prior to executing the call.
"""
# Call our superclass _spawn method with the limit semaphore
with self.limit_sem:
super(LimitedParallelStrategy, self)._spawn(call, args, kwargs)
|