This file is indexed.

/usr/lib/python3/dist-packages/provisioningserver/import_images/download_descriptions.py is in python3-maas-provisioningserver 2.4.0~beta2-6865-gec43e47e6-0ubuntu1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
# Copyright 2014-2016 Canonical Ltd.  This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).

"""Download boot resource descriptions from Simplestreams repo.

This module is responsible only for syncing the repo's metadata, not the boot
resources themselves.  The two are handled in separate Simplestreams
synchronisation stages.
"""

# Names exported by a wildcard import of this module.
__all__ = [
    'download_all_image_descriptions',
    'validate_product',
    ]

import re

from provisioningserver.import_images.boot_image_mapping import (
    BootImageMapping,
)
from provisioningserver.import_images.helpers import (
    get_os_from_product,
    get_signing_policy,
    ImageSpec,
    maaslog,
)
from simplestreams.mirrors import (
    BasicMirrorWriter,
    UrlMirrorReader,
)
from simplestreams.util import (
    path_from_mirror_url,
    products_exdata,
)

# Regex used by `validate_ubuntu` to validate Ubuntu product names. This only
# allows V2 and V3 Ubuntu images, i.e. product names containing ':v2:' or
# ':v3:' (matched case-insensitively).
UBUNTU_REGEX = re.compile('.*:v[23]:.*', re.IGNORECASE)
# Regex used by `validate_ubuntu` to validate Ubuntu Core product names, i.e.
# names containing ':v4:'. By having 'v4' in the product name it is prevented
# from being shown in older versions of MAAS which do not support Ubuntu Core.
UBUNTU_CORE_REGEX = re.compile('.*:v4:.*', re.IGNORECASE)
# Regex used by `validate_bootloader` to validate bootloader product names.
# This only allows V1 bootloaders, i.e. product names containing ':1:'.
BOOTLOADER_REGEX = re.compile('.*:1:.*', re.IGNORECASE)


def clean_up_repo_item(item):
    """Reduce repo `item` to the fields stored in a boot images dict.

    Only the whitelisted keys that are actually present in `item` are copied
    into the returned dict; `item` itself is not modified.
    """
    retained_keys = (
        'content_id', 'product_name', 'version_name', 'path', 'subarches',
        'release_codename', 'release_title', 'support_eol', 'kflavor',
        'bootloader-type', 'os_title', 'gadget_title')
    trimmed = {}
    for name in retained_keys:
        if name in item:
            trimmed[name] = item[name]
    return trimmed


def validate_ubuntu(data, product_name):
    """Return whether `product_name` is acceptable as an Ubuntu product.

    Products whose 'os' does not mention ubuntu are not checked and always
    pass.  An 'ubuntu-core' product passes when its name matches
    `UBUNTU_CORE_REGEX`; any Ubuntu product passes when its name matches
    `UBUNTU_REGEX`.

    :param data: Dict of product metadata; only the 'os' key is read.
    :param product_name: The product name to check.
    """
    os_name = data.get('os', '')
    if 'ubuntu' not in os_name.lower():
        # Not an Ubuntu product: there is nothing to validate.
        return True
    is_core = os_name == 'ubuntu-core'
    if is_core and UBUNTU_CORE_REGEX.search(product_name) is not None:
        return True
    # Only v2 or v3 Ubuntu products are accepted from here on.
    return UBUNTU_REGEX.search(product_name) is not None


