This file is indexed.

/usr/share/pyshared/landscape/package/taskhandler.py is in landscape-common 12.04.3-0ubuntu1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
import os
import re
import logging

from twisted.internet.defer import succeed, Deferred

from landscape.lib.lock import lock_path, LockError
from landscape.lib.log import log_failure
from landscape.lib.lsb_release import LSB_RELEASE_FILENAME, parse_lsb_release
from landscape.reactor import TwistedReactor
from landscape.deployment import Configuration, init_logging
from landscape.package.store import PackageStore, InvalidHashIdDb
from landscape.broker.amp import RemoteBrokerConnector


class PackageTaskError(Exception):
    """Raised when a task hasn't been successfully completed."""


class PackageTaskHandlerConfiguration(Configuration):
    """Specialized configuration for L{PackageTaskHandler}s."""

    @property
    def package_directory(self):
        """Get the path to the package directory."""
        return os.path.join(self.data_path, "package")

    @property
    def store_filename(self):
        """Get the path to the SQlite file for the L{PackageStore}."""
        return os.path.join(self.package_directory, "database")

    @property
    def hash_id_directory(self):
        """Get the path to the directory holding the stock hash-id stores."""
        return os.path.join(self.package_directory, "hash-id")

    @property
    def update_stamp_filename(self):
        """Get the path to the update-stamp file."""
        return os.path.join(self.package_directory, "update-stamp")


class LazyRemoteBroker(object):
    """Wrapper class around L{RemoteBroker} providing lazy initialization.

    This class is a wrapper around a regular L{RemoteBroker}. It connects to
    the remote broker object only when one of its attributes is first accessed.

    @param connector: The L{RemoteBrokerConnector} which will be used
        to connect to the broker.

    @note: This behaviour is needed in particular by the ReleaseUpgrader and
        the PackageChanger, because if the they connect early and the
        landscape-client package gets upgraded while they run, they will lose
        the connection and will not be able to reconnect for a potentially long
        window of time (till the new landscape-client package version is fully
        configured and the service is started again).
    """

    def __init__(self, connector):
        self._connector = connector
        self._remote = None

    def __getattr__(self, method):

        if self._remote:
            return getattr(self._remote, method)

        def wrapper(*args, **kwargs):

            def got_connection(remote):
                self._remote = remote
                return getattr(self._remote, method)(*args, **kwargs)

            result = self._connector.connect()
            return result.addCallback(got_connection)

        return wrapper


