/usr/share/pyshared/mdp/test/test_parallelhinet.py is in python-mdp 3.3-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 | from _tools import *
import mdp.parallel as parallel
import mdp.hinet as hinet
n = numx
class TestParallelHinetNodes():
"""Tests for ParallelFlowNode."""
def setup_method(self, method):
if "parallel" in mdp.get_active_extensions():
self.set_parallel = False
else:
mdp.activate_extension("parallel")
self.set_parallel = True
def teardown_method(self, method):
if self.set_parallel:
mdp.deactivate_extension("parallel")
def test_flownode(self):
"""Test ParallelFlowNode."""
flow = mdp.Flow([mdp.nodes.SFANode(output_dim=5),
mdp.nodes.PolynomialExpansionNode(degree=2),
mdp.nodes.SFANode(output_dim=3)])
flownode = mdp.hinet.FlowNode(flow)
x = n.random.random([100,50])
chunksize = 25
chunks = [x[i*chunksize : (i+1)*chunksize]
for i in xrange(len(x)//chunksize)]
while flownode.get_remaining_train_phase() > 0:
for chunk in chunks:
forked_node = flownode.fork()
forked_node.train(chunk)
flownode.join(forked_node)
flownode.stop_training()
# test execution
flownode.execute(x)
def test_flownode_forksingle(self):
"""Test that ParallelFlowNode forks only the first training node."""
flow = mdp.Flow([mdp.nodes.SFANode(output_dim=5),
mdp.nodes.PolynomialExpansionNode(degree=2),
mdp.nodes.SFANode(output_dim=3)])
flownode = mdp.hinet.FlowNode(flow)
forked_flownode = flownode.fork()
assert flownode._flow[0] is not forked_flownode._flow[0]
assert flownode._flow[1] is forked_flownode._flow[1]
assert flownode._flow[2] is forked_flownode._flow[2]
# Sabotage joining for the second SFANode, which should not be joined,
# causing AttributeError: 'NoneType' ... when it is joined.
flownode._flow[2]._cov_mtx = None
flownode.join(forked_flownode)
def test_parallelnet(self):
"""Test a simple parallel net with big data.
Includes ParallelFlowNode, ParallelCloneLayer, ParallelSFANode
and training via a ParallelFlow.
"""
noisenode = mdp.nodes.NormalNoiseNode(input_dim=20*20,
noise_args=(0,0.0001))
sfa_node = mdp.nodes.SFANode(input_dim=20*20, output_dim=10)
switchboard = hinet.Rectangular2dSwitchboard(in_channels_xy=100,
field_channels_xy=20,
field_spacing_xy=10)
flownode = mdp.hinet.FlowNode(mdp.Flow([noisenode, sfa_node]))
sfa_layer = mdp.hinet.CloneLayer(flownode,
switchboard.output_channels)
flow = parallel.ParallelFlow([switchboard, sfa_layer])
data_iterables = [None,
[n.random.random((10, 100*100)) for _ in xrange(3)]]
scheduler = parallel.Scheduler()
flow.train(data_iterables, scheduler=scheduler)
def test_layer(self):
"""Test Simple random test with three nodes."""
node1 = mdp.nodes.SFANode(input_dim=10, output_dim=5)
node2 = mdp.nodes.SFANode(input_dim=17, output_dim=3)
node3 = mdp.nodes.SFANode(input_dim=3, output_dim=1)
layer = mdp.hinet.Layer([node1, node2, node3])
flow = parallel.ParallelFlow([layer])
data_iterables = [[n.random.random((10, 30)) for _ in xrange(3)]]
scheduler = parallel.Scheduler()
flow.train(data_iterables, scheduler=scheduler)
|