/usr/lib/python3/dist-packages/ccdproc/combiner.py is in python3-ccdproc 0.3.3-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 | # Licensed under a 3-clause BSD style license - see LICENSE.rst
# This module implements the combiner class.
from __future__ import (absolute_import, division, print_function,
unicode_literals)
import numpy as np
from numpy import ma
from .ccddata import CCDData
from astropy.stats import median_absolute_deviation
from astropy.nddata import StdDevUncertainty
__all__ = ['Combiner']
class Combiner(object):
"""A class for combining CCDData objects.
The Combiner class is used to combine together CCDData objects
including the method for combining the data, rejecting outlying data,
and weighting used for combining frames
Parameters
-----------
ccd_list : `list`
A list of CCDData objects that will be combined together.
dtype : 'numpy dtype'
Allows user to set dtype.
Raises
------
TypeError
If the `ccd_list` are not `~ccdproc.CCDData` objects, have different
units, or are different shapes
Notes
-----
The following is an example of combining together different
`~ccdproc.CCDData` objects:
>>> from combiner import combiner
>>> c = combiner([ccddata1, cccdata2, ccddata3])
>>> ccdall = c.median_combine()
"""
def __init__(self, ccd_list, dtype=None):
if ccd_list is None:
raise TypeError("ccd_list should be a list of CCDData objects")
if dtype is None:
dtype = np.float64
default_shape = None
default_unit = None
for ccd in ccd_list:
#raise an error if the objects aren't CCDDAata objects
if not isinstance(ccd, CCDData):
raise TypeError("ccd_list should only contain CCDData objects")
#raise an error if the shape is different
if default_shape is None:
default_shape = ccd.shape
else:
if not (default_shape == ccd.shape):
raise TypeError("CCDData objects are not the same size")
#raise an error if the units are different
if default_unit is None:
default_unit = ccd.unit
else:
if not (default_unit == ccd.unit):
raise TypeError("CCDdata objects are not the same unit")
self.ccd_list = ccd_list
self.unit = default_unit
self.weights = None
self._dtype = dtype
#set up the data array
ydim, xdim = default_shape
new_shape = (len(ccd_list), ydim, xdim)
self.data_arr = ma.masked_all(new_shape, dtype=dtype)
#populate self.data_arr
for i, ccd in enumerate(ccd_list):
self.data_arr[i] = ccd.data
if ccd.mask is not None:
self.data_arr.mask[i] = ccd.mask
else:
self.data_arr.mask[i] = ma.zeros((ydim, xdim))
# Must be after self.data_arr is defined because it checks the
# length of the data array.
self.scaling = None
@property
def dtype(self):
return self._dtype
@property
def weights(self):
return self._weights
@weights.setter
def weights(self, value):
if value is not None:
if isinstance(value, np.ndarray):
if value.shape == self.data_arr.data.shape:
self._weights = value
else:
raise ValueError("dimensions of weights do not match data")
else:
raise TypeError("mask must be a Numpy array")
else:
self._weights = None
@property
def scaling(self):
"""
Scaling factor used in combining images.
Parameters
----------
scale : function or array-like or None, optional
Images are multiplied by scaling prior to combining them. Scaling
may be either a function, which will be applied to each image
to determine the scaling factor, or a list or array whose length
is the number of images in the `Combiner`. Default is ``None``.
"""
return self._scaling
@scaling.setter
def scaling(self, value):
if value is None:
self._scaling = value
else:
n_images = self.data_arr.data.shape[0]
if callable(value):
self._scaling = [value(self.data_arr[i]) for
i in range(n_images)]
self._scaling = np.array(self._scaling)
else:
try:
len(value) == n_images
self._scaling = np.array(value)
except TypeError:
raise TypeError("Scaling must be a function or an array "
"the same length as the number of images.")
# reshape so that broadcasting occurs properly
self._scaling = self.scaling[:, np.newaxis, np.newaxis]
#set up min/max clipping algorithms
def minmax_clipping(self, min_clip=None, max_clip=None):
"""Mask all pixels that are below min_clip or above max_clip.
Parameters
-----------
min_clip : None or float
If specified, all pixels with values below min_clip will be masked
max_clip : None or float
If specified, all pixels with values above min_clip will be masked
"""
if min_clip is not None:
mask = (self.data_arr < min_clip)
self.data_arr.mask[mask] = True
if max_clip is not None:
mask = (self.data_arr > max_clip)
self.data_arr.mask[mask] = True
#set up sigma clipping algorithms
def sigma_clipping(self, low_thresh=3, high_thresh=3,
func=ma.mean, dev_func=ma.std):
"""Pixels will be rejected if they have deviations greater than those
set by the threshold values. The algorithm will first calculated
a baseline value using the function specified in func and deviation
based on dev_func and the input data array. Any pixel with a
deviation from the baseline value greater than that set by
high_thresh or lower than that set by low_thresh will be rejected.
Parameters
-----------
low_thresh : positive float or None
Threshold for rejecting pixels that deviate below the baseline
value. If negative value, then will be convert to a positive
value. If None, no rejection will be done based on low_thresh.
high_thresh : positive float or None
Threshold for rejecting pixels that deviate above the baseline
value. If None, no rejection will be done based on high_thresh.
func : function
Function for calculating the baseline values (i.e. mean or median).
This should be a function that can handle
numpy.ma.core.MaskedArray objects.
dev_func : function
Function for calculating the deviation from the baseline value
(i.e. std). This should be a function that can handle
numpy.ma.core.MaskedArray objects.
"""
#check for negative numbers in low_thresh
#setup baseline values
baseline = func(self.data_arr, axis=0)
dev = dev_func(self.data_arr, axis=0)
#reject values
if low_thresh is not None:
if low_thresh < 0:
low_thresh = abs(low_thresh)
mask = (self.data_arr - baseline < -low_thresh * dev)
self.data_arr.mask[mask] = True
if high_thresh is not None:
mask = (self.data_arr - baseline > high_thresh * dev)
self.data_arr.mask[mask] = True
#set up the combining algorithms
def median_combine(self, median_func=ma.median):
"""Median combine a set of arrays.
A CCDData object is returned
with the data property set to the median of the arrays. If the data
was masked or any data have been rejected, those pixels will not be
included in the median. A mask will be returned, and if a pixel
has been rejected in all images, it will be masked. The
uncertainty of the combined image is set by 1.4826 times the median
absolute deviation of all input images.
Parameters
----------
median_func : function, optional
Function that calculates median of a ``numpy.ma.masked_array``.
Default is to use ``np.ma.median`` to calculate median.
Returns
-------
combined_image: `~ccdproc.CCDData`
CCDData object based on the combined input of CCDData objects.
Warnings
--------
The uncertainty currently calculated using the median absolute
deviation does not account for rejected pixels
"""
if self.scaling is not None:
scalings = self.scaling
else:
scalings = 1.0
#set the data
data = median_func(scalings * self.data_arr, axis=0)
#set the mask
mask = self.data_arr.mask.sum(axis=0)
mask = (mask == len(self.data_arr))
#set the uncertainty
uncertainty = 1.4826 * median_absolute_deviation(self.data_arr.data,
axis=0)
# create the combined image with a dtype matching the combiner
combined_image = CCDData(np.asarray(data.data, dtype=self.dtype),
mask=mask, unit=self.unit,
uncertainty=StdDevUncertainty(uncertainty))
#update the meta data
combined_image.meta['NCOMBINE'] = len(self.data_arr)
#return the combined image
return combined_image
def average_combine(self, scale_func=None, scale_to=1.0):
"""Average combine together a set of arrays. A CCDData object is
returned with the data property set to the average of the arrays.
If the data was masked or any data have been rejected, those pixels
will not be included in the median. A mask will be returned, and
if a pixel has been rejected in all images, it will be masked. The
uncertainty of the combined image is set by the standard deviation
of the input images.
Returns
-------
combined_image: `~ccdproc.CCDData`
CCDData object based on the combined input of CCDData objects.
"""
if self.scaling is not None:
scalings = self.scaling
else:
scalings = 1.0
#set up the data
data, wei = ma.average(scalings * self.data_arr,
axis=0, weights=self.weights,
returned=True)
#set up the mask
mask = self.data_arr.mask.sum(axis=0)
mask = (mask == len(self.data_arr))
#set up the deviation
uncertainty = ma.std(self.data_arr, axis=0)
# create the combined image with a dtype that matches the combiner
combined_image = CCDData(np.asarray(data.data, dtype=self.dtype),
mask=mask, unit=self.unit,
uncertainty=StdDevUncertainty(uncertainty))
#update the meta data
combined_image.meta['NCOMBINE'] = len(self.data_arr)
#return the combined image
return combined_image
|