/usr/share/pyshared/mvpa/featsel/rfe.py is in python-mvpa 0.4.8-3.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*-
# vi: set ft=python sts=4 ts=4 sw=4 et:
### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
#
# See COPYING file distributed along with the PyMVPA package for the
# copyright and license terms.
#
### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
"""Recursive feature elimination."""
__docformat__ = 'restructuredtext'
from mvpa.clfs.transerror import ClassifierError
from mvpa.measures.base import Sensitivity
from mvpa.featsel.base import FeatureSelection
from mvpa.featsel.helpers import BestDetector, \
NBackHistoryStopCrit, \
FractionTailSelector
from numpy import arange
from mvpa.misc.state import StateVariable
if __debug__:
from mvpa.base import debug
# TODO: The absolute value of the sensitivity should be able to rule RFE.
# Often it is the absolute value of the sensitivity that matters.
# So we should either provide a simple decorator around an arbitrary
# FeatureSelector to convert sensitivities to abs values before calling
# the actual selector, or a decorator around SensitivityEstimators.
class RFE(FeatureSelection):
    """Recursive feature elimination.

    A `FeaturewiseDatasetMeasure` is used to compute sensitivity maps given a
    certain dataset. These sensitivity maps are in turn used to discard
    unimportant features. For each feature selection the transfer error on some
    testdataset is computed. This procedure is repeated until a given
    `StoppingCriterion` is reached.

    Such strategy after

      Guyon, I., Weston, J., Barnhill, S., & Vapnik, V. (2002). Gene
      selection for cancer classification using support vector
      machines. Mach. Learn., 46(1-3), 389--422.

    was applied to SVM-based analysis of fMRI data in

      Hanson, S. J. & Halchenko, Y. O. (2008). Brain reading using
      full brain support vector machines for object recognition:
      there is no "face identification area". Neural Computation, 20,
      486--503.
    """

    # State variables populated on each __call__:
    # errors    -- transfer error recorded at every elimination step
    # nfeatures -- number of features present at every step
    # history   -- per-feature index of the last step at which the feature
    #              was still present
    # sensitivities -- per-step sensitivity maps (disabled by default since
    #              they can be large)
    errors = StateVariable()
    nfeatures = StateVariable()
    history = StateVariable()
    sensitivities = StateVariable(enabled=False)

    def __init__(self,
                 sensitivity_analyzer,
                 transfer_error,
                 feature_selector=FractionTailSelector(0.05),
                 bestdetector=BestDetector(),
                 stopping_criterion=NBackHistoryStopCrit(BestDetector()),
                 train_clf=None,
                 update_sensitivity=True,
                 **kargs
                 ):
        # XXX Allow for multiple stopping criterions, e.g. error not decreasing
        # anymore OR number of features less than threshold
        """Initialize recursive feature elimination

        :Parameters:
            sensitivity_analyzer : FeaturewiseDatasetMeasure object
            transfer_error : TransferError object
                used to compute the transfer error of a classifier based on a
                certain feature set on the test dataset.
                NOTE: If sensitivity analyzer is based on the same
                classifier as transfer_error is using, make sure you
                initialize transfer_error with train=False, otherwise
                it would train classifier twice without any necessity.
            feature_selector : Functor
                Given a sensitivity map it has to return the ids of those
                features that should be kept.
            bestdetector : Functor
                Given a list of error values it has to return a boolean that
                signals whether the latest error value is the total minimum.
            stopping_criterion : Functor
                Given a list of error values it has to return whether the
                criterion is fulfilled.
            train_clf : bool
                Flag whether the classifier in `transfer_error` should be
                trained before computing the error. In general this is
                required, but if the `sensitivity_analyzer` and
                `transfer_error` share and make use of the same classifier it
                can be switched off to save CPU cycles. Default `None` checks
                if sensitivity_analyzer is based on a classifier and doesn't
                train if so.
            update_sensitivity : bool
                If False the sensitivity map is only computed once and reused
                for each iteration. Otherwise the sensitivities are
                recomputed at each selection step.
        """
        # base init first
        FeatureSelection.__init__(self, **kargs)

        # Sensitivity analyzer used to call at each step.
        self.__sensitivity_analyzer = sensitivity_analyzer
        # Computes transfer error for each feature set.
        self.__transfer_error = transfer_error
        # Functor which takes care about removing some features.
        self.__feature_selector = feature_selector
        self.__stopping_criterion = stopping_criterion
        self.__bestdetector = bestdetector

        # Flag whether training classifier is required.  If not specified,
        # training is skipped when the sensitivity analyzer is itself
        # classifier-based (the classifier then gets trained by the analyzer).
        if train_clf is None:
            self.__train_clf = isinstance(sensitivity_analyzer,
                                          Sensitivity)
        else:
            self.__train_clf = train_clf

        # Flag whether sensitivity map is recomputed for each step.
        self.__update_sensitivity = update_sensitivity

        # force clf training when sensitivities are not updated as otherwise
        # shared classifiers are not retrained
        if not self.__update_sensitivity \
               and isinstance(self.__transfer_error, ClassifierError) \
               and not self.__train_clf:
            if __debug__:
                debug("RFEC", "Forcing training of classifier since " +
                      "sensitivities aren't updated at each step")
            self.__train_clf = True


    def __call__(self, dataset, testdataset):
        """Proceed and select the features recursively eliminating less
        important ones.

        :Parameters:
            dataset : Dataset
                used to compute sensitivity maps and train a classifier
                to determine the transfer error
            testdataset : Dataset
                used to test the trained classifer to determine the
                transfer error

        Returns a tuple of two new datasets with the feature subset of
        `dataset` that had the lowest transfer error of all tested
        sets until the stopping criterion was reached. The first
        dataset is the feature subset of the training data and the
        second the selection of the test dataset.
        """
        # Computed error for each tested features set.
        errors = []
        # Number of features at each step. Since it is not used by the
        # algorithm it is stored directly in the state variable.
        self.nfeatures = []
        # Store the last step # when the feature was still present.
        self.history = arange(dataset.nfeatures)
        self.sensitivities = []

        # Flag when RFE should be stopped.
        stop = False
        # Will hold the best feature set ever.
        results = None
        # Operate on working dataset initially identical.
        wdataset = dataset
        # Same feature selection has to be performed on test dataset as well.
        # This will hold the current testdataset.
        wtestdataset = testdataset
        # Counter how many selection steps were done.
        step = 0
        # List of feature ids as per original dataset remaining at any given
        # step.
        orig_feature_ids = arange(dataset.nfeatures)
        # Contains the latest sensitivity map.
        sensitivity = None
        # Resultant ids of selected features. Since the best is not
        # necessarily the last -- we better keep this one around. By
        # default -- all features are there.
        result_selected_ids = orig_feature_ids
        selected_ids = result_selected_ids

        while wdataset.nfeatures > 0:
            if __debug__:
                debug('RFEC',
                      "Step %d: nfeatures=%d" % (step, wdataset.nfeatures))

            # mark the features which are present at this step
            # if it brings anyb mentionable computational burden in the future,
            # only mark on removed features at each step
            self.history[orig_feature_ids] = step

            # Compute sensitivity map.  BUGFIX: was `sensitivity == None`,
            # which for a numpy array performs element-wise comparison and
            # raises "truth value is ambiguous"; identity test is intended.
            if self.__update_sensitivity or sensitivity is None:
                sensitivity = self.__sensitivity_analyzer(wdataset)

            if self.states.isEnabled("sensitivities"):
                self.sensitivities.append(sensitivity)

            # do not retrain clf if not necessary
            if self.__train_clf:
                error = self.__transfer_error(wtestdataset, wdataset)
            else:
                error = self.__transfer_error(wtestdataset, None)

            # Record the error
            errors.append(error)

            # Check if it is time to stop and if we got the best result
            stop = self.__stopping_criterion(errors)
            isthebest = self.__bestdetector(errors)

            nfeatures = wdataset.nfeatures

            if self.states.isEnabled("nfeatures"):
                self.nfeatures.append(wdataset.nfeatures)

            # store result
            if isthebest:
                results = (wdataset, wtestdataset)
                result_selected_ids = orig_feature_ids

            if __debug__:
                debug('RFEC',
                      "Step %d: nfeatures=%d error=%.4f best/stop=%d/%d " %
                      (step, nfeatures, error, isthebest, stop))

            # stop if it is time to finish
            if nfeatures == 1 or stop:
                break

            # Select features to preserve
            selected_ids = self.__feature_selector(sensitivity)

            if __debug__:
                debug('RFEC_',
                      "Sensitivity: %s, nfeatures_selected=%d, selected_ids: %s" %
                      (sensitivity, len(selected_ids), selected_ids))

            # Create a dataset only with selected features
            wdataset = wdataset.selectFeatures(selected_ids)

            # select corresponding sensitivity values if they are not
            # recomputed
            if not self.__update_sensitivity:
                sensitivity = sensitivity[selected_ids]

            # need to update the test dataset as well
            # XXX why should it ever become None?
            # yoh: because we can have __transfer_error computed
            #      using wdataset. See xia-generalization estimate
            #      in lightsvm. Or for god's sake leave-one-out
            #      on a wdataset
            # TODO: document these cases in this class
            if testdataset is not None:
                wtestdataset = wtestdataset.selectFeatures(selected_ids)

            step += 1

            # WARNING: THIS MUST BE THE LAST THING TO DO ON selected_ids
            selected_ids.sort()
            # NOTE(review): when neither 'history' nor 'selected_ids' states
            # are enabled, orig_feature_ids is never remapped, so
            # result_selected_ids stays at the full feature range -- presumably
            # a deliberate shortcut; verify against state defaults.
            if self.states.isEnabled("history") \
                   or self.states.isEnabled('selected_ids'):
                orig_feature_ids = orig_feature_ids[selected_ids]

        # release the classifier held by the transfer error, if any
        if hasattr(self.__transfer_error, "clf"):
            self.__transfer_error.clf.untrain()

        # charge state variables
        self.errors = errors
        self.selected_ids = result_selected_ids

        # best dataset ever is returned
        return results