def validate_bootloader(data, product_name):
    """Return whether `product_name` is acceptable as a bootloader product.

    Products without a 'bootloader-type' are not bootloaders and always pass.
    A bootloader passes only when its name matches `BOOTLOADER_REGEX` and its
    bootloader type, os and arch form one of the supported combinations.

    :param data: Dict of product metadata; the 'bootloader-type', 'os' and
        'arch' keys are read.
    :param product_name: The product name to check.
    """
    loader_type = data.get('bootloader-type')
    if loader_type is None:
        # Not a bootloader: there is nothing to validate.
        return True
    if BOOTLOADER_REGEX.search(product_name) is None:
        # Only V1 bootloaders are taken from the stream.
        return False
    # The (os, arch) pairs MAAS supports for each bootloader type.
    supported_pairs = {
        'pxe': (
            ('pxelinux', 'i386'),
        ),
        'uefi': (
            ('grub-efi-signed', 'amd64'),
            ('grub-efi', 'arm64'),
        ),
        'open-firmware': (
            ('grub-ieee1275', 'ppc64el'),
        ),
    }
    # An unknown bootloader type has no supported pairs and is rejected.
    return any(
        data.get('os') == loader_os and data.get('arch') == loader_arch
        for loader_os, loader_arch in supported_pairs.get(loader_type, ()))


def validate_product(data, product_name):
    """Return whether a product passes both Ubuntu and bootloader validation.

    The bootloader check is only run when the Ubuntu check has passed.
    """
    if not validate_ubuntu(data, product_name):
        return False
    return validate_bootloader(data, product_name)


