/usr/lib/python2.7/dist-packages/mdp/nodes/xsfa_nodes.py is in python-mdp 3.5-1ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
from __future__ import print_function
from __future__ import division
from builtins import range
from past.utils import old_div
__docformat__ = "restructuredtext en"
import mdp
class XSFANode(mdp.Node):
"""Perform Non-linear Blind Source Separation using Slow Feature Analysis.
This node is designed to iteratively extract statistically
independent sources from (in principle) arbitrary invertible
nonlinear mixtures. The method relies on temporal correlations in
the sources and consists of a combination of nonlinear SFA and a
projection algorithm. More details can be found in the reference
    given below.
The node has multiple training phases. The number of training
phases depends on the number of sources that must be
extracted. The recommended way of training this node is through a
container flow::
>>> flow = mdp.Flow([XSFANode()])
>>> flow.train(x)
    Doing so will automatically train all training phases. The argument
``x`` to the ``Flow.train`` method can be an array or a list of iterables
(see the section about Iterators in the MDP tutorial for more info).
If the number of training samples is large, you may run into
memory problems: use data iterators and chunk training to reduce
memory usage.
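    For example, training on a list of data chunks (each a 2D array of
    shape ``(n_samples, n_signals)``) might look like this; ``chunk1``
    and ``chunk2`` are placeholder arrays::

        >>> flow = mdp.Flow([XSFANode()])
        >>> flow.train([[chunk1, chunk2]])

    Because the node has multiple training phases, the inner list is
    re-iterated once per phase, which is why a re-iterable container
    (and not a generator) is required.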
If you need to debug training and/or execution of this node, the
suggested approach is to use the capabilities of BiMDP. For example::
>>> flow = mdp.Flow([XSFANode()])
>>> tr_filename = bimdp.show_training(flow=flow, data_iterators=x)
>>> ex_filename, out = bimdp.show_execution(flow, x=x)
    This will run training and execution with BiMDP inspection. Snapshots
    of the internal flow state for each training phase and execution step
    will be opened in a web browser and presented as a slideshow.
References:
Sprekeler, H., Zito, T., and Wiskott, L. (2009).
An Extension of Slow Feature Analysis for Nonlinear Blind Source Separation.
Journal of Machine Learning Research.
http://cogprints.org/7056/1/SprekelerZitoWiskott-Cogprints-2010.pdf
"""
def __init__(self, basic_exp=None, intern_exp=None, svd=False, verbose=False,
input_dim=None, output_dim=None, dtype=None):
"""
:Keywords:
basic_exp
a tuple ``(node, args, kwargs)`` defining the node used for
the basic nonlinear expansion. It is assumed that the
mixture is linearly invertible after this expansion. The
higher the complexity of the nonlinearity, the higher are
the chances of inverting the unknown mixture. On the
other hand, high complexity of the nonlinear expansion
increases the danger of numeric instabilities, which can
cause singularities in the simulation or errors in the
source estimation. The trade-off has to be evaluated
carefully.
Default: ``(mdp.nodes.PolynomialExpansionNode, (2, ), {})``
intern_exp
a tuple ``(node, args, kwargs)`` defining the node used
for the internal nonlinear expansion of the estimated
sources to be removed from the input space. The same
            trade-off as for ``basic_exp`` applies here.
Default: ``(mdp.nodes.PolynomialExpansionNode, (10, ), {})``
svd
enable Singular Value Decomposition for normalization
and regularization. Use it if the node complains about
            singular covariance matrices.
            Default: False
verbose
show some progress during training.
Default: False
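
        For example, a cubic basic expansion could be requested like
        this (a sketch; the argument values shown are illustrative)::

            >>> node = XSFANode(basic_exp=(mdp.nodes.PolynomialExpansionNode,
            ...                            (3, ), {}))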
"""
# set up basic expansion
if basic_exp is None:
self.basic_exp = mdp.nodes.PolynomialExpansionNode
self.basic_exp_args = (2, )
self.basic_exp_kwargs = {}
else:
self.basic_exp = basic_exp[0]
self.basic_exp_args = basic_exp[1]
self.basic_exp_kwargs = basic_exp[2]
# set up internal expansion
if intern_exp is None:
self.exp = mdp.nodes.PolynomialExpansionNode
self.exp_args = (10, )
self.exp_kwargs = {}
else:
self.exp = intern_exp[0]
self.exp_args = intern_exp[1]
self.exp_kwargs = intern_exp[2]
# number of sources already extracted
self.n_extracted_src = 0
# internal network
self._flow = None
self.verbose = verbose
self.svd = svd
super(XSFANode, self).__init__(input_dim=input_dim,
output_dim=output_dim, dtype=dtype)
@property
def flow(self):
"""Read-only internal flow property."""
return self._flow
def _get_train_seq(self):
        #XXX: this is a hack
        # In order to enable output_dim to be set automatically after
        # input_dim is known, instead of forcing the user to specify it
        # by hand, we need to initialize the internal flow just before
        # the first training starts (input_dim is known at that moment).
        # The problem is that when XSFANode is trained through a container
        # flow, which is the standard way of training this kind of node,
        # the flow checks that the data iterators are *not* generators
        # for nodes with multiple training phases. To assess whether a
        # node has multiple phases it checks that len(self._train_seq) > 1.
        # But at this point we don't know the number of training phases
        # yet, because we first need to know input_dim, which we only
        # learn when we receive the first chunk of data. To keep the flow
        # from complaining we just return a bogus list of training phases:
        # it shouldn't break anything else.
if self._flow is None:
            # we don't know the number of training phases yet,
            # but we can be sure that there will be more than one:
return [(None, None), (None, None)]
else:
return ([(self._train, self._stop_training)] *
sum(self._training_phases))
def _set_input_dim(self, n):
self._input_dim = n
        # set output_dim if it is not yet set
if self.output_dim is None:
self.output_dim = n
def _check_train_args(self, x):
# this method will be called before starting training.
# it is the right moment to initialize the internal flow
if self._flow is None:
self._initialize_internal_flow()
if self.verbose:
print("Extracting source 1...")
def _initialize_internal_flow(self):
        # create the initial flow if it is not there already;
        # setting input_dim is needed to correctly create the first
        # network layer
self.basic_exp_kwargs['input_dim'] = self.input_dim
exp = self.basic_exp(*self.basic_exp_args, **self.basic_exp_kwargs)
# first element of the flow is the basic expansion node
# after that the first source extractor module is appended
self._flow = (exp + self._get_source_extractor(exp.output_dim, 0))
# set the training phases
# set the total number of training phases
training_phases = []
for S in range(self.output_dim):
# get the number of training phases of every single
# source extractor module
mod = self._get_source_extractor(S+1, S)
training_phases.append(len(mod._train_seq))
self._training_phases = training_phases
        # this is a list of the training phases that correspond to
        # the completed training of a source extractor module
self._training_phases_mods = [sum(training_phases[:i+1]) for i in
range(len(training_phases[:-1]))]
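        # e.g., with output_dim == 3 and (illustrative) per-module phase
        # counts [2, 2, 1], self._training_phases_mods == [2, 4]: global
        # phases 2 and 4 mark the completion of the first and second
        # source extractor modules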
@staticmethod
def is_invertible():
return False
def _train(self, x):
# train the last source extractor module in the flow
self._flow[-1].train(self._flow[:-1](x))
def _stop_training(self):
# stop the current training phase
self._flow[-1].stop_training()
# update the current training phase
cur_tr_ph = self.get_current_train_phase() + 1
        # if we have finished training the current source extractor
        # module and still have sources left to extract,
        # append a new source extractor module
if (cur_tr_ph in self._training_phases_mods and
self.n_extracted_src != (self.output_dim - 1)):
self.n_extracted_src += 1
mod = self._get_source_extractor(self._flow[-1].output_dim,
self.n_extracted_src)
self._flow.append(mod)
if self.verbose:
print("Extracting source %d..." % (self.n_extracted_src+1))
def _execute(self, x):
return self._flow(x)[:,:self.output_dim]
def _get_source_extractor(self, dim, nsources):
        # return a module that extracts the next source and removes its
        # projection from the data space
S = nsources
L = dim-S
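        # e.g., with dim=5 and nsources=1: S=1 source has already been
        # extracted and L=4 signals remain to be processed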
# sfa - extracts the next source
sfa = mdp.nodes.SFANode(input_dim=L, output_dim=L)
# identity - copies the new sources
idn_new1 = mdp.nodes.IdentityNode(input_dim=S+1)
# source expansion
self.exp_kwargs['input_dim'] = S + 1
# N2
src_exp = mdp.hinet.FlowNode(self.exp(*self.exp_args,
**self.exp_kwargs) +
NormalizeNode() +
mdp.nodes.WhiteningNode(svd=self.svd,
reduce=True))
N2Layer = mdp.hinet.SameInputLayer((src_exp, idn_new1))
N2ContLayer = mdp.hinet.Layer((N2Layer,
mdp.nodes.IdentityNode(input_dim=L-1)))
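        # N2ContLayer splits its input: the first S+1 components (the
        # previously extracted sources plus the new one) are both expanded
        # by src_exp and copied unchanged by idn_new1, while the remaining
        # L-1 components pass straight through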
if S == 0:
# don't need to copy the current sources (there are none)
N1 = mdp.hinet.FlowNode(sfa + N2ContLayer)
elif S == self.output_dim - 1:
# the last source does not need to be removed
# take care of passing the sources down along the flow
idn_old = mdp.nodes.IdentityNode(input_dim=S)
return mdp.hinet.Layer((idn_old,
mdp.nodes.SFANode(input_dim=L,
output_dim=1)))
else:
# take care of passing the sources down along the flow
idn_old = mdp.nodes.IdentityNode(input_dim=S)
N1 = mdp.hinet.FlowNode(mdp.hinet.Layer((idn_old, sfa)) +
N2ContLayer)
# expanded sources projection
proj = ProjectionNode(S, L-1)
# use another identity node to copy the sources
# we could in principle reuse the idn_new1 but using a new
# node will make debugging much easier
idn_new2 = mdp.nodes.IdentityNode(input_dim=S+1)
# regularization after projection + new source copying
reg_and_copy = mdp.hinet.Layer((idn_new2,
mdp.nodes.WhiteningNode(input_dim=L-1,
svd=self.svd,
reduce=True)))
# actual source removal flow
        src_rem = mdp.hinet.FlowNode(proj + reg_and_copy)
# return the actual source extraction module
return mdp.hinet.FlowNode(N1 + src_rem)
class ProjectionNode(mdp.Node):
"""Get expanded sources and input signals, and return
the sources and the input signals projected into the space
orthogonal to the expanded sources and their products."""
def __init__(self, S, L):
#!! IMPORTANT!!
# this node *must* return the sources together with the
# projected input signals
self.proj_mtx = None
self.L = L
super(ProjectionNode, self).__init__(output_dim=S+1+L)
self._cov_mtx = mdp.utils.CrossCovarianceMatrix(self.dtype)
def _train(self, x):
# compute covariance between expanded sources
# and input signals
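        # the columns of x are laid out as
        # [expanded sources | S+1 sources | L input signals]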
self._cov_mtx.update(x[:,:-self.output_dim], x[:,-self.L:])
def _stop_training(self):
self.proj_mtx, avgx, avgy, self.tlen = self._cov_mtx.fix()
def _execute(self, x):
src = x[:, -self.output_dim:-self.L]
exp = x[:, :-self.output_dim]
inp = x[:, -self.L:]
# result container
result = mdp.numx.zeros((x.shape[0], self.output_dim))
# project input on the plane orthogonal to the expanded sources
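        # (the expanded sources were whitened upstream, so their
        # cross-covariance with the input equals the least-squares
        # coefficients for predicting the input from them)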
result[:, -self.L:] = inp - mdp.utils.mult(exp, self.proj_mtx)
# copy the sources
result[:, :-self.L] = src
return result
class NormalizeNode(mdp.PreserveDimNode):
"""Make input signal meanfree and unit variance"""
def __init__(self, input_dim=None, output_dim=None, dtype=None):
self._cov_mtx = mdp.utils.CovarianceMatrix(dtype)
super(NormalizeNode, self).__init__(input_dim, output_dim, dtype)
@staticmethod
def is_trainable():
return True
def _train(self, x):
self._cov_mtx.update(x)
def _stop_training(self):
cov_mtx, avg, tlen = self._cov_mtx.fix()
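        # store the mean and the per-component standard deviations
        # (square roots of the covariance diagonal) for normalization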
self.m = avg
self.s = mdp.numx.sqrt(mdp.numx.diag(cov_mtx))
def _execute(self, x):
        return old_div((x - self.m), self.s)
def _inverse(self, y):
return y*self.s + self.m
|