/usr/lib/python2.7/dist-packages/testtools/content.py is in python-testtools 0.9.39-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# Copyright (c) 2009-2012 testtools developers. See LICENSE for details.
"""Content - a MIME-like Content object."""
# Names exported by ``from testtools.content import *``. Other helpers
# defined in this module are still importable by their explicit name.
__all__ = [
'attach_file',
'Content',
'content_from_file',
'content_from_stream',
'json_content',
'text_content',
'TracebackContent',
]
import codecs
import inspect
import json
import os
import sys
import traceback
from extras import try_import
from testtools.compat import (
_b,
_format_exception_only,
_format_stack_list,
_isbytes,
_TB_HEADER,
_u,
str_is_unicode,
)
from testtools.content_type import ContentType, JSON, UTF8_TEXT
# Optional import: ``maybe_wrap`` checks this for None before using it, so
# presumably ``try_import`` yields None when functools is unavailable.
functools = try_import('functools')
# Bound ``join`` of an empty bytestring; used by ``Content.__eq__`` and
# ``Content.__repr__`` to concatenate the chunks from ``iter_bytes``.
_join_b = _b("").join
# Default value of the ``chunk_size`` parameter of the file/stream helpers.
DEFAULT_CHUNK_SIZE = 4096
# %-format templates taking a single value. They are not referenced anywhere
# else in the visible part of this module.
STDOUT_LINE = '\nStdout:\n%s'
STDERR_LINE = '\nStderr:\n%s'
def _iter_chunks(stream, chunk_size, seek_offset=None, seek_whence=0):
"""Read 'stream' in chunks of 'chunk_size'.
:param stream: A file-like object to read from.
:param chunk_size: The size of each read from 'stream'.
:param seek_offset: If non-None, seek before iterating.
:param seek_whence: Pass through to the seek call, if seeking.
"""
if seek_offset is not None:
stream.seek(seek_offset, seek_whence)
chunk = stream.read(chunk_size)
while chunk:
yield chunk
chunk = stream.read(chunk_size)
class Content(object):
    """A MIME-like Content object.

    Content objects can be serialised to bytes using the iter_bytes method.
    If the Content-Type is recognised by other code, they are welcome to
    look for richer contents than mere byte serialisation - for example in
    memory object graphs etc. However, such code MUST be prepared to receive
    a generic Content object that has been reconstructed from a byte stream.

    :ivar content_type: The content type of this Content.
    """

    def __init__(self, content_type, get_bytes):
        """Create a Content.

        :param content_type: The content type describing the serialised
            bytes. Must not be None.
        :param get_bytes: A callable taking no arguments whose return value
            is handed back unchanged by ``iter_bytes``. Must not be None.
        :raises ValueError: If either argument is None.
        """
        # Identity checks: an argument whose __eq__ happens to accept None
        # must not be mistaken for a missing value.
        if content_type is None or get_bytes is None:
            raise ValueError("None not permitted in %r, %r" % (
                content_type, get_bytes))
        self.content_type = content_type
        self._get_bytes = get_bytes

    def __eq__(self, other):
        """Compare by content type and by the fully joined bytes.

        Both contents are read completely. Objects that do not provide
        ``content_type`` and ``iter_bytes`` yield ``NotImplemented`` so that
        Python falls back to its default comparison instead of raising
        AttributeError.
        """
        try:
            other_type = other.content_type
            other_iter_bytes = other.iter_bytes
        except AttributeError:
            return NotImplemented
        return (self.content_type == other_type and
                _join_b(self.iter_bytes()) == _join_b(other_iter_bytes()))

    def __ne__(self, other):
        """Inverse of ``__eq__``.

        Python 2 does not derive ``!=`` from ``__eq__``, so without this
        method two equal Content objects would also compare unequal there.
        """
        outcome = self.__eq__(other)
        if outcome is NotImplemented:
            return outcome
        return not outcome

    def as_text(self):
        """Return all of the content as text.

        This is only valid where ``iter_text`` is. It will load all of the
        content into memory. Where this is a concern, use ``iter_text``
        instead.
        """
        return _u('').join(self.iter_text())

    def iter_bytes(self):
        """Iterate over bytestrings of the serialised content."""
        return self._get_bytes()

    def iter_text(self):
        """Iterate over the text of the serialised content.

        This is only valid for text MIME types, and will use ISO-8859-1 if
        no charset parameter is present in the MIME type. (This is somewhat
        arbitrary, but consistent with RFC 2616 section 3.7.1).

        :raises ValueError: If the content type is not text/\\*.
        """
        # The check lives here rather than in the generator below so that
        # the error is raised at call time, not on first iteration.
        if self.content_type.type != "text":
            raise ValueError("Not a text type %r" % self.content_type)
        return self._iter_text()

    def _iter_text(self):
        """Worker for iter_text - does the decoding."""
        encoding = self.content_type.parameters.get('charset', 'ISO-8859-1')
        # An incremental decoder copes with multi-byte sequences that are
        # split across chunk boundaries.
        decoder = codecs.getincrementaldecoder(encoding)()
        for chunk in self.iter_bytes():
            yield decoder.decode(chunk)
        # Flush whatever the decoder is still holding back.
        final = decoder.decode(_b(''), True)
        if final:
            yield final

    def __repr__(self):
        return "<Content type=%r, value=%r>" % (
            self.content_type, _join_b(self.iter_bytes()))
