This file is indexed.

/usr/lib/python3/dist-packages/radio_beam/multiple_beams.py is in python3-radio-beam 0.2-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
from astropy import units as u
from astropy.io import fits
from astropy import constants
from astropy import wcs
from astropy.extern import six
import numpy as np
import warnings

from .beam import Beam, _to_area, SIGMA_TO_FWHM
# from .commonbeam import commonbeam


class Beams(u.Quantity):
    """
    An object to handle a set of radio beams for a data cube.
    """

    def __new__(cls, major=None, minor=None, pa=None,
                areas=None, default_unit=u.arcsec, meta=None):
        """
        Create a new set of Gaussian beams

        Parameters
        ----------
        major : :class:`~astropy.units.Quantity` with angular equivalency
        minor : :class:`~astropy.units.Quantity` with angular equivalency
        pa : :class:`~astropy.units.Quantity` with angular equivalency
        area : :class:`~astropy.units.Quantity` with steradian equivalency
        default_unit : :class:`~astropy.units.Unit`
            The unit to impose on major, minor if they are specified as floats
        """

        # improve to some kwargs magic later

        # error checking

        # ... given an area make a round beam assuming it is Gaussian
        if areas is not None:
            rad = np.sqrt(areas / (2 * np.pi)) * u.deg
            major = rad * SIGMA_TO_FWHM
            minor = rad * SIGMA_TO_FWHM
            pa = np.zeros_like(areas) * u.deg

        # give specified values priority
        if major is not None:
            if u.deg.is_equivalent(major.unit):
                pass
            else:
                warnings.warn("Assuming major axes has been specified in degrees")
                major = major * u.deg
        if minor is not None:
            if u.deg.is_equivalent(minor.unit):
                pass
            else:
                warnings.warn("Assuming minor axes has been specified in degrees")
                minor = minor * u.deg
        if pa is not None:
            if len(pa) != len(major):
                raise ValueError("Number of position angles must match number of major axis lengths")
            if u.deg.is_equivalent(pa.unit):
                pass
            else:
                warnings.warn("Assuming position angles has been specified in degrees")
                pa = pa * u.deg
        else:
            pa = np.zeros_like(major.value) * u.deg

        # some sensible defaults
        if minor is None:
            minor = major
        elif len(minor) != len(major):
            raise ValueError("Minor and major axes must have same number of values")

        self = super(Beams, cls).__new__(cls, value=_to_area(major, minor).value, unit=u.sr)
        self.major = major
        self.minor = minor
        self.pa = pa
        self.default_unit = default_unit

        if meta is None:
            self.meta = [{}]*len(self)
        else:
            self.meta = meta

        return self

    @property
    def meta(self):
        return self._meta

    @meta.setter
    def meta(self, value):
        if len(value) == len(self):
            self._meta = value
        else:
            raise TypeError("metadata must be a list of dictionaries")

    def __len__(self):
        return len(self.major)


    @property
    def isfinite(self):
        return ((self.major > 0) & (self.minor > 0) & np.isfinite(self.major) &
                np.isfinite(self.minor) & np.isfinite(self.pa))

    def __getslice__(self, start, stop, increment=None):
        return self.__getitem__(slice(start, stop, increment))

    def __getitem__(self, view):
        if isinstance(view, int):
            return Beam(major=self.major[view],
                        minor=self.minor[view],
                        pa=self.pa[view],
                        meta=self.meta[view])
        elif isinstance(view, slice):
            return Beams(major=self.major[view],
                         minor=self.minor[view],
                         pa=self.pa[view],
                         meta=self.meta[view])
        elif isinstance(view, np.ndarray):
            if view.dtype.name != 'bool':
                raise ValueError("If using an array to index beams, it must "
                                 "be a boolean array.")
            return Beams(major=self.major[view],
                         minor=self.minor[view],
                         pa=self.pa[view],
                         meta=[x for ii,x in zip(view, self.meta) if ii])
        else:
            raise ValueError("Invalid slice")

    def __array_finalize__(self, obj):
        # If our unit is not set and obj has a valid one, use it.
        if self._unit is None:
            unit = getattr(obj, '_unit', None)
            if unit is not None:
                self._set_unit(unit)

        if isinstance(obj, Beams):
            self.major = obj.major
            self.minajor = obj.minor
            self.pa = obj.pa
            self.meta = obj.meta

        # Copy info if the original had `info` defined.  Because of the way the
        # DataInfo works, `'info' in obj.__dict__` is False until the
        # `info` attribute is accessed or set.  Note that `obj` can be an
        # ndarray which doesn't have a `__dict__`.
        if 'info' in getattr(obj, '__dict__', ()):
            self.info = obj.info

    @property
    def sr(self):
        return _to_area(self.major, self.minor)

    @classmethod
    def from_fits_bintable(cls, bintable):
        """
        Instantiate a Beams list from a bintable from a CASA-produced image
        HDU.

        Parameters
        ----------
        bintable : fits.BinTableHDU
            The table data containing the beam information

        Returns
        -------
        beams : Beams
            A new Beams object
        """
        major = u.Quantity(bintable.data['BMAJ'], u.arcsec)
        minor = u.Quantity(bintable.data['BMIN'], u.arcsec)
        pa = u.Quantity(bintable.data['BPA'], u.deg)
        meta = [{key: row[key] for key in bintable.columns.names
                 if key not in ('BMAJ', 'BPA', 'BMIN')}
                for row in bintable.data]

        return cls(major=major, minor=minor, pa=pa, meta=meta)

    @classmethod
    def from_casa_image(cls, imagename):
        '''
        Instantiate beams from a CASA image. Cannot currently handle beams for
        different polarizations.

        ** Must be run in a CASA environment! **

        Parameters
        ----------
        imagename : str
            Name of CASA image.
        '''

        try:
            import casac
        except ImportError:
            raise ImportError("Could not import CASA (casac) and therefore"
                              " cannot read CASA .image files")

        ia.open(imagename)
        beam_props = ia.restoringbeam()
        ia.close()

        nchans = beam_props['nChannels']

        # Assuming there is always a 0th channel...
        maj_unit = u.Unit(beam_props['beams']['*0']['*0']['major']['unit'])
        min_unit = u.Unit(beam_props['beams']['*0']['*0']['minor']['unit'])
        pa_unit = u.Unit(beam_props['beams']['*0']['*0']['positionangle']['unit'])

        major = np.empty((nchans)) * maj_unit
        minor = np.empty((nchans)) * min_unit
        pa = np.empty((nchans)) * pa_unit

        for chan in range(nchans):

            chan_name = '*{}'.format(chan)
            chanbeam_props = beam_props['beams'][chan_name]['*0']

            # Can CASA have mixes of units between channels? Let's test just
            # in case
            assert maj_unit == u.Unit(chanbeam_props['major']['unit'])
            assert min_unit == u.Unit(chanbeam_props['minor']['unit'])
            assert pa_unit == u.Unit(chanbeam_props['positionangle']['unit'])

            major[chan] = chanbeam_props['major']['value'] * maj_unit
            minor[chan] = chanbeam_props['minor']['value'] * min_unit
            pa[chan] = chanbeam_props['positionangle']['value'] * pa_unit

        return cls(major=major, minor=minor, pa=pa)

    def average_beam(self, includemask=None, raise_for_nan=True):
        """
        Average the beam major, minor, and PA attributes.

        This is usually a dumb thing to do!
        """

        from astropy.stats import circmean

        if includemask is None:
            includemask = self.isfinite
        else:
            includemask = np.logical_and(includemask, self.isfinite)

        new_beam = Beam(major=self.major[includemask].mean(),
                        minor=self.minor[includemask].mean(),
                        pa=circmean(self.pa[includemask],
                        weights=(self.major / self.minor)[includemask]))

        if raise_for_nan and np.any(np.isnan(new_beam)):
            raise ValueError("NaNs after averaging.  This is a bug.")

        return new_beam

    def largest_beam(self, includemask=None):
        """
        Returns the largest beam (by area) in a list of beams.
        """

        if includemask is None:
            includemask = self.isfinite
        else:
            includemask = np.logical_and(includemask, self.isfinite)

        largest_idx = (self.major * self.minor)[includemask].argmax()
        new_beam = Beam(major=self.major[includemask][largest_idx],
                        minor=self.minor[includemask][largest_idx],
                        pa=self.pa[includemask][largest_idx])

        return new_beam

    def smallest_beam(self, includemask=None):
        """
        Returns the smallest beam (by area) in a list of beams.
        """

        if includemask is None:
            includemask = self.isfinite
        else:
            includemask = np.logical_and(includemask, self.isfinite)

        largest_idx = (self.major * self.minor)[includemask].argmin()
        new_beam = Beam(major=self.major[includemask][largest_idx],
                        minor=self.minor[includemask][largest_idx],
                        pa=self.pa[includemask][largest_idx])

        return new_beam

    def extrema_beams(self, includemask=None):
        return [self.smallest_beam(includemask),
                self.largest_beam(includemask)]

    def common_beam(self, includemask=None):
        '''
        Return the smallest common beam size.
        '''
        raise NotImplementedError("")
        # return commonbeam(self if includemask is None else self[includemask])

    def __iter__(self):
        for i in range(len(self)):
            yield self[i]