This file is indexed.

/usr/share/pyshared/Bio/HMM/Trainer.py is in python-biopython 1.58-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
"""Provide trainers which estimate parameters based on training sequences.

These should be used to 'train' a Markov Model prior to actually using
it to decode state paths. When supplied training sequences and a model
to work from, these classes will estimate paramters of the model.

This aims to estimate two parameters:

* a_{kl} -- the number of times there is a transition from k to l in the
training data.

* e_{k}(b) -- the number of emissions of the state b from the letter k
in the training data.
"""
# standard modules
import math

# local stuff
from DynamicProgramming import ScaledDPAlgorithms

class TrainingSequence(object):
    """Hold a training sequence with emissions and optionally, a state path.
    """
    def __init__(self, emissions, state_path):
        """Initialize a training sequence.

        Arguments:

        o emissions - A Seq object containing the sequence of emissions in
        the training sequence, and the alphabet of the sequence.

        o state_path - A Seq object containing the sequence of states and
        the alphabet of the states. If there is no known state path, then
        the sequence of states should be an empty string.
        """
        if len(state_path) > 0:
            assert len(emissions) == len(state_path), \
                   "State path does not match associated emissions."
        self.emissions = emissions
        self.states = state_path

class AbstractTrainer(object):
    """Provide generic functionality needed in all trainers.
    """
    def __init__(self, markov_model):
        self._markov_model = markov_model

    def log_likelihood(self, probabilities):
        """Calculate the log likelihood of the training seqs.

        Arguments:

        o probabilities -- A list of the probabilities of each training
        sequence under the current paramters, calculated using the forward
        algorithm.
        """
        total_likelihood = 0
        for probability in probabilities:
            total_likelihood += math.log(probability)

        return total_likelihood
                 
    def estimate_params(self, transition_counts, emission_counts):
        """Get a maximum likelihood estimation of transition and emmission.

        Arguments:
        
        o transition_counts -- A dictionary with the total number of counts
        of transitions between two states.

        o emissions_counts -- A dictionary with the total number of counts
        of emmissions of a particular emission letter by a state letter.

        This then returns the maximum likelihood estimators for the
        transitions and emissions, estimated by formulas 3.18 in
        Durbin et al:

        a_{kl} = A_{kl} / sum(A_{kl'})
        e_{k}(b) = E_{k}(b) / sum(E_{k}(b'))

        Returns:
        Transition and emission dictionaries containing the maximum
        likelihood estimators.
        """
        # now calculate the information
        ml_transitions = self.ml_estimator(transition_counts)
        ml_emissions = self.ml_estimator(emission_counts)

        return ml_transitions, ml_emissions

    def ml_estimator(self, counts):
        """Calculate the maximum likelihood estimator.

        This can calculate maximum likelihoods for both transitions
        and emissions.

        Arguments:

        o counts -- A dictionary of the counts for each item.

        See estimate_params for a description of the formula used for
        calculation.
        """
        # get an ordered list of all items
        all_ordered = counts.keys()
        all_ordered.sort()
        
        ml_estimation = {}

        # the total counts for the current letter we are on
        cur_letter = None
        cur_letter_counts = 0
        
        for cur_item in all_ordered:
            # if we are on a new letter (ie. the first letter of the tuple)
            if cur_item[0] != cur_letter:
                # set the new letter we are working with
                cur_letter = cur_item[0]

                # count up the total counts for this letter
                cur_letter_counts = counts[cur_item]
                
                # add counts for all other items with the same first letter
                cur_position = all_ordered.index(cur_item) + 1

                # keep adding while we have the same first letter or until
                # we get to the end of the ordered list
                while (cur_position < len(all_ordered) and
                       all_ordered[cur_position][0] == cur_item[0]):
                    cur_letter_counts += counts[all_ordered[cur_position]]
                    cur_position += 1
            # otherwise we've already got the total counts for this letter
            else:
                pass

            # now calculate the ml and add it to the estimation
            cur_ml = float(counts[cur_item]) / float(cur_letter_counts)
            ml_estimation[cur_item] = cur_ml

        return ml_estimation
            
