This file is indexed.

/usr/lib/python3/dist-packages/provisioningserver/utils/__init__.py is in python3-maas-provisioningserver 2.0.0~beta3+bzr4941-0ubuntu1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
# Copyright 2012-2016 Canonical Ltd.  This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).

"""Utilities for the provisioning server."""

__all__ = [
    "CircularDependency",
    "filter_dict",
    "flatten",
    "import_settings",
    "in_develop_mode",
    "locate_config",
    "locate_template",
    "parse_key_value_file",
    "ShellTemplate",
    "sorttop",
    "sudo",
    "typed",
    "warn_deprecated",
    "write_custom_config_section",
]

from collections import Iterable
from functools import reduce
from itertools import chain
import os
from pipes import quote
import sys
from sys import _getframe as getframe
from typing import Tuple
from warnings import warn

import tempita

# Use typecheck-decorator if it's available.
try:
    from maastesting.typecheck import typed
except ImportError:
    typed = lambda func: func


@typed
def locate_config(*path: Tuple[str]):
    """Return the location of a given config file or directory.

    :param path: Path elements to resolve relative to `${MAAS_ROOT}/etc/maas`.
    """
    # The `os.curdir` avoids a crash when `path` is empty.
    path = os.path.join(os.curdir, *path)
    if os.path.isabs(path):
        return path
    else:
        # Avoid circular imports.
        from provisioningserver.path import get_tentative_path
        return get_tentative_path("etc", "maas", path)


def locate_template(*path: Tuple[str]):
    """Return the absolute path of a template.

    :param path: Path elemets to resolve relative to the location the
                 Python library provisioning server is located in.
    """
    return os.path.abspath(
        os.path.join(
            os.path.dirname(__file__),
            '..', 'templates', *path))


setting_expression = r"""
    ^([A-Z0-9_]+)    # Variable name is all caps, alphanumeric and _.
    =                # Assignment operator.
    (?:"|\')?        # Optional leading single or double quote.
    (.*)             # Value
    (?:"|\')?        # Optional trailing single or double quote.
    """


def find_settings(whence):
    """Return settings from `whence`, which is assumed to be a module."""
    # XXX 2012-10-11 JeroenVermeulen, bug=1065456: Put this in a shared
    # location.  It's currently duplicated from elsewhere.
    return {
        name: value
        for name, value in vars(whence).items()
        if not name.startswith("_")
    }


def import_settings(whence):
    """Import settings from `whence` into the caller's global scope."""
    # XXX 2012-10-11 JeroenVermeulen, bug=1065456: Put this in a shared
    # location.  It's currently duplicated from elsewhere.
    source = find_settings(whence)
    target = sys._getframe(1).f_globals
    target.update(source)


def filter_dict(dictionary, desired_keys):
    """Return a version of `dictionary` restricted to `desired_keys`.

    This is like a set union, except the values from `dictionary` come along.
    (Actually `desired_keys` can be a `dict`, but its values will be ignored).
    """
    return {
        key: value
        for key, value in dictionary.items()
        if key in desired_keys
    }


def dict_depth(d, depth=0):
    """Returns the max depth of a dictionary."""
    if not isinstance(d, dict) or not d:
        return depth
    return max(dict_depth(v, depth + 1) for _, v in d.items())


def split_lines(input, separator):
    """Split each item from `input` into a key/value pair."""
    return (line.split(separator, 1) for line in input if line.strip() != '')


def strip_pairs(input):
    """Strip whitespace of each key/value pair in input."""
    return ((key.strip(), value.strip()) for (key, value) in input)


def parse_key_value_file(file_name, separator=":"):
    """Parse a text file into a dict of key/value pairs.

    Use this for simple key:value or key=value files. There are no sections,
    as required for python's ConfigParse. Whitespace and empty lines are
    ignored, and it is assumed that the file is encoded as UTF-8.

    :param file_name: Name of file to parse.
    :param separator: The text that separates each key from its value.
    """
    with open(file_name, 'r', encoding="utf-8") as input:
        return dict(strip_pairs(split_lines(input, separator)))


# Header and footer comments for MAAS custom config sections, as managed
# by write_custom_config_section.
maas_custom_config_markers = (
    "## Begin MAAS settings.  Do not edit; MAAS will overwrite this section.",
    "## End MAAS settings.",
)


def find_list_item(item, in_list, starting_at=0):
    """Return index of `item` in `in_list`, or None if not found."""
    try:
        return in_list.index(item, starting_at)
    except ValueError:
        return None


