This file is indexed.

/usr/lib/python3/dist-packages/invoke/executor.py is in python3-invoke 0.11.1+dfsg1-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
from .config import Config
from .context import Context
from .util import debug
from .tasks import Call

import six


class Executor(object):
    """
    An execution strategy for Task objects.

    Subclasses may override various extension points to change, add or remove
    behavior.
    """
    def __init__(self, collection, config=None):
        """
        Initialize executor with handles to a task collection & config.

        :param collection:
            A `.Collection` used to look up requested tasks (and their default
            config data, if any) by name during execution.

        :param config:
            An optional `.Config` holding configuration state. Defaults to an
            empty `.Config` if not given.
        """
        self.collection = collection
        if config is None:
            config = Config()
        self.config = config

    def execute(self, *tasks, **kwargs):
        """
        Execute one or more ``tasks`` in sequence.

        :param tasks:
            One or more positional arguments, each a two-tuple whose first
            element is a task name and whose second element is a dict suitable
            for use as ``**kwargs``. E.g.::

                executor.execute(
                    ('task1', {}),
                    ('task2', {'arg1': 'val1'}),
                    ...
                )

            As a shorthand, a string instead of a two-tuple may be given,
            implying an empty kwargs dict.

            The string specifies which task from the Executor's `.Collection`
            is to be executed. It may contain dotted syntax appropriate for
            calling namespaced tasks, e.g. ``subcollection.taskname``.

            Thus the above call is roughly equivalent to::

                task1()
                task2(arg1='val1')

        :param kwargs:
            Accepted for interface compatibility; currently not consulted.

        :returns:
            A dict mapping task objects to their return values.

            This dict may include pre- and post-tasks if any were executed. For
            example, in a collection with a ``build`` task depending on another
            task named ``setup``, executing ``build`` will result in a dict
            with two keys, one for ``build`` and one for ``setup``.
        """
        # Entries may be bare strings or (name, kwargs) tuples; only index
        # into the tuples, otherwise a string name would be logged as just its
        # first character.
        names = [
            x if isinstance(x, six.string_types) else x[0]
            for x in tasks
        ]
        debug("Examining top level tasks {0!r}".format(names))
        # Normalize input
        tasks = self._normalize(tasks)
        debug("Tasks with kwargs: {0!r}".format(tasks))
        # Obtain copy of directly-given tasks since they should sometimes
        # behave differently
        direct = list(tasks)
        # Expand pre/post tasks & then dedupe the entire run.
        # Load config at this point to get latest value of dedupe option
        config = self.config.clone()
        # Get some good value for dedupe option, even if config doesn't have
        # the tree we expect. (This is a concession to testing.)
        try:
            dedupe = config.tasks.dedupe
        except AttributeError:
            dedupe = True
        # Actual deduping here
        tasks = self._dedupe(self._expand_tasks(tasks), dedupe)
        # Execute
        results = {}
        for task in tasks:
            # Named 'call_kwargs' so the method's own **kwargs isn't shadowed.
            args, call_kwargs = tuple(), {}
            # Unpack Call objects, including given-name handling
            name = None
            # Only directly requested tasks get their result printed; this
            # must be computed before the Call wrapper is unwrapped below.
            autoprint = task in direct and task.autoprint
            if isinstance(task, Call):
                c = task
                task = c.task
                args, call_kwargs = c.args, c.kwargs
                name = c.name
            result = self._execute(
                task=task, name=name, args=args, kwargs=call_kwargs,
                config=config
            )
            if autoprint:
                print(result)
            # TODO: handle the non-dedupe case / the same-task-different-args
            # case, wherein one task obj maps to >1 result.
            results[task] = result
        return results

    def _normalize(self, tasks):
        """
        Turn a mix of task-name strings & ``(name, kwargs)`` tuples into calls.

        :returns:
            A list of `.Call` objects, each wrapping the task looked up in
            ``self.collection`` under the given name, with ``name`` recorded
            on the call.
        """
        # To two-tuples from potential combo of two-tuples & strings
        tuples = [
            (x, {}) if isinstance(x, six.string_types) else x
            for x in tasks
        ]
        # Then to call objects (binding the task obj + kwargs together)
        calls = []
        for name, kwargs in tuples:
            c = Call(self.collection[name], **kwargs)
            c.name = name
            calls.append(c)
        return calls

    def _dedupe(self, tasks, dedupe):
        """
        Drop repeated entries from ``tasks``, keeping the first occurrence.

        :param tasks: List of tasks/calls to examine.
        :param dedupe:
            When false, no deduplication occurs and ``tasks`` is returned
            as-is.

        :returns: The (possibly deduplicated) list, in original order.
        """
        if not dedupe:
            return tasks
        deduped = []
        debug("Deduplicating tasks...")
        for task in tasks:
            # Membership is tested against a list, i.e. by equality, so
            # entries are not required to be hashable.
            if task not in deduped:
                debug("{0!r}: ok".format(task))
                deduped.append(task)
            else:
                debug("{0!r}: skipping".format(task))
        return deduped

    def _execute(self, task, name, args, kwargs, config):
        """
        Invoke a single ``task`` and return its result.

        :param task: The callable task object to run.
        :param name:
            The name the task was requested under, or ``None``; passed to
            ``Collection.configuration()`` for contextualized tasks.
        :param args: Tuple of positional arguments for the task.
        :param kwargs: Dict of keyword arguments for the task.
        :param config:
            Config object which, for contextualized tasks, is updated with
            collection & shell environment data and wrapped in a `.Context`.

        :returns: Whatever ``task`` returns.
        """
        # Need task + possible name when invoking CLI-given tasks, so we can
        # pass a dotted path to Collection.configuration()
        suffix = " as {0}".format(name) if name else ""
        debug("Executing {0!r}{1}".format(task, suffix))
        if task.contextualized:
            # Load per-task/collection config
            config.load_collection(self.collection.configuration(name))
            # Load env vars, as the last step (so users can override
            # per-collection keys via the env)
            config.load_shell_env()
            # Set up context w/ that config; it becomes the task's first
            # positional argument.
            context = Context(config=config)
            args = (context,) + args
        result = task(*args, **kwargs)
        return result

    def _expand_tasks(self, tasks):
        """
        Flatten ``tasks`` into execution order, including pre- & post-tasks.

        Each task's ``pre`` entries (recursively expanded) come before it and
        its ``post`` entries (recursively expanded) come after it.

        :returns: A new flat list; no deduplication is performed here.
        """
        ret = []
        for task in tasks:
            ret.extend(self._expand_tasks(task.pre))
            ret.append(task)
            ret.extend(self._expand_tasks(task.post))
        return ret