/usr/lib/python3/dist-packages/defusedxml/xmlrpc.py is in python3-defusedxml 0.4.1-2build1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 | # defusedxml
#
# Copyright (c) 2013 by Christian Heimes <christian@python.org>
# Licensed to PSF under a Contributor Agreement.
# See http://www.python.org/psf/license for licensing details.
"""Defused xmlrpclib
Also defuses gzip bomb
"""
from __future__ import print_function, absolute_import
import io
from .common import (DTDForbidden, EntitiesForbidden,
ExternalReferenceForbidden, PY3, PY31, PY26)
if PY3:
__origin__ = "xmlrpc.client"
from xmlrpc.client import ExpatParser
from xmlrpc import client as xmlrpc_client
from xmlrpc import server as xmlrpc_server
if not PY31:
from xmlrpc.client import gzip_decode as _orig_gzip_decode
from xmlrpc.client import GzipDecodedResponse as _OrigGzipDecodedResponse
else:
__origin__ = "xmlrpclib"
from xmlrpclib import ExpatParser
import xmlrpclib as xmlrpc_client
xmlrpc_server = None
if not PY26:
from xmlrpclib import gzip_decode as _orig_gzip_decode
from xmlrpclib import GzipDecodedResponse as _OrigGzipDecodedResponse
try:
import gzip
except ImportError:
gzip = None
# Limit maximum request size to prevent resource exhaustion DoS
# Also used to limit maximum amount of gzip decoded data in order to prevent
# decompression bombs
# A value of -1 or smaller disables the limit
MAX_DATA = 30 * 1024 * 1024 # 30 MB
def defused_gzip_decode(data, limit=None):
"""gzip encoded data -> unencoded data
Decode data using the gzip content encoding as described in RFC 1952
"""
if not gzip:
raise NotImplementedError
if limit is None:
limit = MAX_DATA
f = io.BytesIO(data)
gzf = gzip.GzipFile(mode="rb", fileobj=f)
try:
if limit < 0: # no limit
decoded = gzf.read()
else:
decoded = gzf.read(limit + 1)
except IOError:
raise ValueError("invalid data")
f.close()
gzf.close()
if limit >= 0 and len(decoded) > limit:
raise ValueError("max gzipped payload length exceeded")
return decoded
class DefusedGzipDecodedResponse(gzip.GzipFile if gzip else object):
"""a file-like object to decode a response encoded with the gzip
method, as described in RFC 1952.
"""
def __init__(self, response, limit=None):
#response doesn't support tell() and read(), required by
#GzipFile
if not gzip:
raise NotImplementedError
self.limit = limit = limit if limit is not None else MAX_DATA
if limit < 0: # no limit
data = response.read()
self.readlength = None
else:
data = response.read(limit + 1)
self.readlength = 0
if limit >= 0 and len(data) > limit:
raise ValueError("max payload length exceeded")
self.stringio = io.BytesIO(data)
gzip.GzipFile.__init__(self, mode="rb", fileobj=self.stringio)
def read(self, n):
if self.limit >= 0:
left = self.limit - self.readlength
n = min(n, left + 1)
data = gzip.GzipFile.read(self, n)
self.readlength += len(data)
if self.readlength > self.limit:
raise ValueError("max payload length exceeded")
return data
else:
return gzip.GzipFile.read(self, n)
def close(self):
gzip.GzipFile.close(self)
self.stringio.close()
class DefusedExpatParser(ExpatParser):
def __init__(self, target, forbid_dtd=False, forbid_entities=True,
forbid_external=True):
ExpatParser.__init__(self, target)
self.forbid_dtd = forbid_dtd
self.forbid_entities = forbid_entities
self.forbid_external = forbid_external
parser = self._parser
if self.forbid_dtd:
parser.StartDoctypeDeclHandler = self.defused_start_doctype_decl
if self.forbid_entities:
parser.EntityDeclHandler = self.defused_entity_decl
parser.UnparsedEntityDeclHandler = self.defused_unparsed_entity_decl
if self.forbid_external:
parser.ExternalEntityRefHandler = self.defused_external_entity_ref_handler
def defused_start_doctype_decl(self, name, sysid, pubid,
has_internal_subset):
raise DTDForbidden(name, sysid, pubid)
def defused_entity_decl(self, name, is_parameter_entity, value, base,
sysid, pubid, notation_name):
raise EntitiesForbidden(name, value, base, sysid, pubid, notation_name)
def defused_unparsed_entity_decl(self, name, base, sysid, pubid,
notation_name):
# expat 1.2
raise EntitiesForbidden(name, None, base, sysid, pubid, notation_name)
def defused_external_entity_ref_handler(self, context, base, sysid,
pubid):
raise ExternalReferenceForbidden(context, base, sysid, pubid)
def monkey_patch():
xmlrpc_client.FastParser = DefusedExpatParser
if PY26 or PY31:
# Python 2.6 and 3.1 have no gzip support in xmlrpc
return
xmlrpc_client.GzipDecodedResponse = DefusedGzipDecodedResponse
xmlrpc_client.gzip_decode = defused_gzip_decode
if xmlrpc_server:
xmlrpc_server.gzip_decode = defused_gzip_decode
def unmonkey_patch():
xmlrpc_client.FastParser = None
if PY26 or PY31:
return
xmlrpc_client.GzipDecodedResponse = _OrigGzipDecodedResponse
xmlrpc_client.gzip_decode = _orig_gzip_decode
if xmlrpc_server:
xmlrpc_server.gzip_decode = _orig_gzip_decode
|