class StackLinesContent(Content):
    """Content object wrapping a list of stack lines.

    Adapts "preprocessed" stack lines - most likely the output of
    ``traceback.extract_stack`` or ``traceback.extract_tb`` - to the Content
    interface.

    text/x-traceback;language=python is used for the mime type, in order to
    provide room for other languages to format their tracebacks differently.
    """

    # Whether or not to hide layers of the stack trace that are
    # unittest/testtools internal code. Defaults to True since the
    # system-under-test is rarely unittest or testtools.
    HIDE_INTERNAL_STACK = True

    def __init__(self, stack_lines, prefix_content="", postfix_content=""):
        """Create a StackLinesContent for ``stack_lines``.

        :param stack_lines: A list of preprocessed stack lines, probably
            obtained by calling ``traceback.extract_stack`` or
            ``traceback.extract_tb``.
        :param prefix_content: If specified, a unicode string to prepend to
            the text content.
        :param postfix_content: If specified, a unicode string to append to
            the text content.
        """
        mime_type = ContentType(
            'text', 'x-traceback', {"language": "python", "charset": "utf8"})
        formatted = self._stack_lines_to_unicode(stack_lines)
        text = prefix_content + formatted + postfix_content

        def get_bytes():
            # Encoded on every call, i.e. only when the content is read.
            return [text.encode("utf8")]

        super(StackLinesContent, self).__init__(mime_type, get_bytes)

    def _stack_lines_to_unicode(self, stack_lines):
        """Converts a list of pre-processed stack lines into a unicode string.
        """
        # testtools customization. When str is unicode (e.g. IronPython,
        # Python 3), traceback.format_list returns unicode. For Python 2,
        # it returns bytes. We need to guarantee unicode.
        formatter = (
            traceback.format_list if str_is_unicode else _format_stack_list)
        return ''.join(formatter(stack_lines))
def TracebackContent(err, test):
    """Content object for tracebacks.

    This adapts an exc_info tuple to the Content interface.
    text/x-traceback;language=python is used for the mime type, in order to
    provide room for other languages to format their tracebacks differently.

    :param err: An exc_info tuple ``(exctype, value, tb)``.
    :param test: The test the error belongs to. It is not consulted at the
        moment (see the note on assert*() trimming below) but is kept so the
        call signature stays stable.
    :return: A ``StackLinesContent`` holding the traceback header, the
        formatted stack lines and the formatted exception.
    :raises ValueError: If ``err`` is None.
    """
    if err is None:
        raise ValueError("err may not be None")
    exctype, value, tb = err
    # Skip test runner traceback levels
    if StackLinesContent.HIDE_INTERNAL_STACK:
        while tb and '__unittest' in tb.tb_frame.f_globals:
            tb = tb.tb_next
    # testtools customization. When str is unicode (e.g. IronPython,
    # Python 3), traceback.format_exception_only returns unicode. For Python 2,
    # it returns bytes. We need to guarantee unicode.
    if str_is_unicode:
        format_exception_only = traceback.format_exception_only
    else:
        format_exception_only = _format_exception_only
    # Trimming of assert*() traceback levels for failureException errors is
    # disabled due to https://bugs.launchpad.net/testtools/+bug/1188420, so
    # the remaining traceback is always extracted without a limit. (The
    # former, unreachable implementation referred to an undefined ``self``.)
    limit = None
    stack_lines = traceback.extract_tb(tb, limit)
    postfix = ''.join(format_exception_only(exctype, value))
    return StackLinesContent(stack_lines, _TB_HEADER, postfix)