def write_custom_config_section(original_text, custom_section):
    """Insert or replace a custom section in a configuration file's text.

    This allows you to rewrite configuration files that are not owned by
    MAAS, but where MAAS will have one section for its own settings.  It
    doesn't read or write any files; this is a pure text operation.

    Appends `custom_section` to the end of `original_text` if there was no
    custom MAAS section yet.  Otherwise, replaces the existing custom MAAS
    section with `custom_section`.  Returns the new text.

    Assumes that the configuration file's format accepts lines starting with
    hash marks (#) as comments.  The custom section will be bracketed by
    special marker comments that make it clear that MAAS wrote the section
    and it should not be edited by hand.

    :param original_text: The config file's current text.
    :type original_text: unicode
    :param custom_section: Custom config section to insert.
    :type custom_section: unicode
    :return: New config file text.
    :rtype: unicode
    """
    header, footer = maas_custom_config_markers
    lines = original_text.splitlines()
    header_index = find_list_item(header, lines)
    if header_index is not None:
        footer_index = find_list_item(footer, lines, header_index)
        if footer_index is None:
            # There's a header but no footer.  Pretend we didn't see the
            # header; just append a new custom section at the end.  Any
            # subsequent rewrite will replace the part starting at the
            # header and ending at the header we will add here.  At that
            # point there will be no trace of the strange situation
            # left.
            header_index = None

    if header_index is None:
        # There was no MAAS custom section in this file.  Append it at
        # the end.
        lines += [
            header,
            custom_section,
            footer,
        ]
    else:
        # There is a MAAS custom section in the file.  Replace it.
        lines = (
            lines[:(header_index + 1)] +
            [custom_section] +
            lines[footer_index:])

    return '\n'.join(lines) + '\n'


class Safe:
    """An object that is safe to render as-is."""

    __slots__ = ("value",)

    def __init__(self, value):
        self.value = value

    def __repr__(self):
        return "<%s %r>" % (
            self.__class__.__name__, self.value)


def escape_py_literal(string):
    """Escape and quote a string for use as a python literal."""
    return repr(string).decode('ascii')


class ShellTemplate(tempita.Template):
    """A Tempita template specialised for writing shell scripts.

    By default, substitutions will be escaped using `pipes.quote`, unless
    they're marked as safe. This can be done using Tempita's filter syntax::

      {{foobar|safe}}

    or as a plain Python expression::

      {{safe(foobar)}}

    """

    default_namespace = dict(
        tempita.Template.default_namespace,
        safe=Safe)

    def _repr(self, value, pos):
        """Shell-quote the value by default."""
        rep = super(ShellTemplate, self)._repr
        if isinstance(value, Safe):
            return rep(value.value, pos)
        else:
            return quote(rep(value, pos))


def classify(func, subjects):
    """Classify `subjects` according to `func`.

    Splits `subjects` into two lists: one for those which `func`
    returns a truth-like value, and one for the others.

    :param subjects: An iterable of `(ident, subject)` tuples, where
        `subject` is an argument that can be passed to `func` for
        classification.
    :param func: A function that takes a single argument.

    :return: A ``(matched, other)`` tuple, where ``matched`` and
        ``other`` are `list`s of `ident` values; `subject` values are
        not returned.
    """
    matched, other = [], []
    for ident, subject in subjects:
        bucket = matched if func(subject) else other
        bucket.append(ident)
    return matched, other


def warn_deprecated(alternative=None):
    """Issue a `DeprecationWarning` for the calling function.

    :param alternative: Text describing an alternative to using this
        deprecated function.
    """
    target = getframe(1).f_code.co_name
    message = "%s is deprecated" % target
    if alternative is None:
        message = "%s." % (message,)
    else:
        message = "%s; %s" % (message, alternative)
    warn(message, DeprecationWarning, 1)


def flatten(*things):
    """Recursively flatten iterable parts of `things`.

    For example::

      >>> sorted(flatten([1, 2, {3, 4, (5, 6)}]))
      [1, 2, 3, 4, 5, 6]

    :return: An iterator.
    """
    def _flatten(things):
        if isinstance(things, str):
            # String-like objects are treated as leaves; iterating through a
            # string yields more strings, each of which is also iterable, and
            # so on, until the heat-death of the universe.
            return iter((things,))
        elif isinstance(things, Iterable):
            # Recurse and merge in order to flatten nested structures.
            return chain.from_iterable(map(_flatten, things))
        else:
            # This is a leaf; return an single-item iterator so that it can be
            # chained with any others.
            return iter((things,))

    return _flatten(things)


def is_true(value):
    if value is None:
        return False
    return value.lower() in ("yes", "true", "t", "1")


def in_develop_mode():
    """Return True if `MAAS_RACK_DEVELOP` environment variable is true."""
    return is_true(os.getenv('MAAS_RACK_DEVELOP', None))


def sudo(command_args):
    """Wrap the command arguments in a sudo command, if not in debug mode."""
    if in_develop_mode():
        return command_args
    else:
        return ['sudo', '-n'] + command_args


class CircularDependency(ValueError):
    """A circular dependency has been found."""


def sorttop(data):
    """Sort `data` topologically.

    `data` should be a `dict`, where each entry maps a "thing" to a `set` of
    other things they depend on, or should be sorted after. For example:

      >>> list(sorttop({1: {2}, 2: {3, 4}}))
      [{3, 4}, {2}, {1}]

    :raises CircularDependency: If two or more things depend on one another,
        making it impossible to resolve their relative ordering.
    """
    empty = frozenset()
    # Copy data and discard self-referential dependencies.
    data = {thing: set(deps) for thing, deps in data.items()}
    for thing, deps in data.items():
        deps.discard(thing)
    # Find ghost dependencies and add them as "things".
    ghosts = reduce(set.union, data.values(), set()).difference(data)
    for ghost in ghosts:
        data[ghost] = empty
    # Skim batches off the top until we're done.
    while len(data) != 0:
        batch = {thing for thing, deps in data.items() if deps == empty}
        if len(batch) == 0:
            raise CircularDependency(data)
        else:
            for thing in batch:
                del data[thing]
            for deps in data.values():
                deps.difference_update(batch)
            yield batch