This file is indexed.

/usr/lib/python3/dist-packages/plainbox/impl/result.py is in python3-plainbox 0.5.3-2.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
# This file is part of Checkbox.
#
# Copyright 2012 Canonical Ltd.
# Written by:
#   Zygmunt Krynicki <zygmunt.krynicki@canonical.com>
#
# Checkbox is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 3,
# as published by the Free Software Foundation.

#
# Checkbox is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Checkbox.  If not, see <http://www.gnu.org/licenses/>.

"""
:mod:`plainbox.impl.result` -- job result
=========================================

This module has two basic implementations of :class:`IJobResult`:
:class:`MemoryJobResult` and :class:`DiskJobResult`.
"""

from collections import namedtuple
import base64
import gzip
import io
import json
import logging
import inspect

from plainbox.abc import IJobResult
from plainbox.i18n import gettext as _
from plainbox.impl.signal import Signal

logger = logging.getLogger("plainbox.result")


# Record type for the entries kept in JobResult.io_log.
# Every record carries exactly three fields, in this order:
#
#   delay - seconds elapsed since the previous record was created (a float,
#   so fractions of a second are preserved)
#
#   stream_name - which stream the IO was observed on; 'stdout' and 'stderr'
#   are the names currently supported
#
#   data - the IO that was actually seen (bytes)
IOLogRecord = namedtuple("IOLogRecord", ["delay", "stream_name", "data"])


class _JobResultBase(IJobResult):
    """
    Common foundation for the :class:`IJobResult` implementations.

    All state lives in one private dictionary; the properties defined here
    are accessors over that dictionary and are shared by every variant of
    `IJobResult` in this module.
    """

    def __init__(self, data):
        """
        Create a new result backed by a filtered copy of ``data``

        :param data:
            a dictionary that may hold arbitrary values. Some keys, such as
            'outcome', 'comments' and 'return_code', are used explicitly by
            this class but every one of them is optional.
        """
        # Entries whose value is None or an empty list carry no information.
        # Dropping them makes results that are really identical behave as
        # identical, which matters for __eq__() below: the various kinds of
        # IJobResult get constructed with default entries that must not make
        # them compare differently.
        kept = {}
        for key, value in data.items():
            if value is None:
                continue
            if value != []:
                kept[key] = value
        self._data = kept

    def __eq__(self, other):
        if isinstance(other, _JobResultBase):
            return self._data == other._data
        return NotImplemented

    def __str__(self):
        return str(self.outcome)

    def __repr__(self):
        # Class name first, then every stored key:value pair in key order
        parts = [self.__class__.__name__]
        for key in sorted(self._data):
            parts.append("{}:{!r}".format(key, self._data[key]))
        return "<{}>".format(' '.join(parts))

    @Signal.define
    def on_outcome_changed(self, old, new):
        """
        Signal fired whenever the value of the ``outcome`` property changes
        """

    @property
    def outcome(self):
        """
        outcome of running this job.

        Jobs (tests) are ultimately classified by their outcome as failures
        or successes. Several other outcome values exist; they all basically
        mean that the job did not run for one reason or another.

        When no outcome was stored this falls back to ``OUTCOME_NONE``.
        """
        return self._data.get('outcome', self.OUTCOME_NONE)

    @outcome.setter
    def outcome(self, new):
        previous = self.outcome
        # Only store and announce genuine changes
        if previous != new:
            self._data['outcome'] = new
            self.on_outcome_changed(previous, new)

    @property
    def execution_duration(self):
        """
        How long, in seconds, the command of this job took to run
        (None when not recorded).
        """
        return self._data.get('execution_duration')

    @property
    def comments(self):
        """
        comments left by the test operator (None when not recorded)
        """
        return self._data.get('comments')

    @comments.setter
    def comments(self, new):
        previous = self.comments
        # Only store and announce genuine changes
        if previous != new:
            self._data['comments'] = new
            self.on_comments_changed(previous, new)

    @Signal.define
    def on_comments_changed(self, old, new):
        """
        Signal fired whenever the value of the ``comments`` property changes
        """

    @property
    def return_code(self):
        """
        return code of the command associated with the job, if there is one
        """
        return self._data.get('return_code')

    @property
    def io_log(self):
        """
        the complete IO log, materialized as a tuple from ``get_io_log()``
        """
        return tuple(self.get_io_log())


