/usr/lib/python2.7/dist-packages/mdp/nodes/em_nodes.py is in python-mdp 3.5-1ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
from __future__ import print_function
from __future__ import division
from builtins import range
from past.utils import old_div
__docformat__ = "restructuredtext en"

import mdp
from mdp import numx, numx_linalg, utils, NodeException
from mdp.utils import mult, CovarianceMatrix
import warnings

sqrt, inv, det = numx.sqrt, utils.inv, numx_linalg.det
normal = mdp.numx_rand.normal

# decreasing likelihood message
_LHOOD_WARNING = ('Likelihood decreased in FANode. This is probably due '
                  'to some numerical errors.')
warnings.filterwarnings('always', _LHOOD_WARNING, mdp.MDPWarning)


class FANode(mdp.Node):
    """Perform Factor Analysis.

    The current implementation should be most efficient for long
    data sets: the sufficient statistics are collected in the
    training phase, and all EM-cycles are performed at
    its end.

    The ``execute`` method returns the Maximum A Posteriori estimate
    of the latent variables. The ``generate_input`` method generates
    observations from the prior distribution.

    **Internal variables of interest**

      ``self.mu``
          Mean of the input data (available after training)

      ``self.A``
          Generating weights (available after training)

      ``self.E_y_mtx``
          Weights for Maximum A Posteriori inference

      ``self.sigma``
          Vector of estimated variance of the noise
          for all input components

    More information about Factor Analysis can be found in
    Max Welling's classnotes:
    http://www.ics.uci.edu/~welling/classnotes/classnotes.html ,
    in the chapter 'Linear Models'.
    """

    def __init__(self, tol=1e-4, max_cycles=100, verbose=False,
                 input_dim=None, output_dim=None, dtype=None):
        """
        :Parameters:
          tol
            tolerance (minimum change in log-likelihood before exiting
            the EM algorithm)
          max_cycles
            maximum number of EM cycles
          verbose
            if true, print log-likelihood during the EM-cycles
        """
        # Notation as in Max Welling's notes
        super(FANode, self).__init__(input_dim, output_dim, dtype)
        self.tol = tol
        self.max_cycles = max_cycles
        self.verbose = verbose
        self._cov_mtx = CovarianceMatrix(dtype, bias=True)
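        # bias=True makes the covariance normalize by the number of
        # samples N rather than N-1, i.e. the maximum-likelihood
        # estimate assumed by the EM updates in _stop_training.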

    def _train(self, x):
        # update the covariance matrix
        self._cov_mtx.update(x)
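
    # Note: the factor-analysis likelihood depends on the data only
    # through their sample mean and covariance, so training can gather
    # these sufficient statistics in a single pass; the EM iterations
    # in _stop_training never revisit the raw observations.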

    def _stop_training(self):
        #### some definitions
        verbose = self.verbose
        typ = self.dtype
        tol = self.tol
        d = self.input_dim
        # if the number of latent variables is not specified,
        # set it equal to the number of input components
        if not self.output_dim:
            self.output_dim = d
        k = self.output_dim
        # indices of the diagonal elements of a dxd or kxk matrix
        idx_diag_d = [i*(d+1) for i in range(d)]
        idx_diag_k = [i*(k+1) for i in range(k)]
        # constant term in front of the log-likelihood
        const = -d/2. * numx.log(2.*numx.pi)

        ##### request the covariance matrix and clean up
        cov_mtx, mu, tlen = self._cov_mtx.fix()
        del self._cov_mtx
        cov_diag = cov_mtx.diagonal()

        ##### initialize the parameters
        # noise variances
        sigma = cov_diag
        # loading factors
        # Zoubin uses the determinant of cov_mtx^1/d as scale but it's
        # too slow for large matrices. Is the product of the diagonal a good
        # approximation?
        if d<=300:
            scale = det(cov_mtx)**(old_div(1.,d))
        else:
            scale = numx.product(sigma)**(old_div(1.,d))

        if scale <= 0.:
            err = ("The covariance matrix of the data is singular. "
                   "Redundant dimensions need to be removed.")
            raise NodeException(err)

        A = normal(0., sqrt(old_div(scale,k)), size=(d, k)).astype(typ)
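        # entries of A are drawn i.i.d. with variance scale/k, so the
        # diagonal of the initial A A^T is approximately `scale`, i.e.
        # on the same order as the overall variance of the data.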

        ##### EM-cycle
        lhood_curve = []
        base_lhood = None
        old_lhood = -numx.inf
        for t in range(self.max_cycles):
            ## compute B = (A A^T + Sigma)^-1
            B = mult(A, A.T)
            # B += diag(sigma), avoid computing diag(sigma) which is dxd
            B.ravel().put(idx_diag_d, B.ravel().take(idx_diag_d)+sigma)
            # this quantity is used later for the log-likelihood
            # abs is there to avoid numerical errors when det < 0
            log_det_B = numx.log(abs(det(B)))
            # end the computation of B
            B = inv(B)
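            # note that log_det_B was computed *before* the inversion,
            # so despite its name it holds log det(A A^T + Sigma), not
            # the log-determinant of the inverse stored in B.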

            ## other useful quantities
            trA_B = mult(A.T, B)
            trA_B_cov_mtx = mult(trA_B, cov_mtx)

            ##### E-step
            ## E_yyT = E(y_n y_n^T | x_n)
            E_yyT = - mult(trA_B, A) + mult(trA_B_cov_mtx, trA_B.T)
            # E_yyT += numx.eye(k)
            E_yyT.ravel().put(idx_diag_k, E_yyT.ravel().take(idx_diag_k)+1.)
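            # in matrix form, with C the data covariance and q = A^T B:
            #   E[y|x] = q (x - mu)
            #   average of E[y y^T|x] over the data = I - q A + q C q^T,
            # which is the value accumulated in E_yyT above.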

            ##### M-step
            A = mult(trA_B_cov_mtx.T, inv(E_yyT))
            sigma = cov_diag - (mult(A, trA_B_cov_mtx)).diagonal()
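            # these are the standard closed-form FA M-step updates,
            # written in terms of C rather than per-sample sums:
            #   A_new     = C q^T (E_yyT)^-1
            #   sigma_new = diag(C - A_new q C)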

            ##### log-likelihood
            trace_B_cov = (B*cov_mtx.T).sum()
            # this is actually likelihood/tlen.
            lhood = const - 0.5*log_det_B - 0.5*trace_B_cov
            if verbose:
                print('cycle', t, 'log-lhood:', lhood)
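
            # stopping rule below: likelihood gains are measured
            # relative to the first cycle; once a full cycle increases
            # the total gain by less than a factor (1+tol), EM is
            # considered converged.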

            ##### convergence criterion
            if base_lhood is None:
                base_lhood = lhood
            else:
                # convergence criterion
                if (lhood-base_lhood)<(1.+tol)*(old_lhood-base_lhood):
                    break
                if lhood < old_lhood:
                    # this should never happen
                    # it sometimes does, e.g. if the noise is extremely low,
                    # because of numerical rounding effects
                    warnings.warn(_LHOOD_WARNING, mdp.MDPWarning)
            old_lhood = lhood
            lhood_curve.append(lhood)

        self.tlen = tlen
        self.A = A
        self.mu = mu.reshape(1, d)
        self.sigma = sigma

        ## MAP matrix
        # compute B = (A A^T + Sigma)^-1
        B = mult(A, A.T).copy()
        B.ravel().put(idx_diag_d, B.ravel().take(idx_diag_d)+sigma)
        B = inv(B)
        self.E_y_mtx = mult(B.T, A)
        self.lhood = lhood_curve
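        # B is symmetric, so mult(x - self.mu, self.E_y_mtx) in _execute
        # computes A^T B (x - mu) for each row of x, i.e. the posterior
        # mean (and MAP value) of the latent variables given the input.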

    def _execute(self, x):
        return mult(x-self.mu, self.E_y_mtx)

    @staticmethod
    def is_invertible():
        return False

    def generate_input(self, len_or_y=1, noise=False):
        """
        Generate data from the prior distribution.

        If the training phase has not been completed yet,
        ``stop_training`` is called first.

        :Arguments:
          len_or_y
            If an integer, it specifies the number of observations
            to generate. If an array, it is used as a set of samples
            of the latent variables
          noise
            if true, generation includes the estimated noise
        """
        self._if_training_stop_training()

        # set the output dimension if necessary
        if self.output_dim is None:
            # if the input_dim is not defined, raise an exception
            if self.input_dim is None:
                errstr = ("Number of input dimensions undefined. Inversion "
                          "not possible.")
                raise NodeException(errstr)
            self.output_dim = self.input_dim

        if isinstance(len_or_y, int):
            size = (len_or_y, self.output_dim)
            y = self._refcast(mdp.numx_rand.normal(size=size))
        else:
            y = self._refcast(len_or_y)
            self._check_output(y)

        res = mult(y, self.A.T)+self.mu
        if noise:
            ns = mdp.numx_rand.normal(size=(y.shape[0], self.input_dim))
            ns *= numx.sqrt(self.sigma)
            res += self._refcast(ns)
        return res
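

# ----------------------------------------------------------------------
# Illustrative usage sketch (not part of the module above): fit a
# FANode on synthetic data drawn from a known factor-analysis model,
# then recover MAP latent estimates and sample from the learned model.
# Dimensions and the noise level are arbitrary example values.
if __name__ == '__main__':
    n, d, k = 5000, 20, 5
    A_true = normal(0., 1., size=(d, k))           # ground-truth loadings
    latents = normal(0., 1., size=(n, k))          # y ~ N(0, I_k)
    x = mult(latents, A_true.T) + normal(0., 0.1, size=(n, d))
    fa = FANode(output_dim=k)
    fa.train(x)
    fa.stop_training()
    y_map = fa.execute(x)                      # MAP latent estimates
    x_gen = fa.generate_input(10, noise=True)  # draws from the model
    print('estimated noise variances:', fa.sigma)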