class BaumWelchTrainer(AbstractTrainer):
    """Trainer that uses the Baum-Welch algorithm to estimate parameters.

    These should be used when a training sequence for an HMM has unknown
    paths for the actual states, and you need to make an estimation of the
    model parameters from the observed emissions.

    This uses the Baum-Welch algorithm, first described in
    Baum, L.E. 1972. Inequalities. 3:1-8
    This is based on the description in 'Biological Sequence Analysis' by
    Durbin et al. in section 3.3

    This algorithm is guaranteed to converge to a local maximum, but not
    necessarily to the global maxima, so use with care!
    """
    def __init__(self, markov_model):
        """Initialize the trainer.

        Arguments:
        
        o markov_model - The model we are going to estimate parameters for.
        This should have the parameters with some initial estimates, that
        we can build from.
        """
        AbstractTrainer.__init__(self, markov_model)

    def train(self, training_seqs, stopping_criteria,
              dp_method = ScaledDPAlgorithms):
        """Estimate the parameters using training sequences.

        The algorithm for this is taken from Durbin et al. p64, so this
        is a good place to go for a reference on what is going on.
        
        Arguments:

        o training_seqs -- A list of TrainingSequence objects to be used
        for estimating the parameters.
        
        o stopping_criteria -- A function, that when passed the change
        in log likelihood and threshold, will indicate if we should stop
        the estimation iterations.

        o dp_method -- A class instance specifying the dynamic programming
        implementation we should use to calculate the forward and
        backward variables. By default, we use the scaling method.
        """
        prev_log_likelihood = None
        num_iterations = 1
        
        while 1:            
            transition_count = self._markov_model.get_blank_transitions()
            emission_count = self._markov_model.get_blank_emissions()

            # remember all of the sequence probabilities
            all_probabilities = []
            
            for training_seq in training_seqs:
                # calculate the forward and backward variables
                DP = dp_method(self._markov_model, training_seq)
                forward_var, seq_prob = DP.forward_algorithm()
                backward_var =  DP.backward_algorithm()
                
                all_probabilities.append(seq_prob)

                # update the counts for transitions and emissions
                transition_count = self.update_transitions(transition_count,
                                                           training_seq,
                                                           forward_var,
                                                           backward_var,
                                                           seq_prob)
                emission_count = self.update_emissions(emission_count,
                                                       training_seq,
                                                       forward_var,
                                                       backward_var,
                                                       seq_prob)

            # update the markov model with the new probabilities
            ml_transitions, ml_emissions = \
                self.estimate_params(transition_count, emission_count)
            self._markov_model.transition_prob = ml_transitions
            self._markov_model.emission_prob = ml_emissions

            cur_log_likelihood =  self.log_likelihood(all_probabilities)

            # if we have previously calculated the log likelihood (ie.
            # not the first round), see if we can finish
            if prev_log_likelihood is not None:
                # XXX log likelihoods are negatives -- am I calculating
                # the change properly, or should I use the negatives...
                # I'm not sure at all if this is right.
                log_likelihood_change = abs(abs(cur_log_likelihood) -
                                            abs(prev_log_likelihood))

                # check whether we have completed enough iterations to have
                # a good estimation
                if stopping_criteria(log_likelihood_change, num_iterations):
                    break

            # set up for another round of iterations
            prev_log_likelihood = cur_log_likelihood
            num_iterations += 1

        return self._markov_model

    def update_transitions(self, transition_counts, training_seq,
                           forward_vars, backward_vars, training_seq_prob):
        """Add the contribution of a new training sequence to the transitions.

        Arguments:

        o transition_counts -- A dictionary of the current counts for the
        transitions

        o training_seq -- The training sequence we are working with

        o forward_vars -- Probabilities calculated using the forward
        algorithm.

        o backward_vars -- Probabilities calculated using the backwards
        algorithm.

        o training_seq_prob - The probability of the current sequence.

        This calculates A_{kl} (the estimated transition counts from state
        k to state l) using formula 3.20 in Durbin et al.
        """
        # set up the transition and emission probabilities we are using
        transitions = self._markov_model.transition_prob
        emissions = self._markov_model.emission_prob
        
        # loop over the possible combinations of state path letters
        for k in training_seq.states.alphabet.letters:
            for l in self._markov_model.transitions_from(k):
                estimated_counts = 0
                # now loop over the entire training sequence
                for i in range(len(training_seq.emissions) - 1):
                    # the forward value of k at the current position
                    forward_value = forward_vars[(k, i)]

                    # the backward value of l in the next position
                    backward_value = backward_vars[(l, i + 1)]

                    # the probability of a transition from k to l
                    trans_value = transitions[(k, l)]

                    # the probability of getting the emission at the next pos
                    emm_value = emissions[(l, training_seq.emissions[i + 1])]

                    estimated_counts += (forward_value * trans_value *
                                         emm_value * backward_value)

                # update the transition approximation
                transition_counts[(k, l)] += (float(estimated_counts) /
                                              training_seq_prob)
                    
        return transition_counts

    def update_emissions(self, emission_counts, training_seq,
                           forward_vars, backward_vars, training_seq_prob):
        """Add the contribution of a new training sequence to the emissions

        Arguments:

        o emission_counts -- A dictionary of the current counts for the
        emissions

        o training_seq -- The training sequence we are working with

        o forward_vars -- Probabilities calculated using the forward
        algorithm.

        o backward_vars -- Probabilities calculated using the backwards
        algorithm.

        o training_seq_prob - The probability of the current sequence.

        This calculates E_{k}(b) (the estimated emission probability for
        emission letter b from state k) using formula 3.21 in Durbin et al.
        """
        # loop over the possible combinations of state path letters
        for k in training_seq.states.alphabet.letters:
            # now loop over all of the possible emissions
            for b in training_seq.emissions.alphabet.letters:
                expected_times = 0
                # finally loop over the entire training sequence
                for i in range(len(training_seq.emissions)):
                    # only count the forward and backward probability if the
                    # emission at the position is the same as b
                    if training_seq.emissions[i] == b:
                        # f_{k}(i) b_{k}(i)
                        expected_times += (forward_vars[(k, i)] *
                                           backward_vars[(k, i)])

                # add to E_{k}(b)
                emission_counts[(k, b)] += (float(expected_times) /
                                            training_seq_prob)

        return emission_counts

