/usr/lib/python3/dist-packages/plainbox/impl/result.py is in python3-plainbox 0.5.3-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 | # This file is part of Checkbox.
#
# Copyright 2012 Canonical Ltd.
# Written by:
# Zygmunt Krynicki <zygmunt.krynicki@canonical.com>
#
# Checkbox is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 3,
# as published by the Free Software Foundation.
#
# Checkbox is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Checkbox. If not, see <http://www.gnu.org/licenses/>.
"""
:mod:`plainbox.impl.result` -- job result
=========================================
This module has two basic implementation of :class:`IJobResult`:
:class:`MemoryJobResult` and :class:`DiskJobResult`.
"""
from collections import namedtuple
import base64
import gzip
import io
import json
import logging
import inspect
from plainbox.abc import IJobResult
from plainbox.i18n import gettext as _
from plainbox.impl.signal import Signal
logger = logging.getLogger("plainbox.result")
# Tuple representing entries in the JobResult.io_log
# Each entry has three fields:
#
# delay - time elapsed since the previous record was created (in seconds,
# floating point unit represent fractional parts)
#
# stream_name - name of the stream the IO was observed on, currently
# 'stdout' and 'stderr' are supported.
#
# data - the actual IO seen (bytes)
IOLogRecord = namedtuple("IOLogRecord", "delay stream_name data".split())
class _JobResultBase(IJobResult):
"""
Base class for :`IJobResult` implementations.
This class defines base properties common to all variants of `IJobResult`
"""
def __init__(self, data):
"""
Initialize a new result with the specified data
Data is a dictionary that can hold arbitrary values. At least some
values are explicitly used, such as 'outcome', 'comments' and
'return_code' but all of those are optional.
"""
# Filter out boring items so that stuff that is rally identical,
# behaves as if it was identical. This is especially important for
# __eq__() below as various types of IJobResult are constructed and
# compared with default entries that should not compare differently.
self._data = {
key: value for key, value in data.items()
if value is not None and value != []}
def __eq__(self, other):
if not isinstance(other, _JobResultBase):
return NotImplemented
return self._data == other._data
def __str__(self):
return str(self.outcome)
def __repr__(self):
return "<{}>".format(
' '.join([self.__class__.__name__] + [
"{}:{!r}".format(key, self._data[key])
for key in sorted(self._data.keys())]))
@Signal.define
def on_outcome_changed(self, old, new):
"""
Signal sent when ``outcome`` property value is changed
"""
@property
def outcome(self):
"""
outcome of running this job.
The outcome ultimately classifies jobs (tests) as failures or
successes. There are several other types of outcome that all basically
mean that the job did not run for some particular reason.
"""
return self._data.get('outcome', self.OUTCOME_NONE)
@outcome.setter
def outcome(self, new):
old = self.outcome
if old != new:
self._data['outcome'] = new
self.on_outcome_changed(old, new)
@property
def execution_duration(self):
"""
The amount of time in seconds it took to run this
jobs command.
"""
return self._data.get('execution_duration')
@property
def comments(self):
"""
comments of the test operator
"""
return self._data.get('comments')
@comments.setter
def comments(self, new):
old = self.comments
if old != new:
self._data['comments'] = new
self.on_comments_changed(old, new)
@Signal.define
def on_comments_changed(self, old, new):
"""
Signal sent when ``comments`` property value is changed
"""
@property
def return_code(self):
"""
return code of the command associated with the job, if any
"""
return self._data.get('return_code')
@property
def io_log(self):
return tuple(self.get_io_log())
class MemoryJobResult(_JobResultBase):
"""
A :class:`IJobResult` that keeps IO logs in memory.
This type of JobResult is indented for writing unit tests where the hassle
of going through the filesystem would make them needlessly complicated.
"""
def get_io_log(self):
io_log_data = self._data.get('io_log', ())
for entry in io_log_data:
if isinstance(entry, IOLogRecord):
yield entry
elif isinstance(entry, tuple):
yield IOLogRecord(*entry)
else:
raise TypeError(
"each item in io_log must be either a tuple"
" or special the IOLogRecord tuple")
class GzipFile(gzip.GzipFile):
"""
Subclass of GzipFile that works around missing read1() on python3.2
See: http://bugs.python.org/issue10791
"""
def read1(self, n):
return self.read(n)
class DiskJobResult(_JobResultBase):
"""
A :class:`IJobResult` that keeps IO logs on disk.
This type of JobResult is intended for working with most results. It does
not store IO logs in memory so it is scalable to arbitrary IO log sizes.
Each instance just knows where the log file is located (using the
'io_log_filename' attribute for that) and offers streaming API for
accessing particular parts of the log.
"""
@property
def io_log_filename(self):
"""
pathname of the file containing serialized IO log records
"""
return self._data.get("io_log_filename")
def get_io_log(self):
record_path = self.io_log_filename
if record_path:
with GzipFile(record_path, mode='rb') as gzip_stream, \
io.TextIOWrapper(gzip_stream, encoding='UTF-8') as stream:
for record in IOLogRecordReader(stream):
yield record
@property
def io_log(self):
caller_frame, filename, lineno = inspect.stack(0)[1][:3]
logger.warning(
# TRANSLATORS: please keep DiskJobResult.io_log untranslated
_("Expensive DiskJobResult.io_log property access from %s:%d"),
filename, lineno)
return super(DiskJobResult, self).io_log
class IOLogRecordWriter:
"""
Class for writing :class:`IOLogRecord` instances to a text stream
"""
def __init__(self, stream):
self.stream = stream
def close(self):
self.stream.close()
def write_record(self, record):
"""
Write an :class:`IOLogRecord` to the stream.
"""
text = json.dumps([
record[0], record[1],
base64.standard_b64encode(record[2]).decode("ASCII")],
check_circular=False, ensure_ascii=True, indent=None,
separators=(',', ':'))
logger.debug(_("Encoded %r into string %r"), record, text)
assert "\n" not in text
self.stream.write(text)
self.stream.write('\n')
class IOLogRecordReader:
"""
Class for streaming :class`IOLogRecord` instances from a text stream
"""
def __init__(self, stream):
self.stream = stream
def close(self):
self.stream.close()
def read_record(self):
"""
Read the next record from the stream.
:returns: None if the stream is empty
:returns: next :class:`IOLogRecord` as found in the stream.
"""
text = self.stream.readline()
if len(text) == 0:
return
data = json.loads(text)
return IOLogRecord(
data[0], data[1],
base64.standard_b64decode(data[2].encode("ASCII")))
def __iter__(self):
"""
Iterate over the entire stream generating subsequent
:class:`IOLogRecord` entries.
"""
while True:
record = self.read_record()
if record is None:
break
yield record
|