/usr/lib/python3/dist-packages/astroML/density_estimation/bayesian_blocks.py is in python3-astroml 0.3-6.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 | """
Bayesian Block implementation
=============================
Dynamic programming algorithm for finding the optimal adaptive-width histogram.
Based on Scargle et al 2012 [1]_
References
----------
.. [1] http://adsabs.harvard.edu/abs/2012arXiv1207.5578S
"""
import numpy as np
# TODO: implement other fitness functions from appendix B of Scargle 2012
class FitnessFunc(object):
"""Base class for fitness functions
Each fitness function class has the following:
- fitness(...) : compute fitness function.
Arguments accepted by fitness must be among [T_k, N_k, a_k, b_k, c_k]
- prior(N, Ntot) : compute prior on N given a total number of points Ntot
"""
def __init__(self, p0=0.05, gamma=None):
self.p0 = p0
self.gamma = gamma
def validate_input(self, t, x, sigma):
"""Check that input is valid"""
pass
def fitness(**kwargs):
raise NotImplementedError()
def prior(self, N, Ntot):
if self.gamma is None:
return self.p0_prior(N, Ntot)
else:
return self.gamma_prior(N, Ntot)
def p0_prior(self, N, Ntot):
# eq. 21 from Scargle 2012
return 4 - np.log(73.53 * self.p0 * (N ** -0.478))
def gamma_prior(self, N, Ntot):
"""Basic prior, parametrized by gamma (eq. 3 in Scargle 2012)"""
if self.gamma == 1:
return 0
else:
return (np.log(1 - self.gamma)
- np.log(1 - self.gamma ** (Ntot + 1))
+ N * np.log(self.gamma))
# the fitness_args property will return the list of arguments accepted by
# the method fitness(). This allows more efficient computation below.
@property
def args(self):
try:
# Python 2
return self.fitness.func_code.co_varnames[1:]
except AttributeError:
return self.fitness.__code__.co_varnames[1:]
class Events(FitnessFunc):
"""Fitness for binned or unbinned events
Parameters
----------
p0 : float
False alarm probability, used to compute the prior on N
(see eq. 21 of Scargle 2012). Default prior is for p0 = 0.
gamma : float or None
If specified, then use this gamma to compute the general prior form,
p ~ gamma^N. If gamma is specified, p0 is ignored.
"""
def fitness(self, N_k, T_k):
# eq. 19 from Scargle 2012
return N_k * (np.log(N_k) - np.log(T_k))
def prior(self, N, Ntot):
if self.gamma is not None:
return self.gamma_prior(N, Ntot)
else:
# eq. 21 from Scargle 2012
return 4 - np.log(73.53 * self.p0 * (N ** -0.478))
class RegularEvents(FitnessFunc):
"""Fitness for regular events
This is for data which has a fundamental "tick" length, so that all
measured values are multiples of this tick length. In each tick, there
are either zero or one counts.
Parameters
----------
dt : float
tick rate for data
gamma : float
specifies the prior on the number of bins: p ~ gamma^N
"""
def __init__(self, dt, p0=0.05, gamma=None):
self.dt = dt
self.p0 = p0
self.gamma = gamma
def validate_input(self, t, x, sigma):
unique_x = np.unique(x)
if list(unique_x) not in ([0], [1], [0, 1]):
raise ValueError("Regular events must have only 0 and 1 in x")
def fitness(self, T_k, N_k):
# Eq. 75 of Scargle 2012
M_k = T_k / self.dt
N_over_M = N_k * 1. / M_k
eps = 1E-8
if np.any(N_over_M > 1 + eps):
import warnings
warnings.warn('regular events: N/M > 1. '
'Is the time step correct?')
one_m_NM = 1 - N_over_M
N_over_M[N_over_M <= 0] = 1
one_m_NM[one_m_NM <= 0] = 1
return N_k * np.log(N_over_M) + (M_k - N_k) * np.log(one_m_NM)
class PointMeasures(FitnessFunc):
"""Fitness for point measures
Parameters
----------
gamma : float
specifies the prior on the number of bins: p ~ gamma^N
if gamma is not specified, then a prior based on simulations
will be used (see sec 3.3 of Scargle 2012)
"""
def __init__(self, p0=None, gamma=None):
self.p0 = p0
self.gamma = gamma
def fitness(self, a_k, b_k):
# eq. 41 from Scargle 2012
return (b_k * b_k) / (4 * a_k)
def prior(self, N, Ntot):
if self.gamma is not None:
return self.gamma_prior(N, Ntot)
elif self.p0 is not None:
return self.p0_prior(N, Ntot)
else:
# eq. at end of sec 3.3 in Scargle 2012
return 1.32 + 0.577 * np.log10(N)
def bayesian_blocks(t, x=None, sigma=None,
fitness='events', **kwargs):
"""Bayesian Blocks Implementation
This is a flexible implementation of the Bayesian Blocks algorithm
described in Scargle 2012 [1]_
Parameters
----------
t : array_like
data times (one dimensional, length N)
x : array_like (optional)
data values
sigma : array_like or float (optional)
data errors
fitness : str or object
the fitness function to use.
If a string, the following options are supported:
- 'events' : binned or unbinned event data
extra arguments are `p0`, which gives the false alarm probability
to compute the prior, or `gamma` which gives the slope of the
prior on the number of bins.
- 'regular_events' : non-overlapping events measured at multiples
of a fundamental tick rate, `dt`, which must be specified as an
additional argument. The prior can be specified through `gamma`,
which gives the slope of the prior on the number of bins.
- 'measures' : fitness for a measured sequence with Gaussian errors
The prior can be specified using `gamma`, which gives the slope
of the prior on the number of bins. If `gamma` is not specified,
then a simulation-derived prior will be used.
Alternatively, the fitness can be a user-specified object of
type derived from the FitnessFunc class.
Returns
-------
edges : ndarray
array containing the (N+1) bin edges
Examples
--------
Event data:
>>> t = np.random.normal(size=100)
>>> bins = bayesian_blocks(t, fitness='events', p0=0.01)
Event data with repeats:
>>> t = np.random.normal(size=100)
>>> t[80:] = t[:20]
>>> bins = bayesian_blocks(t, fitness='events', p0=0.01)
Regular event data:
>>> dt = 0.01
>>> t = dt * np.arange(1000)
>>> x = np.zeros(len(t))
>>> x[np.random.randint(0, len(t), len(t) / 10)] = 1
>>> bins = bayesian_blocks(t, fitness='regular_events', dt=dt, gamma=0.9)
Measured point data with errors:
>>> t = 100 * np.random.random(100)
>>> x = np.exp(-0.5 * (t - 50) ** 2)
>>> sigma = 0.1
>>> x_obs = np.random.normal(x, sigma)
>>> bins = bayesian_blocks(t, fitness='measures')
References
----------
.. [1] Scargle, J `et al.` (2012)
http://adsabs.harvard.edu/abs/2012arXiv1207.5578S
See Also
--------
astroML.plotting.hist : histogram plotting function which can make use
of bayesian blocks.
"""
# validate array input
t = np.asarray(t, dtype=float)
if x is not None:
x = np.asarray(x)
if sigma is not None:
sigma = np.asarray(sigma)
# verify the fitness function
if fitness == 'events':
if x is not None and np.any(x % 1 > 0):
raise ValueError("x must be integer counts for fitness='events'")
fitfunc = Events(**kwargs)
elif fitness == 'regular_events':
if x is not None and (np.any(x % 1 > 0) or np.any(x > 1)):
raise ValueError("x must be 0 or 1 for fitness='regular_events'")
fitfunc = RegularEvents(**kwargs)
elif fitness == 'measures':
if x is None:
raise ValueError("x must be specified for fitness='measures'")
fitfunc = PointMeasures(**kwargs)
else:
if not (hasattr(fitness, 'args') and
hasattr(fitness, 'fitness') and
hasattr(fitness, 'prior')):
raise ValueError("fitness not understood")
fitfunc = fitness
# find unique values of t
t = np.array(t, dtype=float)
assert t.ndim == 1
unq_t, unq_ind, unq_inv = np.unique(t, return_index=True,
return_inverse=True)
# if x is not specified, x will be counts at each time
if x is None:
if sigma is not None:
raise ValueError("If sigma is specified, x must be specified")
if len(unq_t) == len(t):
x = np.ones_like(t)
else:
x = np.bincount(unq_inv)
t = unq_t
sigma = 1
# if x is specified, then we need to sort t and x together
else:
x = np.asarray(x)
if len(t) != len(x):
raise ValueError("Size of t and x does not match")
if len(unq_t) != len(t):
raise ValueError("Repeated values in t not supported when "
"x is specified")
t = unq_t
x = x[unq_ind]
# verify the given sigma value
N = t.size
if sigma is not None:
sigma = np.asarray(sigma)
if sigma.shape not in [(), (1,), (N,)]:
raise ValueError('sigma does not match the shape of x')
else:
sigma = 1
# validate the input
fitfunc.validate_input(t, x, sigma)
# compute values needed for computation, below
if 'a_k' in fitfunc.args:
ak_raw = np.ones_like(x) / sigma / sigma
if 'b_k' in fitfunc.args:
bk_raw = x / sigma / sigma
if 'c_k' in fitfunc.args:
ck_raw = x * x / sigma / sigma
# create length-(N + 1) array of cell edges
edges = np.concatenate([t[:1],
0.5 * (t[1:] + t[:-1]),
t[-1:]])
block_length = t[-1] - edges
# arrays to store the best configuration
best = np.zeros(N, dtype=float)
last = np.zeros(N, dtype=int)
#-----------------------------------------------------------------
# Start with first data cell; add one cell at each iteration
#-----------------------------------------------------------------
for R in range(N):
# Compute fit_vec : fitness of putative last block (end at R)
kwds = {}
# T_k: width/duration of each block
if 'T_k' in fitfunc.args:
kwds['T_k'] = block_length[:R + 1] - block_length[R + 1]
# N_k: number of elements in each block
if 'N_k' in fitfunc.args:
kwds['N_k'] = np.cumsum(x[:R + 1][::-1])[::-1]
# a_k: eq. 31
if 'a_k' in fitfunc.args:
kwds['a_k'] = 0.5 * np.cumsum(ak_raw[:R + 1][::-1])[::-1]
# b_k: eq. 32
if 'b_k' in fitfunc.args:
kwds['b_k'] = - np.cumsum(bk_raw[:R + 1][::-1])[::-1]
# c_k: eq. 33
if 'c_k' in fitfunc.args:
kwds['c_k'] = 0.5 * np.cumsum(ck_raw[:R + 1][::-1])[::-1]
# evaluate fitness function
fit_vec = fitfunc.fitness(**kwds)
A_R = fit_vec - fitfunc.prior(R + 1, N)
A_R[1:] += best[:R]
i_max = np.argmax(A_R)
last[R] = i_max
best[R] = A_R[i_max]
#-----------------------------------------------------------------
# Now find changepoints by iteratively peeling off the last block
#-----------------------------------------------------------------
change_points = np.zeros(N, dtype=int)
i_cp = N
ind = N
while True:
i_cp -= 1
change_points[i_cp] = ind
if ind == 0:
break
ind = last[ind - 1]
change_points = change_points[i_cp:]
return edges[change_points]
|