This file is indexed.

/usr/share/pyshared/landscape/manager/aptsources.py is in landscape-common 12.04.3-0ubuntu1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
import glob
import os
import pwd
import grp
import shutil
import tempfile

from twisted.internet.defer import succeed

from landscape.lib.twisted_util import spawn_process

from landscape.manager.plugin import ManagerPlugin, SUCCEEDED, FAILED
from landscape.package.reporter import find_reporter_command


class ProcessError(Exception):
    """Signal that an external command did not complete successfully."""


class AptSources(ManagerPlugin):
    """A plugin managing sources.list content.

    It handles the C{apt-sources-replace} message: the GPG keys carried by
    the message are fed to C{apt-key add}, every active entry of
    C{SOURCES_LIST} is commented out, existing C{*.list} files in
    C{SOURCES_LIST_D} are renamed to C{*.list.save}, one
    C{landscape-<name>.list} file is written per requested source and
    finally the package reporter is run.
    """

    # Main APT sources file; its active lines get commented out.
    SOURCES_LIST = "/etc/apt/sources.list"
    # Directory holding the additional "*.list" source files.
    SOURCES_LIST_D = "/etc/apt/sources.list.d"

    def register(self, registry):
        """Register the plugin and its C{apt-sources-replace} handler."""
        super(AptSources, self).register(registry)
        registry.register_message(
            "apt-sources-replace", self._wrap_handle_repositories)

    def _run_process(self, command, args, uid=None, gid=None):
        """
        Run the process in an asynchronous fashion, to be overridden in tests.

        @param command: Path of the executable to run.
        @param args: List of arguments passed to the executable.
        @param uid: Optional user id forwarded to C{spawn_process}.
        @param gid: Optional group id forwarded to C{spawn_process}.
        @return: Whatever C{spawn_process} returns (used as a C{Deferred}
            by the callers in this class).
        """
        return spawn_process(command, args, uid=uid, gid=gid)

    def _wrap_handle_repositories(self, message):
        """
        Wrap C{_handle_repositories} to generate an activity result based on
        the returned value.

        An C{operation-result} message carrying the C{operation-id} of the
        incoming message is sent to the broker, with status C{SUCCEEDED} or
        C{FAILED}; in the latter case C{result-text} describes the error.

        @param message: The C{apt-sources-replace} message to process.
        """
        # Run the handler from within a Deferred chain so that an exception
        # it raises synchronously (malformed message, temporary key file that
        # can't be written, ...) is reported as a FAILED operation exactly
        # like an asynchronous failure, instead of escaping with no result
        # ever being sent.
        deferred = succeed(None)
        deferred.addCallback(
            lambda ignored: self._handle_repositories(message))

        operation_result = {"type": "operation-result",
                            "operation-id": message["operation-id"]}

        def success(ignored):
            operation_result["status"] = SUCCEEDED
            return operation_result

        def fail(failure):
            operation_result["status"] = FAILED
            text = "%s: %s" % (failure.type.__name__, failure.value)
            operation_result["result-text"] = text
            return operation_result

        deferred.addCallbacks(success, fail)
        deferred.addBoth(lambda result:
                         self.manager.broker.send_message(result, urgent=True))

    def _handle_process_error(self, result):
        """
        Turn a failed process command (code != 0) to a C{ProcessError}.

        @param result: An C{(out, err, code)} tuple describing the finished
            process.
        @raise ProcessError: If C{code} is non-zero; the message holds the
            process output and error output.
        """
        out, err, code = result
        if code:
            raise ProcessError("%s\n%s" % (out, err))

    def _handle_process_failure(self, failure):
        """
        Turn a signaled process command to a C{ProcessError}.

        Failures which are already a C{ProcessError} are forwarded untouched,
        and so is any failure whose value is not an C{(out, err, signal)}
        triple: unpacking such a value would only replace the real error
        with an unrelated unpacking error.

        @param failure: The failure to inspect.
        @raise ProcessError: If the failure value is an
            C{(out, err, signal)} triple.
        """
        if failure.check(ProcessError):
            return failure
        value = failure.value
        # Presumably the (out, err, signal) triple is what the process
        # spawner reports for a signaled process; anything else is some
        # other error and is passed along as is.
        if isinstance(value, (tuple, list)) and len(value) == 3:
            out, err, signal = value
            raise ProcessError("%s\n%s" % (out, err))
        return failure

    def _remove_and_continue(self, passthrough, path):
        """
        Remove the temporary file created for the process, and forward the
        result.

        @param passthrough: The result or failure to hand to the next
            callback unchanged.
        @param path: Path of the temporary file to delete.
        """
        os.unlink(path)
        return passthrough

    def _write_key_file(self, key):
        """Write C{key} to a new temporary file and return the file path.

        The temporary file is removed again if writing it fails.
        """
        fd, path = tempfile.mkstemp()
        os.close(fd)
        try:
            with open(path, "w") as key_file:
                key_file.write(key)
        except Exception:
            os.unlink(path)
            raise
        return path

    def _handle_repositories(self, message):
        r"""
        Handle a list of repositories to set on the machine.

        The format is the following:

        {"sources": [
          {"name": "repository-name",
           "content":
              "deb http://archive.ubuntu.com/ubuntu/ maverick main\n"
              "deb-src http://archive.ubuntu.com/ubuntu/ maverick main"},
          {"name": "repository-name-dev",
           "content":
              "deb http://archive.ubuntu.com/ubuntu/ maverick universe\n"
              "deb-src http://archive.ubuntu.com/ubuntu/ maverick universe"}],
         "gpg-keys": ["-----BEGIN PGP PUBLIC KEY BLOCK-----\n"
                      "XXXX\n"
                      "-----END PGP PUBLIC KEY BLOCK-----",
                      "-----BEGIN PGP PUBLIC KEY BLOCK-----\n"
                      "YYY\n"
                      "-----END PGP PUBLIC KEY BLOCK-----"]}

        Each key is written to a temporary file which is passed to
        C{apt-key add} and removed afterwards, whatever the outcome. The
        sources are only handled if all the keys were imported.

        @param message: The message holding the C{gpg-keys} and C{sources}.
        @return: A C{Deferred} chained with the key imports and the sources
            handling.
        """
        deferred = succeed(None)
        for key in message["gpg-keys"]:
            path = self._write_key_file(key)
            # Bind the current path as a default argument: the lambda runs
            # later, when the loop variable may already point elsewhere.
            deferred.addCallback(
                lambda ignore, path=path:
                    self._run_process("/usr/bin/apt-key", ["add", path]))
            deferred.addCallback(self._handle_process_error)
            deferred.addBoth(self._remove_and_continue, path)
        deferred.addErrback(self._handle_process_failure)
        return deferred.addCallback(
            self._handle_sources, message["sources"])

    def _handle_sources(self, ignored, sources):
        """Handle sources repositories.

        Every active line of C{SOURCES_LIST} is commented out (blank lines
        and comments are kept as they are) while the mode and ownership of
        the file are preserved. The C{*.list} files of C{SOURCES_LIST_D} are
        renamed with a C{.save} suffix, then one C{landscape-<name>.list}
        file is written for each requested source.

        @param ignored: Result of the previous callback, unused.
        @param sources: List of dicts with C{name} and C{content} keys.
        @return: The result of C{_run_reporter}.
        """
        fd, path = tempfile.mkstemp()
        os.close(fd)
        try:
            # Build the commented-out copy in a temporary file, closing both
            # files deterministically even if reading or writing fails.
            with open(path, "w") as new_sources:
                with open(self.SOURCES_LIST) as current_sources:
                    for line in current_sources:
                        stripped_line = line.strip()
                        if not stripped_line or stripped_line.startswith("#"):
                            new_sources.write(line)
                        else:
                            new_sources.write("#%s" % line)

            # Remember the permissions before the file gets replaced.
            original_stat = os.stat(self.SOURCES_LIST)
            shutil.move(path, self.SOURCES_LIST)
        except Exception:
            # Don't leave the temporary file behind when the replacement
            # could not be completed.
            if os.path.exists(path):
                os.unlink(path)
            raise
        os.chmod(self.SOURCES_LIST, original_stat.st_mode)
        os.chown(self.SOURCES_LIST, original_stat.st_uid, original_stat.st_gid)

        for filename in glob.glob(os.path.join(self.SOURCES_LIST_D, "*.list")):
            shutil.move(filename, "%s.save" % filename)

        for source in sources:
            # NOTE(review): source["name"] is interpolated into the path
            # without validation; confirm that the sender guarantees it
            # can't contain path separators.
            filename = os.path.join(self.SOURCES_LIST_D,
                                    "landscape-%s.list" % source["name"])
            with open(filename, "w") as sources_file:
                sources_file.write(source["content"])
            os.chmod(filename, 0o644)
        return self._run_reporter()

    def _run_reporter(self):
        """Once the repositories are modified, trigger a reporter run.

        @return: The result of C{_run_process} for the reporter command.
        """
        reporter = find_reporter_command()

        # Force a smart-update run, because the sources.list has changed
        args = ["--force-smart-update"]

        if self.registry.config.config is not None:
            args.append("--config=%s" % self.registry.config.config)

        if os.getuid() == 0:
            # When running as root, run the reporter with the ids of the
            # "landscape" user and group.
            uid = pwd.getpwnam("landscape").pw_uid
            gid = grp.getgrnam("landscape").gr_gid
        else:
            uid = None
            gid = None
        return self._run_process(reporter, args, uid=uid, gid=gid)