/usr/lib/python2.7/dist-packages/scoop/shared.py is in python-scoop 0.7.1.1-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 | #!/usr/bin/env python
#
# This file is part of Scalable COncurrent Operations in Python (SCOOP).
#
# SCOOP is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation, either version 3 of
# the License, or (at your option) any later version.
#
# SCOOP is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with SCOOP. If not, see <http://www.gnu.org/licenses/>.
#
import itertools
from inspect import ismethod
from functools import reduce
import time
from . import encapsulation, utils
import scoop
from .fallbacks import ensureScoopStartedProperly, NotStartedProperly
elements = None
def _ensureAtomicity(fn):
"""Ensure atomicity of passed elements on the whole worker pool"""
@ensureScoopStartedProperly
def wrapper(*args, **kwargs):
"""setConst(**kwargs)
Set a constant that will be shared to every workers.
This call blocks until the constant has propagated to at least one
worker.
:param \*\*kwargs: One or more combination(s) key=value. Key being the
variable name and value the object to share.
:returns: None.
Usage: setConst(name=value)
"""
# Note that the docstring is the one of setConst.
# This is because of the documentation framework (sphinx) limitations.
from . import _control
# Enforce retrieval of currently awaiting constants
_control.execQueue.socket.pumpInfoSocket()
for key, value in kwargs.items():
# Object name existence check
if key in itertools.chain(*(elem.keys() for elem in elements.values())):
raise TypeError("This constant already exists: {0}.".format(key))
# Retry element propagation until it is returned
while all(key in elements.get(scoop.worker, []) for key in kwargs.keys()) is not True:
scoop.logger.debug("Sending global variables {0}...".format(
list(kwargs.keys())
))
# Call the function
fn(*args, **kwargs)
# Enforce retrieval of currently awaiting constants
_control.execQueue.socket.pumpInfoSocket()
# TODO: Make previous blocking instead of sleep
time.sleep(0.1)
# Atomicity check
elementNames = list(itertools.chain(*(elem.keys() for elem in elements.values())))
if len(elementNames) != len(set(elementNames)):
raise TypeError("This constant already exists: {0}.".format(key))
return wrapper
@_ensureAtomicity
def setConst(**kwargs):
"""setConst(**kwargs)
Set a constant that will be shared to every workers.
:param **kwargs: One or more combination(s) key=value. Key being the
variable name and value the object to share.
:returns: None.
Usage: setConst(name=value)
"""
from . import _control
sendVariable = _control.execQueue.socket.sendVariable
for key, value in kwargs.items():
# Propagate the constant
# for file-like objects, see encapsulation.py where copyreg was
# used to overload standard pickling.
if callable(value):
sendVariable(key, encapsulation.FunctionEncapsulation(value, key))
else:
sendVariable(key, value)
def getConst(name, timeout=0.1):
"""Get a shared constant.
:param name: The name of the shared variable to retrieve.
:param timeout: The maximum time to wait in seconds for the propagation of
the constant.
:returns: The shared object.
Usage: value = getConst('name')
"""
from . import _control
import time
timeStamp = time.time()
while True:
# Enforce retrieval of currently awaiting constants
_control.execQueue.socket.pumpInfoSocket()
# Constants concatenation
constants = dict(reduce(
lambda x, y: x + list(y.items()),
elements.values(),
[]
))
timeoutHappened = time.time() - timeStamp > timeout
if constants.get(name) is not None or timeoutHappened:
return constants.get(name)
time.sleep(0.01)
class SharedElementEncapsulation(object):
"""Encapsulates a reference to an element available in the shared module.
This is used by Futures (map on lambda, for instance)."""
def __init__(self, element):
self.isMethod = False
if utils.isStr(element):
# Already shared element
assert getConst(element, timeout=0) != None, (
"Element must already be shared."
)
self.uniqueID = element
else:
# Element to share
# Determine if function is a method. Methods derived from external
# languages such as C++ aren't detected by ismethod.
if ismethod(element):
# Must share whole object before ability to use its method
self.isMethod = True
self.methodName = element.__name__
element = element.__self__
# Lambda-like or unshared code to share
uniqueID = str(scoop.worker) + str(id(element)) + str(hash(element))
self.uniqueID = uniqueID
if getConst(uniqueID, timeout=0) == None:
funcRef = {uniqueID: element}
setConst(**funcRef)
def __repr__(self):
return self.uniqueID
def __call__(self, *args, **kwargs):
if self.isMethod:
wholeObj = getConst(
self.__repr__(),
timeout=float("inf"),
)
return getattr(wholeObj, self.methodName)(*args, **kwargs)
else:
return getConst(self.__repr__(),
timeout=float("inf"))(*args, **kwargs)
def __name__(self):
return self.__repr__()
|