class RepoDumper(BasicMirrorWriter):
    """Gather metadata about boot images available in a Simplestreams repo.

    Used inside `download_image_descriptions`.  Stores basic metadata about
    each image it finds upstream in a given `BootImageMapping`.  Each stored
    item is a dict containing the basic metadata for retrieving a boot image.

    Simplestreams' `BasicMirrorWriter` in itself is stateless.  It relies on
    a subclass (such as this one) to store data.

    :ivar boot_images_dict: A `BootImageMapping`.  Image metadata will be
        stored here as it is discovered.  Simplestreams does not interact with
        this variable.
    :ivar validate_products: When true, `insert_item` skips (and logs a
        warning for) any product that `validate_product` rejects.
    """

    def __init__(self, boot_images_dict, validate_products=True):
        """Set up the writer and remember where to store what it finds.

        :param boot_images_dict: The `BootImageMapping` to fill in.
        :param validate_products: Whether `insert_item` should filter items
            through `validate_product`.
        """
        super(RepoDumper, self).__init__(config={
            # Only download the latest version. Without this all versions
            # will be read, causing mismatches in versions.
            'max_items': 1,
            })
        self.boot_images_dict = boot_images_dict
        self.validate_products = validate_products

    def load_products(self, path=None, content_id=None):
        """Overridable from `BasicMirrorWriter`.

        Deliberately does nothing and returns `None`.
        """
        # It looks as if this method only makes sense for MirrorReaders, not
        # for MirrorWriters.  The default MirrorWriter implementation just
        # raises NotImplementedError.  Stop it from doing that.
        return

    def insert_item(self, data, src, target, pedigree, contentsource):
        """Overridable from `BasicMirrorWriter`.

        Record the metadata of one repo item in `self.boot_images_dict`.

        The item's metadata comes from `products_exdata(src, pedigree)`, and
        `pedigree[0]` is used as the product name for validation and logging.
        The `data`, `target` and `contentsource` arguments are not used here.

        Items rejected by `validate_product` (when `self.validate_products`
        is set) are logged and skipped.  Otherwise the trimmed-down item is
        stored under one or more `ImageSpec` keys:

        * Ubuntu Core: a single key with subarch 'generic', the kernel snap
          as kflavor and "<release>-<gadget>" as release.
        * Everything else: one key per entry in the item's comma-separated
          'subarches', plus a key for the item's own 'subarch'; for Ubuntu
          items carrying a 'version', possibly the 'generic' key as well.
        """
        item = products_exdata(src, pedigree)
        if self.validate_products and not validate_product(item, pedigree[0]):
            maaslog.warning('Ignoring unsupported product %s' % pedigree[0])
            return
        os = get_os_from_product(item)
        arch = item['arch']
        # Comma-separated string of subarchitectures; split further down.
        subarches = item.get('subarches', 'generic')
        if item.get('bootloader-type') is None:
            release = item['release']
            kflavor = item.get('kflavor', 'generic')
        else:
            # Bootloaders are keyed by their type in place of a release, with
            # a fixed 'bootloader' kflavor.
            release = item['bootloader-type']
            kflavor = 'bootloader'
        label = item['label']
        # The subarch is left as None here; every key stored below fills it
        # in through `_replace`.
        base_image = ImageSpec(os, arch, None, kflavor, release, label)
        compact_item = clean_up_repo_item(item)

        # NOTE(review): presumably `setdefault` leaves an existing entry in
        # place while `set` overwrites it -- confirm against
        # `BootImageMapping`.
        if os == 'ubuntu-core':
            # For Ubuntu Core we only want one entry per release/arch/gadget
            gadget = item.get('gadget_snap', 'generic')
            kflavor = item.get('kernel_snap', 'generic')
            release = "%s-%s" % (release, gadget)
            # `base_image` was built before kflavor and release were
            # recomputed above, so both are replaced explicitly.
            self.boot_images_dict.setdefault(
                base_image._replace(
                    subarch='generic', kflavor=kflavor, release=release),
                compact_item)
        else:
            for subarch in subarches.split(','):
                self.boot_images_dict.setdefault(
                    base_image._replace(subarch=subarch), compact_item)

            # HWE resources need to map to a specific resource, and not just
            # to any of the supported subarchitectures for that resource.
            subarch = item.get('subarch', 'generic')
            self.boot_images_dict.set(
                base_image._replace(subarch=subarch), compact_item)

            if os == 'ubuntu' and item.get('version') is not None:
                # HWE resources with generic, should map to the HWE that ships
                # with that release. Starting with Xenial kernels changed from
                # using the naming format hwe-<letter> to ga-<version>. Look
                # for both.
                hwe_archs = ["ga-%s" % item['version'], "hwe-%s" % release[0]]
                # NOTE(review): `subarches` is still the comma-separated
                # string here, so this is a substring test rather than a
                # list-membership test.
                if subarch in hwe_archs and 'generic' in subarches:
                    self.boot_images_dict.set(
                        base_image._replace(subarch='generic'), compact_item)

    def sync(self, reader, path):
        """Run the base class `sync`, logging any `IOError` it raises.

        The `IOError` is logged as a warning and then re-raised; it is never
        suppressed.
        """
        try:
            super(RepoDumper, self).sync(reader, path)
        except IOError:
            maaslog.warning(
                "I/O error while syncing boot images. If this problem "
                "persists, verify network connectivity and disk usage.")
            # This MUST NOT suppress the I/O error because callers use
            # self.boot_images_dict as the "return" value. Suppressing
            # exceptions here gives the caller no reason to doubt that
            # boot_images_dict is not utter garbage and so pass it up the
            # stack where it is then acted upon, to empty out BootSourceCache
            # for example. True story.
            raise


def value_passes_filter_list(filter_list, property_value):
    """Check one property of a boot image against a list of filter values.

    An asterisk (`*`) anywhere in `filter_list` accepts every value;
    otherwise `property_value` must itself be one of the entries.
    """
    if '*' in filter_list:
        return True
    return property_value in filter_list


def value_passes_filter(filter_value, property_value):
    """Check one property of a boot image against a single filter value.

    The property passes when the filter is an asterisk (`*`), or when the
    filter equals the property's value.
    """
    accepted_filters = ('*', property_value)
    return filter_value in accepted_filters