class MemoryJobResult(_JobResultBase):
    """
    A :class:`IJobResult` whose IO log is held entirely in memory.

    This type of JobResult is intended for writing unit tests, where going
    through the filesystem would only make them needlessly complicated.
    """

    def get_io_log(self):
        """
        Generate the IO log as a series of :class:`IOLogRecord` objects.

        Entries are read from the 'io_log' item of the result data; when that
        item is absent nothing is generated. Plain tuples are converted to
        IOLogRecord on the fly.

        :raises TypeError: when an entry is not a tuple of any kind
        """
        for item in self._data.get('io_log', ()):
            # IOLogRecord is a namedtuple, so it passes this check as well
            if not isinstance(item, tuple):
                raise TypeError(
                    "each item in io_log must be either a tuple"
                    " or special the IOLogRecord tuple")
            if isinstance(item, IOLogRecord):
                yield item
            else:
                yield IOLogRecord(*item)


class GzipFile(gzip.GzipFile):
    """
    Subclass of GzipFile that works around missing read1() on python3.2

    See: http://bugs.python.org/issue10791
    """

    def read1(self, n=-1):
        """
        Read and return up to ``n`` bytes by delegating to :meth:`read`.

        :param n:
            maximum number of bytes to return. Defaults to -1 (read until
            end of file) so that the signature matches the standard
            ``read1(size=-1)`` of buffered binary streams; without the
            default, callers invoking ``read1()`` with no argument would
            fail with TypeError.
        :returns: the decompressed bytes that were read
        """
        return self.read(n)


class DiskJobResult(_JobResultBase):
    """
    A :class:`IJobResult` whose IO log is kept on disk.

    This is the type of JobResult meant for most results. IO logs are never
    stored in memory, so it scales to IO logs of arbitrary size. An instance
    only knows where the log file lives (through the 'io_log_filename'
    attribute) and offers a streaming API for reading the log.
    """

    @property
    def io_log_filename(self):
        """
        pathname of the file that holds the serialized IO log records
        """
        return self._data.get("io_log_filename")

    def get_io_log(self):
        """
        Stream :class:`IOLogRecord` objects out of the on-disk log.

        The log file is opened as gzip-compressed, UTF-8 encoded text and
        decoded by IOLogRecordReader. Nothing is generated when no log
        filename is set.
        """
        path = self.io_log_filename
        if not path:
            return
        with GzipFile(path, mode='rb') as compressed:
            with io.TextIOWrapper(compressed, encoding='UTF-8') as text:
                for entry in IOLogRecordReader(text):
                    yield entry

    @property
    def io_log(self):
        """
        the complete IO log as a tuple

        Each access logs a warning that names the file and line of the
        caller, since this materializes the whole on-disk log.
        """
        # Entry 1 of the stack is whoever accessed this property
        caller = inspect.stack(0)[1]
        filename, lineno = caller[1], caller[2]
        logger.warning(
            # TRANSLATORS: please keep DiskJobResult.io_log untranslated
            _("Expensive DiskJobResult.io_log property access from %s:%d"),
            filename, lineno)
        return super().io_log


class IOLogRecordWriter:
    """
    Serializer that writes :class:`IOLogRecord` instances to a text stream,
    one JSON document per line
    """

    def __init__(self, stream):
        """
        Remember the text stream that records will be written to
        """
        self.stream = stream

    def close(self):
        """
        Close the underlying stream
        """
        self.stream.close()

    def write_record(self, record):
        """
        Append one :class:`IOLogRecord` to the stream.

        The record is encoded as a compact JSON array holding the delay, the
        stream name and the base64 text of the data, followed by a newline.
        """
        delay = record[0]
        stream_name = record[1]
        # Raw bytes cannot go into JSON directly; carry them as base64 text
        payload = base64.standard_b64encode(record[2]).decode("ASCII")
        text = json.dumps(
            [delay, stream_name, payload],
            check_circular=False, ensure_ascii=True, indent=None,
            separators=(',', ':'))
        logger.debug(_("Encoded %r into string %r"), record, text)
        # One record per line: the encoded form must never span lines
        assert "\n" not in text
        for chunk in (text, '\n'):
            self.stream.write(chunk)


class IOLogRecordReader:
    """
    Deserializer that streams :class:`IOLogRecord` instances out of a text
    stream holding one JSON document per line
    """

    def __init__(self, stream):
        """
        Remember the text stream that records will be read from
        """
        self.stream = stream

    def close(self):
        """
        Close the underlying stream
        """
        self.stream.close()

    def read_record(self):
        """
        Read the next record from the stream.

        :returns:
            the next :class:`IOLogRecord` found in the stream, or None when
            the stream has nothing more to offer.
        """
        line = self.stream.readline()
        if len(line) > 0:
            fields = json.loads(line)
            delay = fields[0]
            stream_name = fields[1]
            # The data travels as base64 text; turn it back into bytes
            raw = base64.standard_b64decode(fields[2].encode("ASCII"))
            return IOLogRecord(delay, stream_name, raw)
        return None

    def __iter__(self):
        """
        Walk the whole stream, generating each :class:`IOLogRecord` in turn
        until the stream is exhausted.
        """
        record = self.read_record()
        while record is not None:
            yield record
            record = self.read_record()