def StacktraceContent(prefix_content="", postfix_content=""):
    """Content object for stack traces.

    This function will create and return a content object that contains a
    stack trace of the calling code.

    The mime type is set to 'text/x-traceback;language=python', so other
    languages can format their stack traces differently.

    :param prefix_content: A unicode string to add before the stack lines.
    :param postfix_content: A unicode string to add after the stack lines.
    :return: A ``StackLinesContent`` for the captured stack, outermost frame
        first.
    """
    # Drop the record for this function itself.
    stack = inspect.stack()[1:]
    if StackLinesContent.HIDE_INTERNAL_STACK:
        # The direct caller is always kept; walking outwards stops at the
        # first frame whose globals contain '__unittest'.
        limit = 1
        while (limit < len(stack) and
               '__unittest' not in stack[limit][0].f_globals):
            limit += 1
    else:
        # NOTE(review): slicing with -1 drops the outermost frame even though
        # nothing is meant to be hidden here - confirm whether None (keep
        # everything) was intended.
        limit = -1
    processed_stack = []
    # inspect.stack() is innermost-first; traceback formatting expects
    # outermost-first.
    for record in reversed(stack[:limit]):
        filename, line, function, context, _ = inspect.getframeinfo(record[0])
        # code_context is None when the source of a frame is unavailable
        # (e.g. code run through exec or typed interactively); joining None
        # would raise TypeError.
        context = ''.join(context) if context else ''
        processed_stack.append((filename, line, function, context))
    return StackLinesContent(processed_stack, prefix_content, postfix_content)
def json_content(json_data):
    """Create a JSON `Content` object from JSON-encodeable data.

    The data is serialised once, when this function is called; the returned
    content hands out that serialisation as a single bytestring.
    """
    serialised = json.dumps(json_data)
    # The json module perversely returns native str not bytes, so encode
    # whenever native str is unicode.
    payload = serialised.encode('utf8') if str_is_unicode else serialised

    def get_bytes():
        return [payload]

    return Content(JSON, get_bytes)
def text_content(text):
    """Create a `Content` object from some text.

    This is useful for adding details which are short strings. The text is
    encoded as UTF-8 each time the content's bytes are requested.

    :raises TypeError: If ``text`` is a bytestring.
    """
    if _isbytes(text):
        raise TypeError('text_content must be given a string, not bytes.')

    def get_bytes():
        return [text.encode('utf8')]

    return Content(UTF8_TEXT, get_bytes)
def maybe_wrap(wrapper, func):
    """Merge metadata for func into wrapper if functools is present.

    :param wrapper: The callable that should take on func's metadata.
    :param func: The callable whose metadata is copied.
    :return: ``wrapper`` - updated in place when functools is available,
        untouched otherwise.
    """
    if functools is None:
        return wrapper
    return functools.update_wrapper(wrapper, func)
