/usr/share/pyshared/async/util.py is in python-async 0.6.1-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
"""Module with utilities related to async operations"""
from threading import (
Lock,
_allocate_lock,
_Condition,
_sleep,
_time,
)
from Queue import (
Empty,
)
from collections import deque
import sys
import os
#{ Routines
def cpu_count():
    """Determine the number of CPUs in the system.

    :return: number of CPUs, always >= 1
    :raise NotImplementedError: if the number of CPUs cannot be determined
    :note: inspired by multiprocessing"""
    num = 0
    try:
        if sys.platform == 'win32':
            num = int(os.environ['NUMBER_OF_PROCESSORS'])
        elif 'bsd' in sys.platform or sys.platform == 'darwin':
            pipe = os.popen('sysctl -n hw.ncpu')
            try:
                num = int(pipe.read())
            finally:
                # do not leak the pipe, even if the output cannot be parsed
                pipe.close()
            # END assure pipe is closed
        else:
            num = os.sysconf('SC_NPROCESSORS_ONLN')
        # END handle platform
    except (ValueError, KeyError, OSError, AttributeError):
        pass
    # END exception handling

    # os.sysconf returns -1 for values the system does not define, so
    # anything below one counts as 'unknown', not only zero
    if num < 1:
        raise NotImplementedError('cannot determine number of cpus')
    return num
#} END routines
class DummyLock(object):
    """An object providing a do-nothing lock interface for use in sync mode.

    Both methods are no-ops; no state is kept, hence the empty __slots__."""
    __slots__ = ()

    def acquire(self, blocking=True):
        """Pretend to acquire the lock.

        :param blocking: accepted so the object can be called like a real lock
            (e.g. ``lock.acquire(False)``); the value is ignored
        :return: True, as 'acquiring' this lock always succeeds"""
        return True

    def release(self):
        """Do nothing, there is no lock to release"""
        pass
class SyncQueue(deque):
    """Adapter to allow using a deque like a queue, without locking.

    The methods mirror the ones of AsyncQueue, but nothing here ever blocks
    and the queue is always writable."""

    def get(self, block=True, timeout=None):
        """Remove and return the oldest item.

        :param block: ignored, this queue never blocks
        :param timeout: ignored
        :return: the item that was put first
        :raise Empty: if the queue holds no items"""
        try:
            return self.popleft()
        except IndexError:
            raise Empty
        # END raise empty

    def empty(self):
        """:return: True if the queue holds no items"""
        return len(self) == 0

    def set_writable(self, state):
        """Accepted for interface compatibility, the flag cannot be changed.

        :param state: ignored
        :return: True - the previous state, which is the only state this
            queue has, consistent with what writable() reports"""
        return True

    def writable(self):
        """:return: True, this queue can always be written to"""
        return True

    def put(self, item, block=True, timeout=None):
        """Append item to the end of the queue.

        :param block: ignored, this queue never blocks
        :param timeout: ignored"""
        self.append(item)
class HSCondition(deque):
    """Cleaned up code of the original condition object in order
    to make it run and respond faster.

    The deque itself stores one lock per thread blocked in wait(); notify()
    wakes a waiter up by releasing its lock."""
    __slots__ = ("_lock",)
    delay = 0.0002  # reduces wait times, but increases overhead

    def __init__(self, lock=None):
        """:param lock: the lock guarding this condition, a new Lock is
            created if None"""
        if lock is None:
            lock = Lock()
        self._lock = lock

    def release(self):
        """Release the lock guarding this condition"""
        self._lock.release()

    def acquire(self, block=None):
        """Acquire the lock guarding this condition.

        :param block: if None, the lock's default (blocking) acquire is used,
            otherwise the value is passed on to the lock
        :return: whatever the lock's acquire returned, so a non-blocking
            attempt can tell whether it got the lock"""
        if block is None:
            return self._lock.acquire()
        return self._lock.acquire(block)

    def wait(self, timeout=None):
        """Block until notified or until the timeout expired.
        The lock must be held when calling this method; it is released
        while waiting and held again once the method returns.

        :param timeout: None to wait indefinitely, or the maximum time to
            wait in seconds"""
        waiter = _allocate_lock()
        waiter.acquire()                # get it the first time, no blocking
        self.append(waiter)

        try:
            # restore state no matter what (e.g., KeyboardInterrupt)
            # now we block, as we hold the lock already
            # in the moment we release our lock, someone else might actually resume
            self._lock.release()
            if timeout is None:
                waiter.acquire()
            else:
                # Balancing act: We can't afford a pure busy loop, because of the
                # GIL, so we have to sleep
                # We try to sleep only tiny amounts of time though to be very responsive
                # NOTE: this branch is not used by the async system anyway, but
                # will be hit when the user reads with timeout
                endtime = _time() + timeout
                delay = self.delay
                acquire = waiter.acquire
                while True:
                    gotit = acquire(0)
                    if gotit:
                        break
                    remaining = endtime - _time()
                    if remaining <= 0:
                        break
                    # never sleep past the deadline
                    _sleep(min(delay, remaining))
                # END polling loop

                if not gotit:
                    # timed out: our waiter must not consume a later notification
                    try:
                        self.remove(waiter)
                    except AttributeError:
                        # handle python 2.4 - actually this should be made thread-safe
                        # but lets see ...
                        try:
                            # lets hope we pop the right one - we don't loop over it
                            # yet - we just keep minimal compatibility with py 2.4
                            item = self.pop()
                            if item != waiter:
                                self.append(item)
                        except IndexError:
                            pass
                    except ValueError:
                        # a notifier took our waiter off the queue already
                        pass
                # END didn't ever get it
            # END handle timeout
        finally:
            # reacquire the lock
            self._lock.acquire()
        # END assure release lock

    def _release_waiters(self, n):
        """Wake up to n waiters, oldest first. The lock must be held."""
        for _ in range(min(n, len(self))):
            try:
                self.popleft().release()
            except IndexError:
                # a timed-out wait() removes its waiter without holding
                # the lock, so the queue may shrink while we iterate
                break
        # END for each waiter to resume

    def notify(self, n=1):
        """Wake up to n waiting threads.

        Its vital that this method is threadsafe - we absolutely have to
        get a lock at the beginning of this method to be sure we get the
        correct amount of waiters back. If we bail out, although a waiter
        is about to be added, it will miss its wakeup notification, and block
        forever (possibly)

        :param n: maximum number of waiters to wake
        :note: the lock is acquired by this method, it must not be held by
            the calling thread"""
        self._lock.acquire()
        try:
            self._release_waiters(n)
        finally:
            self._lock.release()
        # END assure lock is released

    def notify_all(self):
        """Wake all waiting threads.

        :note: the lock is acquired by this method, it must not be held by
            the calling thread"""
        self._lock.acquire()
        try:
            # count the waiters while holding the lock, otherwise one that
            # is added in the meantime would miss its notification
            self._release_waiters(len(self))
        finally:
            self._lock.release()
        # END assure lock is released