def image_passes_filter(filters, os, arch, subarch, release, label):
    """Decide whether a boot image is selected by the import filters.

    :param filters: A list of filter dicts, each with 'os' and 'release'
        (single values) and 'arches', 'subarches' and 'labels' (lists).  An
        empty list, or `None`, accepts every image.  A single value may be
        an asterisk (`*`), and a list may contain one, to match anything.
    :param os: The given boot image's operating system.
    :param arch: The given boot image's architecture.
    :param subarch: The given boot image's subarchitecture.
    :param release: The given boot image's OS release.
    :param label: The given boot image's label.
    :return: `True` if at least one filter dict matches the image on every
        property, else `False`.
    """
    if filters is None:
        return True
    if len(filters) == 0:
        return True
    return any(
        value_passes_filter(candidate['os'], os) and
        value_passes_filter(candidate['release'], release) and
        value_passes_filter_list(candidate['arches'], arch) and
        value_passes_filter_list(candidate['subarches'], subarch) and
        value_passes_filter_list(candidate['labels'], label)
        for candidate in filters)


def boot_merge(destination, additions, filters=None):
    """Fill in one `BootImageMapping` with entries from another.

    Every entry of `additions` that passes `filters` is offered to
    `destination` through `setdefault`, so that image specs already present
    in `destination` keep their existing entry.

    :param destination: `BootImageMapping` to be updated.  It is extended
        in-place.
    :param additions: A second `BootImageMapping`, used as the source of
        additional entries.
    :param filters: Optional list of dicts as accepted by
        `image_passes_filter`, i.e. with 'os', 'release', 'arches',
        'subarches' and 'labels' keys.  If given, an entry is only
        considered for copying when it matches at least one of the filters.
        A filter value may be the string `*` (or, for list values, contain
        the string `*`) to match any value.
    """
    for spec, resource in additions.items():
        # The kflavor element of the spec cannot be filtered on; drop it.
        os, arch, subarch, _kflavor, release, label = spec
        selected = image_passes_filter(
            filters, os, arch, subarch, release, label)
        if not selected:
            continue
        # First entry found wins: an image spec already present in
        # `destination` must not be overridden.
        destination.setdefault(spec, resource)


def download_image_descriptions(
        path, keyring=None, user_agent=None, validate_products=True):
    """Fetch image metadata from one upstream Simplestreams repo.

    :param path: The path to a Simplestreams repo.
    :param keyring: Optional keyring for verifying the repo's signatures.
    :param user_agent: Optional user agent string for downloading the image
        descriptions.
    :param validate_products: Passed on to `RepoDumper`; controls whether
        products are filtered through `validate_product`.
    :return: A `BootImageMapping` describing available boot resources.
    """
    maaslog.info("Downloading image descriptions from %s", path)
    mirror_url, repo_path = path_from_mirror_url(path, None)
    signing_policy = get_signing_policy(repo_path, keyring)
    if user_agent is not None:
        try:
            reader = UrlMirrorReader(
                mirror_url, policy=signing_policy, user_agent=user_agent)
        except TypeError:
            # This UrlMirrorReader does not accept the user_agent argument
            # (simplestreams >=bzr429 is required for it); go without.
            reader = UrlMirrorReader(mirror_url, policy=signing_policy)
    else:
        reader = UrlMirrorReader(mirror_url, policy=signing_policy)

    image_mapping = BootImageMapping()
    repo_dumper = RepoDumper(
        image_mapping, validate_products=validate_products)
    repo_dumper.sync(reader, repo_path)
    return image_mapping


def download_all_image_descriptions(
        sources, user_agent=None, validate_products=True):
    """Download and merge image metadata for every entry in `sources`.

    :param sources: Iterable of dicts, each with a 'url', a 'selections'
        list of filters (as accepted by `boot_merge`) and optionally a
        'keyring'.
    :param user_agent: Optional user agent string, passed on to
        `download_image_descriptions`.
    :param validate_products: Passed on to `download_image_descriptions`.
    :return: A `BootImageMapping` combining the selected entries of all
        sources; entries from earlier sources take precedence.
    """
    merged = BootImageMapping()
    for source in sources:
        descriptions = download_image_descriptions(
            source['url'], keyring=source.get('keyring'),
            user_agent=user_agent, validate_products=validate_products)
        boot_merge(merged, descriptions, source['selections'])
    return merged