This file is indexed.

/usr/share/pyshared/nitime/analysis/granger.py is in python-nitime 0.5-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
"""

Analyzers for the calculation of Granger 'causality'

"""

import numpy as np
import nitime.algorithms as alg
import nitime.utils as utils
from nitime import descriptors as desc

from .base import BaseAnalyzer

# To suppport older versions of numpy that don't have tril_indices:
from nitime.index_utils import tril_indices_from

def fit_model(x1, x2, order=None, max_order=10,
              criterion=utils.bayesian_information_criterion):
    """
    Fit the auto-regressive model used in calculation of Granger 'causality'.

    Parameters
    ----------

    x1,x2: float arrays (n)
        x1,x2 bivariate combination.
    order: int (optional)
        If known, the order of the autoregressive process
    max_order: int (optional)
        If the order is not known, this will be the maximal order to fit.
    criterion: callable
       A function which defines an information criterion, used to determine the
        order of the model.

    """
    c_old = np.inf
    n_process = 2
    Ntotal = n_process * x1.shape[-1]

    # If model order was provided as an input:
    if order is not None:
        lag = order + 1
        Rxx = utils.autocov_vector(np.vstack([x1, x2]), nlags=lag)
        coef, ecov = alg.lwr_recursion(np.array(Rxx).transpose(2, 0, 1))

    # If the model order is not known and provided as input:
    else:
        for lag in range(1, max_order):
            Rxx_new = utils.autocov_vector(np.vstack([x1, x2]), nlags=lag)
            coef_new, ecov_new = alg.lwr_recursion(
                                        np.array(Rxx_new).transpose(2, 0, 1))
            order_new = coef_new.shape[0]
            c_new = criterion(ecov_new, n_process, order_new, Ntotal)
            if c_new > c_old:
                # Keep the values you got in the last round and break out:
                break

            else:
                # Replace the output values with the new calculated values and
                # move on to the next order:
                c_old = c_new
                order = order_new
                Rxx = Rxx_new
                coef = coef_new
                ecov = ecov_new
        else:
            e_s = ("Model estimation order did not converge at max_order = %s"
                                                                  % max_order)
            raise ValueError(e_s)

    return order, Rxx, coef, ecov


class GrangerAnalyzer(BaseAnalyzer):
    """Analyzer for computing all-to-all Granger 'causality' """
    def __init__(self, input=None, ij=None, order=None, max_order=10,
                 criterion=utils.bayesian_information_criterion, n_freqs=1024):
        """
        Initializer for the GrangerAnalyzer.

        Parameters
        ----------

        input: nitime TimeSeries object
        ij: List of tuples of the form: [(0, 1), (0, 2)], etc.
            These are the indices of pairs of time-series for which the
            analysis will be done. Defaults to all vs. all.
        order: int (optional)
             The order of the process. If this is not known, it will be
             estimated from the data, using the information criterion
        max_order: if the order is estimated, this is the maximal order to
             estimate for.
        n_freqs: int (optional)
            The size of the sampling grid in the frequency domain.
            Defaults to 1024
        criterion:
            XXX
        """
        self.data = input.data
        self.sampling_rate = input.sampling_rate
        self._n_process = input.shape[0]
        self._n_freqs = n_freqs
        self._order = order
        self._criterion = criterion
        self._max_order = max_order
        if ij is None:
            # The following gets the full list of combinations of
            # non-same i's and j's:
            x, y = np.meshgrid(np.arange(self._n_process),
                               np.arange(self._n_process))
            self.ij = list(zip(x[tril_indices_from(x, -1)],
                          y[tril_indices_from(y, -1)]))
        else:
            self.ij = ij

    @desc.setattr_on_read
    def _model(self):
        model = dict(order={}, autocov={}, model_coef={}, error_cov={})
        for i, j in self.ij:
            model[i, j] = dict()
            order_t, Rxx_t, coef_t, ecov_t = fit_model(self.data[i],
                                                   self.data[j],
                                                   order=self._order,
                                                   max_order=self._max_order,
                                                   criterion=self._criterion)
            model['order'][i, j] = order_t
            model['autocov'][i, j] = Rxx_t
            model['model_coef'][i, j] = coef_t
            model['error_cov'][i, j] = ecov_t

        return model

    @desc.setattr_on_read
    def order(self):
        if self._order is None:
            return self._model['order']
        else:
            order = {}
            for i, j in self.ij:
                order[i, j] = self._order
            return order

    @desc.setattr_on_read
    def autocov(self):
        return self._model['autocov']

    @desc.setattr_on_read
    def model_coef(self):
        return self._model['model_coef']

    @desc.setattr_on_read
    def error_cov(self):
        return self._model['error_cov']

    @desc.setattr_on_read
    def _granger_causality(self):
        """
        This returns a dict with the values computed by
        :func:`granger_causality_xy`, rather than arrays, so that we can delay
        the allocation of arrays as much as possible.

        """
        gc = dict(frequencies={}, gc_xy={}, gc_yx={}, gc_sim={},
                  spectral_density={})
        for i, j in self.ij:
            w, f_x2y, f_y2x, f_xy, Sw = \
               alg.granger_causality_xy(self.model_coef[i, j],
                                        self.error_cov[i, j],
                                        n_freqs=self._n_freqs)

            # All other measures are dependent on i, j:
            gc['gc_xy'][i, j] = f_x2y
            gc['gc_yx'][i, j] = f_y2x
            gc['gc_sim'][i, j] = f_xy
            gc['spectral_density'][i, j] = Sw

        return gc

    @desc.setattr_on_read
    def frequencies(self):
        return utils.get_freqs(self.sampling_rate, self._n_freqs)

    def _dict2arr(self, key):
        """
        A helper function that will generate an array with all nan's and insert
        the measure defined by 'key' into the array and return it to the
        calling function. This allows us to get matrices of the measures of
        interest, instead of a dict.
        """
        # Prepare the matrix for the output:
        arr = np.empty((self._n_process,
                        self._n_process,
                        self.frequencies.shape[0]))

        arr.fill(np.nan)

        # 'Translate' from dict form into matrix form:
        for i, j in self.ij:
            arr[j, i, :] = self._granger_causality[key][i, j]
        return arr

    @desc.setattr_on_read
    def causality_xy(self):
        return self._dict2arr('gc_xy')

    @desc.setattr_on_read
    def causality_yx(self):
        return self._dict2arr('gc_yx')

    @desc.setattr_on_read
    def simultaneous_causality(self):
        return self._dict2arr('gc_sim')

    @desc.setattr_on_read
    def spectral_matrix(self):
        return self._granger_causality['spectral_density']