/usr/share/pyshared/nitime/analysis/granger.py is in python-nitime 0.4-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
"""
Analyzers for the calculation of Granger 'causality'
"""
import numpy as np
import nitime.algorithms as alg
import nitime.utils as utils
from nitime import descriptors as desc
from .base import BaseAnalyzer
# To support older versions of numpy that don't have tril_indices:
from nitime.index_utils import tril_indices_from


def fit_model(x1, x2, order=None, max_order=10,
              criterion=utils.bayesian_information_criterion):
    """
    Fit the auto-regressive model used in calculation of Granger 'causality'.

    Parameters
    ----------
    x1, x2: float arrays (n)
        x1, x2 bivariate combination.
    order: int (optional)
        If known, the order of the autoregressive process.
    max_order: int (optional)
        If the order is not known, this will be the maximal order to fit.
    criterion: callable
        A function which defines an information criterion, used to determine
        the order of the model.

    Returns
    -------
    order: int
        The order of the fitted model.
    Rxx: array
        The autocovariance of the bivariate series, up to the model order.
    coef: array
        The coefficients of the fitted autoregressive model.
    ecov: array
        The error covariance of the fitted model.
    """
    c_old = np.inf
    n_process = 2
    Ntotal = n_process * x1.shape[-1]

    # If the model order was provided as an input:
    if order is not None:
        lag = order + 1
        Rxx = utils.autocov_vector(np.vstack([x1, x2]), nlags=lag)
        coef, ecov = alg.lwr_recursion(np.array(Rxx).transpose(2, 0, 1))

    # If the model order is not known, estimate it up to max_order:
    else:
        for lag in xrange(1, max_order):
            Rxx_new = utils.autocov_vector(np.vstack([x1, x2]), nlags=lag)
            coef_new, ecov_new = alg.lwr_recursion(
                np.array(Rxx_new).transpose(2, 0, 1))
            order_new = coef_new.shape[0]
            c_new = criterion(ecov_new, n_process, order_new, Ntotal)
            if c_new > c_old:
                # Keep the values you got in the last round and break out:
                break
            else:
                # Replace the output values with the newly calculated values
                # and move on to the next order:
                c_old = c_new
                order = order_new
                Rxx = Rxx_new
                coef = coef_new
                ecov = ecov_new
        else:
            e_s = ("Model estimation order did not converge at max_order = %s"
                   % max_order)
            raise ValueError(e_s)

    return order, Rxx, coef, ecov
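
# A minimal usage sketch for fit_model (illustrative only; `x1`, `x2` and the
# coupling below are made up and not part of this module). The model order is
# selected by the default Bayesian information criterion:
#
#     x1 = np.random.randn(1024)
#     x2 = 0.5 * np.roll(x1, 1) + np.random.randn(1024)
#     order, Rxx, coef, ecov = fit_model(x1, x2, max_order=10)
#     # coef.shape[0] == order; ecov is the error covariance of the bivariate fit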


class GrangerAnalyzer(BaseAnalyzer):
    """Analyzer for computing all-to-all Granger 'causality'."""

    def __init__(self, input=None, ij=None, order=None, max_order=10,
                 criterion=utils.bayesian_information_criterion, n_freqs=1024):
        """
        Initializer for the GrangerAnalyzer.

        Parameters
        ----------
        input: nitime TimeSeries object
        ij: list of tuples of the form: [(0, 1), (0, 2)], etc.
            These are the indices of pairs of time-series for which the
            analysis will be done. Defaults to all vs. all.
        order: int (optional)
            The order of the process. If this is not known, it will be
            estimated from the data, using the information criterion.
        max_order: int (optional)
            If the order is estimated, this is the maximal order to estimate
            for.
        n_freqs: int (optional)
            The size of the sampling grid in the frequency domain.
            Defaults to 1024.
        criterion: callable (optional)
            The information criterion used to choose the model order when it
            is estimated. Defaults to the Bayesian information criterion.
        """
        self.data = input.data
        self.sampling_rate = input.sampling_rate
        self._n_process = input.shape[0]
        self._n_freqs = n_freqs
        self._order = order
        self._criterion = criterion
        self._max_order = max_order

        if ij is None:
            # The following gets the full list of combinations of
            # non-same i's and j's:
            x, y = np.meshgrid(np.arange(self._n_process),
                               np.arange(self._n_process))
            self.ij = zip(x[tril_indices_from(x, -1)],
                          y[tril_indices_from(y, -1)])
        else:
            self.ij = ij
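        # With the default ij=None, a 3-channel input would yield
        # self.ij = [(0, 1), (0, 2), (1, 2)].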

    @desc.setattr_on_read
    def _model(self):
        model = dict(order={}, autocov={}, model_coef={}, error_cov={})
        for i, j in self.ij:
            model[i, j] = dict()
            order_t, Rxx_t, coef_t, ecov_t = fit_model(self.data[i],
                                                       self.data[j],
                                                       order=self._order,
                                                       max_order=self._max_order,
                                                       criterion=self._criterion)
            model['order'][i, j] = order_t
            model['autocov'][i, j] = Rxx_t
            model['model_coef'][i, j] = coef_t
            model['error_cov'][i, j] = ecov_t

        return model

    @desc.setattr_on_read
    def order(self):
        if self._order is None:
            return self._model['order']
        else:
            order = {}
            for i, j in self.ij:
                order[i, j] = self._order
            return order

    @desc.setattr_on_read
    def autocov(self):
        return self._model['autocov']

    @desc.setattr_on_read
    def model_coef(self):
        return self._model['model_coef']

    @desc.setattr_on_read
    def error_cov(self):
        return self._model['error_cov']

    @desc.setattr_on_read
    def _granger_causality(self):
        """
        This returns a dict with the values computed by
        :func:`granger_causality_xy`, rather than arrays, so that we can delay
        the allocation of arrays as much as possible.
        """
        gc = dict(frequencies={}, gc_xy={}, gc_yx={}, gc_sim={},
                  spectral_density={})

        for i, j in self.ij:
            w, f_x2y, f_y2x, f_xy, Sw = \
                alg.granger_causality_xy(self.model_coef[i, j],
                                         self.error_cov[i, j],
                                         n_freqs=self._n_freqs)

            # All other measures are dependent on i, j:
            gc['gc_xy'][i, j] = f_x2y
            gc['gc_yx'][i, j] = f_y2x
            gc['gc_sim'][i, j] = f_xy
            gc['spectral_density'][i, j] = Sw
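
        # The frequency grid itself is not stored in this dict; it is exposed
        # through the `frequencies` property below.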

        return gc

    @desc.setattr_on_read
    def frequencies(self):
        return utils.get_freqs(self.sampling_rate, self._n_freqs)

    def _dict2arr(self, key):
        """
        A helper function that generates an all-NaN array, fills in the
        measure defined by 'key' for every computed (i, j) pair, and returns
        the result. This gives us matrices of the measures of interest,
        instead of a dict.
        """
        # Prepare the matrix for the output:
        arr = np.empty((self._n_process,
                        self._n_process,
                        self.frequencies.shape[0]))
        arr.fill(np.nan)

        # 'Translate' from dict form into matrix form:
        for i, j in self.ij:
            arr[j, i, :] = self._granger_causality[key][i, j]

        return arr
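
    # The properties below are thin wrappers around _dict2arr: for a pair
    # (i, j) in self.ij, causality_xy[j, i, :] holds the 'gc_xy' spectrum
    # computed for that pair; entries for pairs that were never requested
    # remain NaN.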

    @desc.setattr_on_read
    def causality_xy(self):
        return self._dict2arr('gc_xy')

    @desc.setattr_on_read
    def causality_yx(self):
        return self._dict2arr('gc_yx')

    @desc.setattr_on_read
    def simultaneous_causality(self):
        return self._dict2arr('gc_sim')

    @desc.setattr_on_read
    def spectral_matrix(self):
        return self._granger_causality['spectral_density']
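
# A short usage sketch of the analyzer (illustrative only; the 3-channel data,
# variable names and fixed model order below are made up, not part of this
# module):
#
#     import numpy as np
#     import nitime.timeseries as ts
#     from nitime.analysis.granger import GrangerAnalyzer
#
#     data = np.random.randn(3, 1024)          # 3 channels, 1024 time points
#     series = ts.TimeSeries(data, sampling_rate=1.0)
#     g = GrangerAnalyzer(series, order=2)     # fixed AR model order
#     freqs = g.frequencies                    # frequency grid of the estimates
#     xy = g.causality_xy                      # (3, 3, len(freqs)) array;
#                                              # un-requested pairs are NaN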