/usr/share/pyshared/cogent/maths/markov.py is in python-cogent 1.5.1-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 | #!/usr/bin/env python
import numpy
import bisect
Float = numpy.core.numerictypes.sctype2char(float)
__author__ = "Peter Maxwell"
__copyright__ = "Copyright 2007-2011, The Cogent Project"
__credits__ = ["Peter Maxwell", "Gavin Huttley"]
__license__ = "GPL"
__version__ = "1.5.1"
__maintainer__ = "Peter Maxwell"
__email__ = "pm67nz@gmail.com"
__status__ = "Production"
class TransitionMatrix(object):
"""The transition matrix for a Markov process. Just a square
numpy array plus a list of state 'tags', eg:
>>> a = numpy.array([
[.9, .0, .1],
[.0, .9, .1],
[.5, .5, .0],], Float)
>>> T = TransitionMatrix(a, ['x', 'y', 'm'])
"""
def __init__(self, matrix, tags, stationary_probs=None):
self.Matrix = numpy.array(matrix, Float)
self.Tags = list(tags)
self.size = len(matrix)
assert matrix.shape == (self.size, self.size)
assert len(tags) == self.size
if stationary_probs is None:
self._stationary_probs = None
else:
# Could recalculate, but if it is provided then faster to
# just trust it
assert len(stationary_probs) == self.size
self._stationary_probs = numpy.array(
stationary_probs, Float)
def _getStationaryProbs(self):
if self._stationary_probs is None:
matrix = self.Matrix
for i in range(10):
matrix = numpy.core.multiarray.dot(matrix, matrix)
self._stationary_probs = matrix[0]
return self._stationary_probs
StationaryProbs = property(_getStationaryProbs)
def emit(self, random_series):
"""Generates an infinite sequence of states"""
partitions = numpy.add.accumulate(self.Matrix, axis=1)
for (state, row) in enumerate(partitions[:-1]):
assert abs(row[-1]-1.0) < 1e-6, (state, self.Matrix[state])
x = random_series.uniform(0.0, 1.0)
state = bisect.bisect_left(
numpy.add.accumulate(self.StationaryProbs), x)
while 1:
yield self.Tags[state]
x = random_series.uniform(0.0, 1.0)
state = bisect.bisect_left(partitions[state], x)
def __repr__(self):
from cogent.util.table import Table
labels = []
for (i, label) in enumerate(self.Tags):
if hasattr(label, '__len__') and not isinstance(
label, basestring):
label = ','.join(str(z) for z in label)
# Table needs unique labels
label = "%s (%s)" % (label, i)
labels.append(label)
heading = [''] + labels
a = [[name] + list(row) for (name, row) in zip(labels, self.Matrix)]
return str(Table(header = heading, rows = a))
def withoutSilentStates(self):
"""An equivalent matrix without any of the states that have a
false tag value"""
N = self.size
silent = numpy.array(
[(not max(tag)) for tag in self.Tags], Float)
matrix = numpy.zeros([N, N], Float)
for i in range(N):
row = numpy.zeros([N], Float)
emt = numpy.zeros([N], Float)
row[i] = 1.0
while max((row+emt)-emt):
row = numpy.dot(row, self.Matrix)
nul = silent * row
emt += row - nul
row = nul
matrix[i] = emt
keep = [i for i in range(self.size) if max(matrix[:,i])]
matrix = numpy.take(matrix, keep, axis=0)
matrix = numpy.take(matrix, keep, axis=1)
tags = numpy.take(self.Tags, keep, axis=0)
return type(self)(matrix, tags)
def getLikelihoodOfSequence(self, obs, backward=False):
"""Just for testing really"""
profile = numpy.zeros([len(obs), self.size], Float)
for (i,a) in enumerate(obs):
# This is suspiciously alphabet-like!
profile[i, self.Tags.index(obs[i])] = 1.0
return self.getLikelihoodOfProfile(profile, backward=backward)
def getLikelihoodOfProfile(self, obs, backward=False):
"""Just for testing really"""
if not backward:
state_probs = self.StationaryProbs.copy()
for i in range(len(obs)):
state_probs = numpy.dot(state_probs, self.Matrix) * obs[i]
return sum(state_probs)
else:
state_probs = numpy.ones([self.size], Float)
for i in range(len(obs)-1, -1, -1):
state_probs = numpy.dot(self.Matrix, state_probs * obs[i])
return sum(state_probs * self.StationaryProbs)
def getPosteriorProbs(self, obs):
"""'obs' is a sequence of state probability vectors"""
result = numpy.zeros([len(obs), self.size], Float)
# Forward
state_probs = self.StationaryProbs.copy()
for i in range(len(obs)):
state_probs = numpy.dot(state_probs, self.Matrix) * obs[i]
state_probs /= sum(state_probs)
result[i] = state_probs
# and Backward
state_probs = numpy.ones([self.size], Float)
for i in range(len(obs)-1, -1, -1):
state_probs = numpy.dot(self.Matrix, state_probs)
state_probs /= sum(state_probs)
result[i] *= state_probs
result[i] /= sum(result[i])
state_probs *= obs[i]
return result
def nestTransitionMatricies(self, Ts, blended=None):
"""Useful for combining several X/Y/M Pair HMMs into
one large HMM. The transition matricies 'Ts' end up
along the diagonal blocks of the result, and the off
diagonal values depend on the region switching probablities
defined by 'self'"""
if blended is None:
blended = lambda a,b: (a+b)/2.0
#blended = lambda a,b: numpy.sqrt(a*b)
#blended = lambda a,b: b
R = self.Matrix
n = len(R)
assert len(Ts) == n
result = None
for (x, a) in enumerate(self.Tags):
a = Ts[a-1]
if result is None:
tags = a.Tags
c = len(tags)
result = numpy.zeros([c*n, c*n], Float)
else:
assert a.Tags == tags, (a.Tags, tags)
a = a.Matrix
for (y, b) in enumerate(self.Tags):
b = Ts[b-1].Matrix
block = self.Matrix[x,y] * blended(a, b)
result[x*c:(x+1)*c, y*c:(y+1)*c] = block
all_tags = []
for i in self.Tags:
all_tags.extend([[i*e for e in tag] for tag in tags])
return TransitionMatrix(result, all_tags)
def SiteClassTransitionMatrix(switch, probs):
"""TM defined by stationary probabilities and a 'switch' parameter,
switch=0.0 gives an identity Matrix, switch=1.0 gives independence,
ie: zero-order markov process"""
probs = numpy.asarray(probs)
assert numpy.allclose(sum(probs), 1.0), probs
I = numpy.identity(len(probs), Float)
switch_probs = (1.0 - I) * (probs * switch) + \
I * (1.0 - (1.0 - probs) * switch)
tags = [i+1 for i in range(len(switch_probs))]
return TransitionMatrix(
switch_probs, tags, stationary_probs=probs.copy())
|