def content_from_file(path, content_type=None, chunk_size=DEFAULT_CHUNK_SIZE,
                      buffer_now=False, seek_offset=None, seek_whence=0):
    """Create a `Content` object from a file on disk.

    Note that unless 'buffer_now' is explicitly passed in as True, the file
    will only be opened and read when ``iter_bytes`` is iterated.

    :param path: The path to the file to be used as content.
    :param content_type: The type of content. If not specified, defaults
        to UTF8-encoded text/plain.
    :param chunk_size: The size of chunks to read from the file.
        Defaults to ``DEFAULT_CHUNK_SIZE``.
    :param buffer_now: If True, read the file from disk now and keep it in
        memory. Otherwise, only read when the content is serialized.
    :param seek_offset: If non-None, seek within the file before reading it.
    :param seek_whence: If supplied, pass to the file's seek() when seeking.
    """
    def read_file_chunks():
        # A generator, so the file is opened on first iteration and closed
        # again once the chunks are exhausted.
        with open(path, 'rb') as source:
            chunks = _iter_chunks(
                source, chunk_size, seek_offset, seek_whence)
            for piece in chunks:
                yield piece

    effective_type = UTF8_TEXT if content_type is None else content_type
    return content_from_reader(read_file_chunks, effective_type, buffer_now)
def content_from_stream(stream, content_type=None,
                        chunk_size=DEFAULT_CHUNK_SIZE, buffer_now=False,
                        seek_offset=None, seek_whence=0):
    """Create a `Content` object from a file-like stream.

    Note that unless 'buffer_now' is True the stream will only be read from
    when ``iter_bytes`` is called.

    :param stream: A file-like object to read the content from. The stream
        is not closed by this function or the content object it returns.
    :param content_type: The type of content. If not specified, defaults
        to UTF8-encoded text/plain.
    :param chunk_size: The size of chunks to read from the stream.
        Defaults to ``DEFAULT_CHUNK_SIZE``.
    :param buffer_now: If True, reads from the stream right now. Otherwise,
        only reads when the content is serialized. Defaults to False.
    :param seek_offset: If non-None, seek within the stream before reading it.
    :param seek_whence: If supplied, pass to stream.seek() when seeking.
    """
    def read_stream_chunks():
        # Returns a fresh chunk iterator over the same stream on every call.
        return _iter_chunks(stream, chunk_size, seek_offset, seek_whence)

    effective_type = UTF8_TEXT if content_type is None else content_type
    return content_from_reader(read_stream_chunks, effective_type, buffer_now)
def content_from_reader(reader, content_type, buffer_now):
    """Create a Content object that will obtain the content from reader.

    :param reader: A callback to read the content. Should return an iterable
        of bytestrings.
    :param content_type: The content type to create. None selects
        UTF8-encoded text/plain.
    :param buffer_now: If True the reader is evaluated immediately and its
        output is kept in memory; the content then serves that buffer.
    """
    if content_type is None:
        content_type = UTF8_TEXT
    if not buffer_now:
        return Content(content_type, reader)

    buffered = list(reader())

    def replay():
        return buffered

    return Content(content_type, replay)
def attach_file(detailed, path, name=None, content_type=None,
                chunk_size=DEFAULT_CHUNK_SIZE, buffer_now=True):
    """Attach a file to this test as a detail.

    This is a convenience method wrapping around ``addDetail``.

    Note that if 'buffer_now' is passed in as False, the file *must* still
    exist when the test result is called with the results of this test,
    after the test has been torn down.

    :param detailed: An object with details
    :param path: The path to the file to attach.
    :param name: The name to give to the detail for the attached file.
        Defaults to the base name of ``path``.
    :param content_type: The content type of the file. If not provided,
        defaults to UTF8-encoded text/plain.
    :param chunk_size: The size of chunks to read from the file. Defaults
        to something sensible.
    :param buffer_now: If False the file content is read when the content
        object is evaluated rather than when attach_file is called.
        Note that this may be after any cleanups that ``detailed`` has, so
        if the file is a temporary file disabling buffer_now may cause the
        file to be read after it is deleted. To handle those cases, using
        attach_file as a cleanup is recommended because it guarantees a
        sequence for when the attach_file call is made::

            detailed.addCleanup(attach_file, detailed, 'foo.txt')
    """
    detail_name = os.path.basename(path) if name is None else name
    detail = content_from_file(path, content_type, chunk_size, buffer_now)
    detailed.addDetail(detail_name, detail)
|