class ReadOnly(Exception):
    """Raised when an item is put into a queue which is not writable anymore"""
    pass
class AsyncQueue(deque):
    """A queue using different condition objects to gain multithreading performance.
    Additionally it has a threadsafe writable flag, which will alert all readers
    that there is nothing more to get here.
    All default-queue code was cleaned up for performance."""
    __slots__ = ('mutex', 'not_empty', '_writable')

    def __init__(self, maxsize=0):
        """:param maxsize: accepted for interface compatibility only, it is
            not used - put() never waits for free space"""
        self.mutex = Lock()
        self.not_empty = HSCondition(self.mutex)
        self._writable = True

    def qsize(self):
        """:return: number of items currently in the queue"""
        self.mutex.acquire()
        try:
            return len(self)
        finally:
            self.mutex.release()
        # END assure lock is released

    def writable(self):
        """:return: True if items may still be put into the queue"""
        self.mutex.acquire()
        try:
            return self._writable
        finally:
            self.mutex.release()
        # END assure lock is released

    def set_writable(self, state):
        """Set the writable flag of this queue to True or False

        :return: The previous state"""
        self.mutex.acquire()
        try:
            old = self._writable
            self._writable = state
        finally:
            self.mutex.release()
        # END handle locking

        # if we won't receive anymore items, inform the getters
        # This happens after the mutex was released, as notify_all
        # acquires it on its own
        if not state:
            self.not_empty.notify_all()
        # END tell everyone
        return old

    def empty(self):
        """:return: True if the queue holds no items"""
        self.mutex.acquire()
        try:
            return not len(self)
        finally:
            self.mutex.release()
        # END assure lock is released

    def put(self, item, block=True, timeout=None):
        """Append item to the queue and wake one waiting reader.

        :param block: not used, putting never blocks
        :param timeout: not used
        :raise ReadOnly: if the queue is not writable anymore"""
        self.mutex.acquire()
        try:
            # We want writers to fail and stop processing, as the one who caused
            # the channel to close had a reason and wants the threads to
            # stop on the task as soon as possible
            if not self._writable:
                raise ReadOnly
            # END handle read-only
            self.append(item)
        finally:
            # release on every path, just like all other methods do
            self.mutex.release()
        # END assure lock is released
        self.not_empty.notify()

    def get(self, block=True, timeout=None):
        """Remove and return the oldest item.

        :param block: if True, wait while the queue is empty and still
            writable; if False, return or raise right away
        :param timeout: maximum time to wait in seconds if block is True,
            None to wait without limit
        :raise Empty: if no item is available - because block was False, the
            timeout expired, or the queue is empty and not writable anymore"""
        self.mutex.acquire()
        try:
            if block:
                if timeout is None:
                    while not len(self) and self._writable:
                        self.not_empty.wait()
                else:
                    endtime = _time() + timeout
                    while not len(self) and self._writable:
                        remaining = endtime - _time()
                        if remaining <= 0.0:
                            raise Empty
                        self.not_empty.wait(remaining)
                # END handle timeout mode
            # END handle block

            # can throw if we woke up because we are not writable anymore
            try:
                return self.popleft()
            except IndexError:
                raise Empty
            # END handle unblocking reason
        finally:
            self.mutex.release()
        # END assure lock is released
#} END utilities
|