class KnownStateTrainer(AbstractTrainer):
    """Estimate probabilities with known state sequences.

    This should be used for direct estimation of emission and transition
    probabilities when both the state path and emission sequence are
    known for the training examples.
    """
    def __init__(self, markov_model):
        AbstractTrainer.__init__(self, markov_model)

    def train(self, training_seqs):
        """Estimate the Markov Model parameters with known state paths.

        This trainer requires that both the state and the emissions are
        known for all of the training sequences in the list of
        TrainingSequence objects.
        This training will then count all of the transitions and emissions,
        and use this to estimate the parameters of the model.
        """
        # count up all of the transitions and emissions
        transition_counts = self._markov_model.get_blank_transitions()
        emission_counts = self._markov_model.get_blank_emissions()

        for training_seq in training_seqs:
            emission_counts = self._count_emissions(training_seq,
                                                    emission_counts)
            transition_counts = self._count_transitions(training_seq.states,
                                                        transition_counts)

        # update the markov model from the counts
        ml_transitions, ml_emissions = \
                        self.estimate_params(transition_counts,
                                             emission_counts)
        self._markov_model.transition_prob = ml_transitions
        self._markov_model.emission_prob = ml_emissions

        return self._markov_model

    def _count_emissions(self, training_seq, emission_counts):
        """Add emissions from the training sequence to the current counts.

        Arguments:

        o training_seq -- A TrainingSequence with states and emissions
        to get the counts from

        o emission_counts -- The current emission counts to add to.
        """
        for index in range(len(training_seq.emissions)):
            cur_state = training_seq.states[index]
            cur_emission = training_seq.emissions[index]

            try:
                emission_counts[(cur_state, cur_emission)] += 1
            except KeyError:
                raise KeyError("Unexpected emission (%s, %s)"
                               % (cur_state, cur_emission))
        return emission_counts

    def _count_transitions(self, state_seq, transition_counts):
        """Add transitions from the training sequence to the current counts.

        Arguments:

        o state_seq -- A Seq object with the states of the current training
        sequence.

        o transition_counts -- The current transition counts to add to.
        """
        for cur_pos in range(len(state_seq) - 1):
            cur_state = state_seq[cur_pos]
            next_state = state_seq[cur_pos + 1]

            try:
                transition_counts[(cur_state, next_state)] += 1
            except KeyError:
                raise KeyError("Unexpected transition (%s, %s)" %
                               (cur_state, next_state))

        return transition_counts