/usr/share/pyshared/circuits/web/utils.py is in python-circuits 2.1.0-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 | # Module: utils
# Date: 13th September 2007
# Author: James Mills, prologic at shortcircuit dot net dot au
"""Utilities
This module implements utility functions.
"""
import re
import zlib
import time
import struct
from io import TextIOWrapper
from cgi import FieldStorage
try:
from urllib.parse import urljoin as _urljoin
except ImportError:
from urlparse import urljoin as _urljoin # NOQA
try:
from urllib.parse import parse_qs as _parse_qs
except ImportError:
from cgi import parse_qs as _parse_qs # NOQA
from .exceptions import RequestEntityTooLarge
quoted_slash = re.compile("(?i)%2F")
image_map_pattern = re.compile("[0-9]+,[0-9]+")
def parse_body(request, response, params):
if "Content-Type" not in request.headers:
request.headers["Content-Type"] = ""
try:
form = FieldStorage(
environ={"REQUEST_METHOD": "POST"},
fp=request.body,
headers=request.headers,
keep_blank_values=True
)
except Exception as e:
if e.__class__.__name__ == 'MaxSizeExceeded':
# Post data is too big
raise RequestEntityTooLarge()
raise
if form.file:
request.body = form.file
else:
params.update(dictform(form))
def parse_qs(query_string, keep_blank_values=True):
"""parse_qs(query_string) -> dict
Build a params dictionary from a query_string.
If keep_blank_values is True (the default), keep
values that are blank.
"""
if image_map_pattern.match(query_string):
# Server-side image map. Map the coords to "x" and "y"
# (like CGI::Request does).
pm = query_string.split(",")
return {"x": int(pm[0]), "y": int(pm[1])}
else:
pm = _parse_qs(query_string, keep_blank_values)
return dict((k, v[0]) for k, v in pm.items() if v)
def dictform(form):
d = {}
for key in list(form.keys()):
values = form[key]
if isinstance(values, list):
d[key] = []
for item in values:
if item.filename is not None:
value = item # It's a file upload
else:
value = item.value # It's a regular field
d[key].append(value)
else:
if values.filename is not None:
value = values # It's a file upload
else:
value = values.value # It's a regular field
d[key] = value
return d
def compress(body, compress_level):
"""Compress 'body' at the given compress_level."""
# Header
yield b"\037\213\010\0" \
+ struct.pack("<L", int(time.time())) \
+ b"\002\377"
size = 0
crc = zlib.crc32(b"")
zobj = zlib.compressobj(
compress_level,
zlib.DEFLATED,
-zlib.MAX_WBITS,
zlib.DEF_MEM_LEVEL,
0,
)
for chunk in body:
if not isinstance(chunk, bytes):
chunk = chunk.encode("utf-8")
size += len(chunk)
crc = zlib.crc32(chunk, crc)
yield zobj.compress(chunk)
yield zobj.flush() \
+ struct.pack("<l", crc) \
+ struct.pack("<L", size & 0xFFFFFFFF)
def url(request, path="", qs="", script_name=None, base=None, relative=None):
"""Create an absolute URL for the given path.
If 'path' starts with a slash ('/'), this will return
- (base + script_name + path + qs).
If it does not start with a slash, this returns
- (base + script_name [+ request.path] + path + qs).
If script_name is None, request will be used
to find a script_name, if available.
If base is None, request.base will be used (if available).
Finally, note that this function can be used to obtain an absolute URL
for the current request path (minus the querystring) by passing no args.
If you call url(qs=request.qs), you should get the
original browser URL (assuming no internal redirections).
If relative is False the output will be an absolute URL
(including the scheme, host, vhost, and script_name).
If True, the output will instead be a URL that is relative to the
current request path, perhaps including '..' atoms. If relative is
the string 'server', the output will instead be a URL that is
relative to the server root; i.e., it will start with a slash.
"""
if qs:
qs = '?' + qs
if not path.startswith("/"):
# Append/remove trailing slash from request.path as needed
# (this is to support mistyped URL's without redirecting;
# if you want to redirect, use tools.trailing_slash).
pi = request.path
if request.index is True:
if not pi.endswith('/'):
pi = pi + '/'
elif request.index is False:
if pi.endswith('/') and pi != '/':
pi = pi[:-1]
if path == "":
path = pi
else:
path = _urljoin(pi, path)
if script_name is None:
script_name = request.script_name
if base is None:
base = request.base
newurl = base + script_name + path + qs
if './' in newurl:
# Normalize the URL by removing ./ and ../
atoms = []
for atom in newurl.split('/'):
if atom == '.':
pass
elif atom == '..':
atoms.pop()
else:
atoms.append(atom)
newurl = '/'.join(atoms)
# At this point, we should have a fully-qualified absolute URL.
# See http://www.ietf.org/rfc/rfc2396.txt
if relative == 'server':
# "A relative reference beginning with a single slash character is
# termed an absolute-path reference, as defined by <abs_path>..."
# This is also sometimes called "server-relative".
newurl = '/' + '/'.join(newurl.split('/', 3)[3:])
elif relative:
# "A relative reference that does not begin with a scheme name
# or a slash character is termed a relative-path reference."
old = url().split('/')[:-1]
new = newurl.split('/')
while old and new:
a, b = old[0], new[0]
if a != b:
break
old.pop(0)
new.pop(0)
new = (['..'] * len(old)) + new
newurl = '/'.join(new)
return newurl
def get_ranges(headervalue, content_length):
"""Return a list of (start, stop) indices from a Range header, or None.
Each (start, stop) tuple will be composed of two ints, which are suitable
for use in a slicing operation. That is, the header "Range: bytes=3-6",
if applied against a Python string, is requesting resource[3:7]. This
function will return the list [(3, 7)].
If this function returns an empty list, you should return HTTP 416.
"""
if not headervalue:
return None
result = []
bytesunit, byteranges = headervalue.split("=", 1)
for brange in byteranges.split(","):
start, stop = [x.strip() for x in brange.split("-", 1)]
if start:
if not stop:
stop = content_length - 1
start, stop = list(map(int, (start, stop)))
if start >= content_length:
# From rfc 2616 sec 14.16:
# "If the server receives a request (other than one
# including an If-Range request-header field) with an
# unsatisfiable Range request-header field (that is,
# all of whose byte-range-spec values have a first-byte-pos
# value greater than the current length of the selected
# resource), it SHOULD return a response code of 416
# (Requested range not satisfiable)."
continue
if stop < start:
# From rfc 2616 sec 14.16:
# "If the server ignores a byte-range-spec because it
# is syntactically invalid, the server SHOULD treat
# the request as if the invalid Range header field
# did not exist. (Normally, this means return a 200
# response containing the full entity)."
return None
result.append((start, stop + 1))
else:
if not stop:
# See rfc quote above.
return None
# Negative subscript (last N bytes)
result.append((content_length - int(stop), content_length))
return result
|