class PackageTaskHandler(object):

    config_factory = PackageTaskHandlerConfiguration

    queue_name = "default"
    lsb_release_filename = LSB_RELEASE_FILENAME
    package_store_class = PackageStore

    def __init__(self, package_store, package_facade, remote_broker, config):
        self._store = package_store
        self._facade = package_facade
        self._broker = remote_broker
        self._config = config
        self._count = 0

    def run(self):
        return self.handle_tasks()

    def handle_tasks(self):
        """Handle the tasks in the queue.

        The tasks will be handed over one by one to L{handle_task} until the
        queue is empty or a task fails.

        @see: L{handle_tasks}
        """
        return self._handle_next_task(None)

    def _handle_next_task(self, result, last_task=None):
        """Pick the next task from the queue and pass it to C{handle_task}."""

        if last_task is not None:
            # Last task succeeded.  We can safely kill it now.
            last_task.remove()
            self._count += 1

        task = self._store.get_next_task(self.queue_name)

        if task:
            # We have another task.  Let's handle it.
            result = self.handle_task(task)
            result.addCallback(self._handle_next_task, last_task=task)
            result.addErrback(self._handle_task_failure)
            return result

        else:
            # No more tasks!  We're done!
            return succeed(None)

    def _handle_task_failure(self, failure):
        """Gracefully handle a L{PackageTaskError} and stop handling tasks."""
        failure.trap(PackageTaskError)

    def handle_task(self, task):
        """Handle a single task.

        Sub-classes must override this method in order to trigger task-specific
        actions.

        This method must return a L{Deferred} firing the task result. If the
        deferred is successful the task will be removed from the queue and the
        next one will be picked. If the task can't be completed, this method
        must raise a L{PackageTaskError}, in this case the handler will stop
        processing tasks and the failed task won't be removed from the queue.
        """
        return succeed(None)

    @property
    def handled_tasks_count(self):
        """
        Return the number of tasks that have been successfully handled so far.
        """
        return self._count

    def use_hash_id_db(self):
        """
        Attach the appropriate pre-canned hash=>id database to our store.
        """

        def use_it(hash_id_db_filename):

            if hash_id_db_filename is None:
                # Couldn't determine which hash=>id database to use,
                # just ignore the failure and go on
                return

            if not os.path.exists(hash_id_db_filename):
                # The appropriate database isn't there, but nevermind
                # and just go on
                return

            try:
                self._store.add_hash_id_db(hash_id_db_filename)
            except InvalidHashIdDb:
                # The appropriate database is there but broken,
                # let's remove it and go on
                logging.warning("Invalid hash=>id database %s" %
                                hash_id_db_filename)
                os.remove(hash_id_db_filename)
                return

        result = self._determine_hash_id_db_filename()
        result.addCallback(use_it)
        return result

    def _determine_hash_id_db_filename(self):
        """Build up the filename of the hash=>id database to use.

        @return: a deferred resulting in the filename to use or C{None}
            in case of errors.
        """

        def got_server_uuid(server_uuid):

            warning = "Couldn't determine which hash=>id database to use: %s"

            if server_uuid is None:
                logging.warning(warning % "server UUID not available")
                return None

            try:
                lsb_release_info = parse_lsb_release(self.lsb_release_filename)
            except IOError, error:
                logging.warning(warning % str(error))
                return None
            try:
                codename = lsb_release_info["code-name"]
            except KeyError:
                logging.warning(warning % "missing code-name key in %s" %
                                self.lsb_release_filename)
                return None

            arch = self._facade.get_arch()
            if not arch:
                # The Apt code should always return a non-empty string,
                # so this branch shouldn't get executed at all. However
                # this check is kept as an extra paranoia sanity check.
                logging.warning(warning % "unknown dpkg architecture")
                return None

            return os.path.join(self._config.hash_id_directory,
                                "%s_%s_%s" % (server_uuid, codename, arch))

        result = self._broker.get_server_uuid()
        result.addCallback(got_server_uuid)
        return result


def run_task_handler(cls, args, reactor=None):
    # please only pass reactor when you have totally mangled everything with
    # mocker. Otherwise bad things will happen.
    if reactor is None:
        reactor = TwistedReactor()

    config = cls.config_factory()
    config.load(args)

    for directory in [config.package_directory, config.hash_id_directory]:
        if not os.path.isdir(directory):
            os.mkdir(directory)

    program_name = cls.queue_name
    lock_filename = os.path.join(config.package_directory,
                                 program_name + ".lock")
    try:
        lock_path(lock_filename)
    except LockError:
        if config.quiet:
            raise SystemExit()
        raise SystemExit("error: package %s is already running"
                         % program_name)

    words = re.findall("[A-Z][a-z]+", cls.__name__)
    init_logging(config, "-".join(word.lower() for word in words))

    # Setup our umask for Smart to use, this needs to setup file permissions to
    # 0644 so...
    os.umask(022)

    package_store = cls.package_store_class(config.store_filename)
    # Delay importing of the facades so that we don't
    # import Smart unless we need to.
    from landscape.package.facade import (
        AptFacade, SmartFacade, has_new_enough_apt)
    if  has_new_enough_apt:
        package_facade = AptFacade()
    else:
        package_facade = SmartFacade()

    def finish():
        connector.disconnect()
        # For some obscure reason our TwistedReactor.stop method calls
        # reactor.crash() instead of reactor.stop(), which doesn't work
        # here. Maybe TwistedReactor.stop should simply use reactor.stop().
        reactor.call_later(0, reactor._reactor.stop)

    def got_error(failure):
        log_failure(failure)
        finish()

    connector = RemoteBrokerConnector(reactor, config, retry_on_reconnect=True)
    remote = LazyRemoteBroker(connector)
    handler = cls(package_store, package_facade, remote, config)
    result = Deferred()
    result.addCallback(lambda x: handler.run())
    result.addCallback(lambda x: finish())
    result.addErrback(got_error)
    reactor.call_when_running(lambda: result.callback(None))
    reactor.run()

    return result