/usr/lib/python2.7/dist-packages/dipy/tracking/markov.py is in python-dipy 0.10.1-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 | # -*- coding: utf-8 -*-
"""Implemention of various Tractography methods.
these tools are meant to be paired with diffusion reconstruction methods from
dipy.reconst
This module uses the trackvis coordinate system, for more information about
this coordinate system please see dipy.tracking.utils
The following modules also use this coordinate system:
dipy.tracking.utils
dipy.tracking.integration
dipy.reconst.interpolate
"""
from __future__ import division, print_function, absolute_import
from ..utils.six.moves import xrange
import numpy as np
from ..reconst.interpolate import OutsideImage, NearestNeighborInterpolator
from dipy.direction.peaks import default_sphere, peak_directions
from . import utils
class DirectionFinder(object):
sphere = default_sphere
relative_peak_threshold = .5
min_seperation_angle = 45
def __call__(self, fit):
discrete_odf = fit.odf(self.sphere)
directions, _, _ = peak_directions(discrete_odf, self.sphere,
self.relative_peak_threshold,
self.min_seperation_angle)
return directions
class BoundaryStepper(object):
"""Steps along a direction past the closest voxel boundary
Parameters
----------
voxel_size : array-like
Size of voxels in data volume
overstep : float
A small number used to prevent the track from getting stuck at the
edge of a voxel.
"""
def __init__(self, voxel_size=(1, 1, 1), overstep=.1):
self.overstep = overstep
self.voxel_size = np.array(voxel_size, 'float')
def __call__(self, location, step):
"""takes a step just past the edge of the next voxel along step
given a location and a step, finds the smallest step needed to move
into the next voxel
Parameters
----------
location : ndarray, (3,)
location to integrate from
step : ndarray, (3,)
direction in 3 space to integrate along
"""
step_sizes = self.voxel_size * (~np.signbit(step))
step_sizes -= location % self.voxel_size
step_sizes /= step
smallest_step = min(step_sizes) + self.overstep
return location + smallest_step * step
class FixedSizeStepper(object):
"""A stepper that uses a fixed step size"""
def __init__(self, step_size=.5):
self.step_size = step_size
def __call__(self, location, step):
"""Takes a step of step_size from location"""
new_location = self.step_size * step + location
return new_location
def markov_streamline(get_direction, take_step, seed, first_step, maxlen):
"""Creates a streamline from seed
Parameters
----------
get_direction : callable
This function should return a direction for the streamline given a
location and the previous direction.
take_step : callable
Take step should take a step from a location given a direction.
seed : array (3,)
The seed point of the streamline
first_step : array (3,)
A unit vector giving the direction of the first step
maxlen : int
The maximum number of segments allowed in the streamline. This is good
for preventing infinite loops.
Returns
-------
streamline : array (N, 3)
A streamline.
"""
streamline = []
location = seed
direction = first_step
try:
for i in xrange(maxlen):
streamline.append(location)
location = take_step(location, direction)
direction = get_direction(location, direction)
if direction is None:
streamline.append(location)
break
except OutsideImage:
pass
return np.array(streamline)
class MarkovIntegrator(object):
"""An abstract class for fiber-tracking"""
_get_directions = DirectionFinder()
def __init__(self, model, interpolator, mask, take_step, angle_limit,
seeds, max_cross=None, maxlen=500, mask_voxel_size=None,
affine=None):
"""Creates streamlines by using a Markov approach.
Parameters
----------
model : model
The model used to fit diffusion data.
interpolator : interpolator
Diffusion weighted data wrapped in an interpolator. Data should be
normalized.
mask : array, 3D
Used to confine tracking, streamlines are terminated if the
tracking leaves the mask.
take_step : callable
Determines the length of each step.
angle_limit : float [0, 90]
Maximum angle allowed between successive steps of the streamline.
seeds : array (N, 3)
Points to seed the tracking. Seed points should be given in point
space of the track (see ``affine``).
max_cross : int or None
The maximum number of direction to track from each seed in crossing
voxels. By default track all peaks of the odf, otherwise track the
largest `max_cross` peaks.
maxlen : int
Maximum number of steps to track from seed. Used to prevent
infinite loops.
mask_voxel_size : array (3,)
Voxel size for the mask. `mask` should cover the same FOV as data,
but it can have a different voxel size. Same as the data by
default.
affine : array (4, 4)
Coordinate space for the streamline point with respect to voxel
indices of input data.
"""
self.model = model
self.interpolator = interpolator
self.seeds = seeds
self.max_cross = max_cross
self.maxlen = maxlen
voxel_size = np.asarray(interpolator.voxel_size)
self._tracking_space = tracking_space = np.eye(4)
tracking_space[[0, 1, 2], [0, 1, 2]] = voxel_size
tracking_space[:3, 3] = voxel_size / 2.
if affine is None:
self.affine = tracking_space.copy()
else:
self.affine = affine
self._take_step = take_step
self._cos_similarity = np.cos(np.deg2rad(angle_limit))
if mask_voxel_size is None:
if mask.shape != interpolator.data.shape[:-1]:
raise ValueError("The shape of the mask and the shape of the "
"data do not match")
mask_voxel_size = interpolator.voxel_size
else:
mask_voxel_size = np.asarray(mask_voxel_size)
mask_FOV = mask_voxel_size * mask.shape
data_FOV = interpolator.voxel_size * interpolator.data.shape[:-1]
if not np.allclose(mask_FOV, data_FOV):
raise ValueError("The FOV of the data and the FOV of the mask "
"do not match")
self._mask = NearestNeighborInterpolator(mask.copy(), mask_voxel_size)
def __iter__(self):
# Check that seeds are reasonable
seeds = np.asarray(self.seeds)
if seeds.ndim != 2 or seeds.shape[1] != 3:
raise ValueError("Seeds should be an (N, 3) array of points")
# Compute affine from point space to tracking space, apply to seeds
inv_A = np.dot(self._tracking_space, np.linalg.inv(self.affine))
tracking_space_seeds = np.dot(seeds, inv_A[:3, :3].T) + inv_A[:3, 3]
# Make tracks, move them to point space and return
track = self._generate_streamlines(tracking_space_seeds)
return utils.move_streamlines(track, output_space=self.affine,
input_space=self._tracking_space)
def _generate_streamlines(self, seeds):
"""A streamline generator"""
for s in seeds:
directions = self._next_step(s, prev_step=None)
directions = directions[:self.max_cross]
for first_step in directions:
F = markov_streamline(self._next_step, self._take_step, s,
first_step, self.maxlen)
first_step = -first_step
B = markov_streamline(self._next_step, self._take_step, s,
first_step, self.maxlen)
yield np.concatenate([B[:0:-1], F], axis=0)
def _closest_peak(peak_directions, prev_step, cos_similarity):
"""Return the closest direction to prev_step from peak_directions.
All directions should be unit vectors. Antipodal symmetry is assumed, ie
direction x is the same as -x.
Parameters
----------
peak_directions : array (N, 3)
N unit vectors.
prev_step : array (3,) or None
Previous direction.
cos_similarity : float
`cos(max_angle)` where `max_angle` is the maximum allowed angle between
prev_step and the returned direction.
Returns
-------
direction : array or None
If prev_step is None, returns peak_directions. Otherwise returns the
closest direction to prev_step. If no directions are close enough to
prev_step, returns None
"""
if prev_step is None:
return peak_directions
if len(peak_directions) == 0:
return None
peak_dots = np.dot(peak_directions, prev_step)
closest_peak = abs(peak_dots).argmax()
dot_closest_peak = peak_dots[closest_peak]
if dot_closest_peak >= cos_similarity:
return peak_directions[closest_peak]
elif dot_closest_peak <= -cos_similarity:
return -peak_directions[closest_peak]
else:
return None
class ClosestDirectionTracker(MarkovIntegrator):
def _next_step(self, location, prev_step):
"""Returns the direction closest to prev_step at location
Fits the data from location using model and returns the tracking
direction closest to prev_step. If prev_step is None, all the
directions are returned.
Parameters
----------
location : point in space
location is passed to the interpolator in order to get data
prev_step : array_like (3,)
the direction of the previous tracking step
"""
if not self._mask[location]:
return None
vox_data = self.interpolator[location]
fit = self.model.fit(vox_data)
directions = self._get_directions(fit)
return _closest_peak(directions, prev_step, self._cos_similarity)
class ProbabilisticOdfWeightedTracker(MarkovIntegrator):
"""A stochastic (probabilistic) fiber tracking method
Stochastically tracks streamlines by randomly choosing directions from
sphere. The likelihood of a direction being chosen is taken from
`model.fit(data).odf(sphere)`. Negative values are set to 0. If no
directions less than `angle_limit` degrees are from the incoming direction
have a positive likelihood, the streamline is terminated.
Parameters
----------
model : model
The model used to fit diffusion data.
interpolator : interpolator
Diffusion weighted data wrapped in an interpolator. Data should be
normalized.
mask : array, 3D
Used to confine tracking, streamlines end when they leave the mask.
take_step : callable
Determines the length of each step.
angle_limit : float [0, 90]
The angle between successive steps in the streamlines cannot be more
than `angle_limit` degrees.
seeds : array (N, 3)
Points to seed the tracking.
sphere : Sphere
sphere used to evaluate the likelihood. A Sphere or a HemiSphere can be
used here. A HemiSphere is more efficient.
max_cross : int or None
Max number of directions to follow at each seed. By default follow all
peaks of the odf.
maxlen : int
Maximum number of segments to follow from seed. Used to prevent
infinite loops.
mask_voxel_size : array (3,)
Voxel size for the mask. `mask` should cover the same FOV as data, but
it can have a different voxel size. Same as the data by default.
Notes
-----
The tracker is based on a method described in [1]_ and [2]_ as fiber
orientation distribution (FOD) sampling.
References
----------
.. [1] Jeurissen, B., Leemans, A., Jones, D. K., Tournier, J.-D., & Sijbers,
J. (2011). Probabilistic fiber tracking using the residual bootstrap
with constrained spherical deconvolution. Human Brain Mapping, 32(3),
461-479. doi:10.1002/hbm.21032
.. [2] J-D. Tournier, F. Calamante, D. G. Gadian, A. Connelly (2005).
Probabilistic fibre tracking through regions containing crossing
fibres. http://cds.ismrm.org/ismrm-2005/Files/01343.pdf
"""
def __init__(self, model, interpolator, mask, take_step, angle_limit,
seeds, sphere, max_cross=None, maxlen=500,
mask_voxel_size=None, affine=None):
MarkovIntegrator.__init__(self, model, interpolator, mask, take_step,
angle_limit, seeds, max_cross, maxlen,
mask_voxel_size, affine)
self.sphere = sphere
self._set_adjacency_matrix(sphere, self._cos_similarity)
self._get_directions.sphere = sphere
def _set_adjacency_matrix(self, sphere, cos_similarity):
"""A boolean array of where the angle between vertices i and j of
sphere is less than `angle_limit` apart."""
matrix = np.dot(sphere.vertices, sphere.vertices.T)
matrix = abs(matrix) >= cos_similarity
keys = [tuple(v) for v in sphere.vertices]
adj_matrix = dict(zip(keys, matrix))
keys = [tuple(-v) for v in sphere.vertices]
adj_matrix.update(zip(keys, matrix))
self._adj_matrix = adj_matrix
def _next_step(self, location, prev_step):
"""Returns the direction closest to prev_step at location
Fits the data from location using model and returns the tracking
direction closest to prev_step. If prev_step is None, all the
directions are returned.
Parameters
----------
location : point in space
location is passed to the interpolator in order to get data
prev_step : array_like (3,)
the direction of the previous tracking step
"""
if not self._mask[location]:
return None
vox_data = self.interpolator[location]
fit = self.model.fit(vox_data)
if prev_step is None:
return self._get_directions(fit)
odf = fit.odf(self.sphere)
odf.clip(0, out=odf)
cdf = (self._adj_matrix[tuple(prev_step)] * odf).cumsum()
if cdf[-1] == 0:
return None
random_sample = np.random.random() * cdf[-1]
idx = cdf.searchsorted(random_sample, 'right')
direction = self.sphere.vertices[idx]
if np.dot(direction, prev_step) > 0:
return direction
else:
return -direction
class CDT_NNO(ClosestDirectionTracker):
"""ClosestDirectionTracker optimized for NearestNeighbor interpolator
For use with Nearest Neighbor interpolation, directions at each voxel are
remembered to avoid recalculating.
Parameters
----------
model : model
A model used to fit data. Should return a some fit object with
directions.
interpolator : interpolator
A NearestNeighbor interpolator, for other interpolators do not use this
class.
angle_limit : float [0, 90]
Maximum angle allowed between prev_step and next_step.
"""
def __init__(self, model, interpolator, mask, take_step, angle_limit,
seeds, max_cross=None, maxlen=500, mask_voxel_size=None,
affine=None):
if not isinstance(interpolator, NearestNeighborInterpolator):
msg = ("CDT_NNO is an optimized version of "
"ClosestDirectionTracker that requires a "
"NearestNeighborInterpolator")
raise ValueError(msg)
ClosestDirectionTracker.__init__(self, model, interpolator, mask,
take_step, angle_limit, seeds,
max_cross=max_cross, maxlen=maxlen,
mask_voxel_size=mask_voxel_size,
affine=None)
self._data = self.interpolator.data
self._voxel_size = self.interpolator.voxel_size
self.reset_cache()
def reset_cache(self):
"""Clear saved directions"""
lookup = np.empty(self._data.shape[:-1], 'int')
lookup.fill(-1)
self._lookup = lookup
self._peaks = []
def _next_step(self, location, prev_step):
"""Returns the direction closest to prev_step at location"""
if not self._mask[location]:
return None
vox_loc = tuple(location // self._voxel_size)
hash = self._lookup[vox_loc]
if hash >= 0:
directions = self._peaks[hash]
else:
vox_data = self._data[vox_loc]
fit = self.model.fit(vox_data)
directions = self._get_directions(fit)
self._lookup[vox_loc] = len(self._peaks)
self._peaks.append(directions)
return _closest_peak(directions, prev_step, self._cos_similarity)
|