/usr/share/pyshared/mdp/parallel/parallelhinet.py is in python-mdp 3.3-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 | """
Parallel versions of hinet nodes.
Note that internal nodes are referenced instead of copied, in order to save
memory.
"""
import mdp.hinet as hinet
import parallelnodes
class ParallelFlowNode(hinet.FlowNode, parallelnodes.ParallelExtensionNode):
"""Parallel version of FlowNode."""
def _fork(self):
"""Fork nodes that require it, reference all other nodes.
If a required fork() fails the exception is not caught here.
"""
node_list = []
found_train_node = False # set to True at the first training node
for node in self._flow:
if not found_train_node and node.is_training():
found_train_node = True
node_list.append(node.fork())
elif node.use_execute_fork():
node_list.append(node.fork())
else:
node_list.append(node)
return self.__class__(self._flow.__class__(node_list))
def _join(self, forked_node):
"""Join the required nodes from the forked node into this FlowNode."""
found_train_node = False # set to True at the first training node
for i_node, node in enumerate(forked_node._flow):
if not found_train_node and node.is_training():
found_train_node = True
self._flow[i_node].join(node)
elif node.use_execute_fork():
self._flow[i_node].join(node)
def use_execute_fork(self):
return any(node.use_execute_fork() for node in self._flow)
class ParallelLayer(hinet.Layer, parallelnodes.ParallelExtensionNode):
"""Parallel version of a Layer."""
def _fork(self):
"""Fork or copy all the nodes in the layer to fork the layer."""
forked_nodes = []
for node in self.nodes:
if node.is_training():
forked_nodes.append(node.fork())
else:
forked_nodes.append(node)
return self.__class__(forked_nodes)
def _join(self, forked_node):
"""Join the trained nodes from the forked layer."""
for i_node, layer_node in enumerate(self.nodes):
if layer_node.is_training():
layer_node.join(forked_node.nodes[i_node])
def use_execute_fork(self):
return any(node.use_execute_fork() for node in self.nodes)
class ParallelCloneLayer(hinet.CloneLayer, parallelnodes.ParallelExtensionNode):
"""Parallel version of CloneLayer class."""
def _fork(self):
"""Fork the internal node in the clone layer."""
return self.__class__(self.node.fork(), n_nodes=len(self.nodes))
def _join(self, forked_node):
"""Join the internal node in the clone layer."""
self.node.join(forked_node.node)
def use_execute_fork(self):
return self.node.use_execute_fork()
|